GOSSIP-79 Isolate UDP and JSON code

With these changes, it should now be possible to create alternate serialization (e.g. Gson or native) and transports (like HTTP).

To make this PR reviewable I decided against creating new modules right now. That can be done subsequently in another PR that doesn't modify any code.

* Creates two new interfaces: `TransportManager` and `ProtocolManager`
  * Implementation classes must honor a common constructor interface
* Includes UDP and Jackson implementations of those.
* `AbstractTransportManager` has a lot of boilerplate that includes:
  * starting the active gossiper, and
  * starting the passive gossiper.

I spent some time trying to polish the implementations to become less dependent on references to `GossipManager`. I still feel there is a lot of room for improvement.
This commit is contained in:
Gary Dusbabek
2017-04-17 12:37:42 -05:00
parent c544b8bf16
commit 851cd93e67
18 changed files with 614 additions and 291 deletions

View File

@ -45,6 +45,9 @@ public class GossipSettings {
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
private String transportManagerClass = "org.apache.gossip.transport.UdpTransportManager";
private String protocolManagerClass = "org.apache.gossip.protocol.JacksonProtocolManager";
private Map<String,String> activeGossipProperties = new HashMap<>();
private String pathToRingState = "./";
@ -222,5 +225,12 @@ public class GossipSettings {
public void setSignMessages(boolean signMessages) {
this.signMessages = signMessages;
}
public String getTransportManagerClass() {
return transportManagerClass;
}
public String getProtocolManagerClass() {
return protocolManagerClass;
}
}

View File

@ -29,16 +29,8 @@ import org.apache.gossip.model.*;
import org.apache.gossip.udp.Trackable;
import org.apache.log4j.Logger;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.URI;
import java.security.*;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.*;
@ -60,8 +52,6 @@ public class GossipCore implements GossipCoreConstants {
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
private final PKCS8EncodedKeySpec privKeySpec;
private final PrivateKey privKey;
private final Meter messageSerdeException;
private final Meter tranmissionException;
private final Meter tranmissionSuccess;
@ -79,42 +69,6 @@ public class GossipCore implements GossipCoreConstants {
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
if (manager.getSettings().isSignMessages()){
File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId());
File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub");
if (!privateKey.exists()){
throw new IllegalArgumentException("private key not found " + privateKey);
}
if (!publicKey.exists()){
throw new IllegalArgumentException("public key not found " + publicKey);
}
try (FileInputStream keyfis = new FileInputStream(privateKey)) {
byte[] encKey = new byte[keyfis.available()];
keyfis.read(encKey);
keyfis.close();
privKeySpec = new PKCS8EncodedKeySpec(encKey);
KeyFactory keyFactory = KeyFactory.getInstance("DSA");
privKey = keyFactory.generatePrivate(privKeySpec);
} catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
throw new RuntimeException("failed hard", e);
}
} else {
privKeySpec = null;
privKey = null;
}
}
private byte [] sign(byte [] bytes){
Signature dsa;
try {
dsa = Signature.getInstance("SHA1withDSA", "SUN");
dsa.initSign(privKey);
dsa.update(bytes);
return dsa.sign();
} catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
throw new RuntimeException(e);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@ -184,30 +138,21 @@ public class GossipCore implements GossipCoreConstants {
/**
* Sends a blocking message.
* todo: move functionality to TransportManager layer.
* @param message
* @param uri
* @throws RuntimeException if data can not be serialized or in transmission error
*/
private void sendInternal(Base message, URI uri){
private void sendInternal(Base message, URI uri) {
byte[] json_bytes;
try {
if (privKey == null){
json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
} else {
SignedPayload p = new SignedPayload();
p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
p.setSignature(sign(p.getData()));
json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
}
json_bytes = gossipManager.getProtocolManager().write(message);
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
InetAddress dest = InetAddress.getByName(uri.getHost());
DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, uri.getPort());
socket.send(datagramPacket);
try {
gossipManager.getTransportManager().send(uri, json_bytes);
tranmissionSuccess.mark();
} catch (IOException e) {
tranmissionException.mark();

View File

@ -18,6 +18,7 @@
package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember;
@ -26,13 +27,14 @@ import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.protocol.ProtocolManager;
import org.apache.gossip.transport.TransportManager;
import org.apache.gossip.utils.ReflectionUtils;
import org.apache.log4j.Logger;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.io.File;
import java.net.URI;
import java.util.Collections;
import java.util.List;
@ -46,14 +48,21 @@ import java.util.stream.Collectors;
public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
// this mapper is used for ring and user-data persistence only. NOT messages.
public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {{
enableDefaultTyping();
configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}};
private final ConcurrentSkipListMap<LocalMember, GossipState> members;
private final LocalMember me;
private final GossipSettings settings;
private final AtomicBoolean gossipServiceRunning;
private AbstractActiveGossiper activeGossipThread;
private PassiveGossipThread passiveGossipThread;
private ExecutorService gossipThreadExecutor;
private TransportManager transportManager;
private ProtocolManager protocolManager;
private final GossipCore gossipCore;
private final DataReaper dataReaper;
private final Clock clock;
@ -62,14 +71,13 @@ public abstract class GossipManager {
private final RingStatePersister ringState;
private final UserDataPersister userDataState;
private final GossipMemberStateRefresher memberStateRefresher;
private final ObjectMapper objectMapper;
private final MessageHandler messageHandler;
public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings,
List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
ObjectMapper objectMapper, MessageHandler messageHandler) {
MessageHandler messageHandler) {
this.settings = settings;
this.messageHandler = messageHandler;
@ -89,14 +97,15 @@ public abstract class GossipManager {
members.put(member, GossipState.DOWN);
}
}
gossipThreadExecutor = Executors.newCachedThreadPool();
gossipServiceRunning = new AtomicBoolean(true);
this.scheduledServiced = Executors.newScheduledThreadPool(1);
this.registry = registry;
this.ringState = new RingStatePersister(this);
this.userDataState = new UserDataPersister(this, this.gossipCore);
this.ringState = new RingStatePersister(GossipManager.buildRingStatePath(this), this);
this.userDataState = new UserDataPersister(
gossipCore,
GossipManager.buildPerNodeDataPath(this),
GossipManager.buildSharedDataPath(this));
this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
this.objectMapper = objectMapper;
readSavedRingState();
readSavedDataState();
}
@ -140,49 +149,66 @@ public abstract class GossipManager {
return me;
}
private AbstractActiveGossiper constructActiveGossiper(){
try {
Constructor<?> c = Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class, GossipCore.class, MetricRegistry.class);
return (AbstractActiveGossiper) c.newInstance(this, gossipCore, registry);
} catch (NoSuchMethodException | SecurityException | ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
/**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
*/
public void init() {
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = constructActiveGossiper();
activeGossipThread.init();
// protocol manager and transport managers are specified in settings.
// construct them here via reflection.
protocolManager = ReflectionUtils.constructWithReflection(
settings.getProtocolManagerClass(),
new Class<?>[] { GossipSettings.class, String.class, MetricRegistry.class },
new Object[] { settings, me.getId(), this.getRegistry() }
);
transportManager = ReflectionUtils.constructWithReflection(
settings.getTransportManagerClass(),
new Class<?>[] { GossipManager.class, GossipCore.class},
new Object[] { this, gossipCore }
);
// start processing gossip messages.
transportManager.startEndpoint();
transportManager.startActiveGossiper();
dataReaper.init();
scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
if (settings.isPersistRingState()) {
scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
}
if (settings.isPersistDataState()) {
scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
}
scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS);
LOGGER.debug("The GossipManager is started.");
}
private void readSavedRingState() {
for (LocalMember l : ringState.readFromDisk()){
LocalMember member = new LocalMember(l.getClusterName(),
l.getUri(), l.getId(),
clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
members.putIfAbsent(member, GossipState.DOWN);
if (settings.isPersistRingState()) {
for (LocalMember l : ringState.readFromDisk()) {
LocalMember member = new LocalMember(l.getClusterName(),
l.getUri(), l.getId(),
clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
members.putIfAbsent(member, GossipState.DOWN);
}
}
}
private void readSavedDataState() {
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
gossipCore.addPerNodeData(j.getValue());
if (settings.isPersistDataState()) {
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()) {
for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()) {
gossipCore.addPerNodeData(j.getValue());
}
}
}
for (Entry<String, SharedDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
gossipCore.addSharedData(l.getValue());
if (settings.isPersistRingState()) {
for (Entry<String, SharedDataMessage> l : userDataState.readSharedDataFromDisk().entrySet()) {
gossipCore.addSharedData(l.getValue());
}
}
}
@ -191,24 +217,9 @@ public abstract class GossipManager {
*/
public void shutdown() {
gossipServiceRunning.set(false);
gossipThreadExecutor.shutdown();
gossipCore.shutdown();
transportManager.shutdown();
dataReaper.close();
if (passiveGossipThread != null) {
passiveGossipThread.shutdown();
}
if (activeGossipThread != null) {
activeGossipThread.shutdown();
}
try {
boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
if (!result) {
LOGGER.error("executor shutdown timed out");
}
} catch (InterruptedException e) {
LOGGER.error(e);
}
gossipThreadExecutor.shutdownNow();
scheduledServiced.shutdown();
try {
scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
@ -234,7 +245,6 @@ public abstract class GossipManager {
gossipCore.addSharedData(message);
}
@SuppressWarnings("rawtypes")
public Crdt findCrdt(String key){
SharedDataMessage l = gossipCore.getSharedData().get(key);
@ -308,12 +318,32 @@ public abstract class GossipManager {
return clock;
}
public ObjectMapper getObjectMapper() {
return objectMapper;
}
public MetricRegistry getRegistry() {
return registry;
}
public ProtocolManager getProtocolManager() {
return protocolManager;
}
public TransportManager getTransportManager() {
return transportManager;
}
// todo: consider making these path methods part of GossipSettings
public static File buildRingStatePath(GossipManager manager) {
return new File(manager.getSettings().getPathToRingState(), "ringstate." + manager.getMyself().getClusterName() + "."
+ manager.getMyself().getId() + ".json");
}
public static File buildSharedDataPath(GossipManager manager){
return new File(manager.getSettings().getPathToDataState(), "shareddata."
+ manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json");
}
public static File buildPerNodeDataPath(GossipManager manager) {
return new File(manager.getSettings().getPathToDataState(), "pernodedata."
+ manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json");
}
}

View File

@ -18,12 +18,9 @@
package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.StartupSettings;
import org.apache.gossip.crdt.CrdtModule;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.handlers.MessageHandlerFactory;
@ -49,7 +46,6 @@ public class GossipManagerBuilder {
private GossipListener listener;
private MetricRegistry registry;
private Map<String,String> properties;
private ObjectMapper objectMapper;
private MessageHandler messageHandler;
private ManagerBuilder() {}
@ -108,11 +104,6 @@ public class GossipManagerBuilder {
this.uri = uri;
return this;
}
public ManagerBuilder mapper(ObjectMapper objectMapper){
this.objectMapper = objectMapper;
return this;
}
public ManagerBuilder messageHandler(MessageHandler messageHandler) {
this.messageHandler = messageHandler;
@ -136,16 +127,11 @@ public class GossipManagerBuilder {
if (gossipMembers == null) {
gossipMembers = new ArrayList<>();
}
if (objectMapper == null) {
objectMapper = new ObjectMapper();
objectMapper.enableDefaultTyping();
objectMapper.registerModule(new CrdtModule());
objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
}
if (messageHandler == null) {
messageHandler = MessageHandlerFactory.defaultHandler();
}
return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageHandler) {} ;
}
return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, messageHandler) {} ;
}
}

View File

@ -18,34 +18,25 @@
package org.apache.gossip.manager;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.SignedPayload;
import org.apache.log4j.Logger;
import com.codahale.metrics.Meter;
/**
* This class handles the passive cycle,
* where this client has received an incoming message.
*/
abstract public class PassiveGossipThread implements Runnable {
public class PassiveGossipThread implements Runnable {
public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
/** The socket used for the passive thread of the gossip service. */
private final DatagramSocket server;
private final AtomicBoolean keepRunning;
private final GossipCore gossipCore;
private final GossipManager gossipManager;
private final Meter signed;
private final Meter unsigned;
public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
@ -53,38 +44,18 @@ abstract public class PassiveGossipThread implements Runnable {
if (gossipManager.getMyself().getClusterName() == null){
throw new IllegalArgumentException("Cluster was null");
}
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
server = new DatagramSocket(socketAddress);
} catch (SocketException ex) {
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
keepRunning = new AtomicBoolean(true);
signed = gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE);
unsigned = gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
}
@Override
public void run() {
while (keepRunning.get()) {
try {
byte[] buf = new byte[server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length);
server.receive(p);
debug(p.getData());
byte[] buf = gossipManager.getTransportManager().read();
try {
Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class);
if (activeGossipMessage instanceof SignedPayload){
SignedPayload s = (SignedPayload) activeGossipMessage;
Base nested = gossipManager.getObjectMapper().readValue(s.getData(), Base.class);
gossipCore.receive(nested);
signed.mark();
} else {
gossipCore.receive(activeGossipMessage);
unsigned.mark();
}
Base message = gossipManager.getProtocolManager().read(buf);
gossipCore.receive(message);
gossipManager.getMemberStateRefresher().run();
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
@ -94,21 +65,9 @@ abstract public class PassiveGossipThread implements Runnable {
keepRunning.set(false);
}
}
shutdown();
}
private void debug(byte[] jsonBytes) {
if (LOGGER.isDebugEnabled()){
String receivedMessage = new String(jsonBytes);
LOGGER.debug("Received message ( bytes): " + receivedMessage);
}
public void requestStop() {
keepRunning.set(false);
}
public void shutdown() {
try {
server.close();
} catch (RuntimeException ex) {
}
}
}

View File

@ -26,16 +26,24 @@ import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.LocalMember;
import org.apache.gossip.crdt.CrdtModule;
import org.apache.log4j.Logger;
public class RingStatePersister implements Runnable {
private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class);
private GossipManager parent;
private final File path;
// NOTE: this is a different instance than what gets used for message marshalling.
private final ObjectMapper objectMapper;
private final GossipManager manager;
public RingStatePersister(GossipManager parent){
this.parent = parent;
public RingStatePersister(File path, GossipManager manager){
this.path = path;
this.objectMapper = GossipManager.metdataObjectMapper;
this.manager = manager;
}
@Override
@ -43,34 +51,25 @@ public class RingStatePersister implements Runnable {
writeToDisk();
}
File computeTarget(){
return new File(parent.getSettings().getPathToRingState(), "ringstate." + parent.getMyself().getClusterName() + "."
+ parent.getMyself().getId() + ".json");
}
void writeToDisk(){
if (!parent.getSettings().isPersistRingState()){
return;
}
NavigableSet<LocalMember> i = parent.getMembers().keySet();
try (FileOutputStream fos = new FileOutputStream(computeTarget())){
parent.getObjectMapper().writeValue(fos, i);
void writeToDisk() {
NavigableSet<LocalMember> i = manager.getMembers().keySet();
try (FileOutputStream fos = new FileOutputStream(path)){
objectMapper.writeValue(fos, i);
} catch (IOException e) {
LOGGER.debug(e);
}
}
@SuppressWarnings("unchecked")
List<LocalMember> readFromDisk(){
if (!parent.getSettings().isPersistRingState()){
return Collections.emptyList();
List<LocalMember> readFromDisk() {
if (!path.exists()) {
return new ArrayList<>();
}
try (FileInputStream fos = new FileInputStream(computeTarget())){
return parent.getObjectMapper().readValue(fos, ArrayList.class);
try (FileInputStream fos = new FileInputStream(path)){
return objectMapper.readValue(fos, ArrayList.class);
} catch (IOException e) {
LOGGER.debug(e);
}
return Collections.emptyList();
return new ArrayList<>();
}
}

View File

@ -23,6 +23,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.log4j.Logger;
@ -30,31 +31,26 @@ import org.apache.log4j.Logger;
public class UserDataPersister implements Runnable {
private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class);
private final GossipManager parent;
private final GossipCore gossipCore;
UserDataPersister(GossipManager parent, GossipCore gossipCore){
this.parent = parent;
private final File perNodePath;
private final File sharedPath;
private final ObjectMapper objectMapper;
UserDataPersister(GossipCore gossipCore, File perNodePath, File sharedPath) {
this.gossipCore = gossipCore;
}
File computeSharedTarget(){
return new File(parent.getSettings().getPathToDataState(), "shareddata."
+ parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
}
File computePerNodeTarget() {
return new File(parent.getSettings().getPathToDataState(), "pernodedata."
+ parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
this.objectMapper = GossipManager.metdataObjectMapper;
this.perNodePath = perNodePath;
this.sharedPath = sharedPath;
}
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> readPerNodeFromDisk(){
if (!parent.getSettings().isPersistDataState()){
if (!perNodePath.exists()) {
return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
}
try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
try (FileInputStream fos = new FileInputStream(perNodePath)){
return objectMapper.readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}
@ -62,22 +58,16 @@ public class UserDataPersister implements Runnable {
}
void writePerNodeToDisk(){
if (!parent.getSettings().isPersistDataState()){
return;
}
try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){
parent.getObjectMapper().writeValue(fos, gossipCore.getPerNodeData());
try (FileOutputStream fos = new FileOutputStream(perNodePath)){
objectMapper.writeValue(fos, gossipCore.getPerNodeData());
} catch (IOException e) {
LOGGER.warn(e);
}
}
void writeSharedToDisk(){
if (!parent.getSettings().isPersistDataState()){
return;
}
try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){
parent.getObjectMapper().writeValue(fos, gossipCore.getSharedData());
try (FileOutputStream fos = new FileOutputStream(sharedPath)){
objectMapper.writeValue(fos, gossipCore.getSharedData());
} catch (IOException e) {
LOGGER.warn(e);
}
@ -85,11 +75,11 @@ public class UserDataPersister implements Runnable {
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){
if (!parent.getSettings().isPersistRingState()){
return new ConcurrentHashMap<String, SharedDataMessage>();
if (!sharedPath.exists()) {
return new ConcurrentHashMap<>();
}
try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
try (FileInputStream fos = new FileInputStream(sharedPath)){
return objectMapper.readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.protocol;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.crdt.CrdtModule;
import org.apache.gossip.manager.PassiveGossipConstants;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.SignedPayload;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.SignatureException;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;
// this class is constructed by reflection in GossipManager.
public class JacksonProtocolManager implements ProtocolManager {
private final ObjectMapper objectMapper;
private final PrivateKey privKey;
private final Meter signed;
private final Meter unsigned;
/** required for reflection to work! */
public JacksonProtocolManager(GossipSettings settings, String id, MetricRegistry registry) {
// set up object mapper.
objectMapper = buildObjectMapper(settings);
// set up message signing.
if (settings.isSignMessages()){
File privateKey = new File(settings.getPathToKeyStore(), id);
File publicKey = new File(settings.getPathToKeyStore(), id + ".pub");
if (!privateKey.exists()){
throw new IllegalArgumentException("private key not found " + privateKey);
}
if (!publicKey.exists()){
throw new IllegalArgumentException("public key not found " + publicKey);
}
try (FileInputStream keyfis = new FileInputStream(privateKey)) {
byte[] encKey = new byte[keyfis.available()];
keyfis.read(encKey);
keyfis.close();
PKCS8EncodedKeySpec privKeySpec = new PKCS8EncodedKeySpec(encKey);
KeyFactory keyFactory = KeyFactory.getInstance("DSA");
privKey = keyFactory.generatePrivate(privKeySpec);
} catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
throw new RuntimeException("failed hard", e);
}
} else {
privKey = null;
}
signed = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE);
unsigned = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
}
@Override
public byte[] write(Base message) throws IOException {
byte[] json_bytes;
if (privKey == null){
json_bytes = objectMapper.writeValueAsBytes(message);
} else {
SignedPayload p = new SignedPayload();
p.setData(objectMapper.writeValueAsString(message).getBytes());
p.setSignature(sign(p.getData(), privKey));
json_bytes = objectMapper.writeValueAsBytes(p);
}
return json_bytes;
}
@Override
public Base read(byte[] buf) throws IOException {
Base activeGossipMessage = objectMapper.readValue(buf, Base.class);
if (activeGossipMessage instanceof SignedPayload){
SignedPayload s = (SignedPayload) activeGossipMessage;
signed.mark();
return objectMapper.readValue(s.getData(), Base.class);
} else {
unsigned.mark();
return activeGossipMessage;
}
}
public static ObjectMapper buildObjectMapper(GossipSettings settings) {
ObjectMapper om = new ObjectMapper();
om.enableDefaultTyping();
// todo: should be specified in the configuration.
om.registerModule(new CrdtModule());
om.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
return om;
}
private static byte[] sign(byte [] bytes, PrivateKey pk){
Signature dsa;
try {
dsa = Signature.getInstance("SHA1withDSA", "SUN");
dsa.initSign(pk);
dsa.update(bytes);
return dsa.sign();
} catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -15,19 +15,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.impl;
package org.apache.gossip.protocol;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
import org.apache.log4j.Logger;
import org.apache.gossip.model.Base;
public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class);
import java.io.IOException;
public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager, gossipCore);
}
/** interface for managing message marshaling. */
public interface ProtocolManager {
/** serialize a message
* @param message
* @return serialized message.
* @throws IOException
*/
byte[] write(Base message) throws IOException;
/**
* Reads the next message from a byte source.
* @param buf
* @return a gossip message.
* @throws IOException
*/
Base read(byte[] buf) throws IOException;
}

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.transport;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.manager.AbstractActiveGossiper;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
import org.apache.gossip.utils.ReflectionUtils;
import org.apache.log4j.Logger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* Manage the protcol threads (active and passive gossipers).
*/
public abstract class AbstractTransportManager implements TransportManager {
public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);
private final PassiveGossipThread passiveGossipThread;
private final ExecutorService gossipThreadExecutor;
private final AbstractActiveGossiper activeGossipThread;
public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
passiveGossipThread = new PassiveGossipThread(gossipManager, gossipCore);
gossipThreadExecutor = Executors.newCachedThreadPool();
activeGossipThread = ReflectionUtils.constructWithReflection(
gossipManager.getSettings().getActiveGossipClass(),
new Class<?>[]{
GossipManager.class, GossipCore.class, MetricRegistry.class
},
new Object[]{
gossipManager, gossipCore, gossipManager.getRegistry()
});
}
// shut down threads etc.
@Override
public void shutdown() {
passiveGossipThread.requestStop();
gossipThreadExecutor.shutdown();
if (activeGossipThread != null) {
activeGossipThread.shutdown();
}
try {
boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
if (!result) {
LOGGER.error("executor shutdown timed out");
}
} catch (InterruptedException e) {
LOGGER.error(e);
}
gossipThreadExecutor.shutdownNow();
}
@Override
public void startActiveGossiper() {
activeGossipThread.init();
}
@Override
public void startEndpoint() {
gossipThreadExecutor.execute(passiveGossipThread);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.transport;
import java.io.IOException;
import java.net.URI;
/** interface for manage that sends and receives messages that have already been serialized. */
public interface TransportManager {
/** starts the active gossip thread responsible for reaching out to remote nodes. Not related to `startEndpoint()` */
void startActiveGossiper();
/** starts the passive gossip thread that receives messages from remote nodes. Not related to `startActiveGossiper()` */
void startEndpoint();
/** attempts to shutdown all threads. */
void shutdown();
/** sends a payload to an endpoint. */
void send(URI endpoint, byte[] buf) throws IOException;
/** gets the next payload being sent to this node */
byte[] read() throws IOException;
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.transport;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI;
/**
* This class is constructed by reflection in GossipManager.
* It manages transport (byte read/write) operations over UDP.
*/
public class UdpTransportManager extends AbstractTransportManager {
public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class);
/** The socket used for the passive thread of the gossip service. */
private final DatagramSocket server;
private final int soTimeout;
/** required for reflection to work! */
public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager, gossipCore);
soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
server = new DatagramSocket(socketAddress);
} catch (SocketException ex) {
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
}
@Override
public void shutdown() {
server.close();
super.shutdown();
}
/**
* blocking read a message.
* @return buffer of message contents.
* @throws IOException
*/
public byte[] read() throws IOException {
byte[] buf = new byte[server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length);
server.receive(p);
debug(p.getData());
return p.getData();
}
@Override
public void send(URI endpoint, byte[] buf) throws IOException {
DatagramSocket socket = new DatagramSocket();
socket.setSoTimeout(soTimeout);
InetAddress dest = InetAddress.getByName(endpoint.getHost());
DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
socket.send(payload);
// todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
socket.close();
}
private void debug(byte[] jsonBytes) {
if (LOGGER.isDebugEnabled()){
String receivedMessage = new String(jsonBytes);
LOGGER.debug("Received message ( bytes): " + receivedMessage);
}
}
}

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.utils;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
public class ReflectionUtils {
/**
* Create an instance of a thing. This method essentially makes code more readable by handing the various exception
* trapping.
* @param className
* @param constructorTypes
* @param constructorArgs
* @param <T>
* @return constructed instance of a thing.
*/
@SuppressWarnings("unchecked")
public static <T> T constructWithReflection(String className, Class<?>[] constructorTypes, Object[] constructorArgs) {
try {
Constructor<?> c = Class.forName(className).getConstructor(constructorTypes);
c.setAccessible(true);
return (T) c.newInstance(constructorArgs);
} catch (InvocationTargetException e) {
// catch ITE and throw the target if it is a RTE.
if (e.getTargetException() != null && RuntimeException.class.isAssignableFrom(e.getTargetException().getClass())) {
throw (RuntimeException) e.getTargetException();
} else {
throw new RuntimeException(e);
}
} catch (ReflectiveOperationException others) {
// Note: No class in the above list should be a descendent of RuntimeException. Otherwise, we're just wrapping
// and making stack traces confusing.
throw new RuntimeException(others);
}
}
}

View File

@ -18,15 +18,14 @@
package org.apache.gossip.crdt;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.protocol.JacksonProtocolManager;
import org.junit.Assert;
import org.junit.Test;
@ -88,16 +87,11 @@ public class OrSetTest {
@Test
public void serialTest() throws InterruptedException, URISyntaxException, IOException {
GossipManager gossipService2 = GossipManagerBuilder.newBuilder()
.cluster("a")
.uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))
.id("1")
.gossipSettings(new GossipSettings())
.build();
ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(new GossipSettings());
OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
String s = gossipService2.getObjectMapper().writeValueAsString(i);
String s = objectMapper.writeValueAsString(i);
@SuppressWarnings("unchecked")
OrSet<Integer> back = gossipService2.getObjectMapper().readValue(s, OrSet.class);
OrSet<Integer> back = objectMapper.readValue(s, OrSet.class);
Assert.assertEquals(back, i);
}

View File

@ -25,6 +25,7 @@ import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.handlers.ResponseHandler;
import org.apache.gossip.manager.handlers.TypedMessageHandler;
import org.junit.Assert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
@ -43,6 +44,17 @@ import static org.junit.jupiter.api.Assertions.expectThrows;
@RunWith(JUnitPlatform.class)
public class GossipManagerBuilderTest {
private GossipManagerBuilder.ManagerBuilder builder;
@BeforeEach
public void setup() throws Exception {
builder = GossipManagerBuilder.newBuilder()
.id("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
.gossipSettings(new GossipSettings());
}
@Test
public void idShouldNotBeNull() {
expectThrows(IllegalArgumentException.class,() -> {
@ -66,35 +78,20 @@ public class GossipManagerBuilderTest {
@Test
public void createMembersListIfNull() throws URISyntaxException {
GossipManager gossipManager = GossipManagerBuilder.newBuilder()
.id("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
.gossipSettings(new GossipSettings())
.gossipMembers(null).registry(new MetricRegistry()).build();
GossipManager gossipManager = builder.gossipMembers(null).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getLiveMembers());
}
@Test
public void createDefaultMessageHandlerIfNull() throws URISyntaxException {
GossipManager gossipManager = GossipManagerBuilder.newBuilder()
.id("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
.gossipSettings(new GossipSettings())
.messageHandler(null).registry(new MetricRegistry()).build();
GossipManager gossipManager = builder.messageHandler(null).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageHandler());
}
@Test
public void testMessageHandlerKeeping() throws URISyntaxException {
MessageHandler mi = new TypedMessageHandler(Response.class, new ResponseHandler());
GossipManager gossipManager = GossipManagerBuilder.newBuilder()
.id("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
.gossipSettings(new GossipSettings())
.messageHandler(mi).registry(new MetricRegistry()).build();
GossipManager gossipManager = builder.messageHandler(mi).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageHandler());
Assert.assertEquals(gossipManager.getMessageHandler(), mi);
}
@ -106,10 +103,7 @@ public class GossipManagerBuilderTest {
System.nanoTime(), new HashMap<String, String>(), 1000, 1, "exponential");
List<Member> memberList = new ArrayList<>();
memberList.add(member);
GossipManager gossipManager = GossipManagerBuilder.newBuilder()
.id("id")
.cluster("aCluster")
.gossipSettings(new GossipSettings())
GossipManager gossipManager = builder
.uri(new URI("udp://localhost:8000"))
.gossipMembers(memberList).registry(new MetricRegistry()).build();
assertEquals(1, gossipManager.getDeadMembers().size());

View File

@ -49,7 +49,7 @@ public class RingPersistenceTest {
new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"),
new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2"))).build();
gossipService.getRingState().writeToDisk();
return gossipService.getRingState().computeTarget();
return GossipManager.buildRingStatePath(gossipService);
}
private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException {

View File

@ -68,8 +68,8 @@ public class UserDataPersistenceTest {
gossipService.init();
Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeGossipData(nodeId, "a").getPayload()).getColor());
Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedGossipData("a").getPayload()).getColor());
File f = gossipService.getUserDataState().computeSharedTarget();
File g = gossipService.getUserDataState().computePerNodeTarget();
File f = GossipManager.buildSharedDataPath(gossipService);
File g = GossipManager.buildPerNodeDataPath(gossipService);
gossipService.shutdown();
f.delete();
g.delete();

View File

@ -64,7 +64,7 @@ public class MessageHandlerTest {
return true;
}
}
@Test
public void testSimpleHandler() {
MessageHandler mi = new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler());