GOSSIP-74 Phi calculation or sampling is wrong in FailureDetector

This commit is contained in:
Maxim Rusak
2017-03-19 21:20:32 +03:00
committed by Edward Capriolo
parent 0136dd9395
commit 4aafc3ba3b
12 changed files with 148 additions and 125 deletions

View File

@ -30,19 +30,18 @@ public class GossipSettings {
private int gossipInterval = 10; private int gossipInterval = 10;
/** Time between cleanups in ms. Default is 10 seconds. */ /** Time between cleanups in ms. Default is 10 seconds. */
private int cleanupInterval = 10000; private int cleanupInterval = 5000;
/** the minimum samples needed before reporting a result */ /** the minimum samples needed before reporting a result */
private int minimumSamples = 1; private int minimumSamples = 5;
/** the number of samples to keep per host */ /** the number of samples to keep per host */
private int windowSize = 5000; private int windowSize = 5000;
/** the threshold for the detector */ /** the threshold for the detector */
//private double convictThreshold = 2.606201185901408; private double convictThreshold = 10;
private double convictThreshold = 2.606201185901408;
private String distribution = "exponential"; private String distribution = "normal";
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";

View File

@ -43,7 +43,7 @@ public class LocalMember extends Member {
public LocalMember(String clusterName, URI uri, String id, public LocalMember(String clusterName, URI uri, String id,
long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) { long heartbeat, Map<String,String> properties, int windowSize, int minSamples, String distribution) {
super(clusterName, uri, id, heartbeat, properties ); super(clusterName, uri, id, heartbeat, properties );
detector = new FailureDetector(this, minSamples, windowSize, distribution); detector = new FailureDetector(minSamples, windowSize, distribution);
} }
protected LocalMember(){ protected LocalMember(){

View File

@ -21,20 +21,17 @@ import org.apache.commons.math.MathException;
import org.apache.commons.math.distribution.ExponentialDistributionImpl; import org.apache.commons.math.distribution.ExponentialDistributionImpl;
import org.apache.commons.math.distribution.NormalDistributionImpl; import org.apache.commons.math.distribution.NormalDistributionImpl;
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics;
import org.apache.gossip.LocalMember;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
public class FailureDetector { public class FailureDetector {
private static final Logger LOGGER = Logger.getLogger(FailureDetector.class); public static final Logger LOGGER = Logger.getLogger(FailureDetector.class);
private final DescriptiveStatistics descriptiveStatistics; private final DescriptiveStatistics descriptiveStatistics;
private final long minimumSamples; private final long minimumSamples;
private volatile long latestHeartbeatMs = -1; private volatile long latestHeartbeatMs = -1;
private final LocalMember parent;
private final String distribution; private final String distribution;
public FailureDetector(LocalMember parent, long minimumSamples, int windowSize, String distribution){ public FailureDetector(long minimumSamples, int windowSize, String distribution) {
this.parent = parent;
descriptiveStatistics = new DescriptiveStatistics(windowSize); descriptiveStatistics = new DescriptiveStatistics(windowSize);
this.minimumSamples = minimumSamples; this.minimumSamples = minimumSamples;
this.distribution = distribution; this.distribution = distribution;
@ -43,50 +40,41 @@ public class FailureDetector {
/** /**
* Updates the statistics based on the delta between the last * Updates the statistics based on the delta between the last
* heartbeat and supplied time * heartbeat and supplied time
*
* @param now the time of the heartbeat in milliseconds * @param now the time of the heartbeat in milliseconds
*/ */
public void recordHeartbeat(long now){ public synchronized void recordHeartbeat(long now) {
if (now < latestHeartbeatMs) if (now <= latestHeartbeatMs) {
return;
if (now - latestHeartbeatMs == 0){
return; return;
} }
synchronized (descriptiveStatistics) {
if (latestHeartbeatMs != -1) { if (latestHeartbeatMs != -1) {
descriptiveStatistics.addValue(now - latestHeartbeatMs); descriptiveStatistics.addValue(now - latestHeartbeatMs);
} else { }
latestHeartbeatMs = now; latestHeartbeatMs = now;
} }
}
}
public Double computePhiMeasure(long now) { public synchronized Double computePhiMeasure(long now) {
if (latestHeartbeatMs == -1 || descriptiveStatistics.getN() < minimumSamples) { if (latestHeartbeatMs == -1 || descriptiveStatistics.getN() < minimumSamples) {
LOGGER.debug(
String.format( "%s latests %s samples %s minumumSamples %s", parent.getId(),
latestHeartbeatMs, descriptiveStatistics.getN(), minimumSamples));
return null; return null;
} }
synchronized (descriptiveStatistics) {
long delta = now - latestHeartbeatMs; long delta = now - latestHeartbeatMs;
try { try {
double probability = 0.0; double probability;
if (distribution.equals("normal")) { if (distribution.equals("normal")) {
double variance = descriptiveStatistics.getVariance(); double standardDeviation = descriptiveStatistics.getStandardDeviation();
probability = 1.0d - new NormalDistributionImpl(descriptiveStatistics.getMean(), standardDeviation = standardDeviation < 0.1 ? 0.1 : standardDeviation;
variance == 0 ? 0.1 : variance).cumulativeProbability(delta); probability = new NormalDistributionImpl(descriptiveStatistics.getMean(), standardDeviation).cumulativeProbability(delta);
} else { } else {
probability = 1.0d - new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta); probability = new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta);
} }
return -1.0d * Math.log10(probability); final double eps = 1e-12;
if (1 - probability < eps) {
probability = 1.0;
}
return -1.0d * Math.log10(1.0d - probability);
} catch (MathException | IllegalArgumentException e) { } catch (MathException | IllegalArgumentException e) {
StringBuilder sb = new StringBuilder(); LOGGER.debug(e);
for ( double d: descriptiveStatistics.getSortedValues()){ return null;
sb.append(d + " ");
}
LOGGER.debug(e.getMessage() +" "+ sb.toString() +" "+ descriptiveStatistics);
throw new IllegalArgumentException(e);
}
} }
} }
} }

View File

@ -34,9 +34,8 @@ public class StandAloneDatacenterAndRack {
public static void main (String [] args) throws UnknownHostException, InterruptedException { public static void main (String [] args) throws UnknownHostException, InterruptedException {
GossipSettings s = new GossipSettings(); GossipSettings s = new GossipSettings();
s.setWindowSize(10); s.setWindowSize(1000);
s.setConvictThreshold(1.0); s.setGossipInterval(100);
s.setGossipInterval(1000);
s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
Map<String, String> gossipProps = new HashMap<>(); Map<String, String> gossipProps = new HashMap<>();
gossipProps.put("sameRackGossipIntervalMs", "2000"); gossipProps.put("sameRackGossipIntervalMs", "2000");

View File

@ -28,9 +28,8 @@ import org.apache.gossip.manager.GossipManagerBuilder;
public class StandAloneNode { public class StandAloneNode {
public static void main (String [] args) throws UnknownHostException, InterruptedException{ public static void main (String [] args) throws UnknownHostException, InterruptedException{
GossipSettings s = new GossipSettings(); GossipSettings s = new GossipSettings();
s.setWindowSize(10); s.setWindowSize(1000);
s.setConvictThreshold(1.0); s.setGossipInterval(100);
s.setGossipInterval(10);
GossipManager gossipService = GossipManagerBuilder.newBuilder() GossipManager gossipService = GossipManagerBuilder.newBuilder()
.cluster("mycluster") .cluster("mycluster")
.uri(URI.create(args[0])) .uri(URI.create(args[0]))

View File

@ -33,9 +33,8 @@ import org.apache.gossip.model.SharedDataMessage;
public class StandAloneNodeCrdtOrSet { public class StandAloneNodeCrdtOrSet {
public static void main (String [] args) throws InterruptedException, IOException{ public static void main (String [] args) throws InterruptedException, IOException{
GossipSettings s = new GossipSettings(); GossipSettings s = new GossipSettings();
s.setWindowSize(10); s.setWindowSize(1000);
s.setConvictThreshold(1.0); s.setGossipInterval(100);
s.setGossipInterval(10);
GossipManager gossipService = GossipManagerBuilder.newBuilder() GossipManager gossipService = GossipManagerBuilder.newBuilder()
.cluster("mycluster") .cluster("mycluster")
.uri(URI.create(args[0])) .uri(URI.create(args[0]))

View File

@ -51,7 +51,6 @@ public abstract class GossipManager {
private final LocalMember me; private final LocalMember me;
private final GossipSettings settings; private final GossipSettings settings;
private final AtomicBoolean gossipServiceRunning; private final AtomicBoolean gossipServiceRunning;
private final GossipListener listener;
private AbstractActiveGossiper activeGossipThread; private AbstractActiveGossiper activeGossipThread;
private PassiveGossipThread passiveGossipThread; private PassiveGossipThread passiveGossipThread;
private ExecutorService gossipThreadExecutor; private ExecutorService gossipThreadExecutor;
@ -92,7 +91,6 @@ public abstract class GossipManager {
} }
gossipThreadExecutor = Executors.newCachedThreadPool(); gossipThreadExecutor = Executors.newCachedThreadPool();
gossipServiceRunning = new AtomicBoolean(true); gossipServiceRunning = new AtomicBoolean(true);
this.listener = listener;
this.scheduledServiced = Executors.newScheduledThreadPool(1); this.scheduledServiced = Executors.newScheduledThreadPool(1);
this.registry = registry; this.registry = registry;
this.ringState = new RingStatePersister(this); this.ringState = new RingStatePersister(this);

View File

@ -62,27 +62,21 @@ public class GossipMemberStateRefresher implements Runnable {
boolean userDown = processOptimisticShutdown(entry); boolean userDown = processOptimisticShutdown(entry);
if (userDown) if (userDown)
continue; continue;
try {
Double phiMeasure = entry.getKey().detect(clock.nanoTime()); Double phiMeasure = entry.getKey().detect(clock.nanoTime());
GossipState requiredState;
if (phiMeasure != null) { if (phiMeasure != null) {
GossipState requiredState = calcRequiredState(phiMeasure); requiredState = calcRequiredState(phiMeasure);
} else {
requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue());
}
if (entry.getValue() != requiredState) { if (entry.getValue() != requiredState) {
members.put(entry.getKey(), requiredState); members.put(entry.getKey(), requiredState);
listener.gossipEvent(entry.getKey(), requiredState); listener.gossipEvent(entry.getKey(), requiredState);
} }
} }
} catch (IllegalArgumentException ex) {
//0.0 returns throws exception computing the mean.
long now = clock.nanoTime();
long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP) {
LOGGER.warn("Marking down");
members.put(entry.getKey(), GossipState.DOWN);
listener.gossipEvent(entry.getKey(), GossipState.DOWN);
}
} //end catch
} // end for
} }
public GossipState calcRequiredState(Double phiMeasure) { public GossipState calcRequiredState(Double phiMeasure) {
@ -92,6 +86,16 @@ public class GossipMemberStateRefresher implements Runnable {
return GossipState.UP; return GossipState.UP;
} }
public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) {
long now = clock.nanoTime();
long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) {
return GossipState.DOWN;
} else {
return state;
}
}
/** /**
* If we have a special key the per-node data that means that the node has sent us * If we have a special key the per-node data that means that the node has sent us
* a pre-emptive shutdown message. We process this so node is seen down sooner * a pre-emptive shutdown message. We process this so node is seen down sooner

View File

@ -24,7 +24,6 @@ import org.apache.gossip.model.Base;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
public class MessageInvokerCombiner implements MessageInvoker { public class MessageInvokerCombiner implements MessageInvoker {
private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>(); private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>();

View File

@ -63,7 +63,7 @@ public class IdAndPropertyTest extends AbstractIntegrationBase {
y.put("datacenter", "dc2"); y.put("datacenter", "dc2");
y.put("rack", "rack2"); y.put("rack", "rack2");
GossipManager gossipService2 = GossipManagerBuilder.newBuilder().cluster("a") GossipManager gossipService2 = GossipManagerBuilder.newBuilder().cluster("a")
.uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 10)))
.id("1") .id("1")
.properties(y) .properties(y)
.gossipMembers(Arrays.asList(new RemoteMember("a", .gossipMembers(Arrays.asList(new RemoteMember("a",

View File

@ -47,7 +47,7 @@ public class ShutdownDeadtimeTest {
@Test @Test
public void DeadNodesDoNotComeAliveAgain() public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException { throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 2.0, "exponential"); GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
settings.setPersistRingState(false); settings.setPersistRingState(false);
settings.setPersistDataState(false); settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString(); String cluster = UUID.randomUUID().toString();

View File

@ -17,59 +17,97 @@
*/ */
package org.apache.gossip.accrual; package org.apache.gossip.accrual;
import java.net.URI; import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform; import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
@RunWith(JUnitPlatform.class) @RunWith(JUnitPlatform.class)
public class FailureDetectorTest { public class FailureDetectorTest {
@Test @FunctionalInterface
public void aNormalTest(){ interface TriConsumer<A, B, C> {
int samples = 1; void accept(A a, B b, C c);
int windowSize = 1000; }
LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"),
"", 0L, null, windowSize, samples, "normal"); static final Double failureThreshold = new GossipSettings().getConvictThreshold();
member.recordHeartbeat(5);
member.recordHeartbeat(10); List<Integer> generateTimeList(int begin, int end, int step) {
Assert.assertEquals(new Double(0.3010299956639812), member.detect(10)); List<Integer> values = new ArrayList<>();
Random rand = new Random();
for (int i = begin; i < end; i += step) {
int delta = (int) ((rand.nextDouble() - 0.5) * step / 2);
values.add(i + delta);
}
return values;
} }
@Test @Test
public void aTest(){ public void normalDistribution() {
int samples = 1; FailureDetector fd = new FailureDetector(1, 1000, "normal");
int windowSize = 1000; List<Integer> values = generateTimeList(0, 10000, 100);
LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), Double deltaSum = 0.0;
"", 0L, null, windowSize, samples, "exponential"); Integer deltaCount = 0;
member.recordHeartbeat(5); for (int i = 0; i < values.size() - 1; i++) {
member.recordHeartbeat(10); fd.recordHeartbeat(values.get(i));
Assert.assertEquals(new Double(0.4342944819032518), member.detect(10)); if (i != 0) {
Assert.assertEquals(new Double(0.5211533782839021), member.detect(11)); deltaSum += values.get(i) - values.get(i - 1);
Assert.assertEquals(new Double(1.3028834457097553), member.detect(20)); deltaCount++;
Assert.assertEquals(new Double(3.9), member.detect(50), .01); }
Assert.assertEquals(new Double(8.25), member.detect(100), .01); }
Assert.assertEquals(new Double(12.6), member.detect(150), .01); Integer lastRecorded = values.get(values.size() - 2);
Assert.assertEquals(new Double(14.77), member.detect(175), .01);
Assert.assertEquals(new Double(Double.POSITIVE_INFINITY), member.detect(500), .01); //after "step" delay we need to be considered UP
member.recordHeartbeat(4); Assert.assertTrue(fd.computePhiMeasure(values.get(values.size() - 1)) < failureThreshold);
Assert.assertEquals(new Double(12.6), member.detect(150), .01);
//if we check phi-measure after mean delay we get value for 0.5 probability(normal distribution)
Assert.assertEquals(fd.computePhiMeasure(lastRecorded + Math.round(deltaSum / deltaCount)), -Math.log10(0.5), 0.1);
} }
@Ignore @Test
public void sameHeartbeatTest(){ public void checkMinimumSamples() {
int samples = 1; Integer minimumSamples = 5;
int windowSize = 1000; FailureDetector fd = new FailureDetector(minimumSamples, 1000, "normal");
LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), for (int i = 0; i < minimumSamples + 1; i++) { // +1 because we don't place first heartbeat into structure
"", 0L, null, windowSize, samples, "exponential"); Assert.assertNull(fd.computePhiMeasure(100));
member.recordHeartbeat(5); fd.recordHeartbeat(i);
member.recordHeartbeat(5); }
member.recordHeartbeat(5); Assert.assertNotNull(fd.computePhiMeasure(100));
Assert.assertEquals(new Double(0.4342944819032518), member.detect(10));
} }
@Test
public void checkMonotonicDead() {
final FailureDetector fd = new FailureDetector(5, 1000, "normal");
TriConsumer<Integer, Integer, Integer> checkAlive = (begin, end, step) -> {
List<Integer> times = generateTimeList(begin, end, step);
for (int i = 0; i < times.size(); i++) {
Double current = fd.computePhiMeasure(times.get(i));
if (current != null) {
Assert.assertTrue(current < failureThreshold);
}
fd.recordHeartbeat(times.get(i));
}
};
TriConsumer<Integer, Integer, Integer> checkDeadMonotonic = (begin, end, step) -> {
List<Integer> times = generateTimeList(begin, end, step);
Double prev = null;
for (int i = 0; i < times.size(); i++) {
Double current = fd.computePhiMeasure(times.get(i));
if (current != null && prev != null) {
Assert.assertTrue(current >= prev);
}
prev = current;
}
};
checkAlive.accept(0, 20000, 100);
checkDeadMonotonic.accept(20000, 20500, 5);
}
} }