diff --git a/pom.xml b/pom.xml index 912da3b..581c8b1 100644 --- a/pom.xml +++ b/pom.xml @@ -41,6 +41,8 @@ 1.8.0 5.0.0-M2 + 1.0.0-M2 + 4.12.0-M2 1.2.17 0.0.0 @@ -78,15 +80,30 @@ jackson-datatype-json-org ${jackson-datatype-json-org.version} - org.junit.jupiter junit-jupiter-api ${junit.jupiter.version} test - - + + org.junit.jupiter + junit-jupiter-engine + ${junit.jupiter.version} + test + + + org.junit.vintage + junit-vintage-engine + ${junit.vintage.version} + test + + + org.junit.platform + junit-platform-runner + ${junit.platform.version} + test + io.teknek tunit @@ -119,24 +136,6 @@ - - - org.apache.maven.plugins - maven-gpg-plugin - ${maven-gpg-plugin.version} - - - sign-artifacts - verify - - sign - - - - org.apache.maven.plugins maven-jar-plugin @@ -151,24 +150,6 @@ - - org.apache.maven.plugins - maven-eclipse-plugin - ${maven-eclipse-plugin.version} - - [artifactId] - true - true - - org.eclipse.jdt.core.javabuilder - org.maven.ide.eclipse.maven2Builder - - - org.eclipse.jdt.core.javanature - org.maven.ide.eclipse.maven2Nature - - - org.apache.maven.plugins maven-compiler-plugin @@ -186,6 +167,27 @@ + + + maven-compiler-plugin + 3.1 + + ${java.version} + ${java.version} + + + + maven-surefire-plugin + 2.19.1 + + + org.junit.platform + junit-platform-surefire-provider + ${junit.platform.version} + + + + diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index 6f9b5be..6c02e2c 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -71,8 +71,8 @@ public class GossipService { } public void start() { - LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri()); - gossipManager.start(); + LOGGER.debug("Starting: " + get_gossipManager().getMyself().getUri()); + gossipManager.init(); } public void shutdown() { diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 4790c09..19caffe 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -20,6 +20,7 @@ package org.apache.gossip.manager; import java.io.IOException; import java.net.DatagramSocket; import java.util.List; + import java.util.Map.Entry; import java.util.Random; import java.util.UUID; @@ -28,7 +29,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.gossip.GossipService; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.model.ActiveGossipOk; import org.apache.gossip.model.GossipDataMessage; @@ -36,6 +36,7 @@ import org.apache.gossip.model.GossipMember; import org.apache.gossip.model.Response; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpGossipDataMessage; + import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; @@ -60,60 +61,32 @@ public class ActiveGossipThread { this.gossipCore = gossipCore; this.scheduledExecutorService = Executors.newScheduledThreadPool(1024); } - - public void init(){ - Runnable liveGossip = new Runnable(){ - @Override - public void run() { - try { - sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()); - } catch (RuntimeException ex){ - LOGGER.warn(ex); - } - } - }; - scheduledExecutorService.scheduleAtFixedRate(liveGossip, 0, + + public void init() { + scheduledExecutorService.scheduleAtFixedRate( + () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - Runnable deadGossip = new Runnable(){ - @Override - public void run() { - try { - sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()); - } catch (RuntimeException ex){ - LOGGER.warn(ex); - } - } - }; - scheduledExecutorService.scheduleAtFixedRate(deadGossip, 0, + scheduledExecutorService.scheduleAtFixedRate( + () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - Runnable dataGossip = new Runnable(){ - @Override - public void run() { - try { - sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()); - } catch (RuntimeException ex){ - LOGGER.warn(ex); - } - } - }; - scheduledExecutorService.scheduleAtFixedRate(dataGossip, 0, + scheduledExecutorService.scheduleAtFixedRate( + () -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - } - + public void shutdown() { scheduledExecutorService.shutdown(); try { scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOGGER.warn(e); + LOGGER.debug("Issue during shurdown" + e); } } public void sendData(LocalGossipMember me, List memberList){ LocalGossipMember member = selectPartner(memberList); if (member == null) { - GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); + LOGGER.debug("Send sendMembershipList() is called without action"); return; } try (DatagramSocket socket = new DatagramSocket()) { @@ -121,7 +94,6 @@ public class ActiveGossipThread { for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ for (Entry innerEntry : entry.getValue().entrySet()){ UdpGossipDataMessage message = new UdpGossipDataMessage(); - System.out.println("sending message " + message); message.setUuid(UUID.randomUUID().toString()); message.setUriFrom(me.getId()); message.setExpireAt(innerEntry.getValue().getExpireAt()); @@ -133,32 +105,29 @@ public class ActiveGossipThread { byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); int packet_length = json_bytes.length; if (packet_length < GossipManager.MAX_PACKET_SIZE) { - //Response r = gossipCore.send(message, member.getUri()); gossipCore.sendOneWay(message, member.getUri()); - //TODO: ack this message } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" + LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); } } } } catch (IOException e1) { - GossipService.LOGGER.warn(e1); + LOGGER.warn(e1); } } /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, List memberList) { - + protected void sendMembershipList(LocalGossipMember me, List memberList) { me.setHeartbeat(System.currentTimeMillis()); LocalGossipMember member = selectPartner(memberList); if (member == null) { - GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); + LOGGER.debug("Send sendMembershipList() is called without action"); return; } else { - GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); + LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); } try (DatagramSocket socket = new DatagramSocket()) { @@ -180,17 +149,15 @@ public class ActiveGossipThread { LOGGER.warn("Message "+ message + " generated response "+ r); } } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" + LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); } } catch (IOException e1) { - GossipService.LOGGER.warn(e1); + LOGGER.warn(e1); } } - + /** - * Abstract method which should be implemented by a subclass. This method should return a member - * of the list to gossip with. * * @param memberList * The list of members which are stored in the local list of members. @@ -202,8 +169,7 @@ public class ActiveGossipThread { int randomNeighborIndex = random.nextInt(memberList.size()); member = memberList.get(randomNeighborIndex); } else { - GossipService.LOGGER.debug("I am alone in this world."); - + LOGGER.debug("I am alone in this world."); } return member; } diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 8bcba46..46d855a 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -47,6 +47,10 @@ public class GossipCore { perNodeData = new ConcurrentHashMap<>(); } + /** + * + * @param message + */ public void addPerNodeData(GossipDataMessage message){ ConcurrentHashMap m = new ConcurrentHashMap<>(); m.put(message.getKey(), message); @@ -70,7 +74,6 @@ public class GossipCore { } public void recieve(Base base){ - System.out.println(base); if (base instanceof Response){ if (base instanceof Trackable){ Trackable t = (Trackable) base; @@ -80,11 +83,6 @@ public class GossipCore { if (base instanceof GossipDataMessage) { UdpGossipDataMessage message = (UdpGossipDataMessage) base; addPerNodeData(message); - /* - UdpActiveGossipOk o = new UdpActiveGossipOk(); - o.setUriFrom(message.getUriFrom()); - o.setUuid(message.getUuid()); - sendOneWay(o, senderMember.getUri());*/ } if (base instanceof ActiveGossipMessage){ List remoteGossipMembers = new ArrayList<>(); @@ -178,11 +176,11 @@ public class GossipCore { } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { - LOGGER.error(e.getMessage(), e); + LOGGER.debug(e.getMessage(), e); return null; } catch (TimeoutException e) { boolean cancelled = response.cancel(true); - LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled)); + LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled)); return null; } finally { if (t != null){ diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 36bc10a..94b57d1 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -41,9 +41,11 @@ import org.apache.gossip.LocalGossipMember; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; + import org.apache.gossip.model.GossipDataMessage; -public abstract class GossipManager extends Thread implements NotificationListener { + +public abstract class GossipManager implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); @@ -180,7 +182,7 @@ public abstract class GossipManager extends Thread implements NotificationListen * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * thread and start the receiver thread. */ - public void run() { + public void init() { for (LocalGossipMember member : members.keySet()) { if (member != me) { member.startTimeoutTimer(); @@ -191,14 +193,6 @@ public abstract class GossipManager extends Thread implements NotificationListen activeGossipThread = new ActiveGossipThread(this, this.gossipCore); activeGossipThread.init(); GossipService.LOGGER.debug("The GossipService is started."); - while (gossipServiceRunning.get()) { - try { - // TODO - TimeUnit.MILLISECONDS.sleep(1); - } catch (InterruptedException e) { - GossipService.LOGGER.warn("The GossipClient was interrupted."); - } - } } /** @@ -227,7 +221,6 @@ public abstract class GossipManager extends Thread implements NotificationListen public void gossipData(GossipDataMessage message){ message.setNodeId(me.getId()); gossipCore.addPerNodeData(message); - System.out.println(this.getMyself() + " " + gossipCore.getPerNodeData()); } public GossipDataMessage findGossipData(String nodeId, String key){ diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index 6d440de..11c371e 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -121,17 +121,4 @@ abstract public class PassiveGossipThread implements Runnable { } } - /** - * Abstract method for merging the local and remote list. - * - * @param gossipManager - * The GossipManager for retrieving the local members and dead members list. - * @param senderMember - * The member who is sending this list, this could be used to send a response if the - * remote list contains out-dated information. - * @param remoteList - * The list of members known at the remote side. - */ - abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - List remoteList); } \ No newline at end of file diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index 0b51573..dff5056 100644 --- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -17,11 +17,6 @@ */ package org.apache.gossip.manager.impl; -import java.util.List; - -import org.apache.gossip.GossipMember; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.RemoteGossipMember; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.PassiveGossipThread; @@ -35,79 +30,4 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread super(gossipManager, gossipCore); } - /** - * Merge remote list (received from peer), and our local member list. Simply, we must update the - * heartbeats that the remote list has with our list. Also, some additional logic is needed to - * make sure we have not timed out a member and then immediately received a list with that member. - * - * @param gossipManager - * @param senderMember - * @param remoteList - */ - protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - List remoteList) { - - // if the person sending to us is in the dead list consider them up - for (LocalGossipMember i : gossipManager.getDeadList()) { - if (i.getId().equals(senderMember.getId())) { - LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); - LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), - senderMember.getUri(), senderMember.getId(), - senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings() - .getCleanupInterval()); - gossipManager.reviveMember(newLocalMember); - newLocalMember.startTimeoutTimer(); - } - } - for (GossipMember remoteMember : remoteList) { - if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { - continue; - } - if (gossipManager.getLiveMembers().contains(remoteMember)) { - LocalGossipMember localMember = gossipManager.getLiveMembers().get( - gossipManager.getLiveMembers().indexOf(remoteMember)); - if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { - localMember.setHeartbeat(remoteMember.getHeartbeat()); - localMember.resetTimeoutTimer(); - } - } else if (!gossipManager.getLiveMembers().contains(remoteMember) - && !gossipManager.getDeadList().contains(remoteMember)) { - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), - remoteMember.getUri(), remoteMember.getId(), - remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() - .getCleanupInterval()); - gossipManager.createOrReviveMember(newLocalMember); - newLocalMember.startTimeoutTimer(); - } else { - if (gossipManager.getDeadList().contains(remoteMember)) { - LocalGossipMember localDeadMember = gossipManager.getDeadList().get( - gossipManager.getDeadList().indexOf(remoteMember)); - if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), - remoteMember.getUri(), remoteMember.getId(), - remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() - .getCleanupInterval()); - gossipManager.reviveMember(newLocalMember); - newLocalMember.startTimeoutTimer(); - LOGGER.debug("Removed remote member " + remoteMember.getAddress() - + " from dead list and added to local member list."); - } else { - LOGGER.debug("me " + gossipManager.getMyself()); - LOGGER.debug("sender " + senderMember); - LOGGER.debug("remote " + remoteList); - LOGGER.debug("live " + gossipManager.getLiveMembers()); - LOGGER.debug("dead " + gossipManager.getDeadList()); - } - } else { - LOGGER.debug("me " + gossipManager.getMyself()); - LOGGER.debug("sender " + senderMember); - LOGGER.debug("remote " + remoteList); - LOGGER.debug("live " + gossipManager.getLiveMembers()); - LOGGER.debug("dead " + gossipManager.getDeadList()); - // throw new IllegalArgumentException("wtf"); - } - } - } - } - } diff --git a/src/main/java/org/apache/gossip/model/GossipDataMessage.java b/src/main/java/org/apache/gossip/model/GossipDataMessage.java index 2128dfe..835c668 100644 --- a/src/main/java/org/apache/gossip/model/GossipDataMessage.java +++ b/src/main/java/org/apache/gossip/model/GossipDataMessage.java @@ -2,7 +2,6 @@ package org.apache.gossip.model; public class GossipDataMessage extends Base { - private String nodeId; private String key; private Object payload; diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 3f42eeb..6260f9b 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -17,7 +17,7 @@ import org.junit.Test; import io.teknek.tunit.TUnit; public class DataTest { - + @Test public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); @@ -36,7 +36,7 @@ public class DataTest { startupMembers, settings, new GossipListener(){ public void gossipEvent(GossipMember member, GossipState state) { - System.out.println(member + " " + state); + } }); clients.add(gossipService); diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 251550b..82cb625 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -29,107 +29,115 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; - import org.apache.log4j.Logger; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; +import org.junit.platform.runner.JUnitPlatform; import org.junit.jupiter.api.Test; +import org.junit.runner.RunWith; + +@RunWith(JUnitPlatform.class) public class ShutdownDeadtimeTest { - private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class ); + private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class); + @Test - //@Ignore - public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(1000, 10000); - String cluster = UUID.randomUUID().toString(); - - log.info( "Adding seed nodes" ); - int seedNodes = 3; - List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); - } + public void DeadNodesDoNotComeAliveAgain() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(1000, 10000); + String cluster = UUID.randomUUID().toString(); - log.info( "Adding clients" ); - final List clients = new ArrayList<>(); - final int clusterMembers = 5; - for (int i = 1; i < clusterMembers+1; ++i) { - final int j = i; - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - GossipService gossipService = new GossipService(cluster, uri, i + "", - startupMembers, settings, - new GossipListener(){ - @Override - public void gossipEvent(GossipMember member, GossipState state) { - System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state); - } - }); - clients.add(gossipService); - gossipService.start(); - } - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); - } - return total; - }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); + log.info("Adding seed nodes"); + int seedNodes = 3; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + } - // shutdown one client and verify that one client is lost. - Random r = new Random(); - int randomClientId = r.nextInt(clusterMembers); - log.info( "shutting down " + randomClientId ); - final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort(); - final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId(); - clients.get(randomClientId).shutdown(); - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); - } - return total; - }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16); - clients.remove(randomClientId); - - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers - 1; ++i) { - total += clients.get(i).get_gossipManager().getDeadList().size(); - } - return total; - }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); - - URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); - // start client again - GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", - startupMembers, settings, - new GossipListener(){ - @Override - public void gossipEvent(GossipMember member, GossipState state) { - //System.out.println("revived " + member+" "+ state); - } + log.info("Adding clients"); + final List clients = new ArrayList<>(); + final int clusterMembers = 5; + for (int i = 1; i < clusterMembers + 1; ++i) { + final int j = i; + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, + settings, new GossipListener() { + @Override + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(System.currentTimeMillis() + " Member " + j + " reports " + + member + " " + state); + } }); clients.add(gossipService); gossipService.start(); - - // verify that the client is alive again for every node - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); - } - return total; - }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); - - for (int i = 0; i < clusterMembers; ++i) { - clients.get(i).shutdown(); + } + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getLiveMembers().size(); + } + return total; } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); + + // shutdown one client and verify that one client is lost. + Random r = new Random(); + int randomClientId = r.nextInt(clusterMembers); + log.info("shutting down " + randomClientId); + final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri() + .getPort(); + final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId(); + clients.get(randomClientId).shutdown(); + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getLiveMembers().size(); + } + return total; + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16); + clients.remove(randomClientId); + + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers - 1; ++i) { + total += clients.get(i).get_gossipManager().getDeadList().size(); + } + return total; + } + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); + + URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); + // start client again + GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers, + settings, new GossipListener() { + @Override + public void gossipEvent(GossipMember member, GossipState state) { + // System.out.println("revived " + member+" "+ state); + } + }); + clients.add(gossipService); + gossipService.start(); + + // verify that the client is alive again for every node + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getLiveMembers().size(); + } + return total; + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); + + for (int i = 0; i < clusterMembers; ++i) { + clients.get(i).shutdown(); + } } } diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java index ed069c3..3a52fc7 100644 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -21,6 +21,8 @@ import org.apache.log4j.Logger; import org.json.JSONException; import io.teknek.tunit.TUnit; + +import org.junit.After; import org.junit.jupiter.api.Test; import java.io.File; @@ -32,15 +34,19 @@ import java.util.ArrayList; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; /** * Tests support of using {@code StartupSettings} and thereby reading * setup config from file. */ +@RunWith(JUnitPlatform.class) public class StartupSettingsTest { private static final Logger log = Logger.getLogger( StartupSettingsTest.class ); private static final String CLUSTER = UUID.randomUUID().toString(); + @Test public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException { File settingsFile = File.createTempFile("gossipTest",".json"); diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index 350fc6f..0faa968 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gossip; +package org.apache.gossip; import io.teknek.tunit.TUnit; @@ -27,14 +27,16 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; - - +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; import org.apache.log4j.Logger; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; +import org.junit.After; import org.junit.jupiter.api.Test; +@RunWith(JUnitPlatform.class) public class TenNodeThreeSeedTest { private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class ); diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java index 6c63516..875a7ab 100644 --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java @@ -24,6 +24,8 @@ import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.random.RandomGossipManager; import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; import javax.management.Notification; import javax.management.NotificationListener; @@ -37,20 +39,18 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.expectThrows; - +@RunWith(JUnitPlatform.class) public class RandomGossipManagerBuilderTest { public static class TestGossipListener implements GossipListener { @Override public void gossipEvent(GossipMember member, GossipState state) { - System.out.println("Got gossip event"); } } public static class TestNotificationListener implements NotificationListener { @Override public void handleNotification(Notification notification, Object o) { - System.out.println("Got notification event"); } } @@ -73,8 +73,8 @@ public class RandomGossipManagerBuilderTest { expectThrows(IllegalArgumentException.class,() -> { RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build(); }); - } + @Test public void createMembersListIfNull() throws URISyntaxException { RandomGossipManager gossipManager = RandomGossipManager.newBuilder()