WIP on configurable cluster names

This commit is contained in:
P. Taylor Goetz
2016-04-26 17:08:56 -04:00
parent bea44ea12e
commit 723c0a050b
13 changed files with 71 additions and 32 deletions

View File

@ -33,9 +33,11 @@ public abstract class GossipMember implements Comparable<GossipMember>{
public static final String JSON_PORT = "port";
public static final String JSON_HEARTBEAT = "heartbeat";
public static final String JSON_ID = "id";
public static final String JSON_CLUSTER = "cluster";
protected final String _host;
protected final int _port;
protected volatile long _heartbeat;
protected final String _clusterName;
/**
* The purpose of the id field is to be able for nodes to identify themselves beyond there host/port. For example
* an application might generate a persistent id so if they rejoin the cluster at a different host and port we
@ -50,13 +52,23 @@ public abstract class GossipMember implements Comparable<GossipMember>{
* @param heartbeat The current heartbeat.
* @param id an id that may be replaced after contact
*/
public GossipMember(String host, int port, String id, long heartbeat) {
public GossipMember(String clusterName, String host, int port, String id, long heartbeat) {
_clusterName = clusterName;
_host = host;
_port = port;
_id = id;
_heartbeat = heartbeat;
}
/**
* Get the name of the cluster the member belongs to.
*
* @return The cluster name
*/
public String getClusterName(){
return _clusterName;
}
/**
* Get the hostname or IP address of the remote gossip member.
* @return The hostname or IP address.
@ -119,7 +131,8 @@ public abstract class GossipMember implements Comparable<GossipMember>{
int result = 1;
String address = getAddress();
result = prime * result
+ ((address == null) ? 0 : address.hashCode());
+ ((address == null) ? 0 : address.hashCode())
+ _clusterName == null ? 0 : _clusterName.hashCode();
return result;
}
@ -140,7 +153,8 @@ public abstract class GossipMember implements Comparable<GossipMember>{
return false;
}
// The object is the same of they both have the same address (hostname and port).
return getAddress().equals(((LocalGossipMember) obj).getAddress());
return getAddress().equals(((LocalGossipMember) obj).getAddress()) &&
getClusterName().equals(((LocalGossipMember) obj).getClusterName());
}
/**
@ -150,6 +164,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
public JSONObject toJSONObject() {
try {
JSONObject jsonObject = new JSONObject();
jsonObject.put(JSON_CLUSTER, _clusterName);
jsonObject.put(JSON_HOST, _host);
jsonObject.put(JSON_PORT, _port);
jsonObject.put(JSON_ID, _id);

View File

@ -46,7 +46,7 @@ public class GossipService {
*/
public GossipService(StartupSettings startupSettings) throws InterruptedException,
UnknownHostException {
this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(),
this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getId(),
startupSettings.getGossipMembers(), startupSettings
.getGossipSettings(), null);
}
@ -57,10 +57,10 @@ public class GossipService {
* @throws InterruptedException
* @throws UnknownHostException
*/
public GossipService(String ipAddress, int port, String id,
public GossipService(String cluster, String ipAddress, int port, String id,
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException {
_gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener);
_gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers, listener);
}
public void start() {

View File

@ -43,9 +43,9 @@ public class LocalGossipMember extends GossipMember {
* @param cleanupTimeout
* The cleanup timeout for this gossip member.
*/
public LocalGossipMember(String hostname, int port, String id, long heartbeat,
public LocalGossipMember(String clusterName, String hostname, int port, String id, long heartbeat,
NotificationListener notificationListener, int cleanupTimeout) {
super(hostname, port, id, heartbeat);
super(clusterName, hostname, port, id, heartbeat);
this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
}

View File

@ -35,8 +35,8 @@ public class RemoteGossipMember extends GossipMember {
* @param heartbeat
* The current heartbeat.
*/
public RemoteGossipMember(String hostname, int port, String id, long heartbeat) {
super(hostname, port, id, heartbeat);
public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
super(clusterName, hostname, port, id, heartbeat);
}
/**
@ -47,7 +47,7 @@ public class RemoteGossipMember extends GossipMember {
* @param port
* The port number.
*/
public RemoteGossipMember(String hostname, int port, String id) {
super(hostname, port, id, System.currentTimeMillis());
public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
super(clusterName, hostname, port, id, System.currentTimeMillis());
}
}

View File

@ -44,6 +44,8 @@ public class StartupSettings {
/** The port to start the gossip service on. */
private int _port;
private String cluster;
/** The gossip settings used at startup. */
private final GossipSettings _gossipSettings;
@ -79,6 +81,14 @@ public class StartupSettings {
_gossipMembers = new ArrayList<>();
}
public void setCluster(String cluster){
this.cluster = cluster;
}
public String getCluster(){
return this.cluster;
}
/**
* Set the id to be used for this service.
*
@ -192,7 +202,7 @@ public class StartupSettings {
JSONArray membersJSON = jsonObject.getJSONArray("members");
for (int i = 0; i < membersJSON.length(); i++) {
JSONObject memberJSON = membersJSON.getJSONObject(i);
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"),
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"), memberJSON.getString("host"),
memberJSON.getInt("port"), "");
settings.addGossipMember(member);
configMembersDetails += member.getAddress();

View File

@ -63,18 +63,20 @@ public class GossipExample extends Thread {
// Get my ip address.
String myIpAddress = InetAddress.getLocalHost().getHostAddress();
String cluster = "My Gossip Cluster";
// Create the gossip members and put them in a list and give them a port number starting with
// 2000.
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, ""));
startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
}
// Lets start the gossip clients.
// Start the clients, waiting cleaning-interval + 1 second between them which will show the
// dead list handling.
for (GossipMember member : startupMembers) {
GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "",
GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
startupMembers, settings, null);
clients.add(gossipService);
gossipService.start();

View File

@ -57,17 +57,17 @@ public abstract class GossipManager extends Thread implements NotificationListen
private ExecutorService _gossipThreadExecutor;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster, String address, int port,
String id, GossipSettings settings, List<GossipMember> gossipMembers,
GossipListener listener) {
_passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass;
_settings = settings;
_me = new LocalGossipMember(address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
_me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this, settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(_me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), System.currentTimeMillis(), this,
settings.getCleanupInterval());
members.put(member, GossipState.UP);

View File

@ -53,6 +53,8 @@ abstract public class PassiveGossipThread implements Runnable {
private AtomicBoolean _keepRunning;
private final String _cluster;
public PassiveGossipThread(GossipManager gossipManager) {
_gossipManager = gossipManager;
try {
@ -62,6 +64,7 @@ abstract public class PassiveGossipThread implements Runnable {
GossipService.LOGGER.debug("Gossip service successfully initialized on port "
+ _gossipManager.getMyself().getPort());
GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
_cluster = _gossipManager.getMyself().getClusterName();
} catch (SocketException ex) {
GossipService.LOGGER.warn(ex);
_server = null;
@ -96,8 +99,9 @@ abstract public class PassiveGossipThread implements Runnable {
JSONArray jsonArray = new JSONArray(receivedMessage);
for (int i = 0; i < jsonArray.length(); i++) {
JSONObject memberJSONObject = jsonArray.getJSONObject(i);
if (memberJSONObject.length() == 4) {
if (memberJSONObject.length() == 5 && _cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) {
RemoteGossipMember member = new RemoteGossipMember(
memberJSONObject.getString(GossipMember.JSON_CLUSTER),
memberJSONObject.getString(GossipMember.JSON_HOST),
memberJSONObject.getInt(GossipMember.JSON_PORT),
memberJSONObject.getString(GossipMember.JSON_ID),
@ -109,9 +113,11 @@ abstract public class PassiveGossipThread implements Runnable {
senderMember = member;
}
remoteGossipMembers.add(member);
} else if(memberJSONObject.length() == 5) {
GossipService.LOGGER.warn("The member object does not belong to this cluster.");
} else {
GossipService.LOGGER
.error("The received member object does not contain 4 objects:\n"
.error("The received member object does not contain 5 objects:\n"
+ memberJSONObject.toString());
}

View File

@ -48,7 +48,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
for (LocalGossipMember i : gossipManager.getDeadList()){
if (i.getId().equals(senderMember.getId())){
System.out.println(gossipManager.getMyself() +" caught a live one!");
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getHost(),
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), senderMember.getHost(),
senderMember.getPort(), senderMember.getId(), senderMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
@ -68,7 +68,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
}
} else if (!gossipManager.getMemberList().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember) ){
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.createOrRevivieMember(newLocalMember);
@ -78,7 +78,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
gossipManager.getDeadList().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.revivieMember(newLocalMember);

View File

@ -26,9 +26,9 @@ import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThrea
import java.util.List;
public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String address, int port, String id, GossipSettings settings,
public RandomGossipManager(String cluster, String address, int port, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster, address,
port, id, settings, gossipMembers, listener);
}
}