diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java index 0a859bc..e24964e 100644 --- a/src/main/java/com/google/code/gossip/GossipMember.java +++ b/src/main/java/com/google/code/gossip/GossipMember.java @@ -1,149 +1,149 @@ -package com.google.code.gossip; - -import java.net.InetSocketAddress; - -import org.json.JSONException; -import org.json.JSONObject; - -/** - * A abstract class representing a gossip member. - * - * @author joshclemm, harmenw - */ -public abstract class GossipMember { - /** The JSON key for the host property. */ - public static final String JSON_HOST = "host"; - /** The JSON key for the port property. */ - public static final String JSON_PORT = "port"; - /** The JSON key for the heartbeat property. */ - public static final String JSON_HEARTBEAT = "heartbeat"; - - public static final String JSON_ID = "id"; - - /** The hostname or IP address of this gossip member. */ - protected String _host; - - /** The port number of this gossip member. */ - protected int _port; - - /** The current heartbeat of this gossip member. */ - protected int _heartbeat; - - protected String _id; - - /** - * Constructor. - * @param host The hostname or IP address. - * @param port The port number. - * @param heartbeat The current heartbeat. - */ - public GossipMember(String host, int port, String id, int heartbeat) { - _host = host; - _port = port; - _id = id; - _heartbeat = heartbeat; - } - - /** - * Get the hostname or IP address of the remote gossip member. - * @return The hostname or IP address. - */ - public String getHost() { - return _host; - } - - /** - * Get the port number of the remote gossip member. - * @return The port number. - */ - public int getPort() { - return _port; - } - - /** - * The member address in the form IP/host:port - * Similar to the toString in {@link InetSocketAddress} - */ - public String getAddress() { - return _host+":"+_port; - } - - /** - * Get the heartbeat of this gossip member. - * @return The current heartbeat. - */ - public int getHeartbeat() { - return _heartbeat; - } - - /** - * Set the heartbeat of this gossip member. - * @param heartbeat The new heartbeat. - */ - public void setHeartbeat(int heartbeat) { - this._heartbeat = heartbeat; - } - - - public String getId() { - return _id; - } - - public void setId(String _id) { - this._id = _id; - } - - public String toString() { - return "Member [address=" + getAddress() + ", id=" + _id + ", heartbeat=" + _heartbeat + "]"; - } - - /** - * @see java.lang.Object#hashCode() - */ - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - String address = getAddress(); - result = prime * result - + ((address == null) ? 0 : address.hashCode()); - return result; - } - - /** - * @see java.lang.Object#equals(java.lang.Object) - */ - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - System.err.println("equals(): obj is null."); - return false; - } - if (! (obj instanceof GossipMember) ) { - System.err.println("equals(): obj is not of type GossipMember."); - return false; - } - // The object is the same of they both have the same address (hostname and port). - return getAddress().equals(((LocalGossipMember) obj).getAddress()); - } - - /** - * Get the JSONObject which is the JSON representation of this GossipMember. - * @return The JSONObject of this GossipMember. - */ - public JSONObject toJSONObject() { - try { - JSONObject jsonObject = new JSONObject(); - jsonObject.put(JSON_HOST, _host); - jsonObject.put(JSON_PORT, _port); - jsonObject.put(JSON_ID, _id); - jsonObject.put(JSON_HEARTBEAT, _heartbeat); - return jsonObject; - } catch (JSONException e) { - throw new RuntimeException(e); - } - } -} +package com.google.code.gossip; + +import java.net.InetSocketAddress; + +import org.json.JSONException; +import org.json.JSONObject; + +/** + * A abstract class representing a gossip member. + * + * @author joshclemm, harmenw + */ +public abstract class GossipMember { + /** The JSON key for the host property. */ + public static final String JSON_HOST = "host"; + /** The JSON key for the port property. */ + public static final String JSON_PORT = "port"; + /** The JSON key for the heartbeat property. */ + public static final String JSON_HEARTBEAT = "heartbeat"; + + public static final String JSON_ID = "id"; + + /** The hostname or IP address of this gossip member. */ + protected String _host; + + /** The port number of this gossip member. */ + protected int _port; + + /** The current heartbeat of this gossip member. */ + protected int _heartbeat; + + protected String _id; + + /** + * Constructor. + * @param host The hostname or IP address. + * @param port The port number. + * @param heartbeat The current heartbeat. + */ + public GossipMember(String host, int port, String id, int heartbeat) { + _host = host; + _port = port; + _id = id; + _heartbeat = heartbeat; + } + + /** + * Get the hostname or IP address of the remote gossip member. + * @return The hostname or IP address. + */ + public String getHost() { + return _host; + } + + /** + * Get the port number of the remote gossip member. + * @return The port number. + */ + public int getPort() { + return _port; + } + + /** + * The member address in the form IP/host:port + * Similar to the toString in {@link InetSocketAddress} + */ + public String getAddress() { + return _host+":"+_port; + } + + /** + * Get the heartbeat of this gossip member. + * @return The current heartbeat. + */ + public int getHeartbeat() { + return _heartbeat; + } + + /** + * Set the heartbeat of this gossip member. + * @param heartbeat The new heartbeat. + */ + public void setHeartbeat(int heartbeat) { + this._heartbeat = heartbeat; + } + + + public String getId() { + return _id; + } + + public void setId(String _id) { + this._id = _id; + } + + public String toString() { + return "Member [address=" + getAddress() + ", id=" + _id + ", heartbeat=" + _heartbeat + "]"; + } + + /** + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + String address = getAddress(); + result = prime * result + + ((address == null) ? 0 : address.hashCode()); + return result; + } + + /** + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + System.err.println("equals(): obj is null."); + return false; + } + if (! (obj instanceof GossipMember) ) { + System.err.println("equals(): obj is not of type GossipMember."); + return false; + } + // The object is the same of they both have the same address (hostname and port). + return getAddress().equals(((LocalGossipMember) obj).getAddress()); + } + + /** + * Get the JSONObject which is the JSON representation of this GossipMember. + * @return The JSONObject of this GossipMember. + */ + public JSONObject toJSONObject() { + try { + JSONObject jsonObject = new JSONObject(); + jsonObject.put(JSON_HOST, _host); + jsonObject.put(JSON_PORT, _port); + jsonObject.put(JSON_ID, _id); + jsonObject.put(JSON_HEARTBEAT, _heartbeat); + return jsonObject; + } catch (JSONException e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/com/google/code/gossip/GossipRunner.java b/src/main/java/com/google/code/gossip/GossipRunner.java index 95d2bb8..e452f93 100644 --- a/src/main/java/com/google/code/gossip/GossipRunner.java +++ b/src/main/java/com/google/code/gossip/GossipRunner.java @@ -1,46 +1,47 @@ -package com.google.code.gossip; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; - -import org.json.JSONException; - -public class GossipRunner { - private StartupSettings _settings; - - public static void main(String[] args) { - File configFile; - - if (args.length == 1) { - configFile = new File("./"+args[0]); - } else { - configFile = new File("gossip.conf"); - } - - new GossipRunner(configFile); - } - - public GossipRunner(File configFile) { - - if (configFile != null && configFile.exists()) { - try { - System.out.println("Parsing the configuration file..."); - _settings = StartupSettings.fromJSONFile(configFile); - GossipService gossipService = new GossipService(_settings); - System.out.println("Gossip service successfully inialized, let's start it..."); - gossipService.start(); - } catch (FileNotFoundException e) { - System.err.println("The given file is not found!"); - } catch (JSONException e) { - System.err.println("The given file is not in the correct JSON format!"); - } catch (IOException e) { - System.err.println("Could not read the configuration file: " + e.getMessage()); - } catch (InterruptedException e) { - System.err.println("Error while starting the gossip service: " + e.getMessage()); - } - } else { - System.out.println("The gossip.conf file is not found.\n\nEither specify the path to the startup settings file or place the gossip.json file in the same folder as the JAR file."); - } - } -} +package com.google.code.gossip; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.json.JSONException; + +public class GossipRunner { + private StartupSettings _settings; + + public static void main(String[] args) { + File configFile; + + if (args.length == 1) { + configFile = new File("./" + args[0]); + } else { + configFile = new File("gossip.conf"); + } + + new GossipRunner(configFile); + } + + public GossipRunner(File configFile) { + + if (configFile != null && configFile.exists()) { + try { + System.out.println("Parsing the configuration file..."); + _settings = StartupSettings.fromJSONFile(configFile); + GossipService gossipService = new GossipService(_settings); + System.out.println("Gossip service successfully inialized, let's start it..."); + gossipService.start(); + } catch (FileNotFoundException e) { + System.err.println("The given file is not found!"); + } catch (JSONException e) { + System.err.println("The given file is not in the correct JSON format!"); + } catch (IOException e) { + System.err.println("Could not read the configuration file: " + e.getMessage()); + } catch (InterruptedException e) { + System.err.println("Error while starting the gossip service: " + e.getMessage()); + } + } else { + System.out + .println("The gossip.conf file is not found.\n\nEither specify the path to the startup settings file or place the gossip.json file in the same folder as the JAR file."); + } + } +} diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java index cd71731..00027bc 100644 --- a/src/main/java/com/google/code/gossip/GossipService.java +++ b/src/main/java/com/google/code/gossip/GossipService.java @@ -1,58 +1,65 @@ -package com.google.code.gossip; - -import java.net.InetAddress; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import org.apache.log4j.Logger; - -import com.google.code.gossip.manager.GossipManager; -import com.google.code.gossip.manager.random.RandomGossipManager; - -/** - * This object represents the service which is responsible for gossiping with other gossip members. - * - * @author joshclemm, harmenw - */ -public class GossipService { - - public static final Logger LOGGER = Logger.getLogger(GossipService.class); - private GossipManager _gossipManager; - - /** - * Constructor with the default settings. - * @throws InterruptedException - * @throws UnknownHostException - */ - public GossipService(StartupSettings startupSettings) throws InterruptedException, UnknownHostException { - this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "", startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings.getGossipSettings()); - } - - /** - * Setup the client's lists, gossiping parameters, and parse the startup config file. - * @throws SocketException - * @throws InterruptedException - * @throws UnknownHostException - */ - public GossipService(String ipAddress, int port, String id, int logLevel, ArrayList gossipMembers, GossipSettings settings) throws InterruptedException, UnknownHostException { - _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers); - } - - public void start() { - _gossipManager.start(); - } - - public void shutdown() { - _gossipManager.shutdown(); - } - - public GossipManager get_gossipManager() { - return _gossipManager; - } - - public void set_gossipManager(GossipManager _gossipManager) { - this._gossipManager = _gossipManager; - } - - -} +package com.google.code.gossip; + +import java.net.InetAddress; +import java.net.SocketException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import org.apache.log4j.Logger; + +import com.google.code.gossip.manager.GossipManager; +import com.google.code.gossip.manager.random.RandomGossipManager; + +/** + * This object represents the service which is responsible for gossiping with other gossip members. + * + * @author joshclemm, harmenw + */ +public class GossipService { + + public static final Logger LOGGER = Logger.getLogger(GossipService.class); + + private GossipManager _gossipManager; + + /** + * Constructor with the default settings. + * + * @throws InterruptedException + * @throws UnknownHostException + */ + public GossipService(StartupSettings startupSettings) throws InterruptedException, + UnknownHostException { + this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "", + startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings + .getGossipSettings()); + } + + /** + * Setup the client's lists, gossiping parameters, and parse the startup config file. + * + * @throws SocketException + * @throws InterruptedException + * @throws UnknownHostException + */ + public GossipService(String ipAddress, int port, String id, int logLevel, + ArrayList gossipMembers, GossipSettings settings) + throws InterruptedException, UnknownHostException { + _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers); + } + + public void start() { + _gossipManager.start(); + } + + public void shutdown() { + _gossipManager.shutdown(); + } + + public GossipManager get_gossipManager() { + return _gossipManager; + } + + public void set_gossipManager(GossipManager _gossipManager) { + this._gossipManager = _gossipManager; + } + +} diff --git a/src/main/java/com/google/code/gossip/GossipSettings.java b/src/main/java/com/google/code/gossip/GossipSettings.java index f81f458..10c20ff 100644 --- a/src/main/java/com/google/code/gossip/GossipSettings.java +++ b/src/main/java/com/google/code/gossip/GossipSettings.java @@ -1,64 +1,73 @@ -package com.google.code.gossip; - -/** - * In this object the settings used by the GossipService are held. - * - * @author harmenw - */ -public class GossipSettings { - - /** Time between gossip'ing in ms. Default is 1 second. */ - private int _gossipInterval = 1000; - - /** Time between cleanups in ms. Default is 10 seconds. */ - private int _cleanupInterval = 10000; - - /** - * Construct GossipSettings with default settings. - */ - public GossipSettings() {} - - /** - * Construct GossipSettings with given settings. - * @param gossipInterval The gossip interval in ms. - * @param cleanupInterval The cleanup interval in ms. - */ - public GossipSettings(int gossipInterval, int cleanupInterval) { - _gossipInterval = gossipInterval; - _cleanupInterval = cleanupInterval; - } - - /** - * Set the gossip interval. - * This is the time between a gossip message is send. - * @param gossipInterval The gossip interval in ms. - */ - public void setGossipTimeout(int gossipInterval) { - _gossipInterval = gossipInterval; - } - - /** - * Set the cleanup interval. - * This is the time between the last heartbeat received from a member and when it will be marked as dead. - * @param cleanupInterval The cleanup interval in ms. - */ - public void setCleanupInterval(int cleanupInterval) { - _cleanupInterval = cleanupInterval; - } - - /** - * Get the gossip interval. - * @return The gossip interval in ms. - */ - public int getGossipInterval() { - return _gossipInterval; - } - - /** - * Get the clean interval. - * @return The cleanup interval. - */ - public int getCleanupInterval() { - return _cleanupInterval; - } -} +package com.google.code.gossip; + +/** + * In this object the settings used by the GossipService are held. + * + * @author harmenw + */ +public class GossipSettings { + + /** Time between gossip'ing in ms. Default is 1 second. */ + private int _gossipInterval = 1000; + + /** Time between cleanups in ms. Default is 10 seconds. */ + private int _cleanupInterval = 10000; + + /** + * Construct GossipSettings with default settings. + */ + public GossipSettings() { + } + + /** + * Construct GossipSettings with given settings. + * + * @param gossipInterval + * The gossip interval in ms. + * @param cleanupInterval + * The cleanup interval in ms. + */ + public GossipSettings(int gossipInterval, int cleanupInterval) { + _gossipInterval = gossipInterval; + _cleanupInterval = cleanupInterval; + } + + /** + * Set the gossip interval. This is the time between a gossip message is send. + * + * @param gossipInterval + * The gossip interval in ms. + */ + public void setGossipTimeout(int gossipInterval) { + _gossipInterval = gossipInterval; + } + + /** + * Set the cleanup interval. This is the time between the last heartbeat received from a member + * and when it will be marked as dead. + * + * @param cleanupInterval + * The cleanup interval in ms. + */ + public void setCleanupInterval(int cleanupInterval) { + _cleanupInterval = cleanupInterval; + } + + /** + * Get the gossip interval. + * + * @return The gossip interval in ms. + */ + public int getGossipInterval() { + return _gossipInterval; + } + + /** + * Get the clean interval. + * + * @return The cleanup interval. + */ + public int getCleanupInterval() { + return _cleanupInterval; + } +} diff --git a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java index 5f4eca0..94cc3a4 100644 --- a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java +++ b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java @@ -1,61 +1,63 @@ -package com.google.code.gossip; - -import java.util.Date; - -import javax.management.NotificationListener; -import javax.management.timer.Timer; - -/** - * This object represents a timer for a gossip member. - * When the timer has elapsed without being reset in the meantime, it will inform the GossipService about this - * who in turn will put the gossip member on the dead list, because it is apparantly not alive anymore. - * - * @author joshclemm, harmenw - */ -public class GossipTimeoutTimer extends Timer { - - /** The amount of time this timer waits before generating a wake-up event. */ - private long _sleepTime; - - /** The gossip member this timer is for. */ - private LocalGossipMember _source; - - /** - * Constructor. - * Creates a reset-able timer that wakes up after millisecondsSleepTime. - * @param millisecondsSleepTime The time for this timer to wait before an event. - * @param service - * @param member - */ - public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener, LocalGossipMember member) { - super(); - _sleepTime = millisecondsSleepTime; - _source = member; - addNotificationListener(notificationListener, null, null); - } - - /** - * @see javax.management.timer.Timer#start() - */ - public void start() { - this.reset(); - super.start(); - } - - /** - * Resets timer to start counting down from original time. - */ - public void reset() { - removeAllNotifications(); - setWakeupTime(_sleepTime); - } - - /** - * Adds a new wake-up time for this timer. - * @param milliseconds - */ - private void setWakeupTime(long milliseconds) { - addNotification("type", "message", _source, new Date(System.currentTimeMillis()+milliseconds)); - } -} - +package com.google.code.gossip; + +import java.util.Date; + +import javax.management.NotificationListener; +import javax.management.timer.Timer; + +/** + * This object represents a timer for a gossip member. When the timer has elapsed without being + * reset in the meantime, it will inform the GossipService about this who in turn will put the + * gossip member on the dead list, because it is apparantly not alive anymore. + * + * @author joshclemm, harmenw + */ +public class GossipTimeoutTimer extends Timer { + + /** The amount of time this timer waits before generating a wake-up event. */ + private long _sleepTime; + + /** The gossip member this timer is for. */ + private LocalGossipMember _source; + + /** + * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime. + * + * @param millisecondsSleepTime + * The time for this timer to wait before an event. + * @param service + * @param member + */ + public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener, + LocalGossipMember member) { + super(); + _sleepTime = millisecondsSleepTime; + _source = member; + addNotificationListener(notificationListener, null, null); + } + + /** + * @see javax.management.timer.Timer#start() + */ + public void start() { + this.reset(); + super.start(); + } + + /** + * Resets timer to start counting down from original time. + */ + public void reset() { + removeAllNotifications(); + setWakeupTime(_sleepTime); + } + + /** + * Adds a new wake-up time for this timer. + * + * @param milliseconds + */ + private void setWakeupTime(long milliseconds) { + addNotification("type", "message", _source, new Date(System.currentTimeMillis() + milliseconds)); + } +} diff --git a/src/main/java/com/google/code/gossip/LocalGossipMember.java b/src/main/java/com/google/code/gossip/LocalGossipMember.java index 6920d23..6d040ae 100644 --- a/src/main/java/com/google/code/gossip/LocalGossipMember.java +++ b/src/main/java/com/google/code/gossip/LocalGossipMember.java @@ -1,42 +1,49 @@ -package com.google.code.gossip; - -import javax.management.NotificationListener; - -/** - * This object represent a gossip member with the properties known locally. - * These objects are stored in the local list of gossip member.s - * - * @author harmenw - */ -public class LocalGossipMember extends GossipMember { - /** The timeout timer for this gossip member. */ - private transient GossipTimeoutTimer timeoutTimer; - - /** - * Constructor. - * @param host The hostname or IP address. - * @param port The port number. - * @param heartbeat The current heartbeat. - * @param gossipService The GossipService object. - * @param cleanupTimeout The cleanup timeout for this gossip member. - */ - public LocalGossipMember(String hostname, int port, String id, int heartbeat, NotificationListener notificationListener, int cleanupTimeout) { - super(hostname, port, id, heartbeat); - - this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this); - } - - /** - * Start the timeout timer. - */ - public void startTimeoutTimer() { - this.timeoutTimer.start(); - } - - /** - * Reset the timeout timer. - */ - public void resetTimeoutTimer() { - this.timeoutTimer.reset(); - } -} +package com.google.code.gossip; + +import javax.management.NotificationListener; + +/** + * This object represent a gossip member with the properties known locally. These objects are stored + * in the local list of gossip member.s + * + * @author harmenw + */ +public class LocalGossipMember extends GossipMember { + /** The timeout timer for this gossip member. */ + private transient GossipTimeoutTimer timeoutTimer; + + /** + * Constructor. + * + * @param host + * The hostname or IP address. + * @param port + * The port number. + * @param heartbeat + * The current heartbeat. + * @param gossipService + * The GossipService object. + * @param cleanupTimeout + * The cleanup timeout for this gossip member. + */ + public LocalGossipMember(String hostname, int port, String id, int heartbeat, + NotificationListener notificationListener, int cleanupTimeout) { + super(hostname, port, id, heartbeat); + + this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this); + } + + /** + * Start the timeout timer. + */ + public void startTimeoutTimer() { + this.timeoutTimer.start(); + } + + /** + * Reset the timeout timer. + */ + public void resetTimeoutTimer() { + this.timeoutTimer.reset(); + } +} diff --git a/src/main/java/com/google/code/gossip/LogLevel.java b/src/main/java/com/google/code/gossip/LogLevel.java index 16189ea..bb22dac 100644 --- a/src/main/java/com/google/code/gossip/LogLevel.java +++ b/src/main/java/com/google/code/gossip/LogLevel.java @@ -1,25 +1,27 @@ -package com.google.code.gossip; - -public class LogLevel { - - public static final String CONFIG_ERROR = "ERROR"; - - public static final String CONFIG_INFO = "INFO"; - - public static final String CONFIG_DEBUG = "DEBUG"; - - public static final int ERROR = 1; - public static final int INFO = 2; - public static final int DEBUG = 3; - - public static int fromString(String logLevel) { - if (logLevel.equals(CONFIG_ERROR)) - return ERROR; - else if (logLevel.equals(CONFIG_INFO)) - return INFO; - else if (logLevel.equals(CONFIG_DEBUG)) - return DEBUG; - else - return INFO; - } -} +package com.google.code.gossip; + +public class LogLevel { + + public static final String CONFIG_ERROR = "ERROR"; + + public static final String CONFIG_INFO = "INFO"; + + public static final String CONFIG_DEBUG = "DEBUG"; + + public static final int ERROR = 1; + + public static final int INFO = 2; + + public static final int DEBUG = 3; + + public static int fromString(String logLevel) { + if (logLevel.equals(CONFIG_ERROR)) + return ERROR; + else if (logLevel.equals(CONFIG_INFO)) + return INFO; + else if (logLevel.equals(CONFIG_DEBUG)) + return DEBUG; + else + return INFO; + } +} diff --git a/src/main/java/com/google/code/gossip/RemoteGossipMember.java b/src/main/java/com/google/code/gossip/RemoteGossipMember.java index d6403e0..f42f699 100644 --- a/src/main/java/com/google/code/gossip/RemoteGossipMember.java +++ b/src/main/java/com/google/code/gossip/RemoteGossipMember.java @@ -1,28 +1,36 @@ -package com.google.code.gossip; - -/** - * The object represents a gossip member with the properties as received from a remote gossip member. - * - * @author harmenw - */ -public class RemoteGossipMember extends GossipMember { - - /** - * Constructor. - * @param host The hostname or IP address. - * @param port The port number. - * @param heartbeat The current heartbeat. - */ - public RemoteGossipMember(String hostname, int port, String id, int heartbeat) { - super(hostname, port, id, heartbeat); - } - - /** - * Construct a RemoteGossipMember with a heartbeat of 0. - * @param host The hostname or IP address. - * @param port The port number. - */ - public RemoteGossipMember(String hostname, int port, String id) { - super(hostname, port, id, 0); - } -} +package com.google.code.gossip; + +/** + * The object represents a gossip member with the properties as received from a remote gossip + * member. + * + * @author harmenw + */ +public class RemoteGossipMember extends GossipMember { + + /** + * Constructor. + * + * @param host + * The hostname or IP address. + * @param port + * The port number. + * @param heartbeat + * The current heartbeat. + */ + public RemoteGossipMember(String hostname, int port, String id, int heartbeat) { + super(hostname, port, id, heartbeat); + } + + /** + * Construct a RemoteGossipMember with a heartbeat of 0. + * + * @param host + * The hostname or IP address. + * @param port + * The port number. + */ + public RemoteGossipMember(String hostname, int port, String id) { + super(hostname, port, id, 0); + } +} diff --git a/src/main/java/com/google/code/gossip/StartupSettings.java b/src/main/java/com/google/code/gossip/StartupSettings.java index 8cc1275..2e558ef 100644 --- a/src/main/java/com/google/code/gossip/StartupSettings.java +++ b/src/main/java/com/google/code/gossip/StartupSettings.java @@ -1,161 +1,184 @@ -package com.google.code.gossip; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.ArrayList; - -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - -/** - * This object represents the settings used when starting the gossip service. - * - * @author harmenw - */ -public class StartupSettings { - - /** The port to start the gossip service on. */ - private int _port; - - /** The logging level of the gossip service. */ - private int _logLevel; - - /** The gossip settings used at startup. */ - private GossipSettings _gossipSettings; - - /** The list with gossip members to start with. */ - private ArrayList _gossipMembers; - - /** - * Constructor. - * @param port The port to start the service on. - */ - public StartupSettings(int port, int logLevel) { - this(port, logLevel, new GossipSettings()); - } - - /** - * Constructor. - * @param port The port to start the service on. - */ - public StartupSettings(int port, int logLevel, GossipSettings gossipSettings) { - _port = port; - _logLevel = logLevel; - _gossipSettings = gossipSettings; - _gossipMembers = new ArrayList(); - } - - /** - * Set the port of the gossip service. - * @param port The port for the gossip service. - */ - public void setPort(int port) { - _port = port; - } - - /** - * Get the port for the gossip service. - * @return The port of the gossip service. - */ - public int getPort() { - return _port; - } - - /** - * Set the log level of the gossip service. - * @param logLevel The log level({LogLevel}). - */ - public void setLogLevel(int logLevel) { - _logLevel = logLevel; - } - - /** - * Get the log level of the gossip service. - * @return The log level. - */ - public int getLogLevel() { - return _logLevel; - } - - /** - * Get the GossipSettings. - * @return The GossipSettings object. - */ - public GossipSettings getGossipSettings() { - return _gossipSettings; - } - - /** - * Add a gossip member to the list of members to start with. - * @param member The member to add. - */ - public void addGossipMember(GossipMember member) { - _gossipMembers.add(member); - } - - /** - * Get the list with gossip members. - * @return The gossip members. - */ - public ArrayList getGossipMembers() { - return _gossipMembers; - } - - /** - * Parse the settings for the gossip service from a JSON file. - * @param jsonFile The file object which refers to the JSON config file. - * @return The StartupSettings object with the settings from the config file. - * @throws JSONException Thrown when the file is not well-formed JSON. - * @throws FileNotFoundException Thrown when the file cannot be found. - * @throws IOException Thrown when reading the file gives problems. - */ - public static StartupSettings fromJSONFile(File jsonFile) throws JSONException, FileNotFoundException, IOException { - // Read the file to a String. - BufferedReader br = new BufferedReader(new FileReader(jsonFile)); - StringBuffer buffer = new StringBuffer(); - String line; - while((line = br.readLine()) != null) { - buffer.append(line.trim()); - } - - // Lets parse the String as JSON. - JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0); - - // Now get the port number. - int port = jsonObject.getInt("port"); - - // Get the log level from the config file. - int logLevel = LogLevel.fromString(jsonObject.getString("log_level")); - - // Get the gossip_interval from the config file. - int gossipInterval = jsonObject.getInt("gossip_interval"); - - // Get the cleanup_interval from the config file. - int cleanupInterval = jsonObject.getInt("cleanup_interval"); - - System.out.println("Config [port: " + port + ", log_level: " + logLevel + ", gossip_interval: " + gossipInterval + ", cleanup_interval: " + cleanupInterval + "]"); - - // Initiate the settings with the port number. - StartupSettings settings = new StartupSettings(port, logLevel, new GossipSettings(gossipInterval, cleanupInterval)); - - // Now iterate over the members from the config file and add them to the settings. - System.out.print("Config-members ["); - JSONArray membersJSON = jsonObject.getJSONArray("members"); - for (int i=0; i _gossipMembers; + + /** + * Constructor. + * + * @param port + * The port to start the service on. + */ + public StartupSettings(int port, int logLevel) { + this(port, logLevel, new GossipSettings()); + } + + /** + * Constructor. + * + * @param port + * The port to start the service on. + */ + public StartupSettings(int port, int logLevel, GossipSettings gossipSettings) { + _port = port; + _logLevel = logLevel; + _gossipSettings = gossipSettings; + _gossipMembers = new ArrayList(); + } + + /** + * Set the port of the gossip service. + * + * @param port + * The port for the gossip service. + */ + public void setPort(int port) { + _port = port; + } + + /** + * Get the port for the gossip service. + * + * @return The port of the gossip service. + */ + public int getPort() { + return _port; + } + + /** + * Set the log level of the gossip service. + * + * @param logLevel + * The log level({LogLevel}). + */ + public void setLogLevel(int logLevel) { + _logLevel = logLevel; + } + + /** + * Get the log level of the gossip service. + * + * @return The log level. + */ + public int getLogLevel() { + return _logLevel; + } + + /** + * Get the GossipSettings. + * + * @return The GossipSettings object. + */ + public GossipSettings getGossipSettings() { + return _gossipSettings; + } + + /** + * Add a gossip member to the list of members to start with. + * + * @param member + * The member to add. + */ + public void addGossipMember(GossipMember member) { + _gossipMembers.add(member); + } + + /** + * Get the list with gossip members. + * + * @return The gossip members. + */ + public ArrayList getGossipMembers() { + return _gossipMembers; + } + + /** + * Parse the settings for the gossip service from a JSON file. + * + * @param jsonFile + * The file object which refers to the JSON config file. + * @return The StartupSettings object with the settings from the config file. + * @throws JSONException + * Thrown when the file is not well-formed JSON. + * @throws FileNotFoundException + * Thrown when the file cannot be found. + * @throws IOException + * Thrown when reading the file gives problems. + */ + public static StartupSettings fromJSONFile(File jsonFile) throws JSONException, + FileNotFoundException, IOException { + // Read the file to a String. + BufferedReader br = new BufferedReader(new FileReader(jsonFile)); + StringBuffer buffer = new StringBuffer(); + String line; + while ((line = br.readLine()) != null) { + buffer.append(line.trim()); + } + + // Lets parse the String as JSON. + JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0); + + // Now get the port number. + int port = jsonObject.getInt("port"); + + // Get the log level from the config file. + int logLevel = LogLevel.fromString(jsonObject.getString("log_level")); + + // Get the gossip_interval from the config file. + int gossipInterval = jsonObject.getInt("gossip_interval"); + + // Get the cleanup_interval from the config file. + int cleanupInterval = jsonObject.getInt("cleanup_interval"); + + System.out.println("Config [port: " + port + ", log_level: " + logLevel + ", gossip_interval: " + + gossipInterval + ", cleanup_interval: " + cleanupInterval + "]"); + + // Initiate the settings with the port number. + StartupSettings settings = new StartupSettings(port, logLevel, new GossipSettings( + gossipInterval, cleanupInterval)); + + // Now iterate over the members from the config file and add them to the settings. + System.out.print("Config-members ["); + JSONArray membersJSON = jsonObject.getJSONArray("members"); + for (int i = 0; i < membersJSON.length(); i++) { + JSONObject memberJSON = membersJSON.getJSONObject(i); + RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"), + memberJSON.getInt("port"), ""); + settings.addGossipMember(member); + System.out.print(member.getAddress()); + if (i < (membersJSON.length() - 1)) + System.out.print(", "); + } + System.out.println("]"); + + // Return the created settings object. + return settings; + } +} diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java index 5b6df28..abc28e1 100644 --- a/src/main/java/com/google/code/gossip/examples/GossipExample.java +++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java @@ -1,78 +1,80 @@ -package com.google.code.gossip.examples; - - -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.ArrayList; - -import com.google.code.gossip.GossipMember; -import com.google.code.gossip.GossipService; -import com.google.code.gossip.GossipSettings; -import com.google.code.gossip.LogLevel; -import com.google.code.gossip.RemoteGossipMember; - -/** - * This class is an example of how one could use the gossip service. - * Here we start multiple gossip clients on this host as specified in the config file. - * - * @author harmenw - */ -public class GossipExample extends Thread { - /** The number of clients to start. */ - private static final int NUMBER_OF_CLIENTS = 4; - - /** - * @param args - */ - public static void main(String[] args) { - new GossipExample(); - } - - /** - * Constructor. - * This will start the this thread. - */ - public GossipExample() { - start(); - } - - /** - * @see java.lang.Thread#run() - */ - public void run() { - try { - GossipSettings settings = new GossipSettings(); - - ArrayList clients = new ArrayList(); - - // Get my ip address. - String myIpAddress = InetAddress.getLocalHost().getHostAddress(); - - // Create the gossip members and put them in a list and give them a port number starting with 2000. - ArrayList startupMembers = new ArrayList(); - for (int i=0; i clients = new ArrayList(); + + // Get my ip address. + String myIpAddress = InetAddress.getLocalHost().getHostAddress(); + + // Create the gossip members and put them in a list and give them a port number starting with + // 2000. + ArrayList startupMembers = new ArrayList(); + for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) { + startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, "")); + } + + // Lets start the gossip clients. + // Start the clients, waiting cleaning-interval + 1 second between them which will show the + // dead list handling. + for (GossipMember member : startupMembers) { + GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", + LogLevel.DEBUG, startupMembers, settings); + clients.add(gossipService); + gossipService.start(); + sleep(settings.getCleanupInterval() + 1000); + } + + // After starting all gossip clients, first wait 10 seconds and then shut them down. + sleep(10000); + System.err.println("Going to shutdown all services..."); + // Since they all run in the same virtual machine and share the same executor, if one is + // shutdown they will all stop. + clients.get(0).shutdown(); + + } catch (UnknownHostException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java index 3f3964b..e5ab754 100644 --- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java @@ -1,58 +1,59 @@ -package com.google.code.gossip.manager; - -import java.util.ArrayList; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.google.code.gossip.GossipService; -import com.google.code.gossip.LocalGossipMember; - -/** - * [The active thread: periodically send gossip request.] - * The class handles gossiping the membership list. - * This information is important to maintaining a common - * state among all the nodes, and is important for detecting - * failures. - */ -abstract public class ActiveGossipThread implements Runnable { - - private GossipManager _gossipManager; - - private final AtomicBoolean _keepRunning; - - public ActiveGossipThread(GossipManager gossipManager) { - _gossipManager = gossipManager; - _keepRunning = new AtomicBoolean(true); - } - - @Override - public void run() { - while(_keepRunning.get()) { - try { - TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval()); - sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList()); - } catch (InterruptedException e) { - GossipService.LOGGER.error(e); - _keepRunning.set(false); - } - } - shutdown(); - } - - public void shutdown(){ - _keepRunning.set(false); - } - /** - * Performs the sending of the membership list, after we have - * incremented our own heartbeat. - */ - abstract protected void sendMembershipList(LocalGossipMember me, ArrayList memberList); - - /** - * Abstract method which should be implemented by a subclass. - * This method should return a member of the list to gossip with. - * @param memberList The list of members which are stored in the local list of members. - * @return The chosen LocalGossipMember to gossip with. - */ - abstract protected LocalGossipMember selectPartner(ArrayList memberList); -} +package com.google.code.gossip.manager; + +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import com.google.code.gossip.GossipService; +import com.google.code.gossip.LocalGossipMember; + +/** + * [The active thread: periodically send gossip request.] The class handles gossiping the membership + * list. This information is important to maintaining a common state among all the nodes, and is + * important for detecting failures. + */ +abstract public class ActiveGossipThread implements Runnable { + + private GossipManager _gossipManager; + + private final AtomicBoolean _keepRunning; + + public ActiveGossipThread(GossipManager gossipManager) { + _gossipManager = gossipManager; + _keepRunning = new AtomicBoolean(true); + } + + @Override + public void run() { + while (_keepRunning.get()) { + try { + TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval()); + sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList()); + } catch (InterruptedException e) { + GossipService.LOGGER.error(e); + _keepRunning.set(false); + } + } + shutdown(); + } + + public void shutdown() { + _keepRunning.set(false); + } + + /** + * Performs the sending of the membership list, after we have incremented our own heartbeat. + */ + abstract protected void sendMembershipList(LocalGossipMember me, + ArrayList memberList); + + /** + * Abstract method which should be implemented by a subclass. This method should return a member + * of the list to gossip with. + * + * @param memberList + * The list of members which are stored in the local list of members. + * @return The chosen LocalGossipMember to gossip with. + */ + abstract protected LocalGossipMember selectPartner(ArrayList memberList); +} diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index ed7a03e..99e308c 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -1,157 +1,162 @@ -package com.google.code.gossip.manager; - -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.management.Notification; -import javax.management.NotificationListener; - -import com.google.code.gossip.GossipMember; -import com.google.code.gossip.GossipService; -import com.google.code.gossip.GossipSettings; -import com.google.code.gossip.LocalGossipMember; - -public abstract class GossipManager extends Thread implements NotificationListener { - /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */ - public static final int MAX_PACKET_SIZE = 102400; - - /** The list of members which are in the gossip group (not including myself). */ - private ArrayList _memberList; - - /** The list of members which are known to be dead. */ - private ArrayList _deadList; - - /** The member I am representing. */ - private LocalGossipMember _me; - - /** The settings for gossiping. */ - private GossipSettings _settings; - - /** A boolean whether the gossip service should keep running. */ - private AtomicBoolean _gossipServiceRunning; - - /** A ExecutorService used for executing the active and passive gossip threads. */ - private ExecutorService _gossipThreadExecutor; - - private Class _passiveGossipThreadClass; - private PassiveGossipThread passiveGossipThread; - - private Class _activeGossipThreadClass; - private ActiveGossipThread activeGossipThread; - - public GossipManager(Class passiveGossipThreadClass, - Class activeGossipThreadClass, String address, int port, - String id, GossipSettings settings, ArrayList gossipMembers) { - _passiveGossipThreadClass = passiveGossipThreadClass; - _activeGossipThreadClass = activeGossipThreadClass; - _settings = settings; - _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); - _memberList = new ArrayList(); - _deadList = new ArrayList(); - for (GossipMember startupMember : gossipMembers) { - if (!startupMember.equals(_me)) { - LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), - startupMember.getPort(), startupMember.getId(), 0, this, - settings.getCleanupInterval()); - _memberList.add(member); - GossipService.LOGGER.debug(member); - } - } - - _gossipServiceRunning = new AtomicBoolean(true); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - public void run() { - GossipService.LOGGER.info("Service has been shutdown..."); - } - })); - } - - /** - * All timers associated with a member will trigger this method when it goes - * off. The timer will go off if we have not heard from this member in - * _settings.T_CLEANUP time. - */ - @Override - public void handleNotification(Notification notification, Object handback) { - LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); - GossipService.LOGGER.info("Dead member detected: " + deadMember); - synchronized (this._memberList) { - this._memberList.remove(deadMember); - } - synchronized (this._deadList) { - this._deadList.add(deadMember); - } - } - - public GossipSettings getSettings() { - return _settings; - } - - /** - * Get a clone of the memberlist. - * @return - */ - public ArrayList getMemberList() { - return _memberList; - } - - public LocalGossipMember getMyself() { - return _me; - } - - public ArrayList getDeadList() { - return _deadList; - } - - /** - * Starts the client. Specifically, start the various cycles for this protocol. - * Start the gossip thread and start the receiver thread. - * @throws InterruptedException - */ - public void run() { - for (LocalGossipMember member : _memberList) { - if (member != _me) { - member.startTimeoutTimer(); - } - } - _gossipThreadExecutor = Executors.newCachedThreadPool(); - try { - passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class).newInstance(this); - _gossipThreadExecutor.execute(passiveGossipThread); - activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class).newInstance(this); - _gossipThreadExecutor.execute(activeGossipThread); - } catch (InstantiationException | IllegalAccessException | IllegalArgumentException - | InvocationTargetException | NoSuchMethodException | SecurityException e1) { - throw new RuntimeException(e1); - } - GossipService.LOGGER.info("The GossipService is started."); - while(_gossipServiceRunning.get()) { - try { - //TODO - TimeUnit.MILLISECONDS.sleep(1); - } catch (InterruptedException e) { - GossipService.LOGGER.info("The GossipClient was interrupted."); - } - } - } - - /** - * Shutdown the gossip service. - */ - public void shutdown() { - _gossipThreadExecutor.shutdown(); - passiveGossipThread.shutdown(); - activeGossipThread.shutdown(); - try { - boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); - System.err.println("Terminate retuned " + result); - } catch (InterruptedException e) { - e.printStackTrace(); - } - _gossipServiceRunning.set(false); - } -} +package com.google.code.gossip.manager; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.management.Notification; +import javax.management.NotificationListener; + +import com.google.code.gossip.GossipMember; +import com.google.code.gossip.GossipService; +import com.google.code.gossip.GossipSettings; +import com.google.code.gossip.LocalGossipMember; + +public abstract class GossipManager extends Thread implements NotificationListener { + /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */ + public static final int MAX_PACKET_SIZE = 102400; + + /** The list of members which are in the gossip group (not including myself). */ + private ArrayList _memberList; + + /** The list of members which are known to be dead. */ + private ArrayList _deadList; + + /** The member I am representing. */ + private LocalGossipMember _me; + + /** The settings for gossiping. */ + private GossipSettings _settings; + + /** A boolean whether the gossip service should keep running. */ + private AtomicBoolean _gossipServiceRunning; + + /** A ExecutorService used for executing the active and passive gossip threads. */ + private ExecutorService _gossipThreadExecutor; + + private Class _passiveGossipThreadClass; + + private PassiveGossipThread passiveGossipThread; + + private Class _activeGossipThreadClass; + + private ActiveGossipThread activeGossipThread; + + public GossipManager(Class passiveGossipThreadClass, + Class activeGossipThreadClass, String address, int port, + String id, GossipSettings settings, ArrayList gossipMembers) { + _passiveGossipThreadClass = passiveGossipThreadClass; + _activeGossipThreadClass = activeGossipThreadClass; + _settings = settings; + _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); + _memberList = new ArrayList(); + _deadList = new ArrayList(); + for (GossipMember startupMember : gossipMembers) { + if (!startupMember.equals(_me)) { + LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), + startupMember.getPort(), startupMember.getId(), 0, this, + settings.getCleanupInterval()); + _memberList.add(member); + GossipService.LOGGER.debug(member); + } + } + + _gossipServiceRunning = new AtomicBoolean(true); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + public void run() { + GossipService.LOGGER.info("Service has been shutdown..."); + } + })); + } + + /** + * All timers associated with a member will trigger this method when it goes off. The timer will + * go off if we have not heard from this member in _settings.T_CLEANUP time. + */ + @Override + public void handleNotification(Notification notification, Object handback) { + LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); + GossipService.LOGGER.info("Dead member detected: " + deadMember); + synchronized (this._memberList) { + this._memberList.remove(deadMember); + } + synchronized (this._deadList) { + this._deadList.add(deadMember); + } + } + + public GossipSettings getSettings() { + return _settings; + } + + /** + * Get a clone of the memberlist. + * + * @return + */ + public ArrayList getMemberList() { + return _memberList; + } + + public LocalGossipMember getMyself() { + return _me; + } + + public ArrayList getDeadList() { + return _deadList; + } + + /** + * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip + * thread and start the receiver thread. + * + * @throws InterruptedException + */ + public void run() { + for (LocalGossipMember member : _memberList) { + if (member != _me) { + member.startTimeoutTimer(); + } + } + _gossipThreadExecutor = Executors.newCachedThreadPool(); + try { + passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class) + .newInstance(this); + _gossipThreadExecutor.execute(passiveGossipThread); + activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class) + .newInstance(this); + _gossipThreadExecutor.execute(activeGossipThread); + } catch (InstantiationException | IllegalAccessException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException | SecurityException e1) { + throw new RuntimeException(e1); + } + GossipService.LOGGER.info("The GossipService is started."); + while (_gossipServiceRunning.get()) { + try { + // TODO + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException e) { + GossipService.LOGGER.info("The GossipClient was interrupted."); + } + } + } + + /** + * Shutdown the gossip service. + */ + public void shutdown() { + _gossipThreadExecutor.shutdown(); + passiveGossipThread.shutdown(); + activeGossipThread.shutdown(); + try { + boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); + System.err.println("Terminate retuned " + result); + } catch (InterruptedException e) { + e.printStackTrace(); + } + _gossipServiceRunning.set(false); + } +} diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index 044a7d4..08342b4 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -1,142 +1,148 @@ -package com.google.code.gossip.manager; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.SocketException; -import java.util.ArrayList; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - -import com.google.code.gossip.GossipMember; -import com.google.code.gossip.GossipService; -import com.google.code.gossip.RemoteGossipMember; - -/** - * [The passive thread: reply to incoming gossip request.] - * This class handles the passive cycle, where this client - * has received an incoming message. For now, this message - * is always the membership list, but if you choose to gossip - * additional information, you will need some logic to determine - * the incoming message. - */ -abstract public class PassiveGossipThread implements Runnable { - - /** The socket used for the passive thread of the gossip service. */ - private DatagramSocket _server; - - private GossipManager _gossipManager; - - private AtomicBoolean _keepRunning; - - public PassiveGossipThread(GossipManager gossipManager) { - _gossipManager = gossipManager; - try { - SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), _gossipManager.getMyself().getPort()); - _server = new DatagramSocket(socketAddress); - GossipService.LOGGER.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort()); - GossipService.LOGGER.debug("I am " + _gossipManager.getMyself()); - } catch (SocketException ex) { - GossipService.LOGGER.error(ex); - _server = null; - throw new RuntimeException(ex); - } - _keepRunning = new AtomicBoolean(true); - } - - @Override - public void run() { - while (_keepRunning.get()) { - try { - byte[] buf = new byte[_server.getReceiveBufferSize()]; - DatagramPacket p = new DatagramPacket(buf, buf.length); - _server.receive(p); - GossipService.LOGGER.debug("A message has been received from " + p.getAddress() + ":" - + p.getPort() + "."); - - int packet_length = 0; - for (int i = 0; i < 4; i++) { - int shift = (4 - 1 - i) * 8; - packet_length += (buf[i] & 0x000000FF) << shift; - } - - // Check whether the package is smaller than the maximal packet length. - // A package larger than this would not be possible to be send from a GossipService, - // since this is check before sending the message. - // This could normally only occur when the list of members is very big, - // or when the packet is misformed, and the first 4 bytes is not the right in anymore. - // For this reason we regards the message. - if (packet_length <= GossipManager.MAX_PACKET_SIZE) { - byte[] json_bytes = new byte[packet_length]; - for (int i = 0; i < packet_length; i++) { - json_bytes[i] = buf[i + 4]; - } - String receivedMessage = new String(json_bytes); - GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): " - + receivedMessage); - try { - ArrayList remoteGossipMembers = new ArrayList(); - RemoteGossipMember senderMember = null; - GossipService.LOGGER.debug("Received member list:"); - JSONArray jsonArray = new JSONArray(receivedMessage); - for (int i = 0; i < jsonArray.length(); i++) { - JSONObject memberJSONObject = jsonArray.getJSONObject(i); - if (memberJSONObject.length() == 4) { - RemoteGossipMember member = new RemoteGossipMember( - memberJSONObject.getString(GossipMember.JSON_HOST), - memberJSONObject.getInt(GossipMember.JSON_PORT), - memberJSONObject.getString(GossipMember.JSON_ID), - memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT)); - GossipService.LOGGER.debug(member.toString()); - // This is the first member found, so this should be the member who is communicating - // with me. - if (i == 0) { - senderMember = member; - } - remoteGossipMembers.add(member); - } else { - GossipService.LOGGER - .error("The received member object does not contain 4 objects:\n" - + memberJSONObject.toString()); - } - - } - - // Merge our list with the one we just received - mergeLists(_gossipManager, senderMember, remoteGossipMembers); - } catch (JSONException e) { - GossipService.LOGGER - .error("The received message is not well-formed JSON. The following message has been dropped:\n" - + receivedMessage); - } - - } else { - GossipService.LOGGER - .error("The received message is not of the expected size, it has been dropped."); - } - - } catch (IOException e) { - e.printStackTrace(); - _keepRunning.set(false); - } - } - shutdown(); - } - - public void shutdown(){ - _server.close(); - } - - /** - * Abstract method for merging the local and remote list. - * @param gossipManager The GossipManager for retrieving the local members and dead members list. - * @param senderMember The member who is sending this list, this could be used to send a response if the remote list contains out-dated information. - * @param remoteList The list of members known at the remote side. - */ - abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList remoteList); +package com.google.code.gossip.manager; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketException; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import com.google.code.gossip.GossipMember; +import com.google.code.gossip.GossipService; +import com.google.code.gossip.RemoteGossipMember; + +/** + * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle, + * where this client has received an incoming message. For now, this message is always the + * membership list, but if you choose to gossip additional information, you will need some logic to + * determine the incoming message. + */ +abstract public class PassiveGossipThread implements Runnable { + + /** The socket used for the passive thread of the gossip service. */ + private DatagramSocket _server; + + private GossipManager _gossipManager; + + private AtomicBoolean _keepRunning; + + public PassiveGossipThread(GossipManager gossipManager) { + _gossipManager = gossipManager; + try { + SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), + _gossipManager.getMyself().getPort()); + _server = new DatagramSocket(socketAddress); + GossipService.LOGGER.info("Gossip service successfully initialized on port " + + _gossipManager.getMyself().getPort()); + GossipService.LOGGER.debug("I am " + _gossipManager.getMyself()); + } catch (SocketException ex) { + GossipService.LOGGER.error(ex); + _server = null; + throw new RuntimeException(ex); + } + _keepRunning = new AtomicBoolean(true); + } + + @Override + public void run() { + while (_keepRunning.get()) { + try { + byte[] buf = new byte[_server.getReceiveBufferSize()]; + DatagramPacket p = new DatagramPacket(buf, buf.length); + _server.receive(p); + GossipService.LOGGER.debug("A message has been received from " + p.getAddress() + ":" + + p.getPort() + "."); + + int packet_length = 0; + for (int i = 0; i < 4; i++) { + int shift = (4 - 1 - i) * 8; + packet_length += (buf[i] & 0x000000FF) << shift; + } + + // Check whether the package is smaller than the maximal packet length. + // A package larger than this would not be possible to be send from a GossipService, + // since this is check before sending the message. + // This could normally only occur when the list of members is very big, + // or when the packet is misformed, and the first 4 bytes is not the right in anymore. + // For this reason we regards the message. + if (packet_length <= GossipManager.MAX_PACKET_SIZE) { + byte[] json_bytes = new byte[packet_length]; + for (int i = 0; i < packet_length; i++) { + json_bytes[i] = buf[i + 4]; + } + String receivedMessage = new String(json_bytes); + GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): " + + receivedMessage); + try { + ArrayList remoteGossipMembers = new ArrayList(); + RemoteGossipMember senderMember = null; + GossipService.LOGGER.debug("Received member list:"); + JSONArray jsonArray = new JSONArray(receivedMessage); + for (int i = 0; i < jsonArray.length(); i++) { + JSONObject memberJSONObject = jsonArray.getJSONObject(i); + if (memberJSONObject.length() == 4) { + RemoteGossipMember member = new RemoteGossipMember( + memberJSONObject.getString(GossipMember.JSON_HOST), + memberJSONObject.getInt(GossipMember.JSON_PORT), + memberJSONObject.getString(GossipMember.JSON_ID), + memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT)); + GossipService.LOGGER.debug(member.toString()); + // This is the first member found, so this should be the member who is communicating + // with me. + if (i == 0) { + senderMember = member; + } + remoteGossipMembers.add(member); + } else { + GossipService.LOGGER + .error("The received member object does not contain 4 objects:\n" + + memberJSONObject.toString()); + } + + } + + // Merge our list with the one we just received + mergeLists(_gossipManager, senderMember, remoteGossipMembers); + } catch (JSONException e) { + GossipService.LOGGER + .error("The received message is not well-formed JSON. The following message has been dropped:\n" + + receivedMessage); + } + + } else { + GossipService.LOGGER + .error("The received message is not of the expected size, it has been dropped."); + } + + } catch (IOException e) { + e.printStackTrace(); + _keepRunning.set(false); + } + } + shutdown(); + } + + public void shutdown() { + _server.close(); + } + + /** + * Abstract method for merging the local and remote list. + * + * @param gossipManager + * The GossipManager for retrieving the local members and dead members list. + * @param senderMember + * The member who is sending this list, this could be used to send a response if the + * remote list contains out-dated information. + * @param remoteList + * The list of members known at the remote side. + */ + abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, + ArrayList remoteList); } \ No newline at end of file diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index 743f555..206d5c5 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -1,93 +1,111 @@ -package com.google.code.gossip.manager.impl; - -import java.util.ArrayList; - -import com.google.code.gossip.GossipMember; -import com.google.code.gossip.GossipService; -import com.google.code.gossip.LocalGossipMember; -import com.google.code.gossip.RemoteGossipMember; -import com.google.code.gossip.manager.GossipManager; -import com.google.code.gossip.manager.PassiveGossipThread; - -public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread { - - public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) { - super(gossipManager); - } - - /** - * Merge remote list (received from peer), and our local member list. - * Simply, we must update the heartbeats that the remote list has with - * our list. Also, some additional logic is needed to make sure we have - * not timed out a member and then immediately received a list with that - * member. - * @param remoteList - */ - protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList remoteList) { - - synchronized (gossipManager.getDeadList()) { - - synchronized (gossipManager.getMemberList()) { - - for (GossipMember remoteMember : remoteList) { - // Skip myself. We don't want ourselves in the local member list. - if (!remoteMember.equals(gossipManager.getMyself())) { - if (gossipManager.getMemberList().contains(remoteMember)) { - GossipService.LOGGER.debug("The local list already contains the remote member (" + remoteMember + ")."); - // The local memberlist contains the remote member. - LocalGossipMember localMember = gossipManager.getMemberList().get(gossipManager.getMemberList().indexOf(remoteMember)); - - // Let's synchronize it's heartbeat. - if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { - // update local list with latest heartbeat - localMember.setHeartbeat(remoteMember.getHeartbeat()); - // and reset the timeout of that member - localMember.resetTimeoutTimer(); - } - // TODO: Otherwise, should we inform the other when the heartbeat is already higher? - } else { - // The local list does not contain the remote member. - GossipService.LOGGER.debug("The local list does not contain the remote member (" + remoteMember + ")."); - - // The remote member is either brand new, or a previously declared dead member. - // If its dead, check the heartbeat because it may have come back from the dead. - if (gossipManager.getDeadList().contains(remoteMember)) { - // The remote member is known here as a dead member. - GossipService.LOGGER.debug("The remote member is known here as a dead member."); - LocalGossipMember localDeadMember = gossipManager.getDeadList().get(gossipManager.getDeadList().indexOf(remoteMember)); - // If a member is restarted the heartbeat will restart from 1, so we should check that here. - // So a member can become from the dead when it is either larger than a previous heartbeat (due to network failure) - // or when the heartbeat is 1 (after a restart of the service). - // TODO: What if the first message of a gossip service is sent to a dead node? The second member will receive a heartbeat of two. - // TODO: The above does happen. Maybe a special message for a revived member? - // TODO: Or maybe when a member is declared dead for more than _settings.getCleanupInterval() ms, reset the heartbeat to 0. - // It will then accept a revived member. - // The above is now handle by checking whether the heartbeat differs _settings.getCleanupInterval(), it must be restarted. - if (remoteMember.getHeartbeat() == 1 - || ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager.getSettings().getCleanupInterval() / 1000) - || remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { - GossipService.LOGGER.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); - // The remote member is back from the dead. - // Remove it from the dead list. - gossipManager.getDeadList().remove(localDeadMember); - // Add it as a new member and add it to the member list. - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); - newLocalMember.startTimeoutTimer(); - GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); - } - } else { - // Brand spanking new member - welcome. - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); - newLocalMember.startTimeoutTimer(); - GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list."); - } - } - } - } - } - } - } - -} +package com.google.code.gossip.manager.impl; + +import java.util.ArrayList; + +import com.google.code.gossip.GossipMember; +import com.google.code.gossip.GossipService; +import com.google.code.gossip.LocalGossipMember; +import com.google.code.gossip.RemoteGossipMember; +import com.google.code.gossip.manager.GossipManager; +import com.google.code.gossip.manager.PassiveGossipThread; + +public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread { + + public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) { + super(gossipManager); + } + + /** + * Merge remote list (received from peer), and our local member list. Simply, we must update the + * heartbeats that the remote list has with our list. Also, some additional logic is needed to + * make sure we have not timed out a member and then immediately received a list with that member. + * + * @param remoteList + */ + protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, + ArrayList remoteList) { + + synchronized (gossipManager.getDeadList()) { + + synchronized (gossipManager.getMemberList()) { + + for (GossipMember remoteMember : remoteList) { + // Skip myself. We don't want ourselves in the local member list. + if (!remoteMember.equals(gossipManager.getMyself())) { + if (gossipManager.getMemberList().contains(remoteMember)) { + GossipService.LOGGER.debug("The local list already contains the remote member (" + + remoteMember + ")."); + // The local memberlist contains the remote member. + LocalGossipMember localMember = gossipManager.getMemberList().get( + gossipManager.getMemberList().indexOf(remoteMember)); + + // Let's synchronize it's heartbeat. + if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { + // update local list with latest heartbeat + localMember.setHeartbeat(remoteMember.getHeartbeat()); + // and reset the timeout of that member + localMember.resetTimeoutTimer(); + } + // TODO: Otherwise, should we inform the other when the heartbeat is already higher? + } else { + // The local list does not contain the remote member. + GossipService.LOGGER.debug("The local list does not contain the remote member (" + + remoteMember + ")."); + + // The remote member is either brand new, or a previously declared dead member. + // If its dead, check the heartbeat because it may have come back from the dead. + if (gossipManager.getDeadList().contains(remoteMember)) { + // The remote member is known here as a dead member. + GossipService.LOGGER.debug("The remote member is known here as a dead member."); + LocalGossipMember localDeadMember = gossipManager.getDeadList().get( + gossipManager.getDeadList().indexOf(remoteMember)); + // If a member is restarted the heartbeat will restart from 1, so we should check + // that here. + // So a member can become from the dead when it is either larger than a previous + // heartbeat (due to network failure) + // or when the heartbeat is 1 (after a restart of the service). + // TODO: What if the first message of a gossip service is sent to a dead node? The + // second member will receive a heartbeat of two. + // TODO: The above does happen. Maybe a special message for a revived member? + // TODO: Or maybe when a member is declared dead for more than + // _settings.getCleanupInterval() ms, reset the heartbeat to 0. + // It will then accept a revived member. + // The above is now handle by checking whether the heartbeat differs + // _settings.getCleanupInterval(), it must be restarted. + if (remoteMember.getHeartbeat() == 1 + || ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager + .getSettings().getCleanupInterval() / 1000) + || remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { + GossipService.LOGGER + .debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); + // The remote member is back from the dead. + // Remove it from the dead list. + gossipManager.getDeadList().remove(localDeadMember); + // Add it as a new member and add it to the member list. + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), + remoteMember.getPort(), remoteMember.getId(), + remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() + .getCleanupInterval()); + gossipManager.getMemberList().add(newLocalMember); + newLocalMember.startTimeoutTimer(); + GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + + " from dead list and added to local member list."); + } + } else { + // Brand spanking new member - welcome. + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), + remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), + gossipManager, gossipManager.getSettings().getCleanupInterval()); + gossipManager.getMemberList().add(newLocalMember); + newLocalMember.startTimeoutTimer(); + GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + + " to local member list."); + } + } + } + } + } + } + } + +} diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index 4ccfde8..3e2baf9 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -1,94 +1,95 @@ -package com.google.code.gossip.manager.impl; - -import java.io.IOException; -import java.net.DatagramPacket; -import java.net.DatagramSocket; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; - -import org.json.JSONArray; - -import com.google.code.gossip.GossipService; -import com.google.code.gossip.LocalGossipMember; -import com.google.code.gossip.manager.ActiveGossipThread; -import com.google.code.gossip.manager.GossipManager; - -abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { - - public SendMembersActiveGossipThread(GossipManager gossipManager) { - super(gossipManager); - } - - /** - * Performs the sending of the membership list, after we have - * incremented our own heartbeat. - */ - protected void sendMembershipList(LocalGossipMember me, ArrayList memberList) { - GossipService.LOGGER.debug("Send sendMembershipList() is called."); - - // Increase the heartbeat of myself by 1. - me.setHeartbeat(me.getHeartbeat() + 1); - - synchronized (memberList) { - try { - LocalGossipMember member = selectPartner(memberList); - - if (member != null) { - InetAddress dest = InetAddress.getByName(member.getHost()); - - // Create a StringBuffer for the JSON message. - JSONArray jsonArray = new JSONArray(); - GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort()); - GossipService.LOGGER.debug("---------------------"); - - // First write myself, append the JSON representation of the member to the buffer. - jsonArray.put(me.toJSONObject()); - GossipService.LOGGER.debug(me); - - // Then write the others. - for (int i=0; i> 24 ); - length_bytes[1] =(byte)( (packet_length << 8) >> 24 ); - length_bytes[2] =(byte)( (packet_length << 16) >> 24 ); - length_bytes[3] =(byte)( (packet_length << 24) >> 24 ); - - - GossipService.LOGGER.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString()); - - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length); - byteBuffer.put(length_bytes); - byteBuffer.put(json_bytes); - byte[] buf = byteBuffer.array(); - - DatagramSocket socket = new DatagramSocket(); - DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort()); - socket.send(datagramPacket); - socket.close(); - } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } - - } catch (IOException e1) { - e1.printStackTrace(); - } - } - } -} +package com.google.code.gossip.manager.impl; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; + +import org.json.JSONArray; + +import com.google.code.gossip.GossipService; +import com.google.code.gossip.LocalGossipMember; +import com.google.code.gossip.manager.ActiveGossipThread; +import com.google.code.gossip.manager.GossipManager; + +abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { + + public SendMembersActiveGossipThread(GossipManager gossipManager) { + super(gossipManager); + } + + /** + * Performs the sending of the membership list, after we have incremented our own heartbeat. + */ + protected void sendMembershipList(LocalGossipMember me, ArrayList memberList) { + GossipService.LOGGER.debug("Send sendMembershipList() is called."); + + // Increase the heartbeat of myself by 1. + me.setHeartbeat(me.getHeartbeat() + 1); + + synchronized (memberList) { + try { + LocalGossipMember member = selectPartner(memberList); + + if (member != null) { + InetAddress dest = InetAddress.getByName(member.getHost()); + + // Create a StringBuffer for the JSON message. + JSONArray jsonArray = new JSONArray(); + GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort()); + GossipService.LOGGER.debug("---------------------"); + + // First write myself, append the JSON representation of the member to the buffer. + jsonArray.put(me.toJSONObject()); + GossipService.LOGGER.debug(me); + + // Then write the others. + for (int i = 0; i < memberList.size(); i++) { + LocalGossipMember other = memberList.get(i); + // Append the JSON representation of the member to the buffer. + jsonArray.put(other.toJSONObject()); + GossipService.LOGGER.debug(other); + } + GossipService.LOGGER.debug("---------------------"); + + // Write the objects to a byte array. + byte[] json_bytes = jsonArray.toString().getBytes(); + + int packet_length = json_bytes.length; + + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + + // Convert the packet length to the byte representation of the int. + byte[] length_bytes = new byte[4]; + length_bytes[0] = (byte) (packet_length >> 24); + length_bytes[1] = (byte) ((packet_length << 8) >> 24); + length_bytes[2] = (byte) ((packet_length << 16) >> 24); + length_bytes[3] = (byte) ((packet_length << 24) >> 24); + + GossipService.LOGGER.debug("Sending message (" + packet_length + " bytes): " + + jsonArray.toString()); + + ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length); + byteBuffer.put(length_bytes); + byteBuffer.put(json_bytes); + byte[] buf = byteBuffer.array(); + + DatagramSocket socket = new DatagramSocket(); + DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, + member.getPort()); + socket.send(datagramPacket); + socket.close(); + } else { + GossipService.LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } + + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } +} diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java index c4dcb2e..917c362 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java @@ -1,38 +1,38 @@ -package com.google.code.gossip.manager.random; - -import java.util.ArrayList; -import java.util.Random; - -import com.google.code.gossip.GossipService; -import com.google.code.gossip.LocalGossipMember; -import com.google.code.gossip.manager.GossipManager; -import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread; - -public class RandomActiveGossipThread extends SendMembersActiveGossipThread { - - /** The Random used for choosing a member to gossip with. */ - private Random _random; - - public RandomActiveGossipThread(GossipManager gossipManager) { - super(gossipManager); - _random = new Random(); - } - - /** - * [The selectToSend() function.] - * Find a random peer from the local membership list. - * In the case where this client is the only member in the list, this method will return null. - * @return Member random member if list is greater than 1, null otherwise - */ - protected LocalGossipMember selectPartner(ArrayList memberList) { - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = _random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); - } else { - GossipService.LOGGER.debug("I am alone in this world."); - } - return member; - } - -} +package com.google.code.gossip.manager.random; + +import java.util.ArrayList; +import java.util.Random; + +import com.google.code.gossip.GossipService; +import com.google.code.gossip.LocalGossipMember; +import com.google.code.gossip.manager.GossipManager; +import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread; + +public class RandomActiveGossipThread extends SendMembersActiveGossipThread { + + /** The Random used for choosing a member to gossip with. */ + private Random _random; + + public RandomActiveGossipThread(GossipManager gossipManager) { + super(gossipManager); + _random = new Random(); + } + + /** + * [The selectToSend() function.] Find a random peer from the local membership list. In the case + * where this client is the only member in the list, this method will return null. + * + * @return Member random member if list is greater than 1, null otherwise + */ + protected LocalGossipMember selectPartner(ArrayList memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = _random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } else { + GossipService.LOGGER.debug("I am alone in this world."); + } + return member; + } + +} diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java index 9e2c60f..8893ff5 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java @@ -1,14 +1,16 @@ -package com.google.code.gossip.manager.random; - -import java.util.ArrayList; - -import com.google.code.gossip.GossipMember; -import com.google.code.gossip.GossipSettings; -import com.google.code.gossip.manager.GossipManager; -import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; - -public class RandomGossipManager extends GossipManager { - public RandomGossipManager(String address, int port, String id, GossipSettings settings, ArrayList gossipMembers) { - super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, port, id, settings, gossipMembers); - } -} +package com.google.code.gossip.manager.random; + +import java.util.ArrayList; + +import com.google.code.gossip.GossipMember; +import com.google.code.gossip.GossipSettings; +import com.google.code.gossip.manager.GossipManager; +import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; + +public class RandomGossipManager extends GossipManager { + public RandomGossipManager(String address, int port, String id, GossipSettings settings, + ArrayList gossipMembers) { + super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, + port, id, settings, gossipMembers); + } +}