Merge pull request #5 from edwardcapriolo/callback-for-state

Callback for state
This commit is contained in:
edwardcapriolo
2015-02-15 23:32:00 -05:00
15 changed files with 129 additions and 100 deletions

View File

@ -21,7 +21,7 @@ Here we start five gossip processes and check that they discover each other. (No
int clusterMembers = 5; int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) { for (int i = 1; i < clusterMembers+1; ++i) {
GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "",
LogLevel.DEBUG, startupMembers, settings); LogLevel.DEBUG, startupMembers, settings, null);
clients.add(gossipService); clients.add(gossipService);
gossipService.start(); gossipService.start();
} }
@ -33,9 +33,30 @@ Later we can check that the nodes discover each other
Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size()); Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size());
} }
Event Listener
------
The status can be polled using the getters that return immutable lists.
List<LocalGossipMember> getMemberList()
public List<LocalGossipMember> getDeadList()
Users can also attach an event listener:
GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,
startupMembers, settings,
new GossipListener(){
@Override
public void gossipEvent(GossipMember member, GossipState state) {
System.out.println(member+" "+ state);
}
});
Maven Maven
------ ------
You can get this software from maven central. You can get this software from maven central.
<dependency> <dependency>

View File

@ -10,25 +10,15 @@ import org.json.JSONObject;
* *
* @author joshclemm, harmenw * @author joshclemm, harmenw
*/ */
public abstract class GossipMember { public abstract class GossipMember implements Comparable<GossipMember>{
/** The JSON key for the host property. */
public static final String JSON_HOST = "host"; public static final String JSON_HOST = "host";
/** The JSON key for the port property. */
public static final String JSON_PORT = "port"; public static final String JSON_PORT = "port";
/** The JSON key for the heartbeat property. */
public static final String JSON_HEARTBEAT = "heartbeat"; public static final String JSON_HEARTBEAT = "heartbeat";
public static final String JSON_ID = "id"; public static final String JSON_ID = "id";
/** The hostname or IP address of this gossip member. */
protected String _host; protected String _host;
/** The port number of this gossip member. */
protected int _port; protected int _port;
/** The current heartbeat of this gossip member. */
protected int _heartbeat; protected int _heartbeat;
protected String _id; protected String _id;
/** /**
@ -146,4 +136,8 @@ public abstract class GossipMember {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
public int compareTo(GossipMember other){
return this.getAddress().compareTo(other.getAddress());
}
} }

View File

@ -6,6 +6,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.random.RandomGossipManager; import com.google.code.gossip.manager.random.RandomGossipManager;
@ -30,7 +31,7 @@ public class GossipService {
UnknownHostException { UnknownHostException {
this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "", this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "",
startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings
.getGossipSettings()); .getGossipSettings(), null);
} }
/** /**
@ -41,9 +42,9 @@ public class GossipService {
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public GossipService(String ipAddress, int port, String id, int logLevel, public GossipService(String ipAddress, int port, String id, int logLevel,
ArrayList<GossipMember> gossipMembers, GossipSettings settings) ArrayList<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException { throws InterruptedException, UnknownHostException {
_gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers); _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener);
} }
public void start() { public void start() {

View File

@ -14,10 +14,7 @@ import javax.management.timer.Timer;
*/ */
public class GossipTimeoutTimer extends Timer { public class GossipTimeoutTimer extends Timer {
/** The amount of time this timer waits before generating a wake-up event. */
private long _sleepTime; private long _sleepTime;
/** The gossip member this timer is for. */
private LocalGossipMember _source; private LocalGossipMember _source;
/** /**

View File

@ -0,0 +1,7 @@
package com.google.code.gossip.event;
import com.google.code.gossip.GossipMember;
public interface GossipListener {
void gossipEvent(GossipMember member, GossipState state);
}

View File

@ -0,0 +1,10 @@
package com.google.code.gossip.event;
public enum GossipState {
UP("up"), DOWN("down");
private String state;
private GossipState(String state){
this.state = state;
}
}

View File

@ -58,7 +58,7 @@ public class GossipExample extends Thread {
// dead list handling. // dead list handling.
for (GossipMember member : startupMembers) { for (GossipMember member : startupMembers) {
GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "",
LogLevel.DEBUG, startupMembers, settings); LogLevel.DEBUG, startupMembers, settings, null);
clients.add(gossipService); clients.add(gossipService);
gossipService.start(); gossipService.start();
sleep(settings.getCleanupInterval() + 1000); sleep(settings.getCleanupInterval() + 1000);

View File

@ -1,6 +1,6 @@
package com.google.code.gossip.manager; package com.google.code.gossip.manager;
import java.util.ArrayList; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -45,7 +45,7 @@ abstract public class ActiveGossipThread implements Runnable {
* Performs the sending of the membership list, after we have incremented our own heartbeat. * Performs the sending of the membership list, after we have incremented our own heartbeat.
*/ */
abstract protected void sendMembershipList(LocalGossipMember me, abstract protected void sendMembershipList(LocalGossipMember me,
ArrayList<LocalGossipMember> memberList); List<LocalGossipMember> memberList);
/** /**
* Abstract method which should be implemented by a subclass. This method should return a member * Abstract method which should be implemented by a subclass. This method should return a member
@ -55,5 +55,5 @@ abstract public class ActiveGossipThread implements Runnable {
* The list of members which are stored in the local list of members. * The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with. * @return The chosen LocalGossipMember to gossip with.
*/ */
abstract protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList); abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList);
} }

