Merge pull request #18 from edwardcapriolo/ts_as_heartbeat

Ts as heartbeat
This commit is contained in:
edwardcapriolo
2016-03-30 14:28:59 -04:30
9 changed files with 137 additions and 92 deletions

View File

@ -18,7 +18,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
public static final String JSON_ID = "id"; public static final String JSON_ID = "id";
protected final String _host; protected final String _host;
protected final int _port; protected final int _port;
protected int _heartbeat; protected volatile long _heartbeat;
/** /**
* The purpose of the id field is to be able for nodes to identify themselves beyond there host/port. For example * The purpose of the id field is to be able for nodes to identify themselves beyond there host/port. For example
* an application might generate a persistent id so if they rejoin the cluster at a different host and port we * an application might generate a persistent id so if they rejoin the cluster at a different host and port we
@ -33,7 +33,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
* @param heartbeat The current heartbeat. * @param heartbeat The current heartbeat.
* @param id an id that may be replaced after contact * @param id an id that may be replaced after contact
*/ */
public GossipMember(String host, int port, String id, int heartbeat) { public GossipMember(String host, int port, String id, long heartbeat) {
_host = host; _host = host;
_port = port; _port = port;
_id = id; _id = id;
@ -68,7 +68,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
* Get the heartbeat of this gossip member. * Get the heartbeat of this gossip member.
* @return The current heartbeat. * @return The current heartbeat.
*/ */
public int getHeartbeat() { public long getHeartbeat() {
return _heartbeat; return _heartbeat;
} }
@ -76,7 +76,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
* Set the heartbeat of this gossip member. * Set the heartbeat of this gossip member.
* @param heartbeat The new heartbeat. * @param heartbeat The new heartbeat.
*/ */
public void setHeartbeat(int heartbeat) { public void setHeartbeat(long heartbeat) {
this._heartbeat = heartbeat; this._heartbeat = heartbeat;
} }

View File

@ -26,7 +26,7 @@ public class LocalGossipMember extends GossipMember {
* @param cleanupTimeout * @param cleanupTimeout
* The cleanup timeout for this gossip member. * The cleanup timeout for this gossip member.
*/ */
public LocalGossipMember(String hostname, int port, String id, int heartbeat, public LocalGossipMember(String hostname, int port, String id, long heartbeat,
NotificationListener notificationListener, int cleanupTimeout) { NotificationListener notificationListener, int cleanupTimeout) {
super(hostname, port, id, heartbeat); super(hostname, port, id, heartbeat);
@ -46,4 +46,8 @@ public class LocalGossipMember extends GossipMember {
public void resetTimeoutTimer() { public void resetTimeoutTimer() {
this.timeoutTimer.reset(); this.timeoutTimer.reset();
} }
public void disableTimer() {
this.timeoutTimer.removeAllNotifications();
}
} }

View File

@ -18,7 +18,7 @@ public class RemoteGossipMember extends GossipMember {
* @param heartbeat * @param heartbeat
* The current heartbeat. * The current heartbeat.
*/ */
public RemoteGossipMember(String hostname, int port, String id, int heartbeat) { public RemoteGossipMember(String hostname, int port, String id, long heartbeat) {
super(hostname, port, id, heartbeat); super(hostname, port, id, heartbeat);
} }
@ -31,6 +31,6 @@ public class RemoteGossipMember extends GossipMember {
* The port number. * The port number.
*/ */
public RemoteGossipMember(String hostname, int port, String id) { public RemoteGossipMember(String hostname, int port, String id) {
super(hostname, port, id, 0); super(hostname, port, id, System.currentTimeMillis());
} }
} }

View File

@ -46,12 +46,12 @@ public abstract class GossipManager extends Thread implements NotificationListen
_passiveGossipThreadClass = passiveGossipThreadClass; _passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass;
_settings = settings; _settings = settings;
_me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); _me = new LocalGossipMember(address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>(); members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) { for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(_me)) { if (!startupMember.equals(_me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), 0, this, startupMember.getPort(), startupMember.getId(), System.currentTimeMillis(), this,
settings.getCleanupInterval()); settings.getCleanupInterval());
members.put(member, GossipState.UP); members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member); GossipService.LOGGER.debug(member);
@ -81,6 +81,19 @@ public abstract class GossipManager extends Thread implements NotificationListen
} }
} }
public void revivieMember(LocalGossipMember m){
for ( Entry<LocalGossipMember, GossipState> it : this.members.entrySet()){
if (it.getKey().getId().equals(m.getId())){
it.getKey().disableTimer();
}
}
members.remove(m);
members.put(m, GossipState.UP);
if (listener != null) {
listener.gossipEvent(m, GossipState.UP);
}
}
public void createOrRevivieMember(LocalGossipMember m){ public void createOrRevivieMember(LocalGossipMember m){
members.put(m, GossipState.UP); members.put(m, GossipState.UP);
if (listener != null) { if (listener != null) {

View File

@ -60,21 +60,11 @@ abstract public class PassiveGossipThread implements Runnable {
byte[] buf = new byte[_server.getReceiveBufferSize()]; byte[] buf = new byte[_server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length); DatagramPacket p = new DatagramPacket(buf, buf.length);
_server.receive(p); _server.receive(p);
GossipService.LOGGER.debug("A message has been received from " + p.getAddress() + ":"
+ p.getPort() + ".");
int packet_length = 0; int packet_length = 0;
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
int shift = (4 - 1 - i) * 8; int shift = (4 - 1 - i) * 8;
packet_length += (buf[i] & 0x000000FF) << shift; packet_length += (buf[i] & 0x000000FF) << shift;
} }
// Check whether the package is smaller than the maximal packet length.
// A package larger than this would not be possible to be send from a GossipService,
// since this is check before sending the message.
// This could normally only occur when the list of members is very big,
// or when the packet is malformed, and the first 4 bytes is not the right in anymore.
// For this reason we regards the message.
if (packet_length <= GossipManager.MAX_PACKET_SIZE) { if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
byte[] json_bytes = new byte[packet_length]; byte[] json_bytes = new byte[packet_length];
for (int i = 0; i < packet_length; i++) { for (int i = 0; i < packet_length; i++) {
@ -86,7 +76,6 @@ abstract public class PassiveGossipThread implements Runnable {
try { try {
List<GossipMember> remoteGossipMembers = new ArrayList<>(); List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null; RemoteGossipMember senderMember = null;
GossipService.LOGGER.debug("Received member list:");
JSONArray jsonArray = new JSONArray(receivedMessage); JSONArray jsonArray = new JSONArray(receivedMessage);
for (int i = 0; i < jsonArray.length(); i++) { for (int i = 0; i < jsonArray.length(); i++) {
JSONObject memberJSONObject = jsonArray.getJSONObject(i); JSONObject memberJSONObject = jsonArray.getJSONObject(i);
@ -95,7 +84,7 @@ abstract public class PassiveGossipThread implements Runnable {
memberJSONObject.getString(GossipMember.JSON_HOST), memberJSONObject.getString(GossipMember.JSON_HOST),
memberJSONObject.getInt(GossipMember.JSON_PORT), memberJSONObject.getInt(GossipMember.JSON_PORT),
memberJSONObject.getString(GossipMember.JSON_ID), memberJSONObject.getString(GossipMember.JSON_ID),
memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT)); memberJSONObject.getLong(GossipMember.JSON_HEARTBEAT));
GossipService.LOGGER.debug(member.toString()); GossipService.LOGGER.debug(member.toString());
// This is the first member found, so this should be the member who is communicating // This is the first member found, so this should be the member who is communicating
// with me. // with me.
@ -115,6 +104,7 @@ abstract public class PassiveGossipThread implements Runnable {
GossipService.LOGGER GossipService.LOGGER
.error("The received message is not well-formed JSON. The following message has been dropped:\n" .error("The received message is not well-formed JSON. The following message has been dropped:\n"
+ receivedMessage); + receivedMessage);
System.out.println(e);
} }
} else { } else {
@ -124,6 +114,7 @@ abstract public class PassiveGossipThread implements Runnable {
} catch (IOException e) { } catch (IOException e) {
GossipService.LOGGER.error(e); GossipService.LOGGER.error(e);
System.out.println(e);
_keepRunning.set(false); _keepRunning.set(false);
} }
} }
@ -148,3 +139,13 @@ abstract public class PassiveGossipThread implements Runnable {
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList); List<GossipMember> remoteList);
} }
/*
* random comments
* // Check whether the package is smaller than the maximal packet length.
// A package larger than this would not be possible to be send from a GossipService,
// since this is check before sending the message.
// This could normally only occur when the list of members is very big,
// or when the packet is malformed, and the first 4 bytes is not the right in anymore.
// For this reason we regards the message.
* */

View File

@ -27,11 +27,21 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList) { List<GossipMember> remoteList) {
for (GossipMember remoteMember : remoteList) { //if the person sending to us is in the dead list consider them up
// Skip myself. We don't want ourselves in the local member list. for (LocalGossipMember i : gossipManager.getDeadList()){
if (remoteMember.equals(gossipManager.getMyself())) { if (i.getId().equals(senderMember.getId())){
continue; System.out.println(gossipManager.getMyself() +" caught a live one!");
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getHost(),
senderMember.getPort(), senderMember.getId(), senderMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
} }
}
for (GossipMember remoteMember : remoteList) {
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
continue;
}
if (gossipManager.getMemberList().contains(remoteMember)) { if (gossipManager.getMemberList().contains(remoteMember)) {
LocalGossipMember localMember = gossipManager.getMemberList().get( LocalGossipMember localMember = gossipManager.getMemberList().get(
gossipManager.getMemberList().indexOf(remoteMember)); gossipManager.getMemberList().indexOf(remoteMember));
@ -39,58 +49,66 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
localMember.setHeartbeat(remoteMember.getHeartbeat()); localMember.setHeartbeat(remoteMember.getHeartbeat());
localMember.resetTimeoutTimer(); localMember.resetTimeoutTimer();
} }
} else if (!gossipManager.getMemberList().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember) ){
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
} else { } else {
// The remote member is either brand new, or a previously declared dead member.
// If its dead, check the heartbeat because it may have come back from the dead.
if (gossipManager.getDeadList().contains(remoteMember)) { if (gossipManager.getDeadList().contains(remoteMember)) {
// The remote member is known here as a dead member.
GossipService.LOGGER.debug("The remote member is known here as a dead member.");
LocalGossipMember localDeadMember = gossipManager.getDeadList().get( LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
gossipManager.getDeadList().indexOf(remoteMember)); gossipManager.getDeadList().indexOf(remoteMember));
// If a member is restarted the heartbeat will restart from 1, so we should check if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
// that here.
// So a member can become from the dead when it is either larger than a previous
// heartbeat (due to network failure)
// or when the heartbeat is 1 (after a restart of the service).
// TODO: What if the first message of a gossip service is sent to a dead node? The
// second member will receive a heartbeat of two.
// TODO: The above does happen. Maybe a special message for a revived member?
// TODO: Or maybe when a member is declared dead for more than
// _settings.getCleanupInterval() ms, reset the heartbeat to 0.
// It will then accept a revived member.
// The above is now handle by checking whether the heartbeat differs
// _settings.getCleanupInterval(), it must be restarted.
if (remoteMember.getHeartbeat() == 1
|| ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager
.getSettings().getCleanupInterval() / 1000)
|| remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
GossipService.LOGGER
.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
// The remote member is back from the dead.
// Remove it from the dead list.
// gossipManager.getDeadList().remove(localDeadMember);
// Add it as a new member and add it to the member list.
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval()); gossipManager, gossipManager.getSettings().getCleanupInterval());
// gossipManager.getMemberList().add(newLocalMember); gossipManager.revivieMember(newLocalMember);
gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer(); newLocalMember.startTimeoutTimer();
GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress() GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list."); + " from dead list and added to local member list.");
} else {
GossipService.LOGGER.debug("me " + gossipManager.getMyself());
GossipService.LOGGER.debug("sender " + senderMember);
GossipService.LOGGER.debug("remote " + remoteList);
GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
} }
} else { } else {
// Brand spanking new member - welcome. GossipService.LOGGER.debug("me " + gossipManager.getMyself());
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), GossipService.LOGGER.debug("sender " + senderMember);
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), GossipService.LOGGER.debug("remote " + remoteList);
gossipManager, gossipManager.getSettings().getCleanupInterval()); GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
gossipManager.createOrRevivieMember(newLocalMember); GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
newLocalMember.startTimeoutTimer(); //throw new IllegalArgumentException("wtf");
GossipService.LOGGER.debug("Added new remote member " + remoteMember.getAddress()
+ " to local member list.");
} }
} }
} }
} }
} }
/**
old comment section:
// If a member is restarted the heartbeat will restart from 1, so we should check
// that here.
// So a member can become from the dead when it is either larger than a previous
// heartbeat (due to network failure)
// or when the heartbeat is 1 (after a restart of the service).
// TODO: What if the first message of a gossip service is sent to a dead node? The
// second member will receive a heartbeat of two.
// TODO: The above does happen. Maybe a special message for a revived member?
// TODO: Or maybe when a member is declared dead for more than
// _settings.getCleanupInterval() ms, reset the heartbeat to 0.
// It will then accept a revived member.
// The above is now handle by checking whether the heartbeat differs
// _settings.getCleanupInterval(), it must be restarted.
*/
/*
// The remote member is back from the dead.
// Remove it from the dead list.
// gossipManager.getDeadList().remove(localDeadMember);
// Add it as a new member and add it to the member list.
*/

