Changed live and dead methods to return immutable lists. changed the underlying datastructure to be a map as opposed to two list

This commit is contained in:
Edward Capriolo
2015-02-15 18:57:06 -05:00
parent 067ea5813b
commit a1c241b780
8 changed files with 55 additions and 84 deletions

View File

@ -10,25 +10,15 @@ import org.json.JSONObject;
*
* @author joshclemm, harmenw
*/
public abstract class GossipMember {
/** The JSON key for the host property. */
public abstract class GossipMember implements Comparable<GossipMember>{
public static final String JSON_HOST = "host";
/** The JSON key for the port property. */
public static final String JSON_PORT = "port";
/** The JSON key for the heartbeat property. */
public static final String JSON_HEARTBEAT = "heartbeat";
public static final String JSON_ID = "id";
/** The hostname or IP address of this gossip member. */
protected String _host;
/** The port number of this gossip member. */
protected int _port;
/** The current heartbeat of this gossip member. */
protected int _heartbeat;
protected String _id;
/**
@ -146,4 +136,8 @@ public abstract class GossipMember {
throw new RuntimeException(e);
}
}
public int compareTo(GossipMember other){
return this.getAddress().compareTo(other.getAddress());
}
}

View File

@ -14,10 +14,7 @@ import javax.management.timer.Timer;
*/
public class GossipTimeoutTimer extends Timer {
/** The amount of time this timer waits before generating a wake-up event. */
private long _sleepTime;
/** The gossip member this timer is for. */
private LocalGossipMember _source;
/**

View File

@ -1,6 +1,7 @@
package com.google.code.gossip.manager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -45,7 +46,7 @@ abstract public class ActiveGossipThread implements Runnable {
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
abstract protected void sendMembershipList(LocalGossipMember me,
ArrayList<LocalGossipMember> memberList);
List<LocalGossipMember> memberList);
/**
* Abstract method which should be implemented by a subclass. This method should return a member
@ -55,5 +56,5 @@ abstract public class ActiveGossipThread implements Runnable {
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with.
*/
abstract protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList);
abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList);
}

View File

