diff --git a/pom.xml b/pom.xml index ad78c91..71db8ed 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ 2.8.5 + 1.2 5.0.0-M2 1.0.0-M2 4.12.0-M2 @@ -79,6 +80,11 @@ jackson-core ${jackson.version} + + commons-math + commons-math + ${commons-math.version} + com.fasterxml.jackson.core jackson-databind diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java index 99b5807..12037fd 100644 --- a/src/main/java/org/apache/gossip/GossipSettings.java +++ b/src/main/java/org/apache/gossip/GossipSettings.java @@ -25,11 +25,20 @@ package org.apache.gossip; public class GossipSettings { /** Time between gossip'ing in ms. Default is 1 second. */ - private int gossipInterval = 1000; + private int gossipInterval = 10; /** Time between cleanups in ms. Default is 10 seconds. */ private int cleanupInterval = 10000; + /** the minimum samples needed before reporting a result */ + private int minimumSamples = 1; + + /** the number of samples to keep per host */ + private int windowSize = 5000; + + /** the threshold for the detector */ + //private double convictThreshold = 2.606201185901408; + private double convictThreshold = 4.5; /** * Construct GossipSettings with default settings. */ @@ -44,9 +53,12 @@ public class GossipSettings { * @param cleanupInterval * The cleanup interval in ms. */ - public GossipSettings(int gossipInterval, int cleanupInterval) { + public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, int minimumSamples, double convictThreshold) { this.gossipInterval = gossipInterval; this.cleanupInterval = cleanupInterval; + this.windowSize = windowSize; + this.minimumSamples = minimumSamples; + this.convictThreshold = convictThreshold; } /** @@ -87,4 +99,33 @@ public class GossipSettings { public int getCleanupInterval() { return cleanupInterval; } + + public int getMinimumSamples() { + return minimumSamples; + } + + public void setMinimumSamples(int minimumSamples) { + this.minimumSamples = minimumSamples; + } + + public int getWindowSize() { + return windowSize; + } + + public void setWindowSize(int windowSize) { + this.windowSize = windowSize; + } + + public double getConvictThreshold() { + return convictThreshold; + } + + public void setConvictThreshold(double convictThreshold) { + this.convictThreshold = convictThreshold; + } + + public void setGossipInterval(int gossipInterval) { + this.gossipInterval = gossipInterval; + } + } diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java index 97f4ab6..df3bb47 100644 --- a/src/main/java/org/apache/gossip/LocalGossipMember.java +++ b/src/main/java/org/apache/gossip/LocalGossipMember.java @@ -19,52 +19,50 @@ package org.apache.gossip; import java.net.URI; -import javax.management.NotificationListener; +import org.apache.gossip.accrual.FailureDetector; /** * This object represent a gossip member with the properties known locally. These objects are stored - * in the local list of gossip member.s + * in the local list of gossip members. * - * @author harmenw */ public class LocalGossipMember extends GossipMember { - /** The timeout timer for this gossip member. */ - private final transient GossipTimeoutTimer timeoutTimer; + /** The failure detector for this member */ + private transient final FailureDetector detector; /** - * Constructor. * * @param uri * The uri of the member * @param id * id of the node * @param heartbeat - * The current heartbeat. - * @param notificationListener - * @param cleanupTimeout - * The cleanup timeout for this gossip member. + * The current heartbeat */ public LocalGossipMember(String clusterName, URI uri, String id, - long heartbeat, NotificationListener notificationListener, int cleanupTimeout) { + long heartbeat, int windowSize, int minSamples) { super(clusterName, uri, id, heartbeat); - timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this); + detector = new FailureDetector(this, minSamples, windowSize); } - /** - * Start the timeout timer. - */ - public void startTimeoutTimer() { - timeoutTimer.start(); + public void recordHeartbeat(long now){ + detector.recordHeartbeat(now); + } + + public Double detect(long now) { + return detector.computePhiMeasure(now); } - /** - * Reset the timeout timer. - */ - public void resetTimeoutTimer() { - timeoutTimer.reset(); + @Override + public String toString() { + Double d = null; + try { + d = detect(System.nanoTime()); + } catch (RuntimeException ex) {} + return "LocalGossipMember [uri=" + uri + ", heartbeat=" + heartbeat + ", clusterName=" + + clusterName + ", id=" + id + ", currentdetect=" + d +" ]"; } - public void disableTimer() { - timeoutTimer.removeAllNotifications(); - } + + } diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java index 08f7975..38ccbf3 100644 --- a/src/main/java/org/apache/gossip/StartupSettings.java +++ b/src/main/java/org/apache/gossip/StartupSettings.java @@ -17,10 +17,8 @@ */ package org.apache.gossip; -import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -161,28 +159,24 @@ public class StartupSettings { */ public static StartupSettings fromJSONFile(File jsonFile) throws FileNotFoundException, IOException, URISyntaxException { - // Read the file to a String. - StringBuffer buffer = new StringBuffer(); - try (BufferedReader br = new BufferedReader(new FileReader(jsonFile)) ){ - String line; - while ((line = br.readLine()) != null) { - buffer.append(line.trim()); - } - } ObjectMapper om = new ObjectMapper(); JsonNode root = om.readTree(jsonFile); JsonNode jsonObject = root.get(0); String uri = jsonObject.get("uri").textValue(); String id = jsonObject.get("id").textValue(); + //TODO constants as defaults? int gossipInterval = jsonObject.get("gossip_interval").intValue(); int cleanupInterval = jsonObject.get("cleanup_interval").intValue(); + int windowSize = jsonObject.get("window_size").intValue(); + int minSamples = jsonObject.get("minimum_samples").intValue(); + double convictThreshold = jsonObject.get("convict_threshold").asDouble(); String cluster = jsonObject.get("cluster").textValue(); if (cluster == null){ throw new IllegalArgumentException("cluster was null. It is required"); } URI uri2 = new URI(uri); - StartupSettings settings = new StartupSettings(id, uri2, new GossipSettings(gossipInterval, - cleanupInterval), cluster); + StartupSettings settings = new StartupSettings(id, uri2, + new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples, convictThreshold), cluster); String configMembersDetails = "Config-members ["; JsonNode membersJSON = jsonObject.get("members"); Iterator it = membersJSON.iterator(); @@ -196,8 +190,6 @@ public class StartupSettings { configMembersDetails += ", "; } log.info(configMembersDetails + "]"); - - // Return the created settings object. return settings; } } diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java new file mode 100644 index 0000000..296e79f --- /dev/null +++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.accrual; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.math.MathException; +import org.apache.commons.math.distribution.ExponentialDistributionImpl; +import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; +import org.apache.gossip.LocalGossipMember; +import org.apache.log4j.Logger; + +public class FailureDetector { + + private static final Logger LOGGER = Logger.getLogger(FailureDetector.class); + private final DescriptiveStatistics descriptiveStatistics; + private final long minimumSamples; + private volatile long latestHeartbeatMs = -1; + private final LocalGossipMember parent; + + public FailureDetector(LocalGossipMember parent, long minimumSamples, int windowSize){ + this.parent = parent; + descriptiveStatistics = new DescriptiveStatistics(windowSize); + this.minimumSamples = minimumSamples; + } + + /** + * Updates the statistics based on the delta between the last + * heartbeat and supplied time + * @param now the time of the heartbeat in milliseconds + */ + public void recordHeartbeat(long now){ + if (now < latestHeartbeatMs) + return; + synchronized (descriptiveStatistics) { + if (latestHeartbeatMs != -1){ + descriptiveStatistics.addValue(now - latestHeartbeatMs); + } else { + latestHeartbeatMs = now; + } + } + } + + public Double computePhiMeasure(long now) { + if (latestHeartbeatMs == -1 || descriptiveStatistics.getN() < minimumSamples) { + LOGGER.debug( + String.format( "%s latests %s samples %s minumumSamples %s", parent.getId(), latestHeartbeatMs, descriptiveStatistics.getN(), minimumSamples)); + return null; + } + synchronized (descriptiveStatistics) { + long delta = now - latestHeartbeatMs; + try { + //double probability = 1.0d - new NormalDistributionImpl(descriptiveStatistics.getMean(), descriptiveStatistics.getVariance()).cumulativeProbability(delta); + double probability = 1.0d - new ExponentialDistributionImpl(descriptiveStatistics.getMean()).cumulativeProbability(delta); + //LOGGER.warn (parent.getId() + " worked "+ -1.0d * Math.log10(probability)); + return -1.0d * Math.log10(probability); + } catch (MathException | IllegalArgumentException e) { + //LOGGER.warn(parent.getId() + " Exception while computing phi", e); + //LOGGER.warn(descriptiveStatistics); + //LOGGER.warn(descriptiveStatistics.getMean()); + List x = new ArrayList<>(); + for (double z : descriptiveStatistics.getValues()){ + x.add(z); + } + //LOGGER.warn(x); + //LOGGER.warn(parent.getId() + " " + descriptiveStatistics); + throw new IllegalArgumentException(e); + } + } + } +} diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java new file mode 100644 index 0000000..b6784cc --- /dev/null +++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.examples; + +import java.net.URI; +import java.net.UnknownHostException; +import java.util.Arrays; + +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteGossipMember; + +public class StandAloneNode { + public static void main (String [] args) throws UnknownHostException, InterruptedException{ + GossipSettings s = new GossipSettings(); + s.setWindowSize(10); + s.setConvictThreshold(1.0); + s.setGossipInterval(10); + GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], + Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {} ); + gossipService.start(); + while (true){ + System.out.println( "Live: " + gossipService.getGossipManager().getLiveMembers()); + System.out.println( "Dead: " + gossipService.getGossipManager().getDeadMembers()); + Thread.sleep(2000); + } + } +} diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 29f4688..e6248dc 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -24,9 +24,12 @@ import java.util.List; import java.util.Map.Entry; import java.util.Random; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.gossip.LocalGossipMember; @@ -56,6 +59,8 @@ public class ActiveGossipThread { private final Random random; private final GossipCore gossipCore; private ScheduledExecutorService scheduledExecutorService; + private final BlockingQueue workQueue; + private ThreadPoolExecutor threadService; private ObjectMapper MAPPER = new ObjectMapper(); public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { @@ -63,14 +68,18 @@ public class ActiveGossipThread { random = new Random(); this.gossipCore = gossipCore; this.scheduledExecutorService = Executors.newScheduledThreadPool(2); + workQueue = new ArrayBlockingQueue(1024); + threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); } public void init() { scheduledExecutorService.scheduleAtFixedRate( - () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, + () -> { + threadService.execute( () -> { sendToALiveMember(); }); + }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); scheduledExecutorService.scheduleAtFixedRate( - () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0, + () -> { this.sendToDeadMember(); }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); scheduledExecutorService.scheduleAtFixedRate( () -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, @@ -155,26 +164,33 @@ public class ActiveGossipThread { } } + protected void sendToALiveMember(){ + LocalGossipMember member = selectPartner(gossipManager.getLiveMembers()); + sendMembershipList(gossipManager.getMyself(), member); + } + + protected void sendToDeadMember(){ + LocalGossipMember member = selectPartner(gossipManager.getDeadMembers()); + sendMembershipList(gossipManager.getMyself(), member); + } /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, List memberList) { - me.setHeartbeat(System.currentTimeMillis()); - LocalGossipMember member = selectPartner(memberList); + protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) { + me.setHeartbeat(System.nanoTime()); if (member == null) { LOGGER.debug("Send sendMembershipList() is called without action"); return; } else { LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); } - try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); UdpActiveGossipMessage message = new UdpActiveGossipMessage(); message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); message.setUuid(UUID.randomUUID().toString()); message.getMembers().add(convert(me)); - for (LocalGossipMember other : memberList) { + for (LocalGossipMember other : gossipManager.getMembers().keySet()) { message.getMembers().add(convert(other)); } byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); @@ -184,7 +200,7 @@ public class ActiveGossipThread { if (r instanceof ActiveGossipOk){ //maybe count metrics here } else { - LOGGER.warn("Message "+ message + " generated response "+ r); + LOGGER.debug("Message " + message + " generated response " + r); } } else { LOGGER.error("The length of the to be send message is too large (" diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 0ab56a0..eaea8f6 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -25,6 +25,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; +import java.util.Map.Entry; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -33,12 +34,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.gossip.GossipMember; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.event.GossipState; import org.apache.gossip.model.ActiveGossipMessage; import org.apache.gossip.model.Base; import org.apache.gossip.model.GossipDataMessage; @@ -71,7 +72,7 @@ public class GossipCore { this.gossipManager = manager; requests = new ConcurrentHashMap<>(); workQueue = new ArrayBlockingQueue<>(1024); - service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new DiscardPolicy()); + service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); perNodeData = new ConcurrentHashMap<>(); sharedData = new ConcurrentHashMap<>(); } @@ -195,6 +196,11 @@ public class GossipCore { } public Response send(Base message, URI uri){ + if (LOGGER.isDebugEnabled()){ + LOGGER.debug("Sending " + message); + LOGGER.debug("Current request queue " + requests); + } + final Trackable t; if (message instanceof Trackable){ t = (Trackable) message; @@ -223,7 +229,8 @@ public class GossipCore { }); try { - return response.get(10, TimeUnit.SECONDS); + //TODO this needs to be a setting base on attempts/second + return response.get(1, TimeUnit.SECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { @@ -261,81 +268,67 @@ public class GossipCore { /** - * Merge remote list (received from peer), and our local member list. Simply, we must update the - * heartbeats that the remote list has with our list. Also, some additional logic is needed to - * make sure we have not timed out a member and then immediately received a list with that member. + * Merge lists from remote members and update heartbeats * * @param gossipManager * @param senderMember * @param remoteList * - * COPIED FROM PASSIVE GOSSIP THREAD */ protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, List remoteList) { - + if (LOGGER.isDebugEnabled()){ + debugState(senderMember, remoteList); + } // if the person sending to us is in the dead list consider them up for (LocalGossipMember i : gossipManager.getDeadMembers()) { if (i.getId().equals(senderMember.getId())) { - LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); - LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), - senderMember.getUri(), senderMember.getId(), - senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings() - .getCleanupInterval()); - gossipManager.reviveMember(newLocalMember); - newLocalMember.startTimeoutTimer(); + LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); + i.recordHeartbeat(senderMember.getHeartbeat()); + i.setHeartbeat(senderMember.getHeartbeat()); + //TODO set node to UP here + } } for (GossipMember remoteMember : remoteList) { if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { continue; } - if (gossipManager.getLiveMembers().contains(remoteMember)) { - LocalGossipMember localMember = gossipManager.getLiveMembers().get( - gossipManager.getLiveMembers().indexOf(remoteMember)); - if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { - localMember.setHeartbeat(remoteMember.getHeartbeat()); - localMember.resetTimeoutTimer(); - } - } else if (!gossipManager.getLiveMembers().contains(remoteMember) - && !gossipManager.getDeadMembers().contains(remoteMember)) { - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), - remoteMember.getUri(), remoteMember.getId(), - remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() - .getCleanupInterval()); - gossipManager.createOrReviveMember(newLocalMember); - newLocalMember.startTimeoutTimer(); - } else { - if (gossipManager.getDeadMembers().contains(remoteMember)) { - LocalGossipMember localDeadMember = gossipManager.getDeadMembers().get( - gossipManager.getDeadMembers().indexOf(remoteMember)); - if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { - LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), - remoteMember.getUri(), remoteMember.getId(), - remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() - .getCleanupInterval()); - gossipManager.reviveMember(newLocalMember); - newLocalMember.startTimeoutTimer(); - LOGGER.debug("Removed remote member " + remoteMember.getAddress() - + " from dead list and added to local member list."); - } else { - LOGGER.debug("me " + gossipManager.getMyself()); - LOGGER.debug("sender " + senderMember); - LOGGER.debug("remote " + remoteList); - LOGGER.debug("live " + gossipManager.getLiveMembers()); - LOGGER.debug("dead " + gossipManager.getDeadMembers()); + LocalGossipMember m = new LocalGossipMember(remoteMember.getClusterName(), + remoteMember.getUri(), + remoteMember.getId(), + remoteMember.getHeartbeat(), + gossipManager.getSettings().getWindowSize(), + gossipManager.getSettings().getMinimumSamples()); + m.recordHeartbeat(remoteMember.getHeartbeat()); + + Object result = gossipManager.getMembers().putIfAbsent(m, GossipState.UP); + if (result != null){ + for (Entry l : gossipManager.getMembers().entrySet()){ + if (l.getKey().getId().equals(remoteMember.getId())){ + //if (l.getKey().getHeartbeat() < remoteMember.getHeartbeat()){ + l.getKey().recordHeartbeat(remoteMember.getHeartbeat()); + l.getKey().setHeartbeat(remoteMember.getHeartbeat()); + //} } - } else { - LOGGER.debug("me " + gossipManager.getMyself()); - LOGGER.debug("sender " + senderMember); - LOGGER.debug("remote " + remoteList); - LOGGER.debug("live " + gossipManager.getLiveMembers()); - LOGGER.debug("dead " + gossipManager.getDeadMembers()); - // throw new IllegalArgumentException("wtf"); } } } + if (LOGGER.isDebugEnabled()){ + debugState(senderMember, remoteList); + } } - + private void debugState(RemoteGossipMember senderMember, + List remoteList){ + LOGGER.warn( + "-----------------------\n" + + "Me " + gossipManager.getMyself() + "\n" + + "Sender " + senderMember + "\n" + + "RemoteList " + remoteList + "\n" + + "Live " + gossipManager.getLiveMembers()+ "\n" + + "Dead " + gossipManager.getDeadMembers()+ "\n" + + "======================="); + } + } diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index cd6e0a9..2b081d0 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -27,16 +27,13 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import javax.management.Notification; -import javax.management.NotificationListener; - import org.apache.log4j.Logger; import org.apache.gossip.GossipMember; -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.event.GossipListener; @@ -47,7 +44,7 @@ import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.SharedGossipDataMessage; -public abstract class GossipManager implements NotificationListener { +public abstract class GossipManager { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); @@ -74,6 +71,8 @@ public abstract class GossipManager implements NotificationListener { private final DataReaper dataReaper; private final Clock clock; + + private final ScheduledExecutorService scheduledServiced; public GossipManager(String cluster, URI uri, String id, GossipSettings settings, @@ -83,60 +82,26 @@ public abstract class GossipManager implements NotificationListener { gossipCore = new GossipCore(this); clock = new SystemClock(); dataReaper = new DataReaper(gossipCore, clock); - me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this, - settings.getCleanupInterval()); + me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), + + settings.getWindowSize(), settings.getMinimumSamples()); members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), startupMember.getUri(), startupMember.getId(), - System.currentTimeMillis(), this, settings.getCleanupInterval()); - members.put(member, GossipState.UP); - GossipService.LOGGER.debug(member); + clock.nanoTime(), settings.getWindowSize(), settings.getMinimumSamples()); + //TODO should members start in down state? + members.put(member, GossipState.DOWN); } } gossipThreadExecutor = Executors.newCachedThreadPool(); gossipServiceRunning = new AtomicBoolean(true); this.listener = listener; - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - public void run() { - GossipService.LOGGER.debug("Service has been shutdown..."); - } - })); + this.scheduledServiced = Executors.newScheduledThreadPool(1); } - /** - * All timers associated with a member will trigger this method when it goes off. The timer will - * go off if we have not heard from this member in _settings.T_CLEANUP time. - */ - @Override - public void handleNotification(Notification notification, Object handback) { - LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); - GossipService.LOGGER.debug("Dead member detected: " + deadMember); - members.put(deadMember, GossipState.DOWN); - if (listener != null) { - listener.gossipEvent(deadMember, GossipState.DOWN); - } - } - - public void reviveMember(LocalGossipMember m) { - for (Entry it : this.members.entrySet()) { - if (it.getKey().getId().equals(m.getId())) { - it.getKey().disableTimer(); - } - } - members.remove(m); - members.put(m, GossipState.UP); - if (listener != null) { - listener.gossipEvent(m, GossipState.UP); - } - } - - public void createOrReviveMember(LocalGossipMember m) { - members.put(m, GossipState.UP); - if (listener != null) { - listener.gossipEvent(m, GossipState.UP); - } + public ConcurrentSkipListMap getMembers() { + return members; } public GossipSettings getSettings() { @@ -181,17 +146,44 @@ public abstract class GossipManager implements NotificationListener { * thread and start the receiver thread. */ public void init() { - for (LocalGossipMember member : members.keySet()) { - if (member != me) { - member.startTimeoutTimer(); - } - } passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); gossipThreadExecutor.execute(passiveGossipThread); activeGossipThread = new ActiveGossipThread(this, this.gossipCore); activeGossipThread.init(); dataReaper.init(); - GossipService.LOGGER.debug("The GossipService is started."); + scheduledServiced.scheduleAtFixedRate(() -> { + try { + for (Entry entry : members.entrySet()) { + Double result = null; + try { + result = entry.getKey().detect(clock.nanoTime()); + //System.out.println(entry.getKey() +" "+ result); + if (result != null) { + if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) { + members.put(entry.getKey(), GossipState.DOWN); + listener.gossipEvent(entry.getKey(), GossipState.DOWN); + } + if (result <= settings.getConvictThreshold() && entry.getValue() == GossipState.DOWN) { + members.put(entry.getKey(), GossipState.UP); + listener.gossipEvent(entry.getKey(), GossipState.UP); + } + } + } catch (IllegalArgumentException ex) { + //0.0 returns throws exception computing the mean. + long now = clock.nanoTime(); + long nowInMillis = TimeUnit.MILLISECONDS.convert(now,TimeUnit.NANOSECONDS); + if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP){ + LOGGER.warn("Marking down"); + members.put(entry.getKey(), GossipState.DOWN); + listener.gossipEvent(entry.getKey(), GossipState.DOWN); + } + } //end catch + } // end for + } catch (RuntimeException ex) { + LOGGER.warn("scheduled state had exception", ex); + } + }, 0, 100, TimeUnit.MILLISECONDS); + LOGGER.debug("The GossipManager is started."); } /** diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java index 26541ca..1444181 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java @@ -84,8 +84,8 @@ public class RandomGossipManager extends GossipManager { checkArgument(cluster != null, "You must specify a cluster name"); checkArgument(settings != null, "You must specify gossip settings"); checkArgument(uri != null, "You must specify a uri"); - if (this.gossipMembers == null) { - this.gossipMembers = new ArrayList<>(); + if (gossipMembers == null) { + gossipMembers = new ArrayList<>(); } return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener); } diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 49eac46..1cdb9ac 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -23,6 +23,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.UUID; @@ -31,8 +32,6 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; -import org.apache.gossip.event.GossipListener; -import org.apache.gossip.event.GossipState; import org.junit.platform.runner.JUnitPlatform; import org.junit.jupiter.api.Test; @@ -46,31 +45,20 @@ public class ShutdownDeadtimeTest { @Test public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(1000, 10000); + GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 5.0); String cluster = UUID.randomUUID().toString(); - - log.info("Adding seed nodes"); int seedNodes = 3; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); } - - log.info("Adding clients"); - final List clients = new ArrayList<>(); + final List clients = Collections.synchronizedList(new ArrayList()); final int clusterMembers = 5; for (int i = 1; i < clusterMembers + 1; ++i) { - final int j = i; - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, - settings, new GossipListener() { - @Override - public void gossipEvent(GossipMember member, GossipState state) { - System.out.println(System.currentTimeMillis() + " Member " + j + " reports " - + member + " " + state); - } - }); + settings, (a,b) -> {}); clients.add(gossipService); gossipService.start(); } @@ -100,7 +88,7 @@ public class ShutdownDeadtimeTest { } return total; } - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(16); + }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(16); clients.remove(randomClientId); TUnit.assertThat(new Callable() { @@ -111,17 +99,12 @@ public class ShutdownDeadtimeTest { } return total; } - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); + }).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(4); URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); // start client again GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers, - settings, new GossipListener() { - @Override - public void gossipEvent(GossipMember member, GossipState state) { - // System.out.println("revived " + member+" "+ state); - } - }); + settings, (a,b) -> {}); clients.add(gossipService); gossipService.start(); @@ -134,7 +117,7 @@ public class ShutdownDeadtimeTest { } return total; } - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); + }).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20); for (int i = 0; i < clusterMembers; ++i) { clients.get(i).shutdown(); diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java index 1c0826b..73c758a 100644 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -48,28 +48,16 @@ public class StartupSettingsTest { @Test public void testUsingSettingsFile() throws IOException, InterruptedException, URISyntaxException { File settingsFile = File.createTempFile("gossipTest",".json"); - log.debug( "Using settings file: " + settingsFile.getAbsolutePath() ); settingsFile.deleteOnExit(); writeSettingsFile(settingsFile); URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000); final GossipService firstService = new GossipService( - CLUSTER, uri, UUID.randomUUID().toString(), + CLUSTER, uri, "1", new ArrayList(), new GossipSettings(), null); - firstService.start(); - - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - return firstService.getGossipManager().getLiveMembers().size(); - }}).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(0); final GossipService serviceUnderTest = new GossipService( - StartupSettings.fromJSONFile( settingsFile ) - ); + StartupSettings.fromJSONFile(settingsFile)); serviceUnderTest.start(); - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - return serviceUnderTest.getGossipManager().getLiveMembers().size(); - }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(1); firstService.shutdown(); serviceUnderTest.shutdown(); } @@ -78,10 +66,13 @@ public class StartupSettingsTest { String settings = "[{\n" + // It is odd that this is meant to be in an array, but oh well. " \"cluster\":\"" + CLUSTER + "\",\n" + - " \"id\":\"" + UUID.randomUUID() + "\",\n" + + " \"id\":\"" + "2" + "\",\n" + " \"uri\":\"udp://127.0.0.1:50001\",\n" + " \"gossip_interval\":1000,\n" + + " \"window_size\":1000,\n" + + " \"minimum_samples\":5,\n" + " \"cleanup_interval\":10000,\n" + + " \"convict_threshold\":2.6,\n" + " \"members\":[\n" + " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" + " ]\n" + diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index b663a6e..aa9e2e8 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -29,54 +29,39 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; -import org.apache.log4j.Logger; - -import org.apache.gossip.event.GossipListener; -import org.apache.gossip.event.GossipState; import org.junit.jupiter.api.Test; @RunWith(JUnitPlatform.class) public class TenNodeThreeSeedTest { - private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class ); @Test - public void test() throws UnknownHostException, InterruptedException, URISyntaxException{ - abc(); + public void test() throws UnknownHostException, InterruptedException, URISyntaxException { + abc(30150); } @Test public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException{ - abc(); + abc(30100); } - public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{ + public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); String cluster = UUID.randomUUID().toString(); - - log.info( "Adding seed nodes" ); int seedNodes = 3; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes+1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); } - - log.info( "Adding clients" ); final List clients = new ArrayList<>(); final int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); GossipService gossipService = new GossipService(cluster, uri, i + "", - startupMembers, settings, - new GossipListener(){ - @Override - public void gossipEvent(GossipMember member, GossipState state) { - log.info(member+" "+ state); - } - }); + startupMembers, settings, (a,b) -> {}); clients.add(gossipService); gossipService.start(); - } + } TUnit.assertThat(new Callable (){ public Integer call() throws Exception { int total = 0; @@ -84,8 +69,8 @@ public class TenNodeThreeSeedTest { total += clients.get(i).getGossipManager().getLiveMembers().size(); } return total; - }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); - + }}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); + for (int i = 0; i < clusterMembers; ++i) { clients.get(i).shutdown(); } diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java new file mode 100644 index 0000000..f9ff9ff --- /dev/null +++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.accrual; + +import java.net.URI; + +import org.apache.gossip.LocalGossipMember; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +@RunWith(JUnitPlatform.class) +public class FailureDetectorTest { + + @Test + public void aTest(){ + int samples = 1; + int windowSize = 1000; + LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), "", 0L, windowSize, samples); + member.recordHeartbeat(5); + member.recordHeartbeat(10); + Assert.assertEquals(new Double(0.4342944819032518), member.detect(10)); + Assert.assertEquals(new Double(0.5211533782839021), member.detect(11)); + Assert.assertEquals(new Double(1.3028834457097553), member.detect(20)); + Assert.assertEquals(new Double(3.9), member.detect(50), .01); + Assert.assertEquals(new Double(8.25), member.detect(100), .01); + Assert.assertEquals(new Double(12.6), member.detect(150), .01); + Assert.assertEquals(new Double(14.77), member.detect(175), .01); + Assert.assertEquals(new Double(Double.POSITIVE_INFINITY), member.detect(500), .01); + member.recordHeartbeat(4); + Assert.assertEquals(new Double(12.6), member.detect(150), .01); + } + + @Ignore + public void sameHeartbeatTest(){ + int samples = 1; + int windowSize = 1000; + LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), "", 0L, windowSize, samples); + member.recordHeartbeat(5); + member.recordHeartbeat(5); + member.recordHeartbeat(5); + Assert.assertEquals(new Double(0.4342944819032518), member.detect(10)); + } + +} diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java index d19c802..1b5c35e 100644 --- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java +++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -37,7 +37,8 @@ public class DataReaperTest { String value = "a"; GossipSettings settings = new GossipSettings(); GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) - .withId(myId).uri(URI.create("udp://localhost:5000")).build(); + .withId(myId).uri(URI.create("udp://localhost:6000")).build(); + gm.init(); gm.gossipPerNodeData(perNodeDatum(key, value)); gm.gossipSharedData(sharedDatum(key, value)); Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); @@ -46,6 +47,7 @@ public class DataReaperTest { gm.getDataReaper().runSharedOnce(); TUnit.assertThat(() -> gm.findPerNodeGossipData(myId, key)).equals(null); TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null); + gm.shutdown(); } private GossipDataMessage perNodeDatum(String key, String value) { @@ -74,7 +76,8 @@ public class DataReaperTest { String value = "a"; GossipSettings settings = new GossipSettings(); GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) - .withId(myId).uri(URI.create("udp://localhost:5000")).build(); + .withId(myId).uri(URI.create("udp://localhost:7000")).build(); + gm.init(); GossipDataMessage before = perNodeDatum(key, value); GossipDataMessage after = perNodeDatum(key, "b"); after.setTimestamp(after.getTimestamp() - 1); @@ -82,6 +85,7 @@ public class DataReaperTest { Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); gm.gossipPerNodeData(after); Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); + gm.shutdown(); } } diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java index 875a7ab..d9635af 100644 --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java @@ -83,14 +83,14 @@ public class RandomGossipManagerBuilderTest { .uri(new URI("udp://localhost:2000")) .settings(new GossipSettings()) .gossipMembers(null).build(); - assertNotNull(gossipManager.getLiveMembers()); } @Test public void useMemberListIfProvided() throws URISyntaxException { - LocalGossipMember member = new LocalGossipMember("aCluster", new URI("udp://localhost:2000"), "aGossipMember", - System.currentTimeMillis(), new TestNotificationListener(), 60000); + LocalGossipMember member = new LocalGossipMember( + "aCluster", new URI("udp://localhost:2000"), "aGossipMember", + System.nanoTime(), 1000, 1); List memberList = new ArrayList<>(); memberList.add(member); RandomGossipManager gossipManager = RandomGossipManager.newBuilder() @@ -99,8 +99,8 @@ public class RandomGossipManagerBuilderTest { .settings(new GossipSettings()) .uri(new URI("udp://localhost:8000")) .gossipMembers(memberList).build(); - assertEquals(1, gossipManager.getLiveMembers().size()); - assertEquals(member.getId(), gossipManager.getLiveMembers().get(0).getId()); + assertEquals(1, gossipManager.getDeadMembers().size()); + assertEquals(member.getId(), gossipManager.getDeadMembers().get(0).getId()); } } \ No newline at end of file