diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java index 00027bc..1f05c25 100644 --- a/src/main/java/com/google/code/gossip/GossipService.java +++ b/src/main/java/com/google/code/gossip/GossipService.java @@ -6,6 +6,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import org.apache.log4j.Logger; +import com.google.code.gossip.event.GossipListener; import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.random.RandomGossipManager; @@ -30,7 +31,7 @@ public class GossipService { UnknownHostException { this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "", startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings - .getGossipSettings()); + .getGossipSettings(), null); } /** @@ -41,9 +42,9 @@ public class GossipService { * @throws UnknownHostException */ public GossipService(String ipAddress, int port, String id, int logLevel, - ArrayList gossipMembers, GossipSettings settings) + ArrayList gossipMembers, GossipSettings settings, GossipListener listener) throws InterruptedException, UnknownHostException { - _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers); + _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener); } public void start() { diff --git a/src/main/java/com/google/code/gossip/event/GossipListener.java b/src/main/java/com/google/code/gossip/event/GossipListener.java new file mode 100644 index 0000000..7d4a462 --- /dev/null +++ b/src/main/java/com/google/code/gossip/event/GossipListener.java @@ -0,0 +1,7 @@ +package com.google.code.gossip.event; + +import com.google.code.gossip.GossipMember; + +public interface GossipListener { + void gossipEvent(GossipMember member, GossipState state); +} diff --git a/src/main/java/com/google/code/gossip/event/GossipState.java b/src/main/java/com/google/code/gossip/event/GossipState.java new file mode 100644 index 0000000..9e5db5f --- /dev/null +++ b/src/main/java/com/google/code/gossip/event/GossipState.java @@ -0,0 +1,10 @@ +package com.google.code.gossip.event; + +public enum GossipState { + UP("up"), DOWN("down"); + private String state; + + private GossipState(String state){ + this.state = state; + } +} diff --git a/src/main/java/com/google/code/gossip/examples/GossipExample.java b/src/main/java/com/google/code/gossip/examples/GossipExample.java index abc28e1..ffcf7ca 100644 --- a/src/main/java/com/google/code/gossip/examples/GossipExample.java +++ b/src/main/java/com/google/code/gossip/examples/GossipExample.java @@ -58,7 +58,7 @@ public class GossipExample extends Thread { // dead list handling. for (GossipMember member : startupMembers) { GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", - LogLevel.DEBUG, startupMembers, settings); + LogLevel.DEBUG, startupMembers, settings, null); clients.add(gossipService); gossipService.start(); sleep(settings.getCleanupInterval() + 1000); diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java index 4cd096d..5823c74 100644 --- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java @@ -1,6 +1,5 @@ package com.google.code.gossip.manager; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index 03cc92c..197d624 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -20,13 +20,15 @@ import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipSettings; import com.google.code.gossip.LocalGossipMember; +import com.google.code.gossip.event.GossipListener; +import com.google.code.gossip.event.GossipState; public abstract class GossipManager extends Thread implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); public static final int MAX_PACKET_SIZE = 102400; - private ConcurrentSkipListMap members; + private ConcurrentSkipListMap members; private LocalGossipMember _me; private GossipSettings _settings; private AtomicBoolean _gossipServiceRunning; @@ -35,10 +37,12 @@ public abstract class GossipManager extends Thread implements NotificationListen private PassiveGossipThread passiveGossipThread; private Class _activeGossipThreadClass; private ActiveGossipThread activeGossipThread; + private GossipListener listener; public GossipManager(Class passiveGossipThreadClass, Class activeGossipThreadClass, String address, int port, - String id, GossipSettings settings, ArrayList gossipMembers) { + String id, GossipSettings settings, ArrayList gossipMembers, + GossipListener listener) { _passiveGossipThreadClass = passiveGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass; _settings = settings; @@ -49,12 +53,13 @@ public abstract class GossipManager extends Thread implements NotificationListen LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), startupMember.getId(), 0, this, settings.getCleanupInterval()); - members.put(member, "UP"); + members.put(member, GossipState.UP); GossipService.LOGGER.debug(member); } } _gossipServiceRunning = new AtomicBoolean(true); + this.listener = listener; Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { GossipService.LOGGER.info("Service has been shutdown..."); @@ -70,17 +75,27 @@ public abstract class GossipManager extends Thread implements NotificationListen public void handleNotification(Notification notification, Object handback) { LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); GossipService.LOGGER.info("Dead member detected: " + deadMember); - members.put(deadMember, "DOWN"); + members.put(deadMember, GossipState.DOWN); + if (listener != null) { + listener.gossipEvent(deadMember, GossipState.DOWN); + } } + public void createOrRevivieMember(LocalGossipMember m){ + members.put(m, GossipState.UP); + if (listener != null) { + listener.gossipEvent(m, GossipState.UP); + } + } + public GossipSettings getSettings() { return _settings; } public List getMemberList() { List up = new ArrayList<>(); - for (Entry entry : members.entrySet()){ - if ("UP".equals(entry.getValue())){ + for (Entry entry : members.entrySet()){ + if (GossipState.UP.equals(entry.getValue())){ up.add(entry.getKey()); } } @@ -91,14 +106,10 @@ public abstract class GossipManager extends Thread implements NotificationListen return _me; } - public void createOrRevivieMember(LocalGossipMember m){ - members.put(m, "UP"); - } - public List getDeadList() { List up = new ArrayList<>(); - for (Entry entry : members.entrySet()){ - if ("DOWN".equals(entry.getValue())){ + for (Entry entry : members.entrySet()){ + if (GossipState.DOWN.equals(entry.getValue())){ up.add(entry.getKey()); } } diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index 42c202c..314432a 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -10,6 +10,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.log4j.Logger; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -25,6 +26,8 @@ import com.google.code.gossip.RemoteGossipMember; * determine the incoming message. */ abstract public class PassiveGossipThread implements Runnable { + + public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); /** The socket used for the passive thread of the gossip service. */ private DatagramSocket _server; @@ -107,8 +110,6 @@ abstract public class PassiveGossipThread implements Runnable { } } - - // Merge our list with the one we just received mergeLists(_gossipManager, senderMember, remoteGossipMembers); } catch (JSONException e) { GossipService.LOGGER @@ -122,7 +123,7 @@ abstract public class PassiveGossipThread implements Runnable { } } catch (IOException e) { - e.printStackTrace(); + GossipService.LOGGER.error(e); _keepRunning.set(false); } } diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java index 1baeb3a..d232f38 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java @@ -1,6 +1,5 @@ package com.google.code.gossip.manager.random; -import java.util.ArrayList; import java.util.List; import java.util.Random; diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java index 8893ff5..3d028eb 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomGossipManager.java @@ -4,13 +4,14 @@ import java.util.ArrayList; import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipSettings; +import com.google.code.gossip.event.GossipListener; import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; public class RandomGossipManager extends GossipManager { public RandomGossipManager(String address, int port, String id, GossipSettings settings, - ArrayList gossipMembers) { + ArrayList gossipMembers, GossipListener listener) { super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, - port, id, settings, gossipMembers); + port, id, settings, gossipMembers, listener); } } diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java index e30f39a..93f93c5 100644 --- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java @@ -12,6 +12,8 @@ import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipSettings; import com.google.code.gossip.LogLevel; import com.google.code.gossip.RemoteGossipMember; +import com.google.code.gossip.event.GossipListener; +import com.google.code.gossip.event.GossipState; public class TenNodeThreeSeedTest { @@ -31,19 +33,25 @@ public class TenNodeThreeSeedTest { int seedNodes = 3; ArrayList startupMembers = new ArrayList(); for (int i = 1; i < seedNodes+1; ++i) { - startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i+"")); + startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + "")); } ArrayList clients = new ArrayList(); int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { - GossipService gossipService = new GossipService("127.0.0."+i, 2000, i+"", LogLevel.DEBUG, startupMembers, settings); + GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG, + startupMembers, settings, + new GossipListener(){ + @Override + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(member+" "+ state); + } + }); clients.add(gossipService); gossipService.start(); Thread.sleep(1000); } Thread.sleep(10000); for (int i = 0; i < clusterMembers; ++i) { - System.out.println(clients.get(i).get_gossipManager().getMemberList()); Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size()); } for (int i = 0; i < clusterMembers; ++i) {