GOSSIP-44 Remove 4 byte header

This commit is contained in:
Edward Capriolo
2017-01-20 11:40:52 -05:00
parent 428c0573fb
commit b2af449074
7 changed files with 118 additions and 156 deletions

View File

@ -31,7 +31,7 @@ public class StandAloneNode {
GossipSettings s = new GossipSettings();
s.setWindowSize(10);
s.setConvictThreshold(1.0);
s.setGossipInterval(10);
s.setGossipInterval(1000);
GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1],
Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
gossipService.start();

View File

@ -17,7 +17,6 @@
*/
package org.apache.gossip.manager;
import java.io.IOException;
import java.util.List;
import java.util.Map.Entry;
@ -44,8 +43,6 @@ import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
import static com.codahale.metrics.MetricRegistry.name;
/**
@ -61,7 +58,6 @@ public class ActiveGossipThread {
private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService;
private ObjectMapper MAPPER = new ObjectMapper();
private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram;
@ -114,28 +110,17 @@ public class ActiveGossipThread {
LOGGER.debug("Send sendMembershipList() is called without action");
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
return;
}
try {
for (Entry<String, SharedGossipDataMessage> innerEntry : this.gossipCore.getSharedData().entrySet()){
UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
gossipCore.sendOneWay(message, member.getUri());
} else {
LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
}
} catch (IOException e1) {
LOGGER.warn(e1);
}
for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
gossipCore.sendOneWay(message, member.getUri());
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
@ -148,36 +133,26 @@ public class ActiveGossipThread {
LOGGER.debug("Send sendMembershipList() is called without action");
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
return;
}
try {
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
UdpGossipDataMessage message = new UdpGossipDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
gossipCore.sendOneWay(message, member.getUri());
} else {
LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
}
}
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
UdpGossipDataMessage message = new UdpGossipDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
gossipCore.sendOneWay(message, member.getUri());
}
} catch (IOException e1) {
LOGGER.warn(e1);
}
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
}
protected void sendToALiveMember(){
LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
System.out.println("send" );
sendMembershipList(gossipManager.getMyself(), member);
}
@ -199,29 +174,18 @@ public class ActiveGossipThread {
} else {
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
try {
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
Response r = gossipCore.send(message, member.getUri());
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
LOGGER.debug("Message " + message + " generated response " + r);
}
} else {
LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
} catch (IOException e1) {
LOGGER.warn(e1);
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
Response r = gossipCore.send(message, member.getUri());
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
LOGGER.debug("Message " + message + " generated response " + r);
}
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
}

View File

@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -53,27 +52,42 @@ import org.apache.gossip.udp.UdpNotAMemberFault;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
public class GossipCore {
public class GossipCore implements GossipCoreConstants {
public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private final GossipManager gossipManager;
private ConcurrentHashMap<String, Base> requests;
private ExecutorService service;
private ThreadPoolExecutor service;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
private final Meter messageSerdeException;
private final Meter tranmissionException;
private final Meter tranmissionSuccess;
public GossipCore(GossipManager manager){
public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
workQueue = new ArrayBlockingQueue<>(1024);
service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size());
metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() -> service.getActiveCount());
metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() -> service.getPoolSize());
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
}
public void addSharedData(SharedGossipDataMessage message){
@ -175,29 +189,29 @@ public class GossipCore {
}
/**
* Sends a blocking message. Throws exception when tranmission fails
* Sends a blocking message.
* @param message
* @param uri
* @throws RuntimeException if data can not be serialized or in transmission error
*/
private void sendInternal(Base message, URI uri){
byte[] json_bytes;
try {
json_bytes = MAPPER.writeValueAsString(message).getBytes();
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);
}
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
InetAddress dest = InetAddress.getByName(uri.getHost());
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort());
socket.send(datagramPacket);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
InetAddress dest = InetAddress.getByName(uri.getHost());
DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, uri.getPort());
socket.send(datagramPacket);
tranmissionSuccess.mark();
} catch (IOException e) {
tranmissionException.mark();
throw new RuntimeException(e);
}
}
public Response send(Base message, URI uri){
@ -225,7 +239,7 @@ public class GossipCore {
return (Response) b;
}
try {
Thread.sleep(0, 1000);
Thread.sleep(0, 555555);
} catch (InterruptedException e) {
}
@ -261,19 +275,20 @@ public class GossipCore {
public void sendOneWay(Base message, URI u){
byte[] json_bytes;
try {
json_bytes = MAPPER.writeValueAsString(message).getBytes();
json_bytes = MAPPER.writeValueAsBytes(message);
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);
}
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
InetAddress dest = InetAddress.getByName(u.getHost());
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort());
socket.send(datagramPacket);
} catch (IOException ex) { }
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
InetAddress dest = InetAddress.getByName(u.getHost());
DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort());
socket.send(datagramPacket);
tranmissionSuccess.mark();
} catch (IOException ex) {
tranmissionException.mark();
LOGGER.debug("Send one way failed", ex);
}
}

