Merge pull request #21 from ptgoetz/cluster-ids
WIP on configurable cluster names
This commit is contained in:
@ -33,9 +33,11 @@ public abstract class GossipMember implements Comparable<GossipMember>{
|
||||
public static final String JSON_PORT = "port";
|
||||
public static final String JSON_HEARTBEAT = "heartbeat";
|
||||
public static final String JSON_ID = "id";
|
||||
public static final String JSON_CLUSTER = "cluster";
|
||||
protected final String _host;
|
||||
protected final int _port;
|
||||
protected volatile long _heartbeat;
|
||||
protected final String _clusterName;
|
||||
/**
|
||||
* The purpose of the id field is to be able for nodes to identify themselves beyond there host/port. For example
|
||||
* an application might generate a persistent id so if they rejoin the cluster at a different host and port we
|
||||
@ -50,13 +52,23 @@ public abstract class GossipMember implements Comparable<GossipMember>{
|
||||
* @param heartbeat The current heartbeat.
|
||||
* @param id an id that may be replaced after contact
|
||||
*/
|
||||
public GossipMember(String host, int port, String id, long heartbeat) {
|
||||
public GossipMember(String clusterName, String host, int port, String id, long heartbeat) {
|
||||
_clusterName = clusterName;
|
||||
_host = host;
|
||||
_port = port;
|
||||
_id = id;
|
||||
_heartbeat = heartbeat;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the name of the cluster the member belongs to.
|
||||
*
|
||||
* @return The cluster name
|
||||
*/
|
||||
public String getClusterName(){
|
||||
return _clusterName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the hostname or IP address of the remote gossip member.
|
||||
* @return The hostname or IP address.
|
||||
@ -119,7 +131,8 @@ public abstract class GossipMember implements Comparable<GossipMember>{
|
||||
int result = 1;
|
||||
String address = getAddress();
|
||||
result = prime * result
|
||||
+ ((address == null) ? 0 : address.hashCode());
|
||||
+ ((address == null) ? 0 : address.hashCode())
|
||||
+ _clusterName == null ? 0 : _clusterName.hashCode();
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -140,7 +153,8 @@ public abstract class GossipMember implements Comparable<GossipMember>{
|
||||
return false;
|
||||
}
|
||||
// The object is the same of they both have the same address (hostname and port).
|
||||
return getAddress().equals(((LocalGossipMember) obj).getAddress());
|
||||
return getAddress().equals(((LocalGossipMember) obj).getAddress()) &&
|
||||
getClusterName().equals(((LocalGossipMember) obj).getClusterName());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -150,6 +164,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
|
||||
public JSONObject toJSONObject() {
|
||||
try {
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
jsonObject.put(JSON_CLUSTER, _clusterName);
|
||||
jsonObject.put(JSON_HOST, _host);
|
||||
jsonObject.put(JSON_PORT, _port);
|
||||
jsonObject.put(JSON_ID, _id);
|
||||
|
@ -46,7 +46,7 @@ public class GossipService {
|
||||
*/
|
||||
public GossipService(StartupSettings startupSettings) throws InterruptedException,
|
||||
UnknownHostException {
|
||||
this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(),
|
||||
this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(),
|
||||
startupSettings.getGossipMembers(), startupSettings
|
||||
.getGossipSettings(), null);
|
||||
}
|
||||
@ -57,10 +57,10 @@ public class GossipService {
|
||||
* @throws InterruptedException
|
||||
* @throws UnknownHostException
|
||||
*/
|
||||
public GossipService(String ipAddress, int port, String id,
|
||||
public GossipService(String cluster, String ipAddress, int port, String id,
|
||||
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
|
||||
throws InterruptedException, UnknownHostException {
|
||||
_gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener);
|
||||
_gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers, listener);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
|
@ -43,9 +43,9 @@ public class LocalGossipMember extends GossipMember {
|
||||
* @param cleanupTimeout
|
||||
* The cleanup timeout for this gossip member.
|
||||
*/
|
||||
public LocalGossipMember(String hostname, int port, String id, long heartbeat,
|
||||
public LocalGossipMember(String clusterName, String hostname, int port, String id, long heartbeat,
|
||||
NotificationListener notificationListener, int cleanupTimeout) {
|
||||
super(hostname, port, id, heartbeat);
|
||||
super(clusterName, hostname, port, id, heartbeat);
|
||||
|
||||
this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
|
||||
}
|
||||
|
@ -35,8 +35,8 @@ public class RemoteGossipMember extends GossipMember {
|
||||
* @param heartbeat
|
||||
* The current heartbeat.
|
||||
*/
|
||||
public RemoteGossipMember(String hostname, int port, String id, long heartbeat) {
|
||||
super(hostname, port, id, heartbeat);
|
||||
public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
|
||||
super(clusterName, hostname, port, id, heartbeat);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -47,7 +47,7 @@ public class RemoteGossipMember extends GossipMember {
|
||||
* @param port
|
||||
* The port number.
|
||||
*/
|
||||
public RemoteGossipMember(String hostname, int port, String id) {
|
||||
super(hostname, port, id, System.currentTimeMillis());
|
||||
public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
|
||||
super(clusterName, hostname, port, id, System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
@ -44,6 +44,8 @@ public class StartupSettings {
|
||||
/** The port to start the gossip service on. */
|
||||
private int _port;
|
||||
|
||||
private String cluster;
|
||||
|
||||
/** The gossip settings used at startup. */
|
||||
private final GossipSettings _gossipSettings;
|
||||
|
||||
@ -79,6 +81,14 @@ public class StartupSettings {
|
||||
_gossipMembers = new ArrayList<>();
|
||||
}
|
||||
|
||||
public void setCluster(String cluster){
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
public String getCluster(){
|
||||
return this.cluster;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the id to be used for this service.
|
||||
*
|
||||
@ -192,7 +202,7 @@ public class StartupSettings {
|
||||
JSONArray membersJSON = jsonObject.getJSONArray("members");
|
||||
for (int i = 0; i < membersJSON.length(); i++) {
|
||||
JSONObject memberJSON = membersJSON.getJSONObject(i);
|
||||
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"),
|
||||
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"), memberJSON.getString("host"),
|
||||
memberJSON.getInt("port"), "");
|
||||
settings.addGossipMember(member);
|
||||
configMembersDetails += member.getAddress();
|
||||
|
@ -63,18 +63,20 @@ public class GossipExample extends Thread {
|
||||
// Get my ip address.
|
||||
String myIpAddress = InetAddress.getLocalHost().getHostAddress();
|
||||
|
||||
String cluster = "My Gossip Cluster";
|
||||
|
||||
// Create the gossip members and put them in a list and give them a port number starting with
|
||||
// 2000.
|
||||
List<GossipMember> startupMembers = new ArrayList<>();
|
||||
for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
|
||||
startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, ""));
|
||||
startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
|
||||
}
|
||||
|
||||
// Lets start the gossip clients.
|
||||
// Start the clients, waiting cleaning-interval + 1 second between them which will show the
|
||||
// dead list handling.
|
||||
for (GossipMember member : startupMembers) {
|
||||
GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "",
|
||||
GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
|
||||
startupMembers, settings, null);
|
||||
clients.add(gossipService);
|
||||
gossipService.start();
|
||||
|
@ -57,17 +57,17 @@ public abstract class GossipManager extends Thread implements NotificationListen
|
||||
private ExecutorService _gossipThreadExecutor;
|
||||
|
||||
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
|
||||
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
|
||||
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster, String address, int port,
|
||||
String id, GossipSettings settings, List<GossipMember> gossipMembers,
|
||||
GossipListener listener) {
|
||||
_passiveGossipThreadClass = passiveGossipThreadClass;
|
||||
_activeGossipThreadClass = activeGossipThreadClass;
|
||||
_settings = settings;
|
||||
_me = new LocalGossipMember(address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
|
||||
_me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
|
||||
members = new ConcurrentSkipListMap<>();
|
||||
for (GossipMember startupMember : gossipMembers) {
|
||||
if (!startupMember.equals(_me)) {
|
||||
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
|
||||
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), startupMember.getHost(),
|
||||
startupMember.getPort(), startupMember.getId(), System.currentTimeMillis(), this,
|
||||
settings.getCleanupInterval());
|
||||
members.put(member, GossipState.UP);
|
||||
|
@ -53,6 +53,8 @@ abstract public class PassiveGossipThread implements Runnable {
|
||||
|
||||
private AtomicBoolean _keepRunning;
|
||||
|
||||
private final String _cluster;
|
||||
|
||||
public PassiveGossipThread(GossipManager gossipManager) {
|
||||
_gossipManager = gossipManager;
|
||||
try {
|
||||
@ -62,6 +64,7 @@ abstract public class PassiveGossipThread implements Runnable {
|
||||
GossipService.LOGGER.debug("Gossip service successfully initialized on port "
|
||||
+ _gossipManager.getMyself().getPort());
|
||||
GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
|
||||
_cluster = _gossipManager.getMyself().getClusterName();
|
||||
} catch (SocketException ex) {
|
||||
GossipService.LOGGER.warn(ex);
|
||||
throw new RuntimeException(ex);
|
||||
@ -95,8 +98,9 @@ abstract public class PassiveGossipThread implements Runnable {
|
||||
JSONArray jsonArray = new JSONArray(receivedMessage);
|
||||
for (int i = 0; i < jsonArray.length(); i++) {
|
||||
JSONObject memberJSONObject = jsonArray.getJSONObject(i);
|
||||
if (memberJSONObject.length() == 4) {
|
||||
if (memberJSONObject.length() == 5 && _cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) {
|
||||
RemoteGossipMember member = new RemoteGossipMember(
|
||||
memberJSONObject.getString(GossipMember.JSON_CLUSTER),
|
||||
memberJSONObject.getString(GossipMember.JSON_HOST),
|
||||
memberJSONObject.getInt(GossipMember.JSON_PORT),
|
||||
memberJSONObject.getString(GossipMember.JSON_ID),
|
||||
@ -108,9 +112,11 @@ abstract public class PassiveGossipThread implements Runnable {
|
||||
senderMember = member;
|
||||
}
|
||||
remoteGossipMembers.add(member);
|
||||
} else if(memberJSONObject.length() == 5) {
|
||||
GossipService.LOGGER.warn("The member object does not belong to this cluster.");
|
||||
} else {
|
||||
GossipService.LOGGER
|
||||
.error("The received member object does not contain 4 objects:\n"
|
||||
.error("The received member object does not contain 5 objects:\n"
|
||||
+ memberJSONObject.toString());
|
||||
}
|
||||
|
||||
|
@ -48,7 +48,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
||||
for (LocalGossipMember i : gossipManager.getDeadList()){
|
||||
if (i.getId().equals(senderMember.getId())){
|
||||
System.out.println(gossipManager.getMyself() +" caught a live one!");
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getHost(),
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), senderMember.getHost(),
|
||||
senderMember.getPort(), senderMember.getId(), senderMember.getHeartbeat(),
|
||||
gossipManager, gossipManager.getSettings().getCleanupInterval());
|
||||
gossipManager.revivieMember(newLocalMember);
|
||||
@ -68,7 +68,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
||||
}
|
||||
} else if (!gossipManager.getMemberList().contains(remoteMember)
|
||||
&& !gossipManager.getDeadList().contains(remoteMember) ){
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
|
||||
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
|
||||
gossipManager, gossipManager.getSettings().getCleanupInterval());
|
||||
gossipManager.createOrRevivieMember(newLocalMember);
|
||||
@ -78,7 +78,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
||||
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
|
||||
gossipManager.getDeadList().indexOf(remoteMember));
|
||||
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
|
||||
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
|
||||
gossipManager, gossipManager.getSettings().getCleanupInterval());
|
||||
gossipManager.revivieMember(newLocalMember);
|
||||
|
@ -26,9 +26,9 @@ import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThrea
|
||||
import java.util.List;
|
||||
|
||||
public class RandomGossipManager extends GossipManager {
|
||||
public RandomGossipManager(String address, int port, String id, GossipSettings settings,
|
||||
public RandomGossipManager(String cluster, String address, int port, String id, GossipSettings settings,
|
||||
List<GossipMember> gossipMembers, GossipListener listener) {
|
||||
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
|
||||
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster, address,
|
||||
port, id, settings, gossipMembers, listener);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user