diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java index b62313e..0160dea 100644 --- a/src/main/java/com/google/code/gossip/GossipMember.java +++ b/src/main/java/com/google/code/gossip/GossipMember.java @@ -33,9 +33,11 @@ public abstract class GossipMember implements Comparable{ public static final String JSON_PORT = "port"; public static final String JSON_HEARTBEAT = "heartbeat"; public static final String JSON_ID = "id"; + public static final String JSON_CLUSTER = "cluster"; protected final String _host; protected final int _port; protected volatile long _heartbeat; + protected final String _clusterName; /** * The purpose of the id field is to be able for nodes to identify themselves beyond there host/port. For example * an application might generate a persistent id so if they rejoin the cluster at a different host and port we @@ -50,13 +52,23 @@ public abstract class GossipMember implements Comparable{ * @param heartbeat The current heartbeat. * @param id an id that may be replaced after contact */ - public GossipMember(String host, int port, String id, long heartbeat) { + public GossipMember(String clusterName, String host, int port, String id, long heartbeat) { + _clusterName = clusterName; _host = host; _port = port; _id = id; _heartbeat = heartbeat; } + /** + * Get the name of the cluster the member belongs to. + * + * @return The cluster name + */ + public String getClusterName(){ + return _clusterName; + } + /** * Get the hostname or IP address of the remote gossip member. * @return The hostname or IP address. @@ -119,7 +131,8 @@ public abstract class GossipMember implements Comparable{ int result = 1; String address = getAddress(); result = prime * result - + ((address == null) ? 0 : address.hashCode()); + + ((address == null) ? 0 : address.hashCode()) + + _clusterName == null ? 0 : _clusterName.hashCode(); return result; } @@ -140,7 +153,8 @@ public abstract class GossipMember implements Comparable{ return false; } // The object is the same of they both have the same address (hostname and port). - return getAddress().equals(((LocalGossipMember) obj).getAddress()); + return getAddress().equals(((LocalGossipMember) obj).getAddress()) && + getClusterName().equals(((LocalGossipMember) obj).getClusterName()); } /** @@ -150,6 +164,7 @@ public abstract class GossipMember implements Comparable{ public JSONObject toJSONObject() { try { JSONObject jsonObject = new JSONObject(); + jsonObject.put(JSON_CLUSTER, _clusterName); jsonObject.put(JSON_HOST, _host); jsonObject.put(JSON_PORT, _port); jsonObject.put(JSON_ID, _id); diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java index 13be3ec..3d578a4 100644 --- a/src/main/java/com/google/code/gossip/GossipService.java +++ b/src/main/java/com/google/code/gossip/GossipService.java @@ -46,7 +46,7 @@ public class GossipService { */ public GossipService(StartupSettings startupSettings) throws InterruptedException, UnknownHostException { - this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(), + this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(), startupSettings.getGossipMembers(), startupSettings .getGossipSettings(), null); } @@ -57,10 +57,10 @@ public class GossipService { * @throws InterruptedException * @throws UnknownHostException */ - public GossipService(String ipAddress, int port, String id, + public GossipService(String cluster, String ipAddress, int port, String id, List gossipMembers, GossipSettings settings, GossipListener listener) throws InterruptedException, UnknownHostException { - _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener); + _gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers, listener); } public void start() { diff --git a/src/main/java/com/google/code/gossip/LocalGossipMember.java b/src/main/java/com/google/code/gossip/LocalGossipMember.java index 719c1f3..978e822 100644 --- a/src/main/java/com/google/code/gossip/LocalGossipMember.java +++ b/src/main/java/com/google/code/gossip/LocalGossipMember.java @@ -43,9 +43,9 @@ public class LocalGossipMember extends GossipMember { * @param cleanupTimeout * The cleanup timeout for this gossip member. */ - public LocalGossipMember(String hostname, int port, String id, long heartbeat, + public LocalGossipMember(String clusterName, String hostname, int port, String id, long heartbeat, NotificationListener notificationListener, int cleanupTimeout) { - super(hostname, port, id, heartbeat); + super(clusterName, hostname, port, id, heartbeat); this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this); } diff --git a/src/main/java/com/google/code/gossip/RemoteGossipMember.java b/src/main/java/com/google/code/gossip/RemoteGossipMember.java index 6435bf2..d3848fd 100644 --- a/src/main/java/com/google/code/gossip/RemoteGossipMember.java +++ b/src/main/java/com/google/code/gossip/RemoteGossipMember.java @@ -35,8 +35,8 @@ public class RemoteGossipMember extends GossipMember { * @param heartbeat * The current heartbeat. */ - public RemoteGossipMember(String hostname, int port, String id, long heartbeat) { - super(hostname, port, id, heartbeat); + public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) { + super(clusterName, hostname, port, id, heartbeat); } /** @@ -47,7 +47,7 @@ public class RemoteGossipMember extends GossipMember { * @param port * The port number. */ - public RemoteGossipMember(String hostname, int port, String id) { - super(hostname, port, id, System.currentTimeMillis()); + public RemoteGossipMember(String clusterName, String hostname, int port, String id) { + super(clusterName, hostname, port, id, System.currentTimeMillis()); } } diff --git a/src/main/java/com/google/code/gossip/StartupSettings.java b/src/main/java/com/google/code/gossip/StartupSettings.java index b6475de..5e0a674 100644 --- a/src/main/java/com/google/code/gossip/StartupSettings.java +++ b/src/main/java/com/google/code/gossip/StartupSettings.java @@ -44,6 +44,8 @@ public class StartupSettings { /** The port to start the gossip service on. */ private int _port; + private String cluster; + /** The gossip settings used at startup. */ private final GossipSettings _gossipSettings; @@ -79,6 +81,14 @@ public class StartupSettings { _gossipMembers = new ArrayList<>(); } + public void setCluster(String cluster){ + this.cluster = cluster; + } + + public String getCluster(){ + return this.cluster; + } + /** * Set the id to be used for this service. * @@ -192,7 +202,7 @@ public class StartupSettings { JSONArray membersJSON = jsonObject.getJSONArray("members"); for (int i = 0; i < membersJSON.length(); i++) { JSONObject memberJSON = membersJSON.getJSONObject(i); - RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"), + RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"), memberJSON.getString("host"), memberJSON.getInt("port"), ""); settings.addGossipMember(member); configMembersDetails += member.getAddress(); diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java index 2c8ea94..bb5c884 100644 --- a/src/main/java/com/google/code/gossip/examples/GossipExample.java +++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java @@ -63,18 +63,20 @@ public class GossipExample extends Thread { // Get my ip address. String myIpAddress = InetAddress.getLocalHost().getHostAddress(); + String cluster = "My Gossip Cluster"; + // Create the gossip members and put them in a list and give them a port number starting with // 2000. List startupMembers = new ArrayList<>(); for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) { - startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, "")); + startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, "")); } // Lets start the gossip clients. // Start the clients, waiting cleaning-interval + 1 second between them which will show the // dead list handling. for (GossipMember member : startupMembers) { - GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", + GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "", startupMembers, settings, null); clients.add(gossipService); gossipService.start(); diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index 99aef68..b695fcc 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -57,17 +57,17 @@ public abstract class GossipManager extends Thread implements NotificationListen private ExecutorService _gossipThreadExecutor; public GossipManager(Class passiveGossipThreadClass, - Class activeGossipThreadClass, String address, int port, + Class activeGossipThreadClass, String cluster, String address, int port, String id, GossipSettings settings, List gossipMembers, GossipListener listener) { _passiveGossipThreadClass = passiveGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass; _settings = settings; - _me = new LocalGossipMember(address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval()); + _me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval()); members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(_me)) { - LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), + LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), startupMember.getHost(), startupMember.getPort(), startupMember.getId(), System.currentTimeMillis(), this, settings.getCleanupInterval()); members.put(member, GossipState.UP); diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index acffb94..5dcea09 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -53,6 +53,8 @@ abstract public class PassiveGossipThread implements Runnable { private AtomicBoolean _keepRunning; + private final String _cluster; + public PassiveGossipThread(GossipManager gossipManager) { _gossipManager = gossipManager; try { @@ -62,6 +64,7 @@ abstract public class PassiveGossipThread implements Runnable { GossipService.LOGGER.debug("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort()); GossipService.LOGGER.debug("I am " + _gossipManager.getMyself()); + _cluster = _gossipManager.getMyself().getClusterName(); } catch (SocketException ex) { GossipService.LOGGER.warn(ex); throw new RuntimeException(ex); @@ -95,8 +98,9 @@ abstract public class PassiveGossipThread implements Runnable { JSONArray jsonArray = new JSONArray(receivedMessage); for (int i = 0; i < jsonArray.length(); i++) { JSONObject memberJSONObject = jsonArray.getJSONObject(i); - if (memberJSONObject.length() == 4) { + if (memberJSONObject.length() == 5 && _cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) { RemoteGossipMember member = new RemoteGossipMember( + memberJSONObject.getString(GossipMember.JSON_CLUSTER), memberJSONObject.getString(GossipMember.JSON_HOST), memberJSONObject.getInt(GossipMember.JSON_PORT), memberJSONObject.getString(GossipMember.JSON_ID), @@ -108,9 +112,11 @@ abstract public class PassiveGossipThread implements Runnable { senderMember = member; } remoteGossipMembers.add(member); + } else if(memberJSONObject.length() == 5) { + GossipService.LOGGER.warn("The member object does not belong to this cluster."); } else { GossipService.LOGGER - .error("The received member object does not contain 4 objects:\n" + .error("The received member object does not contain 5 objects:\n" + memberJSONObject.toString()); } diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index acfc43e..16584f9 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -48,7 +48,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread for (LocalGossipMember i : gossipManager.getDeadList()){ if (i.getId().equals(senderMember.getId())){ System.out.println(gossipManager.getMyself() +" caught a live one!"); - LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getHost(), + LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), senderMember.getHost(), senderMember.getPort(), senderMember.getId(), senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); gossipManager.revivieMember(newLocalMember); @@ -68,7 +68,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread } } else if (!gossipManager.getMemberList().contains(remoteMember) && !gossipManager.getDeadList().contains(remoteMember) ){ - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); gossipManager.createOrRevivieMember(newLocalMember); @@ -78,7 +78,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread LocalGossipMember localDeadMember = gossipManager.getDeadList().get( gossipManager.getDeadList().indexOf(remoteMember)); if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); gossipManager.revivieMember(newLocalMember); diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java index d04f913..e256ec2 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java @@ -26,9 +26,9 @@ import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThrea import java.util.List; public class RandomGossipManager extends GossipManager { - public RandomGossipManager(String address, int port, String id, GossipSettings settings, + public RandomGossipManager(String cluster, String address, int port, String id, GossipSettings settings, List gossipMembers, GossipListener listener) { - super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, + super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster, address, port, id, settings, gossipMembers, listener); } } diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java index ddd8364..af30eb7 100644 --- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java @@ -23,6 +23,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -44,12 +45,13 @@ public class ShutdownDeadtimeTest { //@Ignore public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException { GossipSettings settings = new GossipSettings(1000, 10000); + String cluster = UUID.randomUUID().toString(); log.info( "Adding seed nodes" ); int seedNodes = 3; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes + 1; ++i) { - startupMembers.add(new RemoteGossipMember("127.0.0.1", 50000 + i, i + "")); + startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + "")); } log.info( "Adding clients" ); @@ -57,7 +59,7 @@ public class ShutdownDeadtimeTest { final int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { final int j = i; - GossipService gossipService = new GossipService("127.0.0.1", 50000 + i, i + "", + GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "", startupMembers, settings, new GossipListener(){ @Override @@ -104,7 +106,7 @@ public class ShutdownDeadtimeTest { }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); // start client again - GossipService gossipService = new GossipService("127.0.0.1", shutdownPort, shutdownId + "", + GossipService gossipService = new GossipService(cluster, "127.0.0.1", shutdownPort, shutdownId + "", startupMembers, settings, new GossipListener(){ @Override diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java index 5cb52e0..5f25dd9 100644 --- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java +++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java @@ -44,6 +44,7 @@ import static org.junit.Assert.assertTrue; */ public class StartupSettingsTest { private static final Logger log = Logger.getLogger( StartupSettingsTest.class ); + private static final String CLUSTER = UUID.randomUUID().toString(); @Test public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException { @@ -52,7 +53,7 @@ public class StartupSettingsTest { settingsFile.deleteOnExit(); writeSettingsFile(settingsFile); final GossipService firstService = new GossipService( - "127.0.0.1", 50000, UUID.randomUUID().toString(), + CLUSTER, "127.0.0.1", 50000, UUID.randomUUID().toString(), new ArrayList(), new GossipSettings(), null); firstService.start(); @@ -76,12 +77,13 @@ public class StartupSettingsTest { private void writeSettingsFile( File target ) throws IOException { String settings = "[{\n" + // It is odd that this is meant to be in an array, but oh well. + " \"cluster\":\"" + CLUSTER + "\",\n" + " \"id\":\"" + UUID.randomUUID() + "\",\n" + " \"port\":50001,\n" + " \"gossip_interval\":1000,\n" + " \"cleanup_interval\":10000,\n" + " \"members\":[\n" + - " {\"host\":\"127.0.0.1\", \"port\":50000}\n" + + " {\"cluster\": \"" + CLUSTER + "\",\"host\":\"127.0.0.1\", \"port\":50000}\n" + " ]\n" + "}]"; diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java index 39844ac..0065ade 100644 --- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java @@ -22,6 +22,7 @@ import io.teknek.tunit.TUnit; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -52,19 +53,20 @@ public class TenNodeThreeSeedTest { public void abc() throws InterruptedException, UnknownHostException{ GossipSettings settings = new GossipSettings(); + String cluster = UUID.randomUUID().toString(); log.info( "Adding seed nodes" ); int seedNodes = 3; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes+1; ++i) { - startupMembers.add(new RemoteGossipMember("127.0.0.1", 50000 + i, i + "")); + startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + "")); } log.info( "Adding clients" ); final List clients = new ArrayList<>(); final int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { - GossipService gossipService = new GossipService("127.0.0.1", 50000 + i, i + "", + GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "", startupMembers, settings, new GossipListener(){ @Override