diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index 68a4ca2..6c02e2c 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.gossip.event.GossipListener; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.random.RandomGossipManager; +import org.apache.gossip.model.GossipDataMessage; import org.apache.log4j.Logger; /** @@ -81,6 +82,19 @@ public class GossipService { public GossipManager get_gossipManager() { return gossipManager; } + + /** + * Gossip data to the entire cluster + * @param message + */ + public void gossipData(GossipDataMessage message){ + gossipManager.gossipData(message); + } + + + public GossipDataMessage findGossipData(String nodeId, String key){ + return this.get_gossipManager().findGossipData(nodeId, key); + } public void set_gossipManager(GossipManager _gossipManager) { this.gossipManager = _gossipManager; diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index b57c25a..19caffe 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -20,17 +20,23 @@ package org.apache.gossip.manager; import java.io.IOException; import java.net.DatagramSocket; import java.util.List; + +import java.util.Map.Entry; import java.util.Random; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.gossip.GossipService; + import org.apache.gossip.LocalGossipMember; import org.apache.gossip.model.ActiveGossipOk; +import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.GossipMember; import org.apache.gossip.model.Response; import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.gossip.udp.UdpGossipDataMessage; + import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; @@ -41,21 +47,21 @@ import org.codehaus.jackson.map.ObjectMapper; */ public class ActiveGossipThread { - public static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class); + private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class); - private ScheduledExecutorService scheduledExecutorService ; - private ObjectMapper MAPPER = new ObjectMapper(); + private final GossipManager gossipManager; private final Random random; - protected final GossipManager gossipManager; private final GossipCore gossipCore; + private ScheduledExecutorService scheduledExecutorService; + private ObjectMapper MAPPER = new ObjectMapper(); public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { this.gossipManager = gossipManager; - this.scheduledExecutorService = Executors.newScheduledThreadPool(1024); + random = new Random(); this.gossipCore = gossipCore; - this.random = new Random(); + this.scheduledExecutorService = Executors.newScheduledThreadPool(1024); } - + public void init() { scheduledExecutorService.scheduleAtFixedRate( () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, @@ -63,29 +69,65 @@ public class ActiveGossipThread { scheduledExecutorService.scheduleAtFixedRate( () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); } public void shutdown() { - this.scheduledExecutorService.shutdown(); + scheduledExecutorService.shutdown(); try { - this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { - LOGGER.warn("Did not complete shutdown", e); + LOGGER.debug("Issue during shurdown" + e); } } + public void sendData(LocalGossipMember me, List memberList){ + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + LOGGER.debug("Send sendMembershipList() is called without action"); + return; + } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ + for (Entry innerEntry : entry.getValue().entrySet()){ + UdpGossipDataMessage message = new UdpGossipDataMessage(); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + gossipCore.sendOneWay(message, member.getUri()); + } else { + LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } + } + } catch (IOException e1) { + LOGGER.warn(e1); + } + } + /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, List memberList) { - + protected void sendMembershipList(LocalGossipMember me, List memberList) { me.setHeartbeat(System.currentTimeMillis()); LocalGossipMember member = selectPartner(memberList); if (member == null) { - GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); + LOGGER.debug("Send sendMembershipList() is called without action"); return; } else { - GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); + LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); } try (DatagramSocket socket = new DatagramSocket()) { @@ -107,31 +149,30 @@ public class ActiveGossipThread { LOGGER.warn("Message "+ message + " generated response "+ r); } } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" + LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); } } catch (IOException e1) { - GossipService.LOGGER.warn(e1); + LOGGER.warn(e1); } } + /** - * Abstract method which should be implemented by a subclass. This method should return a member - * of the list to gossip with. * * @param memberList * The list of members which are stored in the local list of members. * @return The chosen LocalGossipMember to gossip with. */ - protected LocalGossipMember selectPartner(List memberList) { - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); - } else { - LOGGER.debug("I am alone in this world."); - } - return member; - } + protected LocalGossipMember selectPartner(List memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } else { + LOGGER.debug("I am alone in this world."); + } + return member; + } private GossipMember convert(LocalGossipMember member){ GossipMember gm = new GossipMember(); diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index ab24621..46d855a 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -21,10 +21,12 @@ import org.apache.gossip.LocalGossipMember; import org.apache.gossip.RemoteGossipMember; import org.apache.gossip.model.ActiveGossipMessage; import org.apache.gossip.model.Base; +import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.Response; import org.apache.gossip.udp.Trackable; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; +import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpNotAMemberFault; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; @@ -34,15 +36,32 @@ public class GossipCore { public static final Logger LOGGER = Logger.getLogger(GossipCore.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private final GossipManager gossipManager; - private ConcurrentHashMap requests; - private ExecutorService service; + private final ConcurrentHashMap> perNodeData; public GossipCore(GossipManager manager){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); service = Executors.newFixedThreadPool(500); + perNodeData = new ConcurrentHashMap<>(); + } + + /** + * + * @param message + */ + public void addPerNodeData(GossipDataMessage message){ + ConcurrentHashMap m = new ConcurrentHashMap<>(); + m.put(message.getKey(), message); + m = perNodeData.putIfAbsent(message.getNodeId(), m); + if (m != null){ + m.put(message.getKey(), message); //TODO only put if > ts + } + } + + public ConcurrentHashMap> getPerNodeData(){ + return perNodeData; } public void shutdown(){ @@ -61,6 +80,10 @@ public class GossipCore { requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t); } } + if (base instanceof GossipDataMessage) { + UdpGossipDataMessage message = (UdpGossipDataMessage) base; + addPerNodeData(message); + } if (base instanceof ActiveGossipMessage){ List remoteGossipMembers = new ArrayList<>(); RemoteGossipMember senderMember = null; @@ -153,11 +176,11 @@ public class GossipCore { } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { - LOGGER.error(e.getMessage(), e); + LOGGER.debug(e.getMessage(), e); return null; } catch (TimeoutException e) { boolean cancelled = response.cancel(true); - LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled)); + LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled)); return null; } finally { if (t != null){ diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 0b2cfd2..94b57d1 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -41,6 +42,9 @@ import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; +import org.apache.gossip.model.GossipDataMessage; + + public abstract class GossipManager implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); @@ -213,4 +217,19 @@ public abstract class GossipManager implements NotificationListener { LOGGER.error(e); } } + + public void gossipData(GossipDataMessage message){ + message.setNodeId(me.getId()); + gossipCore.addPerNodeData(message); + } + + public GossipDataMessage findGossipData(String nodeId, String key){ + ConcurrentHashMap j = gossipCore.getPerNodeData().get(nodeId); + if (j == null){ + return null; + } else { + return j.get(key); + } + } + } diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index 6d440de..11c371e 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -121,17 +121,4 @@ abstract public class PassiveGossipThread implements Runnable { } } - /** - * Abstract method for merging the local and remote list. - * - * @param gossipManager - * The GossipManager for retrieving the local members and dead members list. - * @param senderMember - * The member who is sending this list, this could be used to send a response if the - * remote list contains out-dated information. - * @param remoteList - * The list of members known at the remote side. - */ - abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - List remoteList); } \ No newline at end of file diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index 0b51573..dff5056 100644 --- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -17,11 +17,6 @@ */ package org.apache.gossip.manager.impl; -import java.util.List; - -import org.apache.gossip.GossipMember; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.RemoteGossipMember; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.PassiveGossipThread; @@ -35,79 +30,4 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread super(gossipManager, gossipCore); } - /** - * Merge remote list (received from peer), and our local member list. Simply, we must update the - * heartbeats that the remote list has with our list. Also, some additional logic is needed to - * make sure we have not timed out a member and then immediately received a list with that member. - * - * @param gossipManager - * @param senderMember - * @param remoteList - */ - protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - List remoteList) { - - // if the person sending to us is in the dead list consider them up - for (LocalGossipMember i : gossipManager.getDeadList()) { - if (i.getId().equals(senderMember.getId())) { - LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); - LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), - senderMember.getUri(), senderMember.getId(), - senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings() - .getCleanupInterval()); - gossipManager.reviveMember(newLocalMember); - newLocalMember.startTimeoutTimer(); - } - } - for (GossipMember remoteMember : remoteList) { - if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { - continue; - } - if (gossipManager.getLiveMembers().contains(remoteMember)) { - LocalGossipMember localMember = gossipManager.getLiveMembers().get( - gossipManager.getLiveMembers().indexOf(remoteMember)); - if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { - localMember.setHeartbeat(remoteMember.getHeartbeat()); - localMember.resetTimeoutTimer(); - } - } else if (!gossipManager.getLiveMembers().contains(remoteMember) - && !gossipManager.getDeadList().contains(remoteMember)) { - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), - remoteMember.getUri(), remoteMember.getId(), - remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() - .getCleanupInterval()); - gossipManager.createOrReviveMember(newLocalMember); - newLocalMember.startTimeoutTimer(); - } else { - if (gossipManager.getDeadList().contains(remoteMember)) { - LocalGossipMember localDeadMember = gossipManager.getDeadList().get( - gossipManager.getDeadList().indexOf(remoteMember)); - if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), - remoteMember.getUri(), remoteMember.getId(), - remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() - .getCleanupInterval()); - gossipManager.reviveMember(newLocalMember); - newLocalMember.startTimeoutTimer(); - LOGGER.debug("Removed remote member " + remoteMember.getAddress() - + " from dead list and added to local member list."); - } else { - LOGGER.debug("me " + gossipManager.getMyself()); - LOGGER.debug("sender " + senderMember); - LOGGER.debug("remote " + remoteList); - LOGGER.debug("live " + gossipManager.getLiveMembers()); - LOGGER.debug("dead " + gossipManager.getDeadList()); - } - } else { - LOGGER.debug("me " + gossipManager.getMyself()); - LOGGER.debug("sender " + senderMember); - LOGGER.debug("remote " + remoteList); - LOGGER.debug("live " + gossipManager.getLiveMembers()); - LOGGER.debug("dead " + gossipManager.getDeadList()); - // throw new IllegalArgumentException("wtf"); - } - } - } - } - } diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java index ebb3215..66c2be6 100644 --- a/src/main/java/org/apache/gossip/model/Base.java +++ b/src/main/java/org/apache/gossip/model/Base.java @@ -2,6 +2,7 @@ package org.apache.gossip.model; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; +import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpNotAMemberFault; import org.codehaus.jackson.annotate.JsonSubTypes; import org.codehaus.jackson.annotate.JsonSubTypes.Type; @@ -17,7 +18,9 @@ import org.codehaus.jackson.annotate.JsonTypeInfo; @Type(value = ActiveGossipOk.class, name = "ActiveGossipOk"), @Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"), @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"), - @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault") + @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"), + @Type(value = GossipDataMessage.class, name = "GossipDataMessage"), + @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage") }) public class Base { diff --git a/src/main/java/org/apache/gossip/model/GossipDataMessage.java b/src/main/java/org/apache/gossip/model/GossipDataMessage.java new file mode 100644 index 0000000..835c668 --- /dev/null +++ b/src/main/java/org/apache/gossip/model/GossipDataMessage.java @@ -0,0 +1,49 @@ +package org.apache.gossip.model; + +public class GossipDataMessage extends Base { + + private String nodeId; + private String key; + private Object payload; + private Long timestamp; + private Long expireAt; + + public String getNodeId() { + return nodeId; + } + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + public String getKey() { + return key; + } + public void setKey(String key) { + this.key = key; + } + public Object getPayload() { + return payload; + } + public void setPayload(Object payload) { + this.payload = payload; + } + public Long getTimestamp() { + return timestamp; + } + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + public Long getExpireAt() { + return expireAt; + } + public void setExpireAt(Long expireAt) { + this.expireAt = expireAt; + } + @Override + public String toString() { + return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload + + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]"; + } + + + +} diff --git a/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java new file mode 100644 index 0000000..2ee4bbf --- /dev/null +++ b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java @@ -0,0 +1,31 @@ +package org.apache.gossip.udp; + +import org.apache.gossip.model.GossipDataMessage; + +public class UdpGossipDataMessage extends GossipDataMessage implements Trackable { + + private String uriFrom; + private String uuid; + + public String getUriFrom() { + return uriFrom; + } + + public void setUriFrom(String uriFrom) { + this.uriFrom = uriFrom; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override + public String toString() { + return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]"; + } + +} diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java new file mode 100644 index 0000000..6260f9b --- /dev/null +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -0,0 +1,81 @@ +package org.apache.gossip; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; +import org.apache.gossip.model.GossipDataMessage; +import org.junit.Test; + +import io.teknek.tunit.TUnit; + +public class DataTest { + + @Test + public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{ + GossipSettings settings = new GossipSettings(); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes+1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + } + final List clients = new ArrayList<>(); + final int clusterMembers = 2; + for (int i = 1; i < clusterMembers+1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipService gossipService = new GossipService(cluster, uri, i + "", + startupMembers, settings, + new GossipListener(){ + public void gossipEvent(GossipMember member, GossipState state) { + + } + }); + clients.add(gossipService); + gossipService.start(); + } + TUnit.assertThat(new Callable (){ + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getLiveMembers().size(); + } + return total; + }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + clients.get(0).gossipData(msg()); + Thread.sleep(10000); + TUnit.assertThat( + + new Callable (){ + public Object call() throws Exception { + GossipDataMessage x = clients.get(1).findGossipData(1+"" , "a"); + if (x == null) return ""; + else return x.getPayload(); + }}) + + + //() -> clients.get(1).findGossipData(1+"" , "a").getPayload()) + .afterWaitingAtMost(20, TimeUnit.SECONDS) + .isEqualTo("b"); + for (int i = 0; i < clusterMembers; ++i) { + clients.get(i).shutdown(); + } + } + + private GossipDataMessage msg(){ + GossipDataMessage g = new GossipDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey("a"); + g.setPayload("b"); + g.setTimestamp(System.currentTimeMillis()); + return g; + } +} diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index f0d7f10..82cb625 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -29,7 +29,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; - import org.apache.log4j.Logger; import org.apache.gossip.event.GossipListener; @@ -43,97 +42,102 @@ import org.junit.runner.RunWith; public class ShutdownDeadtimeTest { private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class); - + @Test - public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(1000, 10000); - String cluster = UUID.randomUUID().toString(); - - log.info( "Adding seed nodes" ); - int seedNodes = 3; - List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); - } + public void DeadNodesDoNotComeAliveAgain() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(1000, 10000); + String cluster = UUID.randomUUID().toString(); - log.info( "Adding clients" ); - final List clients = new ArrayList<>(); - final int clusterMembers = 5; - for (int i = 1; i < clusterMembers+1; ++i) { - final int j = i; - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - GossipService gossipService = new GossipService(cluster, uri, i + "", - startupMembers, settings, - new GossipListener(){ - @Override - public void gossipEvent(GossipMember member, GossipState state) { - System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state); - } - }); - clients.add(gossipService); - gossipService.start(); - } - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); - } - return total; - }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); + log.info("Adding seed nodes"); + int seedNodes = 3; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + } - // shutdown one client and verify that one client is lost. - Random r = new Random(); - int randomClientId = r.nextInt(clusterMembers); - log.info( "shutting down " + randomClientId ); - final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort(); - final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId(); - clients.get(randomClientId).shutdown(); - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); - } - return total; - }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16); - clients.remove(randomClientId); - - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers - 1; ++i) { - total += clients.get(i).get_gossipManager().getDeadList().size(); - } - return total; - }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); - - URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); - // start client again - GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", - startupMembers, settings, - new GossipListener(){ - @Override - public void gossipEvent(GossipMember member, GossipState state) { - //System.out.println("revived " + member+" "+ state); - } + log.info("Adding clients"); + final List clients = new ArrayList<>(); + final int clusterMembers = 5; + for (int i = 1; i < clusterMembers + 1; ++i) { + final int j = i; + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, + settings, new GossipListener() { + @Override + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(System.currentTimeMillis() + " Member " + j + " reports " + + member + " " + state); + } }); clients.add(gossipService); gossipService.start(); - - // verify that the client is alive again for every node - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).get_gossipManager().getLiveMembers().size(); - } - return total; - }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); - - for (int i = 0; i < clusterMembers; ++i) { - clients.get(i).shutdown(); + } + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getLiveMembers().size(); + } + return total; } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); + + // shutdown one client and verify that one client is lost. + Random r = new Random(); + int randomClientId = r.nextInt(clusterMembers); + log.info("shutting down " + randomClientId); + final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri() + .getPort(); + final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId(); + clients.get(randomClientId).shutdown(); + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getLiveMembers().size(); + } + return total; + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16); + clients.remove(randomClientId); + + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers - 1; ++i) { + total += clients.get(i).get_gossipManager().getDeadList().size(); + } + return total; + } + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); + + URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); + // start client again + GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers, + settings, new GossipListener() { + @Override + public void gossipEvent(GossipMember member, GossipState state) { + // System.out.println("revived " + member+" "+ state); + } + }); + clients.add(gossipService); + gossipService.start(); + + // verify that the client is alive again for every node + TUnit.assertThat(new Callable() { + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getLiveMembers().size(); + } + return total; + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); + + for (int i = 0; i < clusterMembers; ++i) { + clients.get(i).shutdown(); + } } } diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java index 9019ac1..3a52fc7 100644 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -21,6 +21,8 @@ import org.apache.log4j.Logger; import org.json.JSONException; import io.teknek.tunit.TUnit; + +import org.junit.After; import org.junit.jupiter.api.Test; import java.io.File; @@ -44,6 +46,7 @@ public class StartupSettingsTest { private static final Logger log = Logger.getLogger( StartupSettingsTest.class ); private static final String CLUSTER = UUID.randomUUID().toString(); + @Test public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException { File settingsFile = File.createTempFile("gossipTest",".json"); diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index c98b0d3..0faa968 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gossip; +package org.apache.gossip; import io.teknek.tunit.TUnit; @@ -33,6 +33,7 @@ import org.apache.log4j.Logger; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; +import org.junit.After; import org.junit.jupiter.api.Test; @RunWith(JUnitPlatform.class) diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java index ab3242c..875a7ab 100644 --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java @@ -45,14 +45,12 @@ public class RandomGossipManagerBuilderTest { public static class TestGossipListener implements GossipListener { @Override public void gossipEvent(GossipMember member, GossipState state) { - System.out.println("Got gossip event"); } } public static class TestNotificationListener implements NotificationListener { @Override public void handleNotification(Notification notification, Object o) { - System.out.println("Got notification event"); } } @@ -75,8 +73,8 @@ public class RandomGossipManagerBuilderTest { expectThrows(IllegalArgumentException.class,() -> { RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build(); }); - } + @Test public void createMembersListIfNull() throws URISyntaxException { RandomGossipManager gossipManager = RandomGossipManager.newBuilder()