From 4ca9b82ae442ff094e6a0160b6a6ea2aa63a3770 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Mon, 16 May 2016 18:54:17 -0400 Subject: [PATCH] use jackson --- .../gossip/manager/PassiveGossipThread.java | 57 ++++++++--------- .../impl/SendMembersActiveGossipThread.java | 25 ++++++-- .../gossip/model/ActiveGossipMessage.java | 22 +++++++ .../code/gossip/model/GossipMember.java | 63 +++++++++++++++++++ 4 files changed, 129 insertions(+), 38 deletions(-) create mode 100644 src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java create mode 100644 src/main/java/com/google/code/gossip/model/GossipMember.java diff --git a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java index b05a780..5abb39f 100644 --- a/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/PassiveGossipThread.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -35,6 +36,7 @@ import org.json.JSONObject; import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; import com.google.code.gossip.RemoteGossipMember; +import com.google.code.gossip.model.ActiveGossipMessage; /** * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle, @@ -54,6 +56,8 @@ abstract public class PassiveGossipThread implements Runnable { private AtomicBoolean keepRunning; private final String cluster; + + private ObjectMapper MAPPER = new ObjectMapper(); public PassiveGossipThread(GossipManager gossipManager) { this.gossipManager = gossipManager; @@ -93,46 +97,35 @@ abstract public class PassiveGossipThread implements Runnable { json_bytes[i] = buf[i + 4]; } String receivedMessage = new String(json_bytes); - GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): " + GossipService.LOGGER.warn("Received message (" + packet_length + " bytes): " + receivedMessage); try { List remoteGossipMembers = new ArrayList<>(); RemoteGossipMember senderMember = null; - JSONArray jsonArray = new JSONArray(receivedMessage); - for (int i = 0; i < jsonArray.length(); i++) { - JSONObject memberJSONObject = jsonArray.getJSONObject(i); - if (memberJSONObject.length() == 5 - && cluster.equals(memberJSONObject.get(GossipMember.JSON_CLUSTER))) { - RemoteGossipMember member = new RemoteGossipMember( - memberJSONObject.getString(GossipMember.JSON_CLUSTER), - memberJSONObject.getString(GossipMember.JSON_HOST), - memberJSONObject.getInt(GossipMember.JSON_PORT), - memberJSONObject.getString(GossipMember.JSON_ID), - memberJSONObject.getLong(GossipMember.JSON_HEARTBEAT)); - GossipService.LOGGER.debug(member.toString()); - // This is the first member found, so this should be the member who is communicating - // with me. - if (i == 0) { - senderMember = member; - } - remoteGossipMembers.add(member); - } else if (memberJSONObject.length() == 5) { - GossipService.LOGGER.warn("The member object does not belong to this cluster."); - } else { - GossipService.LOGGER - .error("The received member object does not contain 5 objects:\n" - + memberJSONObject.toString()); + ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes, + ActiveGossipMessage.class); + for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { + RemoteGossipMember member = new RemoteGossipMember( + activeGossipMessage.getMembers().get(i).getCluster(), + activeGossipMessage.getMembers().get(i).getHost(), + activeGossipMessage.getMembers().get(i).getPort(), + activeGossipMessage.getMembers().get(i).getId(), + activeGossipMessage.getMembers().get(i).getHeartbeat()); + if (!(member.getClusterName().equals(cluster))){ + GossipService.LOGGER.warn("Note a member of this cluster " + i); + continue; } - + // This is the first member found, so this should be the member who is communicating + // with me. + if (i == 0) { + senderMember = member; + } + remoteGossipMembers.add(member); } mergeLists(gossipManager, senderMember, remoteGossipMembers); - } catch (JSONException e) { - GossipService.LOGGER - .error("The received message is not well-formed JSON. The following message has been dropped:\n" - + receivedMessage); - System.out.println(e); + } catch (RuntimeException ex) { + GossipService.LOGGER.error("Unable to process message", ex); } - } else { GossipService.LOGGER .error("The received message is not of the expected size, it has been dropped."); diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index 4e5f855..2259781 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -24,19 +24,33 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.List; -import org.json.JSONArray; +import org.codehaus.jackson.map.ObjectMapper; import com.google.code.gossip.GossipService; import com.google.code.gossip.LocalGossipMember; import com.google.code.gossip.manager.ActiveGossipThread; import com.google.code.gossip.manager.GossipManager; +import com.google.code.gossip.model.ActiveGossipMessage; +import com.google.code.gossip.model.GossipMember; abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { + protected ObjectMapper om = new ObjectMapper(); + public SendMembersActiveGossipThread(GossipManager gossipManager) { super(gossipManager); } + private GossipMember convert(LocalGossipMember member){ + GossipMember gm = new GossipMember(); + gm.setCluster(member.getClusterName()); + gm.setHeartbeat(member.getHeartbeat()); + gm.setHost(member.getHost()); + gm.setId(member.getId()); + gm.setPort(member.getPort()); + return gm; + } + /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ @@ -50,13 +64,12 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); InetAddress dest = InetAddress.getByName(member.getHost()); - JSONArray jsonArray = new JSONArray(); - jsonArray.put(me.toJSONObject()); + ActiveGossipMessage message = new ActiveGossipMessage(); + message.getMembers().add(convert(me)); for (LocalGossipMember other : memberList) { - jsonArray.put(other.toJSONObject()); - GossipService.LOGGER.debug(other); + message.getMembers().add(convert(other)); } - byte[] json_bytes = jsonArray.toString().getBytes(); + byte[] json_bytes = om.writeValueAsString(message).getBytes(); int packet_length = json_bytes.length; if (packet_length < GossipManager.MAX_PACKET_SIZE) { byte[] buf = createBuffer(packet_length, json_bytes); diff --git a/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java b/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java new file mode 100644 index 0000000..d3516f5 --- /dev/null +++ b/src/main/java/com/google/code/gossip/model/ActiveGossipMessage.java @@ -0,0 +1,22 @@ +package com.google.code.gossip.model; + +import java.util.ArrayList; +import java.util.List; + +public class ActiveGossipMessage { + + private List members = new ArrayList<>(); + + public ActiveGossipMessage(){ + + } + + public List getMembers() { + return members; + } + + public void setMembers(List members) { + this.members = members; + } + +} diff --git a/src/main/java/com/google/code/gossip/model/GossipMember.java b/src/main/java/com/google/code/gossip/model/GossipMember.java new file mode 100644 index 0000000..6c073b4 --- /dev/null +++ b/src/main/java/com/google/code/gossip/model/GossipMember.java @@ -0,0 +1,63 @@ +package com.google.code.gossip.model; + +public class GossipMember { + + private String cluster; + private String host; + private Integer port; + private String id; + private Long heartbeat; + + public GossipMember(){ + + } + + public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){ + this.cluster=cluster; + this.host= host; + this.port = port; + this.id = id; + + } + + public String getCluster() { + return cluster; + } + + public void setCluster(String cluster) { + this.cluster = cluster; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Long getHeartbeat() { + return heartbeat; + } + + public void setHeartbeat(Long heartbeat) { + this.heartbeat = heartbeat; + } + +}