From ac830389327508688c664a01463d1ddfa3fb6721 Mon Sep 17 00:00:00 2001 From: pxsalehi Date: Mon, 21 Aug 2017 11:58:15 +0200 Subject: [PATCH] GOSSIP-38 Multiple async GossipListeners --- .../org/apache/gossip/GossipSettings.java | 2 +- .../main/java/org/apache/gossip/Member.java | 2 +- .../apache/gossip/manager/GossipManager.java | 7 ++- .../gossip/manager/GossipManagerBuilder.java | 3 +- .../manager/GossipMemberStateRefresher.java | 50 ++++++++++++++++--- ...ssipper.java => SimpleActiveGossiper.java} | 6 +-- .../gossip/transport/TransportManager.java | 2 +- 7 files changed, 57 insertions(+), 15 deletions(-) rename gossip-base/src/main/java/org/apache/gossip/manager/{SimpleActiveGossipper.java => SimpleActiveGossiper.java} (95%) diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java index 792af85..32c00c9 100644 --- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -43,7 +43,7 @@ public class GossipSettings { private String distribution = "normal"; - private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; + private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossiper"; private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager"; private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager"; diff --git a/gossip-base/src/main/java/org/apache/gossip/Member.java b/gossip-base/src/main/java/org/apache/gossip/Member.java index d04a7b6..54a6737 100644 --- a/gossip-base/src/main/java/org/apache/gossip/Member.java +++ b/gossip-base/src/main/java/org/apache/gossip/Member.java @@ -22,7 +22,7 @@ import java.net.URI; import java.util.Map; /** - * A abstract class representing a gossip member. + * An abstract class representing a gossip member. * */ public abstract class Member implements Comparable { diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java index d839b2e..db442c6 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -185,7 +185,7 @@ public abstract class GossipManager { if (settings.isPersistDataState()) { scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); } - scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS); + memberStateRefresher.init(); LOGGER.debug("The GossipManager is started."); } @@ -224,6 +224,7 @@ public abstract class GossipManager { gossipCore.shutdown(); transportManager.shutdown(); dataReaper.close(); + memberStateRefresher.shutdown(); scheduledServiced.shutdown(); try { scheduledServiced.awaitTermination(1, TimeUnit.SECONDS); @@ -366,4 +367,8 @@ public abstract class GossipManager { public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){ gossipCore.unregisterSharedDataSubscriber(handler); } + + public void registerGossipListener(GossipListener listener) { + memberStateRefresher.register(listener); + } } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java index 86dca57..f3ca23a 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java @@ -18,10 +18,11 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; +import org.apache.gossip.Member; import org.apache.gossip.StartupSettings; import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.manager.handlers.MessageHandlerFactory; diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java index 1836309..652bf5c 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java @@ -26,27 +26,40 @@ import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.ShutdownMessage; import org.apache.log4j.Logger; +import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.function.BiFunction; -public class GossipMemberStateRefresher implements Runnable { +public class GossipMemberStateRefresher { public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class); private final Map members; private final GossipSettings settings; - private final GossipListener listener; + private final List listeners = new CopyOnWriteArrayList<>(); private final Clock clock; private final BiFunction findPerNodeGossipData; + private final ExecutorService listenerExecutor; + private final ScheduledExecutorService scheduledExecutor; + private final BlockingQueue workQueue; public GossipMemberStateRefresher(Map members, GossipSettings settings, - GossipListener listener, BiFunction findPerNodeGossipData) { + GossipListener listener, + BiFunction findPerNodeGossipData) { this.members = members; this.settings = settings; - this.listener = listener; + listeners.add(listener); this.findPerNodeGossipData = findPerNodeGossipData; clock = new SystemClock(); + workQueue = new ArrayBlockingQueue<>(1024); + listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue, + new ThreadPoolExecutor.DiscardOldestPolicy()); + scheduledExecutor = Executors.newScheduledThreadPool(1); + } + + public void init() { + scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS); } public void run() { @@ -74,7 +87,9 @@ public class GossipMemberStateRefresher implements Runnable { if (entry.getValue() != requiredState) { members.put(entry.getKey(), requiredState); - listener.gossipEvent(entry.getKey(), requiredState); + /* Call listeners asynchronously */ + for (GossipListener listener: listeners) + listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState)); } } } @@ -112,10 +127,31 @@ public class GossipMemberStateRefresher implements Runnable { if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) { members.put(l.getKey(), GossipState.DOWN); if (l.getValue() == GossipState.UP) { - listener.gossipEvent(l.getKey(), GossipState.DOWN); + for (GossipListener listener: listeners) + listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN)); } return true; } return false; } + + public void register(GossipListener listener) { + listeners.add(listener); + } + + public void shutdown() { + scheduledExecutor.shutdown(); + try { + scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + listenerExecutor.shutdown(); + try { + listenerExecutor.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + listenerExecutor.shutdownNow(); + } } \ No newline at end of file diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java similarity index 95% rename from gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java rename to gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java index e47fe2a..7d498b4 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/SimpleActiveGossiper.java @@ -33,14 +33,14 @@ import com.codahale.metrics.MetricRegistry; * Base implementation gossips randomly to live nodes periodically gossips to dead ones * */ -public class SimpleActiveGossipper extends AbstractActiveGossiper { +public class SimpleActiveGossiper extends AbstractActiveGossiper { private ScheduledExecutorService scheduledExecutorService; private final BlockingQueue workQueue; private ThreadPoolExecutor threadService; - public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore, - MetricRegistry registry) { + public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, + MetricRegistry registry) { super(gossipManager, gossipCore, registry); scheduledExecutorService = Executors.newScheduledThreadPool(2); workQueue = new ArrayBlockingQueue(1024); diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java index 031d90e..99354d1 100644 --- a/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/transport/TransportManager.java @@ -20,7 +20,7 @@ package org.apache.gossip.transport; import java.io.IOException; import java.net.URI; -/** interface for manage that sends and receives messages that have already been serialized. */ +/** interface for manager that sends and receives messages that have already been serialized. */ public interface TransportManager { /** starts the active gossip thread responsible for reaching out to remote nodes. Not related to `startEndpoint()` */