From 4aafc3ba3bc452d45c16557ff616956e54350462 Mon Sep 17 00:00:00 2001 From: Maxim Rusak Date: Sun, 19 Mar 2017 21:20:32 +0300 Subject: [PATCH] GOSSIP-74 Phi calculation or sampling is wrong in FailureDetector --- .../org/apache/gossip/GossipSettings.java | 9 +- .../java/org/apache/gossip/LocalMember.java | 2 +- .../gossip/accrual/FailureDetector.java | 72 +++++----- .../examples/StandAloneDatacenterAndRack.java | 5 +- .../gossip/examples/StandAloneNode.java | 5 +- .../examples/StandAloneNodeCrdtOrSet.java | 5 +- .../apache/gossip/manager/GossipManager.java | 2 - .../manager/GossipMemberStateRefresher.java | 44 ++++--- .../handlers/MessageInvokerCombiner.java | 1 - .../org/apache/gossip/IdAndPropertyTest.java | 2 +- .../apache/gossip/ShutdownDeadtimeTest.java | 2 +- .../gossip/accrual/FailureDetectorTest.java | 124 ++++++++++++------ 12 files changed, 148 insertions(+), 125 deletions(-) diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java index bcea75c..6b2bf8b 100644 --- a/src/main/java/org/apache/gossip/GossipSettings.java +++ b/src/main/java/org/apache/gossip/GossipSettings.java @@ -30,19 +30,18 @@ public class GossipSettings { private int gossipInterval = 10; /** Time between cleanups in ms. Default is 10 seconds. */ - private int cleanupInterval = 10000; + private int cleanupInterval = 5000; /** the minimum samples needed before reporting a result */ - private int minimumSamples = 1; + private int minimumSamples = 5; /** the number of samples to keep per host */ private int windowSize = 5000; /** the threshold for the detector */ - //private double convictThreshold = 2.606201185901408; - private double convictThreshold = 2.606201185901408; + private double convictThreshold = 10; - private String distribution = "exponential"; + private String distribution = "normal"; private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; diff --git a/src/main/java/org/apache/gossip/LocalMember.java b/src/main/java/org/apache/gossip/LocalMember.java index db6e3f7..450bce5 100644 --- a/src/main/java/org/apache/gossip/LocalMember.java +++ b/src/main/java/org/apache/gossip/LocalMember.java @@ -43,7 +43,7 @@ public class LocalMember extends Member { public LocalMember(String clusterName, URI uri, String id, long heartbeat, Map properties, int windowSize, int minSamples, String distribution) { super(clusterName, uri, id, heartbeat, properties ); - detector = new FailureDetector(this, minSamples, windowSize, distribution); + detector = new FailureDetector(minSamples, windowSize, distribution); } protected LocalMember(){ diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java index 22e73db..5abd5c6 100644 --- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java +++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java @@ -21,72 +21,60 @@ import org.apache.commons.math.MathException; import org.apache.commons.math.distribution.ExponentialDistributionImpl; import org.apache.commons.math.distribution.NormalDistributionImpl; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; -import org.apache.gossip.LocalMember; import org.apache.log4j.Logger; public class FailureDetector { - private static final Logger LOGGER = Logger.getLogger(FailureDetector.class); + public static final Logger LOGGER = Logger.getLogger(FailureDetector.class); private final DescriptiveStatistics descriptiveStatistics; private final long minimumSamples; private volatile long latestHeartbeatMs = -1; - private final LocalMember parent; private final String distribution; - - public FailureDetector(LocalMember parent, long minimumSamples, int windowSize, String distribution){ - this.parent = parent; + + public FailureDetector(long minimumSamples, int windowSize, String distribution) { descriptiveStatistics = new DescriptiveStatistics(windowSize); this.minimumSamples = minimumSamples; this.distribution = distribution; } - + /** - * Updates the statistics based on the delta between the last + * Updates the statistics based on the delta between the last * heartbeat and supplied time + * * @param now the time of the heartbeat in milliseconds */ - public void recordHeartbeat(long now){ - if (now < latestHeartbeatMs) - return; - if (now - latestHeartbeatMs == 0){ + public synchronized void recordHeartbeat(long now) { + if (now <= latestHeartbeatMs) { return; } - synchronized (descriptiveStatistics) { - if (latestHeartbeatMs != -1){ - descriptiveStatistics.addValue(now - latestHeartbeatMs); - } else { - latestHeartbeatMs = now; - } + if (latestHeartbeatMs != -1) { + descriptiveStatistics.addValue(now - latestHeartbeatMs); } + latestHeartbeatMs = now; } - - public Double computePhiMeasure(long now) { + + public synchronized Double computePhiMeasure(long now) { if (latestHeartbeatMs == -1 || descriptiveStatistics.getN() < minimumSamples) { - LOGGER.debug( - String.format( "%s latests %s samples %s minumumSamples %s", parent.getId(), - latestHeartbeatMs, descriptiveStatistics.getN(), minimumSamples)); return null; } - synchronized (descriptiveStatistics) { - long delta = now - latestHeartbeatMs; - try { - double probability = 0.0; - if (distribution.equals("normal")){ - double variance = descriptiveStatistics.getVariance(); - probability = 1.0d - new NormalDistributionImpl(descriptiveStatistics.getMean(), - variance == 0 ? 0.1 : variance).cumulativeProbability(delta); - } else { - probability = 1.0d - new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta); - } - return -1.0d * Math.log10(probability); - } catch (MathException | IllegalArgumentException e) { - StringBuilder sb = new StringBuilder(); - for ( double d: descriptiveStatistics.getSortedValues()){ - sb.append(d + " "); - } - LOGGER.debug(e.getMessage() +" "+ sb.toString() +" "+ descriptiveStatistics); - throw new IllegalArgumentException(e); + long delta = now - latestHeartbeatMs; + try { + double probability; + if (distribution.equals("normal")) { + double standardDeviation = descriptiveStatistics.getStandardDeviation(); + standardDeviation = standardDeviation < 0.1 ? 0.1 : standardDeviation; + probability = new NormalDistributionImpl(descriptiveStatistics.getMean(), standardDeviation).cumulativeProbability(delta); + } else { + probability = new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta); } + final double eps = 1e-12; + if (1 - probability < eps) { + probability = 1.0; + } + return -1.0d * Math.log10(1.0d - probability); + } catch (MathException | IllegalArgumentException e) { + LOGGER.debug(e); + return null; } } } diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java index 357e316..497894c 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java @@ -34,9 +34,8 @@ public class StandAloneDatacenterAndRack { public static void main (String [] args) throws UnknownHostException, InterruptedException { GossipSettings s = new GossipSettings(); - s.setWindowSize(10); - s.setConvictThreshold(1.0); - s.setGossipInterval(1000); + s.setWindowSize(1000); + s.setGossipInterval(100); s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); Map gossipProps = new HashMap<>(); gossipProps.put("sameRackGossipIntervalMs", "2000"); diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java index b38865e..93421b1 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java @@ -28,9 +28,8 @@ import org.apache.gossip.manager.GossipManagerBuilder; public class StandAloneNode { public static void main (String [] args) throws UnknownHostException, InterruptedException{ GossipSettings s = new GossipSettings(); - s.setWindowSize(10); - s.setConvictThreshold(1.0); - s.setGossipInterval(10); + s.setWindowSize(1000); + s.setGossipInterval(100); GossipManager gossipService = GossipManagerBuilder.newBuilder() .cluster("mycluster") .uri(URI.create(args[0])) diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java index 0c5c0d5..d78cf5e 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java @@ -33,9 +33,8 @@ import org.apache.gossip.model.SharedDataMessage; public class StandAloneNodeCrdtOrSet { public static void main (String [] args) throws InterruptedException, IOException{ GossipSettings s = new GossipSettings(); - s.setWindowSize(10); - s.setConvictThreshold(1.0); - s.setGossipInterval(10); + s.setWindowSize(1000); + s.setGossipInterval(100); GossipManager gossipService = GossipManagerBuilder.newBuilder() .cluster("mycluster") .uri(URI.create(args[0])) diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index d2f5d20..c2b50ae 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -51,7 +51,6 @@ public abstract class GossipManager { private final LocalMember me; private final GossipSettings settings; private final AtomicBoolean gossipServiceRunning; - private final GossipListener listener; private AbstractActiveGossiper activeGossipThread; private PassiveGossipThread passiveGossipThread; private ExecutorService gossipThreadExecutor; @@ -92,7 +91,6 @@ public abstract class GossipManager { } gossipThreadExecutor = Executors.newCachedThreadPool(); gossipServiceRunning = new AtomicBoolean(true); - this.listener = listener; this.scheduledServiced = Executors.newScheduledThreadPool(1); this.registry = registry; this.ringState = new RingStatePersister(this); diff --git a/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java index ad2e055..1836309 100644 --- a/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java +++ b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java @@ -62,27 +62,21 @@ public class GossipMemberStateRefresher implements Runnable { boolean userDown = processOptimisticShutdown(entry); if (userDown) continue; - try { - Double phiMeasure = entry.getKey().detect(clock.nanoTime()); - if (phiMeasure != null) { - GossipState requiredState = calcRequiredState(phiMeasure); - if (entry.getValue() != requiredState) { - members.put(entry.getKey(), requiredState); - listener.gossipEvent(entry.getKey(), requiredState); - } - } - } catch (IllegalArgumentException ex) { - //0.0 returns throws exception computing the mean. - long now = clock.nanoTime(); - long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); - if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP) { - LOGGER.warn("Marking down"); - members.put(entry.getKey(), GossipState.DOWN); - listener.gossipEvent(entry.getKey(), GossipState.DOWN); - } - } //end catch - } // end for + Double phiMeasure = entry.getKey().detect(clock.nanoTime()); + GossipState requiredState; + + if (phiMeasure != null) { + requiredState = calcRequiredState(phiMeasure); + } else { + requiredState = calcRequiredStateCleanupInterval(entry.getKey(), entry.getValue()); + } + + if (entry.getValue() != requiredState) { + members.put(entry.getKey(), requiredState); + listener.gossipEvent(entry.getKey(), requiredState); + } + } } public GossipState calcRequiredState(Double phiMeasure) { @@ -92,6 +86,16 @@ public class GossipMemberStateRefresher implements Runnable { return GossipState.UP; } + public GossipState calcRequiredStateCleanupInterval(LocalMember member, GossipState state) { + long now = clock.nanoTime(); + long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS); + if (nowInMillis - settings.getCleanupInterval() > member.getHeartbeat()) { + return GossipState.DOWN; + } else { + return state; + } + } + /** * If we have a special key the per-node data that means that the node has sent us * a pre-emptive shutdown message. We process this so node is seen down sooner diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java index cc6ef52..5faf6a5 100644 --- a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java +++ b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java @@ -24,7 +24,6 @@ import org.apache.gossip.model.Base; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.function.Predicate; public class MessageInvokerCombiner implements MessageInvoker { private final List invokers = new CopyOnWriteArrayList<>(); diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java index 7c02d2d..7f550de 100644 --- a/src/test/java/org/apache/gossip/IdAndPropertyTest.java +++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java @@ -63,7 +63,7 @@ public class IdAndPropertyTest extends AbstractIntegrationBase { y.put("datacenter", "dc2"); y.put("rack", "rack2"); GossipManager gossipService2 = GossipManagerBuilder.newBuilder().cluster("a") - .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) + .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 10))) .id("1") .properties(y) .gossipMembers(Arrays.asList(new RemoteMember("a", diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index a91480e..54005c3 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -47,7 +47,7 @@ public class ShutdownDeadtimeTest { @Test public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 2.0, "exponential"); + GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal"); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java index b00c0c3..3434c17 100644 --- a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java +++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java @@ -17,59 +17,97 @@ */ package org.apache.gossip.accrual; -import java.net.URI; - -import org.apache.gossip.LocalMember; +import org.apache.gossip.GossipSettings; import org.junit.Assert; -import org.junit.Ignore; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + @RunWith(JUnitPlatform.class) public class FailureDetectorTest { - - @Test - public void aNormalTest(){ - int samples = 1; - int windowSize = 1000; - LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), - "", 0L, null, windowSize, samples, "normal"); - member.recordHeartbeat(5); - member.recordHeartbeat(10); - Assert.assertEquals(new Double(0.3010299956639812), member.detect(10)); + + @FunctionalInterface + interface TriConsumer { + void accept(A a, B b, C c); + } + + static final Double failureThreshold = new GossipSettings().getConvictThreshold(); + + List generateTimeList(int begin, int end, int step) { + List values = new ArrayList<>(); + Random rand = new Random(); + for (int i = begin; i < end; i += step) { + int delta = (int) ((rand.nextDouble() - 0.5) * step / 2); + + values.add(i + delta); + } + return values; } @Test - public void aTest(){ - int samples = 1; - int windowSize = 1000; - LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), - "", 0L, null, windowSize, samples, "exponential"); - member.recordHeartbeat(5); - member.recordHeartbeat(10); - Assert.assertEquals(new Double(0.4342944819032518), member.detect(10)); - Assert.assertEquals(new Double(0.5211533782839021), member.detect(11)); - Assert.assertEquals(new Double(1.3028834457097553), member.detect(20)); - Assert.assertEquals(new Double(3.9), member.detect(50), .01); - Assert.assertEquals(new Double(8.25), member.detect(100), .01); - Assert.assertEquals(new Double(12.6), member.detect(150), .01); - Assert.assertEquals(new Double(14.77), member.detect(175), .01); - Assert.assertEquals(new Double(Double.POSITIVE_INFINITY), member.detect(500), .01); - member.recordHeartbeat(4); - Assert.assertEquals(new Double(12.6), member.detect(150), .01); + public void normalDistribution() { + FailureDetector fd = new FailureDetector(1, 1000, "normal"); + List values = generateTimeList(0, 10000, 100); + Double deltaSum = 0.0; + Integer deltaCount = 0; + for (int i = 0; i < values.size() - 1; i++) { + fd.recordHeartbeat(values.get(i)); + if (i != 0) { + deltaSum += values.get(i) - values.get(i - 1); + deltaCount++; + } + } + Integer lastRecorded = values.get(values.size() - 2); + + //after "step" delay we need to be considered UP + Assert.assertTrue(fd.computePhiMeasure(values.get(values.size() - 1)) < failureThreshold); + + //if we check phi-measure after mean delay we get value for 0.5 probability(normal distribution) + Assert.assertEquals(fd.computePhiMeasure(lastRecorded + Math.round(deltaSum / deltaCount)), -Math.log10(0.5), 0.1); } - - @Ignore - public void sameHeartbeatTest(){ - int samples = 1; - int windowSize = 1000; - LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), - "", 0L, null, windowSize, samples, "exponential"); - member.recordHeartbeat(5); - member.recordHeartbeat(5); - member.recordHeartbeat(5); - Assert.assertEquals(new Double(0.4342944819032518), member.detect(10)); + + @Test + public void checkMinimumSamples() { + Integer minimumSamples = 5; + FailureDetector fd = new FailureDetector(minimumSamples, 1000, "normal"); + for (int i = 0; i < minimumSamples + 1; i++) { // +1 because we don't place first heartbeat into structure + Assert.assertNull(fd.computePhiMeasure(100)); + fd.recordHeartbeat(i); + } + Assert.assertNotNull(fd.computePhiMeasure(100)); + } + + @Test + public void checkMonotonicDead() { + final FailureDetector fd = new FailureDetector(5, 1000, "normal"); + TriConsumer checkAlive = (begin, end, step) -> { + List times = generateTimeList(begin, end, step); + for (int i = 0; i < times.size(); i++) { + Double current = fd.computePhiMeasure(times.get(i)); + if (current != null) { + Assert.assertTrue(current < failureThreshold); + } + fd.recordHeartbeat(times.get(i)); + } + }; + + TriConsumer checkDeadMonotonic = (begin, end, step) -> { + List times = generateTimeList(begin, end, step); + Double prev = null; + for (int i = 0; i < times.size(); i++) { + Double current = fd.computePhiMeasure(times.get(i)); + if (current != null && prev != null) { + Assert.assertTrue(current >= prev); + } + prev = current; + } + }; + + checkAlive.accept(0, 20000, 100); + checkDeadMonotonic.accept(20000, 20500, 5); } - }