diff --git a/README.md b/README.md index 1fea9d4..690e08d 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Here we start five gossip processes and check that they discover each other. (No int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", - LogLevel.DEBUG, startupMembers, settings); + LogLevel.DEBUG, startupMembers, settings, null); clients.add(gossipService); gossipService.start(); } @@ -33,9 +33,30 @@ Later we can check that the nodes discover each other Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size()); } +Event Listener +------ + +The status can be polled using the getters that return immutable lists. + + List getMemberList() + public List getDeadList() + +Users can also attach an event listener: + + GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG, + startupMembers, settings, + new GossipListener(){ + @Override + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(member+" "+ state); + } + }); + + Maven ------ + You can get this software from maven central. diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java index e24964e..50e9af3 100644 --- a/src/main/java/com/google/code/gossip/GossipMember.java +++ b/src/main/java/com/google/code/gossip/GossipMember.java @@ -10,25 +10,15 @@ import org.json.JSONObject; * * @author joshclemm, harmenw */ -public abstract class GossipMember { - /** The JSON key for the host property. */ +public abstract class GossipMember implements Comparable{ + public static final String JSON_HOST = "host"; - /** The JSON key for the port property. */ public static final String JSON_PORT = "port"; - /** The JSON key for the heartbeat property. */ public static final String JSON_HEARTBEAT = "heartbeat"; - public static final String JSON_ID = "id"; - - /** The hostname or IP address of this gossip member. */ protected String _host; - - /** The port number of this gossip member. */ protected int _port; - - /** The current heartbeat of this gossip member. */ protected int _heartbeat; - protected String _id; /** @@ -146,4 +136,8 @@ public abstract class GossipMember { throw new RuntimeException(e); } } + + public int compareTo(GossipMember other){ + return this.getAddress().compareTo(other.getAddress()); + } } diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java index 00027bc..1f05c25 100644 --- a/src/main/java/com/google/code/gossip/GossipService.java +++ b/src/main/java/com/google/code/gossip/GossipService.java @@ -6,6 +6,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import org.apache.log4j.Logger; +import com.google.code.gossip.event.GossipListener; import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.random.RandomGossipManager; @@ -30,7 +31,7 @@ public class GossipService { UnknownHostException { this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "", startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings - .getGossipSettings()); + .getGossipSettings(), null); } /** @@ -41,9 +42,9 @@ public class GossipService { * @throws UnknownHostException */ public GossipService(String ipAddress, int port, String id, int logLevel, - ArrayList gossipMembers, GossipSettings settings) + ArrayList gossipMembers, GossipSettings settings, GossipListener listener) throws InterruptedException, UnknownHostException { - _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers); + _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener); } public void start() { diff --git a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java index 94cc3a4..c3613fb 100644 --- a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java +++ b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java @@ -14,10 +14,7 @@ import javax.management.timer.Timer; */ public class GossipTimeoutTimer extends Timer { - /** The amount of time this timer waits before generating a wake-up event. */ private long _sleepTime; - - /** The gossip member this timer is for. */ private LocalGossipMember _source; /** diff --git a/src/main/java/com/google/code/gossip/event/GossipListener.java b/src/main/java/com/google/code/gossip/event/GossipListener.java new file mode 100644 index 0000000..7d4a462 --- /dev/null +++ b/src/main/java/com/google/code/gossip/event/GossipListener.java @@ -0,0 +1,7 @@ +package com.google.code.gossip.event; + +import com.google.code.gossip.GossipMember; + +public interface GossipListener { + void gossipEvent(GossipMember member, GossipState state); +} diff --git a/src/main/java/com/google/code/gossip/event/GossipState.java b/src/main/java/com/google/code/gossip/event/GossipState.java new file mode 100644 index 0000000..9e5db5f --- /dev/null +++ b/src/main/java/com/google/code/gossip/event/GossipState.java @@ -0,0 +1,10 @@ +package com.google.code.gossip.event; + +public enum GossipState { + UP("up"), DOWN("down"); + private String state; + + private GossipState(String state){ + this.state = state; + } +} diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java index abc28e1..ffcf7ca 100644 --- a/src/main/java/com/google/code/gossip/examples/GossipExample.java +++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java @@ -58,7 +58,7 @@ public class GossipExample extends Thread { // dead list handling. for (GossipMember member : startupMembers) { GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", - LogLevel.DEBUG, startupMembers, settings); + LogLevel.DEBUG, startupMembers, settings, null); clients.add(gossipService); gossipService.start(); sleep(settings.getCleanupInterval() + 1000); diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java index e5ab754..5823c74 100644 --- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java @@ -1,6 +1,6 @@ package com.google.code.gossip.manager; -import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,7 +45,7 @@ abstract public class ActiveGossipThread implements Runnable { * Performs the sending of the membership list, after we have incremented our own heartbeat. */ abstract protected void sendMembershipList(LocalGossipMember me, - ArrayList memberList); + List memberList); /** * Abstract method which should be implemented by a subclass. This method should return a member @@ -55,5 +55,5 @@ abstract public class ActiveGossipThread implements Runnable { * The list of members which are stored in the local list of members. * @return The chosen LocalGossipMember to gossip with. */ - abstract protected LocalGossipMember selectPartner(ArrayList memberList); + abstract protected LocalGossipMember selectPartner(List memberList); } diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index 1b76e82..197d624 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -2,6 +2,10 @@ package com.google.code.gossip.manager; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -16,60 +20,46 @@ import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipSettings; import com.google.code.gossip.LocalGossipMember; +import com.google.code.gossip.event.GossipListener; +import com.google.code.gossip.event.GossipState; public abstract class GossipManager extends Thread implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); - - /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */ public static final int MAX_PACKET_SIZE = 102400; - /** The list of members which are in the gossip group (not including myself). */ - private ArrayList _memberList; - - /** The list of members which are known to be dead. */ - private ArrayList _deadList; - - /** The member I am representing. */ + private ConcurrentSkipListMap members; private LocalGossipMember _me; - - /** The settings for gossiping. */ private GossipSettings _settings; - - /** A boolean whether the gossip service should keep running. */ private AtomicBoolean _gossipServiceRunning; - - /** A ExecutorService used for executing the active and passive gossip threads. */ private ExecutorService _gossipThreadExecutor; - private Class _passiveGossipThreadClass; - private PassiveGossipThread passiveGossipThread; - private Class _activeGossipThreadClass; - private ActiveGossipThread activeGossipThread; + private GossipListener listener; public GossipManager(Class passiveGossipThreadClass, Class activeGossipThreadClass, String address, int port, - String id, GossipSettings settings, ArrayList gossipMembers) { + String id, GossipSettings settings, ArrayList gossipMembers, + GossipListener listener) { _passiveGossipThreadClass = passiveGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass; _settings = settings; _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); - _memberList = new ArrayList(); - _deadList = new ArrayList(); + members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(_me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), startupMember.getId(), 0, this, settings.getCleanupInterval()); - _memberList.add(member); + members.put(member, GossipState.UP); GossipService.LOGGER.debug(member); } } _gossipServiceRunning = new AtomicBoolean(true); + this.listener = listener; Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { GossipService.LOGGER.info("Service has been shutdown..."); @@ -85,33 +75,45 @@ public abstract class GossipManager extends Thread implements NotificationListen public void handleNotification(Notification notification, Object handback) { LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); GossipService.LOGGER.info("Dead member detected: " + deadMember); - synchronized (this._memberList) { - this._memberList.remove(deadMember); - } - synchronized (this._deadList) { - this._deadList.add(deadMember); + members.put(deadMember, GossipState.DOWN); + if (listener != null) { + listener.gossipEvent(deadMember, GossipState.DOWN); } } + public void createOrRevivieMember(LocalGossipMember m){ + members.put(m, GossipState.UP); + if (listener != null) { + listener.gossipEvent(m, GossipState.UP); + } + } + public GossipSettings getSettings() { return _settings; } - /** - * Get a clone of the memberlist. - * - * @return - */ - public ArrayList getMemberList() { - return _memberList; + public List getMemberList() { + List up = new ArrayList<>(); + for (Entry entry : members.entrySet()){ + if (GossipState.UP.equals(entry.getValue())){ + up.add(entry.getKey()); + } + } + return Collections.unmodifiableList(up); } public LocalGossipMember getMyself() { return _me; } - - public ArrayList getDeadList() { - return _deadList; + + public List getDeadList() { + List up = new ArrayList<>(); + for (Entry entry : members.entrySet()){ + if (GossipState.DOWN.equals(entry.getValue())){ + up.add(entry.getKey()); + } + } + return Collections.unmodifiableList(up); } /** @@ -121,7 +123,7 @@ public abstract class GossipManager extends Thread implements NotificationListen * @throws InterruptedException */ public void run() { - for (LocalGossipMember member : _memberList) { + for (LocalGossipMember member : members.keySet()) { if (member != _me) { member.startTimeoutTimer(); } diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index 08342b4..314432a 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -7,8 +7,10 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.log4j.Logger; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -24,6 +26,8 @@ import com.google.code.gossip.RemoteGossipMember; * determine the incoming message. */ abstract public class PassiveGossipThread implements Runnable { + + public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); /** The socket used for the passive thread of the gossip service. */ private DatagramSocket _server; @@ -106,8 +110,6 @@ abstract public class PassiveGossipThread implements Runnable { } } - - // Merge our list with the one we just received mergeLists(_gossipManager, senderMember, remoteGossipMembers); } catch (JSONException e) { GossipService.LOGGER @@ -121,7 +123,7 @@ abstract public class PassiveGossipThread implements Runnable { } } catch (IOException e) { - e.printStackTrace(); + GossipService.LOGGER.error(e); _keepRunning.set(false); } } @@ -144,5 +146,5 @@ abstract public class PassiveGossipThread implements Runnable { * The list of members known at the remote side. */ abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - ArrayList remoteList); + List remoteList); } \ No newline at end of file diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index 206d5c5..f0afaf9 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -1,6 +1,6 @@ package com.google.code.gossip.manager.impl; -import java.util.ArrayList; +import java.util.List; import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; @@ -23,35 +23,21 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread * @param remoteList */ protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - ArrayList remoteList) { - - synchronized (gossipManager.getDeadList()) { - - synchronized (gossipManager.getMemberList()) { + List remoteList) { for (GossipMember remoteMember : remoteList) { // Skip myself. We don't want ourselves in the local member list. - if (!remoteMember.equals(gossipManager.getMyself())) { + if (remoteMember.equals(gossipManager.getMyself())) { + continue; + } if (gossipManager.getMemberList().contains(remoteMember)) { - GossipService.LOGGER.debug("The local list already contains the remote member (" - + remoteMember + ")."); - // The local memberlist contains the remote member. LocalGossipMember localMember = gossipManager.getMemberList().get( gossipManager.getMemberList().indexOf(remoteMember)); - - // Let's synchronize it's heartbeat. if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { - // update local list with latest heartbeat localMember.setHeartbeat(remoteMember.getHeartbeat()); - // and reset the timeout of that member localMember.resetTimeoutTimer(); } - // TODO: Otherwise, should we inform the other when the heartbeat is already higher? } else { - // The local list does not contain the remote member. - GossipService.LOGGER.debug("The local list does not contain the remote member (" - + remoteMember + ")."); - // The remote member is either brand new, or a previously declared dead member. // If its dead, check the heartbeat because it may have come back from the dead. if (gossipManager.getDeadList().contains(remoteMember)) { @@ -80,13 +66,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread .debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); // The remote member is back from the dead. // Remove it from the dead list. - gossipManager.getDeadList().remove(localDeadMember); + //gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the member list. LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() .getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); + //gossipManager.getMemberList().add(newLocalMember); + gossipManager.createOrRevivieMember(newLocalMember); newLocalMember.startTimeoutTimer(); GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); @@ -96,7 +83,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); + gossipManager.createOrRevivieMember(newLocalMember); newLocalMember.startTimeoutTimer(); GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list."); @@ -104,8 +91,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread } } } - } - } - } + + } diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index 5235db2..85e4b8a 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -5,7 +5,7 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.List; import org.json.JSONArray; @@ -23,7 +23,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, ArrayList memberList) { + protected void sendMembershipList(LocalGossipMember me, List memberList) { GossipService.LOGGER.debug("Send sendMembershipList() is called."); me.setHeartbeat(me.getHeartbeat() + 1); synchronized (memberList) { diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java index 917c362..d232f38 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java @@ -1,6 +1,6 @@ package com.google.code.gossip.manager.random; -import java.util.ArrayList; +import java.util.List; import java.util.Random; import com.google.code.gossip.GossipService; @@ -24,7 +24,7 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread { * * @return Member random member if list is greater than 1, null otherwise */ - protected LocalGossipMember selectPartner(ArrayList memberList) { + protected LocalGossipMember selectPartner(List memberList) { LocalGossipMember member = null; if (memberList.size() > 0) { int randomNeighborIndex = _random.nextInt(memberList.size()); diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java index 8893ff5..3d028eb 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java @@ -4,13 +4,14 @@ import java.util.ArrayList; import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipSettings; +import com.google.code.gossip.event.GossipListener; import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; public class RandomGossipManager extends GossipManager { public RandomGossipManager(String address, int port, String id, GossipSettings settings, - ArrayList gossipMembers) { + ArrayList gossipMembers, GossipListener listener) { super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, - port, id, settings, gossipMembers); + port, id, settings, gossipMembers, listener); } } diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java index e30f39a..93f93c5 100644 --- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java @@ -12,6 +12,8 @@ import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipSettings; import com.google.code.gossip.LogLevel; import com.google.code.gossip.RemoteGossipMember; +import com.google.code.gossip.event.GossipListener; +import com.google.code.gossip.event.GossipState; public class TenNodeThreeSeedTest { @@ -31,19 +33,25 @@ public class TenNodeThreeSeedTest { int seedNodes = 3; ArrayList startupMembers = new ArrayList(); for (int i = 1; i < seedNodes+1; ++i) { - startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i+"")); + startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + "")); } ArrayList clients = new ArrayList(); int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { - GossipService gossipService = new GossipService("127.0.0."+i, 2000, i+"", LogLevel.DEBUG, startupMembers, settings); + GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG, + startupMembers, settings, + new GossipListener(){ + @Override + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(member+" "+ state); + } + }); clients.add(gossipService); gossipService.start(); Thread.sleep(1000); } Thread.sleep(10000); for (int i = 0; i < clusterMembers; ++i) { - System.out.println(clients.get(i).get_gossipManager().getMemberList()); Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size()); } for (int i = 0; i < clusterMembers; ++i) {