From 980e3f51e9ac0b1d00adb4f8d289dfb6da6a6dd9 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Fri, 9 Jan 2015 14:32:30 -0500 Subject: [PATCH] Added log4j logger and removed obvious comments --- .../com/google/code/gossip/GossipService.java | 35 +--- .../gossip/manager/ActiveGossipThread.java | 5 +- .../code/gossip/manager/GossipManager.java | 85 +++------- .../gossip/manager/PassiveGossipThread.java | 159 +++++++++--------- ...nlyProcessReceivedPassiveGossipThread.java | 12 +- .../impl/SendMembersActiveGossipThread.java | 16 +- .../random/RandomActiveGossipThread.java | 8 +- .../teknek/gossip/TenNodeThreeSeedTest.java | 3 +- 8 files changed, 118 insertions(+), 205 deletions(-) diff --git a/src/main/java/com/google/code/gossip/GossipService.java b/src/main/java/com/google/code/gossip/GossipService.java index 41f044e..2604b8e 100644 --- a/src/main/java/com/google/code/gossip/GossipService.java +++ b/src/main/java/com/google/code/gossip/GossipService.java @@ -1,11 +1,10 @@ package com.google.code.gossip; -import java.io.PrintStream; import java.net.InetAddress; import java.net.SocketException; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Date; +import org.apache.log4j.Logger; import com.google.code.gossip.manager.GossipManager; import com.google.code.gossip.manager.random.RandomGossipManager; @@ -16,10 +15,8 @@ import com.google.code.gossip.manager.random.RandomGossipManager; * @author joshclemm, harmenw */ public class GossipService { - - /** A instance variable holding the log level. */ - private int _logLevel = LogLevel.INFO; - + + public static final Logger LOGGER = Logger.getLogger(GossipService.class); private GossipManager _gossipManager; /** @@ -38,9 +35,6 @@ public class GossipService { * @throws UnknownHostException */ public GossipService(String ipAddress, int port, int logLevel, ArrayList gossipMembers, GossipSettings settings) throws InterruptedException, UnknownHostException { - // Set the logging level. - _logLevel = logLevel; - _gossipManager = new RandomGossipManager(ipAddress, port, settings, gossipMembers); } @@ -52,29 +46,6 @@ public class GossipService { _gossipManager.shutdown(); } - public static void error(Object message) { - //if (_logLevel >= LogLevel.ERROR) printMessage(message, System.err); - printMessage(message, System.err); - } - - public static void info(Object message) { - //if (_logLevel >= LogLevel.INFO) printMessage(message, System.out); - printMessage(message, System.out); - } - - public static void debug(Object message) { - //if (_logLevel >= LogLevel.DEBUG) printMessage(message, System.out); - printMessage(message, System.out); - } - - private static void printMessage(Object message, PrintStream out) { - /**String addressString = "unknown"; - if (_me != null) - addressString = _me.getAddress(); - out.println("[" + addressString + "][" + new Date().toString() + "] " + message);*/ - out.println("[" + new Date().toString() + "] " + message); - } - public GossipManager get_gossipManager() { return _gossipManager; } diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java index 9f3cdd0..c2168fa 100644 --- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java @@ -22,7 +22,6 @@ abstract public class ActiveGossipThread implements Runnable { public ActiveGossipThread(GossipManager gossipManager) { _gossipManager = gossipManager; - _keepRunning = new AtomicBoolean(true); } @@ -33,9 +32,7 @@ abstract public class ActiveGossipThread implements Runnable { TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval()); sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList()); } catch (InterruptedException e) { - // This membership thread was interrupted externally, shutdown - GossipService.debug("The ActiveGossipThread was interrupted externally, shutdown."); - e.printStackTrace(); + GossipService.LOGGER.error(e); _keepRunning.set(false); } } diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index 3324583..ebc185b 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -42,45 +42,28 @@ public abstract class GossipManager extends Thread implements NotificationListen private Class _activeGossipThreadClass; public GossipManager(Class passiveGossipThreadClass, Class activeGossipThreadClass, String address, int port, GossipSettings settings, ArrayList gossipMembers) { - // Set the active and passive gossip thread classes to use. _passiveGossipThreadClass = passiveGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass; - - // Assign the GossipSettings to the instance variable. _settings = settings; - - // Create the local gossip member which I am representing. _me = new LocalGossipMember(address, port, 0, this, settings.getCleanupInterval()); - - // Initialize the gossip members list. _memberList = new ArrayList(); - - // Initialize the dead gossip members list. - _deadList = new ArrayList(); - - // Print the startup member list when the service is in debug mode. - GossipService.debug("Startup member list:"); - GossipService.debug("---------------------"); - // First print out myself. - GossipService.debug(_me); - // Copy the list with members given to the local member list and print the member when in debug mode. + _deadList = new ArrayList(); + GossipService.LOGGER.debug("Startup member list:"); + GossipService.LOGGER.debug("---------------------"); + GossipService.LOGGER.debug(_me); + for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(_me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), 0, this, settings.getCleanupInterval()); _memberList.add(member); - GossipService.debug(member); - } else { - GossipService.info("Found myself in the members section of the configuration, you should not add the host itself to the members section."); - } + GossipService.LOGGER.debug(member); + } } - // Set the boolean for running the gossip service to true. _gossipServiceRunning = new AtomicBoolean(true); - - // Add a shutdown hook so we can see when the service has been shutdown. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { - GossipService.info("Service has been shutdown..."); + GossipService.LOGGER.info("Service has been shutdown..."); } })); } @@ -92,18 +75,11 @@ public abstract class GossipManager extends Thread implements NotificationListen */ @Override public void handleNotification(Notification notification, Object handback) { - - // Get the local gossip member associated with the notification. LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); - - GossipService.info("Dead member detected: " + deadMember); - - // Remove the member from the active member list. + GossipService.LOGGER.info("Dead member detected: " + deadMember); synchronized (this._memberList) { this._memberList.remove(deadMember); } - - // Add the member to the dead member list. synchronized (this._deadList) { this._deadList.add(deadMember); } @@ -135,46 +111,27 @@ public abstract class GossipManager extends Thread implements NotificationListen * @throws InterruptedException */ public void run() { - // Start all timers except for me for (LocalGossipMember member : _memberList) { if (member != _me) { member.startTimeoutTimer(); } } - - try { - _gossipThreadExecutor = Executors.newCachedThreadPool(); - // The receiver thread is a passive player that handles - // merging incoming membership lists from other neighbors. - _gossipThreadExecutor.execute(_passiveGossipThreadClass.getConstructor(GossipManager.class).newInstance(this)); - // The gossiper thread is an active player that - // selects a neighbor to share its membership list - _gossipThreadExecutor.execute(_activeGossipThreadClass.getConstructor(GossipManager.class).newInstance(this)); - } catch (IllegalArgumentException e1) { - e1.printStackTrace(); - } catch (SecurityException e1) { - e1.printStackTrace(); - } catch (InstantiationException e1) { - e1.printStackTrace(); - } catch (IllegalAccessException e1) { - e1.printStackTrace(); - } catch (InvocationTargetException e1) { - e1.printStackTrace(); - } catch (NoSuchMethodException e1) { - e1.printStackTrace(); - } - - // Potentially, you could kick off more threads here - // that could perform additional data synching - - GossipService.info("The GossipService is started."); - - // keep the main thread around + _gossipThreadExecutor = Executors.newCachedThreadPool(); + try { + _gossipThreadExecutor.execute(_passiveGossipThreadClass.getConstructor(GossipManager.class) + .newInstance(this)); + _gossipThreadExecutor.execute(_activeGossipThreadClass.getConstructor(GossipManager.class) + .newInstance(this)); + } catch (InstantiationException | IllegalAccessException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException | SecurityException e1) { + throw new RuntimeException(e1); + } + GossipService.LOGGER.info("The GossipService is started."); while(_gossipServiceRunning.get()) { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { - GossipService.info("The GossipClient was interrupted."); + GossipService.LOGGER.info("The GossipClient was interrupted."); } } } diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index 9b15a19..3f51008 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -39,97 +39,92 @@ abstract public class PassiveGossipThread implements Runnable { try { SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), _gossipManager.getMyself().getPort()); _server = new DatagramSocket(socketAddress); - GossipService.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort()); - GossipService.debug("I am " + _gossipManager.getMyself()); + GossipService.LOGGER.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort()); + GossipService.LOGGER.debug("I am " + _gossipManager.getMyself()); } catch (SocketException ex) { - System.err.println(ex); + GossipService.LOGGER.error(ex); _server = null; throw new RuntimeException(ex); } _keepRunning = new AtomicBoolean(true); } - @Override - public void run() { - while(_keepRunning.get()) { - try { - // Create a byte array with the size of the buffer. - byte[] buf = new byte[_server.getReceiveBufferSize()]; - DatagramPacket p = new DatagramPacket(buf, buf.length); - _server.receive(p); - GossipService.debug("A message has been received from " + p.getAddress() + ":" + p.getPort() + "."); - - int packet_length = 0; - for (int i = 0; i < 4; i++) { - int shift = (4 - 1 - i) * 8; - packet_length += (buf[i] & 0x000000FF) << shift; - } - - // Check whether the package is smaller than the maximal packet length. - // A package larger than this would not be possible to be send from a GossipService, - // since this is check before sending the message. - // This could normally only occur when the list of members is very big, - // or when the packet is misformed, and the first 4 bytes is not the right in anymore. - // For this reason we regards the message. - if (packet_length <= GossipManager.MAX_PACKET_SIZE) { - - byte[] json_bytes = new byte[packet_length]; - for (int i=0; i remoteGossipMembers = new ArrayList(); - - RemoteGossipMember senderMember = null; - - GossipService.debug("Received member list:"); - // Convert the received JSON message to a JSON array. - JSONArray jsonArray = new JSONArray(receivedMessage); - // The JSON array should contain all members. - // Let's iterate over them. - for (int i = 0; i < jsonArray.length(); i++) { - JSONObject memberJSONObject = jsonArray.getJSONObject(i); - // Now the array should contain 3 objects (hostname, port and heartbeat). - if (memberJSONObject.length() == 3) { - // Ok, now let's create the member object. - RemoteGossipMember member = new RemoteGossipMember(memberJSONObject.getString(GossipMember.JSON_HOST), memberJSONObject.getInt(GossipMember.JSON_PORT), memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT)); - GossipService.debug(member.toString()); - - // This is the first member found, so this should be the member who is communicating with me. - if (i == 0) { - senderMember = member; - } - - remoteGossipMembers.add(member); - } else { - GossipService.error("The received member object does not contain 3 objects:\n" + memberJSONObject.toString()); - } - - } - - // Merge our list with the one we just received - mergeLists(_gossipManager, senderMember, remoteGossipMembers); - - } catch (JSONException e) { - GossipService.error("The received message is not well-formed JSON. The following message has been dropped:\n" + receivedMessage); - } - - } else { - GossipService.error("The received message is not of the expected size, it has been dropped."); - } + int packet_length = 0; + for (int i = 0; i < 4; i++) { + int shift = (4 - 1 - i) * 8; + packet_length += (buf[i] & 0x000000FF) << shift; + } - } catch (IOException e) { - e.printStackTrace(); - _keepRunning.set(false); - } - } - } + // Check whether the package is smaller than the maximal packet length. + // A package larger than this would not be possible to be send from a GossipService, + // since this is check before sending the message. + // This could normally only occur when the list of members is very big, + // or when the packet is misformed, and the first 4 bytes is not the right in anymore. + // For this reason we regards the message. + if (packet_length <= GossipManager.MAX_PACKET_SIZE) { + byte[] json_bytes = new byte[packet_length]; + for (int i = 0; i < packet_length; i++) { + json_bytes[i] = buf[i + 4]; + } + String receivedMessage = new String(json_bytes); + GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): " + + receivedMessage); + try { + ArrayList remoteGossipMembers = new ArrayList(); + RemoteGossipMember senderMember = null; + GossipService.LOGGER.debug("Received member list:"); + JSONArray jsonArray = new JSONArray(receivedMessage); + for (int i = 0; i < jsonArray.length(); i++) { + JSONObject memberJSONObject = jsonArray.getJSONObject(i); + if (memberJSONObject.length() == 3) { + RemoteGossipMember member = new RemoteGossipMember( + memberJSONObject.getString(GossipMember.JSON_HOST), + memberJSONObject.getInt(GossipMember.JSON_PORT), + memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT)); + GossipService.LOGGER.debug(member.toString()); + // This is the first member found, so this should be the member who is communicating + // with me. + if (i == 0) { + senderMember = member; + } + remoteGossipMembers.add(member); + } else { + GossipService.LOGGER + .error("The received member object does not contain 3 objects:\n" + + memberJSONObject.toString()); + } + + } + + // Merge our list with the one we just received + mergeLists(_gossipManager, senderMember, remoteGossipMembers); + } catch (JSONException e) { + GossipService.LOGGER + .error("The received message is not well-formed JSON. The following message has been dropped:\n" + + receivedMessage); + } + + } else { + GossipService.LOGGER + .error("The received message is not of the expected size, it has been dropped."); + } + + } catch (IOException e) { + e.printStackTrace(); + _keepRunning.set(false); + } + } + } /** * Abstract method for merging the local and remote list. diff --git a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index 271e119..dffe9f8 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -33,7 +33,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread // Skip myself. We don't want ourselves in the local member list. if (!remoteMember.equals(gossipManager.getMyself())) { if (gossipManager.getMemberList().contains(remoteMember)) { - GossipService.debug("The local list already contains the remote member (" + remoteMember + ")."); + GossipService.LOGGER.debug("The local list already contains the remote member (" + remoteMember + ")."); // The local memberlist contains the remote member. LocalGossipMember localMember = gossipManager.getMemberList().get(gossipManager.getMemberList().indexOf(remoteMember)); @@ -47,13 +47,13 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread // TODO: Otherwise, should we inform the other when the heartbeat is already higher? } else { // The local list does not contain the remote member. - GossipService.debug("The local list does not contain the remote member (" + remoteMember + ")."); + GossipService.LOGGER.debug("The local list does not contain the remote member (" + remoteMember + ")."); // The remote member is either brand new, or a previously declared dead member. // If its dead, check the heartbeat because it may have come back from the dead. if (gossipManager.getDeadList().contains(remoteMember)) { // The remote member is known here as a dead member. - GossipService.debug("The remote member is known here as a dead member."); + GossipService.LOGGER.debug("The remote member is known here as a dead member."); LocalGossipMember localDeadMember = gossipManager.getDeadList().get(gossipManager.getDeadList().indexOf(remoteMember)); // If a member is restarted the heartbeat will restart from 1, so we should check that here. // So a member can become from the dead when it is either larger than a previous heartbeat (due to network failure) @@ -66,7 +66,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread if (remoteMember.getHeartbeat() == 1 || ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager.getSettings().getCleanupInterval() / 1000) || remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { - GossipService.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); + GossipService.LOGGER.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member."); // The remote member is back from the dead. // Remove it from the dead list. gossipManager.getDeadList().remove(localDeadMember); @@ -74,14 +74,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); gossipManager.getMemberList().add(newLocalMember); newLocalMember.startTimeoutTimer(); - GossipService.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); + GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); } } else { // Brand spanking new member - welcome. LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval()); gossipManager.getMemberList().add(newLocalMember); newLocalMember.startTimeoutTimer(); - GossipService.info("Added new remote member " + remoteMember.getAddress() + " to local member list."); + GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list."); } } } diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index 59c8366..4ccfde8 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -25,7 +25,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { * incremented our own heartbeat. */ protected void sendMembershipList(LocalGossipMember me, ArrayList memberList) { - GossipService.debug("Send sendMembershipList() is called."); + GossipService.LOGGER.debug("Send sendMembershipList() is called."); // Increase the heartbeat of myself by 1. me.setHeartbeat(me.getHeartbeat() + 1); @@ -39,21 +39,21 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { // Create a StringBuffer for the JSON message. JSONArray jsonArray = new JSONArray(); - GossipService.debug("Sending memberlist to " + dest + ":" + member.getPort()); - GossipService.debug("---------------------"); + GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort()); + GossipService.LOGGER.debug("---------------------"); // First write myself, append the JSON representation of the member to the buffer. jsonArray.put(me.toJSONObject()); - GossipService.debug(me); + GossipService.LOGGER.debug(me); // Then write the others. for (int i=0; i> 24 ); - GossipService.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString()); + GossipService.LOGGER.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString()); ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length); byteBuffer.put(length_bytes); @@ -82,7 +82,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { socket.send(datagramPacket); socket.close(); } else { - GossipService.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + GossipService.LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); } } diff --git a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java index 6986156..c4dcb2e 100644 --- a/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/random/RandomActiveGossipThread.java @@ -15,8 +15,6 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread { public RandomActiveGossipThread(GossipManager gossipManager) { super(gossipManager); - - // Initialize the random used for deciding on which gossip member to gossip with. _random = new Random(); } @@ -28,16 +26,12 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread { */ protected LocalGossipMember selectPartner(ArrayList memberList) { LocalGossipMember member = null; - - // We can only send a message if there are actually other members. if (memberList.size() > 0) { - // Get the index of the random member. int randomNeighborIndex = _random.nextInt(memberList.size()); member = memberList.get(randomNeighborIndex); } else { - GossipService.debug("I am alone in this world."); + GossipService.LOGGER.debug("I am alone in this world."); } - return member; } diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java index f826d42..47e7aa3 100644 --- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java @@ -19,13 +19,11 @@ public class TenNodeThreeSeedTest { @Test public void test() throws UnknownHostException, InterruptedException{ GossipSettings settings = new GossipSettings(); - int seedNodes = 3; ArrayList startupMembers = new ArrayList(); for (int i = 1; i < seedNodes+1; ++i) { startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000)); } - ArrayList clients = new ArrayList(); int clusterMembers = 10; for (int i = 1; i < clusterMembers+1; ++i) { @@ -36,6 +34,7 @@ public class TenNodeThreeSeedTest { } Thread.sleep(10000); for (int i = 0; i < clusterMembers; ++i) { + System.out.println(clients.get(i).get_gossipManager().getMemberList()); Assert.assertEquals(9, clients.get(i).get_gossipManager().getMemberList().size()); } for (int i = 0; i < clusterMembers; ++i) {