View File

@ -2,6 +2,10 @@ package com.google.code.gossip.manager;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -16,60 +20,46 @@ import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipService;
import com.google.code.gossip.GossipSettings; import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.LocalGossipMember; import com.google.code.gossip.LocalGossipMember;
import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.event.GossipState;
public abstract class GossipManager extends Thread implements NotificationListener { public abstract class GossipManager extends Thread implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class); public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
/** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */
public static final int MAX_PACKET_SIZE = 102400; public static final int MAX_PACKET_SIZE = 102400;
/** The list of members which are in the gossip group (not including myself). */ private ConcurrentSkipListMap<LocalGossipMember,GossipState> members;
private ArrayList<LocalGossipMember> _memberList;
/** The list of members which are known to be dead. */
private ArrayList<LocalGossipMember> _deadList;
/** The member I am representing. */
private LocalGossipMember _me; private LocalGossipMember _me;
/** The settings for gossiping. */
private GossipSettings _settings; private GossipSettings _settings;
/** A boolean whether the gossip service should keep running. */
private AtomicBoolean _gossipServiceRunning; private AtomicBoolean _gossipServiceRunning;
/** A ExecutorService used for executing the active and passive gossip threads. */
private ExecutorService _gossipThreadExecutor; private ExecutorService _gossipThreadExecutor;
private Class<? extends PassiveGossipThread> _passiveGossipThreadClass; private Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
private PassiveGossipThread passiveGossipThread; private PassiveGossipThread passiveGossipThread;
private Class<? extends ActiveGossipThread> _activeGossipThreadClass; private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
private ActiveGossipThread activeGossipThread; private ActiveGossipThread activeGossipThread;
private GossipListener listener;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass, public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port, Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) { String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers,
GossipListener listener) {
_passiveGossipThreadClass = passiveGossipThreadClass; _passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass;
_settings = settings; _settings = settings;
_me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval());
_memberList = new ArrayList<LocalGossipMember>(); members = new ConcurrentSkipListMap<>();
_deadList = new ArrayList<LocalGossipMember>();
for (GossipMember startupMember : gossipMembers) { for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(_me)) { if (!startupMember.equals(_me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), 0, this, startupMember.getPort(), startupMember.getId(), 0, this,
settings.getCleanupInterval()); settings.getCleanupInterval());
_memberList.add(member); members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member); GossipService.LOGGER.debug(member);
} }
} }
_gossipServiceRunning = new AtomicBoolean(true); _gossipServiceRunning = new AtomicBoolean(true);
this.listener = listener;
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() { public void run() {
GossipService.LOGGER.info("Service has been shutdown..."); GossipService.LOGGER.info("Service has been shutdown...");
@ -85,11 +75,16 @@ public abstract class GossipManager extends Thread implements NotificationListen
public void handleNotification(Notification notification, Object handback) { public void handleNotification(Notification notification, Object handback) {
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
GossipService.LOGGER.info("Dead member detected: " + deadMember); GossipService.LOGGER.info("Dead member detected: " + deadMember);
synchronized (this._memberList) { members.put(deadMember, GossipState.DOWN);
this._memberList.remove(deadMember); if (listener != null) {
listener.gossipEvent(deadMember, GossipState.DOWN);
} }
synchronized (this._deadList) { }
this._deadList.add(deadMember);
public void createOrRevivieMember(LocalGossipMember m){
members.put(m, GossipState.UP);
if (listener != null) {
listener.gossipEvent(m, GossipState.UP);
} }
} }
@ -97,21 +92,28 @@ public abstract class GossipManager extends Thread implements NotificationListen
return _settings; return _settings;
} }
/** public List<LocalGossipMember> getMemberList() {
* Get a clone of the memberlist. List<LocalGossipMember> up = new ArrayList<>();
* for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
* @return if (GossipState.UP.equals(entry.getValue())){
*/ up.add(entry.getKey());
public ArrayList<LocalGossipMember> getMemberList() { }
return _memberList; }
return Collections.unmodifiableList(up);
} }
public LocalGossipMember getMyself() { public LocalGossipMember getMyself() {
return _me; return _me;
} }
public ArrayList<LocalGossipMember> getDeadList() { public List<LocalGossipMember> getDeadList() {
return _deadList; List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
if (GossipState.DOWN.equals(entry.getValue())){
up.add(entry.getKey());
}
}
return Collections.unmodifiableList(up);
} }
/** /**
@ -121,7 +123,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
* @throws InterruptedException * @throws InterruptedException
*/ */
public void run() { public void run() {
for (LocalGossipMember member : _memberList) { for (LocalGossipMember member : members.keySet()) {
if (member != _me) { if (member != _me) {
member.startTimeoutTimer(); member.startTimeoutTimer();
} }

View File

@ -7,8 +7,10 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.net.SocketException; import java.net.SocketException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
@ -25,6 +27,8 @@ import com.google.code.gossip.RemoteGossipMember;
*/ */
abstract public class PassiveGossipThread implements Runnable { abstract public class PassiveGossipThread implements Runnable {
public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
/** The socket used for the passive thread of the gossip service. */ /** The socket used for the passive thread of the gossip service. */
private DatagramSocket _server; private DatagramSocket _server;
@ -106,8 +110,6 @@ abstract public class PassiveGossipThread implements Runnable {
} }
} }
// Merge our list with the one we just received
mergeLists(_gossipManager, senderMember, remoteGossipMembers); mergeLists(_gossipManager, senderMember, remoteGossipMembers);
} catch (JSONException e) { } catch (JSONException e) {
GossipService.LOGGER GossipService.LOGGER
@ -121,7 +123,7 @@ abstract public class PassiveGossipThread implements Runnable {
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); GossipService.LOGGER.error(e);
_keepRunning.set(false); _keepRunning.set(false);
} }
} }
@ -144,5 +146,5 @@ abstract public class PassiveGossipThread implements Runnable {
* The list of members known at the remote side. * The list of members known at the remote side.
*/ */
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
ArrayList<GossipMember> remoteList); List<GossipMember> remoteList);
} }

