diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index ce15992..6f9b5be 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.gossip.event.GossipListener; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.random.RandomGossipManager; +import org.apache.gossip.model.GossipDataMessage; import org.apache.log4j.Logger; /** @@ -81,6 +82,19 @@ public class GossipService { public GossipManager get_gossipManager() { return gossipManager; } + + /** + * Gossip data to the entire cluster + * @param message + */ + public void gossipData(GossipDataMessage message){ + gossipManager.gossipData(message); + } + + + public GossipDataMessage findGossipData(String nodeId, String key){ + return this.get_gossipManager().findGossipData(nodeId, key); + } public void set_gossipManager(GossipManager _gossipManager) { this.gossipManager = _gossipManager; diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 181d9ae..4790c09 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -17,58 +17,176 @@ */ package org.apache.gossip.manager; +import java.io.IOException; +import java.net.DatagramSocket; import java.util.List; +import java.util.Map.Entry; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.gossip.GossipService; import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.model.ActiveGossipOk; +import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.GossipMember; +import org.apache.gossip.model.Response; +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.gossip.udp.UdpGossipDataMessage; +import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; /** * [The active thread: periodically send gossip request.] The class handles gossiping the membership * list. This information is important to maintaining a common state among all the nodes, and is * important for detecting failures. */ -abstract public class ActiveGossipThread implements Runnable { +public class ActiveGossipThread { - protected final GossipManager gossipManager; + private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class); + + private final GossipManager gossipManager; + private final Random random; + private final GossipCore gossipCore; + private ScheduledExecutorService scheduledExecutorService; + private ObjectMapper MAPPER = new ObjectMapper(); - private final AtomicBoolean keepRunning; - - public ActiveGossipThread(GossipManager gossipManager) { + public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { this.gossipManager = gossipManager; - this.keepRunning = new AtomicBoolean(true); + random = new Random(); + this.gossipCore = gossipCore; + this.scheduledExecutorService = Executors.newScheduledThreadPool(1024); } - @Override - public void run() { - while (keepRunning.get()) { - try { - TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval()); - - // contact a live member. - sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()); - - // contact a dead member. - sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()); - - } catch (InterruptedException e) { - GossipService.LOGGER.error(e); - keepRunning.set(false); + public void init(){ + Runnable liveGossip = new Runnable(){ + @Override + public void run() { + try { + sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()); + } catch (RuntimeException ex){ + LOGGER.warn(ex); + } } - } - shutdown(); + }; + scheduledExecutorService.scheduleAtFixedRate(liveGossip, 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + Runnable deadGossip = new Runnable(){ + @Override + public void run() { + try { + sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()); + } catch (RuntimeException ex){ + LOGGER.warn(ex); + } + } + }; + scheduledExecutorService.scheduleAtFixedRate(deadGossip, 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + Runnable dataGossip = new Runnable(){ + @Override + public void run() { + try { + sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()); + } catch (RuntimeException ex){ + LOGGER.warn(ex); + } + } + }; + scheduledExecutorService.scheduleAtFixedRate(dataGossip, 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + } public void shutdown() { - keepRunning.set(false); + scheduledExecutorService.shutdown(); + try { + scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn(e); + } } + public void sendData(LocalGossipMember me, List memberList){ + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); + return; + } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ + for (Entry innerEntry : entry.getValue().entrySet()){ + UdpGossipDataMessage message = new UdpGossipDataMessage(); + System.out.println("sending message " + message); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + //Response r = gossipCore.send(message, member.getUri()); + gossipCore.sendOneWay(message, member.getUri()); + //TODO: ack this message + } else { + GossipService.LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } + } + } catch (IOException e1) { + GossipService.LOGGER.warn(e1); + } + } + /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - abstract protected void sendMembershipList(LocalGossipMember me, - List memberList); + protected void sendMembershipList(LocalGossipMember me, List memberList) { + + me.setHeartbeat(System.currentTimeMillis()); + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); + return; + } else { + GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); + } + + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + UdpActiveGossipMessage message = new UdpActiveGossipMessage(); + message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); + message.setUuid(UUID.randomUUID().toString()); + message.getMembers().add(convert(me)); + for (LocalGossipMember other : memberList) { + message.getMembers().add(convert(other)); + } + byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + Response r = gossipCore.send(message, member.getUri()); + if (r instanceof ActiveGossipOk){ + //maybe count metrics here + } else { + LOGGER.warn("Message "+ message + " generated response "+ r); + } + } else { + GossipService.LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } catch (IOException e1) { + GossipService.LOGGER.warn(e1); + } + } /** * Abstract method which should be implemented by a subclass. This method should return a member @@ -78,5 +196,24 @@ abstract public class ActiveGossipThread implements Runnable { * The list of members which are stored in the local list of members. * @return The chosen LocalGossipMember to gossip with. */ - abstract protected LocalGossipMember selectPartner(List memberList); + protected LocalGossipMember selectPartner(List memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } else { + GossipService.LOGGER.debug("I am alone in this world."); + + } + return member; + } + + private GossipMember convert(LocalGossipMember member){ + GossipMember gm = new GossipMember(); + gm.setCluster(member.getClusterName()); + gm.setHeartbeat(member.getHeartbeat()); + gm.setUri(member.getUri().toASCIIString()); + gm.setId(member.getId()); + return gm; + } } diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index ab24621..8bcba46 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -21,10 +21,12 @@ import org.apache.gossip.LocalGossipMember; import org.apache.gossip.RemoteGossipMember; import org.apache.gossip.model.ActiveGossipMessage; import org.apache.gossip.model.Base; +import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.Response; import org.apache.gossip.udp.Trackable; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; +import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpNotAMemberFault; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; @@ -34,15 +36,28 @@ public class GossipCore { public static final Logger LOGGER = Logger.getLogger(GossipCore.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private final GossipManager gossipManager; - private ConcurrentHashMap requests; - private ExecutorService service; + private final ConcurrentHashMap> perNodeData; public GossipCore(GossipManager manager){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); service = Executors.newFixedThreadPool(500); + perNodeData = new ConcurrentHashMap<>(); + } + + public void addPerNodeData(GossipDataMessage message){ + ConcurrentHashMap m = new ConcurrentHashMap<>(); + m.put(message.getKey(), message); + m = perNodeData.putIfAbsent(message.getNodeId(), m); + if (m != null){ + m.put(message.getKey(), message); //TODO only put if > ts + } + } + + public ConcurrentHashMap> getPerNodeData(){ + return perNodeData; } public void shutdown(){ @@ -55,12 +70,22 @@ public class GossipCore { } public void recieve(Base base){ + System.out.println(base); if (base instanceof Response){ if (base instanceof Trackable){ Trackable t = (Trackable) base; requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t); } } + if (base instanceof GossipDataMessage) { + UdpGossipDataMessage message = (UdpGossipDataMessage) base; + addPerNodeData(message); + /* + UdpActiveGossipOk o = new UdpActiveGossipOk(); + o.setUriFrom(message.getUriFrom()); + o.setUuid(message.getUuid()); + sendOneWay(o, senderMember.getUri());*/ + } if (base instanceof ActiveGossipMessage){ List remoteGossipMembers = new ArrayList<>(); RemoteGossipMember senderMember = null; diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 79be431..36bc10a 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map.Entry; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -40,7 +41,7 @@ import org.apache.gossip.LocalGossipMember; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; -import org.apache.gossip.manager.random.RandomActiveGossipThread; +import org.apache.gossip.model.GossipDataMessage; public abstract class GossipManager extends Thread implements NotificationListener { @@ -187,8 +188,8 @@ public abstract class GossipManager extends Thread implements NotificationListen } passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); gossipThreadExecutor.execute(passiveGossipThread); - activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore); - gossipThreadExecutor.execute(activeGossipThread); + activeGossipThread = new ActiveGossipThread(this, this.gossipCore); + activeGossipThread.init(); GossipService.LOGGER.debug("The GossipService is started."); while (gossipServiceRunning.get()) { try { @@ -222,4 +223,20 @@ public abstract class GossipManager extends Thread implements NotificationListen LOGGER.error(e); } } + + public void gossipData(GossipDataMessage message){ + message.setNodeId(me.getId()); + gossipCore.addPerNodeData(message); + System.out.println(this.getMyself() + " " + gossipCore.getPerNodeData()); + } + + public GossipDataMessage findGossipData(String nodeId, String key){ + ConcurrentHashMap j = gossipCore.getPerNodeData().get(nodeId); + if (j == null){ + return null; + } else { + return j.get(key); + } + } + } diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java deleted file mode 100644 index 03d550c..0000000 --- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager.random; - -import java.io.IOException; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Random; -import java.util.UUID; - -import org.apache.gossip.GossipService; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.manager.ActiveGossipThread; -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.ActiveGossipOk; -import org.apache.gossip.model.GossipMember; -import org.apache.gossip.model.Response; -import org.apache.gossip.udp.UdpActiveGossipMessage; -import org.apache.log4j.Logger; -import org.codehaus.jackson.map.ObjectMapper; - -public class RandomActiveGossipThread extends ActiveGossipThread { - - public static final Logger LOGGER = Logger.getLogger(RandomActiveGossipThread.class); - protected ObjectMapper MAPPER = new ObjectMapper(); - - /** The Random used for choosing a member to gossip with. */ - private final Random random; - private final GossipCore gossipCore; - - public RandomActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { - super(gossipManager); - random = new Random(); - this.gossipCore = gossipCore; - } - - /** - * [The selectToSend() function.] Find a random peer from the local membership list. In the case - * where this client is the only member in the list, this method will return null. - * - * @return Member random member if list is greater than 1, null otherwise - */ - protected LocalGossipMember selectPartner(List memberList) { - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); - } else { - GossipService.LOGGER.debug("I am alone in this world."); - - } - return member; - } - - protected void sendMembershipList(LocalGossipMember me, List memberList) { - - me.setHeartbeat(System.currentTimeMillis()); - LocalGossipMember member = selectPartner(memberList); - if (member == null) { - GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); - return; - } else { - GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); - } - - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); - UdpActiveGossipMessage message = new UdpActiveGossipMessage(); - message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); - message.setUuid(UUID.randomUUID().toString()); - message.getMembers().add(convert(me)); - for (LocalGossipMember other : memberList) { - message.getMembers().add(convert(other)); - } - byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - Response r = gossipCore.send(message, member.getUri()); - if (r instanceof ActiveGossipOk){ - //maybe count metrics here - } else { - LOGGER.warn("Message "+ message + " generated response "+ r); - } - } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" - + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } catch (IOException e1) { - GossipService.LOGGER.warn(e1); - } - } - - private GossipMember convert(LocalGossipMember member){ - GossipMember gm = new GossipMember(); - gm.setCluster(member.getClusterName()); - gm.setHeartbeat(member.getHeartbeat()); - gm.setUri(member.getUri().toASCIIString()); - gm.setId(member.getId()); - return gm; - } - -} diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java index ebb3215..66c2be6 100644 --- a/src/main/java/org/apache/gossip/model/Base.java +++ b/src/main/java/org/apache/gossip/model/Base.java @@ -2,6 +2,7 @@ package org.apache.gossip.model; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; +import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpNotAMemberFault; import org.codehaus.jackson.annotate.JsonSubTypes; import org.codehaus.jackson.annotate.JsonSubTypes.Type; @@ -17,7 +18,9 @@ import org.codehaus.jackson.annotate.JsonTypeInfo; @Type(value = ActiveGossipOk.class, name = "ActiveGossipOk"), @Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"), @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"), - @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault") + @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"), + @Type(value = GossipDataMessage.class, name = "GossipDataMessage"), + @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage") }) public class Base { diff --git a/src/main/java/org/apache/gossip/model/GossipDataMessage.java b/src/main/java/org/apache/gossip/model/GossipDataMessage.java new file mode 100644 index 0000000..2128dfe --- /dev/null +++ b/src/main/java/org/apache/gossip/model/GossipDataMessage.java @@ -0,0 +1,50 @@ +package org.apache.gossip.model; + +public class GossipDataMessage extends Base { + + + private String nodeId; + private String key; + private Object payload; + private Long timestamp; + private Long expireAt; + + public String getNodeId() { + return nodeId; + } + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + public String getKey() { + return key; + } + public void setKey(String key) { + this.key = key; + } + public Object getPayload() { + return payload; + } + public void setPayload(Object payload) { + this.payload = payload; + } + public Long getTimestamp() { + return timestamp; + } + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + public Long getExpireAt() { + return expireAt; + } + public void setExpireAt(Long expireAt) { + this.expireAt = expireAt; + } + @Override + public String toString() { + return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload + + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]"; + } + + + +} diff --git a/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java new file mode 100644 index 0000000..2ee4bbf --- /dev/null +++ b/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java @@ -0,0 +1,31 @@ +package org.apache.gossip.udp; + +import org.apache.gossip.model.GossipDataMessage; + +public class UdpGossipDataMessage extends GossipDataMessage implements Trackable { + + private String uriFrom; + private String uuid; + + public String getUriFrom() { + return uriFrom; + } + + public void setUriFrom(String uriFrom) { + this.uriFrom = uriFrom; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override + public String toString() { + return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]"; + } + +} diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java new file mode 100644 index 0000000..3f42eeb --- /dev/null +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -0,0 +1,81 @@ +package org.apache.gossip; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; +import org.apache.gossip.model.GossipDataMessage; +import org.junit.Test; + +import io.teknek.tunit.TUnit; + +public class DataTest { + + @Test + public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{ + GossipSettings settings = new GossipSettings(); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes+1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + } + final List clients = new ArrayList<>(); + final int clusterMembers = 2; + for (int i = 1; i < clusterMembers+1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipService gossipService = new GossipService(cluster, uri, i + "", + startupMembers, settings, + new GossipListener(){ + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(member + " " + state); + } + }); + clients.add(gossipService); + gossipService.start(); + } + TUnit.assertThat(new Callable (){ + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getLiveMembers().size(); + } + return total; + }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + clients.get(0).gossipData(msg()); + Thread.sleep(10000); + TUnit.assertThat( + + new Callable (){ + public Object call() throws Exception { + GossipDataMessage x = clients.get(1).findGossipData(1+"" , "a"); + if (x == null) return ""; + else return x.getPayload(); + }}) + + + //() -> clients.get(1).findGossipData(1+"" , "a").getPayload()) + .afterWaitingAtMost(20, TimeUnit.SECONDS) + .isEqualTo("b"); + for (int i = 0; i < clusterMembers; ++i) { + clients.get(i).shutdown(); + } + } + + private GossipDataMessage msg(){ + GossipDataMessage g = new GossipDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey("a"); + g.setPayload("b"); + g.setTimestamp(System.currentTimeMillis()); + return g; + } +}