Merge pull request #1 from edwardcapriolo/add_id

Add
This commit is contained in:
edwardcapriolo
2015-01-12 10:48:45 -05:00
11 changed files with 46 additions and 33 deletions

View File

@ -17,6 +17,8 @@ public abstract class GossipMember {
public static final String JSON_PORT = "port";
/** The JSON key for the heartbeat property. */
public static final String JSON_HEARTBEAT = "heartbeat";
public static final String JSON_ID = "id";
/** The hostname or IP address of this gossip member. */
protected String _host;
@ -27,15 +29,18 @@ public abstract class GossipMember {
/** The current heartbeat of this gossip member. */
protected int _heartbeat;
protected String _id;
/**
* Constructor.
* @param host The hostname or IP address.
* @param port The port number.
* @param heartbeat The current heartbeat.
*/
public GossipMember(String host, int port, int heartbeat) {
public GossipMember(String host, int port, String id, int heartbeat) {
_host = host;
_port = port;
_id = id;
_heartbeat = heartbeat;
}
@ -78,14 +83,20 @@ public abstract class GossipMember {
public void setHeartbeat(int heartbeat) {
this._heartbeat = heartbeat;
}
public String getId() {
return _id;
}
public void setId(String _id) {
this._id = _id;
}
/**
* @see java.lang.Object#toString()
*/
public String toString() {
return "Member [address=" + getAddress() + ", heartbeat=" + _heartbeat + "]";
return "Member [address=" + getAddress() + ", id=" + _id + ", heartbeat=" + _heartbeat + "]";
}
/**
* @see java.lang.Object#hashCode()
*/
@ -128,6 +139,7 @@ public abstract class GossipMember {
JSONObject jsonObject = new JSONObject();
jsonObject.put(JSON_HOST, _host);
jsonObject.put(JSON_PORT, _port);
jsonObject.put(JSON_ID, _id);
jsonObject.put(JSON_HEARTBEAT, _heartbeat);
return jsonObject;
} catch (JSONException e) {

View File

@ -25,7 +25,7 @@ public class GossipService {
* @throws UnknownHostException
*/
public GossipService(StartupSettings startupSettings) throws InterruptedException, UnknownHostException {
this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings.getGossipSettings());
this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "", startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings.getGossipSettings());
}
/**
@ -34,8 +34,8 @@ public class GossipService {
* @throws InterruptedException
* @throws UnknownHostException
*/
public GossipService(String ipAddress, int port, int logLevel, ArrayList<GossipMember> gossipMembers, GossipSettings settings) throws InterruptedException, UnknownHostException {
_gossipManager = new RandomGossipManager(ipAddress, port, settings, gossipMembers);
public GossipService(String ipAddress, int port, String id, int logLevel, ArrayList<GossipMember> gossipMembers, GossipSettings settings) throws InterruptedException, UnknownHostException {
_gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers);
}
public void start() {

View File

@ -20,8 +20,8 @@ public class LocalGossipMember extends GossipMember {
* @param gossipService The GossipService object.
* @param cleanupTimeout The cleanup timeout for this gossip member.
*/
public LocalGossipMember(String hostname, int port, int heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
super(hostname, port, heartbeat);
public LocalGossipMember(String hostname, int port, String id, int heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
super(hostname, port, id, heartbeat);
this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
}

View File

@ -13,8 +13,8 @@ public class RemoteGossipMember extends GossipMember {
* @param port The port number.
* @param heartbeat The current heartbeat.
*/
public RemoteGossipMember(String hostname, int port, int heartbeat) {
super(hostname, port, heartbeat);
public RemoteGossipMember(String hostname, int port, String id, int heartbeat) {
super(hostname, port, id, heartbeat);
}
/**
@ -22,7 +22,7 @@ public class RemoteGossipMember extends GossipMember {
* @param host The hostname or IP address.
* @param port The port number.
*/
public RemoteGossipMember(String hostname, int port) {
super(hostname, port, 0);
public RemoteGossipMember(String hostname, int port, String id) {
super(hostname, port, id, 0);
}
}

View File

@ -147,7 +147,7 @@ public class StartupSettings {
JSONArray membersJSON = jsonObject.getJSONArray("members");
for (int i=0; i<membersJSON.length(); i++) {
JSONObject memberJSON = membersJSON.getJSONObject(i);
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"), memberJSON.getInt("port"));
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"), memberJSON.getInt("port"), "");
settings.addGossipMember(member);
System.out.print(member.getAddress());
if (i < (membersJSON.length() - 1))

View File

@ -51,13 +51,13 @@ public class GossipExample extends Thread {
// Create the gossip members and put them in a list and give them a port number starting with 2000.
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
for (int i=0; i<NUMBER_OF_CLIENTS; ++i) {
startupMembers.add(new RemoteGossipMember(myIpAddress, 2000+i));
startupMembers.add(new RemoteGossipMember(myIpAddress, 2000+i, ""));
}
// Lets start the gossip clients.
// Start the clients, waiting cleaning-interval + 1 second between them which will show the dead list handling.
for (GossipMember member : startupMembers) {
GossipService gossipService = new GossipService(myIpAddress, member.getPort(), LogLevel.DEBUG, startupMembers, settings);
GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", LogLevel.DEBUG, startupMembers, settings);
clients.add(gossipService);
gossipService.start();
sleep(settings.getCleanupInterval() + 1000);

View File

@ -41,20 +41,20 @@ public abstract class GossipManager extends Thread implements NotificationListen
private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass, Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
_passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass;
_settings = settings;
_me = new LocalGossipMember(address, port, 0, this, settings.getCleanupInterval());
_me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval());
_memberList = new ArrayList<LocalGossipMember>();
_deadList = new ArrayList<LocalGossipMember>();
GossipService.LOGGER.debug("Startup member list:");
GossipService.LOGGER.debug("---------------------");
GossipService.LOGGER.debug(_me);
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(_me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), 0, this, settings.getCleanupInterval());
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), 0, this,
settings.getCleanupInterval());
_memberList.add(member);
GossipService.LOGGER.debug(member);
}

View File

@ -86,10 +86,11 @@ abstract public class PassiveGossipThread implements Runnable {
JSONArray jsonArray = new JSONArray(receivedMessage);
for (int i = 0; i < jsonArray.length(); i++) {
JSONObject memberJSONObject = jsonArray.getJSONObject(i);
if (memberJSONObject.length() == 3) {
if (memberJSONObject.length() == 4) {
RemoteGossipMember member = new RemoteGossipMember(
memberJSONObject.getString(GossipMember.JSON_HOST),
memberJSONObject.getInt(GossipMember.JSON_PORT),
memberJSONObject.getString(GossipMember.JSON_ID),
memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT));
GossipService.LOGGER.debug(member.toString());
// This is the first member found, so this should be the member who is communicating
@ -100,7 +101,7 @@ abstract public class PassiveGossipThread implements Runnable {
remoteGossipMembers.add(member);
} else {
GossipService.LOGGER
.error("The received member object does not contain 3 objects:\n"
.error("The received member object does not contain 4 objects:\n"
+ memberJSONObject.toString());
}

View File

@ -71,14 +71,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
// Remove it from the dead list.
gossipManager.getDeadList().remove(localDeadMember);
// Add it as a new member and add it to the member list.
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.getMemberList().add(newLocalMember);
newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list.");
}
} else {
// Brand spanking new member - welcome.
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.getMemberList().add(newLocalMember);
newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list.");

View File

@ -8,7 +8,7 @@ import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String address, int port, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, port, settings, gossipMembers);
public RandomGossipManager(String address, int port, String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, port, id, settings, gossipMembers);
}
}

View File

@ -22,12 +22,12 @@ public class TenNodeThreeSeedTest {
int seedNodes = 3;
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
for (int i = 1; i < seedNodes+1; ++i) {
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000));
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i+""));
}
ArrayList<GossipService> clients = new ArrayList<GossipService>();
int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
GossipService gossipService = new GossipService("127.0.0."+i, 2000, LogLevel.DEBUG, startupMembers, settings);
GossipService gossipService = new GossipService("127.0.0."+i, 2000, i+"", LogLevel.DEBUG, startupMembers, settings);
clients.add(gossipService);
gossipService.start();
Thread.sleep(1000);