View File

@ -1,6 +1,6 @@
package com.google.code.gossip.manager.impl; package com.google.code.gossip.manager.impl;
import java.util.ArrayList; import java.util.List;
import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipService;
@ -23,35 +23,21 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
* @param remoteList * @param remoteList
*/ */
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
ArrayList<GossipMember> remoteList) { List<GossipMember> remoteList) {
synchronized (gossipManager.getDeadList()) {
synchronized (gossipManager.getMemberList()) {
for (GossipMember remoteMember : remoteList) { for (GossipMember remoteMember : remoteList) {
// Skip myself. We don't want ourselves in the local member list. // Skip myself. We don't want ourselves in the local member list.
if (!remoteMember.equals(gossipManager.getMyself())) { if (remoteMember.equals(gossipManager.getMyself())) {
continue;
}
if (gossipManager.getMemberList().contains(remoteMember)) { if (gossipManager.getMemberList().contains(remoteMember)) {
GossipService.LOGGER.debug("The local list already contains the remote member ("
+ remoteMember + ").");
// The local memberlist contains the remote member.
LocalGossipMember localMember = gossipManager.getMemberList().get( LocalGossipMember localMember = gossipManager.getMemberList().get(
gossipManager.getMemberList().indexOf(remoteMember)); gossipManager.getMemberList().indexOf(remoteMember));
// Let's synchronize it's heartbeat.
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
// update local list with latest heartbeat
localMember.setHeartbeat(remoteMember.getHeartbeat()); localMember.setHeartbeat(remoteMember.getHeartbeat());
// and reset the timeout of that member
localMember.resetTimeoutTimer(); localMember.resetTimeoutTimer();
} }
// TODO: Otherwise, should we inform the other when the heartbeat is already higher?
} else { } else {
// The local list does not contain the remote member.
GossipService.LOGGER.debug("The local list does not contain the remote member ("
+ remoteMember + ").");
// The remote member is either brand new, or a previously declared dead member. // The remote member is either brand new, or a previously declared dead member.
// If its dead, check the heartbeat because it may have come back from the dead. // If its dead, check the heartbeat because it may have come back from the dead.
if (gossipManager.getDeadList().contains(remoteMember)) { if (gossipManager.getDeadList().contains(remoteMember)) {
@ -80,13 +66,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); .debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
// The remote member is back from the dead. // The remote member is back from the dead.
// Remove it from the dead list. // Remove it from the dead list.
gossipManager.getDeadList().remove(localDeadMember); //gossipManager.getDeadList().remove(localDeadMember);
// Add it as a new member and add it to the member list. // Add it as a new member and add it to the member list.
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getPort(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval()); .getCleanupInterval());
gossipManager.getMemberList().add(newLocalMember); //gossipManager.getMemberList().add(newLocalMember);
gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer(); newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list."); + " from dead list and added to local member list.");
@ -96,7 +83,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval()); gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.getMemberList().add(newLocalMember); gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer(); newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress()
+ " to local member list."); + " to local member list.");
@ -104,8 +91,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
} }
} }
} }
}
}
}
} }

