Small cleanup

This commit is contained in:
Joseph Price
2015-02-21 15:31:23 -05:00
parent 07f47aced6
commit 4aba10d2c3
19 changed files with 193 additions and 164 deletions

16
.gitignore vendored Normal file
View File

@ -0,0 +1,16 @@
# Eclipse
.classpath
.project
.settings/
# Intellij
.idea/
*.iml
*.iws
# Mac
.DS_Store
# Maven
log/
target/

View File

@ -1,5 +1,6 @@
# Gossip
Gossip protocol is a method for a group of nodes to discover and check the livelyness of a cluster. More information can be found at http://en.wikipedia.org/wiki/Gossip_protocol.
Gossip protocol is a method for a group of nodes to discover and check the liveliness of a cluster. More information can be found at http://en.wikipedia.org/wiki/Gossip_protocol.
The original implementation was forked from https://code.google.com/p/java-gossip/. Several bug fixes and changes have already been added.
@ -8,16 +9,19 @@ Usage
To gossip you need one or more seed nodes. Seed is just a list of places to initially connect to.
```java
GossipSettings settings = new GossipSettings();
int seedNodes = 3;
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes+1; ++i) {
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + ""));
}
```
Here we start five gossip processes and check that they discover each other. (Normally these are on different hosts but here we give each process a distinct local ip.
ArrayList<GossipService> clients = new ArrayList<GossipService>();
```java
List<GossipService> clients = new ArrayList<>();
int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "",
@ -25,24 +29,30 @@ Here we start five gossip processes and check that they discover each other. (No
clients.add(gossipService);
gossipService.start();
}
```
Later we can check that the nodes discover each other
```java
Thread.sleep(10000);
for (int i = 0; i < clusterMembers; ++i) {
Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size());
}
```
Event Listener
------
The status can be polled using the getters that return immutable lists.
```java
List<LocalGossipMember> getMemberList()
public List<LocalGossipMember> getDeadList()
```
Users can also attach an event listener:
```java
GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,
startupMembers, settings,
new GossipListener(){
@ -51,6 +61,7 @@ Users can also attach an event listener:
System.out.println(member+" "+ state);
}
});
```
Maven
@ -59,8 +70,10 @@ Maven
You can get this software from maven central.
```xml
<dependency>
<groupId>io.teknek</groupId>
<artifactId>gossip</artifactId>
<version>${pick_the_latest_version}</version>
</dependency>
```

View File

@ -16,8 +16,8 @@ public abstract class GossipMember implements Comparable<GossipMember>{
public static final String JSON_PORT = "port";
public static final String JSON_HEARTBEAT = "heartbeat";
public static final String JSON_ID = "id";
protected String _host;
protected int _port;
protected final String _host;
protected final int _port;
protected int _heartbeat;
protected String _id;

View File

@ -7,7 +7,6 @@ import java.io.IOException;
import org.json.JSONException;
public class GossipRunner {
private StartupSettings _settings;
public static void main(String[] args) {
File configFile;
@ -26,9 +25,9 @@ public class GossipRunner {
if (configFile != null && configFile.exists()) {
try {
System.out.println("Parsing the configuration file...");
_settings = StartupSettings.fromJSONFile(configFile);
StartupSettings _settings = StartupSettings.fromJSONFile(configFile);
GossipService gossipService = new GossipService(_settings);
System.out.println("Gossip service successfully inialized, let's start it...");
System.out.println("Gossip service successfully initialized, let's start it...");
gossipService.start();
} catch (FileNotFoundException e) {
System.err.println("The given file is not found!");

View File

@ -1,9 +1,9 @@
package com.google.code.gossip;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import org.apache.log4j.Logger;
import com.google.code.gossip.event.GossipListener;
@ -37,12 +37,11 @@ public class GossipService {
/**
* Setup the client's lists, gossiping parameters, and parse the startup config file.
*
* @throws SocketException
* @throws InterruptedException
* @throws UnknownHostException
*/
public GossipService(String ipAddress, int port, String id, int logLevel,
ArrayList<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException {
_gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener);
}

View File

@ -14,15 +14,15 @@ import javax.management.timer.Timer;
*/
public class GossipTimeoutTimer extends Timer {
private long _sleepTime;
private LocalGossipMember _source;
private final long _sleepTime;
private final LocalGossipMember _source;
/**
* Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime.
*
* @param millisecondsSleepTime
* The time for this timer to wait before an event.
* @param service
* @param notificationListener
* @param member
*/
public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener,

View File

@ -10,19 +10,19 @@ import javax.management.NotificationListener;
*/
public class LocalGossipMember extends GossipMember {
/** The timeout timer for this gossip member. */
private transient GossipTimeoutTimer timeoutTimer;
private final transient GossipTimeoutTimer timeoutTimer;
/**
* Constructor.
*
* @param host
* @param hostname
* The hostname or IP address.
* @param port
* The port number.
* @param id
* @param heartbeat
* The current heartbeat.
* @param gossipService
* The GossipService object.
* @param notificationListener
* @param cleanupTimeout
* The cleanup timeout for this gossip member.
*/

View File

@ -11,7 +11,7 @@ public class RemoteGossipMember extends GossipMember {
/**
* Constructor.
*
* @param host
* @param hostname
* The hostname or IP address.
* @param port
* The port number.
@ -25,7 +25,7 @@ public class RemoteGossipMember extends GossipMember {
/**
* Construct a RemoteGossipMember with a heartbeat of 0.
*
* @param host
* @param hostname
* The hostname or IP address.
* @param port
* The port number.

View File

@ -6,6 +6,7 @@ import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.json.JSONArray;
import org.json.JSONException;
@ -25,10 +26,10 @@ public class StartupSettings {
private int _logLevel;
/** The gossip settings used at startup. */
private GossipSettings _gossipSettings;
private final GossipSettings _gossipSettings;
/** The list with gossip members to start with. */
private ArrayList<GossipMember> _gossipMembers;
private final List<GossipMember> _gossipMembers;
/**
* Constructor.
@ -50,7 +51,7 @@ public class StartupSettings {
_port = port;
_logLevel = logLevel;
_gossipSettings = gossipSettings;
_gossipMembers = new ArrayList<GossipMember>();
_gossipMembers = new ArrayList<>();
}
/**
@ -115,7 +116,7 @@ public class StartupSettings {
*
* @return The gossip members.
*/
public ArrayList<GossipMember> getGossipMembers() {
public List<GossipMember> getGossipMembers() {
return _gossipMembers;
}

View File

@ -2,7 +2,7 @@ package com.google.code.gossip.event;
public enum GossipState {
UP("up"), DOWN("down");
private String state;
private final String state;
private GossipState(String state){
this.state = state;

View File

@ -3,6 +3,7 @@ package com.google.code.gossip.examples;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService;
@ -41,14 +42,14 @@ public class GossipExample extends Thread {
try {
GossipSettings settings = new GossipSettings();
ArrayList<GossipService> clients = new ArrayList<GossipService>();
List<GossipService> clients = new ArrayList<>();
// Get my ip address.
String myIpAddress = InetAddress.getLocalHost().getHostAddress();
// Create the gossip members and put them in a list and give them a port number starting with
// 2000.
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
startupMembers.add(new RemoteGossipMember(myIpAddress, 2000 + i, ""));
}

View File

@ -14,7 +14,7 @@ import com.google.code.gossip.LocalGossipMember;
*/
abstract public class ActiveGossipThread implements Runnable {
private GossipManager _gossipManager;
private final GossipManager _gossipManager;
private final AtomicBoolean _keepRunning;

View File

@ -28,20 +28,20 @@ public abstract class GossipManager extends Thread implements NotificationListen
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
public static final int MAX_PACKET_SIZE = 102400;
private ConcurrentSkipListMap<LocalGossipMember,GossipState> members;
private LocalGossipMember _me;
private GossipSettings _settings;
private AtomicBoolean _gossipServiceRunning;
private ExecutorService _gossipThreadExecutor;
private Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
private PassiveGossipThread passiveGossipThread;
private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
private final ConcurrentSkipListMap<LocalGossipMember,GossipState> members;
private final LocalGossipMember _me;
private final GossipSettings _settings;
private final AtomicBoolean _gossipServiceRunning;
private final Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
private final Class<? extends ActiveGossipThread> _activeGossipThreadClass;
private final GossipListener listener;
private ActiveGossipThread activeGossipThread;
private GossipListener listener;
private PassiveGossipThread passiveGossipThread;
private ExecutorService _gossipThreadExecutor;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers,
String id, GossipSettings settings, List<GossipMember> gossipMembers,
GossipListener listener) {
_passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass;
@ -119,8 +119,6 @@ public abstract class GossipManager extends Thread implements NotificationListen
/**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
*
* @throws InterruptedException
*/
public void run() {
for (LocalGossipMember member : members.keySet()) {
@ -160,7 +158,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
activeGossipThread.shutdown();
try {
boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
if (result == false){
if (!result){
LOGGER.error("executor shutdown timed out");
}
} catch (InterruptedException e) {

View File

@ -32,7 +32,7 @@ abstract public class PassiveGossipThread implements Runnable {
/** The socket used for the passive thread of the gossip service. */
private DatagramSocket _server;
private GossipManager _gossipManager;
private final GossipManager _gossipManager;
private AtomicBoolean _keepRunning;
@ -73,7 +73,7 @@ abstract public class PassiveGossipThread implements Runnable {
// A package larger than this would not be possible to be send from a GossipService,
// since this is check before sending the message.
// This could normally only occur when the list of members is very big,
// or when the packet is misformed, and the first 4 bytes is not the right in anymore.
// or when the packet is malformed, and the first 4 bytes is not the right in anymore.
// For this reason we regards the message.
if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
byte[] json_bytes = new byte[packet_length];
@ -84,7 +84,7 @@ abstract public class PassiveGossipThread implements Runnable {
GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
+ receivedMessage);
try {
ArrayList<GossipMember> remoteGossipMembers = new ArrayList<GossipMember>();
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;
GossipService.LOGGER.debug("Received member list:");
JSONArray jsonArray = new JSONArray(receivedMessage);

View File

@ -20,6 +20,8 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
* heartbeats that the remote list has with our list. Also, some additional logic is needed to
* make sure we have not timed out a member and then immediately received a list with that member.
*
* @param gossipManager
* @param senderMember
* @param remoteList
*/
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,

View File

@ -35,8 +35,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort());
jsonArray.put(me.toJSONObject());
GossipService.LOGGER.debug(me);
for (int i = 0; i < memberList.size(); i++) {
LocalGossipMember other = memberList.get(i);
for (LocalGossipMember other : memberList) {
jsonArray.put(other.toJSONObject());
GossipService.LOGGER.debug(other);
}

View File

@ -11,7 +11,7 @@ import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread;
public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
/** The Random used for choosing a member to gossip with. */
private Random _random;
private final Random _random;
public RandomActiveGossipThread(GossipManager gossipManager) {
super(gossipManager);

View File

@ -1,16 +1,16 @@
package com.google.code.gossip.manager.random;
import java.util.ArrayList;
import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import java.util.List;
public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String address, int port, String id, GossipSettings settings,
ArrayList<GossipMember> gossipMembers, GossipListener listener) {
List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
port, id, settings, gossipMembers, listener);
}

View File

@ -2,6 +2,7 @@ package io.teknek.gossip;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
@ -31,11 +32,11 @@ public class TenNodeThreeSeedTest {
public void abc() throws InterruptedException, UnknownHostException{
GossipSettings settings = new GossipSettings();
int seedNodes = 3;
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes+1; ++i) {
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + ""));
}
ArrayList<GossipService> clients = new ArrayList<GossipService>();
List<GossipService> clients = new ArrayList<>();
int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) {
GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,