From a1c241b780e51fc78b41864c85bcfed98c186869 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Sun, 15 Feb 2015 18:57:06 -0500 Subject: [PATCH] Changed live and dead methods to return immutable lists. changed the underlying datastructure to be a map as opposed to two list --- .../com/google/code/gossip/GossipMember.java | 18 ++--- .../code/gossip/GossipTimeoutTimer.java | 3 - .../gossip/manager/ActiveGossipThread.java | 5 +- .../code/gossip/manager/GossipManager.java | 67 ++++++++----------- .../gossip/manager/PassiveGossipThread.java | 3 +- ...nlyProcessReceivedPassiveGossipThread.java | 36 +++------- .../impl/SendMembersActiveGossipThread.java | 4 +- .../random/RandomActiveGossipThread.java | 3 +- 8 files changed, 55 insertions(+), 84 deletions(-) diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java index e24964e..50e9af3 100644 --- a/src/main/java/com/google/code/gossip/GossipMember.java +++ b/src/main/java/com/google/code/gossip/GossipMember.java @@ -10,25 +10,15 @@ import org.json.JSONObject; * * @author joshclemm, harmenw */ -public abstract class GossipMember { - /** The JSON key for the host property. */ +public abstract class GossipMember implements Comparable{ + public static final String JSON_HOST = "host"; - /** The JSON key for the port property. */ public static final String JSON_PORT = "port"; - /** The JSON key for the heartbeat property. */ public static final String JSON_HEARTBEAT = "heartbeat"; - public static final String JSON_ID = "id"; - - /** The hostname or IP address of this gossip member. */ protected String _host; - - /** The port number of this gossip member. */ protected int _port; - - /** The current heartbeat of this gossip member. */ protected int _heartbeat; - protected String _id; /** @@ -146,4 +136,8 @@ public abstract class GossipMember { throw new RuntimeException(e); } } + + public int compareTo(GossipMember other){ + return this.getAddress().compareTo(other.getAddress()); + } } diff --git a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java index 94cc3a4..c3613fb 100644 --- a/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java +++ b/src/main/java/com/google/code/gossip/GossipTimeoutTimer.java @@ -14,10 +14,7 @@ import javax.management.timer.Timer; */ public class GossipTimeoutTimer extends Timer { - /** The amount of time this timer waits before generating a wake-up event. */ private long _sleepTime; - - /** The gossip member this timer is for. */ private LocalGossipMember _source; /** diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java index e5ab754..4cd096d 100644 --- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java @@ -1,6 +1,7 @@ package com.google.code.gossip.manager; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,7 +46,7 @@ abstract public class ActiveGossipThread implements Runnable { * Performs the sending of the membership list, after we have incremented our own heartbeat. */ abstract protected void sendMembershipList(LocalGossipMember me, - ArrayList memberList); + List memberList); /** * Abstract method which should be implemented by a subclass. This method should return a member @@ -55,5 +56,5 @@ abstract public class ActiveGossipThread implements Runnable { * The list of members which are stored in the local list of members. * @return The chosen LocalGossipMember to gossip with. */ - abstract protected LocalGossipMember selectPartner(ArrayList memberList); + abstract protected LocalGossipMember selectPartner(List memberList); } diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index 1b76e82..03cc92c 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -2,6 +2,10 @@ package com.google.code.gossip.manager; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -20,34 +24,16 @@ import com.google.code.gossip.LocalGossipMember; public abstract class GossipManager extends Thread implements NotificationListener { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); - - /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */ public static final int MAX_PACKET_SIZE = 102400; - /** The list of members which are in the gossip group (not including myself). */ - private ArrayList _memberList; - - /** The list of members which are known to be dead. */ - private ArrayList _deadList; - - /** The member I am representing. */ + private ConcurrentSkipListMap members; private LocalGossipMember _me; - - /** The settings for gossiping. */ private GossipSettings _settings; - - /** A boolean whether the gossip service should keep running. */ private AtomicBoolean _gossipServiceRunning; - - /** A ExecutorService used for executing the active and passive gossip threads. */ private ExecutorService _gossipThreadExecutor; - private Class _passiveGossipThreadClass; - private PassiveGossipThread passiveGossipThread; - private Class _activeGossipThreadClass; - private ActiveGossipThread activeGossipThread; public GossipManager(Class passiveGossipThreadClass, @@ -57,14 +43,13 @@ public abstract class GossipManager extends Thread implements NotificationListen _activeGossipThreadClass = activeGossipThreadClass; _settings = settings; _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); - _memberList = new ArrayList(); - _deadList = new ArrayList(); + members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(_me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), startupMember.getId(), 0, this, settings.getCleanupInterval()); - _memberList.add(member); + members.put(member, "UP"); GossipService.LOGGER.debug(member); } } @@ -85,33 +70,39 @@ public abstract class GossipManager extends Thread implements NotificationListen public void handleNotification(Notification notification, Object handback) { LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); GossipService.LOGGER.info("Dead member detected: " + deadMember); - synchronized (this._memberList) { - this._memberList.remove(deadMember); - } - synchronized (this._deadList) { - this._deadList.add(deadMember); - } + members.put(deadMember, "DOWN"); } public GossipSettings getSettings() { return _settings; } - /** - * Get a clone of the memberlist. - * - * @return - */ - public ArrayList getMemberList() { - return _memberList; + public List getMemberList() { + List up = new ArrayList<>(); + for (Entry entry : members.entrySet()){ + if ("UP".equals(entry.getValue())){ + up.add(entry.getKey()); + } + } + return Collections.unmodifiableList(up); } public LocalGossipMember getMyself() { return _me; } + + public void createOrRevivieMember(LocalGossipMember m){ + members.put(m, "UP"); + } - public ArrayList getDeadList() { - return _deadList; + public List getDeadList() { + List up = new ArrayList<>(); + for (Entry entry : members.entrySet()){ + if ("DOWN".equals(entry.getValue())){ + up.add(entry.getKey()); + } + } + return Collections.unmodifiableList(up); } /** @@ -121,7 +112,7 @@ public abstract class GossipManager extends Thread implements NotificationListen * @throws InterruptedException */ public void run() { - for (LocalGossipMember member : _memberList) { + for (LocalGossipMember member : members.keySet()) { if (member != _me) { member.startTimeoutTimer(); } diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index 08342b4..42c202c 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -7,6 +7,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.json.JSONArray; @@ -144,5 +145,5 @@ abstract public class PassiveGossipThread implements Runnable { * The list of members known at the remote side. */ abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - ArrayList remoteList); + List remoteList); } \ No newline at end of file diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index 206d5c5..f0afaf9 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -1,6 +1,6 @@ package com.google.code.gossip.manager.impl; -import java.util.ArrayList; +import java.util.List; import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; @@ -23,35 +23,21 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread * @param remoteList */ protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - ArrayList remoteList) { - - synchronized (gossipManager.getDeadList()) { - - synchronized (gossipManager.getMemberList()) { + List remoteList) { for (GossipMember remoteMember : remoteList) { // Skip myself. We don't want ourselves in the local member list. - if (!remoteMember.equals(gossipManager.getMyself())) { + if (remoteMember.equals(gossipManager.getMyself())) { + continue; + } if (gossipManager.getMemberList().contains(remoteMember)) { - GossipService.LOGGER.debug("The local list already contains the remote member (" - + remoteMember + ")."); - // The local memberlist contains the remote member. LocalGossipMember localMember = gossipManager.getMemberList().get( gossipManager.getMemberList().indexOf(remoteMember)); - - // Let's synchronize it's heartbeat. if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { - // update local list with latest heartbeat localMember.setHeartbeat(remoteMember.getHeartbeat()); - // and reset the timeout of that member localMember.resetTimeoutTimer(); } - // TODO: Otherwise, should we inform the other when the heartbeat is already higher? } else { - // The local list does not contain the remote member. - GossipService.LOGGER.debug("The local list does not contain the remote member (" - + remoteMember + ")."); - // The remote member is either brand new, or a previously declared dead member. // If its dead, check the heartbeat because it may have come back from the dead. if (gossipManager.getDeadList().contains(remoteMember)) { @@ -80,13 +66,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread .debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); // The remote member is back from the dead. // Remove it from the dead list. - gossipManager.getDeadList().remove(localDeadMember); + //gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the member list. LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() .getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); + //gossipManager.getMemberList().add(newLocalMember); + gossipManager.createOrRevivieMember(newLocalMember); newLocalMember.startTimeoutTimer(); GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); @@ -96,7 +83,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); - gossipManager.getMemberList().add(newLocalMember); + gossipManager.createOrRevivieMember(newLocalMember); newLocalMember.startTimeoutTimer(); GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list."); @@ -104,8 +91,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread } } } - } - } - } + + } diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index 5235db2..85e4b8a 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -5,7 +5,7 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.List; import org.json.JSONArray; @@ -23,7 +23,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, ArrayList memberList) { + protected void sendMembershipList(LocalGossipMember me, List memberList) { GossipService.LOGGER.debug("Send sendMembershipList() is called."); me.setHeartbeat(me.getHeartbeat() + 1); synchronized (memberList) { diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java index 917c362..1baeb3a 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java @@ -1,6 +1,7 @@ package com.google.code.gossip.manager.random; import java.util.ArrayList; +import java.util.List; import java.util.Random; import com.google.code.gossip.GossipService; @@ -24,7 +25,7 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread { * * @return Member random member if list is greater than 1, null otherwise */ - protected LocalGossipMember selectPartner(ArrayList memberList) { + protected LocalGossipMember selectPartner(List memberList) { LocalGossipMember member = null; if (memberList.size() > 0) { int randomNeighborIndex = _random.nextInt(memberList.size());