GOSSIP-26 Gossip shared data

This commit is contained in:
Edward Capriolo
2016-10-07 03:04:59 -04:00
parent f35dddd8f2
commit 201b101a91
10 changed files with 252 additions and 28 deletions

View File

@ -24,7 +24,8 @@ import java.util.List;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.random.RandomGossipManager;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.log4j.Logger;
/**
@ -97,7 +98,23 @@ public class GossipService {
* @return return the value if found or null if not found or expired
*/
public GossipDataMessage findPerNodeData(String nodeId, String key){
return getGossipManager().findGossipData(nodeId, key);
return getGossipManager().findPerNodeGossipData(nodeId, key);
}
/**
* Gossip shared data
* @param message
*/
public void gossipSharedData(SharedGossipDataMessage message){
gossipManager.gossipSharedData(message);
}
/**
*
* @param key the key to search for
* @return
*/
public SharedGossipDataMessage findSharedData(String key){
return getGossipManager().findSharedGossipData(key);
}
}

View File

@ -34,9 +34,10 @@ import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
@ -70,7 +71,10 @@ public class ActiveGossipThread {
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
() -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
@ -83,7 +87,39 @@ public class ActiveGossipThread {
}
}
public void sendData(LocalGossipMember me, List<LocalGossipMember> memberList){
public void sendSharedData(LocalGossipMember me, List<LocalGossipMember> memberList){
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
return;
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
for (Entry<String, SharedGossipDataMessage> innerEntry : this.gossipCore.getSharedData().entrySet()){
UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
message.setTimestamp(innerEntry.getValue().getTimestamp());
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
gossipCore.sendOneWay(message, member.getUri());
} else {
LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
}
} catch (IOException e1) {
LOGGER.warn(e1);
}
}
public void sendPerNodeData(LocalGossipMember me, List<LocalGossipMember> memberList){
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");

View File

@ -7,6 +7,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
/**
* We wish to periodically sweep user data and remove entries past their timestamp. This
@ -28,12 +29,21 @@ public class DataReaper {
public void init(){
Runnable reapPerNodeData = () -> {
runOnce();
runPerNodeOnce();
runSharedOnce();
};
scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS);
}
void runOnce(){
void runSharedOnce(){
for (Entry<String, SharedGossipDataMessage> entry : gossipCore.getSharedData().entrySet()){
if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
gossipCore.getSharedData().remove(entry.getKey(), entry.getValue());
}
}
}
void runPerNodeOnce(){
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> node : gossipCore.getPerNodeData().entrySet()){
reapData(node.getValue());
}

View File

@ -23,11 +23,13 @@ import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.udp.Trackable;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
@ -39,24 +41,35 @@ public class GossipCore {
private ConcurrentHashMap<String, Base> requests;
private ExecutorService service;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
public GossipCore(GossipManager manager){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
service = Executors.newFixedThreadPool(500);
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
}
public void addSharedData(SharedGossipDataMessage message){
SharedGossipDataMessage previous = sharedData.get(message.getKey());
if (previous == null){
sharedData.putIfAbsent(message.getKey(), message);
} else {
if (previous.getTimestamp() < message.getTimestamp()){
sharedData.replace(message.getKey(), previous, message);
}
}
}
public void addPerNodeData(GossipDataMessage message){
ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>();
nodeMap.put(message.getKey(), message);
nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap);
if (nodeMap != null){
//m.put(message.getKey(), message); //TODO only put if > ts
GossipDataMessage current = nodeMap.get(message.getKey());
if (current == null){
nodeMap.replace(message.getKey(), null, message);
nodeMap.putIfAbsent(message.getKey(), message);
} else {
if (current.getTimestamp() < message.getTimestamp()){
nodeMap.replace(message.getKey(), current, message);
@ -69,6 +82,10 @@ public class GossipCore {
return perNodeData;
}
public ConcurrentHashMap<String, SharedGossipDataMessage> getSharedData() {
return sharedData;
}
public void shutdown(){
service.shutdown();
try {
@ -89,6 +106,10 @@ public class GossipCore {
UdpGossipDataMessage message = (UdpGossipDataMessage) base;
addPerNodeData(message);
}
if (base instanceof SharedGossipDataMessage){
UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base;
addSharedData(message);
}
if (base instanceof ActiveGossipMessage){
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;

View File

@ -44,6 +44,7 @@ import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
public abstract class GossipManager implements NotificationListener {
@ -235,7 +236,15 @@ public abstract class GossipManager implements NotificationListener {
gossipCore.addPerNodeData(message);
}
public GossipDataMessage findGossipData(String nodeId, String key){
public void gossipSharedData(SharedGossipDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
Objects.nonNull(message.getPayload());
message.setNodeId(me.getId());
gossipCore.addSharedData(message);
}
public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
if (j == null){
return null;
@ -250,6 +259,18 @@ public abstract class GossipManager implements NotificationListener {
return l;
}
}
public SharedGossipDataMessage findSharedGossipData(String key){
SharedGossipDataMessage l = gossipCore.getSharedData().get(key);
if (l == null){
return null;
}
if (l.getExpireAt() < clock.currentTimeMillis()){
return null;
} else {
return l;
}
}
public DataReaper getDataReaper() {
return dataReaper;

View File

@ -4,6 +4,7 @@ import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.codehaus.jackson.annotate.JsonSubTypes;
import org.codehaus.jackson.annotate.JsonSubTypes.Type;
import org.codehaus.jackson.annotate.JsonTypeInfo;
@ -20,7 +21,9 @@ import org.codehaus.jackson.annotate.JsonTypeInfo;
@Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
@Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
@Type(value = GossipDataMessage.class, name = "GossipDataMessage"),
@Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage")
@Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage"),
@Type(value = SharedGossipDataMessage.class, name = "SharedGossipDataMessage"),
@Type(value = UdpSharedGossipDataMessage.class, name = "UdpSharedGossipDataMessage")
})
public class Base {

View File

@ -0,0 +1,47 @@
package org.apache.gossip.model;
public class SharedGossipDataMessage extends Base {
private String nodeId;
private String key;
private Object payload;
private Long timestamp;
private Long expireAt;
public String getNodeId() {
return nodeId;
}
public void setNodeId(String nodeId) {
this.nodeId = nodeId;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public Object getPayload() {
return payload;
}
public void setPayload(Object payload) {
this.payload = payload;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public Long getExpireAt() {
return expireAt;
}
public void setExpireAt(Long expireAt) {
this.expireAt = expireAt;
}
@Override
public String toString() {
return "SharedGossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
+ ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
}
}

View File

@ -0,0 +1,31 @@
package org.apache.gossip.udp;
import org.apache.gossip.model.SharedGossipDataMessage;
public class UdpSharedGossipDataMessage extends SharedGossipDataMessage implements Trackable {
private String uriFrom;
private String uuid;
public String getUriFrom() {
return uriFrom;
}
public void setUriFrom(String uriFrom) {
this.uriFrom = uriFrom;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
@Override
public String toString() {
return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
}
}

View File

@ -12,6 +12,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.junit.Test;
import io.teknek.tunit.TUnit;
@ -19,7 +20,7 @@ import io.teknek.tunit.TUnit;
public class DataTest {
@Test
public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
@ -51,20 +52,32 @@ public class DataTest {
return total;
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
clients.get(0).gossipPerNodeData(msg());
clients.get(0).gossipSharedData(sharedMsg());
Thread.sleep(10000);
TUnit.assertThat(
new Callable<Object> (){
new Callable<Object>() {
public Object call() throws Exception {
GossipDataMessage x = clients.get(1).findPerNodeData(1+"" , "a");
if (x == null) return "";
else return x.getPayload();
}})
//() -> clients.get(1).findGossipData(1+"" , "a").getPayload())
.afterWaitingAtMost(20, TimeUnit.SECONDS)
.isEqualTo("b");
GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
if (x == null)
return "";
else
return x.getPayload();
}
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
TUnit.assertThat(
new Callable<Object>() {
public Object call() throws Exception {
SharedGossipDataMessage x = clients.get(1).findSharedData("a");
if (x == null)
return "";
else
return x.getPayload();
}
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
}
@ -78,4 +91,14 @@ public class DataTest {
g.setTimestamp(System.currentTimeMillis());
return g;
}
private SharedGossipDataMessage sharedMsg(){
SharedGossipDataMessage g = new SharedGossipDataMessage();
g.setExpireAt(Long.MAX_VALUE);
g.setKey("a");
g.setPayload("c");
g.setTimestamp(System.currentTimeMillis());
return g;
}
}

View File

@ -5,6 +5,7 @@ import java.net.URI;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.manager.random.RandomGossipManager;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.junit.Assert;
import org.junit.Test;
@ -21,9 +22,13 @@ public class DataReaperTest {
GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings)
.withId(myId).uri(URI.create("udp://localhost:5000")).build();
gm.gossipPerNodeData(perNodeDatum(key, value));
Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
gm.getDataReaper().runOnce();
TUnit.assertThat(() -> gm.findGossipData(myId, key)).equals(null);
gm.gossipSharedData(sharedDatum(key, value));
Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload());
gm.getDataReaper().runPerNodeOnce();
gm.getDataReaper().runSharedOnce();
TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null);
TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null);
}
private GossipDataMessage perNodeDatum(String key, String value) {
@ -34,6 +39,16 @@ public class DataReaperTest {
m.setTimestamp(System.currentTimeMillis());
return m;
}
private SharedGossipDataMessage sharedDatum(String key, String value) {
SharedGossipDataMessage m = new SharedGossipDataMessage();
m.setExpireAt(System.currentTimeMillis() + 5L);
m.setKey(key);
m.setPayload(value);
m.setTimestamp(System.currentTimeMillis());
return m;
}
@Test
public void testHigherTimestampWins() {
@ -47,9 +62,9 @@ public class DataReaperTest {
GossipDataMessage after = perNodeDatum(key, "b");
after.setTimestamp(after.getTimestamp() - 1);
gm.gossipPerNodeData(before);
Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
gm.gossipPerNodeData(after);
Assert.assertEquals(value, gm.findGossipData(myId, key).getPayload());
Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload());
}
}