From 17f1ad6f4fe27927c357bfb74754e3c574efc2aa Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Sat, 17 Jan 2015 18:21:57 -0500 Subject: [PATCH] Apply formatting --- .../google/code/gossip/GossipSettings.java | 117 +++---- .../code/gossip/GossipTimeoutTimer.java | 86 ++--- .../google/code/gossip/LocalGossipMember.java | 65 ++-- .../java/com/google/code/gossip/LogLevel.java | 42 +-- .../code/gossip/RemoteGossipMember.java | 46 +-- .../google/code/gossip/StartupSettings.java | 303 ++++++++++-------- .../code/gossip/examples/GossipExample.java | 118 +++---- .../gossip/manager/ActiveGossipThread.java | 85 ++--- .../code/gossip/manager/GossipManager.java | 233 +++++++------- .../gossip/manager/PassiveGossipThread.java | 78 ++--- ...nlyProcessReceivedPassiveGossipThread.java | 164 +++++----- .../impl/SendMembersActiveGossipThread.java | 143 +++++---- .../random/RandomActiveGossipThread.java | 48 +-- .../manager/random/RandomGossipManager.java | 8 +- 14 files changed, 811 insertions(+), 725 deletions(-) diff --git a/src/main/java/com/google/code/gossip/GossipSettings.java b/src/main/java/com/google/code/gossip/GossipSettings.java index 9694724..10c20ff 100644 --- a/src/main/java/com/google/code/gossip/GossipSettings.java +++ b/src/main/java/com/google/code/gossip/GossipSettings.java @@ -7,58 +7,67 @@ package com.google.code.gossip; */ public class GossipSettings { - /** Time between gossip'ing in ms. Default is 1 second. */ - private int _gossipInterval = 1000; - - /** Time between cleanups in ms. Default is 10 seconds. */ - private int _cleanupInterval = 10000; - - /** - * Construct GossipSettings with default settings. - */ - public GossipSettings() {} - - /** - * Construct GossipSettings with given settings. - * @param gossipInterval The gossip interval in ms. - * @param cleanupInterval The cleanup interval in ms. - */ - public GossipSettings(int gossipInterval, int cleanupInterval) { - _gossipInterval = gossipInterval; - _cleanupInterval = cleanupInterval; - } - - /** - * Set the gossip interval. - * This is the time between a gossip message is send. - * @param gossipInterval The gossip interval in ms. - */ - public void setGossipTimeout(int gossipInterval) { - _gossipInterval = gossipInterval; - } - - /** - * Set the cleanup interval. - * This is the time between the last heartbeat received from a member and when it will be marked as dead. - * @param cleanupInterval The cleanup interval in ms. - */ - public void setCleanupInterval(int cleanupInterval) { - _cleanupInterval = cleanupInterval; - } - - /** - * Get the gossip interval. - * @return The gossip interval in ms. - */ - public int getGossipInterval() { - return _gossipInterval; - } - - /** - * Get the clean interval. - * @return The cleanup interval. - */ - public int getCleanupInterval() { - return _cleanupInterval; - } + /** Time between gossip'ing in ms. Default is 1 second. */ + private int _gossipInterval = 1000; + + /** Time between cleanups in ms. Default is 10 seconds. */ + private int _cleanupInterval = 10000; + + /** + * Construct GossipSettings with default settings. + */ + public GossipSettings() { + } + + /** + * Construct GossipSettings with given settings. + * + * @param gossipInterval + * The gossip interval in ms. + * @param cleanupInterval + * The cleanup interval in ms. + */ + public GossipSettings(int gossipInterval, int cleanupInterval) { + _gossipInterval = gossipInterval; + _cleanupInterval = cleanupInterval; + } + + /** + * Set the gossip interval. This is the time between a gossip message is send. + * + * @param gossipInterval + * The gossip interval in ms. + */ + public void setGossipTimeout(int gossipInterval) { + _gossipInterval = gossipInterval; + } + + /** + * Set the cleanup interval. This is the time between the last heartbeat received from a member + * and when it will be marked as dead. + * + * @param cleanupInterval + * The cleanup interval in ms. + */ + public void setCleanupInterval(int cleanupInterval) { + _cleanupInterval = cleanupInterval; + } + + /** + * Get the gossip interval. + * + * @return The gossip interval in ms. + */ + public int getGossipInterval() { + return _gossipInterval; + } + + /** + * Get the clean interval. + * + * @return The cleanup interval. + */ + public int getCleanupInterval() { + return _cleanupInterval; + } } diff --git a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java index a58d330..94cc3a4 100644 --- a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java +++ b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java @@ -6,56 +6,58 @@ import javax.management.NotificationListener; import javax.management.timer.Timer; /** - * This object represents a timer for a gossip member. - * When the timer has elapsed without being reset in the meantime, it will inform the GossipService about this - * who in turn will put the gossip member on the dead list, because it is apparantly not alive anymore. + * This object represents a timer for a gossip member. When the timer has elapsed without being + * reset in the meantime, it will inform the GossipService about this who in turn will put the + * gossip member on the dead list, because it is apparantly not alive anymore. * * @author joshclemm, harmenw */ public class GossipTimeoutTimer extends Timer { - /** The amount of time this timer waits before generating a wake-up event. */ - private long _sleepTime; + /** The amount of time this timer waits before generating a wake-up event. */ + private long _sleepTime; - /** The gossip member this timer is for. */ - private LocalGossipMember _source; + /** The gossip member this timer is for. */ + private LocalGossipMember _source; - /** - * Constructor. - * Creates a reset-able timer that wakes up after millisecondsSleepTime. - * @param millisecondsSleepTime The time for this timer to wait before an event. - * @param service - * @param member - */ - public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener, LocalGossipMember member) { - super(); - _sleepTime = millisecondsSleepTime; - _source = member; - addNotificationListener(notificationListener, null, null); - } + /** + * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime. + * + * @param millisecondsSleepTime + * The time for this timer to wait before an event. + * @param service + * @param member + */ + public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener, + LocalGossipMember member) { + super(); + _sleepTime = millisecondsSleepTime; + _source = member; + addNotificationListener(notificationListener, null, null); + } - /** - * @see javax.management.timer.Timer#start() - */ - public void start() { - this.reset(); - super.start(); - } + /** + * @see javax.management.timer.Timer#start() + */ + public void start() { + this.reset(); + super.start(); + } - /** - * Resets timer to start counting down from original time. - */ - public void reset() { - removeAllNotifications(); - setWakeupTime(_sleepTime); - } + /** + * Resets timer to start counting down from original time. + */ + public void reset() { + removeAllNotifications(); + setWakeupTime(_sleepTime); + } - /** - * Adds a new wake-up time for this timer. - * @param milliseconds - */ - private void setWakeupTime(long milliseconds) { - addNotification("type", "message", _source, new Date(System.currentTimeMillis()+milliseconds)); - } + /** + * Adds a new wake-up time for this timer. + * + * @param milliseconds + */ + private void setWakeupTime(long milliseconds) { + addNotification("type", "message", _source, new Date(System.currentTimeMillis() + milliseconds)); + } } - diff --git a/src/main/java/com/google/code/gossip/LocalGossipMember.java b/src/main/java/com/google/code/gossip/LocalGossipMember.java index 9c08856..6d040ae 100644 --- a/src/main/java/com/google/code/gossip/LocalGossipMember.java +++ b/src/main/java/com/google/code/gossip/LocalGossipMember.java @@ -3,40 +3,47 @@ package com.google.code.gossip; import javax.management.NotificationListener; /** - * This object represent a gossip member with the properties known locally. - * These objects are stored in the local list of gossip member.s + * This object represent a gossip member with the properties known locally. These objects are stored + * in the local list of gossip member.s * * @author harmenw */ public class LocalGossipMember extends GossipMember { - /** The timeout timer for this gossip member. */ - private transient GossipTimeoutTimer timeoutTimer; + /** The timeout timer for this gossip member. */ + private transient GossipTimeoutTimer timeoutTimer; - /** - * Constructor. - * @param host The hostname or IP address. - * @param port The port number. - * @param heartbeat The current heartbeat. - * @param gossipService The GossipService object. - * @param cleanupTimeout The cleanup timeout for this gossip member. - */ - public LocalGossipMember(String hostname, int port, String id, int heartbeat, NotificationListener notificationListener, int cleanupTimeout) { - super(hostname, port, id, heartbeat); - - this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this); - } + /** + * Constructor. + * + * @param host + * The hostname or IP address. + * @param port + * The port number. + * @param heartbeat + * The current heartbeat. + * @param gossipService + * The GossipService object. + * @param cleanupTimeout + * The cleanup timeout for this gossip member. + */ + public LocalGossipMember(String hostname, int port, String id, int heartbeat, + NotificationListener notificationListener, int cleanupTimeout) { + super(hostname, port, id, heartbeat); - /** - * Start the timeout timer. - */ - public void startTimeoutTimer() { - this.timeoutTimer.start(); - } + this.timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this); + } - /** - * Reset the timeout timer. - */ - public void resetTimeoutTimer() { - this.timeoutTimer.reset(); - } + /** + * Start the timeout timer. + */ + public void startTimeoutTimer() { + this.timeoutTimer.start(); + } + + /** + * Reset the timeout timer. + */ + public void resetTimeoutTimer() { + this.timeoutTimer.reset(); + } } diff --git a/src/main/java/com/google/code/gossip/LogLevel.java b/src/main/java/com/google/code/gossip/LogLevel.java index d486df4..bb22dac 100644 --- a/src/main/java/com/google/code/gossip/LogLevel.java +++ b/src/main/java/com/google/code/gossip/LogLevel.java @@ -2,24 +2,26 @@ package com.google.code.gossip; public class LogLevel { - public static final String CONFIG_ERROR = "ERROR"; - - public static final String CONFIG_INFO = "INFO"; - - public static final String CONFIG_DEBUG = "DEBUG"; - - public static final int ERROR = 1; - public static final int INFO = 2; - public static final int DEBUG = 3; - - public static int fromString(String logLevel) { - if (logLevel.equals(CONFIG_ERROR)) - return ERROR; - else if (logLevel.equals(CONFIG_INFO)) - return INFO; - else if (logLevel.equals(CONFIG_DEBUG)) - return DEBUG; - else - return INFO; - } + public static final String CONFIG_ERROR = "ERROR"; + + public static final String CONFIG_INFO = "INFO"; + + public static final String CONFIG_DEBUG = "DEBUG"; + + public static final int ERROR = 1; + + public static final int INFO = 2; + + public static final int DEBUG = 3; + + public static int fromString(String logLevel) { + if (logLevel.equals(CONFIG_ERROR)) + return ERROR; + else if (logLevel.equals(CONFIG_INFO)) + return INFO; + else if (logLevel.equals(CONFIG_DEBUG)) + return DEBUG; + else + return INFO; + } } diff --git a/src/main/java/com/google/code/gossip/RemoteGossipMember.java b/src/main/java/com/google/code/gossip/RemoteGossipMember.java index 37cdb35..f42f699 100644 --- a/src/main/java/com/google/code/gossip/RemoteGossipMember.java +++ b/src/main/java/com/google/code/gossip/RemoteGossipMember.java @@ -1,28 +1,36 @@ package com.google.code.gossip; /** - * The object represents a gossip member with the properties as received from a remote gossip member. + * The object represents a gossip member with the properties as received from a remote gossip + * member. * * @author harmenw */ public class RemoteGossipMember extends GossipMember { - /** - * Constructor. - * @param host The hostname or IP address. - * @param port The port number. - * @param heartbeat The current heartbeat. - */ - public RemoteGossipMember(String hostname, int port, String id, int heartbeat) { - super(hostname, port, id, heartbeat); - } - - /** - * Construct a RemoteGossipMember with a heartbeat of 0. - * @param host The hostname or IP address. - * @param port The port number. - */ - public RemoteGossipMember(String hostname, int port, String id) { - super(hostname, port, id, 0); - } + /** + * Constructor. + * + * @param host + * The hostname or IP address. + * @param port + * The port number. + * @param heartbeat + * The current heartbeat. + */ + public RemoteGossipMember(String hostname, int port, String id, int heartbeat) { + super(hostname, port, id, heartbeat); + } + + /** + * Construct a RemoteGossipMember with a heartbeat of 0. + * + * @param host + * The hostname or IP address. + * @param port + * The port number. + */ + public RemoteGossipMember(String hostname, int port, String id) { + super(hostname, port, id, 0); + } } diff --git a/src/main/java/com/google/code/gossip/StartupSettings.java b/src/main/java/com/google/code/gossip/StartupSettings.java index 3fa261d..2e558ef 100644 --- a/src/main/java/com/google/code/gossip/StartupSettings.java +++ b/src/main/java/com/google/code/gossip/StartupSettings.java @@ -17,145 +17,168 @@ import org.json.JSONObject; * @author harmenw */ public class StartupSettings { - - /** The port to start the gossip service on. */ - private int _port; - - /** The logging level of the gossip service. */ - private int _logLevel; - - /** The gossip settings used at startup. */ - private GossipSettings _gossipSettings; - - /** The list with gossip members to start with. */ - private ArrayList _gossipMembers; - - /** - * Constructor. - * @param port The port to start the service on. - */ - public StartupSettings(int port, int logLevel) { - this(port, logLevel, new GossipSettings()); - } - - /** - * Constructor. - * @param port The port to start the service on. - */ - public StartupSettings(int port, int logLevel, GossipSettings gossipSettings) { - _port = port; - _logLevel = logLevel; - _gossipSettings = gossipSettings; - _gossipMembers = new ArrayList(); - } - - /** - * Set the port of the gossip service. - * @param port The port for the gossip service. - */ - public void setPort(int port) { - _port = port; - } - - /** - * Get the port for the gossip service. - * @return The port of the gossip service. - */ - public int getPort() { - return _port; - } - - /** - * Set the log level of the gossip service. - * @param logLevel The log level({LogLevel}). - */ - public void setLogLevel(int logLevel) { - _logLevel = logLevel; - } - - /** - * Get the log level of the gossip service. - * @return The log level. - */ - public int getLogLevel() { - return _logLevel; - } - - /** - * Get the GossipSettings. - * @return The GossipSettings object. - */ - public GossipSettings getGossipSettings() { - return _gossipSettings; - } - - /** - * Add a gossip member to the list of members to start with. - * @param member The member to add. - */ - public void addGossipMember(GossipMember member) { - _gossipMembers.add(member); - } - - /** - * Get the list with gossip members. - * @return The gossip members. - */ - public ArrayList getGossipMembers() { - return _gossipMembers; - } - - /** - * Parse the settings for the gossip service from a JSON file. - * @param jsonFile The file object which refers to the JSON config file. - * @return The StartupSettings object with the settings from the config file. - * @throws JSONException Thrown when the file is not well-formed JSON. - * @throws FileNotFoundException Thrown when the file cannot be found. - * @throws IOException Thrown when reading the file gives problems. - */ - public static StartupSettings fromJSONFile(File jsonFile) throws JSONException, FileNotFoundException, IOException { - // Read the file to a String. - BufferedReader br = new BufferedReader(new FileReader(jsonFile)); - StringBuffer buffer = new StringBuffer(); - String line; - while((line = br.readLine()) != null) { - buffer.append(line.trim()); - } - - // Lets parse the String as JSON. - JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0); - - // Now get the port number. - int port = jsonObject.getInt("port"); - // Get the log level from the config file. - int logLevel = LogLevel.fromString(jsonObject.getString("log_level")); - - // Get the gossip_interval from the config file. - int gossipInterval = jsonObject.getInt("gossip_interval"); - - // Get the cleanup_interval from the config file. - int cleanupInterval = jsonObject.getInt("cleanup_interval"); - - System.out.println("Config [port: " + port + ", log_level: " + logLevel + ", gossip_interval: " + gossipInterval + ", cleanup_interval: " + cleanupInterval + "]"); - - // Initiate the settings with the port number. - StartupSettings settings = new StartupSettings(port, logLevel, new GossipSettings(gossipInterval, cleanupInterval)); - - // Now iterate over the members from the config file and add them to the settings. - System.out.print("Config-members ["); - JSONArray membersJSON = jsonObject.getJSONArray("members"); - for (int i=0; i _gossipMembers; + + /** + * Constructor. + * + * @param port + * The port to start the service on. + */ + public StartupSettings(int port, int logLevel) { + this(port, logLevel, new GossipSettings()); + } + + /** + * Constructor. + * + * @param port + * The port to start the service on. + */ + public StartupSettings(int port, int logLevel, GossipSettings gossipSettings) { + _port = port; + _logLevel = logLevel; + _gossipSettings = gossipSettings; + _gossipMembers = new ArrayList(); + } + + /** + * Set the port of the gossip service. + * + * @param port + * The port for the gossip service. + */ + public void setPort(int port) { + _port = port; + } + + /** + * Get the port for the gossip service. + * + * @return The port of the gossip service. + */ + public int getPort() { + return _port; + } + + /** + * Set the log level of the gossip service. + * + * @param logLevel + * The log level({LogLevel}). + */ + public void setLogLevel(int logLevel) { + _logLevel = logLevel; + } + + /** + * Get the log level of the gossip service. + * + * @return The log level. + */ + public int getLogLevel() { + return _logLevel; + } + + /** + * Get the GossipSettings. + * + * @return The GossipSettings object. + */ + public GossipSettings getGossipSettings() { + return _gossipSettings; + } + + /** + * Add a gossip member to the list of members to start with. + * + * @param member + * The member to add. + */ + public void addGossipMember(GossipMember member) { + _gossipMembers.add(member); + } + + /** + * Get the list with gossip members. + * + * @return The gossip members. + */ + public ArrayList getGossipMembers() { + return _gossipMembers; + } + + /** + * Parse the settings for the gossip service from a JSON file. + * + * @param jsonFile + * The file object which refers to the JSON config file. + * @return The StartupSettings object with the settings from the config file. + * @throws JSONException + * Thrown when the file is not well-formed JSON. + * @throws FileNotFoundException + * Thrown when the file cannot be found. + * @throws IOException + * Thrown when reading the file gives problems. + */ + public static StartupSettings fromJSONFile(File jsonFile) throws JSONException, + FileNotFoundException, IOException { + // Read the file to a String. + BufferedReader br = new BufferedReader(new FileReader(jsonFile)); + StringBuffer buffer = new StringBuffer(); + String line; + while ((line = br.readLine()) != null) { + buffer.append(line.trim()); + } + + // Lets parse the String as JSON. + JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0); + + // Now get the port number. + int port = jsonObject.getInt("port"); + + // Get the log level from the config file. + int logLevel = LogLevel.fromString(jsonObject.getString("log_level")); + + // Get the gossip_interval from the config file. + int gossipInterval = jsonObject.getInt("gossip_interval"); + + // Get the cleanup_interval from the config file. + int cleanupInterval = jsonObject.getInt("cleanup_interval"); + + System.out.println("Config [port: " + port + ", log_level: " + logLevel + ", gossip_interval: " + + gossipInterval + ", cleanup_interval: " + cleanupInterval + "]"); + + // Initiate the settings with the port number. + StartupSettings settings = new StartupSettings(port, logLevel, new GossipSettings( + gossipInterval, cleanupInterval)); + + // Now iterate over the members from the config file and add them to the settings. + System.out.print("Config-members ["); + JSONArray membersJSON = jsonObject.getJSONArray("members"); + for (int i = 0; i < membersJSON.length(); i++) { + JSONObject memberJSON = membersJSON.getJSONObject(i); + RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("host"), + memberJSON.getInt("port"), ""); + settings.addGossipMember(member); + System.out.print(member.getAddress()); + if (i < (membersJSON.length() - 1)) + System.out.print(", "); + } + System.out.println("]"); + + // Return the created settings object. + return settings; + } } diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java index 6fb1cd5..abc28e1 100644 --- a/src/main/java/com/google/code/gossip/examples/GossipExample.java +++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java @@ -1,6 +1,5 @@ package com.google.code.gossip.examples; - import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -12,67 +11,70 @@ import com.google.code.gossip.LogLevel; import com.google.code.gossip.RemoteGossipMember; /** - * This class is an example of how one could use the gossip service. - * Here we start multiple gossip clients on this host as specified in the config file. + * This class is an example of how one could use the gossip service. Here we start multiple gossip + * clients on this host as specified in the config file. * * @author harmenw */ public class GossipExample extends Thread { - /** The number of clients to start. */ - private static final int NUMBER_OF_CLIENTS = 4; + /** The number of clients to start. */ + private static final int NUMBER_OF_CLIENTS = 4; - /** - * @param args - */ - public static void main(String[] args) { - new GossipExample(); - } - - /** - * Constructor. - * This will start the this thread. - */ - public GossipExample() { - start(); - } + /** + * @param args + */ + public static void main(String[] args) { + new GossipExample(); + } - /** - * @see java.lang.Thread#run() - */ - public void run() { - try { - GossipSettings settings = new GossipSettings(); - - ArrayList clients = new ArrayList(); - - // Get my ip address. - String myIpAddress = InetAddress.getLocalHost().getHostAddress(); - - // Create the gossip members and put them in a list and give them a port number starting with 2000. - ArrayList startupMembers = new ArrayList(); - for (int i=0; i clients = new ArrayList(); + + // Get my ip address. + String myIpAddress = InetAddress.getLocalHost().getHostAddress(); + + // Create the gossip members and put them in a list and give them a port number starting with + // 2000. + ArrayList startupMembers = new ArrayList(); + for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) { + startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, "")); + } + + // Lets start the gossip clients. + // Start the clients, waiting cleaning-interval + 1 second between them which will show the + // dead list handling. + for (GossipMember member : startupMembers) { + GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", + LogLevel.DEBUG, startupMembers, settings); + clients.add(gossipService); + gossipService.start(); + sleep(settings.getCleanupInterval() + 1000); + } + + // After starting all gossip clients, first wait 10 seconds and then shut them down. + sleep(10000); + System.err.println("Going to shutdown all services..."); + // Since they all run in the same virtual machine and share the same executor, if one is + // shutdown they will all stop. + clients.get(0).shutdown(); + + } catch (UnknownHostException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } } diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java index 49516d5..e5ab754 100644 --- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java @@ -8,51 +8,52 @@ import com.google.code.gossip.GossipService; import com.google.code.gossip.LocalGossipMember; /** - * [The active thread: periodically send gossip request.] - * The class handles gossiping the membership list. - * This information is important to maintaining a common - * state among all the nodes, and is important for detecting - * failures. + * [The active thread: periodically send gossip request.] The class handles gossiping the membership + * list. This information is important to maintaining a common state among all the nodes, and is + * important for detecting failures. */ abstract public class ActiveGossipThread implements Runnable { - - private GossipManager _gossipManager; - - private final AtomicBoolean _keepRunning; - public ActiveGossipThread(GossipManager gossipManager) { - _gossipManager = gossipManager; - _keepRunning = new AtomicBoolean(true); - } + private GossipManager _gossipManager; - @Override - public void run() { - while(_keepRunning.get()) { - try { - TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval()); - sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList()); - } catch (InterruptedException e) { - GossipService.LOGGER.error(e); - _keepRunning.set(false); - } - } - shutdown(); - } - - public void shutdown(){ - _keepRunning.set(false); - } - /** - * Performs the sending of the membership list, after we have - * incremented our own heartbeat. - */ - abstract protected void sendMembershipList(LocalGossipMember me, ArrayList memberList); + private final AtomicBoolean _keepRunning; - /** - * Abstract method which should be implemented by a subclass. - * This method should return a member of the list to gossip with. - * @param memberList The list of members which are stored in the local list of members. - * @return The chosen LocalGossipMember to gossip with. - */ - abstract protected LocalGossipMember selectPartner(ArrayList memberList); + public ActiveGossipThread(GossipManager gossipManager) { + _gossipManager = gossipManager; + _keepRunning = new AtomicBoolean(true); + } + + @Override + public void run() { + while (_keepRunning.get()) { + try { + TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval()); + sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList()); + } catch (InterruptedException e) { + GossipService.LOGGER.error(e); + _keepRunning.set(false); + } + } + shutdown(); + } + + public void shutdown() { + _keepRunning.set(false); + } + + /** + * Performs the sending of the membership list, after we have incremented our own heartbeat. + */ + abstract protected void sendMembershipList(LocalGossipMember me, + ArrayList memberList); + + /** + * Abstract method which should be implemented by a subclass. This method should return a member + * of the list to gossip with. + * + * @param memberList + * The list of members which are stored in the local list of members. + * @return The chosen LocalGossipMember to gossip with. + */ + abstract protected LocalGossipMember selectPartner(ArrayList memberList); } diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index f0e3356..99e308c 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -16,142 +16,147 @@ import com.google.code.gossip.GossipSettings; import com.google.code.gossip.LocalGossipMember; public abstract class GossipManager extends Thread implements NotificationListener { - /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */ - public static final int MAX_PACKET_SIZE = 102400; - - /** The list of members which are in the gossip group (not including myself). */ - private ArrayList _memberList; + /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */ + public static final int MAX_PACKET_SIZE = 102400; - /** The list of members which are known to be dead. */ - private ArrayList _deadList; - - /** The member I am representing. */ - private LocalGossipMember _me; - - /** The settings for gossiping. */ - private GossipSettings _settings; - - /** A boolean whether the gossip service should keep running. */ - private AtomicBoolean _gossipServiceRunning; + /** The list of members which are in the gossip group (not including myself). */ + private ArrayList _memberList; + + /** The list of members which are known to be dead. */ + private ArrayList _deadList; + + /** The member I am representing. */ + private LocalGossipMember _me; + + /** The settings for gossiping. */ + private GossipSettings _settings; + + /** A boolean whether the gossip service should keep running. */ + private AtomicBoolean _gossipServiceRunning; + + /** A ExecutorService used for executing the active and passive gossip threads. */ + private ExecutorService _gossipThreadExecutor; + + private Class _passiveGossipThreadClass; + + private PassiveGossipThread passiveGossipThread; + + private Class _activeGossipThreadClass; + + private ActiveGossipThread activeGossipThread; - /** A ExecutorService used for executing the active and passive gossip threads. */ - private ExecutorService _gossipThreadExecutor; - - private Class _passiveGossipThreadClass; - private PassiveGossipThread passiveGossipThread; - - private Class _activeGossipThreadClass; - private ActiveGossipThread activeGossipThread; - public GossipManager(Class passiveGossipThreadClass, Class activeGossipThreadClass, String address, int port, String id, GossipSettings settings, ArrayList gossipMembers) { - _passiveGossipThreadClass = passiveGossipThreadClass; - _activeGossipThreadClass = activeGossipThreadClass; - _settings = settings; - _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); - _memberList = new ArrayList(); - _deadList = new ArrayList(); - for (GossipMember startupMember : gossipMembers) { - if (!startupMember.equals(_me)) { + _passiveGossipThreadClass = passiveGossipThreadClass; + _activeGossipThreadClass = activeGossipThreadClass; + _settings = settings; + _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); + _memberList = new ArrayList(); + _deadList = new ArrayList(); + for (GossipMember startupMember : gossipMembers) { + if (!startupMember.equals(_me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), startupMember.getId(), 0, this, settings.getCleanupInterval()); - _memberList.add(member); - GossipService.LOGGER.debug(member); - } - } - - _gossipServiceRunning = new AtomicBoolean(true); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - public void run() { - GossipService.LOGGER.info("Service has been shutdown..."); - } - })); - } - - /** - * All timers associated with a member will trigger this method when it goes - * off. The timer will go off if we have not heard from this member in - * _settings.T_CLEANUP time. - */ - @Override - public void handleNotification(Notification notification, Object handback) { - LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); - GossipService.LOGGER.info("Dead member detected: " + deadMember); - synchronized (this._memberList) { - this._memberList.remove(deadMember); - } - synchronized (this._deadList) { - this._deadList.add(deadMember); - } - } + _memberList.add(member); + GossipService.LOGGER.debug(member); + } + } - public GossipSettings getSettings() { - return _settings; - } + _gossipServiceRunning = new AtomicBoolean(true); + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + public void run() { + GossipService.LOGGER.info("Service has been shutdown..."); + } + })); + } - /** - * Get a clone of the memberlist. - * @return - */ - public ArrayList getMemberList() { - return _memberList; - } - - public LocalGossipMember getMyself() { - return _me; - } + /** + * All timers associated with a member will trigger this method when it goes off. The timer will + * go off if we have not heard from this member in _settings.T_CLEANUP time. + */ + @Override + public void handleNotification(Notification notification, Object handback) { + LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); + GossipService.LOGGER.info("Dead member detected: " + deadMember); + synchronized (this._memberList) { + this._memberList.remove(deadMember); + } + synchronized (this._deadList) { + this._deadList.add(deadMember); + } + } - public ArrayList getDeadList() { - return _deadList; - } - - /** - * Starts the client. Specifically, start the various cycles for this protocol. - * Start the gossip thread and start the receiver thread. - * @throws InterruptedException - */ - public void run() { - for (LocalGossipMember member : _memberList) { - if (member != _me) { - member.startTimeoutTimer(); - } - } + public GossipSettings getSettings() { + return _settings; + } + + /** + * Get a clone of the memberlist. + * + * @return + */ + public ArrayList getMemberList() { + return _memberList; + } + + public LocalGossipMember getMyself() { + return _me; + } + + public ArrayList getDeadList() { + return _deadList; + } + + /** + * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip + * thread and start the receiver thread. + * + * @throws InterruptedException + */ + public void run() { + for (LocalGossipMember member : _memberList) { + if (member != _me) { + member.startTimeoutTimer(); + } + } _gossipThreadExecutor = Executors.newCachedThreadPool(); try { - passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class).newInstance(this); + passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class) + .newInstance(this); _gossipThreadExecutor.execute(passiveGossipThread); - activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class).newInstance(this); + activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class) + .newInstance(this); _gossipThreadExecutor.execute(activeGossipThread); } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e1) { throw new RuntimeException(e1); } - GossipService.LOGGER.info("The GossipService is started."); - while(_gossipServiceRunning.get()) { - try { - //TODO - TimeUnit.MILLISECONDS.sleep(1); - } catch (InterruptedException e) { - GossipService.LOGGER.info("The GossipClient was interrupted."); - } - } - } - - /** - * Shutdown the gossip service. - */ - public void shutdown() { - _gossipThreadExecutor.shutdown(); - passiveGossipThread.shutdown(); - activeGossipThread.shutdown(); - try { + GossipService.LOGGER.info("The GossipService is started."); + while (_gossipServiceRunning.get()) { + try { + // TODO + TimeUnit.MILLISECONDS.sleep(1); + } catch (InterruptedException e) { + GossipService.LOGGER.info("The GossipClient was interrupted."); + } + } + } + + /** + * Shutdown the gossip service. + */ + public void shutdown() { + _gossipThreadExecutor.shutdown(); + passiveGossipThread.shutdown(); + activeGossipThread.shutdown(); + try { boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); System.err.println("Terminate retuned " + result); } catch (InterruptedException e) { e.printStackTrace(); } - _gossipServiceRunning.set(false); - } + _gossipServiceRunning.set(false); + } } diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index 7992bf5..08342b4 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -18,36 +18,36 @@ import com.google.code.gossip.GossipService; import com.google.code.gossip.RemoteGossipMember; /** - * [The passive thread: reply to incoming gossip request.] - * This class handles the passive cycle, where this client - * has received an incoming message. For now, this message - * is always the membership list, but if you choose to gossip - * additional information, you will need some logic to determine - * the incoming message. + * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle, + * where this client has received an incoming message. For now, this message is always the + * membership list, but if you choose to gossip additional information, you will need some logic to + * determine the incoming message. */ abstract public class PassiveGossipThread implements Runnable { - - /** The socket used for the passive thread of the gossip service. */ - private DatagramSocket _server; - - private GossipManager _gossipManager; - private AtomicBoolean _keepRunning; + /** The socket used for the passive thread of the gossip service. */ + private DatagramSocket _server; - public PassiveGossipThread(GossipManager gossipManager) { - _gossipManager = gossipManager; - try { - SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), _gossipManager.getMyself().getPort()); - _server = new DatagramSocket(socketAddress); - GossipService.LOGGER.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort()); - GossipService.LOGGER.debug("I am " + _gossipManager.getMyself()); - } catch (SocketException ex) { - GossipService.LOGGER.error(ex); - _server = null; - throw new RuntimeException(ex); - } - _keepRunning = new AtomicBoolean(true); - } + private GossipManager _gossipManager; + + private AtomicBoolean _keepRunning; + + public PassiveGossipThread(GossipManager gossipManager) { + _gossipManager = gossipManager; + try { + SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), + _gossipManager.getMyself().getPort()); + _server = new DatagramSocket(socketAddress); + GossipService.LOGGER.info("Gossip service successfully initialized on port " + + _gossipManager.getMyself().getPort()); + GossipService.LOGGER.debug("I am " + _gossipManager.getMyself()); + } catch (SocketException ex) { + GossipService.LOGGER.error(ex); + _server = null; + throw new RuntimeException(ex); + } + _keepRunning = new AtomicBoolean(true); + } @Override public void run() { @@ -127,16 +127,22 @@ abstract public class PassiveGossipThread implements Runnable { } shutdown(); } - - public void shutdown(){ + + public void shutdown() { _server.close(); } - - /** - * Abstract method for merging the local and remote list. - * @param gossipManager The GossipManager for retrieving the local members and dead members list. - * @param senderMember The member who is sending this list, this could be used to send a response if the remote list contains out-dated information. - * @param remoteList The list of members known at the remote side. - */ - abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList remoteList); + + /** + * Abstract method for merging the local and remote list. + * + * @param gossipManager + * The GossipManager for retrieving the local members and dead members list. + * @param senderMember + * The member who is sending this list, this could be used to send a response if the + * remote list contains out-dated information. + * @param remoteList + * The list of members known at the remote side. + */ + abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, + ArrayList remoteList); } \ No newline at end of file diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index e915240..206d5c5 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -11,83 +11,101 @@ import com.google.code.gossip.manager.PassiveGossipThread; public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread { - public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) { - super(gossipManager); - } + public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) { + super(gossipManager); + } - /** - * Merge remote list (received from peer), and our local member list. - * Simply, we must update the heartbeats that the remote list has with - * our list. Also, some additional logic is needed to make sure we have - * not timed out a member and then immediately received a list with that - * member. - * @param remoteList - */ - protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList remoteList) { + /** + * Merge remote list (received from peer), and our local member list. Simply, we must update the + * heartbeats that the remote list has with our list. Also, some additional logic is needed to + * make sure we have not timed out a member and then immediately received a list with that member. + * + * @param remoteList + */ + protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, + ArrayList remoteList) { - synchronized (gossipManager.getDeadList()) { + synchronized (gossipManager.getDeadList()) { - synchronized (gossipManager.getMemberList()) { + synchronized (gossipManager.getMemberList()) { - for (GossipMember remoteMember : remoteList) { - // Skip myself. We don't want ourselves in the local member list. - if (!remoteMember.equals(gossipManager.getMyself())) { - if (gossipManager.getMemberList().contains(remoteMember)) { - GossipService.LOGGER.debug("The local list already contains the remote member (" + remoteMember + ")."); - // The local memberlist contains the remote member. - LocalGossipMember localMember = gossipManager.getMemberList().get(gossipManager.getMemberList().indexOf(remoteMember)); + for (GossipMember remoteMember : remoteList) { + // Skip myself. We don't want ourselves in the local member list. + if (!remoteMember.equals(gossipManager.getMyself())) { + if (gossipManager.getMemberList().contains(remoteMember)) { + GossipService.LOGGER.debug("The local list already contains the remote member (" + + remoteMember + ")."); + // The local memberlist contains the remote member. + LocalGossipMember localMember = gossipManager.getMemberList().get( + gossipManager.getMemberList().indexOf(remoteMember)); - // Let's synchronize it's heartbeat. - if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { - // update local list with latest heartbeat - localMember.setHeartbeat(remoteMember.getHeartbeat()); - // and reset the timeout of that member - localMember.resetTimeoutTimer(); - } - // TODO: Otherwise, should we inform the other when the heartbeat is already higher? - } else { - // The local list does not contain the remote member. - GossipService.LOGGER.debug("The local list does not contain the remote member (" + remoteMember + ")."); + // Let's synchronize it's heartbeat. + if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { + // update local list with latest heartbeat + localMember.setHeartbeat(remoteMember.getHeartbeat()); + // and reset the timeout of that member + localMember.resetTimeoutTimer(); + } + // TODO: Otherwise, should we inform the other when the heartbeat is already higher? + } else { + // The local list does not contain the remote member. + GossipService.LOGGER.debug("The local list does not contain the remote member (" + + remoteMember + ")."); + + // The remote member is either brand new, or a previously declared dead member. + // If its dead, check the heartbeat because it may have come back from the dead. + if (gossipManager.getDeadList().contains(remoteMember)) { + // The remote member is known here as a dead member. + GossipService.LOGGER.debug("The remote member is known here as a dead member."); + LocalGossipMember localDeadMember = gossipManager.getDeadList().get( + gossipManager.getDeadList().indexOf(remoteMember)); + // If a member is restarted the heartbeat will restart from 1, so we should check + // that here. + // So a member can become from the dead when it is either larger than a previous + // heartbeat (due to network failure) + // or when the heartbeat is 1 (after a restart of the service). + // TODO: What if the first message of a gossip service is sent to a dead node? The + // second member will receive a heartbeat of two. + // TODO: The above does happen. Maybe a special message for a revived member? + // TODO: Or maybe when a member is declared dead for more than + // _settings.getCleanupInterval() ms, reset the heartbeat to 0. + // It will then accept a revived member. + // The above is now handle by checking whether the heartbeat differs + // _settings.getCleanupInterval(), it must be restarted. + if (remoteMember.getHeartbeat() == 1 + || ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager + .getSettings().getCleanupInterval() / 1000) + || remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { + GossipService.LOGGER + .debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); + // The remote member is back from the dead. + // Remove it from the dead list. + gossipManager.getDeadList().remove(localDeadMember); + // Add it as a new member and add it to the member list. + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), + remoteMember.getPort(), remoteMember.getId(), + remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() + .getCleanupInterval()); + gossipManager.getMemberList().add(newLocalMember); + newLocalMember.startTimeoutTimer(); + GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + + " from dead list and added to local member list."); + } + } else { + // Brand spanking new member - welcome. + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), + remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), + gossipManager, gossipManager.getSettings().getCleanupInterval()); + gossipManager.getMemberList().add(newLocalMember); + newLocalMember.startTimeoutTimer(); + GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + + " to local member list."); + } + } + } + } + } + } + } - // The remote member is either brand new, or a previously declared dead member. - // If its dead, check the heartbeat because it may have come back from the dead. - if (gossipManager.getDeadList().contains(remoteMember)) { - // The remote member is known here as a dead member. - GossipService.LOGGER.debug("The remote member is known here as a dead member."); - LocalGossipMember localDeadMember = gossipManager.getDeadList().get(gossipManager.getDeadList().indexOf(remoteMember)); - // If a member is restarted the heartbeat will restart from 1, so we should check that here. - // So a member can become from the dead when it is either larger than a previous heartbeat (due to network failure) - // or when the heartbeat is 1 (after a restart of the service). - // TODO: What if the first message of a gossip service is sent to a dead node? The second member will receive a heartbeat of two. - // TODO: The above does happen. Maybe a special message for a revived member? - // TODO: Or maybe when a member is declared dead for more than _settings.getCleanupInterval() ms, reset the heartbeat to 0. - // It will then accept a revived member. - // The above is now handle by checking whether the heartbeat differs _settings.getCleanupInterval(), it must be restarted. - if (remoteMember.getHeartbeat() == 1 - || ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager.getSettings().getCleanupInterval() / 1000) - || remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { - GossipService.LOGGER.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); - // The remote member is back from the dead. - // Remove it from the dead list. - gossipManager.getDeadList().remove(localDeadMember); - // Add it as a new member and add it to the member list. - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); - newLocalMember.startTimeoutTimer(); - GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); - } - } else { - // Brand spanking new member - welcome. - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); - newLocalMember.startTimeoutTimer(); - GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list."); - } - } - } - } - } - } - } - } diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index 0b5c9be..3e2baf9 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -16,79 +16,80 @@ import com.google.code.gossip.manager.GossipManager; abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { - public SendMembersActiveGossipThread(GossipManager gossipManager) { - super(gossipManager); - } - - /** - * Performs the sending of the membership list, after we have - * incremented our own heartbeat. - */ - protected void sendMembershipList(LocalGossipMember me, ArrayList memberList) { - GossipService.LOGGER.debug("Send sendMembershipList() is called."); + public SendMembersActiveGossipThread(GossipManager gossipManager) { + super(gossipManager); + } - // Increase the heartbeat of myself by 1. - me.setHeartbeat(me.getHeartbeat() + 1); - - synchronized (memberList) { - try { - LocalGossipMember member = selectPartner(memberList); + /** + * Performs the sending of the membership list, after we have incremented our own heartbeat. + */ + protected void sendMembershipList(LocalGossipMember me, ArrayList memberList) { + GossipService.LOGGER.debug("Send sendMembershipList() is called."); + + // Increase the heartbeat of myself by 1. + me.setHeartbeat(me.getHeartbeat() + 1); + + synchronized (memberList) { + try { + LocalGossipMember member = selectPartner(memberList); + + if (member != null) { + InetAddress dest = InetAddress.getByName(member.getHost()); + + // Create a StringBuffer for the JSON message. + JSONArray jsonArray = new JSONArray(); + GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort()); + GossipService.LOGGER.debug("---------------------"); + + // First write myself, append the JSON representation of the member to the buffer. + jsonArray.put(me.toJSONObject()); + GossipService.LOGGER.debug(me); + + // Then write the others. + for (int i = 0; i < memberList.size(); i++) { + LocalGossipMember other = memberList.get(i); + // Append the JSON representation of the member to the buffer. + jsonArray.put(other.toJSONObject()); + GossipService.LOGGER.debug(other); + } + GossipService.LOGGER.debug("---------------------"); + + // Write the objects to a byte array. + byte[] json_bytes = jsonArray.toString().getBytes(); + + int packet_length = json_bytes.length; + + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + + // Convert the packet length to the byte representation of the int. + byte[] length_bytes = new byte[4]; + length_bytes[0] = (byte) (packet_length >> 24); + length_bytes[1] = (byte) ((packet_length << 8) >> 24); + length_bytes[2] = (byte) ((packet_length << 16) >> 24); + length_bytes[3] = (byte) ((packet_length << 24) >> 24); + + GossipService.LOGGER.debug("Sending message (" + packet_length + " bytes): " + + jsonArray.toString()); - if (member != null) { - InetAddress dest = InetAddress.getByName(member.getHost()); - - // Create a StringBuffer for the JSON message. - JSONArray jsonArray = new JSONArray(); - GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort()); - GossipService.LOGGER.debug("---------------------"); - - // First write myself, append the JSON representation of the member to the buffer. - jsonArray.put(me.toJSONObject()); - GossipService.LOGGER.debug(me); - - // Then write the others. - for (int i=0; i> 24 ); - length_bytes[1] =(byte)( (packet_length << 8) >> 24 ); - length_bytes[2] =(byte)( (packet_length << 16) >> 24 ); - length_bytes[3] =(byte)( (packet_length << 24) >> 24 ); - - - GossipService.LOGGER.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString()); - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length); - byteBuffer.put(length_bytes); - byteBuffer.put(json_bytes); - byte[] buf = byteBuffer.array(); - - DatagramSocket socket = new DatagramSocket(); - DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort()); - socket.send(datagramPacket); - socket.close(); - } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } + byteBuffer.put(length_bytes); + byteBuffer.put(json_bytes); + byte[] buf = byteBuffer.array(); - } catch (IOException e1) { - e1.printStackTrace(); - } - } - } + DatagramSocket socket = new DatagramSocket(); + DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, + member.getPort()); + socket.send(datagramPacket); + socket.close(); + } else { + GossipService.LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } + + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } } diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java index fa2ec70..917c362 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java @@ -9,30 +9,30 @@ import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread; public class RandomActiveGossipThread extends SendMembersActiveGossipThread { - - /** The Random used for choosing a member to gossip with. */ - private Random _random; - public RandomActiveGossipThread(GossipManager gossipManager) { - super(gossipManager); - _random = new Random(); - } - - /** - * [The selectToSend() function.] - * Find a random peer from the local membership list. - * In the case where this client is the only member in the list, this method will return null. - * @return Member random member if list is greater than 1, null otherwise - */ - protected LocalGossipMember selectPartner(ArrayList memberList) { - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = _random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); - } else { - GossipService.LOGGER.debug("I am alone in this world."); - } - return member; - } + /** The Random used for choosing a member to gossip with. */ + private Random _random; + + public RandomActiveGossipThread(GossipManager gossipManager) { + super(gossipManager); + _random = new Random(); + } + + /** + * [The selectToSend() function.] Find a random peer from the local membership list. In the case + * where this client is the only member in the list, this method will return null. + * + * @return Member random member if list is greater than 1, null otherwise + */ + protected LocalGossipMember selectPartner(ArrayList memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = _random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } else { + GossipService.LOGGER.debug("I am alone in this world."); + } + return member; + } } diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java index d09bd83..8893ff5 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java @@ -8,7 +8,9 @@ import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; public class RandomGossipManager extends GossipManager { - public RandomGossipManager(String address, int port, String id, GossipSettings settings, ArrayList gossipMembers) { - super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, port, id, settings, gossipMembers); - } + public RandomGossipManager(String address, int port, String id, GossipSettings settings, + ArrayList gossipMembers) { + super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, + port, id, settings, gossipMembers); + } }