@ -2,6 +2,10 @@ package com.google.code.gossip.manager;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -20,34 +24,16 @@ import com.google.code.gossip.LocalGossipMember;
public abstract class GossipManager extends Thread implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
/** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */
public static final int MAX_PACKET_SIZE = 102400;
/** The list of members which are in the gossip group (not including myself). */
private ArrayList<LocalGossipMember> _memberList;
/** The list of members which are known to be dead. */
private ArrayList<LocalGossipMember> _deadList;
/** The member I am representing. */
private ConcurrentSkipListMap<LocalGossipMember,String> members;
private LocalGossipMember _me;
/** The settings for gossiping. */
private GossipSettings _settings;
/** A boolean whether the gossip service should keep running. */
private AtomicBoolean _gossipServiceRunning;
/** A ExecutorService used for executing the active and passive gossip threads. */
private ExecutorService _gossipThreadExecutor;
private Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
private PassiveGossipThread passiveGossipThread;
private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
private ActiveGossipThread activeGossipThread;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
@ -57,14 +43,13 @@ public abstract class GossipManager extends Thread implements NotificationListen
_activeGossipThreadClass = activeGossipThreadClass;
_settings = settings;
_me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval());
_memberList = new ArrayList<LocalGossipMember>();
_deadList = new ArrayList<LocalGossipMember>();
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(_me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
startupMember.getPort(), startupMember.getId(), 0, this,
settings.getCleanupInterval());
_memberList.add(member);
members.put(member, "UP");
GossipService.LOGGER.debug(member);
}
}
@ -85,33 +70,39 @@ public abstract class GossipManager extends Thread implements NotificationListen
public void handleNotification(Notification notification, Object handback) {
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
GossipService.LOGGER.info("Dead member detected: " + deadMember);
synchronized (this._memberList) {
this._memberList.remove(deadMember);
}
synchronized (this._deadList) {
this._deadList.add(deadMember);
}
members.put(deadMember, "DOWN");
}
public GossipSettings getSettings() {
return _settings;
}
/**
* Get a clone of the memberlist.
*
* @return
*/
public ArrayList<LocalGossipMember> getMemberList() {
return _memberList;
public List<LocalGossipMember> getMemberList() {
List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, String> entry : members.entrySet()){
if ("UP".equals(entry.getValue())){
up.add(entry.getKey());
}
}
return Collections.unmodifiableList(up);
}
public LocalGossipMember getMyself() {
return _me;
}
public void createOrRevivieMember(LocalGossipMember m){
members.put(m, "UP");
}
public ArrayList<LocalGossipMember> getDeadList() {
return _deadList;
public List<LocalGossipMember> getDeadList() {
List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, String> entry : members.entrySet()){
if ("DOWN".equals(entry.getValue())){
up.add(entry.getKey());
}
}
return Collections.unmodifiableList(up);
}
/**
@ -121,7 +112,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
* @throws InterruptedException
*/
public void run() {
for (LocalGossipMember member : _memberList) {
for (LocalGossipMember member : members.keySet()) {
if (member != _me) {
member.startTimeoutTimer();
}

View File

@ -7,6 +7,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.json.JSONArray;
@ -144,5 +145,5 @@ abstract public class PassiveGossipThread implements Runnable {
* The list of members known at the remote side.
*/
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
ArrayList<GossipMember> remoteList);
List<GossipMember> remoteList);
}

View File

@ -1,6 +1,6 @@
package com.google.code.gossip.manager.impl;
import java.util.ArrayList;
import java.util.List;
import com.google.code.gossip.GossipMember;
import com.google.code.gossip.GossipService;
@ -23,35 +23,21 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
* @param remoteList
*/
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
ArrayList<GossipMember> remoteList) {
synchronized (gossipManager.getDeadList()) {
synchronized (gossipManager.getMemberList()) {
List<GossipMember> remoteList) {
for (GossipMember remoteMember : remoteList) {
// Skip myself. We don't want ourselves in the local member list.
if (!remoteMember.equals(gossipManager.getMyself())) {
if (remoteMember.equals(gossipManager.getMyself())) {
continue;
}
if (gossipManager.getMemberList().contains(remoteMember)) {
GossipService.LOGGER.debug("The local list already contains the remote member ("
+ remoteMember + ").");
// The local memberlist contains the remote member.
LocalGossipMember localMember = gossipManager.getMemberList().get(
gossipManager.getMemberList().indexOf(remoteMember));
// Let's synchronize it's heartbeat.
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
// update local list with latest heartbeat
localMember.setHeartbeat(remoteMember.getHeartbeat());
// and reset the timeout of that member
localMember.resetTimeoutTimer();
}
// TODO: Otherwise, should we inform the other when the heartbeat is already higher?
} else {
// The local list does not contain the remote member.
GossipService.LOGGER.debug("The local list does not contain the remote member ("
+ remoteMember + ").");
// The remote member is either brand new, or a previously declared dead member.
// If its dead, check the heartbeat because it may have come back from the dead.
if (gossipManager.getDeadList().contains(remoteMember)) {
@ -80,13 +66,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
// The remote member is back from the dead.
// Remove it from the dead list.
gossipManager.getDeadList().remove(localDeadMember);
//gossipManager.getDeadList().remove(localDeadMember);
// Add it as a new member and add it to the member list.
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.getMemberList().add(newLocalMember);
//gossipManager.getMemberList().add(newLocalMember);
gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list.");
@ -96,7 +83,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(),
remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(),
gossipManager, gossipManager.getSettings().getCleanupInterval());
gossipManager.getMemberList().add(newLocalMember);
gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress()
+ " to local member list.");
@ -104,8 +91,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
}
}
}
}
}
}
}

View File

@ -5,7 +5,7 @@ import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.json.JSONArray;
@ -23,7 +23,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) {
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
me.setHeartbeat(me.getHeartbeat() + 1);
synchronized (memberList) {

View File

@ -1,6 +1,7 @@
package com.google.code.gossip.manager.random;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import com.google.code.gossip.GossipService;
@ -24,7 +25,7 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
*
* @return Member random member if list is greater than 1, null otherwise
*/
protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) {
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = _random.nextInt(memberList.size());