From 201b101a91cf02d4ef2b0d9536cf0ceda99f6115 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Fri, 7 Oct 2016 03:04:59 -0400 Subject: [PATCH] GOSSIP-26 Gossip shared data --- .../java/org/apache/gossip/GossipService.java | 21 ++++++++- .../gossip/manager/ActiveGossipThread.java | 42 +++++++++++++++-- .../org/apache/gossip/manager/DataReaper.java | 14 +++++- .../org/apache/gossip/manager/GossipCore.java | 25 +++++++++- .../apache/gossip/manager/GossipManager.java | 23 ++++++++- .../java/org/apache/gossip/model/Base.java | 5 +- .../gossip/model/SharedGossipDataMessage.java | 47 +++++++++++++++++++ .../udp/UdpSharedGossipDataMessage.java | 31 ++++++++++++ src/test/java/org/apache/gossip/DataTest.java | 47 ++++++++++++++----- .../apache/gossip/manager/DataReaperTest.java | 25 ++++++++-- 10 files changed, 252 insertions(+), 28 deletions(-) create mode 100644 src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java create mode 100644 src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index ab0da97..e50f260 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -24,7 +24,8 @@ import java.util.List; import org.apache.gossip.event.GossipListener; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.random.RandomGossipManager; -import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.log4j.Logger; /** @@ -97,7 +98,23 @@ public class GossipService { * @return return the value if found or null if not found or expired */ public GossipDataMessage findPerNodeData(String nodeId, String key){ - return getGossipManager().findGossipData(nodeId, key); + return getGossipManager().findPerNodeGossipData(nodeId, key); } + /** + * Gossip shared data + * @param message + */ + public void gossipSharedData(SharedGossipDataMessage message){ + gossipManager.gossipSharedData(message); + } + + /** + * + * @param key the key to search for + * @return + */ + public SharedGossipDataMessage findSharedData(String key){ + return getGossipManager().findSharedGossipData(key); + } } diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 19caffe..28de244 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -34,9 +34,10 @@ import org.apache.gossip.model.ActiveGossipOk; import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.GossipMember; import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpGossipDataMessage; - +import org.apache.gossip.udp.UdpSharedGossipDataMessage; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; @@ -70,7 +71,10 @@ public class ActiveGossipThread { () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); scheduledExecutorService.scheduleAtFixedRate( - () -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, + () -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, + gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); } @@ -83,7 +87,39 @@ public class ActiveGossipThread { } } - public void sendData(LocalGossipMember me, List memberList){ + public void sendSharedData(LocalGossipMember me, List memberList){ + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + LOGGER.debug("Send sendMembershipList() is called without action"); + return; + } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + for (Entry innerEntry : this.gossipCore.getSharedData().entrySet()){ + UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage(); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + gossipCore.sendOneWay(message, member.getUri()); + } else { + LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } + } catch (IOException e1) { + LOGGER.warn(e1); + } + } + + public void sendPerNodeData(LocalGossipMember me, List memberList){ LocalGossipMember member = selectPartner(memberList); if (member == null) { LOGGER.debug("Send sendMembershipList() is called without action"); diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java index 237ffb6..4f4616b 100644 --- a/src/main/java/org/apache/gossip/manager/DataReaper.java +++ b/src/main/java/org/apache/gossip/manager/DataReaper.java @@ -7,6 +7,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; /** * We wish to periodically sweep user data and remove entries past their timestamp. This @@ -28,12 +29,21 @@ public class DataReaper { public void init(){ Runnable reapPerNodeData = () -> { - runOnce(); + runPerNodeOnce(); + runSharedOnce(); }; scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS); } - void runOnce(){ + void runSharedOnce(){ + for (Entry entry : gossipCore.getSharedData().entrySet()){ + if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ + gossipCore.getSharedData().remove(entry.getKey(), entry.getValue()); + } + } + } + + void runPerNodeOnce(){ for (Entry> node : gossipCore.getPerNodeData().entrySet()){ reapData(node.getValue()); } diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 08ec5b4..6dc4a5c 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -23,11 +23,13 @@ import org.apache.gossip.model.ActiveGossipMessage; import org.apache.gossip.model.Base; import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.gossip.udp.Trackable; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpNotAMemberFault; +import org.apache.gossip.udp.UdpSharedGossipDataMessage; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; @@ -39,24 +41,35 @@ public class GossipCore { private ConcurrentHashMap requests; private ExecutorService service; private final ConcurrentHashMap> perNodeData; + private final ConcurrentHashMap sharedData; public GossipCore(GossipManager manager){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); service = Executors.newFixedThreadPool(500); perNodeData = new ConcurrentHashMap<>(); + sharedData = new ConcurrentHashMap<>(); } + public void addSharedData(SharedGossipDataMessage message){ + SharedGossipDataMessage previous = sharedData.get(message.getKey()); + if (previous == null){ + sharedData.putIfAbsent(message.getKey(), message); + } else { + if (previous.getTimestamp() < message.getTimestamp()){ + sharedData.replace(message.getKey(), previous, message); + } + } + } public void addPerNodeData(GossipDataMessage message){ ConcurrentHashMap nodeMap = new ConcurrentHashMap<>(); nodeMap.put(message.getKey(), message); nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap); if (nodeMap != null){ - //m.put(message.getKey(), message); //TODO only put if > ts GossipDataMessage current = nodeMap.get(message.getKey()); if (current == null){ - nodeMap.replace(message.getKey(), null, message); + nodeMap.putIfAbsent(message.getKey(), message); } else { if (current.getTimestamp() < message.getTimestamp()){ nodeMap.replace(message.getKey(), current, message); @@ -69,6 +82,10 @@ public class GossipCore { return perNodeData; } + public ConcurrentHashMap getSharedData() { + return sharedData; + } + public void shutdown(){ service.shutdown(); try { @@ -89,6 +106,10 @@ public class GossipCore { UdpGossipDataMessage message = (UdpGossipDataMessage) base; addPerNodeData(message); } + if (base instanceof SharedGossipDataMessage){ + UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base; + addSharedData(message); + } if (base instanceof ActiveGossipMessage){ List remoteGossipMembers = new ArrayList<>(); RemoteGossipMember senderMember = null; diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 3c66208..9f75fe3 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -44,6 +44,7 @@ import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; public abstract class GossipManager implements NotificationListener { @@ -235,7 +236,15 @@ public abstract class GossipManager implements NotificationListener { gossipCore.addPerNodeData(message); } - public GossipDataMessage findGossipData(String nodeId, String key){ + public void gossipSharedData(SharedGossipDataMessage message){ + Objects.nonNull(message.getKey()); + Objects.nonNull(message.getTimestamp()); + Objects.nonNull(message.getPayload()); + message.setNodeId(me.getId()); + gossipCore.addSharedData(message); + } + + public GossipDataMessage findPerNodeGossipData(String nodeId, String key){ ConcurrentHashMap j = gossipCore.getPerNodeData().get(nodeId); if (j == null){ return null; @@ -250,6 +259,18 @@ public abstract class GossipManager implements NotificationListener { return l; } } + + public SharedGossipDataMessage findSharedGossipData(String key){ + SharedGossipDataMessage l = gossipCore.getSharedData().get(key); + if (l == null){ + return null; + } + if (l.getExpireAt() < clock.currentTimeMillis()){ + return null; + } else { + return l; + } + } public DataReaper getDataReaper() { return dataReaper; diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java index 66c2be6..2bbb7af 100644 --- a/src/main/java/org/apache/gossip/model/Base.java +++ b/src/main/java/org/apache/gossip/model/Base.java @@ -4,6 +4,7 @@ import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpNotAMemberFault; +import org.apache.gossip.udp.UdpSharedGossipDataMessage; import org.codehaus.jackson.annotate.JsonSubTypes; import org.codehaus.jackson.annotate.JsonSubTypes.Type; import org.codehaus.jackson.annotate.JsonTypeInfo; @@ -20,7 +21,9 @@ import org.codehaus.jackson.annotate.JsonTypeInfo; @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"), @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"), @Type(value = GossipDataMessage.class, name = "GossipDataMessage"), - @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage") + @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage"), + @Type(value = SharedGossipDataMessage.class, name = "SharedGossipDataMessage"), + @Type(value = UdpSharedGossipDataMessage.class, name = "UdpSharedGossipDataMessage") }) public class Base { diff --git a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java b/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java new file mode 100644 index 0000000..bac9ddf --- /dev/null +++ b/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java @@ -0,0 +1,47 @@ +package org.apache.gossip.model; + +public class SharedGossipDataMessage extends Base { + + private String nodeId; + private String key; + private Object payload; + private Long timestamp; + private Long expireAt; + + public String getNodeId() { + return nodeId; + } + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + public String getKey() { + return key; + } + public void setKey(String key) { + this.key = key; + } + public Object getPayload() { + return payload; + } + public void setPayload(Object payload) { + this.payload = payload; + } + public Long getTimestamp() { + return timestamp; + } + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + public Long getExpireAt() { + return expireAt; + } + public void setExpireAt(Long expireAt) { + this.expireAt = expireAt; + } + @Override + public String toString() { + return "SharedGossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload + + ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]"; + } +} + diff --git a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java new file mode 100644 index 0000000..cb99759 --- /dev/null +++ b/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java @@ -0,0 +1,31 @@ +package org.apache.gossip.udp; + +import org.apache.gossip.model.SharedGossipDataMessage; + +public class UdpSharedGossipDataMessage extends SharedGossipDataMessage implements Trackable { + + private String uriFrom; + private String uuid; + + public String getUriFrom() { + return uriFrom; + } + + public void setUriFrom(String uriFrom) { + this.uriFrom = uriFrom; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override + public String toString() { + return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]"; + } + +} diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 02c89a8..4909bf8 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; import org.junit.Test; import io.teknek.tunit.TUnit; @@ -19,7 +20,7 @@ import io.teknek.tunit.TUnit; public class DataTest { @Test - public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{ + public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; @@ -51,20 +52,32 @@ public class DataTest { return total; }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); clients.get(0).gossipPerNodeData(msg()); + clients.get(0).gossipSharedData(sharedMsg()); Thread.sleep(10000); TUnit.assertThat( - - new Callable (){ + new Callable() { public Object call() throws Exception { - GossipDataMessage x = clients.get(1).findPerNodeData(1+"" , "a"); - if (x == null) return ""; - else return x.getPayload(); - }}) - - - //() -> clients.get(1).findGossipData(1+"" , "a").getPayload()) - .afterWaitingAtMost(20, TimeUnit.SECONDS) - .isEqualTo("b"); + GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a"); + if (x == null) + return ""; + else + return x.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); + + + TUnit.assertThat( + new Callable() { + public Object call() throws Exception { + SharedGossipDataMessage x = clients.get(1).findSharedData("a"); + if (x == null) + return ""; + else + return x.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); + + for (int i = 0; i < clusterMembers; ++i) { clients.get(i).shutdown(); } @@ -78,4 +91,14 @@ public class DataTest { g.setTimestamp(System.currentTimeMillis()); return g; } + + private SharedGossipDataMessage sharedMsg(){ + SharedGossipDataMessage g = new SharedGossipDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey("a"); + g.setPayload("c"); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + } diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java index 4cd5dfe..d0164b1 100644 --- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java +++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -5,6 +5,7 @@ import java.net.URI; import org.apache.gossip.GossipSettings; import org.apache.gossip.manager.random.RandomGossipManager; import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; import org.junit.Assert; import org.junit.Test; @@ -21,9 +22,13 @@ public class DataReaperTest { GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) .withId(myId).uri(URI.create("udp://localhost:5000")).build(); gm.gossipPerNodeData(perNodeDatum(key, value)); - Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload()); - gm.getDataReaper().runOnce(); - TUnit.assertThat(() -> gm.findGossipData(myId, key)).equals(null); + gm.gossipSharedData(sharedDatum(key, value)); + Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); + Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload()); + gm.getDataReaper().runPerNodeOnce(); + gm.getDataReaper().runSharedOnce(); + TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null); + TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null); } private GossipDataMessage perNodeDatum(String key, String value) { @@ -34,6 +39,16 @@ public class DataReaperTest { m.setTimestamp(System.currentTimeMillis()); return m; } + + private SharedGossipDataMessage sharedDatum(String key, String value) { + SharedGossipDataMessage m = new SharedGossipDataMessage(); + m.setExpireAt(System.currentTimeMillis() + 5L); + m.setKey(key); + m.setPayload(value); + m.setTimestamp(System.currentTimeMillis()); + return m; + } + @Test public void testHigherTimestampWins() { @@ -47,9 +62,9 @@ public class DataReaperTest { GossipDataMessage after = perNodeDatum(key, "b"); after.setTimestamp(after.getTimestamp() - 1); gm.gossipPerNodeData(before); - Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload()); + Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); gm.gossipPerNodeData(after); - Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload()); + Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); } }