This commit is contained in:
Edward Capriolo
2016-04-29 13:35:51 -04:00
parent a040e2cd36
commit a7f90aae08
18 changed files with 674 additions and 356 deletions

View File

@ -31,31 +31,31 @@ import com.google.code.gossip.LocalGossipMember;
*/
abstract public class ActiveGossipThread implements Runnable {
protected final GossipManager _gossipManager;
protected final GossipManager gossipManager;
private final AtomicBoolean _keepRunning;
private final AtomicBoolean keepRunning;
public ActiveGossipThread(GossipManager gossipManager) {
_gossipManager = gossipManager;
_keepRunning = new AtomicBoolean(true);
this.gossipManager = gossipManager;
this.keepRunning = new AtomicBoolean(true);
}
@Override
public void run() {
while (_keepRunning.get()) {
while (keepRunning.get()) {
try {
TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval());
sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList());
TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval());
sendMembershipList(gossipManager.getMyself(), gossipManager.getMemberList());
} catch (InterruptedException e) {
GossipService.LOGGER.error(e);
_keepRunning.set(false);
keepRunning.set(false);
}
}
shutdown();
}
public void shutdown() {
_keepRunning.set(false);
keepRunning.set(false);
}
/**
@ -67,7 +67,7 @@ abstract public class ActiveGossipThread implements Runnable {
/**
* Abstract method which should be implemented by a subclass. This method should return a member
* of the list to gossip with.
*
*
* @param memberList
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with.

View File

@ -43,39 +43,50 @@ import com.google.code.gossip.event.GossipState;
public abstract class GossipManager extends Thread implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
public static final int MAX_PACKET_SIZE = 102400;
private final ConcurrentSkipListMap<LocalGossipMember,GossipState> members;
private final LocalGossipMember _me;
private final GossipSettings _settings;
private final AtomicBoolean _gossipServiceRunning;
private final Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
private final Class<? extends ActiveGossipThread> _activeGossipThreadClass;
private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
private final LocalGossipMember me;
private final GossipSettings settings;
private final AtomicBoolean gossipServiceRunning;
private final Class<? extends PassiveGossipThread> passiveGossipThreadClass;
private final Class<? extends ActiveGossipThread> activeGossipThreadClass;
private final GossipListener listener;
private ActiveGossipThread activeGossipThread;
private PassiveGossipThread passiveGossipThread;
private ExecutorService _gossipThreadExecutor;
private ExecutorService gossipThreadExecutor;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster, String address, int port,
String id, GossipSettings settings, List<GossipMember> gossipMembers,
GossipListener listener) {
_passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass;
_settings = settings;
_me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
String address, int port, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
this.passiveGossipThreadClass = passiveGossipThreadClass;
this.activeGossipThreadClass = activeGossipThreadClass;
this.settings = settings;
me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(_me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), System.currentTimeMillis(), this,
settings.getCleanupInterval());
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
startupMember.getHost(), startupMember.getPort(), startupMember.getId(),
System.currentTimeMillis(), this, settings.getCleanupInterval());
members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member);
}
}
_gossipThreadExecutor = Executors.newCachedThreadPool();
_gossipServiceRunning = new AtomicBoolean(true);
gossipThreadExecutor = Executors.newCachedThreadPool();
gossipServiceRunning = new AtomicBoolean(true);
this.listener = listener;
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
@ -98,9 +109,9 @@ public abstract class GossipManager extends Thread implements NotificationListen
}
}
public void revivieMember(LocalGossipMember m){
for ( Entry<LocalGossipMember, GossipState> it : this.members.entrySet()){
if (it.getKey().getId().equals(m.getId())){
public void revivieMember(LocalGossipMember m) {
for (Entry<LocalGossipMember, GossipState> it : this.members.entrySet()) {
if (it.getKey().getId().equals(m.getId())) {
it.getKey().disableTimer();
}
}
@ -110,8 +121,8 @@ public abstract class GossipManager extends Thread implements NotificationListen
listener.gossipEvent(m, GossipState.UP);
}
}
public void createOrRevivieMember(LocalGossipMember m){
public void createOrRevivieMember(LocalGossipMember m) {
members.put(m, GossipState.UP);
if (listener != null) {
listener.gossipEvent(m, GossipState.UP);
@ -119,7 +130,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
}
public GossipSettings getSettings() {
return _settings;
return settings;
}
/**
@ -128,8 +139,8 @@ public abstract class GossipManager extends Thread implements NotificationListen
*/
public List<LocalGossipMember> getMemberList() {
List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
if (GossipState.UP.equals(entry.getValue())){
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
if (GossipState.UP.equals(entry.getValue())) {
up.add(entry.getKey());
}
}
@ -137,13 +148,13 @@ public abstract class GossipManager extends Thread implements NotificationListen
}
public LocalGossipMember getMyself() {
return _me;
return me;
}
public List<LocalGossipMember> getDeadList() {
List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
if (GossipState.DOWN.equals(entry.getValue())){
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
if (GossipState.DOWN.equals(entry.getValue())) {
up.add(entry.getKey());
}
}
@ -156,23 +167,23 @@ public abstract class GossipManager extends Thread implements NotificationListen
*/
public void run() {
for (LocalGossipMember member : members.keySet()) {
if (member != _me) {
if (member != me) {
member.startTimeoutTimer();
}
}
try {
passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class)
passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class)
.newInstance(this);
_gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class)
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class)
.newInstance(this);
_gossipThreadExecutor.execute(activeGossipThread);
gossipThreadExecutor.execute(activeGossipThread);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
throw new RuntimeException(e1);
}
GossipService.LOGGER.debug("The GossipService is started.");
while (_gossipServiceRunning.get()) {
while (gossipServiceRunning.get()) {
try {
// TODO
TimeUnit.MILLISECONDS.sleep(1);
@ -186,17 +197,17 @@ public abstract class GossipManager extends Thread implements NotificationListen
* Shutdown the gossip service.
*/
public void shutdown() {
_gossipServiceRunning.set(false);
_gossipThreadExecutor.shutdown();
if (passiveGossipThread != null){
gossipServiceRunning.set(false);
gossipThreadExecutor.shutdown();
if (passiveGossipThread != null) {
passiveGossipThread.shutdown();
}
if (activeGossipThread != null){
if (activeGossipThread != null) {
activeGossipThread.shutdown();
}
try {
boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
if (!result){
boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
if (!result) {
LOGGER.error("executor shutdown timed out");
}
} catch (InterruptedException e) {

View File

@ -47,38 +47,38 @@ abstract public class PassiveGossipThread implements Runnable {
public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
/** The socket used for the passive thread of the gossip service. */
private DatagramSocket _server;
private DatagramSocket server;
private final GossipManager _gossipManager;
private final GossipManager gossipManager;
private AtomicBoolean _keepRunning;
private AtomicBoolean keepRunning;
private final String _cluster;
private final String cluster;
public PassiveGossipThread(GossipManager gossipManager) {
_gossipManager = gossipManager;
this.gossipManager = gossipManager;
try {
SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(),
_gossipManager.getMyself().getPort());
_server = new DatagramSocket(socketAddress);
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(),
gossipManager.getMyself().getPort());
server = new DatagramSocket(socketAddress);
GossipService.LOGGER.debug("Gossip service successfully initialized on port "
+ _gossipManager.getMyself().getPort());
GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
_cluster = _gossipManager.getMyself().getClusterName();
+ gossipManager.getMyself().getPort());
GossipService.LOGGER.debug("I am " + gossipManager.getMyself());
cluster = gossipManager.getMyself().getClusterName();
} catch (SocketException ex) {
GossipService.LOGGER.warn(ex);
throw new RuntimeException(ex);
}
_keepRunning = new AtomicBoolean(true);
keepRunning = new AtomicBoolean(true);
}
@Override
public void run() {
while (_keepRunning.get()) {
while (keepRunning.get()) {
try {
byte[] buf = new byte[_server.getReceiveBufferSize()];
byte[] buf = new byte[server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length);
_server.receive(p);
server.receive(p);
int packet_length = 0;
for (int i = 0; i < 4; i++) {
int shift = (4 - 1 - i) * 8;
@ -98,7 +98,8 @@ abstract public class PassiveGossipThread implements Runnable {
JSONArray jsonArray = new JSONArray(receivedMessage);
for (int i = 0; i < jsonArray.length(); i++) {
JSONObject memberJSONObject = jsonArray.getJSONObject(i);
if (memberJSONObject.length() == 5 && _cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) {
if (memberJSONObject.length() == 5
&& cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) {
RemoteGossipMember member = new RemoteGossipMember(
memberJSONObject.getString(GossipMember.JSON_CLUSTER),
memberJSONObject.getString(GossipMember.JSON_HOST),
@ -112,7 +113,7 @@ abstract public class PassiveGossipThread implements Runnable {
senderMember = member;
}
remoteGossipMembers.add(member);
} else if(memberJSONObject.length() == 5) {
} else if (memberJSONObject.length() == 5) {
GossipService.LOGGER.warn("The member object does not belong to this cluster.");
} else {
GossipService.LOGGER
@ -121,7 +122,7 @@ abstract public class PassiveGossipThread implements Runnable {
}
}
mergeLists(_gossipManager, senderMember, remoteGossipMembers);
mergeLists(gossipManager, senderMember, remoteGossipMembers);
} catch (JSONException e) {
GossipService.LOGGER
.error("The received message is not well-formed JSON. The following message has been dropped:\n"
@ -137,7 +138,7 @@ abstract public class PassiveGossipThread implements Runnable {
} catch (IOException e) {
GossipService.LOGGER.error(e);
System.out.println(e);
_keepRunning.set(false);
keepRunning.set(false);
}
}
shutdown();
@ -145,13 +146,14 @@ abstract public class PassiveGossipThread implements Runnable {
public void shutdown() {
try {
_server.close();
} catch (RuntimeException ex){ }
server.close();
} catch (RuntimeException ex) {
}
}
/**
* Abstract method for merging the local and remote list.
*
*
* @param gossipManager
* The GossipManager for retrieving the local members and dead members list.
* @param senderMember
@ -165,11 +167,9 @@ abstract public class PassiveGossipThread implements Runnable {
}
/*
* random comments
* // Check whether the package is smaller than the maximal packet length.
// A package larger than this would not be possible to be send from a GossipService,
// since this is check before sending the message.
// This could normally only occur when the list of members is very big,
// or when the packet is malformed, and the first 4 bytes is not the right in anymore.
// For this reason we regards the message.
* */
* random comments // Check whether the package is smaller than the maximal packet length. // A
* package larger than this would not be possible to be send from a GossipService, // since this is
* check before sending the message. // This could normally only occur when the list of members is
* very big, // or when the packet is malformed, and the first 4 bytes is not the right in anymore.
* // For this reason we regards the message.
*/

View File

@ -36,7 +36,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
* Merge remote list (received from peer), and our local member list. Simply, we must update the
* heartbeats that the remote list has with our list. Also, some additional logic is needed to
* make sure we have not timed out a member and then immediately received a list with that member.
*
*
* @param gossipManager
* @param senderMember
* @param remoteList
@ -44,13 +44,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList) {
//if the person sending to us is in the dead list consider them up
for (LocalGossipMember i : gossipManager.getDeadList()){
if (i.getId().equals(senderMember.getId())){
System.out.println(gossipManager.getMyself() +" caught a live one!");
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), senderMember.getHost(),
senderMember.getPort(), senderMember.getId(), senderMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
// if the person sending to us is in the dead list consider them up
for (LocalGossipMember i : gossipManager.getDeadList()) {
if (i.getId().equals(senderMember.getId())) {
System.out.println(gossipManager.getMyself() + " caught a live one!");
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
senderMember.getHost(), senderMember.getPort(), senderMember.getId(),
senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
}
@ -58,7 +59,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
for (GossipMember remoteMember : remoteList) {
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
continue;
}
}
if (gossipManager.getMemberList().contains(remoteMember)) {
LocalGossipMember localMember = gossipManager.getMemberList().get(
gossipManager.getMemberList().indexOf(remoteMember));
@ -66,11 +67,12 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
localMember.setHeartbeat(remoteMember.getHeartbeat());
localMember.resetTimeoutTimer();
}
} else if (!gossipManager.getMemberList().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember) ){
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
} else if (!gossipManager.getMemberList().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember)) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
} else {
@ -78,9 +80,10 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
gossipManager.getDeadList().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
@ -98,7 +101,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
GossipService.LOGGER.debug("remote " + remoteList);
GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
//throw new IllegalArgumentException("wtf");
// throw new IllegalArgumentException("wtf");
}
}
}
@ -107,25 +110,19 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
}
/**
old comment section:
// If a member is restarted the heartbeat will restart from 1, so we should check
// that here.
// So a member can become from the dead when it is either larger than a previous
// heartbeat (due to network failure)
// or when the heartbeat is 1 (after a restart of the service).
// TODO: What if the first message of a gossip service is sent to a dead node? The
// second member will receive a heartbeat of two.
// TODO: The above does happen. Maybe a special message for a revived member?
// TODO: Or maybe when a member is declared dead for more than
// _settings.getCleanupInterval() ms, reset the heartbeat to 0.
// It will then accept a revived member.
// The above is now handle by checking whether the heartbeat differs
// _settings.getCleanupInterval(), it must be restarted.
*/
* old comment section: // If a member is restarted the heartbeat will restart from 1, so we should
* check // that here. // So a member can become from the dead when it is either larger than a
* previous // heartbeat (due to network failure) // or when the heartbeat is 1 (after a restart of
* the service). // TODO: What if the first message of a gossip service is sent to a dead node? The
* // second member will receive a heartbeat of two. // TODO: The above does happen. Maybe a special
* message for a revived member? // TODO: Or maybe when a member is declared dead for more than //
* _settings.getCleanupInterval() ms, reset the heartbeat to 0. // It will then accept a revived
* member. // The above is now handle by checking whether the heartbeat differs //
* _settings.getCleanupInterval(), it must be restarted.
*/
/*
// The remote member is back from the dead.
// Remove it from the dead list.
// gossipManager.getDeadList().remove(localDeadMember);
// Add it as a new member and add it to the member list.
*/
* // The remote member is back from the dead. // Remove it from the dead list. //
* gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the
* member list.
*/

View File

@ -47,8 +47,8 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
if (member == null) {
return;
}
try (DatagramSocket socket = new DatagramSocket()){
socket.setSoTimeout(_gossipManager.getSettings().getGossipInterval());
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
InetAddress dest = InetAddress.getByName(member.getHost());
JSONArray jsonArray = new JSONArray();
jsonArray.put(me.toJSONObject());
@ -60,8 +60,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
byte[] buf = createBuffer(packet_length, json_bytes);
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest,
member.getPort());
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
socket.send(datagramPacket);
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("
@ -71,8 +70,8 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
GossipService.LOGGER.warn(e1);
}
}
private byte[] createBuffer(int packetLength, byte [] jsonBytes){
private byte[] createBuffer(int packetLength, byte[] jsonBytes) {
byte[] lengthBytes = new byte[4];
lengthBytes[0] = (byte) (packetLength >> 24);
lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
@ -84,5 +83,5 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
byte[] buf = byteBuffer.array();
return buf;
}
}

View File

@ -28,23 +28,23 @@ import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread;
public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
/** The Random used for choosing a member to gossip with. */
private final Random _random;
private final Random random;
public RandomActiveGossipThread(GossipManager gossipManager) {
super(gossipManager);
_random = new Random();
random = new Random();
}
/**
* [The selectToSend() function.] Find a random peer from the local membership list. In the case
* where this client is the only member in the list, this method will return null.
*
*
* @return Member random member if list is greater than 1, null otherwise
*/
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = _random.nextInt(memberList.size());
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
} else {
GossipService.LOGGER.debug("I am alone in this world.");

View File

@ -26,9 +26,9 @@ import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThrea
import java.util.List;
public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String cluster, String address, int port, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster, address,
port, id, settings, gossipMembers, listener);
public RandomGossipManager(String cluster, String address, int port, String id,
GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
address, port, id, settings, gossipMembers, listener);
}
}