From 5a5f16515f0807e6929ed01930f7c29f33e88861 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Tue, 29 Mar 2016 13:50:12 -0400 Subject: [PATCH 1/3] Use timestamp as heartbeat --- .../com/google/code/gossip/GossipMember.java | 8 +- .../google/code/gossip/LocalGossipMember.java | 6 +- .../code/gossip/RemoteGossipMember.java | 4 +- .../code/gossip/manager/GossipManager.java | 17 ++- .../gossip/manager/PassiveGossipThread.java | 25 ++--- ...nlyProcessReceivedPassiveGossipThread.java | 100 +++++++++++------- .../impl/SendMembersActiveGossipThread.java | 2 +- .../teknek/gossip/ShutdownDeadtimeTest.java | 27 +++-- 8 files changed, 119 insertions(+), 70 deletions(-) diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java index 204ab5a..0d332e8 100644 --- a/src/main/java/com/google/code/gossip/GossipMember.java +++ b/src/main/java/com/google/code/gossip/GossipMember.java @@ -18,7 +18,7 @@ public abstract class GossipMember implements Comparable{ public static final String JSON_ID = "id"; protected final String _host; protected final int _port; - protected int _heartbeat; + protected volatile long _heartbeat; /** * The purpose of the id field is to be able for nodes to identify themselves beyond there host/port. For example * an application might generate a persistent id so if they rejoin the cluster at a different host and port we @@ -33,7 +33,7 @@ public abstract class GossipMember implements Comparable{ * @param heartbeat The current heartbeat. * @param id an id that may be replaced after contact */ - public GossipMember(String host, int port, String id, int heartbeat) { + public GossipMember(String host, int port, String id, long heartbeat) { _host = host; _port = port; _id = id; @@ -68,7 +68,7 @@ public abstract class GossipMember implements Comparable{ * Get the heartbeat of this gossip member. * @return The current heartbeat. */ - public int getHeartbeat() { + public long getHeartbeat() { return _heartbeat; } @@ -76,7 +76,7 @@ public abstract class GossipMember implements Comparable{ * Set the heartbeat of this gossip member. * @param heartbeat The new heartbeat. */ - public void setHeartbeat(int heartbeat) { + public void setHeartbeat(long heartbeat) { this._heartbeat = heartbeat; } diff --git a/src/main/java/com/google/code/gossip/LocalGossipMember.java b/src/main/java/com/google/code/gossip/LocalGossipMember.java index 1c651a9..b13d504 100644 --- a/src/main/java/com/google/code/gossip/LocalGossipMember.java +++ b/src/main/java/com/google/code/gossip/LocalGossipMember.java @@ -26,7 +26,7 @@ public class LocalGossipMember extends GossipMember { * @param cleanupTimeout * The cleanup timeout for this gossip member. */ - public LocalGossipMember(String hostname, int port, String id, int heartbeat, + public LocalGossipMember(String hostname, int port, String id, long heartbeat, NotificationListener notificationListener, int cleanupTimeout) { super(hostname, port, id, heartbeat); @@ -46,4 +46,8 @@ public class LocalGossipMember extends GossipMember { public void resetTimeoutTimer() { this.timeoutTimer.reset(); } + + public void disableTimer() { + this.timeoutTimer.removeAllNotifications(); + } } diff --git a/src/main/java/com/google/code/gossip/RemoteGossipMember.java b/src/main/java/com/google/code/gossip/RemoteGossipMember.java index 5a95004..1d4ed74 100644 --- a/src/main/java/com/google/code/gossip/RemoteGossipMember.java +++ b/src/main/java/com/google/code/gossip/RemoteGossipMember.java @@ -18,7 +18,7 @@ public class RemoteGossipMember extends GossipMember { * @param heartbeat * The current heartbeat. */ - public RemoteGossipMember(String hostname, int port, String id, int heartbeat) { + public RemoteGossipMember(String hostname, int port, String id, long heartbeat) { super(hostname, port, id, heartbeat); } @@ -31,6 +31,6 @@ public class RemoteGossipMember extends GossipMember { * The port number. */ public RemoteGossipMember(String hostname, int port, String id) { - super(hostname, port, id, 0); + super(hostname, port, id, System.currentTimeMillis()); } } diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index 81afaa3..57c0f6d 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -46,12 +46,12 @@ public abstract class GossipManager extends Thread implements NotificationListen _passiveGossipThreadClass = passiveGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass; _settings = settings; - _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); + _me = new LocalGossipMember(address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval()); members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(_me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), - startupMember.getPort(), startupMember.getId(), 0, this, + startupMember.getPort(), startupMember.getId(), System.currentTimeMillis(), this, settings.getCleanupInterval()); members.put(member, GossipState.UP); GossipService.LOGGER.debug(member); @@ -81,6 +81,19 @@ public abstract class GossipManager extends Thread implements NotificationListen } } + public void revivieMember(LocalGossipMember m){ + for ( Entry it : this.members.entrySet()){ + if (it.getKey().getId().equals(m.getId())){ + it.getKey().disableTimer(); + } + } + members.remove(m); + members.put(m, GossipState.UP); + if (listener != null) { + listener.gossipEvent(m, GossipState.UP); + } + } + public void createOrRevivieMember(LocalGossipMember m){ members.put(m, GossipState.UP); if (listener != null) { diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index dc1e9b4..ae54596 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -60,21 +60,11 @@ abstract public class PassiveGossipThread implements Runnable { byte[] buf = new byte[_server.getReceiveBufferSize()]; DatagramPacket p = new DatagramPacket(buf, buf.length); _server.receive(p); - GossipService.LOGGER.debug("A message has been received from " + p.getAddress() + ":" - + p.getPort() + "."); - int packet_length = 0; for (int i = 0; i < 4; i++) { int shift = (4 - 1 - i) * 8; packet_length += (buf[i] & 0x000000FF) << shift; } - - // Check whether the package is smaller than the maximal packet length. - // A package larger than this would not be possible to be send from a GossipService, - // since this is check before sending the message. - // This could normally only occur when the list of members is very big, - // or when the packet is malformed, and the first 4 bytes is not the right in anymore. - // For this reason we regards the message. if (packet_length <= GossipManager.MAX_PACKET_SIZE) { byte[] json_bytes = new byte[packet_length]; for (int i = 0; i < packet_length; i++) { @@ -86,7 +76,6 @@ abstract public class PassiveGossipThread implements Runnable { try { List remoteGossipMembers = new ArrayList<>(); RemoteGossipMember senderMember = null; - GossipService.LOGGER.debug("Received member list:"); JSONArray jsonArray = new JSONArray(receivedMessage); for (int i = 0; i < jsonArray.length(); i++) { JSONObject memberJSONObject = jsonArray.getJSONObject(i); @@ -95,7 +84,7 @@ abstract public class PassiveGossipThread implements Runnable { memberJSONObject.getString(GossipMember.JSON_HOST), memberJSONObject.getInt(GossipMember.JSON_PORT), memberJSONObject.getString(GossipMember.JSON_ID), - memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT)); + memberJSONObject.getLong(GossipMember.JSON_HEARTBEAT)); GossipService.LOGGER.debug(member.toString()); // This is the first member found, so this should be the member who is communicating // with me. @@ -115,6 +104,7 @@ abstract public class PassiveGossipThread implements Runnable { GossipService.LOGGER .error("The received message is not well-formed JSON. The following message has been dropped:\n" + receivedMessage); + System.out.println(e); } } else { @@ -124,6 +114,7 @@ abstract public class PassiveGossipThread implements Runnable { } catch (IOException e) { GossipService.LOGGER.error(e); + System.out.println(e); _keepRunning.set(false); } } @@ -148,3 +139,13 @@ abstract public class PassiveGossipThread implements Runnable { abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, List remoteList); } + +/* + * random comments + * // Check whether the package is smaller than the maximal packet length. + // A package larger than this would not be possible to be send from a GossipService, + // since this is check before sending the message. + // This could normally only occur when the list of members is very big, + // or when the packet is malformed, and the first 4 bytes is not the right in anymore. + // For this reason we regards the message. + * */ diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index 92fecdf..32bdb61 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -27,11 +27,21 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, List remoteList) { - for (GossipMember remoteMember : remoteList) { - // Skip myself. We don't want ourselves in the local member list. - if (remoteMember.equals(gossipManager.getMyself())) { - continue; + //if the person sending to us is in the dead list consider them up + for (LocalGossipMember i : gossipManager.getDeadList()){ + if (i.getId().equals(senderMember.getId())){ + System.out.println(gossipManager.getMyself() +" caught a live one!"); + LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getHost(), + senderMember.getPort(), senderMember.getId(), senderMember.getHeartbeat(), + gossipManager, gossipManager.getSettings().getCleanupInterval()); + gossipManager.revivieMember(newLocalMember); + newLocalMember.startTimeoutTimer(); } + } + for (GossipMember remoteMember : remoteList) { + if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { + continue; + } if (gossipManager.getMemberList().contains(remoteMember)) { LocalGossipMember localMember = gossipManager.getMemberList().get( gossipManager.getMemberList().indexOf(remoteMember)); @@ -39,58 +49,66 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread localMember.setHeartbeat(remoteMember.getHeartbeat()); localMember.resetTimeoutTimer(); } + } else if (!gossipManager.getMemberList().contains(remoteMember) + && !gossipManager.getDeadList().contains(remoteMember) ){ + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), + remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), + gossipManager, gossipManager.getSettings().getCleanupInterval()); + gossipManager.createOrRevivieMember(newLocalMember); + newLocalMember.startTimeoutTimer(); } else { - // The remote member is either brand new, or a previously declared dead member. - // If its dead, check the heartbeat because it may have come back from the dead. if (gossipManager.getDeadList().contains(remoteMember)) { - // The remote member is known here as a dead member. - GossipService.LOGGER.debug("The remote member is known here as a dead member."); LocalGossipMember localDeadMember = gossipManager.getDeadList().get( gossipManager.getDeadList().indexOf(remoteMember)); - // If a member is restarted the heartbeat will restart from 1, so we should check - // that here. - // So a member can become from the dead when it is either larger than a previous - // heartbeat (due to network failure) - // or when the heartbeat is 1 (after a restart of the service). - // TODO: What if the first message of a gossip service is sent to a dead node? The - // second member will receive a heartbeat of two. - // TODO: The above does happen. Maybe a special message for a revived member? - // TODO: Or maybe when a member is declared dead for more than - // _settings.getCleanupInterval() ms, reset the heartbeat to 0. - // It will then accept a revived member. - // The above is now handle by checking whether the heartbeat differs - // _settings.getCleanupInterval(), it must be restarted. - if (remoteMember.getHeartbeat() == 1 - || ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager - .getSettings().getCleanupInterval() / 1000) - || remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { - GossipService.LOGGER - .debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); - // The remote member is back from the dead. - // Remove it from the dead list. - // gossipManager.getDeadList().remove(localDeadMember); - // Add it as a new member and add it to the member list. + if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); - // gossipManager.getMemberList().add(newLocalMember); - gossipManager.createOrRevivieMember(newLocalMember); + gossipManager.revivieMember(newLocalMember); newLocalMember.startTimeoutTimer(); GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); + } else { + GossipService.LOGGER.debug("me " + gossipManager.getMyself()); + GossipService.LOGGER.debug("sender " + senderMember); + GossipService.LOGGER.debug("remote " + remoteList); + GossipService.LOGGER.debug("live " + gossipManager.getMemberList()); + GossipService.LOGGER.debug("dead " + gossipManager.getDeadList()); } } else { - // Brand spanking new member - welcome. - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), - remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), - gossipManager, gossipManager.getSettings().getCleanupInterval()); - gossipManager.createOrRevivieMember(newLocalMember); - newLocalMember.startTimeoutTimer(); - GossipService.LOGGER.debug("Added new remote member " + remoteMember.getAddress() - + " to local member list."); + GossipService.LOGGER.debug("me " + gossipManager.getMyself()); + GossipService.LOGGER.debug("sender " + senderMember); + GossipService.LOGGER.debug("remote " + remoteList); + GossipService.LOGGER.debug("live " + gossipManager.getMemberList()); + GossipService.LOGGER.debug("dead " + gossipManager.getDeadList()); + //throw new IllegalArgumentException("wtf"); } } } } } + +/** +old comment section: +// If a member is restarted the heartbeat will restart from 1, so we should check +// that here. +// So a member can become from the dead when it is either larger than a previous +// heartbeat (due to network failure) +// or when the heartbeat is 1 (after a restart of the service). +// TODO: What if the first message of a gossip service is sent to a dead node? The +// second member will receive a heartbeat of two. +// TODO: The above does happen. Maybe a special message for a revived member? +// TODO: Or maybe when a member is declared dead for more than +// _settings.getCleanupInterval() ms, reset the heartbeat to 0. +// It will then accept a revived member. +// The above is now handle by checking whether the heartbeat differs +// _settings.getCleanupInterval(), it must be restarted. +*/ + +/* +// The remote member is back from the dead. +// Remove it from the dead list. +// gossipManager.getDeadList().remove(localDeadMember); +// Add it as a new member and add it to the member list. +*/ \ No newline at end of file diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index 88159f2..d1acd45 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -25,7 +25,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { */ protected void sendMembershipList(LocalGossipMember me, List memberList) { GossipService.LOGGER.debug("Send sendMembershipList() is called."); - me.setHeartbeat(me.getHeartbeat() + 1); + me.setHeartbeat(System.currentTimeMillis()); LocalGossipMember member = selectPartner(memberList); if (member == null) { return; diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java index 27e34ba..4e73482 100644 --- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java @@ -24,10 +24,10 @@ import com.google.code.gossip.event.GossipState; public class ShutdownDeadtimeTest { private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class ); - //@Test - @Ignore + @Test + //@Ignore public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException { - GossipSettings settings = new GossipSettings(10,10000); + GossipSettings settings = new GossipSettings(1000, 10000); log.info( "Adding seed nodes" ); int seedNodes = 3; @@ -59,7 +59,7 @@ public class ShutdownDeadtimeTest { total += clients.get(i).get_gossipManager().getMemberList().size(); } return total; - }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(20); + }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); // shutdown one client and verify that one client is lost. Random r = new Random(); @@ -75,8 +75,18 @@ public class ShutdownDeadtimeTest { total += clients.get(i).get_gossipManager().getMemberList().size(); } return total; - }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(16); - + }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16); + clients.remove(randomClientId); + + TUnit.assertThat(new Callable (){ + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers - 1; ++i) { + total += clients.get(i).get_gossipManager().getDeadList().size(); + } + return total; + }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); + // start client again GossipService gossipService = new GossipService("127.0.0.1", shutdownPort, shutdownId + "", startupMembers, settings, @@ -97,7 +107,10 @@ public class ShutdownDeadtimeTest { total += clients.get(i).get_gossipManager().getMemberList().size(); } return total; - }}).afterWaitingAtMost(70, TimeUnit.SECONDS).isEqualTo(20); + }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); + for (int i = 0; i < clusterMembers; ++i) { + clients.get(i).shutdown(); + } } } From 3cbd29cfe20c801ba3cba2a4e6dad3ac7c1b86fc Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Tue, 29 Mar 2016 15:37:05 -0400 Subject: [PATCH 2/3] Merge master --- .../teknek/gossip/ShutdownDeadtimeTest.java | 1 - .../io/teknek/gossip/StartupSettingsTest.java | 39 +++++++++---------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java index 4e73482..33887dd 100644 --- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java @@ -11,7 +11,6 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; -import org.junit.Ignore; import org.junit.Test; import com.google.code.gossip.GossipMember; diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java index 7fd8487..38c0a25 100644 --- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java +++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java @@ -8,11 +8,14 @@ import org.apache.log4j.Logger; import org.json.JSONException; import org.junit.Test; +import io.teknek.tunit.TUnit; + import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; @@ -31,30 +34,24 @@ public class StartupSettingsTest { log.debug( "Using settings file: " + settingsFile.getAbsolutePath() ); settingsFile.deleteOnExit(); writeSettingsFile(settingsFile); - - // Start the other simple node that the settings file points to - GossipService firstService = new GossipService( + final GossipService firstService = new GossipService( "127.0.0.1", 50000, UUID.randomUUID().toString(), - new ArrayList(), new GossipSettings(), null - ); + new ArrayList(), new GossipSettings(), null); + firstService.start(); - - // Start a node with the settings file - GossipService serviceUnderTest = new GossipService( - StartupSettings.fromJSONFile( settingsFile ) - ); + + TUnit.assertThat(new Callable (){ + public Integer call() throws Exception { + return firstService.get_gossipManager().getMemberList().size(); + }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(0); + final GossipService serviceUnderTest = new GossipService( + StartupSettings.fromJSONFile( settingsFile ) + ); serviceUnderTest.start(); - - // Let the sync up - TimeUnit.SECONDS.sleep(2); - - // Check the results - assertEquals(1, firstService.get_gossipManager().getMemberList().size() ); - assertEquals(1, serviceUnderTest.get_gossipManager().getMemberList().size() ); - assertTrue( - firstService.get_gossipManager().getMemberList().size() == - serviceUnderTest.get_gossipManager().getMemberList().size() ); - + TUnit.assertThat(new Callable (){ + public Integer call() throws Exception { + return serviceUnderTest.get_gossipManager().getMemberList().size(); + }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(1); firstService.shutdown(); serviceUnderTest.shutdown(); } From 2eebb7b9522079abeea375b5a4f13ebf4cf6904d Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Tue, 29 Mar 2016 15:43:29 -0400 Subject: [PATCH 3/3] Longer --- src/test/java/io/teknek/gossip/StartupSettingsTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java index 38c0a25..a6a461d 100644 --- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java +++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java @@ -43,7 +43,7 @@ public class StartupSettingsTest { TUnit.assertThat(new Callable (){ public Integer call() throws Exception { return firstService.get_gossipManager().getMemberList().size(); - }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(0); + }}).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(0); final GossipService serviceUnderTest = new GossipService( StartupSettings.fromJSONFile( settingsFile ) );