GOSSIP-52 use one object mapper application wide

This commit is contained in:
Edward Capriolo
2017-02-08 17:58:42 -05:00
parent 296b55fa9f
commit 21a263b079
6 changed files with 41 additions and 49 deletions

View File

@ -56,13 +56,10 @@ import org.apache.log4j.Logger;
import com.codahale.metrics.Gauge; import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter; import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
public class GossipCore implements GossipCoreConstants { public class GossipCore implements GossipCoreConstants {
public static final Logger LOGGER = Logger.getLogger(GossipCore.class); public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private final GossipManager gossipManager; private final GossipManager gossipManager;
private ConcurrentHashMap<String, Base> requests; private ConcurrentHashMap<String, Base> requests;
private ThreadPoolExecutor service; private ThreadPoolExecutor service;
@ -73,10 +70,6 @@ public class GossipCore implements GossipCoreConstants {
private final Meter tranmissionException; private final Meter tranmissionException;
private final Meter tranmissionSuccess; private final Meter tranmissionSuccess;
{
MAPPER.enableDefaultTyping();
}
public GossipCore(GossipManager manager, MetricRegistry metrics){ public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager; this.gossipManager = manager;
requests = new ConcurrentHashMap<>(); requests = new ConcurrentHashMap<>();
@ -214,7 +207,7 @@ public class GossipCore implements GossipCoreConstants {
private void sendInternal(Base message, URI uri){ private void sendInternal(Base message, URI uri){
byte[] json_bytes; byte[] json_bytes;
try { try {
json_bytes = MAPPER.writeValueAsString(message).getBytes(); json_bytes = gossipManager.getObjectMapper().writeValueAsString(message).getBytes();
} catch (IOException e) { } catch (IOException e) {
messageSerdeException.mark(); messageSerdeException.mark();
throw new RuntimeException(e); throw new RuntimeException(e);
@ -292,7 +285,7 @@ public class GossipCore implements GossipCoreConstants {
public void sendOneWay(Base message, URI u){ public void sendOneWay(Base message, URI u){
byte[] json_bytes; byte[] json_bytes;
try { try {
json_bytes = MAPPER.writeValueAsBytes(message); json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
} catch (IOException e) { } catch (IOException e) {
messageSerdeException.mark(); messageSerdeException.mark();
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -18,6 +18,7 @@
package org.apache.gossip.manager; package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
@ -69,10 +70,11 @@ public abstract class GossipManager {
private final MetricRegistry registry; private final MetricRegistry registry;
private final RingStatePersister ringState; private final RingStatePersister ringState;
private final UserDataPersister userDataState; private final UserDataPersister userDataState;
private final ObjectMapper objectMapper;
public GossipManager(String cluster, public GossipManager(String cluster,
URI uri, String id, Map<String,String> properties, GossipSettings settings, URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) { List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) {
this.settings = settings; this.settings = settings;
gossipCore = new GossipCore(this, registry); gossipCore = new GossipCore(this, registry);
clock = new SystemClock(); clock = new SystemClock();
@ -97,6 +99,7 @@ public abstract class GossipManager {
this.registry = registry; this.registry = registry;
this.ringState = new RingStatePersister(this); this.ringState = new RingStatePersister(this);
this.userDataState = new UserDataPersister(this, this.gossipCore); this.userDataState = new UserDataPersister(this, this.gossipCore);
this.objectMapper = objectMapper;
readSavedRingState(); readSavedRingState();
readSavedDataState(); readSavedDataState();
} }
@ -331,4 +334,8 @@ public abstract class GossipManager {
return clock; return clock;
} }
public ObjectMapper getObjectMapper() {
return objectMapper;
}
} }

View File

@ -28,13 +28,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.model.Base; import org.apache.gossip.model.Base;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
/** /**
* [The passive thread: reply to incoming gossip request.] This class handles the passive cycle, * This class handles the passive cycle,
* where this client has received an incoming message. For now, this message is always the * where this client has received an incoming message.
* membership list, but if you choose to gossip additional information, you will need some logic to
* determine the incoming message.
*/ */
abstract public class PassiveGossipThread implements Runnable { abstract public class PassiveGossipThread implements Runnable {
@ -42,21 +38,16 @@ abstract public class PassiveGossipThread implements Runnable {
/** The socket used for the passive thread of the gossip service. */ /** The socket used for the passive thread of the gossip service. */
private final DatagramSocket server; private final DatagramSocket server;
private final AtomicBoolean keepRunning; private final AtomicBoolean keepRunning;
private final String cluster;
private final static ObjectMapper MAPPER = new ObjectMapper();
private final GossipCore gossipCore; private final GossipCore gossipCore;
private final GossipManager gossipManager;
{
MAPPER.enableDefaultTyping();
}
public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
this.gossipCore = gossipCore; this.gossipCore = gossipCore;
if (gossipManager.getMyself().getClusterName() == null){
throw new IllegalArgumentException("Cluster was null");
}
try { try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort()); gossipManager.getMyself().getUri().getPort());
@ -64,10 +55,6 @@ abstract public class PassiveGossipThread implements Runnable {
LOGGER.debug("Gossip service successfully initialized on port " LOGGER.debug("Gossip service successfully initialized on port "
+ gossipManager.getMyself().getUri().getPort()); + gossipManager.getMyself().getUri().getPort());
LOGGER.debug("I am " + gossipManager.getMyself()); LOGGER.debug("I am " + gossipManager.getMyself());
cluster = gossipManager.getMyself().getClusterName();
if (cluster == null){
throw new IllegalArgumentException("cluster was null");
}
} catch (SocketException ex) { } catch (SocketException ex) {
LOGGER.warn(ex); LOGGER.warn(ex);
throw new RuntimeException(ex); throw new RuntimeException(ex);
@ -84,7 +71,7 @@ abstract public class PassiveGossipThread implements Runnable {
server.receive(p); server.receive(p);
debug(p.getData()); debug(p.getData());
try { try {
Base activeGossipMessage = MAPPER.readValue(p.getData(), Base.class); Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class);
gossipCore.receive(activeGossipMessage); gossipCore.receive(activeGossipMessage);
} catch (RuntimeException ex) {//TODO trap json exception } catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex); LOGGER.error("Unable to process message", ex);

View File

@ -29,15 +29,9 @@ import java.util.NavigableSet;
import org.apache.gossip.LocalGossipMember; import org.apache.gossip.LocalGossipMember;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
public class RingStatePersister implements Runnable { public class RingStatePersister implements Runnable {
private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class); private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final TypeReference<ArrayList<LocalGossipMember>> REF
= new TypeReference<ArrayList<LocalGossipMember>>() { };
private GossipManager parent; private GossipManager parent;
public RingStatePersister(GossipManager parent){ public RingStatePersister(GossipManager parent){
@ -60,18 +54,19 @@ public class RingStatePersister implements Runnable {
} }
NavigableSet<LocalGossipMember> i = parent.getMembers().keySet(); NavigableSet<LocalGossipMember> i = parent.getMembers().keySet();
try (FileOutputStream fos = new FileOutputStream(computeTarget())){ try (FileOutputStream fos = new FileOutputStream(computeTarget())){
MAPPER.writeValue(fos, i); parent.getObjectMapper().writeValue(fos, i);
} catch (IOException e) { } catch (IOException e) {
LOGGER.debug(e); LOGGER.debug(e);
} }
} }
@SuppressWarnings("unchecked")
List<LocalGossipMember> readFromDisk(){ List<LocalGossipMember> readFromDisk(){
if (!parent.getSettings().isPersistRingState()){ if (!parent.getSettings().isPersistRingState()){
return Collections.emptyList(); return Collections.emptyList();
} }
try (FileInputStream fos = new FileInputStream(computeTarget())){ try (FileInputStream fos = new FileInputStream(computeTarget())){
return MAPPER.readValue(fos, REF); return parent.getObjectMapper().readValue(fos, ArrayList.class);
} catch (IOException e) { } catch (IOException e) {
LOGGER.debug(e); LOGGER.debug(e);
} }

View File

@ -32,14 +32,12 @@ import com.fasterxml.jackson.databind.ObjectMapper;
public class UserDataPersister implements Runnable { public class UserDataPersister implements Runnable {
private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class); private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private final GossipManager parent; private final GossipManager parent;
private final GossipCore gossipCore; private final GossipCore gossipCore;
UserDataPersister(GossipManager parent, GossipCore gossipCore){ UserDataPersister(GossipManager parent, GossipCore gossipCore){
this.parent = parent; this.parent = parent;
this.gossipCore = gossipCore; this.gossipCore = gossipCore;
MAPPER.enableDefaultTyping();
} }
File computeSharedTarget(){ File computeSharedTarget(){
@ -58,7 +56,7 @@ public class UserDataPersister implements Runnable {
return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>(); return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
} }
try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){ try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
return MAPPER.readValue(fos, ConcurrentHashMap.class); return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) { } catch (IOException e) {
LOGGER.debug(e); LOGGER.debug(e);
} }
@ -70,7 +68,7 @@ public class UserDataPersister implements Runnable {
return; return;
} }
try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){ try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){
MAPPER.writeValue(fos, gossipCore.getPerNodeData()); parent.getObjectMapper().writeValue(fos, gossipCore.getPerNodeData());
} catch (IOException e) { } catch (IOException e) {
LOGGER.warn(e); LOGGER.warn(e);
} }
@ -81,7 +79,7 @@ public class UserDataPersister implements Runnable {
return; return;
} }
try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){ try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){
MAPPER.writeValue(fos, gossipCore.getSharedData()); parent.getObjectMapper().writeValue(fos, gossipCore.getSharedData());
} catch (IOException e) { } catch (IOException e) {
LOGGER.warn(e); LOGGER.warn(e);
} }
@ -93,7 +91,7 @@ public class UserDataPersister implements Runnable {
return new ConcurrentHashMap<String, SharedGossipDataMessage>(); return new ConcurrentHashMap<String, SharedGossipDataMessage>();
} }
try (FileInputStream fos = new FileInputStream(computeSharedTarget())){ try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
return MAPPER.readValue(fos, ConcurrentHashMap.class); return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) { } catch (IOException e) {
LOGGER.debug(e); LOGGER.debug(e);
} }

View File

@ -18,6 +18,8 @@
package org.apache.gossip.manager.random; package org.apache.gossip.manager.random;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipMember; import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings; import org.apache.gossip.GossipSettings;
import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipListener;
@ -44,6 +46,7 @@ public class RandomGossipManager extends GossipManager {
private GossipListener listener; private GossipListener listener;
private MetricRegistry registry; private MetricRegistry registry;
private Map<String,String> properties; private Map<String,String> properties;
private ObjectMapper objectMapper;
private ManagerBuilder() {} private ManagerBuilder() {}
@ -93,6 +96,11 @@ public class RandomGossipManager extends GossipManager {
return this; return this;
} }
public ManagerBuilder mapper(ObjectMapper objectMapper){
this.objectMapper = objectMapper;
return this;
}
public RandomGossipManager build() { public RandomGossipManager build() {
checkArgument(id != null, "You must specify an id"); checkArgument(id != null, "You must specify an id");
checkArgument(cluster != null, "You must specify a cluster name"); checkArgument(cluster != null, "You must specify a cluster name");
@ -108,12 +116,16 @@ public class RandomGossipManager extends GossipManager {
if (gossipMembers == null) { if (gossipMembers == null) {
gossipMembers = new ArrayList<>(); gossipMembers = new ArrayList<>();
} }
return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry); if (objectMapper == null) {
objectMapper = new ObjectMapper();
objectMapper.enableDefaultTyping();
}
return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper);
} }
} }
private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties, GossipSettings settings, private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) { List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) {
super(cluster, uri, id, properties, settings, gossipMembers, listener, registry); super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper);
} }
} }