View File

@ -5,7 +5,7 @@ import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.List;
import org.json.JSONArray; import org.json.JSONArray;
@ -23,7 +23,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
/** /**
* Performs the sending of the membership list, after we have incremented our own heartbeat. * Performs the sending of the membership list, after we have incremented our own heartbeat.
*/ */
protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) { protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
GossipService.LOGGER.debug("Send sendMembershipList() is called."); GossipService.LOGGER.debug("Send sendMembershipList() is called.");
me.setHeartbeat(me.getHeartbeat() + 1); me.setHeartbeat(me.getHeartbeat() + 1);
synchronized (memberList) { synchronized (memberList) {

View File

@ -1,6 +1,6 @@
package com.google.code.gossip.manager.random; package com.google.code.gossip.manager.random;
import java.util.ArrayList; import java.util.List;
import java.util.Random; import java.util.Random;
import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipService;
@ -24,7 +24,7 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
* *
* @return Member random member if list is greater than 1, null otherwise * @return Member random member if list is greater than 1, null otherwise
*/ */
protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) { protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null; LocalGossipMember member = null;
if (memberList.size() > 0) { if (memberList.size() > 0) {
int randomNeighborIndex = _random.nextInt(memberList.size()); int randomNeighborIndex = _random.nextInt(memberList.size());

View File

@ -4,13 +4,14 @@ import java.util.ArrayList;
import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipSettings; import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
public class RandomGossipManager extends GossipManager { public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String address, int port, String id, GossipSettings settings, public RandomGossipManager(String address, int port, String id, GossipSettings settings,
ArrayList<GossipMember> gossipMembers) { ArrayList<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
port, id, settings, gossipMembers); port, id, settings, gossipMembers, listener);
} }
} }

View File

@ -12,6 +12,8 @@ import com.google.code.gossip.GossipService;
import com.google.code.gossip.GossipSettings; import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.LogLevel; import com.google.code.gossip.LogLevel;
import com.google.code.gossip.RemoteGossipMember; import com.google.code.gossip.RemoteGossipMember;
import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.event.GossipState;
public class TenNodeThreeSeedTest { public class TenNodeThreeSeedTest {
@ -36,14 +38,20 @@ public class TenNodeThreeSeedTest {
ArrayList<GossipService> clients = new ArrayList<GossipService>(); ArrayList<GossipService> clients = new ArrayList<GossipService>();
int clusterMembers = 5; int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) { for (int i = 1; i < clusterMembers+1; ++i) {
GossipService gossipService = new GossipService("127.0.0."+i, 2000, i+"", LogLevel.DEBUG, startupMembers, settings); GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,
startupMembers, settings,
new GossipListener(){
@Override
public void gossipEvent(GossipMember member, GossipState state) {
System.out.println(member+" "+ state);
}
});
clients.add(gossipService); clients.add(gossipService);
gossipService.start(); gossipService.start();
Thread.sleep(1000); Thread.sleep(1000);
} }
Thread.sleep(10000); Thread.sleep(10000);
for (int i = 0; i < clusterMembers; ++i) { for (int i = 0; i < clusterMembers; ++i) {
System.out.println(clients.get(i).get_gossipManager().getMemberList());
Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size()); Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size());
} }
for (int i = 0; i < clusterMembers; ++i) { for (int i = 0; i < clusterMembers; ++i) {