This commit is contained in:
Edward Capriolo
2015-02-15 23:24:18 -05:00
parent a1c241b780
commit 7ce0700798
10 changed files with 63 additions and 26 deletions

View File

@ -6,6 +6,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList; import java.util.ArrayList;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.random.RandomGossipManager; import com.google.code.gossip.manager.random.RandomGossipManager;
@ -30,7 +31,7 @@ public class GossipService {
UnknownHostException { UnknownHostException {
this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "", this(InetAddress.getLocalHost().getHostAddress(), startupSettings.getPort(), "",
startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings startupSettings.getLogLevel(), startupSettings.getGossipMembers(), startupSettings
.getGossipSettings()); .getGossipSettings(), null);
} }
/** /**
@ -41,9 +42,9 @@ public class GossipService {
* @throws UnknownHostException * @throws UnknownHostException
*/ */
public GossipService(String ipAddress, int port, String id, int logLevel, public GossipService(String ipAddress, int port, String id, int logLevel,
ArrayList<GossipMember> gossipMembers, GossipSettings settings) ArrayList<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
throws InterruptedException, UnknownHostException { throws InterruptedException, UnknownHostException {
_gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers); _gossipManager = new RandomGossipManager(ipAddress, port, id, settings, gossipMembers, listener);
} }
public void start() { public void start() {

View File

@ -0,0 +1,7 @@
package com.google.code.gossip.event;
import com.google.code.gossip.GossipMember;
public interface GossipListener {
void gossipEvent(GossipMember member, GossipState state);
}

View File

@ -0,0 +1,10 @@
package com.google.code.gossip.event;
public enum GossipState {
UP("up"), DOWN("down");
private String state;
private GossipState(String state){
this.state = state;
}
}

View File

@ -58,7 +58,7 @@ public class GossipExample extends Thread {
// dead list handling. // dead list handling.
for (GossipMember member : startupMembers) { for (GossipMember member : startupMembers) {
GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "", GossipService gossipService = new GossipService(myIpAddress, member.getPort(), "",
LogLevel.DEBUG, startupMembers, settings); LogLevel.DEBUG, startupMembers, settings, null);
clients.add(gossipService); clients.add(gossipService);
gossipService.start(); gossipService.start();
sleep(settings.getCleanupInterval() + 1000); sleep(settings.getCleanupInterval() + 1000);

View File

@ -1,6 +1,5 @@
package com.google.code.gossip.manager; package com.google.code.gossip.manager;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;

View File

@ -20,13 +20,15 @@ import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipService;
import com.google.code.gossip.GossipSettings; import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.LocalGossipMember; import com.google.code.gossip.LocalGossipMember;
import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.event.GossipState;
public abstract class GossipManager extends Thread implements NotificationListener { public abstract class GossipManager extends Thread implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class); public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
public static final int MAX_PACKET_SIZE = 102400; public static final int MAX_PACKET_SIZE = 102400;
private ConcurrentSkipListMap<LocalGossipMember,String> members; private ConcurrentSkipListMap<LocalGossipMember,GossipState> members;
private LocalGossipMember _me; private LocalGossipMember _me;
private GossipSettings _settings; private GossipSettings _settings;
private AtomicBoolean _gossipServiceRunning; private AtomicBoolean _gossipServiceRunning;
@ -35,10 +37,12 @@ public abstract class GossipManager extends Thread implements NotificationListen
private PassiveGossipThread passiveGossipThread; private PassiveGossipThread passiveGossipThread;
private Class<? extends ActiveGossipThread> _activeGossipThreadClass; private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
private ActiveGossipThread activeGossipThread; private ActiveGossipThread activeGossipThread;
private GossipListener listener;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass, public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port, Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) { String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers,
GossipListener listener) {
_passiveGossipThreadClass = passiveGossipThreadClass; _passiveGossipThreadClass = passiveGossipThreadClass;
_activeGossipThreadClass = activeGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass;
_settings = settings; _settings = settings;
@ -49,12 +53,13 @@ public abstract class GossipManager extends Thread implements NotificationListen
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), 0, this, startupMember.getPort(), startupMember.getId(), 0, this,
settings.getCleanupInterval()); settings.getCleanupInterval());
members.put(member, "UP"); members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member); GossipService.LOGGER.debug(member);
} }
} }
_gossipServiceRunning = new AtomicBoolean(true); _gossipServiceRunning = new AtomicBoolean(true);
this.listener = listener;
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() { public void run() {
GossipService.LOGGER.info("Service has been shutdown..."); GossipService.LOGGER.info("Service has been shutdown...");
@ -70,17 +75,27 @@ public abstract class GossipManager extends Thread implements NotificationListen
public void handleNotification(Notification notification, Object handback) { public void handleNotification(Notification notification, Object handback) {
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
GossipService.LOGGER.info("Dead member detected: " + deadMember); GossipService.LOGGER.info("Dead member detected: " + deadMember);
members.put(deadMember, "DOWN"); members.put(deadMember, GossipState.DOWN);
if (listener != null) {
listener.gossipEvent(deadMember, GossipState.DOWN);
}
} }
public void createOrRevivieMember(LocalGossipMember m){
members.put(m, GossipState.UP);
if (listener != null) {
listener.gossipEvent(m, GossipState.UP);
}
}
public GossipSettings getSettings() { public GossipSettings getSettings() {
return _settings; return _settings;
} }
public List<LocalGossipMember> getMemberList() { public List<LocalGossipMember> getMemberList() {
List<LocalGossipMember> up = new ArrayList<>(); List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, String> entry : members.entrySet()){ for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
if ("UP".equals(entry.getValue())){ if (GossipState.UP.equals(entry.getValue())){
up.add(entry.getKey()); up.add(entry.getKey());
} }
} }
@ -91,14 +106,10 @@ public abstract class GossipManager extends Thread implements NotificationListen
return _me; return _me;
} }
public void createOrRevivieMember(LocalGossipMember m){
members.put(m, "UP");
}
public List<LocalGossipMember> getDeadList() { public List<LocalGossipMember> getDeadList() {
List<LocalGossipMember> up = new ArrayList<>(); List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, String> entry : members.entrySet()){ for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
if ("DOWN".equals(entry.getValue())){ if (GossipState.DOWN.equals(entry.getValue())){
up.add(entry.getKey()); up.add(entry.getKey());
} }
} }

