Merge branch 'GOSSIP-21' of github.com:edwardcapriolo/incubator-gossip
This commit is contained in:
@ -24,6 +24,7 @@ import java.util.List;
|
||||
import org.apache.gossip.event.GossipListener;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.random.RandomGossipManager;
|
||||
import org.apache.gossip.model.GossipDataMessage;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
/**
|
||||
@ -82,6 +83,19 @@ public class GossipService {
|
||||
return gossipManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gossip data to the entire cluster
|
||||
* @param message
|
||||
*/
|
||||
public void gossipData(GossipDataMessage message){
|
||||
gossipManager.gossipData(message);
|
||||
}
|
||||
|
||||
|
||||
public GossipDataMessage findGossipData(String nodeId, String key){
|
||||
return this.get_gossipManager().findGossipData(nodeId, key);
|
||||
}
|
||||
|
||||
public void set_gossipManager(GossipManager _gossipManager) {
|
||||
this.gossipManager = _gossipManager;
|
||||
}
|
||||
|
@ -20,17 +20,23 @@ package org.apache.gossip.manager;
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
import java.util.List;
|
||||
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Random;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.gossip.GossipService;
|
||||
|
||||
import org.apache.gossip.LocalGossipMember;
|
||||
import org.apache.gossip.model.ActiveGossipOk;
|
||||
import org.apache.gossip.model.GossipDataMessage;
|
||||
import org.apache.gossip.model.GossipMember;
|
||||
import org.apache.gossip.model.Response;
|
||||
import org.apache.gossip.udp.UdpActiveGossipMessage;
|
||||
import org.apache.gossip.udp.UdpGossipDataMessage;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
@ -41,19 +47,19 @@ import org.codehaus.jackson.map.ObjectMapper;
|
||||
*/
|
||||
public class ActiveGossipThread {
|
||||
|
||||
public static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
|
||||
private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
|
||||
|
||||
private final GossipManager gossipManager;
|
||||
private final Random random;
|
||||
private final GossipCore gossipCore;
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private ObjectMapper MAPPER = new ObjectMapper();
|
||||
private final Random random;
|
||||
protected final GossipManager gossipManager;
|
||||
private final GossipCore gossipCore;
|
||||
|
||||
public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
|
||||
this.gossipManager = gossipManager;
|
||||
this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
|
||||
random = new Random();
|
||||
this.gossipCore = gossipCore;
|
||||
this.random = new Random();
|
||||
this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
|
||||
}
|
||||
|
||||
public void init() {
|
||||
@ -63,14 +69,51 @@ public class ActiveGossipThread {
|
||||
scheduledExecutorService.scheduleAtFixedRate(
|
||||
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
|
||||
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
|
||||
scheduledExecutorService.scheduleAtFixedRate(
|
||||
() -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
|
||||
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
this.scheduledExecutorService.shutdown();
|
||||
scheduledExecutorService.shutdown();
|
||||
try {
|
||||
this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
|
||||
scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.warn("Did not complete shutdown", e);
|
||||
LOGGER.debug("Issue during shurdown" + e);
|
||||
}
|
||||
}
|
||||
|
||||
public void sendData(LocalGossipMember me, List<LocalGossipMember> memberList){
|
||||
LocalGossipMember member = selectPartner(memberList);
|
||||
if (member == null) {
|
||||
LOGGER.debug("Send sendMembershipList() is called without action");
|
||||
return;
|
||||
}
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
|
||||
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
|
||||
for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
|
||||
UdpGossipDataMessage message = new UdpGossipDataMessage();
|
||||
message.setUuid(UUID.randomUUID().toString());
|
||||
message.setUriFrom(me.getId());
|
||||
message.setExpireAt(innerEntry.getValue().getExpireAt());
|
||||
message.setKey(innerEntry.getValue().getKey());
|
||||
message.setNodeId(innerEntry.getValue().getNodeId());
|
||||
message.setTimestamp(innerEntry.getValue().getTimestamp());
|
||||
message.setPayload(innerEntry.getValue().getPayload());
|
||||
message.setTimestamp(innerEntry.getValue().getTimestamp());
|
||||
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
|
||||
int packet_length = json_bytes.length;
|
||||
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
||||
gossipCore.sendOneWay(message, member.getUri());
|
||||
} else {
|
||||
LOGGER.error("The length of the to be send message is too large ("
|
||||
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (IOException e1) {
|
||||
LOGGER.warn(e1);
|
||||
}
|
||||
}
|
||||
|
||||
@ -78,14 +121,13 @@ public class ActiveGossipThread {
|
||||
* Performs the sending of the membership list, after we have incremented our own heartbeat.
|
||||
*/
|
||||
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
|
||||
|
||||
me.setHeartbeat(System.currentTimeMillis());
|
||||
LocalGossipMember member = selectPartner(memberList);
|
||||
if (member == null) {
|
||||
GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
|
||||
LOGGER.debug("Send sendMembershipList() is called without action");
|
||||
return;
|
||||
} else {
|
||||
GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
|
||||
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
|
||||
}
|
||||
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
@ -107,16 +149,15 @@ public class ActiveGossipThread {
|
||||
LOGGER.warn("Message "+ message + " generated response "+ r);
|
||||
}
|
||||
} else {
|
||||
GossipService.LOGGER.error("The length of the to be send message is too large ("
|
||||
LOGGER.error("The length of the to be send message is too large ("
|
||||
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
|
||||
}
|
||||
} catch (IOException e1) {
|
||||
GossipService.LOGGER.warn(e1);
|
||||
LOGGER.warn(e1);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract method which should be implemented by a subclass. This method should return a member
|
||||
* of the list to gossip with.
|
||||
*
|
||||
* @param memberList
|
||||
* The list of members which are stored in the local list of members.
|
||||
|
@ -21,10 +21,12 @@ import org.apache.gossip.LocalGossipMember;
|
||||
import org.apache.gossip.RemoteGossipMember;
|
||||
import org.apache.gossip.model.ActiveGossipMessage;
|
||||
import org.apache.gossip.model.Base;
|
||||
import org.apache.gossip.model.GossipDataMessage;
|
||||
import org.apache.gossip.model.Response;
|
||||
import org.apache.gossip.udp.Trackable;
|
||||
import org.apache.gossip.udp.UdpActiveGossipMessage;
|
||||
import org.apache.gossip.udp.UdpActiveGossipOk;
|
||||
import org.apache.gossip.udp.UdpGossipDataMessage;
|
||||
import org.apache.gossip.udp.UdpNotAMemberFault;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
@ -34,15 +36,32 @@ public class GossipCore {
|
||||
public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
private final GossipManager gossipManager;
|
||||
|
||||
private ConcurrentHashMap<String, Base> requests;
|
||||
|
||||
private ExecutorService service;
|
||||
private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
|
||||
|
||||
public GossipCore(GossipManager manager){
|
||||
this.gossipManager = manager;
|
||||
requests = new ConcurrentHashMap<>();
|
||||
service = Executors.newFixedThreadPool(500);
|
||||
perNodeData = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @param message
|
||||
*/
|
||||
public void addPerNodeData(GossipDataMessage message){
|
||||
ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>();
|
||||
m.put(message.getKey(), message);
|
||||
m = perNodeData.putIfAbsent(message.getNodeId(), m);
|
||||
if (m != null){
|
||||
m.put(message.getKey(), message); //TODO only put if > ts
|
||||
}
|
||||
}
|
||||
|
||||
public ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> getPerNodeData(){
|
||||
return perNodeData;
|
||||
}
|
||||
|
||||
public void shutdown(){
|
||||
@ -61,6 +80,10 @@ public class GossipCore {
|
||||
requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
|
||||
}
|
||||
}
|
||||
if (base instanceof GossipDataMessage) {
|
||||
UdpGossipDataMessage message = (UdpGossipDataMessage) base;
|
||||
addPerNodeData(message);
|
||||
}
|
||||
if (base instanceof ActiveGossipMessage){
|
||||
List<GossipMember> remoteGossipMembers = new ArrayList<>();
|
||||
RemoteGossipMember senderMember = null;
|
||||
@ -153,11 +176,11 @@ public class GossipCore {
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (ExecutionException e) {
|
||||
LOGGER.error(e.getMessage(), e);
|
||||
LOGGER.debug(e.getMessage(), e);
|
||||
return null;
|
||||
} catch (TimeoutException e) {
|
||||
boolean cancelled = response.cancel(true);
|
||||
LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
|
||||
LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
|
||||
return null;
|
||||
} finally {
|
||||
if (t != null){
|
||||
|
@ -22,6 +22,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@ -41,6 +42,9 @@ import org.apache.gossip.event.GossipListener;
|
||||
import org.apache.gossip.event.GossipState;
|
||||
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
|
||||
|
||||
import org.apache.gossip.model.GossipDataMessage;
|
||||
|
||||
|
||||
public abstract class GossipManager implements NotificationListener {
|
||||
|
||||
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
|
||||
@ -213,4 +217,19 @@ public abstract class GossipManager implements NotificationListener {
|
||||
LOGGER.error(e);
|
||||
}
|
||||
}
|
||||
|
||||
public void gossipData(GossipDataMessage message){
|
||||
message.setNodeId(me.getId());
|
||||
gossipCore.addPerNodeData(message);
|
||||
}
|
||||
|
||||
public GossipDataMessage findGossipData(String nodeId, String key){
|
||||
ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
|
||||
if (j == null){
|
||||
return null;
|
||||
} else {
|
||||
return j.get(key);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -121,17 +121,4 @@ abstract public class PassiveGossipThread implements Runnable {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract method for merging the local and remote list.
|
||||
*
|
||||
* @param gossipManager
|
||||
* The GossipManager for retrieving the local members and dead members list.
|
||||
* @param senderMember
|
||||
* The member who is sending this list, this could be used to send a response if the
|
||||
* remote list contains out-dated information.
|
||||
* @param remoteList
|
||||
* The list of members known at the remote side.
|
||||
*/
|
||||
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
|
||||
List<GossipMember> remoteList);
|
||||
}
|
@ -17,11 +17,6 @@
|
||||
*/
|
||||
package org.apache.gossip.manager.impl;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.gossip.GossipMember;
|
||||
import org.apache.gossip.LocalGossipMember;
|
||||
import org.apache.gossip.RemoteGossipMember;
|
||||
import org.apache.gossip.manager.GossipCore;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.PassiveGossipThread;
|
||||
@ -35,79 +30,4 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
||||
super(gossipManager, gossipCore);
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge remote list (received from peer), and our local member list. Simply, we must update the
|
||||
* heartbeats that the remote list has with our list. Also, some additional logic is needed to
|
||||
* make sure we have not timed out a member and then immediately received a list with that member.
|
||||
*
|
||||
* @param gossipManager
|
||||
* @param senderMember
|
||||
* @param remoteList
|
||||
*/
|
||||
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
|
||||
List<GossipMember> remoteList) {
|
||||
|
||||
// if the person sending to us is in the dead list consider them up
|
||||
for (LocalGossipMember i : gossipManager.getDeadList()) {
|
||||
if (i.getId().equals(senderMember.getId())) {
|
||||
LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
|
||||
senderMember.getUri(), senderMember.getId(),
|
||||
senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
|
||||
.getCleanupInterval());
|
||||
gossipManager.reviveMember(newLocalMember);
|
||||
newLocalMember.startTimeoutTimer();
|
||||
}
|
||||
}
|
||||
for (GossipMember remoteMember : remoteList) {
|
||||
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
|
||||
continue;
|
||||
}
|
||||
if (gossipManager.getLiveMembers().contains(remoteMember)) {
|
||||
LocalGossipMember localMember = gossipManager.getLiveMembers().get(
|
||||
gossipManager.getLiveMembers().indexOf(remoteMember));
|
||||
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
|
||||
localMember.setHeartbeat(remoteMember.getHeartbeat());
|
||||
localMember.resetTimeoutTimer();
|
||||
}
|
||||
} else if (!gossipManager.getLiveMembers().contains(remoteMember)
|
||||
&& !gossipManager.getDeadList().contains(remoteMember)) {
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
|
||||
remoteMember.getUri(), remoteMember.getId(),
|
||||
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
|
||||
.getCleanupInterval());
|
||||
gossipManager.createOrReviveMember(newLocalMember);
|
||||
newLocalMember.startTimeoutTimer();
|
||||
} else {
|
||||
if (gossipManager.getDeadList().contains(remoteMember)) {
|
||||
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
|
||||
gossipManager.getDeadList().indexOf(remoteMember));
|
||||
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
|
||||
remoteMember.getUri(), remoteMember.getId(),
|
||||
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
|
||||
.getCleanupInterval());
|
||||
gossipManager.reviveMember(newLocalMember);
|
||||
newLocalMember.startTimeoutTimer();
|
||||
LOGGER.debug("Removed remote member " + remoteMember.getAddress()
|
||||
+ " from dead list and added to local member list.");
|
||||
} else {
|
||||
LOGGER.debug("me " + gossipManager.getMyself());
|
||||
LOGGER.debug("sender " + senderMember);
|
||||
LOGGER.debug("remote " + remoteList);
|
||||
LOGGER.debug("live " + gossipManager.getLiveMembers());
|
||||
LOGGER.debug("dead " + gossipManager.getDeadList());
|
||||
}
|
||||
} else {
|
||||
LOGGER.debug("me " + gossipManager.getMyself());
|
||||
LOGGER.debug("sender " + senderMember);
|
||||
LOGGER.debug("remote " + remoteList);
|
||||
LOGGER.debug("live " + gossipManager.getLiveMembers());
|
||||
LOGGER.debug("dead " + gossipManager.getDeadList());
|
||||
// throw new IllegalArgumentException("wtf");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package org.apache.gossip.model;
|
||||
|
||||
import org.apache.gossip.udp.UdpActiveGossipMessage;
|
||||
import org.apache.gossip.udp.UdpActiveGossipOk;
|
||||
import org.apache.gossip.udp.UdpGossipDataMessage;
|
||||
import org.apache.gossip.udp.UdpNotAMemberFault;
|
||||
import org.codehaus.jackson.annotate.JsonSubTypes;
|
||||
import org.codehaus.jackson.annotate.JsonSubTypes.Type;
|
||||
@ -17,7 +18,9 @@ import org.codehaus.jackson.annotate.JsonTypeInfo;
|
||||
@Type(value = ActiveGossipOk.class, name = "ActiveGossipOk"),
|
||||
@Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"),
|
||||
@Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
|
||||
@Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault")
|
||||
@Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
|
||||
@Type(value = GossipDataMessage.class, name = "GossipDataMessage"),
|
||||
@Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage")
|
||||
})
|
||||
public class Base {
|
||||
|
||||
|
49
src/main/java/org/apache/gossip/model/GossipDataMessage.java
Normal file
49
src/main/java/org/apache/gossip/model/GossipDataMessage.java
Normal file
@ -0,0 +1,49 @@
|
||||
package org.apache.gossip.model;
|
||||
|
||||
public class GossipDataMessage extends Base {
|
||||
|
||||
private String nodeId;
|
||||
private String key;
|
||||
private Object payload;
|
||||
private Long timestamp;
|
||||
private Long expireAt;
|
||||
|
||||
public String getNodeId() {
|
||||
return nodeId;
|
||||
}
|
||||
public void setNodeId(String nodeId) {
|
||||
this.nodeId = nodeId;
|
||||
}
|
||||
public String getKey() {
|
||||
return key;
|
||||
}
|
||||
public void setKey(String key) {
|
||||
this.key = key;
|
||||
}
|
||||
public Object getPayload() {
|
||||
return payload;
|
||||
}
|
||||
public void setPayload(Object payload) {
|
||||
this.payload = payload;
|
||||
}
|
||||
public Long getTimestamp() {
|
||||
return timestamp;
|
||||
}
|
||||
public void setTimestamp(Long timestamp) {
|
||||
this.timestamp = timestamp;
|
||||
}
|
||||
public Long getExpireAt() {
|
||||
return expireAt;
|
||||
}
|
||||
public void setExpireAt(Long expireAt) {
|
||||
this.expireAt = expireAt;
|
||||
}
|
||||
@Override
|
||||
public String toString() {
|
||||
return "GossipDataMessage [nodeId=" + nodeId + ", key=" + key + ", payload=" + payload
|
||||
+ ", timestamp=" + timestamp + ", expireAt=" + expireAt + "]";
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
package org.apache.gossip.udp;
|
||||
|
||||
import org.apache.gossip.model.GossipDataMessage;
|
||||
|
||||
public class UdpGossipDataMessage extends GossipDataMessage implements Trackable {
|
||||
|
||||
private String uriFrom;
|
||||
private String uuid;
|
||||
|
||||
public String getUriFrom() {
|
||||
return uriFrom;
|
||||
}
|
||||
|
||||
public void setUriFrom(String uriFrom) {
|
||||
this.uriFrom = uriFrom;
|
||||
}
|
||||
|
||||
public String getUuid() {
|
||||
return uuid;
|
||||
}
|
||||
|
||||
public void setUuid(String uuid) {
|
||||
this.uuid = uuid;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
|
||||
}
|
||||
|
||||
}
|
81
src/test/java/org/apache/gossip/DataTest.java
Normal file
81
src/test/java/org/apache/gossip/DataTest.java
Normal file
@ -0,0 +1,81 @@
|
||||
package org.apache.gossip;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.gossip.event.GossipListener;
|
||||
import org.apache.gossip.event.GossipState;
|
||||
import org.apache.gossip.model.GossipDataMessage;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
|
||||
public class DataTest {
|
||||
|
||||
@Test
|
||||
public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
|
||||
GossipSettings settings = new GossipSettings();
|
||||
String cluster = UUID.randomUUID().toString();
|
||||
int seedNodes = 1;
|
||||
List<GossipMember> startupMembers = new ArrayList<>();
|
||||
for (int i = 1; i < seedNodes+1; ++i) {
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||
startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
|
||||
}
|
||||
final List<GossipService> clients = new ArrayList<>();
|
||||
final int clusterMembers = 2;
|
||||
for (int i = 1; i < clusterMembers+1; ++i) {
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||
GossipService gossipService = new GossipService(cluster, uri, i + "",
|
||||
startupMembers, settings,
|
||||
new GossipListener(){
|
||||
public void gossipEvent(GossipMember member, GossipState state) {
|
||||
|
||||
}
|
||||
});
|
||||
clients.add(gossipService);
|
||||
gossipService.start();
|
||||
}
|
||||
TUnit.assertThat(new Callable<Integer> (){
|
||||
public Integer call() throws Exception {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).get_gossipManager().getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
|
||||
clients.get(0).gossipData(msg());
|
||||
Thread.sleep(10000);
|
||||
TUnit.assertThat(
|
||||
|
||||
new Callable<Object> (){
|
||||
public Object call() throws Exception {
|
||||
GossipDataMessage x = clients.get(1).findGossipData(1+"" , "a");
|
||||
if (x == null) return "";
|
||||
else return x.getPayload();
|
||||
}})
|
||||
|
||||
|
||||
//() -> clients.get(1).findGossipData(1+"" , "a").getPayload())
|
||||
.afterWaitingAtMost(20, TimeUnit.SECONDS)
|
||||
.isEqualTo("b");
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
clients.get(i).shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private GossipDataMessage msg(){
|
||||
GossipDataMessage g = new GossipDataMessage();
|
||||
g.setExpireAt(Long.MAX_VALUE);
|
||||
g.setKey("a");
|
||||
g.setPayload("b");
|
||||
g.setTimestamp(System.currentTimeMillis());
|
||||
return g;
|
||||
}
|
||||
}
|
@ -29,7 +29,6 @@ import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import org.apache.gossip.event.GossipListener;
|
||||
@ -45,7 +44,8 @@ public class ShutdownDeadtimeTest {
|
||||
private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class);
|
||||
|
||||
@Test
|
||||
public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||
public void DeadNodesDoNotComeAliveAgain()
|
||||
throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||
GossipSettings settings = new GossipSettings(1000, 10000);
|
||||
String cluster = UUID.randomUUID().toString();
|
||||
|
||||
@ -63,12 +63,12 @@ public class ShutdownDeadtimeTest {
|
||||
for (int i = 1; i < clusterMembers + 1; ++i) {
|
||||
final int j = i;
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||
GossipService gossipService = new GossipService(cluster, uri, i + "",
|
||||
startupMembers, settings,
|
||||
new GossipListener(){
|
||||
GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers,
|
||||
settings, new GossipListener() {
|
||||
@Override
|
||||
public void gossipEvent(GossipMember member, GossipState state) {
|
||||
System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state);
|
||||
System.out.println(System.currentTimeMillis() + " Member " + j + " reports "
|
||||
+ member + " " + state);
|
||||
}
|
||||
});
|
||||
clients.add(gossipService);
|
||||
@ -81,13 +81,15 @@ public class ShutdownDeadtimeTest {
|
||||
total += clients.get(i).get_gossipManager().getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
|
||||
}
|
||||
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
|
||||
|
||||
// shutdown one client and verify that one client is lost.
|
||||
Random r = new Random();
|
||||
int randomClientId = r.nextInt(clusterMembers);
|
||||
log.info("shutting down " + randomClientId);
|
||||
final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort();
|
||||
final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri()
|
||||
.getPort();
|
||||
final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
|
||||
clients.get(randomClientId).shutdown();
|
||||
TUnit.assertThat(new Callable<Integer>() {
|
||||
@ -97,7 +99,8 @@ public class ShutdownDeadtimeTest {
|
||||
total += clients.get(i).get_gossipManager().getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
|
||||
}
|
||||
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
|
||||
clients.remove(randomClientId);
|
||||
|
||||
TUnit.assertThat(new Callable<Integer>() {
|
||||
@ -107,13 +110,13 @@ public class ShutdownDeadtimeTest {
|
||||
total += clients.get(i).get_gossipManager().getDeadList().size();
|
||||
}
|
||||
return total;
|
||||
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
|
||||
}
|
||||
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
|
||||
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
|
||||
// start client again
|
||||
GossipService gossipService = new GossipService(cluster, uri, shutdownId + "",
|
||||
startupMembers, settings,
|
||||
new GossipListener(){
|
||||
GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers,
|
||||
settings, new GossipListener() {
|
||||
@Override
|
||||
public void gossipEvent(GossipMember member, GossipState state) {
|
||||
// System.out.println("revived " + member+" "+ state);
|
||||
@ -130,7 +133,8 @@ public class ShutdownDeadtimeTest {
|
||||
total += clients.get(i).get_gossipManager().getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
|
||||
}
|
||||
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
|
||||
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
clients.get(i).shutdown();
|
||||
|
@ -21,6 +21,8 @@ import org.apache.log4j.Logger;
|
||||
import org.json.JSONException;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
@ -44,6 +46,7 @@ public class StartupSettingsTest {
|
||||
private static final Logger log = Logger.getLogger( StartupSettingsTest.class );
|
||||
private static final String CLUSTER = UUID.randomUUID().toString();
|
||||
|
||||
|
||||
@Test
|
||||
public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException {
|
||||
File settingsFile = File.createTempFile("gossipTest",".json");
|
||||
|
@ -33,6 +33,7 @@ import org.apache.log4j.Logger;
|
||||
|
||||
import org.apache.gossip.event.GossipListener;
|
||||
import org.apache.gossip.event.GossipState;
|
||||
import org.junit.After;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@RunWith(JUnitPlatform.class)
|
||||
|
@ -45,14 +45,12 @@ public class RandomGossipManagerBuilderTest {
|
||||
public static class TestGossipListener implements GossipListener {
|
||||
@Override
|
||||
public void gossipEvent(GossipMember member, GossipState state) {
|
||||
System.out.println("Got gossip event");
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestNotificationListener implements NotificationListener {
|
||||
@Override
|
||||
public void handleNotification(Notification notification, Object o) {
|
||||
System.out.println("Got notification event");
|
||||
}
|
||||
}
|
||||
|
||||
@ -75,8 +73,8 @@ public class RandomGossipManagerBuilderTest {
|
||||
expectThrows(IllegalArgumentException.class,() -> {
|
||||
RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build();
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void createMembersListIfNull() throws URISyntaxException {
|
||||
RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
|
||||
|
Reference in New Issue
Block a user