From a1c241b780e51fc78b41864c85bcfed98c186869 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Sun, 15 Feb 2015 18:57:06 -0500 Subject: [PATCH 1/3] Changed live and dead methods to return immutable lists. changed the underlying datastructure to be a map as opposed to two list --- .../com/google/code/gossip/GossipMember.java | 18 ++--- .../code/gossip/GossipTimeoutTimer.java | 3 - .../gossip/manager/ActiveGossipThread.java | 5 +- .../code/gossip/manager/GossipManager.java | 67 ++++++++----------- .../gossip/manager/PassiveGossipThread.java | 3 +- ...nlyProcessReceivedPassiveGossipThread.java | 36 +++------- .../impl/SendMembersActiveGossipThread.java | 4 +- .../random/RandomActiveGossipThread.java | 3 +- 8 files changed, 55 insertions(+), 84 deletions(-) diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java index e24964e..50e9af3 100644 --- a/src/main/java/com/google/code/gossip/GossipMember.java +++ b/src/main/java/com/google/code/gossip/GossipMember.java @@ -10,25 +10,15 @@ import org.json.JSONObject; * * @author joshclemm, harmenw */ -public abstract class GossipMember { - /** The JSON key for the host property. */ +public abstract class GossipMember implements Comparable{ + public static final String JSON_HOST = "host"; - /** The JSON key for the port property. */ public static final String JSON_PORT = "port"; - /** The JSON key for the heartbeat property. */ public static final String JSON_HEARTBEAT = "heartbeat"; - public static final String JSON_ID = "id"; - - /** The hostname or IP address of this gossip member. */ protected String _host; - - /** The port number of this gossip member. */ protected int _port; - - /** The current heartbeat of this gossip member. */ protected int _heartbeat; - protected String _id; /** @@ -146,4 +136,8 @@ public abstract class GossipMember { throw new RuntimeException(e); } } + + public int compareTo(GossipMember other){ + return this.getAddress().compareTo(other.getAddress()); + } } diff --git a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java index 94cc3a4..c3613fb 100644 --- a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java +++ b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java @@ -14,10 +14,7 @@ import javax.management.timer.Timer; */ public class GossipTimeoutTimer extends Timer { - /** The amount of time this timer waits before generating a wake-up event. */ private long _sleepTime; - - /** The gossip member this timer is for. */ private LocalGossipMember _source; /** diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java index e5ab754..4cd096d 100644 --- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java @@ -1,6 +1,7 @@ package com.google.code.gossip.manager; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,7 +46,7 @@ abstract public class ActiveGossipThread implements Runnable { * Performs the sending of the membership list, after we have incremented our own heartbeat. */ abstract protected void sendMembershipList(LocalGossipMember me, - ArrayList memberList); + List memberList); /** * Abstract method which should be implemented by a subclass. This method should return a member @@ -55,5 +56,5 @@ abstract public class ActiveGossipThread implements Runnable { * The list of members which are stored in the local list of members. * @return The chosen LocalGossipMember to gossip with. */ - abstract protected LocalGossipMember selectPartner(ArrayList memberList); + abstract protected LocalGossipMember selectPartner(List memberList); } diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index 1b76e82..03cc92c 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -2,6 +2,10 @@ package com.google.code.gossip.manager; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -20,34 +24,16 @@ import com.google.code.gossip.LocalGossipMember; public abstract class GossipManager extends Thread implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); - - /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */ public static final int MAX_PACKET_SIZE = 102400; - /** The list of members which are in the gossip group (not including myself). */ - private ArrayList _memberList; - - /** The list of members which are known to be dead. */ - private ArrayList _deadList; - - /** The member I am representing. */ + private ConcurrentSkipListMap members; private LocalGossipMember _me; - - /** The settings for gossiping. */ private GossipSettings _settings; - - /** A boolean whether the gossip service should keep running. */ private AtomicBoolean _gossipServiceRunning; - - /** A ExecutorService used for executing the active and passive gossip threads. */ private ExecutorService _gossipThreadExecutor; - private Class _passiveGossipThreadClass; - private PassiveGossipThread passiveGossipThread; - private Class _activeGossipThreadClass; - private ActiveGossipThread activeGossipThread; public GossipManager(Class passiveGossipThreadClass, @@ -57,14 +43,13 @@ public abstract class GossipManager extends Thread implements NotificationListen _activeGossipThreadClass = activeGossipThreadClass; _settings = settings; _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); - _memberList = new ArrayList(); - _deadList = new ArrayList(); + members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(_me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), startupMember.getId(), 0, this, settings.getCleanupInterval()); - _memberList.add(member); + members.put(member, "UP"); GossipService.LOGGER.debug(member); } } @@ -85,33 +70,39 @@ public abstract class GossipManager extends Thread implements NotificationListen public void handleNotification(Notification notification, Object handback) { LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); GossipService.LOGGER.info("Dead member detected: " + deadMember); - synchronized (this._memberList) { - this._memberList.remove(deadMember); - } - synchronized (this._deadList) { - this._deadList.add(deadMember); - } + members.put(deadMember, "DOWN"); } public GossipSettings getSettings() { return _settings; } - /** - * Get a clone of the memberlist. - * - * @return - */ - public ArrayList getMemberList() { - return _memberList; + public List getMemberList() { + List up = new ArrayList<>(); + for (Entry entry : members.entrySet()){ + if ("UP".equals(entry.getValue())){ + up.add(entry.getKey()); + } + } + return Collections.unmodifiableList(up); } public LocalGossipMember getMyself() { return _me; } + + public void createOrRevivieMember(LocalGossipMember m){ + members.put(m, "UP"); + } - public ArrayList getDeadList() { - return _deadList; + public List getDeadList() { + List up = new ArrayList<>(); + for (Entry entry : members.entrySet()){ + if ("DOWN".equals(entry.getValue())){ + up.add(entry.getKey()); + } + } + return Collections.unmodifiableList(up); } /** @@ -121,7 +112,7 @@ public abstract class GossipManager extends Thread implements NotificationListen * @throws InterruptedException */ public void run() { - for (LocalGossipMember member : _memberList) { + for (LocalGossipMember member : members.keySet()) { if (member != _me) { member.startTimeoutTimer(); } diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index 08342b4..42c202c 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -7,6 +7,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.json.JSONArray; @@ -144,5 +145,5 @@ abstract public class PassiveGossipThread implements Runnable { * The list of members known at the remote side. */ abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - ArrayList remoteList); + List remoteList); } \ No newline at end of file diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index 206d5c5..f0afaf9 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -1,6 +1,6 @@ package com.google.code.gossip.manager.impl; -import java.util.ArrayList; +import java.util.List; import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; @@ -23,35 +23,21 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread * @param remoteList */ protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - ArrayList remoteList) { - - synchronized (gossipManager.getDeadList()) { - - synchronized (gossipManager.getMemberList()) { + List remoteList) { for (GossipMember remoteMember : remoteList) { // Skip myself. We don't want ourselves in the local member list. - if (!remoteMember.equals(gossipManager.getMyself())) { + if (remoteMember.equals(gossipManager.getMyself())) { + continue; + } if (gossipManager.getMemberList().contains(remoteMember)) { - GossipService.LOGGER.debug("The local list already contains the remote member (" - + remoteMember + ")."); - // The local memberlist contains the remote member. LocalGossipMember localMember = gossipManager.getMemberList().get( gossipManager.getMemberList().indexOf(remoteMember)); - - // Let's synchronize it's heartbeat. if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { - // update local list with latest heartbeat localMember.setHeartbeat(remoteMember.getHeartbeat()); - // and reset the timeout of that member localMember.resetTimeoutTimer(); } - // TODO: Otherwise, should we inform the other when the heartbeat is already higher? } else { - // The local list does not contain the remote member. - GossipService.LOGGER.debug("The local list does not contain the remote member (" - + remoteMember + ")."); - // The remote member is either brand new, or a previously declared dead member. // If its dead, check the heartbeat because it may have come back from the dead. if (gossipManager.getDeadList().contains(remoteMember)) { @@ -80,13 +66,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread .debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); // The remote member is back from the dead. // Remove it from the dead list. - gossipManager.getDeadList().remove(localDeadMember); + //gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the member list. LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() .getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); + //gossipManager.getMemberList().add(newLocalMember); + gossipManager.createOrRevivieMember(newLocalMember); newLocalMember.startTimeoutTimer(); GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); @@ -96,7 +83,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); + gossipManager.createOrRevivieMember(newLocalMember); newLocalMember.startTimeoutTimer(); GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list."); @@ -104,8 +91,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread } } } - } - } - } + + } diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index 5235db2..85e4b8a 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -5,7 +5,7 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.List; import org.json.JSONArray; @@ -23,7 +23,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, ArrayList memberList) { + protected void sendMembershipList(LocalGossipMember me, List memberList) { GossipService.LOGGER.debug("Send sendMembershipList() is called."); me.setHeartbeat(me.getHeartbeat() + 1); synchronized (memberList) { diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java index 917c362..1baeb3a 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java @@ -1,6 +1,7 @@ package com.google.code.gossip.manager.random; import java.util.ArrayList; +import java.util.List; import java.util.Random; import com.google.code.gossip.GossipService; @@ -24,7 +25,7 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread { * * @return Member random member if list is greater than 1, null otherwise */ - protected LocalGossipMember selectPartner(ArrayList memberList) { + protected LocalGossipMember selectPartner(List memberList) { LocalGossipMember member = null; if (memberList.size() > 0) { int randomNeighborIndex = _random.nextInt(memberList.size()); From 7ce0700798304881c4d0af046e14b55d41189891 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Sun, 15 Feb 2015 23:24:18 -0500 Subject: [PATCH 2/3] Callback --- .../com/google/code/gossip/GossipService.java | 7 ++-- .../code/gossip/event/GossipListener.java | 7 ++++ .../google/code/gossip/event/GossipState.java | 10 ++++++ .../code/gossip/examples/GossipExample.java | 2 +- .../gossip/manager/ActiveGossipThread.java | 1 - .../code/gossip/manager/GossipManager.java | 35 ++++++++++++------- .../gossip/manager/PassiveGossipThread.java | 7 ++-- .../random/RandomActiveGossipThread.java | 1 - .../manager/random/RandomGossipManager.java | 5 +-- .../teknek/gossip/TenNodeThreeSeedTest.java | 14 ++++++-- 10 files changed, 63 insertions(+), 26 deletions(-) create mode 100644 src/main/java/com/google/code/gossip/event/GossipListener.java create mode 100644 src/main/java/com/google/code/gossip/event/GossipState.java diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java index 00027bc..1f05c25 100644 --- a/src/main/java/com/google/code/gossip/GossipService.java +++ b/src/main/java/com/google/code/gossip/GossipService.java @@ -6,6 +6,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import org.apache.log4j.Logger; +import com.google.code.gossip.event.GossipListener; import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.random.RandomGossipManager; @@ -30,7 +31,7 @@ public class GossipService { UnknownHostException { this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "", startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings - .getGossipSettings()); + .getGossipSettings(), null); } /** @@ -41,9 +42,9 @@ public class GossipService { * @throws UnknownHostException */ public GossipService(String ipAddress, int port, String id, int logLevel, - ArrayList gossipMembers, GossipSettings settings) + ArrayList gossipMembers, GossipSettings settings, GossipListener listener) throws InterruptedException, UnknownHostException { - _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers); + _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener); } public void start() { diff --git a/src/main/java/com/google/code/gossip/event/GossipListener.java b/src/main/java/com/google/code/gossip/event/GossipListener.java new file mode 100644 index 0000000..7d4a462 --- /dev/null +++ b/src/main/java/com/google/code/gossip/event/GossipListener.java @@ -0,0 +1,7 @@ +package com.google.code.gossip.event; + +import com.google.code.gossip.GossipMember; + +public interface GossipListener { + void gossipEvent(GossipMember member, GossipState state); +} diff --git a/src/main/java/com/google/code/gossip/event/GossipState.java b/src/main/java/com/google/code/gossip/event/GossipState.java new file mode 100644 index 0000000..9e5db5f --- /dev/null +++ b/src/main/java/com/google/code/gossip/event/GossipState.java @@ -0,0 +1,10 @@ +package com.google.code.gossip.event; + +public enum GossipState { + UP("up"), DOWN("down"); + private String state; + + private GossipState(String state){ + this.state = state; + } +} diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java index abc28e1..ffcf7ca 100644 --- a/src/main/java/com/google/code/gossip/examples/GossipExample.java +++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java @@ -58,7 +58,7 @@ public class GossipExample extends Thread { // dead list handling. for (GossipMember member : startupMembers) { GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", - LogLevel.DEBUG, startupMembers, settings); + LogLevel.DEBUG, startupMembers, settings, null); clients.add(gossipService); gossipService.start(); sleep(settings.getCleanupInterval() + 1000); diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java index 4cd096d..5823c74 100644 --- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java @@ -1,6 +1,5 @@ package com.google.code.gossip.manager; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index 03cc92c..197d624 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -20,13 +20,15 @@ import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipSettings; import com.google.code.gossip.LocalGossipMember; +import com.google.code.gossip.event.GossipListener; +import com.google.code.gossip.event.GossipState; public abstract class GossipManager extends Thread implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); public static final int MAX_PACKET_SIZE = 102400; - private ConcurrentSkipListMap members; + private ConcurrentSkipListMap members; private LocalGossipMember _me; private GossipSettings _settings; private AtomicBoolean _gossipServiceRunning; @@ -35,10 +37,12 @@ public abstract class GossipManager extends Thread implements NotificationListen private PassiveGossipThread passiveGossipThread; private Class _activeGossipThreadClass; private ActiveGossipThread activeGossipThread; + private GossipListener listener; public GossipManager(Class passiveGossipThreadClass, Class activeGossipThreadClass, String address, int port, - String id, GossipSettings settings, ArrayList gossipMembers) { + String id, GossipSettings settings, ArrayList gossipMembers, + GossipListener listener) { _passiveGossipThreadClass = passiveGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass; _settings = settings; @@ -49,12 +53,13 @@ public abstract class GossipManager extends Thread implements NotificationListen LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), startupMember.getId(), 0, this, settings.getCleanupInterval()); - members.put(member, "UP"); + members.put(member, GossipState.UP); GossipService.LOGGER.debug(member); } } _gossipServiceRunning = new AtomicBoolean(true); + this.listener = listener; Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { GossipService.LOGGER.info("Service has been shutdown..."); @@ -70,17 +75,27 @@ public abstract class GossipManager extends Thread implements NotificationListen public void handleNotification(Notification notification, Object handback) { LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); GossipService.LOGGER.info("Dead member detected: " + deadMember); - members.put(deadMember, "DOWN"); + members.put(deadMember, GossipState.DOWN); + if (listener != null) { + listener.gossipEvent(deadMember, GossipState.DOWN); + } } + public void createOrRevivieMember(LocalGossipMember m){ + members.put(m, GossipState.UP); + if (listener != null) { + listener.gossipEvent(m, GossipState.UP); + } + } + public GossipSettings getSettings() { return _settings; } public List getMemberList() { List up = new ArrayList<>(); - for (Entry entry : members.entrySet()){ - if ("UP".equals(entry.getValue())){ + for (Entry entry : members.entrySet()){ + if (GossipState.UP.equals(entry.getValue())){ up.add(entry.getKey()); } } @@ -91,14 +106,10 @@ public abstract class GossipManager extends Thread implements NotificationListen return _me; } - public void createOrRevivieMember(LocalGossipMember m){ - members.put(m, "UP"); - } - public List getDeadList() { List up = new ArrayList<>(); - for (Entry entry : members.entrySet()){ - if ("DOWN".equals(entry.getValue())){ + for (Entry entry : members.entrySet()){ + if (GossipState.DOWN.equals(entry.getValue())){ up.add(entry.getKey()); } } diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index 42c202c..314432a 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -10,6 +10,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.log4j.Logger; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -25,6 +26,8 @@ import com.google.code.gossip.RemoteGossipMember; * determine the incoming message. */ abstract public class PassiveGossipThread implements Runnable { + + public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); /** The socket used for the passive thread of the gossip service. */ private DatagramSocket _server; @@ -107,8 +110,6 @@ abstract public class PassiveGossipThread implements Runnable { } } - - // Merge our list with the one we just received mergeLists(_gossipManager, senderMember, remoteGossipMembers); } catch (JSONException e) { GossipService.LOGGER @@ -122,7 +123,7 @@ abstract public class PassiveGossipThread implements Runnable { } } catch (IOException e) { - e.printStackTrace(); + GossipService.LOGGER.error(e); _keepRunning.set(false); } } diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java index 1baeb3a..d232f38 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java @@ -1,6 +1,5 @@ package com.google.code.gossip.manager.random; -import java.util.ArrayList; import java.util.List; import java.util.Random; diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java index 8893ff5..3d028eb 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java @@ -4,13 +4,14 @@ import java.util.ArrayList; import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipSettings; +import com.google.code.gossip.event.GossipListener; import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; public class RandomGossipManager extends GossipManager { public RandomGossipManager(String address, int port, String id, GossipSettings settings, - ArrayList gossipMembers) { + ArrayList gossipMembers, GossipListener listener) { super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, - port, id, settings, gossipMembers); + port, id, settings, gossipMembers, listener); } } diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java index e30f39a..93f93c5 100644 --- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java @@ -12,6 +12,8 @@ import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipSettings; import com.google.code.gossip.LogLevel; import com.google.code.gossip.RemoteGossipMember; +import com.google.code.gossip.event.GossipListener; +import com.google.code.gossip.event.GossipState; public class TenNodeThreeSeedTest { @@ -31,19 +33,25 @@ public class TenNodeThreeSeedTest { int seedNodes = 3; ArrayList startupMembers = new ArrayList(); for (int i = 1; i < seedNodes+1; ++i) { - startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i+"")); + startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + "")); } ArrayList clients = new ArrayList(); int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { - GossipService gossipService = new GossipService("127.0.0."+i, 2000, i+"", LogLevel.DEBUG, startupMembers, settings); + GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG, + startupMembers, settings, + new GossipListener(){ + @Override + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(member+" "+ state); + } + }); clients.add(gossipService); gossipService.start(); Thread.sleep(1000); } Thread.sleep(10000); for (int i = 0; i < clusterMembers; ++i) { - System.out.println(clients.get(i).get_gossipManager().getMemberList()); Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size()); } for (int i = 0; i < clusterMembers; ++i) { From 59c96be9972589758c703da920e25caa24909913 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Sun, 15 Feb 2015 23:31:10 -0500 Subject: [PATCH 3/3] Update readme --- README.md | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1fea9d4..690e08d 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Here we start five gossip processes and check that they discover each other. (No int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", - LogLevel.DEBUG, startupMembers, settings); + LogLevel.DEBUG, startupMembers, settings, null); clients.add(gossipService); gossipService.start(); } @@ -33,9 +33,30 @@ Later we can check that the nodes discover each other Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size()); } +Event Listener +------ + +The status can be polled using the getters that return immutable lists. + + List getMemberList() + public List getDeadList() + +Users can also attach an event listener: + + GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG, + startupMembers, settings, + new GossipListener(){ + @Override + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(member+" "+ state); + } + }); + + Maven ------ + You can get this software from maven central.