GOSSIP-38 Multiple async GossipListeners
This commit is contained in:
@ -43,7 +43,7 @@ public class GossipSettings {
|
||||
|
||||
private String distribution = "normal";
|
||||
|
||||
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
|
||||
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossiper";
|
||||
|
||||
private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager";
|
||||
private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager";
|
||||
|
@ -22,7 +22,7 @@ import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A abstract class representing a gossip member.
|
||||
* An abstract class representing a gossip member.
|
||||
*
|
||||
*/
|
||||
public abstract class Member implements Comparable<Member> {
|
||||
|
@ -185,7 +185,7 @@ public abstract class GossipManager {
|
||||
if (settings.isPersistDataState()) {
|
||||
scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
|
||||
}
|
||||
scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS);
|
||||
memberStateRefresher.init();
|
||||
LOGGER.debug("The GossipManager is started.");
|
||||
}
|
||||
|
||||
@ -224,6 +224,7 @@ public abstract class GossipManager {
|
||||
gossipCore.shutdown();
|
||||
transportManager.shutdown();
|
||||
dataReaper.close();
|
||||
memberStateRefresher.shutdown();
|
||||
scheduledServiced.shutdown();
|
||||
try {
|
||||
scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
|
||||
@ -366,4 +367,8 @@ public abstract class GossipManager {
|
||||
public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
|
||||
gossipCore.unregisterSharedDataSubscriber(handler);
|
||||
}
|
||||
|
||||
public void registerGossipListener(GossipListener listener) {
|
||||
memberStateRefresher.register(listener);
|
||||
}
|
||||
}
|
||||
|
@ -18,10 +18,11 @@
|
||||
package org.apache.gossip.manager;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import org.apache.gossip.Member;
|
||||
import org.apache.gossip.GossipSettings;
|
||||
import org.apache.gossip.Member;
|
||||
import org.apache.gossip.StartupSettings;
|
||||
import org.apache.gossip.event.GossipListener;
|
||||
import org.apache.gossip.event.GossipState;
|
||||
import org.apache.gossip.manager.handlers.MessageHandler;
|
||||
import org.apache.gossip.manager.handlers.MessageHandlerFactory;
|
||||
|
||||
|
@ -26,27 +26,40 @@ import org.apache.gossip.model.PerNodeDataMessage;
|
||||
import org.apache.gossip.model.ShutdownMessage;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.function.BiFunction;
|
||||
|
||||
public class GossipMemberStateRefresher implements Runnable {
|
||||
public class GossipMemberStateRefresher {
|
||||
public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);
|
||||
|
||||
private final Map<LocalMember, GossipState> members;
|
||||
private final GossipSettings settings;
|
||||
private final GossipListener listener;
|
||||
private final List<GossipListener> listeners = new CopyOnWriteArrayList<>();
|
||||
private final Clock clock;
|
||||
private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
|
||||
private final ExecutorService listenerExecutor;
|
||||
private final ScheduledExecutorService scheduledExecutor;
|
||||
private final BlockingQueue<Runnable> workQueue;
|
||||
|
||||
public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
|
||||
GossipListener listener, BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
|
||||
GossipListener listener,
|
||||
BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
|
||||
this.members = members;
|
||||
this.settings = settings;
|
||||
this.listener = listener;
|
||||
listeners.add(listener);
|
||||
this.findPerNodeGossipData = findPerNodeGossipData;
|
||||
clock = new SystemClock();
|
||||
workQueue = new ArrayBlockingQueue<>(1024);
|
||||
listenerExecutor = new ThreadPoolExecutor(1, 20, 1, TimeUnit.SECONDS, workQueue,
|
||||
new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
scheduledExecutor = Executors.newScheduledThreadPool(1);
|
||||
}
|
||||
|
||||
public void init() {
|
||||
scheduledExecutor.scheduleAtFixedRate(() -> run(), 0, 100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public void run() {
|
||||
@ -74,7 +87,9 @@ public class GossipMemberStateRefresher implements Runnable {
|
||||
|
||||
if (entry.getValue() != requiredState) {
|
||||
members.put(entry.getKey(), requiredState);
|
||||
listener.gossipEvent(entry.getKey(), requiredState);
|
||||
/* Call listeners asynchronously */
|
||||
for (GossipListener listener: listeners)
|
||||
listenerExecutor.execute(() -> listener.gossipEvent(entry.getKey(), requiredState));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -112,10 +127,31 @@ public class GossipMemberStateRefresher implements Runnable {
|
||||
if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
|
||||
members.put(l.getKey(), GossipState.DOWN);
|
||||
if (l.getValue() == GossipState.UP) {
|
||||
listener.gossipEvent(l.getKey(), GossipState.DOWN);
|
||||
for (GossipListener listener: listeners)
|
||||
listenerExecutor.execute(() -> listener.gossipEvent(l.getKey(), GossipState.DOWN));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public void register(GossipListener listener) {
|
||||
listeners.add(listener);
|
||||
}
|
||||
|
||||
public void shutdown() {
|
||||
scheduledExecutor.shutdown();
|
||||
try {
|
||||
scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.debug("Issue during shutdown", e);
|
||||
}
|
||||
listenerExecutor.shutdown();
|
||||
try {
|
||||
listenerExecutor.awaitTermination(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.debug("Issue during shutdown", e);
|
||||
}
|
||||
listenerExecutor.shutdownNow();
|
||||
}
|
||||
}
|
@ -33,14 +33,14 @@ import com.codahale.metrics.MetricRegistry;
|
||||
* Base implementation gossips randomly to live nodes periodically gossips to dead ones
|
||||
*
|
||||
*/
|
||||
public class SimpleActiveGossipper extends AbstractActiveGossiper {
|
||||
public class SimpleActiveGossiper extends AbstractActiveGossiper {
|
||||
|
||||
private ScheduledExecutorService scheduledExecutorService;
|
||||
private final BlockingQueue<Runnable> workQueue;
|
||||
private ThreadPoolExecutor threadService;
|
||||
|
||||
public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
|
||||
MetricRegistry registry) {
|
||||
public SimpleActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
|
||||
MetricRegistry registry) {
|
||||
super(gossipManager, gossipCore, registry);
|
||||
scheduledExecutorService = Executors.newScheduledThreadPool(2);
|
||||
workQueue = new ArrayBlockingQueue<Runnable>(1024);
|
@ -20,7 +20,7 @@ package org.apache.gossip.transport;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
/** interface for manage that sends and receives messages that have already been serialized. */
|
||||
/** interface for manager that sends and receives messages that have already been serialized. */
|
||||
public interface TransportManager {
|
||||
|
||||
/** starts the active gossip thread responsible for reaching out to remote nodes. Not related to `startEndpoint()` */
|
||||
|
Reference in New Issue
Block a user