diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index ba8517b..d2f5d20 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -19,9 +19,9 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalMember; +import org.apache.gossip.Member; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; @@ -29,7 +29,6 @@ import org.apache.gossip.manager.handlers.MessageInvoker; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; -import org.apache.gossip.model.ShutdownMessage; import org.apache.log4j.Logger; import java.lang.reflect.Constructor; @@ -63,6 +62,7 @@ public abstract class GossipManager { private final MetricRegistry registry; private final RingStatePersister ringState; private final UserDataPersister userDataState; + private final GossipMemberStateRefresher memberStateRefresher; private final ObjectMapper objectMapper; private final MessageInvoker messageInvoker; @@ -73,7 +73,8 @@ public abstract class GossipManager { ObjectMapper objectMapper, MessageInvoker messageInvoker) { this.settings = settings; this.messageInvoker = messageInvoker; - clock = new SystemClock(); + + clock = new SystemClock(); me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties, settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); gossipCore = new GossipCore(this, registry); @@ -83,7 +84,7 @@ public abstract class GossipManager { if (!startupMember.equals(me)) { LocalMember member = new 
LocalMember(startupMember.getClusterName(), startupMember.getUri(), startupMember.getId(), - clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(), + clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); //TODO should members start in down state? members.put(member, GossipState.DOWN); @@ -96,6 +97,7 @@ public abstract class GossipManager { this.registry = registry; this.ringState = new RingStatePersister(this); this.userDataState = new UserDataPersister(this, this.gossipCore); + this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData); this.objectMapper = objectMapper; readSavedRingState(); readSavedDataState(); @@ -119,21 +121,21 @@ public abstract class GossipManager { public List getDeadMembers() { return Collections.unmodifiableList( members.entrySet() - .stream() - .filter(entry -> GossipState.DOWN.equals(entry.getValue())) - .map(Entry::getKey).collect(Collectors.toList())); + .stream() + .filter(entry -> GossipState.DOWN.equals(entry.getValue())) + .map(Entry::getKey).collect(Collectors.toList())); } /** - * + * * @return a read only list of members found in the UP state */ public List getLiveMembers() { return Collections.unmodifiableList( members.entrySet() - .stream() - .filter(entry -> GossipState.UP.equals(entry.getValue())) - .map(Entry::getKey).collect(Collectors.toList())); + .stream() + .filter(entry -> GossipState.UP.equals(entry.getValue())) + .map(Entry::getKey).collect(Collectors.toList())); } public LocalMember getMyself() { @@ -148,7 +150,7 @@ public abstract class GossipManager { throw new RuntimeException(e); } } - + /** * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * thread and start the receiver thread. 
@@ -161,77 +163,20 @@ public abstract class GossipManager { dataReaper.init(); scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS); scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); - scheduledServiced.scheduleAtFixedRate(() -> { - try { - for (Entry entry : members.entrySet()) { - boolean userDown = processOptomisticShutdown(entry); - if (userDown) - continue; - Double result = null; - try { - result = entry.getKey().detect(clock.nanoTime()); - if (result != null) { - if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) { - members.put(entry.getKey(), GossipState.DOWN); - listener.gossipEvent(entry.getKey(), GossipState.DOWN); - } - if (result <= settings.getConvictThreshold() && entry.getValue() == GossipState.DOWN) { - members.put(entry.getKey(), GossipState.UP); - listener.gossipEvent(entry.getKey(), GossipState.UP); - } - } - } catch (IllegalArgumentException ex) { - //0.0 returns throws exception computing the mean. - long now = clock.nanoTime(); - long nowInMillis = TimeUnit.MILLISECONDS.convert(now,TimeUnit.NANOSECONDS); - if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP){ - LOGGER.warn("Marking down"); - members.put(entry.getKey(), GossipState.DOWN); - listener.gossipEvent(entry.getKey(), GossipState.DOWN); - } - } //end catch - } // end for - } catch (RuntimeException ex) { - LOGGER.warn("scheduled state had exception", ex); - } - }, 0, 100, TimeUnit.MILLISECONDS); + scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS); LOGGER.debug("The GossipManager is started."); } - /** - * If we have a special key the per-node data that means that the node has sent us - * a pre-emptive shutdown message. 
We process this so node is seen down sooner - * @param l member to consider - * @return true if node forced down - */ - public boolean processOptomisticShutdown(Entry l){ - PerNodeDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY); - if (m == null){ - return false; - } - ShutdownMessage s = (ShutdownMessage) m.getPayload(); - if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()){ - if (l.getValue() == GossipState.UP){ - members.put(l.getKey(), GossipState.DOWN); - listener.gossipEvent(l.getKey(), GossipState.DOWN); - } else { - members.put(l.getKey(), GossipState.DOWN); - } - return true; - } - return false; - } - private void readSavedRingState() { for (LocalMember l : ringState.readFromDisk()){ LocalMember member = new LocalMember(l.getClusterName(), l.getUri(), l.getId(), - clock.nanoTime(), l.getProperties(), settings.getWindowSize(), + clock.nanoTime(), l.getProperties(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); members.putIfAbsent(member, GossipState.DOWN); } } - + private void readSavedDataState() { for (Entry> l : userDataState.readPerNodeFromDisk().entrySet()){ for (Entry j : l.getValue().entrySet()){ @@ -274,7 +219,7 @@ public abstract class GossipManager { } scheduledServiced.shutdownNow(); } - + public void gossipPerNodeData(PerNodeDataMessage message){ Objects.nonNull(message.getKey()); Objects.nonNull(message.getTimestamp()); @@ -282,7 +227,7 @@ public abstract class GossipManager { message.setNodeId(me.getId()); gossipCore.addPerNodeData(message); } - + public void gossipSharedData(SharedDataMessage message){ Objects.nonNull(message.getKey()); Objects.nonNull(message.getTimestamp()); @@ -290,7 +235,7 @@ public abstract class GossipManager { message.setNodeId(me.getId()); gossipCore.addSharedData(message); } - + @SuppressWarnings("rawtypes") public Crdt findCrdt(String key){ @@ -304,7 +249,7 @@ public abstract class GossipManager { return (Crdt) l.getPayload(); } } - + 
@SuppressWarnings("rawtypes") public Crdt merge(SharedDataMessage message){ Objects.nonNull(message.getKey()); @@ -316,7 +261,7 @@ public abstract class GossipManager { } return gossipCore.merge(message); } - + public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){ ConcurrentHashMap j = gossipCore.getPerNodeData().get(nodeId); if (j == null){ @@ -332,7 +277,7 @@ public abstract class GossipManager { return l; } } - + public SharedDataMessage findSharedGossipData(String key){ SharedDataMessage l = gossipCore.getSharedData().get(key); if (l == null){ @@ -352,11 +297,15 @@ public abstract class GossipManager { public RingStatePersister getRingState() { return ringState; } - + public UserDataPersister getUserDataState() { return userDataState; } + public GossipMemberStateRefresher getMemberStateRefresher() { + return memberStateRefresher; + } + public Clock getClock() { return clock; } @@ -368,5 +317,5 @@ public abstract class GossipManager { public MetricRegistry getRegistry() { return registry; } - + }
diff --git a/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
new file mode 100644
index 0000000..ad2e055
--- /dev/null
+++ b/src/main/java/org/apache/gossip/manager/GossipMemberStateRefresher.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gossip.manager;
+
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.event.GossipListener;
+import org.apache.gossip.event.GossipState;
+import org.apache.gossip.model.PerNodeDataMessage;
+import org.apache.gossip.model.ShutdownMessage;
+import org.apache.log4j.Logger;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+/**
+ * Runnable that re-evaluates the {@link GossipState} of every tracked member and fires a
+ * {@link GossipListener} event when it moves a member between UP and DOWN.
+ */
+public class GossipMemberStateRefresher implements Runnable {
+  public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);
+
+  private final Map<LocalMember, GossipState> members;
+  private final GossipSettings settings;
+  private final GossipListener listener;
+  private final Clock clock;
+  private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
+
+  /**
+   * @param members member-to-state map; read and updated in place by this task
+   * @param settings supplies the convict threshold and the cleanup interval
+   * @param listener receives an event for the state changes made by this task
+   * @param findPerNodeGossipData looks up a node's per-node data by (node id, key)
+   */
+  public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
+      GossipListener listener,
+      BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
+    this.members = members;
+    this.settings = settings;
+    this.listener = listener;
+    this.findPerNodeGossipData = findPerNodeGossipData;
+    // NOTE(review): a private SystemClock is created here instead of accepting one from the caller.
+    clock = new SystemClock();
+  }
+
+  /** Runs one refresh pass; a RuntimeException is logged as a warning and not rethrown. */
+  @Override
+  public void run() {
+    try {
+      runOnce();
+    } catch (RuntimeException ex) {
+      LOGGER.warn("scheduled state had exception", ex);
+    }
+  }
+
+  /**
+   * Walks all members once: applies any pre-emptive shutdown first, otherwise derives the state
+   * from the member's {@code detect} result and fires a listener event when the state changes.
+   */
+  public void runOnce() {
+    for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
+      boolean userDown = processOptimisticShutdown(entry);
+      if (userDown)
+        continue;
+      try {
+        Double phiMeasure = entry.getKey().detect(clock.nanoTime());
+        if (phiMeasure != null) {
+          GossipState requiredState = calcRequiredState(phiMeasure);
+          if (entry.getValue() != requiredState) {
+            members.put(entry.getKey(), requiredState);
+            listener.gossipEvent(entry.getKey(), requiredState);
+          }
+        }
+      } catch (IllegalArgumentException ex) {
+        // detect() threw while computing the mean (the original note attributes this to a 0.0
+        // value); fall back to a heartbeat-age check.
+        // NOTE(review): the heartbeat is compared with a millisecond value here but with
+        // getShutdownAtNanos() in processOptimisticShutdown -- confirm the heartbeat's unit.
+        long now = clock.nanoTime();
+        long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
+        if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP) {
+          LOGGER.warn("Marking down");
+          members.put(entry.getKey(), GossipState.DOWN);
+          listener.gossipEvent(entry.getKey(), GossipState.DOWN);
+        }
+      } //end catch
+    } // end for
+  }
+
+  /**
+   * Maps a phi measure to the state it implies.
+   *
+   * @param phiMeasure value returned by the member's {@code detect}; must not be null
+   * @return DOWN when the measure exceeds the convict threshold, otherwise UP
+   */
+  public GossipState calcRequiredState(Double phiMeasure) {
+    if (phiMeasure > settings.getConvictThreshold())
+      return GossipState.DOWN;
+    else
+      return GossipState.UP;
+  }
+
+  /**
+   * If the per-node data holds the special shutdown key, the node has sent us a pre-emptive
+   * shutdown message. We process it so the node is seen as down sooner.
+   *
+   * @param l member to consider
+   * @return true if node forced down
+   */
+  public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
+    PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
+    if (m == null) {
+      return false;
+    }
+    ShutdownMessage s = (ShutdownMessage) m.getPayload();
+    if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
+      members.put(l.getKey(), GossipState.DOWN);
+      if (l.getValue() == GossipState.UP) {
+        listener.gossipEvent(l.getKey(), GossipState.DOWN);
+      }
+      return true;
+    }
+    return false;
+  }
+}
diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index bfce2dd..ae28bf7 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -85,6 +85,7 @@ abstract public class
PassiveGossipThread implements Runnable { gossipCore.receive(activeGossipMessage); unsigned.mark(); } + gossipManager.getMemberStateRefresher().run(); } catch (RuntimeException ex) {//TODO trap json exception LOGGER.error("Unable to process message", ex); } diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java index 964da86..cc6ef52 100644 --- a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java +++ b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java @@ -24,6 +24,7 @@ import org.apache.gossip.model.Base; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Predicate; public class MessageInvokerCombiner implements MessageInvoker { private final List invokers = new CopyOnWriteArrayList<>(); @@ -32,14 +33,7 @@ public class MessageInvokerCombiner implements MessageInvoker { } public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - if (invokers == null) { - return false; - } - boolean result = false; - for (MessageInvoker mi : invokers) { - result = mi.invoke(gossipCore, gossipManager, base) || result; - } - return result; + return invokers.stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0; } public void clear() { diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 48fb2cb..a91480e 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -81,6 +81,7 @@ public class ShutdownDeadtimeTest { return total; } }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); + // shutdown one client and verify that one client is lost. Random r = new Random(); int randomClientId = r.nextInt(clusterMembers);