View File

@ -17,29 +17,14 @@
*/
package org.apache.gossip.manager;
import java.nio.ByteBuffer;
public class UdpUtil {
public static int readPacketLengthFromBuffer(byte [] buffer){
int packetLength = 0;
for (int i = 0; i < 4; i++) {
int shift = (4 - 1 - i) * 8;
packetLength += (buffer[i] & 0x000000FF) << shift;
}
return packetLength;
}
public static byte[] createBuffer(int packetLength, byte[] jsonBytes) {
byte[] lengthBytes = new byte[4];
lengthBytes[0] = (byte) (packetLength >> 24);
lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
byteBuffer.put(lengthBytes);
byteBuffer.put(jsonBytes);
byte[] buf = byteBuffer.array();
return buf;
}
public interface GossipCoreConstants {
String WORKQUEUE_SIZE = "gossip.core.workqueue.size";
String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size";
String SHARED_DATA_SIZE = "gossip.core.shareddata.size";
String REQUEST_SIZE = "gossip.core.requests.size";
String THREADPOOL_ACTIVE = "gossip.core.threadpool.active";
String THREADPOOL_SIZE = "gossip.core.threadpool.size";
String MESSAGE_SERDE_EXCEPTION = "gossip.core.message_serde_exception";
String MESSAGE_TRANSMISSION_EXCEPTION = "gossip.core.message_transmission_exception";
String MESSAGE_TRANSMISSION_SUCCESS = "gossip.core.message_transmission_success";
}

View File

@ -49,8 +49,6 @@ public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
public static final int MAX_PACKET_SIZE = 102400;
private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
private final LocalGossipMember me;
@ -82,7 +80,7 @@ public abstract class GossipManager {
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
this.settings = settings;
gossipCore = new GossipCore(this);
gossipCore = new GossipCore(this, registry);
clock = new SystemClock();
dataReaper = new DataReaper(gossipCore, clock);
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
@ -256,4 +254,5 @@ public abstract class GossipManager {
return dataReaper;
}
}

View File

@ -78,23 +78,13 @@ abstract public class PassiveGossipThread implements Runnable {
byte[] buf = new byte[server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length);
server.receive(p);
int packet_length = UdpUtil.readPacketLengthFromBuffer(buf);
if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
byte[] json_bytes = new byte[packet_length];
for (int i = 0; i < packet_length; i++) {
json_bytes[i] = buf[i + 4];
}
debug(packet_length, json_bytes);
try {
Base activeGossipMessage = MAPPER.readValue(json_bytes, Base.class);
gossipCore.receive(activeGossipMessage);
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}
} else {
LOGGER.error("The received message is not of the expected size, it has been dropped.");
debug(p.getData());
try {
Base activeGossipMessage = MAPPER.readValue(p.getData(), Base.class);
gossipCore.receive(activeGossipMessage);
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}
} catch (IOException e) {
LOGGER.error(e);
keepRunning.set(false);
@ -103,11 +93,10 @@ abstract public class PassiveGossipThread implements Runnable {
shutdown();
}
private void debug(int packetLength, byte[] jsonBytes) {
private void debug(byte[] jsonBytes) {
if (LOGGER.isDebugEnabled()){
String receivedMessage = new String(jsonBytes);
LOGGER.debug("Received message (" + packetLength + " bytes): "
+ receivedMessage);
LOGGER.debug("Received message ( bytes): " + receivedMessage);
}
}