From 9487b8c2238e86da3f4a2373979533ee72afa684 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Sat, 14 Jan 2017 22:18:33 -0500 Subject: [PATCH] GOSSIP-17 Add metrics (Chandresh Pancholi via egc) --- pom.xml | 5 +++ .../java/org/apache/gossip/GossipRunner.java | 2 -- .../java/org/apache/gossip/GossipService.java | 13 ++++--- .../apache/gossip/examples/GossipExample.java | 3 +- .../gossip/examples/StandAloneNode.java | 3 +- .../gossip/manager/ActiveGossipThread.java | 35 +++++++++++++++---- .../apache/gossip/manager/GossipManager.java | 8 +++-- .../manager/random/RandomGossipManager.java | 16 +++++++-- src/test/java/org/apache/gossip/DataTest.java | 9 ++--- .../apache/gossip/ShutdownDeadtimeTest.java | 5 +-- .../apache/gossip/StartupSettingsTest.java | 4 +-- .../apache/gossip/TenNodeThreeSeedTest.java | 3 +- .../apache/gossip/manager/DataReaperTest.java | 8 +++-- .../RandomGossipManagerBuilderTest.java | 5 +-- 14 files changed, 82 insertions(+), 37 deletions(-) diff --git a/pom.xml b/pom.xml index 71db8ed..a104451 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ 2.8.5 + 3.1.2 1.2 5.0.0-M2 1.0.0-M2 @@ -90,6 +91,10 @@ jackson-databind ${jackson.version} + + io.dropwizard.metrics + metrics-core + ${metrics.version} org.junit.jupiter junit-jupiter-api diff --git a/src/main/java/org/apache/gossip/GossipRunner.java b/src/main/java/org/apache/gossip/GossipRunner.java index 1ae609c..c8a1f13 100644 --- a/src/main/java/org/apache/gossip/GossipRunner.java +++ b/src/main/java/org/apache/gossip/GossipRunner.java @@ -22,8 +22,6 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URISyntaxException; - - public class GossipRunner { public static void main(String[] args) throws URISyntaxException { diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index 80c01ca..fca9f28 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -17,10 +17,12 @@ */ package org.apache.gossip; +import com.codahale.metrics.MetricRegistry; import java.net.URI; import java.net.UnknownHostException; import java.util.List; +import com.codahale.metrics.JmxReporter; import org.apache.gossip.event.GossipListener; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.random.RandomGossipManager; @@ -35,7 +37,8 @@ import org.apache.log4j.Logger; public class GossipService { public static final Logger LOGGER = Logger.getLogger(GossipService.class); - + private final JmxReporter jmxReporter; + private final GossipManager gossipManager; /** @@ -48,7 +51,7 @@ public class GossipService { UnknownHostException { this(startupSettings.getCluster(), startupSettings.getUri() , startupSettings.getId(), startupSettings.getGossipMembers(), - startupSettings.getGossipSettings(), null); + startupSettings.getGossipSettings(), null, new MetricRegistry()); } /** @@ -58,8 +61,10 @@ public class GossipService { * @throws UnknownHostException */ public GossipService(String cluster, URI uri, String id, - List gossipMembers, GossipSettings settings, GossipListener listener) + List gossipMembers, GossipSettings settings, GossipListener listener, MetricRegistry registry) throws InterruptedException, UnknownHostException { + jmxReporter = JmxReporter.forRegistry(registry).build(); + jmxReporter.start(); gossipManager = RandomGossipManager.newBuilder() .withId(id) .cluster(cluster) @@ -67,11 +72,11 @@ public class GossipService { .settings(settings) .gossipMembers(gossipMembers) .listener(listener) + .registry(registry) .build(); } public void start() { - LOGGER.debug("Starting: " + getGossipManager().getMyself().getUri()); gossipManager.init(); } diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java index cea59f4..01cd3e3 100644 --- a/src/main/java/org/apache/gossip/examples/GossipExample.java +++ b/src/main/java/org/apache/gossip/examples/GossipExample.java @@ -17,6 +17,7 @@ */ package org.apache.gossip.examples; +import com.codahale.metrics.MetricRegistry; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; @@ -81,7 +82,7 @@ public class GossipExample extends Thread { // dead list handling. for (GossipMember member : startupMembers) { GossipService gossipService = new GossipService(cluster, member.getUri(), "", - startupMembers, settings, null); + startupMembers, settings, null, new MetricRegistry()); clients.add(gossipService); gossipService.start(); sleep(settings.getCleanupInterval() + 1000); diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java index b6784cc..c12f946 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java @@ -17,6 +17,7 @@ */ package org.apache.gossip.examples; +import com.codahale.metrics.MetricRegistry; import java.net.URI; import java.net.UnknownHostException; import java.util.Arrays; @@ -32,7 +33,7 @@ public class StandAloneNode { s.setConvictThreshold(1.0); s.setGossipInterval(10); GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], - Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {} ); + Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry()); gossipService.start(); while (true){ System.out.println( "Live: " + gossipService.getGossipManager().getLiveMembers()); diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index e6248dc..c09cfe9 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -32,6 +32,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.model.ActiveGossipOk; import org.apache.gossip.model.GossipDataMessage; @@ -45,11 +47,10 @@ import org.apache.log4j.Logger; import com.fasterxml.jackson.databind.ObjectMapper; +import static com.codahale.metrics.MetricRegistry.name; /** - * [The active thread: periodically send gossip request.] The class handles gossiping the membership - * list. This information is important to maintaining a common state among all the nodes, and is - * important for detecting failures. + * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner */ public class ActiveGossipThread { @@ -63,15 +64,23 @@ public class ActiveGossipThread { private ThreadPoolExecutor threadService; private ObjectMapper MAPPER = new ObjectMapper(); - public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { + private final Histogram sharedDataHistogram; + private final Histogram sendPerNodeDataHistogram; + private final Histogram sendMembershipHistorgram; + + public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { this.gossipManager = gossipManager; random = new Random(); this.gossipCore = gossipCore; - this.scheduledExecutorService = Executors.newScheduledThreadPool(2); + scheduledExecutorService = Executors.newScheduledThreadPool(2); workQueue = new ArrayBlockingQueue(1024); threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); + sharedDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sharedDataHistogram-time")); + sendPerNodeDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sendPerNodeDataHistogram-time")); + sendMembershipHistorgram = registry.histogram(name(ActiveGossipThread.class, "sendMembershipHistorgram-time")); } - + + public void init() { scheduledExecutorService.scheduleAtFixedRate( () -> { @@ -99,9 +108,12 @@ public class ActiveGossipThread { } public void sendSharedData(LocalGossipMember me, List memberList){ + long startTime = System.currentTimeMillis(); + LocalGossipMember member = selectPartner(memberList); if (member == null) { LOGGER.debug("Send sendMembershipList() is called without action"); + sharedDataHistogram.update(System.currentTimeMillis() - startTime); return; } try (DatagramSocket socket = new DatagramSocket()) { @@ -128,12 +140,16 @@ public class ActiveGossipThread { } catch (IOException e1) { LOGGER.warn(e1); } + sharedDataHistogram.update(System.currentTimeMillis() - startTime); } public void sendPerNodeData(LocalGossipMember me, List memberList){ + long startTime = System.currentTimeMillis(); + LocalGossipMember member = selectPartner(memberList); if (member == null) { LOGGER.debug("Send sendMembershipList() is called without action"); + sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); return; } try (DatagramSocket socket = new DatagramSocket()) { @@ -162,6 +178,7 @@ public class ActiveGossipThread { } catch (IOException e1) { LOGGER.warn(e1); } + sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); } protected void sendToALiveMember(){ @@ -176,10 +193,13 @@ public class ActiveGossipThread { /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) { + protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) { + long startTime = System.currentTimeMillis(); + me.setHeartbeat(System.nanoTime()); if (member == null) { LOGGER.debug("Send sendMembershipList() is called without action"); + sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); return; } else { LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); @@ -209,6 +229,7 @@ public class ActiveGossipThread { } catch (IOException e1) { LOGGER.warn(e1); } + sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); } /** diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 2b081d0..a5d57f5 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -17,6 +17,7 @@ */ package org.apache.gossip.manager; +import com.codahale.metrics.MetricRegistry; import java.net.URI; import java.util.ArrayList; import java.util.Collections; @@ -74,9 +75,11 @@ public abstract class GossipManager { private final ScheduledExecutorService scheduledServiced; + private MetricRegistry registry; + public GossipManager(String cluster, URI uri, String id, GossipSettings settings, - List gossipMembers, GossipListener listener) { + List gossipMembers, GossipListener listener, MetricRegistry registry) { this.settings = settings; gossipCore = new GossipCore(this); @@ -98,6 +101,7 @@ public abstract class GossipManager { gossipServiceRunning = new AtomicBoolean(true); this.listener = listener; this.scheduledServiced = Executors.newScheduledThreadPool(1); + this.registry = registry; } public ConcurrentSkipListMap getMembers() { @@ -148,7 +152,7 @@ public abstract class GossipManager { public void init() { passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); gossipThreadExecutor.execute(passiveGossipThread); - activeGossipThread = new ActiveGossipThread(this, this.gossipCore); + activeGossipThread = new ActiveGossipThread(this, this.gossipCore, registry); activeGossipThread.init(); dataReaper.init(); scheduledServiced.scheduleAtFixedRate(() -> { diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java index 1444181..fd936f1 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java @@ -17,6 +17,7 @@ */ package org.apache.gossip.manager.random; +import com.codahale.metrics.MetricRegistry; import org.apache.gossip.GossipMember; import org.apache.gossip.GossipSettings; import org.apache.gossip.event.GossipListener; @@ -40,6 +41,7 @@ public class RandomGossipManager extends GossipManager { private GossipSettings settings; private List gossipMembers; private GossipListener listener; + private MetricRegistry registry; private ManagerBuilder() {} @@ -73,6 +75,10 @@ public class RandomGossipManager extends GossipManager { this.listener = listener; return this; } + public ManagerBuilder registry(MetricRegistry registry) { + this.registry = registry; + return this; + } public ManagerBuilder uri(URI uri){ this.uri = uri; @@ -84,15 +90,19 @@ public class RandomGossipManager extends GossipManager { checkArgument(cluster != null, "You must specify a cluster name"); checkArgument(settings != null, "You must specify gossip settings"); checkArgument(uri != null, "You must specify a uri"); + checkArgument(registry != null, "You must specify a MetricRegistry"); + if (listener == null){ + listener((a,b) -> {}); + } if (gossipMembers == null) { gossipMembers = new ArrayList<>(); } - return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener); + return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener, registry); } } private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings, - List gossipMembers, GossipListener listener) { - super(cluster, uri, id, settings, gossipMembers, listener); + List gossipMembers, GossipListener listener, MetricRegistry registry) { + super(cluster, uri, id, settings, gossipMembers, listener, registry); } } diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index d3babf6..766d72b 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -17,6 +17,7 @@ */ package org.apache.gossip; +import com.codahale.metrics.MetricRegistry; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; @@ -26,8 +27,6 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import org.apache.gossip.event.GossipListener; -import org.apache.gossip.event.GossipState; import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.SharedGossipDataMessage; import org.junit.Test; @@ -52,11 +51,7 @@ public class DataTest { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, settings, - new GossipListener(){ - public void gossipEvent(GossipMember member, GossipState state) { - - } - }); + (a,b) -> {}, new MetricRegistry()); clients.add(gossipService); gossipService.start(); } diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 1cdb9ac..7b66fbc 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -17,6 +17,7 @@ */ package org.apache.gossip; +import com.codahale.metrics.MetricRegistry; import io.teknek.tunit.TUnit; import java.net.URI; @@ -58,7 +59,7 @@ public class ShutdownDeadtimeTest { for (int i = 1; i < clusterMembers + 1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, - settings, (a,b) -> {}); + settings, (a,b) -> {}, new MetricRegistry()); clients.add(gossipService); gossipService.start(); } @@ -104,7 +105,7 @@ public class ShutdownDeadtimeTest { URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); // start client again GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers, - settings, (a,b) -> {}); + settings, (a,b) -> {}, new MetricRegistry()); clients.add(gossipService); gossipService.start(); diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java index a798594..61bd4f4 100644 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -17,6 +17,7 @@ */ package org.apache.gossip; +import com.codahale.metrics.MetricRegistry; import org.apache.log4j.Logger; import org.junit.jupiter.api.Test; @@ -40,7 +41,6 @@ public class StartupSettingsTest { private static final Logger log = Logger.getLogger( StartupSettingsTest.class ); private static final String CLUSTER = UUID.randomUUID().toString(); - @Test public void testUsingSettingsFile() throws IOException, InterruptedException, URISyntaxException { File settingsFile = File.createTempFile("gossipTest",".json"); @@ -49,7 +49,7 @@ public class StartupSettingsTest { URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000); final GossipService firstService = new GossipService( CLUSTER, uri, "1", - new ArrayList(), new GossipSettings(), null); + new ArrayList(), new GossipSettings(), null, new MetricRegistry()); firstService.start(); final GossipService serviceUnderTest = new GossipService( StartupSettings.fromJSONFile(settingsFile)); diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index aa9e2e8..af7a117 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -17,6 +17,7 @@ */ package org.apache.gossip; +import com.codahale.metrics.MetricRegistry; import io.teknek.tunit.TUnit; import java.net.URI; @@ -58,7 +59,7 @@ public class TenNodeThreeSeedTest { for (int i = 1; i < clusterMembers+1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); GossipService gossipService = new GossipService(cluster, uri, i + "", - startupMembers, settings, (a,b) -> {}); + startupMembers, settings, (a,b) -> {}, new MetricRegistry()); clients.add(gossipService); gossipService.start(); } diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java index 1b5c35e..5388bb3 100644 --- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java +++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -17,6 +17,7 @@ */ package org.apache.gossip.manager; +import com.codahale.metrics.MetricRegistry; import java.net.URI; import org.apache.gossip.GossipSettings; @@ -30,6 +31,8 @@ import io.teknek.tunit.TUnit; public class DataReaperTest { + private final MetricRegistry registry = new MetricRegistry(); + @Test public void testReaperOneShot() { String myId = "4"; @@ -37,7 +40,7 @@ public class DataReaperTest { String value = "a"; GossipSettings settings = new GossipSettings(); GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) - .withId(myId).uri(URI.create("udp://localhost:6000")).build(); + .withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build(); gm.init(); gm.gossipPerNodeData(perNodeDatum(key, value)); gm.gossipSharedData(sharedDatum(key, value)); @@ -68,7 +71,6 @@ public class DataReaperTest { return m; } - @Test public void testHigherTimestampWins() { String myId = "4"; @@ -76,7 +78,7 @@ public class DataReaperTest { String value = "a"; GossipSettings settings = new GossipSettings(); GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) - .withId(myId).uri(URI.create("udp://localhost:7000")).build(); + .withId(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build(); gm.init(); GossipDataMessage before = perNodeDatum(key, value); GossipDataMessage after = perNodeDatum(key, "b"); diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java index d9635af..0c5aa88 100644 --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java @@ -17,6 +17,7 @@ */ package org.apache.gossip.manager; +import com.codahale.metrics.MetricRegistry; import org.apache.gossip.GossipMember; import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalGossipMember; @@ -82,7 +83,7 @@ public class RandomGossipManagerBuilderTest { .cluster("aCluster") .uri(new URI("udp://localhost:2000")) .settings(new GossipSettings()) - .gossipMembers(null).build(); + .gossipMembers(null).registry(new MetricRegistry()).build(); assertNotNull(gossipManager.getLiveMembers()); } @@ -98,7 +99,7 @@ public class RandomGossipManagerBuilderTest { .cluster("aCluster") .settings(new GossipSettings()) .uri(new URI("udp://localhost:8000")) - .gossipMembers(memberList).build(); + .gossipMembers(memberList).registry(new MetricRegistry()).build(); assertEquals(1, gossipManager.getDeadMembers().size()); assertEquals(member.getId(), gossipManager.getDeadMembers().get(0).getId()); }