From 9a1af76df8dba1752836c445335e3f3e37c9b9e2 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Wed, 14 Sep 2016 11:16:57 -0400 Subject: [PATCH 1/3] Run tests --- pom.xml | 86 ++++++++++--------- .../apache/gossip/ShutdownDeadtimeTest.java | 8 +- .../apache/gossip/StartupSettingsTest.java | 3 + .../apache/gossip/TenNodeThreeSeedTest.java | 5 +- .../RandomGossipManagerBuilderTest.java | 4 +- 5 files changed, 62 insertions(+), 44 deletions(-) diff --git a/pom.xml b/pom.xml index 912da3b..5b80e17 100644 --- a/pom.xml +++ b/pom.xml @@ -41,6 +41,8 @@ 1.8.0 5.0.0-M2 + 1.0.0-M2 + 4.12.0-M2 1.2.17 0.0.0 @@ -78,15 +80,36 @@ jackson-datatype-json-org ${jackson-datatype-json-org.version} - org.junit.jupiter junit-jupiter-api ${junit.jupiter.version} test - - + + org.junit.jupiter + junit-jupiter-engine + ${junit.jupiter.version} + test + + + org.junit.vintage + junit-vintage-engine + ${junit.vintage.version} + test + + + org.junit.platform + junit-platform-runner + ${junit.platform.version} + test + + + org.junit.platform + junit-platform-runner + ${junit.platform.version} + test + io.teknek tunit @@ -119,24 +142,6 @@ - - - org.apache.maven.plugins - maven-gpg-plugin - ${maven-gpg-plugin.version} - - - sign-artifacts - verify - - sign - - - - org.apache.maven.plugins maven-jar-plugin @@ -151,24 +156,6 @@ - - org.apache.maven.plugins - maven-eclipse-plugin - ${maven-eclipse-plugin.version} - - [artifactId] - true - true - - org.eclipse.jdt.core.javabuilder - org.maven.ide.eclipse.maven2Builder - - - org.eclipse.jdt.core.javanature - org.maven.ide.eclipse.maven2Nature - - - org.apache.maven.plugins maven-compiler-plugin @@ -186,6 +173,27 @@ + + + maven-compiler-plugin + 3.1 + + ${java.version} + ${java.version} + + + + maven-surefire-plugin + 2.19.1 + + + org.junit.platform + junit-platform-surefire-provider + ${junit.platform.version} + + + + diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 251550b..f0d7f10 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -34,13 +34,17 @@ import org.apache.log4j.Logger; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; +import org.junit.platform.runner.JUnitPlatform; import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; + +@RunWith(JUnitPlatform.class) public class ShutdownDeadtimeTest { - private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class ); + private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class); + @Test - //@Ignore public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { GossipSettings settings = new GossipSettings(1000, 10000); String cluster = UUID.randomUUID().toString(); diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java index ed069c3..9019ac1 100644 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -32,11 +32,14 @@ import java.util.ArrayList; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; /** * Tests support of using {@code StartupSettings} and thereby reading * setup config from file. */ +@RunWith(JUnitPlatform.class) public class StartupSettingsTest { private static final Logger log = Logger.getLogger( StartupSettingsTest.class ); private static final String CLUSTER = UUID.randomUUID().toString(); diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index 350fc6f..c98b0d3 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -27,14 +27,15 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; - - +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; import org.apache.log4j.Logger; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.junit.jupiter.api.Test; +@RunWith(JUnitPlatform.class) public class TenNodeThreeSeedTest { private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class ); diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java index 6c63516..ab3242c 100644 --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java @@ -24,6 +24,8 @@ import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.random.RandomGossipManager; import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; import javax.management.Notification; import javax.management.NotificationListener; @@ -37,7 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.expectThrows; - +@RunWith(JUnitPlatform.class) public class RandomGossipManagerBuilderTest { public static class TestGossipListener implements GossipListener { From 375cee2a3b899931e3aa5372da1199f49484b44c Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Fri, 23 Sep 2016 22:13:40 -0400 Subject: [PATCH 2/3] GOSSIP-15 avoid busy loop (ChiaHung Lin via egc) --- .../java/org/apache/gossip/GossipService.java | 4 +- .../gossip/manager/ActiveGossipThread.java | 124 +++++++++++++----- .../apache/gossip/manager/GossipManager.java | 17 +-- .../random/RandomActiveGossipThread.java | 120 ----------------- 4 files changed, 99 insertions(+), 166 deletions(-) delete mode 100644 src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index ce15992..68a4ca2 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -70,8 +70,8 @@ public class GossipService { } public void start() { - LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri()); - gossipManager.start(); + LOGGER.debug("Starting: " + get_gossipManager().getMyself().getUri()); + gossipManager.init(); } public void shutdown() { diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 181d9ae..b57c25a 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -17,59 +17,103 @@ */ package org.apache.gossip.manager; +import java.io.IOException; +import java.net.DatagramSocket; import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.gossip.GossipService; import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.model.ActiveGossipOk; +import org.apache.gossip.model.GossipMember; +import org.apache.gossip.model.Response; +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; /** * [The active thread: periodically send gossip request.] The class handles gossiping the membership * list. This information is important to maintaining a common state among all the nodes, and is * important for detecting failures. */ -abstract public class ActiveGossipThread implements Runnable { +public class ActiveGossipThread { + public static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class); + + private ScheduledExecutorService scheduledExecutorService ; + private ObjectMapper MAPPER = new ObjectMapper(); + private final Random random; protected final GossipManager gossipManager; + private final GossipCore gossipCore; - private final AtomicBoolean keepRunning; - - public ActiveGossipThread(GossipManager gossipManager) { + public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { this.gossipManager = gossipManager; - this.keepRunning = new AtomicBoolean(true); + this.scheduledExecutorService = Executors.newScheduledThreadPool(1024); + this.gossipCore = gossipCore; + this.random = new Random(); } - @Override - public void run() { - while (keepRunning.get()) { - try { - TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval()); - - // contact a live member. - sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()); - - // contact a dead member. - sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()); - - } catch (InterruptedException e) { - GossipService.LOGGER.error(e); - keepRunning.set(false); - } - } - shutdown(); + public void init() { + scheduledExecutorService.scheduleAtFixedRate( + () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); } - + public void shutdown() { - keepRunning.set(false); + this.scheduledExecutorService.shutdown(); + try { + this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn("Did not complete shutdown", e); + } } /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - abstract protected void sendMembershipList(LocalGossipMember me, - List memberList); - + protected void sendMembershipList(LocalGossipMember me, List memberList) { + + me.setHeartbeat(System.currentTimeMillis()); + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); + return; + } else { + GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); + } + + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + UdpActiveGossipMessage message = new UdpActiveGossipMessage(); + message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); + message.setUuid(UUID.randomUUID().toString()); + message.getMembers().add(convert(me)); + for (LocalGossipMember other : memberList) { + message.getMembers().add(convert(other)); + } + byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + Response r = gossipCore.send(message, member.getUri()); + if (r instanceof ActiveGossipOk){ + //maybe count metrics here + } else { + LOGGER.warn("Message "+ message + " generated response "+ r); + } + } else { + GossipService.LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } catch (IOException e1) { + GossipService.LOGGER.warn(e1); + } + } /** * Abstract method which should be implemented by a subclass. This method should return a member * of the list to gossip with. @@ -78,5 +122,23 @@ abstract public class ActiveGossipThread implements Runnable { * The list of members which are stored in the local list of members. * @return The chosen LocalGossipMember to gossip with. */ - abstract protected LocalGossipMember selectPartner(List memberList); + protected LocalGossipMember selectPartner(List memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } else { + LOGGER.debug("I am alone in this world."); + } + return member; + } + + private GossipMember convert(LocalGossipMember member){ + GossipMember gm = new GossipMember(); + gm.setCluster(member.getClusterName()); + gm.setHeartbeat(member.getHeartbeat()); + gm.setUri(member.getUri().toASCIIString()); + gm.setId(member.getId()); + return gm; + } } diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 79be431..0b2cfd2 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -40,9 +40,8 @@ import org.apache.gossip.LocalGossipMember; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; -import org.apache.gossip.manager.random.RandomActiveGossipThread; -public abstract class GossipManager extends Thread implements NotificationListener { +public abstract class GossipManager implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); @@ -179,7 +178,7 @@ public abstract class GossipManager extends Thread implements NotificationListen * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * thread and start the receiver thread. */ - public void run() { + public void init() { for (LocalGossipMember member : members.keySet()) { if (member != me) { member.startTimeoutTimer(); @@ -187,17 +186,9 @@ public abstract class GossipManager extends Thread implements NotificationListen } passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); gossipThreadExecutor.execute(passiveGossipThread); - activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore); - gossipThreadExecutor.execute(activeGossipThread); + activeGossipThread = new ActiveGossipThread(this, this.gossipCore); + activeGossipThread.init(); GossipService.LOGGER.debug("The GossipService is started."); - while (gossipServiceRunning.get()) { - try { - // TODO - TimeUnit.MILLISECONDS.sleep(1); - } catch (InterruptedException e) { - GossipService.LOGGER.warn("The GossipClient was interrupted."); - } - } } /** diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java deleted file mode 100644 index 03d550c..0000000 --- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager.random; - -import java.io.IOException; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Random; -import java.util.UUID; - -import org.apache.gossip.GossipService; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.manager.ActiveGossipThread; -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.ActiveGossipOk; -import org.apache.gossip.model.GossipMember; -import org.apache.gossip.model.Response; -import org.apache.gossip.udp.UdpActiveGossipMessage; -import org.apache.log4j.Logger; -import org.codehaus.jackson.map.ObjectMapper; - -public class RandomActiveGossipThread extends ActiveGossipThread { - - public static final Logger LOGGER = Logger.getLogger(RandomActiveGossipThread.class); - protected ObjectMapper MAPPER = new ObjectMapper(); - - /** The Random used for choosing a member to gossip with. */ - private final Random random; - private final GossipCore gossipCore; - - public RandomActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { - super(gossipManager); - random = new Random(); - this.gossipCore = gossipCore; - } - - /** - * [The selectToSend() function.] Find a random peer from the local membership list. In the case - * where this client is the only member in the list, this method will return null. - * - * @return Member random member if list is greater than 1, null otherwise - */ - protected LocalGossipMember selectPartner(List memberList) { - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); - } else { - GossipService.LOGGER.debug("I am alone in this world."); - - } - return member; - } - - protected void sendMembershipList(LocalGossipMember me, List memberList) { - - me.setHeartbeat(System.currentTimeMillis()); - LocalGossipMember member = selectPartner(memberList); - if (member == null) { - GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); - return; - } else { - GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); - } - - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); - UdpActiveGossipMessage message = new UdpActiveGossipMessage(); - message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); - message.setUuid(UUID.randomUUID().toString()); - message.getMembers().add(convert(me)); - for (LocalGossipMember other : memberList) { - message.getMembers().add(convert(other)); - } - byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - Response r = gossipCore.send(message, member.getUri()); - if (r instanceof ActiveGossipOk){ - //maybe count metrics here - } else { - LOGGER.warn("Message "+ message + " generated response "+ r); - } - } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" - + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } catch (IOException e1) { - GossipService.LOGGER.warn(e1); - } - } - - private GossipMember convert(LocalGossipMember member){ - GossipMember gm = new GossipMember(); - gm.setCluster(member.getClusterName()); - gm.setHeartbeat(member.getHeartbeat()); - gm.setUri(member.getUri().toASCIIString()); - gm.setId(member.getId()); - return gm; - } - -} From 5590758c423558144a06810b2dae41cd47428856 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Fri, 23 Sep 2016 22:34:43 -0400 Subject: [PATCH 3/3] Duplicate definition in pom file --- pom.xml | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pom.xml b/pom.xml index 5b80e17..581c8b1 100644 --- a/pom.xml +++ b/pom.xml @@ -104,12 +104,6 @@ ${junit.platform.version} test - - org.junit.platform - junit-platform-runner - ${junit.platform.version} - test - io.teknek tunit