From 296b55fa9f9c754ccc930de21b6d90b7c5475b6c Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Mon, 6 Feb 2017 22:09:55 -0500 Subject: [PATCH] GOSSIP-48 user shutdown message --- .../manager/AbstractActiveGossiper.java | 30 +++++++++++ .../org/apache/gossip/manager/DataReaper.java | 2 +- .../DatacenterRackAwareActiveGossiper.java | 34 ++++++++----- .../org/apache/gossip/manager/GossipCore.java | 18 ++++++- .../apache/gossip/manager/GossipManager.java | 34 ++++++++++++- .../gossip/manager/PassiveGossipThread.java | 6 ++- .../gossip/manager/SimpleActiveGossipper.java | 29 +++++------ .../apache/gossip/model/ShutdownMessage.java | 51 +++++++++++++++++++ .../apache/gossip/ShutdownDeadtimeTest.java | 10 ++-- .../apache/gossip/TenNodeThreeSeedTest.java | 9 +++- 10 files changed, 187 insertions(+), 36 deletions(-) create mode 100644 src/main/java/org/apache/gossip/model/ShutdownMessage.java diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java index d58aeb9..9fea30b 100644 --- a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java +++ b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java @@ -18,6 +18,8 @@ package org.apache.gossip.manager; import java.util.Map.Entry; +import java.util.List; +import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import com.codahale.metrics.Histogram; @@ -28,6 +30,7 @@ import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.GossipMember; import org.apache.gossip.model.Response; import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.ShutdownMessage; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpSharedGossipDataMessage; @@ -47,6 +50,7 @@ public abstract class AbstractActiveGossiper { private final Histogram sharedDataHistogram; 
private final Histogram sendPerNodeDataHistogram; private final Histogram sendMembershipHistorgram; + private final Random random; public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { this.gossipManager = gossipManager; @@ -54,6 +58,7 @@ public abstract class AbstractActiveGossiper { sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time")); sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time")); sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time")); + random = new Random(); } public void init() { @@ -64,6 +69,16 @@ public abstract class AbstractActiveGossiper { } + public final void sendShutdownMessage(LocalGossipMember me, LocalGossipMember target){ + if (target == null){ + return; + } + ShutdownMessage m = new ShutdownMessage(); + m.setNodeId(me.getId()); + m.setShutdownAtNanos(gossipManager.getClock().nanoTime()); + gossipCore.sendOneWay(m, target.getUri()); + } + public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){ if (member == null){ return; @@ -138,4 +153,19 @@ public abstract class AbstractActiveGossiper { gm.setProperties(member.getProperties()); return gm; } + + /** + * + * @param memberList + * An immutable list + * @return The chosen LocalGossipMember to gossip with. 
+ */ + protected LocalGossipMember selectPartner(List memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } + return member; + } } diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java index 6760685..f165239 100644 --- a/src/main/java/org/apache/gossip/manager/DataReaper.java +++ b/src/main/java/org/apache/gossip/manager/DataReaper.java @@ -77,7 +77,7 @@ public class DataReaper { public void close(){ scheduledExecutor.shutdown(); try { - scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS); + scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException e) { } diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java index 4f5dfdc..c66e332 100644 --- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java +++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java @@ -18,7 +18,6 @@ package org.apache.gossip.manager; import java.util.List; -import java.util.Random; import java.util.ArrayList; import java.util.Collections; import java.util.concurrent.ArrayBlockingQueue; @@ -52,7 +51,6 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { private ScheduledExecutorService scheduledExecutorService; private final BlockingQueue workQueue; private ThreadPoolExecutor threadService; - private final Random random; public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { @@ -61,7 +59,6 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { workQueue = new ArrayBlockingQueue(1024); threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, 
new ThreadPoolExecutor.DiscardOldestPolicy()); - random = new Random(); try { sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() .getActiveGossipProperties().get("sameRackGossipIntervalMs")); @@ -216,19 +213,32 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); } - @Override public void shutdown() { super.shutdown(); - } - - protected LocalGossipMember selectPartner(List memberList) { - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); + scheduledExecutorService.shutdown(); + try { + scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + sendShutdownMessage(); + threadService.shutdown(); + try { + threadService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); } - return member; } + /** + * sends an optimistic shutdown message to several cluster nodes + */ + protected void sendShutdownMessage(){ + List l = gossipManager.getLiveMembers(); + int sendTo = l.size() < 3 ? 
1 : l.size() / 3; + for (int i = 0; i < sendTo; i++) { + threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); + } + } } diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 31bd447..82d65fe 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -44,6 +44,7 @@ import org.apache.gossip.model.Base; import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.Response; import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.ShutdownMessage; import org.apache.gossip.udp.Trackable; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; @@ -72,6 +73,10 @@ public class GossipCore implements GossipCoreConstants { private final Meter tranmissionException; private final Meter tranmissionSuccess; + { + MAPPER.enableDefaultTyping(); + } + public GossipCore(GossipManager manager, MetricRegistry metrics){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); @@ -128,10 +133,11 @@ public class GossipCore implements GossipCoreConstants { public void shutdown(){ service.shutdown(); try { - service.awaitTermination(5, TimeUnit.SECONDS); + service.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException e) { LOGGER.warn(e); } + service.shutdownNow(); } public void receive(Base base){ @@ -141,6 +147,16 @@ public class GossipCore implements GossipCoreConstants { requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t); } } + if (base instanceof ShutdownMessage){ + ShutdownMessage s = (ShutdownMessage) base; + GossipDataMessage m = new GossipDataMessage(); + m.setKey(ShutdownMessage.PER_NODE_KEY); + m.setNodeId(s.getNodeId()); + m.setPayload(base); + m.setTimestamp(System.currentTimeMillis()); + m.setExpireAt(System.currentTimeMillis() + 30L * 1000L); + 
addPerNodeData(m); + } if (base instanceof GossipDataMessage) { UdpGossipDataMessage message = (UdpGossipDataMessage) base; addPerNodeData(message); diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 04afc28..53ed8c7 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -47,6 +47,7 @@ import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.ShutdownMessage; public abstract class GossipManager { @@ -159,6 +160,9 @@ public abstract class GossipManager { scheduledServiced.scheduleAtFixedRate(() -> { try { for (Entry entry : members.entrySet()) { + boolean userDown = processOptomisticShutdown(entry); + if (userDown) + continue; Double result = null; try { result = entry.getKey().detect(clock.nanoTime()); @@ -190,6 +194,30 @@ public abstract class GossipManager { LOGGER.debug("The GossipManager is started."); } + /** + * If we have a special key in the per-node data, that means that the node has sent us + * a pre-emptive shutdown message. 
We process this so node is seen down sooner + * @param l member to consider + * @return true if node forced down + */ + public boolean processOptomisticShutdown(Entry l){ + GossipDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY); + if (m == null){ + return false; + } + ShutdownMessage s = (ShutdownMessage) m.getPayload(); + if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()){ + if (l.getValue() == GossipState.UP){ + members.put(l.getKey(), GossipState.DOWN); + listener.gossipEvent(l.getKey(), GossipState.DOWN); + } else { + members.put(l.getKey(), GossipState.DOWN); + } + return true; + } + return false; + } + private void readSavedRingState() { for (LocalGossipMember l : ringState.readFromDisk()){ LocalGossipMember member = new LocalGossipMember(l.getClusterName(), @@ -226,7 +254,7 @@ public abstract class GossipManager { activeGossipThread.shutdown(); } try { - boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); + boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); if (!result) { LOGGER.error("executor shutdown timed out"); } @@ -298,5 +326,9 @@ public abstract class GossipManager { public UserDataPersister getUserDataState() { return userDataState; } + + public Clock getClock() { + return clock; + } } diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index ebda513..47b8a8f 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -47,9 +47,13 @@ abstract public class PassiveGossipThread implements Runnable { private final String cluster; - private final ObjectMapper MAPPER = new ObjectMapper(); + private final static ObjectMapper MAPPER = new ObjectMapper(); private final GossipCore gossipCore; + + { + MAPPER.enableDefaultTyping(); + } public 
PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { this.gossipCore = gossipCore; diff --git a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java index 43237fb..839d796 100644 --- a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java +++ b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java @@ -18,7 +18,6 @@ package org.apache.gossip.manager; import java.util.List; -import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; @@ -39,7 +38,6 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper { private ScheduledExecutorService scheduledExecutorService; private final BlockingQueue workQueue; private ThreadPoolExecutor threadService; - private final Random random; public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { @@ -48,7 +46,6 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper { workQueue = new ArrayBlockingQueue(1024); threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); - random = new Random(); } @Override @@ -71,7 +68,7 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper { selectPartner(gossipManager.getLiveMembers())), 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); } - + @Override public void shutdown() { super.shutdown(); @@ -81,6 +78,13 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper { } catch (InterruptedException e) { LOGGER.debug("Issue during shutdown", e); } + sendShutdownMessage(); + threadService.shutdown(); + try { + threadService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } } protected void sendToALiveMember(){ @@ -94,18 
+98,13 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper { } /** - * - * @param memberList - * The list of members which are stored in the local list of members. - * @return The chosen LocalGossipMember to gossip with. + * sends an optimistic shutdown message to several cluster nodes */ - protected LocalGossipMember selectPartner(List memberList) { - //TODO this selection is racey what if the list size changes? - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); + protected void sendShutdownMessage(){ + List l = gossipManager.getLiveMembers(); + int sendTo = l.size() < 3 ? 1 : l.size() / 2; + for (int i = 0; i < sendTo; i++) { + threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); } - return member; } } diff --git a/src/main/java/org/apache/gossip/model/ShutdownMessage.java b/src/main/java/org/apache/gossip/model/ShutdownMessage.java new file mode 100644 index 0000000..4bca508 --- /dev/null +++ b/src/main/java/org/apache/gossip/model/ShutdownMessage.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gossip.model; + +public class ShutdownMessage extends Message { + + public static final String PER_NODE_KEY = "gossipcore.shutdowmessage"; + private long shutdownAtNanos; + private String nodeId; + + public ShutdownMessage(){ + + } + + public String getNodeId() { + return nodeId; + } + + public void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + public long getShutdownAtNanos() { + return shutdownAtNanos; + } + + public void setShutdownAtNanos(long shutdownAtNanos) { + this.shutdownAtNanos = shutdownAtNanos; + } + + @Override + public String toString() { + return "ShutdownMessage [shutdownAtNanos=" + shutdownAtNanos + ", nodeId=" + nodeId + "]"; + } + +} diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 2386084..6a0765b 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -47,7 +47,7 @@ public class ShutdownDeadtimeTest { @Test public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential"); + GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 2.0, "exponential"); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); @@ -75,7 +75,6 @@ public class ShutdownDeadtimeTest { return total; } }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); - // shutdown one client and verify that one client is lost. 
Random r = new Random(); int randomClientId = r.nextInt(clusterMembers); @@ -124,7 +123,12 @@ public class ShutdownDeadtimeTest { }).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20); for (int i = 0; i < clusterMembers; ++i) { - clients.get(i).shutdown(); + final int j = i; + new Thread() { + public void run(){ + clients.get(j).shutdown(); + } + }.start(); } } } diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index aa797f5..8a9a9ab 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -47,7 +47,7 @@ public class TenNodeThreeSeedTest { } public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{ - GossipSettings settings = new GossipSettings(); + GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential"); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); @@ -76,7 +76,12 @@ public class TenNodeThreeSeedTest { }}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); for (int i = 0; i < clusterMembers; ++i) { - clients.get(i).shutdown(); + int j = i; + new Thread(){ + public void run(){ + clients.get(j).shutdown(); + } + }.start(); } } }