GOSSIP-22 New failure detector

This commit is contained in:
Edward Capriolo
2017-01-06 20:59:32 -05:00
parent 18c9f911d1
commit f56d282d30
16 changed files with 428 additions and 235 deletions

View File

@ -24,9 +24,12 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.LocalGossipMember;
@ -56,6 +59,8 @@ public class ActiveGossipThread {
private final Random random;
private final GossipCore gossipCore;
private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService;
private ObjectMapper MAPPER = new ObjectMapper();
public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
@ -63,14 +68,18 @@ public class ActiveGossipThread {
random = new Random();
this.gossipCore = gossipCore;
this.scheduledExecutorService = Executors.newScheduledThreadPool(2);
workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
}
public void init() {
scheduledExecutorService.scheduleAtFixedRate(
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
() -> {
threadService.execute( () -> { sendToALiveMember(); });
}, 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
() -> { this.sendToDeadMember(); }, 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
@ -155,26 +164,33 @@ public class ActiveGossipThread {
}
}
protected void sendToALiveMember(){
LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
protected void sendToDeadMember(){
LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
me.setHeartbeat(System.currentTimeMillis());
LocalGossipMember member = selectPartner(memberList);
protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
me.setHeartbeat(System.nanoTime());
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
return;
} else {
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalGossipMember other : memberList) {
for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
@ -184,7 +200,7 @@ public class ActiveGossipThread {
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
LOGGER.warn("Message "+ message + " generated response "+ r);
LOGGER.debug("Message " + message + " generated response " + r);
}
} else {
LOGGER.error("The length of the to be send message is too large ("

View File

@ -25,6 +25,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@ -33,12 +34,12 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gossip.GossipMember;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.GossipDataMessage;
@ -71,7 +72,7 @@ public class GossipCore {
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
workQueue = new ArrayBlockingQueue<>(1024);
service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new DiscardPolicy());
service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
}
@ -195,6 +196,11 @@ public class GossipCore {
}
public Response send(Base message, URI uri){
if (LOGGER.isDebugEnabled()){
LOGGER.debug("Sending " + message);
LOGGER.debug("Current request queue " + requests);
}
final Trackable t;
if (message instanceof Trackable){
t = (Trackable) message;
@ -223,7 +229,8 @@ public class GossipCore {
});
try {
return response.get(10, TimeUnit.SECONDS);
//TODO this needs to be a setting base on attempts/second
return response.get(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
@ -261,81 +268,67 @@ public class GossipCore {
/**
* Merge remote list (received from peer), and our local member list. Simply, we must update the
* heartbeats that the remote list has with our list. Also, some additional logic is needed to
* make sure we have not timed out a member and then immediately received a list with that member.
* Merge lists from remote members and update heartbeats
*
* @param gossipManager
* @param senderMember
* @param remoteList
*
* COPIED FROM PASSIVE GOSSIP THREAD
*/
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList) {
if (LOGGER.isDebugEnabled()){
debugState(senderMember, remoteList);
}
// if the person sending to us is in the dead list consider them up
for (LocalGossipMember i : gossipManager.getDeadMembers()) {
if (i.getId().equals(senderMember.getId())) {
LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
senderMember.getUri(), senderMember.getId(),
senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.reviveMember(newLocalMember);
newLocalMember.startTimeoutTimer();
LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
i.recordHeartbeat(senderMember.getHeartbeat());
i.setHeartbeat(senderMember.getHeartbeat());
//TODO set node to UP here
}
}
for (GossipMember remoteMember : remoteList) {
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
continue;
}
if (gossipManager.getLiveMembers().contains(remoteMember)) {
LocalGossipMember localMember = gossipManager.getLiveMembers().get(
gossipManager.getLiveMembers().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
localMember.setHeartbeat(remoteMember.getHeartbeat());
localMember.resetTimeoutTimer();
}
} else if (!gossipManager.getLiveMembers().contains(remoteMember)
&& !gossipManager.getDeadMembers().contains(remoteMember)) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.createOrReviveMember(newLocalMember);
newLocalMember.startTimeoutTimer();
} else {
if (gossipManager.getDeadMembers().contains(remoteMember)) {
LocalGossipMember localDeadMember = gossipManager.getDeadMembers().get(
gossipManager.getDeadMembers().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.reviveMember(newLocalMember);
newLocalMember.startTimeoutTimer();
LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list.");
} else {
LOGGER.debug("me " + gossipManager.getMyself());
LOGGER.debug("sender " + senderMember);
LOGGER.debug("remote " + remoteList);
LOGGER.debug("live " + gossipManager.getLiveMembers());
LOGGER.debug("dead " + gossipManager.getDeadMembers());
LocalGossipMember m = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getUri(),
remoteMember.getId(),
remoteMember.getHeartbeat(),
gossipManager.getSettings().getWindowSize(),
gossipManager.getSettings().getMinimumSamples());
m.recordHeartbeat(remoteMember.getHeartbeat());
Object result = gossipManager.getMembers().putIfAbsent(m, GossipState.UP);
if (result != null){
for (Entry<LocalGossipMember, GossipState> l : gossipManager.getMembers().entrySet()){
if (l.getKey().getId().equals(remoteMember.getId())){
//if (l.getKey().getHeartbeat() < remoteMember.getHeartbeat()){
l.getKey().recordHeartbeat(remoteMember.getHeartbeat());
l.getKey().setHeartbeat(remoteMember.getHeartbeat());
//}
}
} else {
LOGGER.debug("me " + gossipManager.getMyself());
LOGGER.debug("sender " + senderMember);
LOGGER.debug("remote " + remoteList);
LOGGER.debug("live " + gossipManager.getLiveMembers());
LOGGER.debug("dead " + gossipManager.getDeadMembers());
// throw new IllegalArgumentException("wtf");
}
}
}
if (LOGGER.isDebugEnabled()){
debugState(senderMember, remoteList);
}
}
private void debugState(RemoteGossipMember senderMember,
List<GossipMember> remoteList){
LOGGER.warn(
"-----------------------\n" +
"Me " + gossipManager.getMyself() + "\n" +
"Sender " + senderMember + "\n" +
"RemoteList " + remoteList + "\n" +
"Live " + gossipManager.getLiveMembers()+ "\n" +
"Dead " + gossipManager.getDeadMembers()+ "\n" +
"=======================");
}
}

View File

@ -27,16 +27,13 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.log4j.Logger;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener;
@ -47,7 +44,7 @@ import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
public abstract class GossipManager implements NotificationListener {
public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
@ -74,6 +71,8 @@ public abstract class GossipManager implements NotificationListener {
private final DataReaper dataReaper;
private final Clock clock;
private final ScheduledExecutorService scheduledServiced;
public GossipManager(String cluster,
URI uri, String id, GossipSettings settings,
@ -83,60 +82,26 @@ public abstract class GossipManager implements NotificationListener {
gossipCore = new GossipCore(this);
clock = new SystemClock();
dataReaper = new DataReaper(gossipCore, clock);
me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
+ settings.getWindowSize(), settings.getMinimumSamples());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
startupMember.getUri(), startupMember.getId(),
System.currentTimeMillis(), this, settings.getCleanupInterval());
members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member);
clock.nanoTime(), settings.getWindowSize(), settings.getMinimumSamples());
//TODO should members start in down state?
members.put(member, GossipState.DOWN);
}
}
gossipThreadExecutor = Executors.newCachedThreadPool();
gossipServiceRunning = new AtomicBoolean(true);
this.listener = listener;
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
GossipService.LOGGER.debug("Service has been shutdown...");
}
}));
this.scheduledServiced = Executors.newScheduledThreadPool(1);
}
/**
* All timers associated with a member will trigger this method when it goes off. The timer will
* go off if we have not heard from this member in <code> _settings.T_CLEANUP </code> time.
*/
@Override
public void handleNotification(Notification notification, Object handback) {
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
GossipService.LOGGER.debug("Dead member detected: " + deadMember);
members.put(deadMember, GossipState.DOWN);
if (listener != null) {
listener.gossipEvent(deadMember, GossipState.DOWN);
}
}
public void reviveMember(LocalGossipMember m) {
for (Entry<LocalGossipMember, GossipState> it : this.members.entrySet()) {
if (it.getKey().getId().equals(m.getId())) {
it.getKey().disableTimer();
}
}
members.remove(m);
members.put(m, GossipState.UP);
if (listener != null) {
listener.gossipEvent(m, GossipState.UP);
}
}
public void createOrReviveMember(LocalGossipMember m) {
members.put(m, GossipState.UP);
if (listener != null) {
listener.gossipEvent(m, GossipState.UP);
}
public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
return members;
}
public GossipSettings getSettings() {
@ -181,17 +146,44 @@ public abstract class GossipManager implements NotificationListener {
* thread and start the receiver thread.
*/
public void init() {
for (LocalGossipMember member : members.keySet()) {
if (member != me) {
member.startTimeoutTimer();
}
}
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
activeGossipThread.init();
dataReaper.init();
GossipService.LOGGER.debug("The GossipService is started.");
scheduledServiced.scheduleAtFixedRate(() -> {
try {
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
Double result = null;
try {
result = entry.getKey().detect(clock.nanoTime());
//System.out.println(entry.getKey() +" "+ result);
if (result != null) {
if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) {
members.put(entry.getKey(), GossipState.DOWN);
listener.gossipEvent(entry.getKey(), GossipState.DOWN);
}
if (result <= settings.getConvictThreshold() && entry.getValue() == GossipState.DOWN) {
members.put(entry.getKey(), GossipState.UP);
listener.gossipEvent(entry.getKey(), GossipState.UP);
}
}
} catch (IllegalArgumentException ex) {
//0.0 returns throws exception computing the mean.
long now = clock.nanoTime();
long nowInMillis = TimeUnit.MILLISECONDS.convert(now,TimeUnit.NANOSECONDS);
if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP){
LOGGER.warn("Marking down");
members.put(entry.getKey(), GossipState.DOWN);
listener.gossipEvent(entry.getKey(), GossipState.DOWN);
}
} //end catch
} // end for
} catch (RuntimeException ex) {
LOGGER.warn("scheduled state had exception", ex);
}
}, 0, 100, TimeUnit.MILLISECONDS);
LOGGER.debug("The GossipManager is started.");
}
/**

View File

@ -84,8 +84,8 @@ public class RandomGossipManager extends GossipManager {
checkArgument(cluster != null, "You must specify a cluster name");
checkArgument(settings != null, "You must specify gossip settings");
checkArgument(uri != null, "You must specify a uri");
if (this.gossipMembers == null) {
this.gossipMembers = new ArrayList<>();
if (gossipMembers == null) {
gossipMembers = new ArrayList<>();
}
return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener);
}