diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 82d65fe..e23ee54 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -56,13 +56,10 @@ import org.apache.log4j.Logger; import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import com.fasterxml.jackson.databind.ObjectMapper; - public class GossipCore implements GossipCoreConstants { public static final Logger LOGGER = Logger.getLogger(GossipCore.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); private final GossipManager gossipManager; private ConcurrentHashMap requests; private ThreadPoolExecutor service; @@ -72,11 +69,7 @@ public class GossipCore implements GossipCoreConstants { private final Meter messageSerdeException; private final Meter tranmissionException; private final Meter tranmissionSuccess; - - { - MAPPER.enableDefaultTyping(); - } - + public GossipCore(GossipManager manager, MetricRegistry metrics){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); @@ -214,7 +207,7 @@ public class GossipCore implements GossipCoreConstants { private void sendInternal(Base message, URI uri){ byte[] json_bytes; try { - json_bytes = MAPPER.writeValueAsString(message).getBytes(); + json_bytes = gossipManager.getObjectMapper().writeValueAsString(message).getBytes(); } catch (IOException e) { messageSerdeException.mark(); throw new RuntimeException(e); @@ -292,7 +285,7 @@ public class GossipCore implements GossipCoreConstants { public void sendOneWay(Base message, URI u){ byte[] json_bytes; try { - json_bytes = MAPPER.writeValueAsBytes(message); + json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); } catch (IOException e) { messageSerdeException.mark(); throw new RuntimeException(e); diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 53ed8c7..67cb06b 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -18,6 +18,7 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.ObjectMapper; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -69,10 +70,11 @@ public abstract class GossipManager { private final MetricRegistry registry; private final RingStatePersister ringState; private final UserDataPersister userDataState; + private final ObjectMapper objectMapper; public GossipManager(String cluster, URI uri, String id, Map properties, GossipSettings settings, - List gossipMembers, GossipListener listener, MetricRegistry registry) { + List gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) { this.settings = settings; gossipCore = new GossipCore(this, registry); clock = new SystemClock(); @@ -97,6 +99,7 @@ public abstract class GossipManager { this.registry = registry; this.ringState = new RingStatePersister(this); this.userDataState = new UserDataPersister(this, this.gossipCore); + this.objectMapper = objectMapper; readSavedRingState(); readSavedDataState(); } @@ -330,5 +333,9 @@ public abstract class GossipManager { public Clock getClock() { return clock; } + + public ObjectMapper getObjectMapper() { + return objectMapper; + } } diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index 47b8a8f..51cf264 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -28,13 +28,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.gossip.model.Base; import org.apache.log4j.Logger; -import com.fasterxml.jackson.databind.ObjectMapper; - /** - * [The passive thread: reply to incoming gossip request.] This class handles the passive cycle, - * where this client has received an incoming message. For now, this message is always the - * membership list, but if you choose to gossip additional information, you will need some logic to - * determine the incoming message. + * This class handles the passive cycle, + * where this client has received an incoming message. */ abstract public class PassiveGossipThread implements Runnable { @@ -42,21 +38,16 @@ abstract public class PassiveGossipThread implements Runnable { /** The socket used for the passive thread of the gossip service. */ private final DatagramSocket server; - private final AtomicBoolean keepRunning; - - private final String cluster; - - private final static ObjectMapper MAPPER = new ObjectMapper(); - private final GossipCore gossipCore; - - { - MAPPER.enableDefaultTyping(); - } + private final GossipManager gossipManager; public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { + this.gossipManager = gossipManager; this.gossipCore = gossipCore; + if (gossipManager.getMyself().getClusterName() == null){ + throw new IllegalArgumentException("Cluster was null"); + } try { SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), gossipManager.getMyself().getUri().getPort()); @@ -64,10 +55,6 @@ abstract public class PassiveGossipThread implements Runnable { LOGGER.debug("Gossip service successfully initialized on port " + gossipManager.getMyself().getUri().getPort()); LOGGER.debug("I am " + gossipManager.getMyself()); - cluster = gossipManager.getMyself().getClusterName(); - if (cluster == null){ - throw new IllegalArgumentException("cluster was null"); - } } catch (SocketException ex) { LOGGER.warn(ex); throw new RuntimeException(ex); @@ -84,7 +71,7 @@ abstract public class PassiveGossipThread implements Runnable { server.receive(p); debug(p.getData()); try { - Base activeGossipMessage = MAPPER.readValue(p.getData(), Base.class); + Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class); gossipCore.receive(activeGossipMessage); } catch (RuntimeException ex) {//TODO trap json exception LOGGER.error("Unable to process message", ex); diff --git a/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/src/main/java/org/apache/gossip/manager/RingStatePersister.java index 24b464a..6f724e0 100644 --- a/src/main/java/org/apache/gossip/manager/RingStatePersister.java +++ b/src/main/java/org/apache/gossip/manager/RingStatePersister.java @@ -29,15 +29,9 @@ import java.util.NavigableSet; import org.apache.gossip.LocalGossipMember; import org.apache.log4j.Logger; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; - public class RingStatePersister implements Runnable { private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); - private static final TypeReference> REF - = new TypeReference>() { }; private GossipManager parent; public RingStatePersister(GossipManager parent){ @@ -60,18 +54,19 @@ public class RingStatePersister implements Runnable { } NavigableSet i = parent.getMembers().keySet(); try (FileOutputStream fos = new FileOutputStream(computeTarget())){ - MAPPER.writeValue(fos, i); + parent.getObjectMapper().writeValue(fos, i); } catch (IOException e) { LOGGER.debug(e); } } + @SuppressWarnings("unchecked") List readFromDisk(){ if (!parent.getSettings().isPersistRingState()){ return Collections.emptyList(); } try (FileInputStream fos = new FileInputStream(computeTarget())){ - return MAPPER.readValue(fos, REF); + return parent.getObjectMapper().readValue(fos, ArrayList.class); } catch (IOException e) { LOGGER.debug(e); } diff --git a/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/src/main/java/org/apache/gossip/manager/UserDataPersister.java index c67677a..2a123e3 100644 --- a/src/main/java/org/apache/gossip/manager/UserDataPersister.java +++ b/src/main/java/org/apache/gossip/manager/UserDataPersister.java @@ -32,14 +32,12 @@ import com.fasterxml.jackson.databind.ObjectMapper; public class UserDataPersister implements Runnable { private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class); - private static final ObjectMapper MAPPER = new ObjectMapper(); private final GossipManager parent; private final GossipCore gossipCore; UserDataPersister(GossipManager parent, GossipCore gossipCore){ this.parent = parent; this.gossipCore = gossipCore; - MAPPER.enableDefaultTyping(); } File computeSharedTarget(){ @@ -58,7 +56,7 @@ public class UserDataPersister implements Runnable { return new ConcurrentHashMap>(); } try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){ - return MAPPER.readValue(fos, ConcurrentHashMap.class); + return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); } catch (IOException e) { LOGGER.debug(e); } @@ -70,7 +68,7 @@ public class UserDataPersister implements Runnable { return; } try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){ - MAPPER.writeValue(fos, gossipCore.getPerNodeData()); + parent.getObjectMapper().writeValue(fos, gossipCore.getPerNodeData()); } catch (IOException e) { LOGGER.warn(e); } @@ -81,7 +79,7 @@ public class UserDataPersister implements Runnable { return; } try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){ - MAPPER.writeValue(fos, gossipCore.getSharedData()); + parent.getObjectMapper().writeValue(fos, gossipCore.getSharedData()); } catch (IOException e) { LOGGER.warn(e); } @@ -93,7 +91,7 @@ public class UserDataPersister implements Runnable { return new ConcurrentHashMap(); } try (FileInputStream fos = new FileInputStream(computeSharedTarget())){ - return MAPPER.readValue(fos, ConcurrentHashMap.class); + return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); } catch (IOException e) { LOGGER.debug(e); } diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java index 4a150be..00e3378 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java @@ -18,6 +18,8 @@ package org.apache.gossip.manager.random; import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.apache.gossip.GossipMember; import org.apache.gossip.GossipSettings; import org.apache.gossip.event.GossipListener; @@ -44,6 +46,7 @@ public class RandomGossipManager extends GossipManager { private GossipListener listener; private MetricRegistry registry; private Map properties; + private ObjectMapper objectMapper; private ManagerBuilder() {} @@ -93,6 +96,11 @@ public class RandomGossipManager extends GossipManager { return this; } + public ManagerBuilder mapper(ObjectMapper objectMapper){ + this.objectMapper = objectMapper; + return this; + } + public RandomGossipManager build() { checkArgument(id != null, "You must specify an id"); checkArgument(cluster != null, "You must specify a cluster name"); @@ -108,12 +116,16 @@ public class RandomGossipManager extends GossipManager { if (gossipMembers == null) { gossipMembers = new ArrayList<>(); } - return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry); + if (objectMapper == null) { + objectMapper = new ObjectMapper(); + objectMapper.enableDefaultTyping(); + } + return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper); } } private RandomGossipManager(String cluster, URI uri, String id, Map properties, GossipSettings settings, - List gossipMembers, GossipListener listener, MetricRegistry registry) { - super(cluster, uri, id, properties, settings, gossipMembers, listener, registry); + List gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) { + super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper); } }