This commit is contained in:
edward
2017-08-08 21:20:24 -04:00
2 changed files with 6 additions and 10 deletions

View File

@ -57,26 +57,23 @@ public class GossipCore implements GossipCoreConstants {
private ConcurrentHashMap<String, LatchAndBase> requests; private ConcurrentHashMap<String, LatchAndBase> requests;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData; private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedDataMessage> sharedData; private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
private final Meter messageSerdeException; private final Meter messageSerdeException;
private final Meter tranmissionException; private final Meter transmissionException;
private final Meter tranmissionSuccess; private final Meter transmissionSuccess;
private final DataEventManager eventManager; private final DataEventManager eventManager;
public GossipCore(GossipManager manager, MetricRegistry metrics){ public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager; this.gossipManager = manager;
requests = new ConcurrentHashMap<>(); requests = new ConcurrentHashMap<>();
workQueue = new ArrayBlockingQueue<>(1024);
perNodeData = new ConcurrentHashMap<>(); perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>(); sharedData = new ConcurrentHashMap<>();
eventManager = new DataEventManager(metrics); eventManager = new DataEventManager(metrics);
metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size()); metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size()); metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size()); metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size());
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); transmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); transmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
} }
@SuppressWarnings({ "unchecked", "rawtypes" }) @SuppressWarnings({ "unchecked", "rawtypes" })
@ -172,9 +169,9 @@ public class GossipCore implements GossipCoreConstants {
} }
try { try {
gossipManager.getTransportManager().send(uri, json_bytes); gossipManager.getTransportManager().send(uri, json_bytes);
tranmissionSuccess.mark(); transmissionSuccess.mark();
} catch (IOException e) { } catch (IOException e) {
tranmissionException.mark(); transmissionException.mark();
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }

View File

@ -18,7 +18,6 @@
package org.apache.gossip.manager; package org.apache.gossip.manager;
public interface GossipCoreConstants { public interface GossipCoreConstants {
String WORKQUEUE_SIZE = "gossip.core.workqueue.size";
String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size"; String PER_NODE_DATA_SIZE = "gossip.core.pernodedata.size";
String SHARED_DATA_SIZE = "gossip.core.shareddata.size"; String SHARED_DATA_SIZE = "gossip.core.shareddata.size";
String REQUEST_SIZE = "gossip.core.requests.size"; String REQUEST_SIZE = "gossip.core.requests.size";