From 4ca9b82ae442ff094e6a0160b6a6ea2aa63a3770 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Mon, 16 May 2016 18:54:17 -0400 Subject: [PATCH 1/5] use jackson --- .../gossip/manager/PassiveGossipThread.java | 57 ++++++++--------- .../impl/SendMembersActiveGossipThread.java | 25 ++++++-- .../gossip/model/ActiveGossipMessage.java | 22 +++++++ .../code/gossip/model/GossipMember.java | 63 +++++++++++++++++++ 4 files changed, 129 insertions(+), 38 deletions(-) create mode 100644 src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java create mode 100644 src/main/java/com/google/code/gossip/model/GossipMember.java diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index b05a780..5abb39f 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -35,6 +36,7 @@ import org.json.JSONObject; import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; import com.google.code.gossip.RemoteGossipMember; +import com.google.code.gossip.model.ActiveGossipMessage; /** * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle, @@ -54,6 +56,8 @@ abstract public class PassiveGossipThread implements Runnable { private AtomicBoolean keepRunning; private final String cluster; + + private ObjectMapper MAPPER = new ObjectMapper(); public PassiveGossipThread(GossipManager gossipManager) { this.gossipManager = gossipManager; @@ -93,46 +97,35 @@ abstract public class PassiveGossipThread implements Runnable { json_bytes[i] = buf[i + 4]; } String receivedMessage = new String(json_bytes); - GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): " + GossipService.LOGGER.warn("Received message (" + packet_length + " bytes): " + receivedMessage); try { List remoteGossipMembers = new ArrayList<>(); RemoteGossipMember senderMember = null; - JSONArray jsonArray = new JSONArray(receivedMessage); - for (int i = 0; i < jsonArray.length(); i++) { - JSONObject memberJSONObject = jsonArray.getJSONObject(i); - if (memberJSONObject.length() == 5 - && cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) { - RemoteGossipMember member = new RemoteGossipMember( - memberJSONObject.getString(GossipMember.JSON_CLUSTER), - memberJSONObject.getString(GossipMember.JSON_HOST), - memberJSONObject.getInt(GossipMember.JSON_PORT), - memberJSONObject.getString(GossipMember.JSON_ID), - memberJSONObject.getLong(GossipMember.JSON_HEARTBEAT)); - GossipService.LOGGER.debug(member.toString()); - // This is the first member found, so this should be the member who is communicating - // with me. - if (i == 0) { - senderMember = member; - } - remoteGossipMembers.add(member); - } else if (memberJSONObject.length() == 5) { - GossipService.LOGGER.warn("The member object does not belong to this cluster."); - } else { - GossipService.LOGGER - .error("The received member object does not contain 5 objects:\n" - + memberJSONObject.toString()); + ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes, + ActiveGossipMessage.class); + for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { + RemoteGossipMember member = new RemoteGossipMember( + activeGossipMessage.getMembers().get(i).getCluster(), + activeGossipMessage.getMembers().get(i).getHost(), + activeGossipMessage.getMembers().get(i).getPort(), + activeGossipMessage.getMembers().get(i).getId(), + activeGossipMessage.getMembers().get(i).getHeartbeat()); + if (!(member.getClusterName().equals(cluster))){ + GossipService.LOGGER.warn("Note a member of this cluster " + i); + continue; } - + // This is the first member found, so this should be the member who is communicating + // with me. + if (i == 0) { + senderMember = member; + } + remoteGossipMembers.add(member); } mergeLists(gossipManager, senderMember, remoteGossipMembers); - } catch (JSONException e) { - GossipService.LOGGER - .error("The received message is not well-formed JSON. The following message has been dropped:\n" - + receivedMessage); - System.out.println(e); + } catch (RuntimeException ex) { + GossipService.LOGGER.error("Unable to process message", ex); } - } else { GossipService.LOGGER .error("The received message is not of the expected size, it has been dropped."); diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index 4e5f855..2259781 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -24,19 +24,33 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.List; -import org.json.JSONArray; +import org.codehaus.jackson.map.ObjectMapper; import com.google.code.gossip.GossipService; import com.google.code.gossip.LocalGossipMember; import com.google.code.gossip.manager.ActiveGossipThread; import com.google.code.gossip.manager.GossipManager; +import com.google.code.gossip.model.ActiveGossipMessage; +import com.google.code.gossip.model.GossipMember; abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { + protected ObjectMapper om = new ObjectMapper(); + public SendMembersActiveGossipThread(GossipManager gossipManager) { super(gossipManager); } + private GossipMember convert(LocalGossipMember member){ + GossipMember gm = new GossipMember(); + gm.setCluster(member.getClusterName()); + gm.setHeartbeat(member.getHeartbeat()); + gm.setHost(member.getHost()); + gm.setId(member.getId()); + gm.setPort(member.getPort()); + return gm; + } + /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ @@ -50,13 +64,12 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); InetAddress dest = InetAddress.getByName(member.getHost()); - JSONArray jsonArray = new JSONArray(); - jsonArray.put(me.toJSONObject()); + ActiveGossipMessage message = new ActiveGossipMessage(); + message.getMembers().add(convert(me)); for (LocalGossipMember other : memberList) { - jsonArray.put(other.toJSONObject()); - GossipService.LOGGER.debug(other); + message.getMembers().add(convert(other)); } - byte[] json_bytes = jsonArray.toString().getBytes(); + byte[] json_bytes = om.writeValueAsString(message).getBytes(); int packet_length = json_bytes.length; if (packet_length < GossipManager.MAX_PACKET_SIZE) { byte[] buf = createBuffer(packet_length, json_bytes); diff --git a/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java b/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java new file mode 100644 index 0000000..d3516f5 --- /dev/null +++ b/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java @@ -0,0 +1,22 @@ +package com.google.code.gossip.model; + +import java.util.ArrayList; +import java.util.List; + +public class ActiveGossipMessage { + + private List members = new ArrayList<>(); + + public ActiveGossipMessage(){ + + } + + public List getMembers() { + return members; + } + + public void setMembers(List members) { + this.members = members; + } + +} diff --git a/src/main/java/com/google/code/gossip/model/GossipMember.java b/src/main/java/com/google/code/gossip/model/GossipMember.java new file mode 100644 index 0000000..6c073b4 --- /dev/null +++ b/src/main/java/com/google/code/gossip/model/GossipMember.java @@ -0,0 +1,63 @@ +package com.google.code.gossip.model; + +public class GossipMember { + + private String cluster; + private String host; + private Integer port; + private String id; + private Long heartbeat; + + public GossipMember(){ + + } + + public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){ + this.cluster=cluster; + this.host= host; + this.port = port; + this.id = id; + + } + + public String getCluster() { + return cluster; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Long getHeartbeat() { + return heartbeat; + } + + public void setHeartbeat(Long heartbeat) { + this.heartbeat = heartbeat; + } + +} From f252d44f5277b3e6521e604ec3dec1d568eb2f0f Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Mon, 16 May 2016 19:03:34 -0400 Subject: [PATCH 2/5] Too much logging --- .../com/google/code/gossip/manager/PassiveGossipThread.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index 5abb39f..ec11cfe 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -96,9 +96,11 @@ abstract public class PassiveGossipThread implements Runnable { for (int i = 0; i < packet_length; i++) { json_bytes[i] = buf[i + 4]; } - String receivedMessage = new String(json_bytes); - GossipService.LOGGER.warn("Received message (" + packet_length + " bytes): " + if (GossipService.LOGGER.isDebugEnabled()){ + String receivedMessage = new String(json_bytes); + GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): " + receivedMessage); + } try { List remoteGossipMembers = new ArrayList<>(); RemoteGossipMember senderMember = null; From 95fecb83553158033f4e76fd5659dac1a5585567 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Mon, 16 May 2016 19:10:53 -0400 Subject: [PATCH 3/5] Final final --- .../code/gossip/manager/PassiveGossipThread.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index ec11cfe..6bf1494 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -29,10 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; - import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; import com.google.code.gossip.RemoteGossipMember; @@ -49,15 +45,15 @@ abstract public class PassiveGossipThread implements Runnable { public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); /** The socket used for the passive thread of the gossip service. */ - private DatagramSocket server; + private final DatagramSocket server; private final GossipManager gossipManager; - private AtomicBoolean keepRunning; + private final AtomicBoolean keepRunning; private final String cluster; - private ObjectMapper MAPPER = new ObjectMapper(); + private final ObjectMapper MAPPER = new ObjectMapper(); public PassiveGossipThread(GossipManager gossipManager) { this.gossipManager = gossipManager; From bee4936f438402dd5c4216c824b137d2e055a2fd Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Mon, 16 May 2016 19:37:17 -0400 Subject: [PATCH 4/5] Remove dead code --- .../com/google/code/gossip/GossipMember.java | 30 +------------------ 1 file changed, 1 insertion(+), 29 deletions(-) diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java index 56a5f44..314b5b7 100644 --- a/src/main/java/com/google/code/gossip/GossipMember.java +++ b/src/main/java/com/google/code/gossip/GossipMember.java @@ -29,16 +29,7 @@ import org.json.JSONObject; */ public abstract class GossipMember implements Comparable { - public static final String JSON_HOST = "host"; - - public static final String JSON_PORT = "port"; - - public static final String JSON_HEARTBEAT = "heartbeat"; - - public static final String JSON_ID = "id"; - - public static final String JSON_CLUSTER = "cluster"; - + protected final String host; protected final int port; @@ -174,25 +165,6 @@ public abstract class GossipMember implements Comparable { && getClusterName().equals(((LocalGossipMember) obj).getClusterName()); } - /** - * Get the JSONObject which is the JSON representation of this GossipMember. - * - * @return The JSONObject of this GossipMember. - */ - public JSONObject toJSONObject() { - try { - JSONObject jsonObject = new JSONObject(); - jsonObject.put(JSON_CLUSTER, clusterName); - jsonObject.put(JSON_HOST, host); - jsonObject.put(JSON_PORT, port); - jsonObject.put(JSON_ID, id); - jsonObject.put(JSON_HEARTBEAT, heartbeat); - return jsonObject; - } catch (JSONException e) { - throw new RuntimeException(e); - } - } - public int compareTo(GossipMember other) { return this.getAddress().compareTo(other.getAddress()); } From 441c68c712ce53627c92168f1641e4ff6034043f Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Mon, 16 May 2016 21:33:45 -0400 Subject: [PATCH 5/5] clean up warnings --- src/main/java/com/google/code/gossip/GossipMember.java | 3 --- src/main/java/com/google/code/gossip/event/GossipState.java | 1 + src/test/java/io/teknek/gossip/StartupSettingsTest.java | 3 --- src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java | 1 - 4 files changed, 1 insertion(+), 7 deletions(-) diff --git a/src/main/java/com/google/code/gossip/GossipMember.java b/src/main/java/com/google/code/gossip/GossipMember.java index 314b5b7..56029fa 100644 --- a/src/main/java/com/google/code/gossip/GossipMember.java +++ b/src/main/java/com/google/code/gossip/GossipMember.java @@ -19,9 +19,6 @@ package com.google.code.gossip; import java.net.InetSocketAddress; -import org.json.JSONException; -import org.json.JSONObject; - /** * A abstract class representing a gossip member. * diff --git a/src/main/java/com/google/code/gossip/event/GossipState.java b/src/main/java/com/google/code/gossip/event/GossipState.java index c0bc565..e303c89 100644 --- a/src/main/java/com/google/code/gossip/event/GossipState.java +++ b/src/main/java/com/google/code/gossip/event/GossipState.java @@ -19,6 +19,7 @@ package com.google.code.gossip.event; public enum GossipState { UP("up"), DOWN("down"); + @SuppressWarnings("unused") private final String state; private GossipState(String state) { diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java index 5f25dd9..bf6710e 100644 --- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java +++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java @@ -35,9 +35,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - /** * Tests support of using {@code StartupSettings} and thereby reading * setup config from file. diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java index 0065ade..277d0fe 100644 --- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java @@ -28,7 +28,6 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; -import org.junit.Assert; import org.junit.Test; import com.google.code.gossip.GossipMember;