GOSSIP-17 Add metrics (Chandresh Pancholi via egc)
This commit is contained in:
@ -32,6 +32,8 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import org.apache.gossip.LocalGossipMember;
|
||||
import org.apache.gossip.model.ActiveGossipOk;
|
||||
import org.apache.gossip.model.GossipDataMessage;
|
||||
@ -45,11 +47,10 @@ import org.apache.log4j.Logger;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
import static com.codahale.metrics.MetricRegistry.name;
|
||||
|
||||
/**
|
||||
* [The active thread: periodically send gossip request.] The class handles gossiping the membership
|
||||
* list. This information is important to maintaining a common state among all the nodes, and is
|
||||
* important for detecting failures.
|
||||
* The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
|
||||
*/
|
||||
public class ActiveGossipThread {
|
||||
|
||||
@ -63,15 +64,23 @@ public class ActiveGossipThread {
|
||||
private ThreadPoolExecutor threadService;
|
||||
private ObjectMapper MAPPER = new ObjectMapper();
|
||||
|
||||
public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
|
||||
private final Histogram sharedDataHistogram;
|
||||
private final Histogram sendPerNodeDataHistogram;
|
||||
private final Histogram sendMembershipHistorgram;
|
||||
|
||||
public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
|
||||
this.gossipManager = gossipManager;
|
||||
random = new Random();
|
||||
this.gossipCore = gossipCore;
|
||||
this.scheduledExecutorService = Executors.newScheduledThreadPool(2);
|
||||
scheduledExecutorService = Executors.newScheduledThreadPool(2);
|
||||
workQueue = new ArrayBlockingQueue<Runnable>(1024);
|
||||
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||
sharedDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sharedDataHistogram-time"));
|
||||
sendPerNodeDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sendPerNodeDataHistogram-time"));
|
||||
sendMembershipHistorgram = registry.histogram(name(ActiveGossipThread.class, "sendMembershipHistorgram-time"));
|
||||
}
|
||||
|
||||
|
||||
|
||||
public void init() {
|
||||
scheduledExecutorService.scheduleAtFixedRate(
|
||||
() -> {
|
||||
@ -99,9 +108,12 @@ public class ActiveGossipThread {
|
||||
}
|
||||
|
||||
public void sendSharedData(LocalGossipMember me, List<LocalGossipMember> memberList){
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
LocalGossipMember member = selectPartner(memberList);
|
||||
if (member == null) {
|
||||
LOGGER.debug("Send sendMembershipList() is called without action");
|
||||
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
|
||||
return;
|
||||
}
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
@ -128,12 +140,16 @@ public class ActiveGossipThread {
|
||||
} catch (IOException e1) {
|
||||
LOGGER.warn(e1);
|
||||
}
|
||||
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
public void sendPerNodeData(LocalGossipMember me, List<LocalGossipMember> memberList){
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
LocalGossipMember member = selectPartner(memberList);
|
||||
if (member == null) {
|
||||
LOGGER.debug("Send sendMembershipList() is called without action");
|
||||
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
|
||||
return;
|
||||
}
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
@ -162,6 +178,7 @@ public class ActiveGossipThread {
|
||||
} catch (IOException e1) {
|
||||
LOGGER.warn(e1);
|
||||
}
|
||||
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
protected void sendToALiveMember(){
|
||||
@ -176,10 +193,13 @@ public class ActiveGossipThread {
|
||||
/**
|
||||
* Performs the sending of the membership list, after we have incremented our own heartbeat.
|
||||
*/
|
||||
protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
|
||||
protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
me.setHeartbeat(System.nanoTime());
|
||||
if (member == null) {
|
||||
LOGGER.debug("Send sendMembershipList() is called without action");
|
||||
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
|
||||
return;
|
||||
} else {
|
||||
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
|
||||
@ -209,6 +229,7 @@ public class ActiveGossipThread {
|
||||
} catch (IOException e1) {
|
||||
LOGGER.warn(e1);
|
||||
}
|
||||
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.gossip.manager;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
@ -74,9 +75,11 @@ public abstract class GossipManager {
|
||||
|
||||
private final ScheduledExecutorService scheduledServiced;
|
||||
|
||||
private MetricRegistry registry;
|
||||
|
||||
public GossipManager(String cluster,
|
||||
URI uri, String id, GossipSettings settings,
|
||||
List<GossipMember> gossipMembers, GossipListener listener) {
|
||||
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
|
||||
|
||||
this.settings = settings;
|
||||
gossipCore = new GossipCore(this);
|
||||
@ -98,6 +101,7 @@ public abstract class GossipManager {
|
||||
gossipServiceRunning = new AtomicBoolean(true);
|
||||
this.listener = listener;
|
||||
this.scheduledServiced = Executors.newScheduledThreadPool(1);
|
||||
this.registry = registry;
|
||||
}
|
||||
|
||||
public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
|
||||
@ -148,7 +152,7 @@ public abstract class GossipManager {
|
||||
public void init() {
|
||||
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
|
||||
gossipThreadExecutor.execute(passiveGossipThread);
|
||||
activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
|
||||
activeGossipThread = new ActiveGossipThread(this, this.gossipCore, registry);
|
||||
activeGossipThread.init();
|
||||
dataReaper.init();
|
||||
scheduledServiced.scheduleAtFixedRate(() -> {
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.gossip.manager.random;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import org.apache.gossip.GossipMember;
|
||||
import org.apache.gossip.GossipSettings;
|
||||
import org.apache.gossip.event.GossipListener;
|
||||
@ -40,6 +41,7 @@ public class RandomGossipManager extends GossipManager {
|
||||
private GossipSettings settings;
|
||||
private List<GossipMember> gossipMembers;
|
||||
private GossipListener listener;
|
||||
private MetricRegistry registry;
|
||||
|
||||
private ManagerBuilder() {}
|
||||
|
||||
@ -73,6 +75,10 @@ public class RandomGossipManager extends GossipManager {
|
||||
this.listener = listener;
|
||||
return this;
|
||||
}
|
||||
public ManagerBuilder registry(MetricRegistry registry) {
|
||||
this.registry = registry;
|
||||
return this;
|
||||
}
|
||||
|
||||
public ManagerBuilder uri(URI uri){
|
||||
this.uri = uri;
|
||||
@ -84,15 +90,19 @@ public class RandomGossipManager extends GossipManager {
|
||||
checkArgument(cluster != null, "You must specify a cluster name");
|
||||
checkArgument(settings != null, "You must specify gossip settings");
|
||||
checkArgument(uri != null, "You must specify a uri");
|
||||
checkArgument(registry != null, "You must specify a MetricRegistry");
|
||||
if (listener == null){
|
||||
listener((a,b) -> {});
|
||||
}
|
||||
if (gossipMembers == null) {
|
||||
gossipMembers = new ArrayList<>();
|
||||
}
|
||||
return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener);
|
||||
return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener, registry);
|
||||
}
|
||||
}
|
||||
|
||||
private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings,
|
||||
List<GossipMember> gossipMembers, GossipListener listener) {
|
||||
super(cluster, uri, id, settings, gossipMembers, listener);
|
||||
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
|
||||
super(cluster, uri, id, settings, gossipMembers, listener, registry);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user