GOSSIP-21 Gossip user defined data

This commit is contained in:
Edward Capriolo
2016-10-02 15:02:45 -04:00
13 changed files with 187 additions and 306 deletions

View File

@ -20,6 +20,7 @@ package org.apache.gossip.manager;
import java.io.IOException;
import java.net.DatagramSocket;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
@ -28,7 +29,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipDataMessage;
@ -36,6 +36,7 @@ import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
@ -60,60 +61,32 @@ public class ActiveGossipThread {
this.gossipCore = gossipCore;
this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
}
public void init(){
Runnable liveGossip = new Runnable(){
@Override
public void run() {
try {
sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers());
} catch (RuntimeException ex){
LOGGER.warn(ex);
}
}
};
scheduledExecutorService.scheduleAtFixedRate(liveGossip, 0,
public void init() {
scheduledExecutorService.scheduleAtFixedRate(
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
Runnable deadGossip = new Runnable(){
@Override
public void run() {
try {
sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers());
} catch (RuntimeException ex){
LOGGER.warn(ex);
}
}
};
scheduledExecutorService.scheduleAtFixedRate(deadGossip, 0,
scheduledExecutorService.scheduleAtFixedRate(
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
Runnable dataGossip = new Runnable(){
@Override
public void run() {
try {
sendData(gossipManager.getMyself(), gossipManager.getLiveMembers());
} catch (RuntimeException ex){
LOGGER.warn(ex);
}
}
};
scheduledExecutorService.scheduleAtFixedRate(dataGossip, 0,
scheduledExecutorService.scheduleAtFixedRate(
() -> sendData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
public void shutdown() {
scheduledExecutorService.shutdown();
try {
scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.warn(e);
LOGGER.debug("Issue during shurdown" + e);
}
}
public void sendData(LocalGossipMember me, List<LocalGossipMember> memberList){
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
LOGGER.debug("Send sendMembershipList() is called without action");
return;
}
try (DatagramSocket socket = new DatagramSocket()) {
@ -121,7 +94,6 @@ public class ActiveGossipThread {
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
UdpGossipDataMessage message = new UdpGossipDataMessage();
System.out.println("sending message " + message);
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
@ -133,32 +105,29 @@ public class ActiveGossipThread {
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
//Response r = gossipCore.send(message, member.getUri());
gossipCore.sendOneWay(message, member.getUri());
//TODO: ack this message
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("
LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
}
}
} catch (IOException e1) {
GossipService.LOGGER.warn(e1);
LOGGER.warn(e1);
}
}
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
me.setHeartbeat(System.currentTimeMillis());
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
LOGGER.debug("Send sendMembershipList() is called without action");
return;
} else {
GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
try (DatagramSocket socket = new DatagramSocket()) {
@ -180,17 +149,15 @@ public class ActiveGossipThread {
LOGGER.warn("Message "+ message + " generated response "+ r);
}
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("
LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
} catch (IOException e1) {
GossipService.LOGGER.warn(e1);
LOGGER.warn(e1);
}
}
/**
* Abstract method which should be implemented by a subclass. This method should return a member
* of the list to gossip with.
*
* @param memberList
* The list of members which are stored in the local list of members.
@ -202,8 +169,7 @@ public class ActiveGossipThread {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
} else {
GossipService.LOGGER.debug("I am alone in this world.");
LOGGER.debug("I am alone in this world.");
}
return member;
}

View File

@ -47,6 +47,10 @@ public class GossipCore {
perNodeData = new ConcurrentHashMap<>();
}
/**
*
* @param message
*/
public void addPerNodeData(GossipDataMessage message){
ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>();
m.put(message.getKey(), message);
@ -70,7 +74,6 @@ public class GossipCore {
}
public void recieve(Base base){
System.out.println(base);
if (base instanceof Response){
if (base instanceof Trackable){
Trackable t = (Trackable) base;
@ -80,11 +83,6 @@ public class GossipCore {
if (base instanceof GossipDataMessage) {
UdpGossipDataMessage message = (UdpGossipDataMessage) base;
addPerNodeData(message);
/*
UdpActiveGossipOk o = new UdpActiveGossipOk();
o.setUriFrom(message.getUriFrom());
o.setUuid(message.getUuid());
sendOneWay(o, senderMember.getUri());*/
}
if (base instanceof ActiveGossipMessage){
List<GossipMember> remoteGossipMembers = new ArrayList<>();
@ -178,11 +176,11 @@ public class GossipCore {
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
LOGGER.error(e.getMessage(), e);
LOGGER.debug(e.getMessage(), e);
return null;
} catch (TimeoutException e) {
boolean cancelled = response.cancel(true);
LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
return null;
} finally {
if (t != null){

View File

@ -41,9 +41,11 @@ import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.GossipDataMessage;
public abstract class GossipManager extends Thread implements NotificationListener {
public abstract class GossipManager implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
@ -180,7 +182,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
*/
public void run() {
public void init() {
for (LocalGossipMember member : members.keySet()) {
if (member != me) {
member.startTimeoutTimer();
@ -191,14 +193,6 @@ public abstract class GossipManager extends Thread implements NotificationListen
activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
activeGossipThread.init();
GossipService.LOGGER.debug("The GossipService is started.");
while (gossipServiceRunning.get()) {
try {
// TODO
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
GossipService.LOGGER.warn("The GossipClient was interrupted.");
}
}
}
/**
@ -227,7 +221,6 @@ public abstract class GossipManager extends Thread implements NotificationListen
public void gossipData(GossipDataMessage message){
message.setNodeId(me.getId());
gossipCore.addPerNodeData(message);
System.out.println(this.getMyself() + " " + gossipCore.getPerNodeData());
}
public GossipDataMessage findGossipData(String nodeId, String key){

View File

@ -121,17 +121,4 @@ abstract public class PassiveGossipThread implements Runnable {
}
}
/**
* Abstract method for merging the local and remote list.
*
* @param gossipManager
* The GossipManager for retrieving the local members and dead members list.
* @param senderMember
* The member who is sending this list, this could be used to send a response if the
* remote list contains out-dated information.
* @param remoteList
* The list of members known at the remote side.
*/
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList);
}

View File

@ -17,11 +17,6 @@
*/
package org.apache.gossip.manager.impl;
import java.util.List;
import org.apache.gossip.GossipMember;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
@ -35,79 +30,4 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
super(gossipManager, gossipCore);
}
/**
* Merge remote list (received from peer), and our local member list. Simply, we must update the
* heartbeats that the remote list has with our list. Also, some additional logic is needed to
* make sure we have not timed out a member and then immediately received a list with that member.
*
* @param gossipManager
* @param senderMember
* @param remoteList
*/
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList) {
// if the person sending to us is in the dead list consider them up
for (LocalGossipMember i : gossipManager.getDeadList()) {
if (i.getId().equals(senderMember.getId())) {
LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
senderMember.getUri(), senderMember.getId(),
senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.reviveMember(newLocalMember);
newLocalMember.startTimeoutTimer();
}
}
for (GossipMember remoteMember : remoteList) {
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
continue;
}
if (gossipManager.getLiveMembers().contains(remoteMember)) {
LocalGossipMember localMember = gossipManager.getLiveMembers().get(
gossipManager.getLiveMembers().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
localMember.setHeartbeat(remoteMember.getHeartbeat());
localMember.resetTimeoutTimer();
}
} else if (!gossipManager.getLiveMembers().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember)) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.createOrReviveMember(newLocalMember);
newLocalMember.startTimeoutTimer();
} else {
if (gossipManager.getDeadList().contains(remoteMember)) {
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
gossipManager.getDeadList().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.reviveMember(newLocalMember);
newLocalMember.startTimeoutTimer();
LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list.");
} else {
LOGGER.debug("me " + gossipManager.getMyself());
LOGGER.debug("sender " + senderMember);
LOGGER.debug("remote " + remoteList);
LOGGER.debug("live " + gossipManager.getLiveMembers());
LOGGER.debug("dead " + gossipManager.getDeadList());
}
} else {
LOGGER.debug("me " + gossipManager.getMyself());
LOGGER.debug("sender " + senderMember);
LOGGER.debug("remote " + remoteList);
LOGGER.debug("live " + gossipManager.getLiveMembers());
LOGGER.debug("dead " + gossipManager.getDeadList());
// throw new IllegalArgumentException("wtf");
}
}
}
}
}