GOSSIP-25 Create reaper process to expire per-node data

This commit is contained in:
Edward Capriolo
2016-10-07 02:03:43 -04:00
parent daea6edb15
commit f35dddd8f2
12 changed files with 206 additions and 46 deletions

View File

@ -0,0 +1,8 @@
package org.apache.gossip.manager;
public interface Clock {
long currentTimeMillis();
long nanoTime();
}

View File

@ -0,0 +1,58 @@
package org.apache.gossip.manager;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.model.GossipDataMessage;
/**
* We wish to periodically sweep user data and remove entries past their timestamp. This
* implementation periodically sweeps through the data and removes old entries. While it might make
* sense to use a more specific high performance data-structure to handle eviction, keep in mind
* that we are not looking to store a large quantity of data as we currently have to transmit this
* data cluster wide.
*/
public class DataReaper {
private final GossipCore gossipCore;
private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);
private final Clock clock;
public DataReaper(GossipCore gossipCore, Clock clock){
this.gossipCore = gossipCore;
this.clock = clock;
}
public void init(){
Runnable reapPerNodeData = () -> {
runOnce();
};
scheduledExecutor.scheduleAtFixedRate(reapPerNodeData, 0, 5, TimeUnit.SECONDS);
}
void runOnce(){
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> node : gossipCore.getPerNodeData().entrySet()){
reapData(node.getValue());
}
}
void reapData(ConcurrentHashMap<String, GossipDataMessage> concurrentHashMap){
for (Entry<String, GossipDataMessage> entry : concurrentHashMap.entrySet()){
if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
concurrentHashMap.remove(entry.getKey(), entry.getValue());
}
}
}
public void close(){
scheduledExecutor.shutdown();
try {
scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
}

View File

@ -47,16 +47,21 @@ public class GossipCore {
perNodeData = new ConcurrentHashMap<>();
}
/**
*
* @param message
*/
public void addPerNodeData(GossipDataMessage message){
ConcurrentHashMap<String,GossipDataMessage> m = new ConcurrentHashMap<>();
m.put(message.getKey(), message);
m = perNodeData.putIfAbsent(message.getNodeId(), m);
if (m != null){
m.put(message.getKey(), message); //TODO only put if > ts
ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>();
nodeMap.put(message.getKey(), message);
nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap);
if (nodeMap != null){
//m.put(message.getKey(), message); //TODO only put if > ts
GossipDataMessage current = nodeMap.get(message.getKey());
if (current == null){
nodeMap.replace(message.getKey(), null, message);
} else {
if (current.getTimestamp() < message.getTimestamp()){
nodeMap.replace(message.getKey(), current, message);
}
}
}
}

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
@ -67,14 +68,20 @@ public abstract class GossipManager implements NotificationListener {
private ExecutorService gossipThreadExecutor;
private GossipCore gossipCore;
private final GossipCore gossipCore;
private final DataReaper dataReaper;
private final Clock clock;
public GossipManager(String cluster,
URI uri, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
this.settings = settings;
this.gossipCore = new GossipCore(this);
gossipCore = new GossipCore(this);
clock = new SystemClock();
dataReaper = new DataReaper(gossipCore, clock);
me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
@ -192,6 +199,7 @@ public abstract class GossipManager implements NotificationListener {
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
activeGossipThread.init();
dataReaper.init();
GossipService.LOGGER.debug("The GossipService is started.");
}
@ -202,6 +210,7 @@ public abstract class GossipManager implements NotificationListener {
gossipServiceRunning.set(false);
gossipThreadExecutor.shutdown();
gossipCore.shutdown();
dataReaper.close();
if (passiveGossipThread != null) {
passiveGossipThread.shutdown();
}
@ -218,7 +227,10 @@ public abstract class GossipManager implements NotificationListener {
}
}
public void gossipData(GossipDataMessage message){
public void gossipPerNodeData(GossipDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
Objects.nonNull(message.getPayload());
message.setNodeId(me.getId());
gossipCore.addPerNodeData(message);
}
@ -228,8 +240,19 @@ public abstract class GossipManager implements NotificationListener {
if (j == null){
return null;
} else {
return j.get(key);
GossipDataMessage l = j.get(key);
if (l == null){
return null;
}
if (l.getExpireAt() != null && l.getExpireAt() < clock.currentTimeMillis()) {
return null;
}
return l;
}
}
public DataReaper getDataReaper() {
return dataReaper;
}
}

View File

@ -23,15 +23,11 @@ import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipService;
import org.apache.gossip.model.Base;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.gossip.RemoteGossipMember;
/**
* [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
@ -107,9 +103,9 @@ abstract public class PassiveGossipThread implements Runnable {
}
private void debug(int packetLength, byte[] jsonBytes) {
if (GossipService.LOGGER.isDebugEnabled()){
if (LOGGER.isDebugEnabled()){
String receivedMessage = new String(jsonBytes);
GossipService.LOGGER.debug("Received message (" + packetLength + " bytes): "
LOGGER.debug("Received message (" + packetLength + " bytes): "
+ receivedMessage);
}
}

View File

@ -0,0 +1,15 @@
package org.apache.gossip.manager;
public class SystemClock implements Clock {
@Override
public long currentTimeMillis() {
return System.currentTimeMillis();
}
@Override
public long nanoTime() {
return System.nanoTime();
}
}