Merge pull request #6 from joprice/housekeeping

Small cleanup
This commit is contained in:
edwardcapriolo
2015-02-21 19:01:28 -05:00
19 changed files with 193 additions and 164 deletions

16
.gitignore vendored Normal file
View File

@ -0,0 +1,16 @@
# Eclipse
.classpath
.project
.settings/
# Intellij
.idea/
*.iml
*.iws
# Mac
.DS_Store
# Maven
log/
target/

145
README.md
View File

@ -1,66 +1,79 @@
# Gossip
Gossip protocol is a method for a group of nodes to discover and check the livelyness of a cluster. More information can be found at http://en.wikipedia.org/wiki/Gossip_protocol.
Gossip protocol is a method for a group of nodes to discover and check the liveliness of a cluster. More information can be found at http://en.wikipedia.org/wiki/Gossip_protocol.
The original implementation was forked from https://code.google.com/p/java-gossip/. Several bug fixes and changes have already been added.
The original implementation was forked from https://code.google.com/p/java-gossip/. Several bug fixes and changes have already been added.
Usage
----- Usage
-----
To gossip you need one or more seed nodes. Seed is just a list of places to initially connect to.
To gossip you need one or more seed nodes. Seed is just a list of places to initially connect to.
GossipSettings settings = new GossipSettings();
int seedNodes = 3; ```java
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>(); GossipSettings settings = new GossipSettings();
for (int i = 1; i < seedNodes+1; ++i) { int seedNodes = 3;
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + "")); List<GossipMember> startupMembers = new ArrayList<>();
} for (int i = 1; i < seedNodes+1; ++i) {
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + ""));
Here we start five gossip processes and check that they discover each other. (Normally these are on different hosts but here we give each process a distinct local ip. }
```
ArrayList<GossipService> clients = new ArrayList<GossipService>();
int clusterMembers = 5; Here we start five gossip processes and check that they discover each other. (Normally these are on different hosts but here we give each process a distinct local ip.
for (int i = 1; i < clusterMembers+1; ++i) {
GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", ```java
LogLevel.DEBUG, startupMembers, settings, null); List<GossipService> clients = new ArrayList<>();
clients.add(gossipService); int clusterMembers = 5;
gossipService.start(); for (int i = 1; i < clusterMembers+1; ++i) {
} GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "",
LogLevel.DEBUG, startupMembers, settings, null);
Later we can check that the nodes discover each other clients.add(gossipService);
gossipService.start();
Thread.sleep(10000); }
for (int i = 0; i < clusterMembers; ++i) { ```
Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size());
} Later we can check that the nodes discover each other
Event Listener ```java
------ Thread.sleep(10000);
for (int i = 0; i < clusterMembers; ++i) {
The status can be polled using the getters that return immutable lists. Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size());
}
List<LocalGossipMember> getMemberList() ```
public List<LocalGossipMember> getDeadList()
Event Listener
Users can also attach an event listener: ------
GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG, The status can be polled using the getters that return immutable lists.
startupMembers, settings,
new GossipListener(){ ```java
@Override List<LocalGossipMember> getMemberList()
public void gossipEvent(GossipMember member, GossipState state) { public List<LocalGossipMember> getDeadList()
System.out.println(member+" "+ state); ```
}
}); Users can also attach an event listener:
```java
Maven GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,
------ startupMembers, settings,
new GossipListener(){
@Override
You can get this software from maven central. public void gossipEvent(GossipMember member, GossipState state) {
System.out.println(member+" "+ state);
<dependency> }
<groupId>io.teknek</groupId> });
<artifactId>gossip</artifactId> ```
<version>${pick_the_latest_version}</version>
</dependency>
Maven
------
You can get this software from maven central.
```xml
<dependency>
<groupId>io.teknek</groupId>
<artifactId>gossip</artifactId>
<version>${pick_the_latest_version}</version>
</dependency>
```

View File

@ -7,20 +7,20 @@ import org.json.JSONObject;
/** /**
* A abstract class representing a gossip member. * A abstract class representing a gossip member.
* *
* @author joshclemm, harmenw * @author joshclemm, harmenw
*/ */
public abstract class GossipMember implements Comparable<GossipMember>{ public abstract class GossipMember implements Comparable<GossipMember>{
public static final String JSON_HOST = "host"; public static final String JSON_HOST = "host";
public static final String JSON_PORT = "port"; public static final String JSON_PORT = "port";
public static final String JSON_HEARTBEAT = "heartbeat"; public static final String JSON_HEARTBEAT = "heartbeat";
public static final String JSON_ID = "id"; public static final String JSON_ID = "id";
protected String _host; protected final String _host;
protected int _port; protected final int _port;
protected int _heartbeat; protected int _heartbeat;
protected String _id; protected String _id;
/** /**
* Constructor. * Constructor.
* @param host The hostname or IP address. * @param host The hostname or IP address.
@ -33,7 +33,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
_id = id; _id = id;
_heartbeat = heartbeat; _heartbeat = heartbeat;
} }
/** /**
* Get the hostname or IP address of the remote gossip member. * Get the hostname or IP address of the remote gossip member.
* @return The hostname or IP address. * @return The hostname or IP address.
@ -41,7 +41,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
public String getHost() { public String getHost() {
return _host; return _host;
} }
/** /**
* Get the port number of the remote gossip member. * Get the port number of the remote gossip member.
* @return The port number. * @return The port number.
@ -65,7 +65,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
public int getHeartbeat() { public int getHeartbeat() {
return _heartbeat; return _heartbeat;
} }
/** /**
* Set the heartbeat of this gossip member. * Set the heartbeat of this gossip member.
* @param heartbeat The new heartbeat. * @param heartbeat The new heartbeat.
@ -73,8 +73,8 @@ public abstract class GossipMember implements Comparable<GossipMember>{
public void setHeartbeat(int heartbeat) { public void setHeartbeat(int heartbeat) {
this._heartbeat = heartbeat; this._heartbeat = heartbeat;
} }
public String getId() { public String getId() {
return _id; return _id;
} }
@ -86,7 +86,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
public String toString() { public String toString() {
return "Member [address=" + getAddress() + ", id=" + _id + ", heartbeat=" + _heartbeat + "]"; return "Member [address=" + getAddress() + ", id=" + _id + ", heartbeat=" + _heartbeat + "]";
} }
/** /**
* @see java.lang.Object#hashCode() * @see java.lang.Object#hashCode()
*/ */
@ -136,7 +136,7 @@ public abstract class GossipMember implements Comparable<GossipMember>{
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
public int compareTo(GossipMember other){ public int compareTo(GossipMember other){
return this.getAddress().compareTo(other.getAddress()); return this.getAddress().compareTo(other.getAddress());
} }

View File

@ -7,7 +7,6 @@ import java.io.IOException;
import org.json.JSONException; import org.json.JSONException;
public class GossipRunner { public class GossipRunner {
private StartupSettings _settings;
public static void main(String[] args) { public static void main(String[] args) {
File configFile; File configFile;
@ -26,9 +25,9 @@ public class GossipRunner {
if (configFile != null && configFile.exists()) { if (configFile != null && configFile.exists()) {
try { try {
System.out.println("Parsing the configuration file..."); System.out.println("Parsing the configuration file...");
_settings = StartupSettings.fromJSONFile(configFile); StartupSettings _settings = StartupSettings.fromJSONFile(configFile);
GossipService gossipService = new GossipService(_settings); GossipService gossipService = new GossipService(_settings);
System.out.println("Gossip service successfully inialized, let's start it..."); System.out.println("Gossip service successfully initialized, let's start it...");
gossipService.start(); gossipService.start();
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
System.err.println("The given file is not found!"); System.err.println("The given file is not found!");

View File

@ -1,9 +1,9 @@
package com.google.code.gossip; package com.google.code.gossip;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.List;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import com.google.code.gossip.event.GossipListener; import com.google.code.gossip.event.GossipListener;
@ -12,7 +12,7 @@ import com.google.code.gossip.manager.random.RandomGossipManager;
/** /**
* This object represents the service which is responsible for gossiping with other gossip members. * This object represents the service which is responsible for gossiping with other gossip members.
* *
* @author joshclemm, harmenw * @author joshclemm, harmenw
*/ */
public class GossipService { public class GossipService {
@ -23,7 +23,7 @@ public class GossipService {
/** /**
* Constructor with the default settings. * Constructor with the default settings.
* *
* @throws InterruptedException * @throws InterruptedException
* @throws UnknownHostException * @throws UnknownHostException
*/ */
@ -36,13 +36,12 @@ public class GossipService {
/** /**
* Setup the client's lists, gossiping parameters, and parse the startup config file. * Setup the client's lists, gossiping parameters, and parse the startup config file.
* *
* @throws SocketException
* @throws InterruptedException * @throws InterruptedException
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public GossipService(String ipAddress, int port, String id, int logLevel, public GossipService(String ipAddress, int port, String id, int logLevel,
ArrayList<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener) List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException { throws InterruptedException, UnknownHostException {
_gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener); _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener);
} }

View File

@ -9,20 +9,20 @@ import javax.management.timer.Timer;
* This object represents a timer for a gossip member. When the timer has elapsed without being * This object represents a timer for a gossip member. When the timer has elapsed without being
* reset in the meantime, it will inform the GossipService about this who in turn will put the * reset in the meantime, it will inform the GossipService about this who in turn will put the
* gossip member on the dead list, because it is apparantly not alive anymore. * gossip member on the dead list, because it is apparantly not alive anymore.
* *
* @author joshclemm, harmenw * @author joshclemm, harmenw
*/ */
public class GossipTimeoutTimer extends Timer { public class GossipTimeoutTimer extends Timer {
private long _sleepTime; private final long _sleepTime;
private LocalGossipMember _source; private final LocalGossipMember _source;
/** /**
* Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime. * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime.
* *
* @param millisecondsSleepTime * @param millisecondsSleepTime
* The time for this timer to wait before an event. * The time for this timer to wait before an event.
* @param service * @param notificationListener
* @param member * @param member
*/ */
public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener, public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener,
@ -51,7 +51,7 @@ public class GossipTimeoutTimer extends Timer {
/** /**
* Adds a new wake-up time for this timer. * Adds a new wake-up time for this timer.
* *
* @param milliseconds * @param milliseconds
*/ */
private void setWakeupTime(long milliseconds) { private void setWakeupTime(long milliseconds) {

View File

@ -5,24 +5,24 @@ import javax.management.NotificationListener;
/** /**
* This object represent a gossip member with the properties known locally. These objects are stored * This object represent a gossip member with the properties known locally. These objects are stored
* in the local list of gossip member.s * in the local list of gossip member.s
* *
* @author harmenw * @author harmenw
*/ */
public class LocalGossipMember extends GossipMember { public class LocalGossipMember extends GossipMember {
/** The timeout timer for this gossip member. */ /** The timeout timer for this gossip member. */
private transient GossipTimeoutTimer timeoutTimer; private final transient GossipTimeoutTimer timeoutTimer;
/** /**
* Constructor. * Constructor.
* *
* @param host * @param hostname
* The hostname or IP address. * The hostname or IP address.
* @param port * @param port
* The port number. * The port number.
* @param id
* @param heartbeat * @param heartbeat
* The current heartbeat. * The current heartbeat.
* @param gossipService * @param notificationListener
* The GossipService object.
* @param cleanupTimeout * @param cleanupTimeout
* The cleanup timeout for this gossip member. * The cleanup timeout for this gossip member.
*/ */

View File

@ -3,15 +3,15 @@ package com.google.code.gossip;
/** /**
* The object represents a gossip member with the properties as received from a remote gossip * The object represents a gossip member with the properties as received from a remote gossip
* member. * member.
* *
* @author harmenw * @author harmenw
*/ */
public class RemoteGossipMember extends GossipMember { public class RemoteGossipMember extends GossipMember {
/** /**
* Constructor. * Constructor.
* *
* @param host * @param hostname
* The hostname or IP address. * The hostname or IP address.
* @param port * @param port
* The port number. * The port number.
@ -24,8 +24,8 @@ public class RemoteGossipMember extends GossipMember {
/** /**
* Construct a RemoteGossipMember with a heartbeat of 0. * Construct a RemoteGossipMember with a heartbeat of 0.
* *
* @param host * @param hostname
* The hostname or IP address. * The hostname or IP address.
* @param port * @param port
* The port number. * The port number.

View File

@ -6,6 +6,7 @@ import java.io.FileNotFoundException;
import java.io.FileReader; import java.io.FileReader;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
@ -13,7 +14,7 @@ import org.json.JSONObject;
/** /**
* This object represents the settings used when starting the gossip service. * This object represents the settings used when starting the gossip service.
* *
* @author harmenw * @author harmenw
*/ */
public class StartupSettings { public class StartupSettings {
@ -25,14 +26,14 @@ public class StartupSettings {
private int _logLevel; private int _logLevel;
/** The gossip settings used at startup. */ /** The gossip settings used at startup. */
private GossipSettings _gossipSettings; private final GossipSettings _gossipSettings;
/** The list with gossip members to start with. */ /** The list with gossip members to start with. */
private ArrayList<GossipMember> _gossipMembers; private final List<GossipMember> _gossipMembers;
/** /**
* Constructor. * Constructor.
* *
* @param port * @param port
* The port to start the service on. * The port to start the service on.
*/ */
@ -42,7 +43,7 @@ public class StartupSettings {
/** /**
* Constructor. * Constructor.
* *
* @param port * @param port
* The port to start the service on. * The port to start the service on.
*/ */
@ -50,12 +51,12 @@ public class StartupSettings {
_port = port; _port = port;
_logLevel = logLevel; _logLevel = logLevel;
_gossipSettings = gossipSettings; _gossipSettings = gossipSettings;
_gossipMembers = new ArrayList<GossipMember>(); _gossipMembers = new ArrayList<>();
} }
/** /**
* Set the port of the gossip service. * Set the port of the gossip service.
* *
* @param port * @param port
* The port for the gossip service. * The port for the gossip service.
*/ */
@ -65,7 +66,7 @@ public class StartupSettings {
/** /**
* Get the port for the gossip service. * Get the port for the gossip service.
* *
* @return The port of the gossip service. * @return The port of the gossip service.
*/ */
public int getPort() { public int getPort() {
@ -74,7 +75,7 @@ public class StartupSettings {
/** /**
* Set the log level of the gossip service. * Set the log level of the gossip service.
* *
* @param logLevel * @param logLevel
* The log level({LogLevel}). * The log level({LogLevel}).
*/ */
@ -84,7 +85,7 @@ public class StartupSettings {
/** /**
* Get the log level of the gossip service. * Get the log level of the gossip service.
* *
* @return The log level. * @return The log level.
*/ */
public int getLogLevel() { public int getLogLevel() {
@ -93,7 +94,7 @@ public class StartupSettings {
/** /**
* Get the GossipSettings. * Get the GossipSettings.
* *
* @return The GossipSettings object. * @return The GossipSettings object.
*/ */
public GossipSettings getGossipSettings() { public GossipSettings getGossipSettings() {
@ -102,7 +103,7 @@ public class StartupSettings {
/** /**
* Add a gossip member to the list of members to start with. * Add a gossip member to the list of members to start with.
* *
* @param member * @param member
* The member to add. * The member to add.
*/ */
@ -112,16 +113,16 @@ public class StartupSettings {
/** /**
* Get the list with gossip members. * Get the list with gossip members.
* *
* @return The gossip members. * @return The gossip members.
*/ */
public ArrayList<GossipMember> getGossipMembers() { public List<GossipMember> getGossipMembers() {
return _gossipMembers; return _gossipMembers;
} }
/** /**
* Parse the settings for the gossip service from a JSON file. * Parse the settings for the gossip service from a JSON file.
* *
* @param jsonFile * @param jsonFile
* The file object which refers to the JSON config file. * The file object which refers to the JSON config file.
* @return The StartupSettings object with the settings from the config file. * @return The StartupSettings object with the settings from the config file.

View File

@ -2,8 +2,8 @@ package com.google.code.gossip.event;
public enum GossipState { public enum GossipState {
UP("up"), DOWN("down"); UP("up"), DOWN("down");
private String state; private final String state;
private GossipState(String state){ private GossipState(String state){
this.state = state; this.state = state;
} }

View File

@ -3,6 +3,7 @@ package com.google.code.gossip.examples;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipService;
@ -13,7 +14,7 @@ import com.google.code.gossip.RemoteGossipMember;
/** /**
* This class is an example of how one could use the gossip service. Here we start multiple gossip * This class is an example of how one could use the gossip service. Here we start multiple gossip
* clients on this host as specified in the config file. * clients on this host as specified in the config file.
* *
* @author harmenw * @author harmenw
*/ */
public class GossipExample extends Thread { public class GossipExample extends Thread {
@ -41,14 +42,14 @@ public class GossipExample extends Thread {
try { try {
GossipSettings settings = new GossipSettings(); GossipSettings settings = new GossipSettings();
ArrayList<GossipService> clients = new ArrayList<GossipService>(); List<GossipService> clients = new ArrayList<>();
// Get my ip address. // Get my ip address.
String myIpAddress = InetAddress.getLocalHost().getHostAddress(); String myIpAddress = InetAddress.getLocalHost().getHostAddress();
// Create the gossip members and put them in a list and give them a port number starting with // Create the gossip members and put them in a list and give them a port number starting with
// 2000. // 2000.
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>(); List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) { for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, "")); startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, ""));
} }

View File

@ -14,7 +14,7 @@ import com.google.code.gossip.LocalGossipMember;
*/ */
abstract public class ActiveGossipThread implements Runnable { abstract public class ActiveGossipThread implements Runnable {
private GossipManager _gossipManager; private final GossipManager _gossipManager;
private final AtomicBoolean _keepRunning; private final AtomicBoolean _keepRunning;
@ -50,7 +50,7 @@ abstract public class ActiveGossipThread implements Runnable {
/** /**
* Abstract method which should be implemented by a subclass. This method should return a member * Abstract method which should be implemented by a subclass. This method should return a member
* of the list to gossip with. * of the list to gossip with.
* *
* @param memberList * @param memberList
* The list of members which are stored in the local list of members. * The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with. * @return The chosen LocalGossipMember to gossip with.

View File

@ -24,24 +24,24 @@ import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.event.GossipState; import com.google.code.gossip.event.GossipState;
public abstract class GossipManager extends Thread implements NotificationListener { public abstract class GossipManager extends Thread implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class); public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
public static final int MAX_PACKET_SIZE = 102400; public static final int MAX_PACKET_SIZE = 102400;
private ConcurrentSkipListMap<LocalGossipMember,GossipState> members; private final ConcurrentSkipListMap<LocalGossipMember,GossipState> members;
private LocalGossipMember _me; private final LocalGossipMember _me;
private GossipSettings _settings; private final GossipSettings _settings;
private AtomicBoolean _gossipServiceRunning; private final AtomicBoolean _gossipServiceRunning;
private ExecutorService _gossipThreadExecutor; private final Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
private Class<? extends PassiveGossipThread> _passiveGossipThreadClass; private final Class<? extends ActiveGossipThread> _activeGossipThreadClass;
private PassiveGossipThread passiveGossipThread; private final GossipListener listener;
private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
private ActiveGossipThread activeGossipThread; private ActiveGossipThread activeGossipThread;
private GossipListener listener; private PassiveGossipThread passiveGossipThread;
private ExecutorService _gossipThreadExecutor;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass, public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port, Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers, String id, GossipSettings settings, List<GossipMember> gossipMembers,
GossipListener listener) { GossipListener listener) {
_passiveGossipThreadClass = passiveGossipThreadClass; _passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass;
@ -87,7 +87,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
listener.gossipEvent(m, GossipState.UP); listener.gossipEvent(m, GossipState.UP);
} }
} }
public GossipSettings getSettings() { public GossipSettings getSettings() {
return _settings; return _settings;
} }
@ -105,7 +105,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
public LocalGossipMember getMyself() { public LocalGossipMember getMyself() {
return _me; return _me;
} }
public List<LocalGossipMember> getDeadList() { public List<LocalGossipMember> getDeadList() {
List<LocalGossipMember> up = new ArrayList<>(); List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){ for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
@ -119,8 +119,6 @@ public abstract class GossipManager extends Thread implements NotificationListen
/** /**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread. * thread and start the receiver thread.
*
* @throws InterruptedException
*/ */
public void run() { public void run() {
for (LocalGossipMember member : members.keySet()) { for (LocalGossipMember member : members.keySet()) {
@ -160,7 +158,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
activeGossipThread.shutdown(); activeGossipThread.shutdown();
try { try {
boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
if (result == false){ if (!result){
LOGGER.error("executor shutdown timed out"); LOGGER.error("executor shutdown timed out");
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -26,13 +26,13 @@ import com.google.code.gossip.RemoteGossipMember;
* determine the incoming message. * determine the incoming message.
*/ */
abstract public class PassiveGossipThread implements Runnable { abstract public class PassiveGossipThread implements Runnable {
public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
/** The socket used for the passive thread of the gossip service. */ /** The socket used for the passive thread of the gossip service. */
private DatagramSocket _server; private DatagramSocket _server;
private GossipManager _gossipManager; private final GossipManager _gossipManager;
private AtomicBoolean _keepRunning; private AtomicBoolean _keepRunning;
@ -73,7 +73,7 @@ abstract public class PassiveGossipThread implements Runnable {
// A package larger than this would not be possible to be send from a GossipService, // A package larger than this would not be possible to be send from a GossipService,
// since this is check before sending the message. // since this is check before sending the message.
// This could normally only occur when the list of members is very big, // This could normally only occur when the list of members is very big,
// or when the packet is misformed, and the first 4 bytes is not the right in anymore. // or when the packet is malformed, and the first 4 bytes is not the right in anymore.
// For this reason we regards the message. // For this reason we regards the message.
if (packet_length <= GossipManager.MAX_PACKET_SIZE) { if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
byte[] json_bytes = new byte[packet_length]; byte[] json_bytes = new byte[packet_length];
@ -84,7 +84,7 @@ abstract public class PassiveGossipThread implements Runnable {
GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): " GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
+ receivedMessage); + receivedMessage);
try { try {
ArrayList<GossipMember> remoteGossipMembers = new ArrayList<GossipMember>(); List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null; RemoteGossipMember senderMember = null;
GossipService.LOGGER.debug("Received member list:"); GossipService.LOGGER.debug("Received member list:");
JSONArray jsonArray = new JSONArray(receivedMessage); JSONArray jsonArray = new JSONArray(receivedMessage);
@ -136,7 +136,7 @@ abstract public class PassiveGossipThread implements Runnable {
/** /**
* Abstract method for merging the local and remote list. * Abstract method for merging the local and remote list.
* *
* @param gossipManager * @param gossipManager
* The GossipManager for retrieving the local members and dead members list. * The GossipManager for retrieving the local members and dead members list.
* @param senderMember * @param senderMember
@ -147,4 +147,4 @@ abstract public class PassiveGossipThread implements Runnable {
*/ */
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList); List<GossipMember> remoteList);
} }

View File

@ -19,7 +19,9 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
* Merge remote list (received from peer), and our local member list. Simply, we must update the * Merge remote list (received from peer), and our local member list. Simply, we must update the
* heartbeats that the remote list has with our list. Also, some additional logic is needed to * heartbeats that the remote list has with our list. Also, some additional logic is needed to
* make sure we have not timed out a member and then immediately received a list with that member. * make sure we have not timed out a member and then immediately received a list with that member.
* *
* @param gossipManager
* @param senderMember
* @param remoteList * @param remoteList
*/ */
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
@ -92,6 +94,6 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
} }
} }
} }

View File

@ -35,8 +35,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort()); GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort());
jsonArray.put(me.toJSONObject()); jsonArray.put(me.toJSONObject());
GossipService.LOGGER.debug(me); GossipService.LOGGER.debug(me);
for (int i = 0; i < memberList.size(); i++) { for (LocalGossipMember other : memberList) {
LocalGossipMember other = memberList.get(i);
jsonArray.put(other.toJSONObject()); jsonArray.put(other.toJSONObject());
GossipService.LOGGER.debug(other); GossipService.LOGGER.debug(other);
} }

View File

@ -11,7 +11,7 @@ import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread;
public class RandomActiveGossipThread extends SendMembersActiveGossipThread { public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
/** The Random used for choosing a member to gossip with. */ /** The Random used for choosing a member to gossip with. */
private Random _random; private final Random _random;
public RandomActiveGossipThread(GossipManager gossipManager) { public RandomActiveGossipThread(GossipManager gossipManager) {
super(gossipManager); super(gossipManager);
@ -21,7 +21,7 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
/** /**
* [The selectToSend() function.] Find a random peer from the local membership list. In the case * [The selectToSend() function.] Find a random peer from the local membership list. In the case
* where this client is the only member in the list, this method will return null. * where this client is the only member in the list, this method will return null.
* *
* @return Member random member if list is greater than 1, null otherwise * @return Member random member if list is greater than 1, null otherwise
*/ */
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) { protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {

View File

@ -1,16 +1,16 @@
package com.google.code.gossip.manager.random; package com.google.code.gossip.manager.random;
import java.util.ArrayList;
import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipSettings; import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.event.GossipListener; import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import java.util.List;
public class RandomGossipManager extends GossipManager { public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String address, int port, String id, GossipSettings settings, public RandomGossipManager(String address, int port, String id, GossipSettings settings,
ArrayList<GossipMember> gossipMembers, GossipListener listener) { List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
port, id, settings, gossipMembers, listener); port, id, settings, gossipMembers, listener);
} }

View File

@ -2,6 +2,7 @@ package io.teknek.gossip;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert; import junit.framework.Assert;
@ -17,25 +18,25 @@ import com.google.code.gossip.event.GossipState;
public class TenNodeThreeSeedTest { public class TenNodeThreeSeedTest {
@Test @Test
public void test() throws UnknownHostException, InterruptedException{ public void test() throws UnknownHostException, InterruptedException{
abc(); abc();
} }
@Test @Test
public void testAgain() throws UnknownHostException, InterruptedException{ public void testAgain() throws UnknownHostException, InterruptedException{
abc(); abc();
} }
public void abc() throws InterruptedException, UnknownHostException{ public void abc() throws InterruptedException, UnknownHostException{
GossipSettings settings = new GossipSettings(); GossipSettings settings = new GossipSettings();
int seedNodes = 3; int seedNodes = 3;
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>(); List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes+1; ++i) { for (int i = 1; i < seedNodes+1; ++i) {
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + "")); startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + ""));
} }
ArrayList<GossipService> clients = new ArrayList<GossipService>(); List<GossipService> clients = new ArrayList<>();
int clusterMembers = 5; int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) { for (int i = 1; i < clusterMembers+1; ++i) {
GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG, GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,
@ -56,6 +57,6 @@ public class TenNodeThreeSeedTest {
} }
for (int i = 0; i < clusterMembers; ++i) { for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown(); clients.get(i).shutdown();
} }
} }
} }