GOSSIP-13 Add two way protocol

This commit is contained in:
Edward Capriolo
2016-07-01 21:24:33 -04:00
parent d28d9b321c
commit a045b6f80a
19 changed files with 530 additions and 122 deletions

View File

@ -17,8 +17,6 @@
*/
package org.apache.gossip.manager;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@ -41,6 +39,8 @@ import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.manager.random.RandomActiveGossipThread;
public abstract class GossipManager extends Thread implements NotificationListener {
@ -56,10 +56,6 @@ public abstract class GossipManager extends Thread implements NotificationListen
private final AtomicBoolean gossipServiceRunning;
private final Class<? extends PassiveGossipThread> passiveGossipThreadClass;
private final Class<? extends ActiveGossipThread> activeGossipThreadClass;
private final GossipListener listener;
private ActiveGossipThread activeGossipThread;
@ -67,14 +63,15 @@ public abstract class GossipManager extends Thread implements NotificationListen
private PassiveGossipThread passiveGossipThread;
private ExecutorService gossipThreadExecutor;
private GossipCore gossipCore;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
public GossipManager(String cluster,
URI uri, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
this.passiveGossipThreadClass = passiveGossipThreadClass;
this.activeGossipThreadClass = activeGossipThreadClass;
this.settings = settings;
this.gossipCore = new GossipCore(this);
me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
@ -173,20 +170,10 @@ public abstract class GossipManager extends Thread implements NotificationListen
member.startTimeoutTimer();
}
}
try {
passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class)
.newInstance(this);
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class)
.newInstance(this);
gossipThreadExecutor.execute(activeGossipThread);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
if (e1 instanceof BindException){
LOGGER.fatal("could not bind to "+ me.getUri() + " " + me.getAddress());
}
throw new RuntimeException(e1);
}
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore);
gossipThreadExecutor.execute(activeGossipThread);
GossipService.LOGGER.debug("The GossipService is started.");
while (gossipServiceRunning.get()) {
try {
@ -204,6 +191,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
public void shutdown() {
gossipServiceRunning.set(false);
gossipThreadExecutor.shutdown();
gossipCore.shutdown();
if (passiveGossipThread != null) {
passiveGossipThread.shutdown();
}