View File

@ -25,7 +25,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
*/ */
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
GossipService.LOGGER.debug("Send sendMembershipList() is called."); GossipService.LOGGER.debug("Send sendMembershipList() is called.");
me.setHeartbeat(me.getHeartbeat() + 1); me.setHeartbeat(System.currentTimeMillis());
LocalGossipMember member = selectPartner(memberList); LocalGossipMember member = selectPartner(memberList);
if (member == null) { if (member == null) {
return; return;

View File

@ -11,7 +11,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipMember;
@ -24,10 +23,10 @@ import com.google.code.gossip.event.GossipState;
public class ShutdownDeadtimeTest { public class ShutdownDeadtimeTest {
private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class ); private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class );
//@Test @Test
@Ignore //@Ignore
public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException { public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException {
GossipSettings settings = new GossipSettings(10,10000); GossipSettings settings = new GossipSettings(1000, 10000);
log.info( "Adding seed nodes" ); log.info( "Adding seed nodes" );
int seedNodes = 3; int seedNodes = 3;
@ -59,7 +58,7 @@ public class ShutdownDeadtimeTest {
total += clients.get(i).get_gossipManager().getMemberList().size(); total += clients.get(i).get_gossipManager().getMemberList().size();
} }
return total; return total;
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(20); }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
// shutdown one client and verify that one client is lost. // shutdown one client and verify that one client is lost.
Random r = new Random(); Random r = new Random();
@ -75,8 +74,18 @@ public class ShutdownDeadtimeTest {
total += clients.get(i).get_gossipManager().getMemberList().size(); total += clients.get(i).get_gossipManager().getMemberList().size();
} }
return total; return total;
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(16); }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16);
clients.remove(randomClientId);
TUnit.assertThat(new Callable<Integer> (){
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers - 1; ++i) {
total += clients.get(i).get_gossipManager().getDeadList().size();
}
return total;
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
// start client again // start client again
GossipService gossipService = new GossipService("127.0.0.1", shutdownPort, shutdownId + "", GossipService gossipService = new GossipService("127.0.0.1", shutdownPort, shutdownId + "",
startupMembers, settings, startupMembers, settings,
@ -97,7 +106,10 @@ public class ShutdownDeadtimeTest {
total += clients.get(i).get_gossipManager().getMemberList().size(); total += clients.get(i).get_gossipManager().getMemberList().size();
} }
return total; return total;
}}).afterWaitingAtMost(70, TimeUnit.SECONDS).isEqualTo(20); }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
}
} }
} }

