Merge pull request #2 from edwardcapriolo/dos2unix

Dos2unix
This commit is contained in:
edwardcapriolo
2015-01-17 18:24:42 -05:00
17 changed files with 1401 additions and 1307 deletions

View File

@ -40,7 +40,8 @@ public class GossipRunner {
System.err.println("Error while starting the gossip service: " + e.getMessage()); System.err.println("Error while starting the gossip service: " + e.getMessage());
} }
} else { } else {
System.out.println("The gossip.conf file is not found.\n\nEither specify the path to the startup settings file or place the gossip.json file in the same folder as the JAR file."); System.out
.println("The gossip.conf file is not found.\n\nEither specify the path to the startup settings file or place the gossip.json file in the same folder as the JAR file.");
} }
} }
} }

View File

@ -17,24 +17,32 @@ import com.google.code.gossip.manager.random.RandomGossipManager;
public class GossipService { public class GossipService {
public static final Logger LOGGER = Logger.getLogger(GossipService.class); public static final Logger LOGGER = Logger.getLogger(GossipService.class);
private GossipManager _gossipManager; private GossipManager _gossipManager;
/** /**
* Constructor with the default settings. * Constructor with the default settings.
*
* @throws InterruptedException * @throws InterruptedException
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public GossipService(StartupSettings startupSettings) throws InterruptedException, UnknownHostException { public GossipService(StartupSettings startupSettings) throws InterruptedException,
this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "", startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings.getGossipSettings()); UnknownHostException {
this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "",
startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings
.getGossipSettings());
} }
/** /**
* Setup the client's lists, gossiping parameters, and parse the startup config file. * Setup the client's lists, gossiping parameters, and parse the startup config file.
*
* @throws SocketException * @throws SocketException
* @throws InterruptedException * @throws InterruptedException
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public GossipService(String ipAddress, int port, String id, int logLevel, ArrayList<GossipMember> gossipMembers, GossipSettings settings) throws InterruptedException, UnknownHostException { public GossipService(String ipAddress, int port, String id, int logLevel,
ArrayList<GossipMember> gossipMembers, GossipSettings settings)
throws InterruptedException, UnknownHostException {
_gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers); _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers);
} }
@ -54,5 +62,4 @@ public class GossipService {
this._gossipManager = _gossipManager; this._gossipManager = _gossipManager;
} }
} }

View File

@ -16,12 +16,16 @@ public class GossipSettings {
/** /**
* Construct GossipSettings with default settings. * Construct GossipSettings with default settings.
*/ */
public GossipSettings() {} public GossipSettings() {
}
/** /**
* Construct GossipSettings with given settings. * Construct GossipSettings with given settings.
* @param gossipInterval The gossip interval in ms. *
* @param cleanupInterval The cleanup interval in ms. * @param gossipInterval
* The gossip interval in ms.
* @param cleanupInterval
* The cleanup interval in ms.
*/ */
public GossipSettings(int gossipInterval, int cleanupInterval) { public GossipSettings(int gossipInterval, int cleanupInterval) {
_gossipInterval = gossipInterval; _gossipInterval = gossipInterval;
@ -29,18 +33,21 @@ public class GossipSettings {
} }
/** /**
* Set the gossip interval. * Set the gossip interval. This is the time between a gossip message is send.
* This is the time between a gossip message is send. *
* @param gossipInterval The gossip interval in ms. * @param gossipInterval
* The gossip interval in ms.
*/ */
public void setGossipTimeout(int gossipInterval) { public void setGossipTimeout(int gossipInterval) {
_gossipInterval = gossipInterval; _gossipInterval = gossipInterval;
} }
/** /**
* Set the cleanup interval. * Set the cleanup interval. This is the time between the last heartbeat received from a member
* This is the time between the last heartbeat received from a member and when it will be marked as dead. * and when it will be marked as dead.
* @param cleanupInterval The cleanup interval in ms. *
* @param cleanupInterval
* The cleanup interval in ms.
*/ */
public void setCleanupInterval(int cleanupInterval) { public void setCleanupInterval(int cleanupInterval) {
_cleanupInterval = cleanupInterval; _cleanupInterval = cleanupInterval;
@ -48,6 +55,7 @@ public class GossipSettings {
/** /**
* Get the gossip interval. * Get the gossip interval.
*
* @return The gossip interval in ms. * @return The gossip interval in ms.
*/ */
public int getGossipInterval() { public int getGossipInterval() {
@ -56,6 +64,7 @@ public class GossipSettings {
/** /**
* Get the clean interval. * Get the clean interval.
*
* @return The cleanup interval. * @return The cleanup interval.
*/ */
public int getCleanupInterval() { public int getCleanupInterval() {

View File

@ -6,9 +6,9 @@ import javax.management.NotificationListener;
import javax.management.timer.Timer; import javax.management.timer.Timer;
/** /**
* This object represents a timer for a gossip member. * This object represents a timer for a gossip member. When the timer has elapsed without being
* When the timer has elapsed without being reset in the meantime, it will inform the GossipService about this * reset in the meantime, it will inform the GossipService about this who in turn will put the
* who in turn will put the gossip member on the dead list, because it is apparantly not alive anymore. * gossip member on the dead list, because it is apparantly not alive anymore.
* *
* @author joshclemm, harmenw * @author joshclemm, harmenw
*/ */
@ -21,13 +21,15 @@ public class GossipTimeoutTimer extends Timer {
private LocalGossipMember _source; private LocalGossipMember _source;
/** /**
* Constructor. * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime.
* Creates a reset-able timer that wakes up after millisecondsSleepTime. *
* @param millisecondsSleepTime The time for this timer to wait before an event. * @param millisecondsSleepTime
* The time for this timer to wait before an event.
* @param service * @param service
* @param member * @param member
*/ */
public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener, LocalGossipMember member) { public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener,
LocalGossipMember member) {
super(); super();
_sleepTime = millisecondsSleepTime; _sleepTime = millisecondsSleepTime;
_source = member; _source = member;
@ -52,10 +54,10 @@ public class GossipTimeoutTimer extends Timer {
/** /**
* Adds a new wake-up time for this timer. * Adds a new wake-up time for this timer.
*
* @param milliseconds * @param milliseconds
*/ */
private void setWakeupTime(long milliseconds) { private void setWakeupTime(long milliseconds) {
addNotification("type", "message", _source, new Date(System.currentTimeMillis() + milliseconds)); addNotification("type", "message", _source, new Date(System.currentTimeMillis() + milliseconds));
} }
} }

View File

@ -3,8 +3,8 @@ package com.google.code.gossip;
import javax.management.NotificationListener; import javax.management.NotificationListener;
/** /**
* This object represent a gossip member with the properties known locally. * This object represent a gossip member with the properties known locally. These objects are stored
* These objects are stored in the local list of gossip member.s * in the local list of gossip member.s
* *
* @author harmenw * @author harmenw
*/ */
@ -14,13 +14,20 @@ public class LocalGossipMember extends GossipMember {
/** /**
* Constructor. * Constructor.
* @param host The hostname or IP address. *
* @param port The port number. * @param host
* @param heartbeat The current heartbeat. * The hostname or IP address.
* @param gossipService The GossipService object. * @param port
* @param cleanupTimeout The cleanup timeout for this gossip member. * The port number.
* @param heartbeat
* The current heartbeat.
* @param gossipService
* The GossipService object.
* @param cleanupTimeout
* The cleanup timeout for this gossip member.
*/ */
public LocalGossipMember(String hostname, int port, String id, int heartbeat, NotificationListener notificationListener, int cleanupTimeout) { public LocalGossipMember(String hostname, int port, String id, int heartbeat,
NotificationListener notificationListener, int cleanupTimeout) {
super(hostname, port, id, heartbeat); super(hostname, port, id, heartbeat);
this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this); this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);

View File

@ -9,7 +9,9 @@ public class LogLevel {
public static final String CONFIG_DEBUG = "DEBUG"; public static final String CONFIG_DEBUG = "DEBUG";
public static final int ERROR = 1; public static final int ERROR = 1;
public static final int INFO = 2; public static final int INFO = 2;
public static final int DEBUG = 3; public static final int DEBUG = 3;
public static int fromString(String logLevel) { public static int fromString(String logLevel) {

View File

@ -1,7 +1,8 @@
package com.google.code.gossip; package com.google.code.gossip;
/** /**
* The object represents a gossip member with the properties as received from a remote gossip member. * The object represents a gossip member with the properties as received from a remote gossip
* member.
* *
* @author harmenw * @author harmenw
*/ */
@ -9,9 +10,13 @@ public class RemoteGossipMember extends GossipMember {
/** /**
* Constructor. * Constructor.
* @param host The hostname or IP address. *
* @param port The port number. * @param host
* @param heartbeat The current heartbeat. * The hostname or IP address.
* @param port
* The port number.
* @param heartbeat
* The current heartbeat.
*/ */
public RemoteGossipMember(String hostname, int port, String id, int heartbeat) { public RemoteGossipMember(String hostname, int port, String id, int heartbeat) {
super(hostname, port, id, heartbeat); super(hostname, port, id, heartbeat);
@ -19,8 +24,11 @@ public class RemoteGossipMember extends GossipMember {
/** /**
* Construct a RemoteGossipMember with a heartbeat of 0. * Construct a RemoteGossipMember with a heartbeat of 0.
* @param host The hostname or IP address. *
* @param port The port number. * @param host
* The hostname or IP address.
* @param port
* The port number.
*/ */
public RemoteGossipMember(String hostname, int port, String id) { public RemoteGossipMember(String hostname, int port, String id) {
super(hostname, port, id, 0); super(hostname, port, id, 0);

View File

@ -32,7 +32,9 @@ public class StartupSettings {
/** /**
* Constructor. * Constructor.
* @param port The port to start the service on. *
* @param port
* The port to start the service on.
*/ */
public StartupSettings(int port, int logLevel) { public StartupSettings(int port, int logLevel) {
this(port, logLevel, new GossipSettings()); this(port, logLevel, new GossipSettings());
@ -40,7 +42,9 @@ public class StartupSettings {
/** /**
* Constructor. * Constructor.
* @param port The port to start the service on. *
* @param port
* The port to start the service on.
*/ */
public StartupSettings(int port, int logLevel, GossipSettings gossipSettings) { public StartupSettings(int port, int logLevel, GossipSettings gossipSettings) {
_port = port; _port = port;
@ -51,7 +55,9 @@ public class StartupSettings {
/** /**
* Set the port of the gossip service. * Set the port of the gossip service.
* @param port The port for the gossip service. *
* @param port
* The port for the gossip service.
*/ */
public void setPort(int port) { public void setPort(int port) {
_port = port; _port = port;
@ -59,6 +65,7 @@ public class StartupSettings {
/** /**
* Get the port for the gossip service. * Get the port for the gossip service.
*
* @return The port of the gossip service. * @return The port of the gossip service.
*/ */
public int getPort() { public int getPort() {
@ -67,7 +74,9 @@ public class StartupSettings {
/** /**
* Set the log level of the gossip service. * Set the log level of the gossip service.
* @param logLevel The log level({LogLevel}). *
* @param logLevel
* The log level({LogLevel}).
*/ */
public void setLogLevel(int logLevel) { public void setLogLevel(int logLevel) {
_logLevel = logLevel; _logLevel = logLevel;
@ -75,6 +84,7 @@ public class StartupSettings {
/** /**
* Get the log level of the gossip service. * Get the log level of the gossip service.
*
* @return The log level. * @return The log level.
*/ */
public int getLogLevel() { public int getLogLevel() {
@ -83,6 +93,7 @@ public class StartupSettings {
/** /**
* Get the GossipSettings. * Get the GossipSettings.
*
* @return The GossipSettings object. * @return The GossipSettings object.
*/ */
public GossipSettings getGossipSettings() { public GossipSettings getGossipSettings() {
@ -91,7 +102,9 @@ public class StartupSettings {
/** /**
* Add a gossip member to the list of members to start with. * Add a gossip member to the list of members to start with.
* @param member The member to add. *
* @param member
* The member to add.
*/ */
public void addGossipMember(GossipMember member) { public void addGossipMember(GossipMember member) {
_gossipMembers.add(member); _gossipMembers.add(member);
@ -99,6 +112,7 @@ public class StartupSettings {
/** /**
* Get the list with gossip members. * Get the list with gossip members.
*
* @return The gossip members. * @return The gossip members.
*/ */
public ArrayList<GossipMember> getGossipMembers() { public ArrayList<GossipMember> getGossipMembers() {
@ -107,13 +121,19 @@ public class StartupSettings {
/** /**
* Parse the settings for the gossip service from a JSON file. * Parse the settings for the gossip service from a JSON file.
* @param jsonFile The file object which refers to the JSON config file. *
* @param jsonFile
* The file object which refers to the JSON config file.
* @return The StartupSettings object with the settings from the config file. * @return The StartupSettings object with the settings from the config file.
* @throws JSONException Thrown when the file is not well-formed JSON. * @throws JSONException
* @throws FileNotFoundException Thrown when the file cannot be found. * Thrown when the file is not well-formed JSON.
* @throws IOException Thrown when reading the file gives problems. * @throws FileNotFoundException
* Thrown when the file cannot be found.
* @throws IOException
* Thrown when reading the file gives problems.
*/ */
public static StartupSettings fromJSONFile(File jsonFile) throws JSONException, FileNotFoundException, IOException { public static StartupSettings fromJSONFile(File jsonFile) throws JSONException,
FileNotFoundException, IOException {
// Read the file to a String. // Read the file to a String.
BufferedReader br = new BufferedReader(new FileReader(jsonFile)); BufferedReader br = new BufferedReader(new FileReader(jsonFile));
StringBuffer buffer = new StringBuffer(); StringBuffer buffer = new StringBuffer();
@ -137,17 +157,20 @@ public class StartupSettings {
// Get the cleanup_interval from the config file. // Get the cleanup_interval from the config file.
int cleanupInterval = jsonObject.getInt("cleanup_interval"); int cleanupInterval = jsonObject.getInt("cleanup_interval");
System.out.println("Config [port: " + port + ", log_level: " + logLevel + ", gossip_interval: " + gossipInterval + ", cleanup_interval: " + cleanupInterval + "]"); System.out.println("Config [port: " + port + ", log_level: " + logLevel + ", gossip_interval: "
+ gossipInterval + ", cleanup_interval: " + cleanupInterval + "]");
// Initiate the settings with the port number. // Initiate the settings with the port number.
StartupSettings settings = new StartupSettings(port, logLevel, new GossipSettings(gossipInterval, cleanupInterval)); StartupSettings settings = new StartupSettings(port, logLevel, new GossipSettings(
gossipInterval, cleanupInterval));
// Now iterate over the members from the config file and add them to the settings. // Now iterate over the members from the config file and add them to the settings.
System.out.print("Config-members ["); System.out.print("Config-members [");
JSONArray membersJSON = jsonObject.getJSONArray("members"); JSONArray membersJSON = jsonObject.getJSONArray("members");
for (int i = 0; i < membersJSON.length(); i++) { for (int i = 0; i < membersJSON.length(); i++) {
JSONObject memberJSON = membersJSON.getJSONObject(i); JSONObject memberJSON = membersJSON.getJSONObject(i);
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"), memberJSON.getInt("port"), ""); RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"),
memberJSON.getInt("port"), "");
settings.addGossipMember(member); settings.addGossipMember(member);
System.out.print(member.getAddress()); System.out.print(member.getAddress());
if (i < (membersJSON.length() - 1)) if (i < (membersJSON.length() - 1))

View File

@ -1,6 +1,5 @@
package com.google.code.gossip.examples; package com.google.code.gossip.examples;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
@ -12,8 +11,8 @@ import com.google.code.gossip.LogLevel;
import com.google.code.gossip.RemoteGossipMember; import com.google.code.gossip.RemoteGossipMember;
/** /**
* This class is an example of how one could use the gossip service. * This class is an example of how one could use the gossip service. Here we start multiple gossip
* Here we start multiple gossip clients on this host as specified in the config file. * clients on this host as specified in the config file.
* *
* @author harmenw * @author harmenw
*/ */
@ -29,8 +28,7 @@ public class GossipExample extends Thread {
} }
/** /**
* Constructor. * Constructor. This will start the this thread.
* This will start the this thread.
*/ */
public GossipExample() { public GossipExample() {
start(); start();
@ -48,16 +46,19 @@ public class GossipExample extends Thread {
// Get my ip address. // Get my ip address.
String myIpAddress = InetAddress.getLocalHost().getHostAddress(); String myIpAddress = InetAddress.getLocalHost().getHostAddress();
// Create the gossip members and put them in a list and give them a port number starting with 2000. // Create the gossip members and put them in a list and give them a port number starting with
// 2000.
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>(); ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) { for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, "")); startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, ""));
} }
// Lets start the gossip clients. // Lets start the gossip clients.
// Start the clients, waiting cleaning-interval + 1 second between them which will show the dead list handling. // Start the clients, waiting cleaning-interval + 1 second between them which will show the
// dead list handling.
for (GossipMember member : startupMembers) { for (GossipMember member : startupMembers) {
GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", LogLevel.DEBUG, startupMembers, settings); GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "",
LogLevel.DEBUG, startupMembers, settings);
clients.add(gossipService); clients.add(gossipService);
gossipService.start(); gossipService.start();
sleep(settings.getCleanupInterval() + 1000); sleep(settings.getCleanupInterval() + 1000);
@ -66,7 +67,8 @@ public class GossipExample extends Thread {
// After starting all gossip clients, first wait 10 seconds and then shut them down. // After starting all gossip clients, first wait 10 seconds and then shut them down.
sleep(10000); sleep(10000);
System.err.println("Going to shutdown all services..."); System.err.println("Going to shutdown all services...");
// Since they all run in the same virtual machine and share the same executor, if one is shutdown they will all stop. // Since they all run in the same virtual machine and share the same executor, if one is
// shutdown they will all stop.
clients.get(0).shutdown(); clients.get(0).shutdown();
} catch (UnknownHostException e) { } catch (UnknownHostException e) {

View File

@ -8,11 +8,9 @@ import com.google.code.gossip.GossipService;
import com.google.code.gossip.LocalGossipMember; import com.google.code.gossip.LocalGossipMember;
/** /**
* [The active thread: periodically send gossip request.] * [The active thread: periodically send gossip request.] The class handles gossiping the membership
* The class handles gossiping the membership list. * list. This information is important to maintaining a common state among all the nodes, and is
* This information is important to maintaining a common * important for detecting failures.
* state among all the nodes, and is important for detecting
* failures.
*/ */
abstract public class ActiveGossipThread implements Runnable { abstract public class ActiveGossipThread implements Runnable {
@ -42,16 +40,19 @@ abstract public class ActiveGossipThread implements Runnable {
public void shutdown() { public void shutdown() {
_keepRunning.set(false); _keepRunning.set(false);
} }
/**
* Performs the sending of the membership list, after we have
* incremented our own heartbeat.
*/
abstract protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList);
/** /**
* Abstract method which should be implemented by a subclass. * Performs the sending of the membership list, after we have incremented our own heartbeat.
* This method should return a member of the list to gossip with. */
* @param memberList The list of members which are stored in the local list of members. abstract protected void sendMembershipList(LocalGossipMember me,
ArrayList<LocalGossipMember> memberList);
/**
* Abstract method which should be implemented by a subclass. This method should return a member
* of the list to gossip with.
*
* @param memberList
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with. * @return The chosen LocalGossipMember to gossip with.
*/ */
abstract protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList); abstract protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList);

View File

@ -38,9 +38,11 @@ public abstract class GossipManager extends Thread implements NotificationListen
private ExecutorService _gossipThreadExecutor; private ExecutorService _gossipThreadExecutor;
private Class<? extends PassiveGossipThread> _passiveGossipThreadClass; private Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
private PassiveGossipThread passiveGossipThread; private PassiveGossipThread passiveGossipThread;
private Class<? extends ActiveGossipThread> _activeGossipThreadClass; private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
private ActiveGossipThread activeGossipThread; private ActiveGossipThread activeGossipThread;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass, public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
@ -71,9 +73,8 @@ public abstract class GossipManager extends Thread implements NotificationListen
} }
/** /**
* All timers associated with a member will trigger this method when it goes * All timers associated with a member will trigger this method when it goes off. The timer will
* off. The timer will go off if we have not heard from this member in * go off if we have not heard from this member in <code> _settings.T_CLEANUP </code> time.
* <code> _settings.T_CLEANUP </code> time.
*/ */
@Override @Override
public void handleNotification(Notification notification, Object handback) { public void handleNotification(Notification notification, Object handback) {
@ -93,6 +94,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
/** /**
* Get a clone of the memberlist. * Get a clone of the memberlist.
*
* @return * @return
*/ */
public ArrayList<LocalGossipMember> getMemberList() { public ArrayList<LocalGossipMember> getMemberList() {
@ -108,8 +110,9 @@ public abstract class GossipManager extends Thread implements NotificationListen
} }
/** /**
* Starts the client. Specifically, start the various cycles for this protocol. * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* Start the gossip thread and start the receiver thread. * thread and start the receiver thread.
*
* @throws InterruptedException * @throws InterruptedException
*/ */
public void run() { public void run() {
@ -120,9 +123,11 @@ public abstract class GossipManager extends Thread implements NotificationListen
} }
_gossipThreadExecutor = Executors.newCachedThreadPool(); _gossipThreadExecutor = Executors.newCachedThreadPool();
try { try {
passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class).newInstance(this); passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class)
.newInstance(this);
_gossipThreadExecutor.execute(passiveGossipThread); _gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class).newInstance(this); activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class)
.newInstance(this);
_gossipThreadExecutor.execute(activeGossipThread); _gossipThreadExecutor.execute(activeGossipThread);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException } catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e1) { | InvocationTargetException | NoSuchMethodException | SecurityException e1) {

View File

@ -18,12 +18,10 @@ import com.google.code.gossip.GossipService;
import com.google.code.gossip.RemoteGossipMember; import com.google.code.gossip.RemoteGossipMember;
/** /**
* [The passive thread: reply to incoming gossip request.] * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
* This class handles the passive cycle, where this client * where this client has received an incoming message. For now, this message is always the
* has received an incoming message. For now, this message * membership list, but if you choose to gossip additional information, you will need some logic to
* is always the membership list, but if you choose to gossip * determine the incoming message.
* additional information, you will need some logic to determine
* the incoming message.
*/ */
abstract public class PassiveGossipThread implements Runnable { abstract public class PassiveGossipThread implements Runnable {
@ -37,9 +35,11 @@ abstract public class PassiveGossipThread implements Runnable {
public PassiveGossipThread(GossipManager gossipManager) { public PassiveGossipThread(GossipManager gossipManager) {
_gossipManager = gossipManager; _gossipManager = gossipManager;
try { try {
SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), _gossipManager.getMyself().getPort()); SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(),
_gossipManager.getMyself().getPort());
_server = new DatagramSocket(socketAddress); _server = new DatagramSocket(socketAddress);
GossipService.LOGGER.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort()); GossipService.LOGGER.info("Gossip service successfully initialized on port "
+ _gossipManager.getMyself().getPort());
GossipService.LOGGER.debug("I am " + _gossipManager.getMyself()); GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
} catch (SocketException ex) { } catch (SocketException ex) {
GossipService.LOGGER.error(ex); GossipService.LOGGER.error(ex);
@ -134,9 +134,15 @@ abstract public class PassiveGossipThread implements Runnable {
/** /**
* Abstract method for merging the local and remote list. * Abstract method for merging the local and remote list.
* @param gossipManager The GossipManager for retrieving the local members and dead members list. *
* @param senderMember The member who is sending this list, this could be used to send a response if the remote list contains out-dated information. * @param gossipManager
* @param remoteList The list of members known at the remote side. * The GossipManager for retrieving the local members and dead members list.
* @param senderMember
* The member who is sending this list, this could be used to send a response if the
* remote list contains out-dated information.
* @param remoteList
* The list of members known at the remote side.
*/ */
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList<GossipMember> remoteList); abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
ArrayList<GossipMember> remoteList);
} }

View File

@ -16,14 +16,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
} }
/** /**
* Merge remote list (received from peer), and our local member list. * Merge remote list (received from peer), and our local member list. Simply, we must update the
* Simply, we must update the heartbeats that the remote list has with * heartbeats that the remote list has with our list. Also, some additional logic is needed to
* our list. Also, some additional logic is needed to make sure we have * make sure we have not timed out a member and then immediately received a list with that member.
* not timed out a member and then immediately received a list with that *
* member.
* @param remoteList * @param remoteList
*/ */
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList<GossipMember> remoteList) { protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
ArrayList<GossipMember> remoteList) {
synchronized (gossipManager.getDeadList()) { synchronized (gossipManager.getDeadList()) {
@ -33,9 +33,11 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
// Skip myself. We don't want ourselves in the local member list. // Skip myself. We don't want ourselves in the local member list.
if (!remoteMember.equals(gossipManager.getMyself())) { if (!remoteMember.equals(gossipManager.getMyself())) {
if (gossipManager.getMemberList().contains(remoteMember)) { if (gossipManager.getMemberList().contains(remoteMember)) {
GossipService.LOGGER.debug("The local list already contains the remote member (" + remoteMember + ")."); GossipService.LOGGER.debug("The local list already contains the remote member ("
+ remoteMember + ").");
// The local memberlist contains the remote member. // The local memberlist contains the remote member.
LocalGossipMember localMember = gossipManager.getMemberList().get(gossipManager.getMemberList().indexOf(remoteMember)); LocalGossipMember localMember = gossipManager.getMemberList().get(
gossipManager.getMemberList().indexOf(remoteMember));
// Let's synchronize it's heartbeat. // Let's synchronize it's heartbeat.
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
@ -47,41 +49,57 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
// TODO: Otherwise, should we inform the other when the heartbeat is already higher? // TODO: Otherwise, should we inform the other when the heartbeat is already higher?
} else { } else {
// The local list does not contain the remote member. // The local list does not contain the remote member.
GossipService.LOGGER.debug("The local list does not contain the remote member (" + remoteMember + ")."); GossipService.LOGGER.debug("The local list does not contain the remote member ("
+ remoteMember + ").");
// The remote member is either brand new, or a previously declared dead member. // The remote member is either brand new, or a previously declared dead member.
// If its dead, check the heartbeat because it may have come back from the dead. // If its dead, check the heartbeat because it may have come back from the dead.
if (gossipManager.getDeadList().contains(remoteMember)) { if (gossipManager.getDeadList().contains(remoteMember)) {
// The remote member is known here as a dead member. // The remote member is known here as a dead member.
GossipService.LOGGER.debug("The remote member is known here as a dead member."); GossipService.LOGGER.debug("The remote member is known here as a dead member.");
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(gossipManager.getDeadList().indexOf(remoteMember)); LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
// If a member is restarted the heartbeat will restart from 1, so we should check that here. gossipManager.getDeadList().indexOf(remoteMember));
// So a member can become from the dead when it is either larger than a previous heartbeat (due to network failure) // If a member is restarted the heartbeat will restart from 1, so we should check
// that here.
// So a member can become from the dead when it is either larger than a previous
// heartbeat (due to network failure)
// or when the heartbeat is 1 (after a restart of the service). // or when the heartbeat is 1 (after a restart of the service).
// TODO: What if the first message of a gossip service is sent to a dead node? The second member will receive a heartbeat of two. // TODO: What if the first message of a gossip service is sent to a dead node? The
// second member will receive a heartbeat of two.
// TODO: The above does happen. Maybe a special message for a revived member? // TODO: The above does happen. Maybe a special message for a revived member?
// TODO: Or maybe when a member is declared dead for more than _settings.getCleanupInterval() ms, reset the heartbeat to 0. // TODO: Or maybe when a member is declared dead for more than
// _settings.getCleanupInterval() ms, reset the heartbeat to 0.
// It will then accept a revived member. // It will then accept a revived member.
// The above is now handle by checking whether the heartbeat differs _settings.getCleanupInterval(), it must be restarted. // The above is now handle by checking whether the heartbeat differs
// _settings.getCleanupInterval(), it must be restarted.
if (remoteMember.getHeartbeat() == 1 if (remoteMember.getHeartbeat() == 1
|| ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager.getSettings().getCleanupInterval() / 1000) || ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager
.getSettings().getCleanupInterval() / 1000)
|| remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { || remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
GossipService.LOGGER.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); GossipService.LOGGER
.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
// The remote member is back from the dead. // The remote member is back from the dead.
// Remove it from the dead list. // Remove it from the dead list.
gossipManager.getDeadList().remove(localDeadMember); gossipManager.getDeadList().remove(localDeadMember);
// Add it as a new member and add it to the member list. // Add it as a new member and add it to the member list.
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.getMemberList().add(newLocalMember); gossipManager.getMemberList().add(newLocalMember);
newLocalMember.startTimeoutTimer(); newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list.");
} }
} else { } else {
// Brand spanking new member - welcome. // Brand spanking new member - welcome.
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.getMemberList().add(newLocalMember); gossipManager.getMemberList().add(newLocalMember);
newLocalMember.startTimeoutTimer(); newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list."); GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress()
+ " to local member list.");
} }
} }
} }

View File

@ -21,8 +21,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
} }
/** /**
* Performs the sending of the membership list, after we have * Performs the sending of the membership list, after we have incremented our own heartbeat.
* incremented our own heartbeat.
*/ */
protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) { protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) {
GossipService.LOGGER.debug("Send sendMembershipList() is called."); GossipService.LOGGER.debug("Send sendMembershipList() is called.");
@ -69,8 +68,8 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
length_bytes[2] = (byte) ((packet_length << 16) >> 24); length_bytes[2] = (byte) ((packet_length << 16) >> 24);
length_bytes[3] = (byte) ((packet_length << 24) >> 24); length_bytes[3] = (byte) ((packet_length << 24) >> 24);
GossipService.LOGGER.debug("Sending message (" + packet_length + " bytes): "
GossipService.LOGGER.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString()); + jsonArray.toString());
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length); ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length);
byteBuffer.put(length_bytes); byteBuffer.put(length_bytes);
@ -78,11 +77,13 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
byte[] buf = byteBuffer.array(); byte[] buf = byteBuffer.array();
DatagramSocket socket = new DatagramSocket(); DatagramSocket socket = new DatagramSocket();
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort()); DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest,
member.getPort());
socket.send(datagramPacket); socket.send(datagramPacket);
socket.close(); socket.close();
} else { } else {
GossipService.LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); GossipService.LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
} }
} }

View File

@ -19,9 +19,9 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
} }
/** /**
* [The selectToSend() function.] * [The selectToSend() function.] Find a random peer from the local membership list. In the case
* Find a random peer from the local membership list. * where this client is the only member in the list, this method will return null.
* In the case where this client is the only member in the list, this method will return null. *
* @return Member random member if list is greater than 1, null otherwise * @return Member random member if list is greater than 1, null otherwise
*/ */
protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) { protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) {

View File

@ -8,7 +8,9 @@ import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
public class RandomGossipManager extends GossipManager { public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String address, int port, String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) { public RandomGossipManager(String address, int port, String id, GossipSettings settings,
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, port, id, settings, gossipMembers); ArrayList<GossipMember> gossipMembers) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
port, id, settings, gossipMembers);
} }
} }