View File

@ -10,6 +10,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONException; import org.json.JSONException;
import org.json.JSONObject; import org.json.JSONObject;
@ -25,6 +26,8 @@ import com.google.code.gossip.RemoteGossipMember;
* determine the incoming message. * determine the incoming message.
*/ */
abstract public class PassiveGossipThread implements Runnable { abstract public class PassiveGossipThread implements Runnable {
public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
/** The socket used for the passive thread of the gossip service. */ /** The socket used for the passive thread of the gossip service. */
private DatagramSocket _server; private DatagramSocket _server;
@ -107,8 +110,6 @@ abstract public class PassiveGossipThread implements Runnable {
} }
} }
// Merge our list with the one we just received
mergeLists(_gossipManager, senderMember, remoteGossipMembers); mergeLists(_gossipManager, senderMember, remoteGossipMembers);
} catch (JSONException e) { } catch (JSONException e) {
GossipService.LOGGER GossipService.LOGGER
@ -122,7 +123,7 @@ abstract public class PassiveGossipThread implements Runnable {
} }
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); GossipService.LOGGER.error(e);
_keepRunning.set(false); _keepRunning.set(false);
} }
} }

View File

@ -1,6 +1,5 @@
package com.google.code.gossip.manager.random; package com.google.code.gossip.manager.random;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;

View File

@ -4,13 +4,14 @@ import java.util.ArrayList;
import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipSettings; import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.GossipManager;
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
public class RandomGossipManager extends GossipManager { public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String address, int port, String id, GossipSettings settings, public RandomGossipManager(String address, int port, String id, GossipSettings settings,
ArrayList<GossipMember> gossipMembers) { ArrayList<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address,
port, id, settings, gossipMembers); port, id, settings, gossipMembers, listener);
} }
} }

View File

@ -12,6 +12,8 @@ import com.google.code.gossip.GossipService;
import com.google.code.gossip.GossipSettings; import com.google.code.gossip.GossipSettings;
import com.google.code.gossip.LogLevel; import com.google.code.gossip.LogLevel;
import com.google.code.gossip.RemoteGossipMember; import com.google.code.gossip.RemoteGossipMember;
import com.google.code.gossip.event.GossipListener;
import com.google.code.gossip.event.GossipState;
public class TenNodeThreeSeedTest { public class TenNodeThreeSeedTest {
@ -31,19 +33,25 @@ public class TenNodeThreeSeedTest {
int seedNodes = 3; int seedNodes = 3;
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>(); ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
for (int i = 1; i < seedNodes+1; ++i) { for (int i = 1; i < seedNodes+1; ++i) {
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i+"")); startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + ""));
} }
ArrayList<GossipService> clients = new ArrayList<GossipService>(); ArrayList<GossipService> clients = new ArrayList<GossipService>();
int clusterMembers = 5; int clusterMembers = 5;
for (int i = 1; i < clusterMembers+1; ++i) { for (int i = 1; i < clusterMembers+1; ++i) {
GossipService gossipService = new GossipService("127.0.0."+i, 2000, i+"", LogLevel.DEBUG, startupMembers, settings); GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG,
startupMembers, settings,
new GossipListener(){
@Override
public void gossipEvent(GossipMember member, GossipState state) {
System.out.println(member+" "+ state);
}
});
clients.add(gossipService); clients.add(gossipService);
gossipService.start(); gossipService.start();
Thread.sleep(1000); Thread.sleep(1000);
} }
Thread.sleep(10000); Thread.sleep(10000);
for (int i = 0; i < clusterMembers; ++i) { for (int i = 0; i < clusterMembers; ++i) {
System.out.println(clients.get(i).get_gossipManager().getMemberList());
Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size()); Assert.assertEquals(4, clients.get(i).get_gossipManager().getMemberList().size());
} }
for (int i = 0; i < clusterMembers; ++i) { for (int i = 0; i < clusterMembers; ++i) {