View File

@ -8,11 +8,14 @@ import org.apache.log4j.Logger;
import org.json.JSONException; import org.json.JSONException;
import org.junit.Test; import org.junit.Test;
import io.teknek.tunit.TUnit;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
@ -31,30 +34,24 @@ public class StartupSettingsTest {
log.debug( "Using settings file: " + settingsFile.getAbsolutePath() ); log.debug( "Using settings file: " + settingsFile.getAbsolutePath() );
settingsFile.deleteOnExit(); settingsFile.deleteOnExit();
writeSettingsFile(settingsFile); writeSettingsFile(settingsFile);
final GossipService firstService = new GossipService(
// Start the other simple node that the settings file points to
GossipService firstService = new GossipService(
"127.0.0.1", 50000, UUID.randomUUID().toString(), "127.0.0.1", 50000, UUID.randomUUID().toString(),
new ArrayList<GossipMember>(), new GossipSettings(), null new ArrayList<GossipMember>(), new GossipSettings(), null);
);
firstService.start(); firstService.start();
// Start a node with the settings file TUnit.assertThat(new Callable<Integer> (){
GossipService serviceUnderTest = new GossipService( public Integer call() throws Exception {
StartupSettings.fromJSONFile( settingsFile ) return firstService.get_gossipManager().getMemberList().size();
); }}).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(0);
final GossipService serviceUnderTest = new GossipService(
StartupSettings.fromJSONFile( settingsFile )
);
serviceUnderTest.start(); serviceUnderTest.start();
TUnit.assertThat(new Callable<Integer> (){
// Let the sync up public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(2); return serviceUnderTest.get_gossipManager().getMemberList().size();
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(1);
// Check the results
assertEquals(1, firstService.get_gossipManager().getMemberList().size() );
assertEquals(1, serviceUnderTest.get_gossipManager().getMemberList().size() );
assertTrue(
firstService.get_gossipManager().getMemberList().size() ==
serviceUnderTest.get_gossipManager().getMemberList().size() );
firstService.shutdown(); firstService.shutdown();
serviceUnderTest.shutdown(); serviceUnderTest.shutdown();
} }