diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java index c12f946..d24c0fa 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java @@ -31,7 +31,7 @@ public class StandAloneNode { GossipSettings s = new GossipSettings(); s.setWindowSize(10); s.setConvictThreshold(1.0); - s.setGossipInterval(10); + s.setGossipInterval(1000); GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry()); gossipService.start(); diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 731b019..f81565b 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -17,7 +17,6 @@ */ package org.apache.gossip.manager; -import java.io.IOException; import java.util.List; import java.util.Map.Entry; @@ -44,8 +43,6 @@ import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpSharedGossipDataMessage; import org.apache.log4j.Logger; -import com.fasterxml.jackson.databind.ObjectMapper; - import static com.codahale.metrics.MetricRegistry.name; /** @@ -61,7 +58,6 @@ public class ActiveGossipThread { private ScheduledExecutorService scheduledExecutorService; private final BlockingQueue workQueue; private ThreadPoolExecutor threadService; - private ObjectMapper MAPPER = new ObjectMapper(); private final Histogram sharedDataHistogram; private final Histogram sendPerNodeDataHistogram; @@ -114,28 +110,17 @@ public class ActiveGossipThread { LOGGER.debug("Send sendMembershipList() is called without action"); sharedDataHistogram.update(System.currentTimeMillis() - startTime); return; - } - try { - for (Entry innerEntry : this.gossipCore.getSharedData().entrySet()){ - UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage(); - message.setUuid(UUID.randomUUID().toString()); - message.setUriFrom(me.getId()); - message.setExpireAt(innerEntry.getValue().getExpireAt()); - message.setKey(innerEntry.getValue().getKey()); - message.setNodeId(innerEntry.getValue().getNodeId()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); - message.setPayload(innerEntry.getValue().getPayload()); - byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - gossipCore.sendOneWay(message, member.getUri()); - } else { - LOGGER.error("The length of the to be send message is too large (" - + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } - } catch (IOException e1) { - LOGGER.warn(e1); + } + for (Entry innerEntry : gossipCore.getSharedData().entrySet()){ + UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage(); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + gossipCore.sendOneWay(message, member.getUri()); } sharedDataHistogram.update(System.currentTimeMillis() - startTime); } @@ -148,36 +133,26 @@ public class ActiveGossipThread { LOGGER.debug("Send sendMembershipList() is called without action"); sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); return; - } - try { - for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ - for (Entry innerEntry : entry.getValue().entrySet()){ - UdpGossipDataMessage message = new UdpGossipDataMessage(); - message.setUuid(UUID.randomUUID().toString()); - message.setUriFrom(me.getId()); - message.setExpireAt(innerEntry.getValue().getExpireAt()); - message.setKey(innerEntry.getValue().getKey()); - message.setNodeId(innerEntry.getValue().getNodeId()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); - message.setPayload(innerEntry.getValue().getPayload()); - byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - gossipCore.sendOneWay(message, member.getUri()); - } else { - LOGGER.error("The length of the to be send message is too large (" - + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } + } + for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ + for (Entry innerEntry : entry.getValue().entrySet()){ + UdpGossipDataMessage message = new UdpGossipDataMessage(); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + gossipCore.sendOneWay(message, member.getUri()); } - } catch (IOException e1) { - LOGGER.warn(e1); } sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); } protected void sendToALiveMember(){ LocalGossipMember member = selectPartner(gossipManager.getLiveMembers()); + System.out.println("send" ); sendMembershipList(gossipManager.getMyself(), member); } @@ -199,29 +174,18 @@ public class ActiveGossipThread { } else { LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); } - try { - UdpActiveGossipMessage message = new UdpActiveGossipMessage(); - message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); - message.setUuid(UUID.randomUUID().toString()); - message.getMembers().add(convert(me)); - for (LocalGossipMember other : gossipManager.getMembers().keySet()) { - message.getMembers().add(convert(other)); - } - byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - Response r = gossipCore.send(message, member.getUri()); - if (r instanceof ActiveGossipOk){ - //maybe count metrics here - } else { - LOGGER.debug("Message " + message + " generated response " + r); - } - } else { - LOGGER.error("The length of the to be send message is too large (" - + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } catch (IOException e1) { - LOGGER.warn(e1); + UdpActiveGossipMessage message = new UdpActiveGossipMessage(); + message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); + message.setUuid(UUID.randomUUID().toString()); + message.getMembers().add(convert(me)); + for (LocalGossipMember other : gossipManager.getMembers().keySet()) { + message.getMembers().add(convert(other)); + } + Response r = gossipCore.send(message, member.getUri()); + if (r instanceof ActiveGossipOk){ + //maybe count metrics here + } else { + LOGGER.debug("Message " + message + " generated response " + r); } sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); } diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index d315361..5d561c3 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -53,27 +52,42 @@ import org.apache.gossip.udp.UdpNotAMemberFault; import org.apache.gossip.udp.UdpSharedGossipDataMessage; import org.apache.log4j.Logger; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; -public class GossipCore { +public class GossipCore implements GossipCoreConstants { public static final Logger LOGGER = Logger.getLogger(GossipCore.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private final GossipManager gossipManager; private ConcurrentHashMap requests; - private ExecutorService service; + private ThreadPoolExecutor service; private final ConcurrentHashMap> perNodeData; private final ConcurrentHashMap sharedData; private final BlockingQueue workQueue; + private final Meter messageSerdeException; + private final Meter tranmissionException; + private final Meter tranmissionSuccess; - public GossipCore(GossipManager manager){ + public GossipCore(GossipManager manager, MetricRegistry metrics){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); workQueue = new ArrayBlockingQueue<>(1024); service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); perNodeData = new ConcurrentHashMap<>(); sharedData = new ConcurrentHashMap<>(); + metrics.register(WORKQUEUE_SIZE, (Gauge)() -> workQueue.size()); + metrics.register(PER_NODE_DATA_SIZE, (Gauge)() -> perNodeData.size()); + metrics.register(SHARED_DATA_SIZE, (Gauge)() -> sharedData.size()); + metrics.register(REQUEST_SIZE, (Gauge)() -> requests.size()); + metrics.register(THREADPOOL_ACTIVE, (Gauge)() -> service.getActiveCount()); + metrics.register(THREADPOOL_SIZE, (Gauge)() -> service.getPoolSize()); + messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); + tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); + tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); } public void addSharedData(SharedGossipDataMessage message){ @@ -175,29 +189,29 @@ public class GossipCore { } /** - * Sends a blocking message. Throws exception when tranmission fails + * Sends a blocking message. * @param message * @param uri + * @throws RuntimeException if data can not be serialized or in transmission error */ private void sendInternal(Base message, URI uri){ byte[] json_bytes; try { json_bytes = MAPPER.writeValueAsString(message).getBytes(); } catch (IOException e) { + messageSerdeException.mark(); throw new RuntimeException(e); } - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes); - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); - InetAddress dest = InetAddress.getByName(uri.getHost()); - DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort()); - socket.send(datagramPacket); - } catch (IOException e) { - throw new RuntimeException(e); - } - } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); + InetAddress dest = InetAddress.getByName(uri.getHost()); + DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, uri.getPort()); + socket.send(datagramPacket); + tranmissionSuccess.mark(); + } catch (IOException e) { + tranmissionException.mark(); + throw new RuntimeException(e); + } } public Response send(Base message, URI uri){ @@ -225,7 +239,7 @@ public class GossipCore { return (Response) b; } try { - Thread.sleep(0, 1000); + Thread.sleep(0, 555555); } catch (InterruptedException e) { } @@ -261,19 +275,20 @@ public class GossipCore { public void sendOneWay(Base message, URI u){ byte[] json_bytes; try { - json_bytes = MAPPER.writeValueAsString(message).getBytes(); + json_bytes = MAPPER.writeValueAsBytes(message); } catch (IOException e) { + messageSerdeException.mark(); throw new RuntimeException(e); } - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes); - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); - InetAddress dest = InetAddress.getByName(u.getHost()); - DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort()); - socket.send(datagramPacket); - } catch (IOException ex) { } + try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); + InetAddress dest = InetAddress.getByName(u.getHost()); + DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort()); + socket.send(datagramPacket); + tranmissionSuccess.mark(); + } catch (IOException ex) { + tranmissionException.mark(); + LOGGER.debug("Send one way failed", ex); } } diff --git a/src/main/java/org/apache/gossip/manager/UdpUtil.java b/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java similarity index 50% rename from src/main/java/org/apache/gossip/manager/UdpUtil.java rename to src/main/java/org/apache/gossip/manager/GossipCoreConstants.java index c61769f..6d3765a 100644 --- a/src/main/java/org/apache/gossip/manager/UdpUtil.java +++ b/src/main/java/org/apache/gossip/manager/GossipCoreConstants.java @@ -17,29 +17,14 @@ */ package org.apache.gossip.manager; -import java.nio.ByteBuffer; - -public class UdpUtil { - - public static int readPacketLengthFromBuffer(byte [] buffer){ - int packetLength = 0; - for (int i = 0; i < 4; i++) { - int shift = (4 - 1 - i) * 8; - packetLength += (buffer[i] & 0x000000FF) << shift; - } - return packetLength; - } - - public static byte[] createBuffer(int packetLength, byte[] jsonBytes) { - byte[] lengthBytes = new byte[4]; - lengthBytes[0] = (byte) (packetLength >> 24); - lengthBytes[1] = (byte) ((packetLength << 8) >> 24); - lengthBytes[2] = (byte) ((packetLength << 16) >> 24); - lengthBytes[3] = (byte) ((packetLength << 24) >> 24); - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length); - byteBuffer.put(lengthBytes); - byteBuffer.put(jsonBytes); - byte[] buf = byteBuffer.array(); - return buf; - } +public interface GossipCoreConstants { + String WORKQUEUE_SIZE = "gossip.core.workqueue.size"; + String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size"; + String SHARED_DATA_SIZE = "gossip.core.shareddata.size"; + String REQUEST_SIZE = "gossip.core.requests.size"; + String THREADPOOL_ACTIVE = "gossip.core.threadpool.active"; + String THREADPOOL_SIZE = "gossip.core.threadpool.size"; + String MESSAGE_SERDE_EXCEPTION = "gossip.core.message_serde_exception"; + String MESSAGE_TRANSMISSION_EXCEPTION = "gossip.core.message_transmission_exception"; + String MESSAGE_TRANSMISSION_SUCCESS = "gossip.core.message_transmission_success"; } diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index fb7ec93..cf67c9c 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -49,8 +49,6 @@ public abstract class GossipManager { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); - public static final int MAX_PACKET_SIZE = 102400; - private final ConcurrentSkipListMap members; private final LocalGossipMember me; @@ -82,7 +80,7 @@ public abstract class GossipManager { List gossipMembers, GossipListener listener, MetricRegistry registry) { this.settings = settings; - gossipCore = new GossipCore(this); + gossipCore = new GossipCore(this, registry); clock = new SystemClock(); dataReaper = new DataReaper(gossipCore, clock); me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), @@ -256,4 +254,5 @@ public abstract class GossipManager { return dataReaper; } + } diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index b54a963..ebda513 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -78,23 +78,13 @@ abstract public class PassiveGossipThread implements Runnable { byte[] buf = new byte[server.getReceiveBufferSize()]; DatagramPacket p = new DatagramPacket(buf, buf.length); server.receive(p); - int packet_length = UdpUtil.readPacketLengthFromBuffer(buf); - if (packet_length <= GossipManager.MAX_PACKET_SIZE) { - byte[] json_bytes = new byte[packet_length]; - for (int i = 0; i < packet_length; i++) { - json_bytes[i] = buf[i + 4]; - } - debug(packet_length, json_bytes); - try { - Base activeGossipMessage = MAPPER.readValue(json_bytes, Base.class); - gossipCore.receive(activeGossipMessage); - } catch (RuntimeException ex) {//TODO trap json exception - LOGGER.error("Unable to process message", ex); - } - } else { - LOGGER.error("The received message is not of the expected size, it has been dropped."); + debug(p.getData()); + try { + Base activeGossipMessage = MAPPER.readValue(p.getData(), Base.class); + gossipCore.receive(activeGossipMessage); + } catch (RuntimeException ex) {//TODO trap json exception + LOGGER.error("Unable to process message", ex); } - } catch (IOException e) { LOGGER.error(e); keepRunning.set(false); @@ -103,11 +93,10 @@ abstract public class PassiveGossipThread implements Runnable { shutdown(); } - private void debug(int packetLength, byte[] jsonBytes) { + private void debug(byte[] jsonBytes) { if (LOGGER.isDebugEnabled()){ String receivedMessage = new String(jsonBytes); - LOGGER.debug("Received message (" + packetLength + " bytes): " - + receivedMessage); + LOGGER.debug("Received message ( bytes): " + receivedMessage); } } diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java index 5388bb3..b4ac45d 100644 --- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java +++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -32,27 +32,37 @@ import io.teknek.tunit.TUnit; public class DataReaperTest { private final MetricRegistry registry = new MetricRegistry(); - + String myId = "4"; + String key = "key"; + String value = "a"; + @Test public void testReaperOneShot() { - String myId = "4"; - String key = "key"; - String value = "a"; GossipSettings settings = new GossipSettings(); GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) .withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build(); gm.init(); gm.gossipPerNodeData(perNodeDatum(key, value)); gm.gossipSharedData(sharedDatum(key, value)); - Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); - Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload()); + assertDataIsAtCorrectValue(gm); gm.getDataReaper().runPerNodeOnce(); gm.getDataReaper().runSharedOnce(); - TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null); - TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null); + assertDataIsRemoved(gm); gm.shutdown(); } + private void assertDataIsAtCorrectValue(GossipManager gm){ + Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); + Assert.assertEquals(1, registry.getGauges().get(GossipCoreConstants.PER_NODE_DATA_SIZE).getValue()); + Assert.assertEquals(value, gm.findSharedGossipData(key).getPayload()); + Assert.assertEquals(1, registry.getGauges().get(GossipCoreConstants.SHARED_DATA_SIZE).getValue()); + } + + private void assertDataIsRemoved(GossipManager gm){ + TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null); + TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null); + } + private GossipDataMessage perNodeDatum(String key, String value) { GossipDataMessage m = new GossipDataMessage(); m.setExpireAt(System.currentTimeMillis() + 5L);