GOSSIP-48 user shutdown message

This commit is contained in:
Edward Capriolo
2017-02-06 22:09:55 -05:00
parent 32c082a0cb
commit 296b55fa9f
10 changed files with 187 additions and 36 deletions

View File

@ -18,6 +18,8 @@
package org.apache.gossip.manager; package org.apache.gossip.manager;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.List;
import java.util.Random;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.Histogram; import com.codahale.metrics.Histogram;
@ -28,6 +30,7 @@ import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.GossipMember; import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response; import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage; import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpSharedGossipDataMessage; import org.apache.gossip.udp.UdpSharedGossipDataMessage;
@ -47,6 +50,7 @@ public abstract class AbstractActiveGossiper {
private final Histogram sharedDataHistogram; private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram; private final Histogram sendPerNodeDataHistogram;
private final Histogram sendMembershipHistorgram; private final Histogram sendMembershipHistorgram;
private final Random random;
public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
this.gossipManager = gossipManager; this.gossipManager = gossipManager;
@ -54,6 +58,7 @@ public abstract class AbstractActiveGossiper {
sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time")); sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time")); sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time")); sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
random = new Random();
} }
public void init() { public void init() {
@ -64,6 +69,16 @@ public abstract class AbstractActiveGossiper {
} }
/**
 * Sends a one-way {@link ShutdownMessage} describing this node to a single target member.
 * The message carries the id of {@code me} and the current nano time of the manager's clock.
 *
 * @param me the local member whose id is written into the message
 * @param target the member to notify; when null nothing is sent
 */
public final void sendShutdownMessage(LocalGossipMember me, LocalGossipMember target){
  if (target != null) {
    ShutdownMessage notice = new ShutdownMessage();
    notice.setNodeId(me.getId());
    notice.setShutdownAtNanos(gossipManager.getClock().nanoTime());
    gossipCore.sendOneWay(notice, target.getUri());
  }
}
public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){ public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){
if (member == null){ if (member == null){
return; return;
@ -138,4 +153,19 @@ public abstract class AbstractActiveGossiper {
gm.setProperties(member.getProperties()); gm.setProperties(member.getProperties());
return gm; return gm;
} }
/**
 * Picks one member at random from the supplied list using this gossiper's {@code random}.
 *
 * @param memberList
 *          An immutable list
 * @return The chosen LocalGossipMember to gossip with, or null when the list is empty.
 */
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
  if (memberList.isEmpty()) {
    // nobody to gossip with
    return null;
  }
  return memberList.get(random.nextInt(memberList.size()));
}
} }

View File

@ -77,7 +77,7 @@ public class DataReaper {
public void close(){ public void close(){
scheduledExecutor.shutdown(); scheduledExecutor.shutdown();
try { try {
scheduledExecutor.awaitTermination(5, TimeUnit.SECONDS); scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }

View File

@ -18,7 +18,6 @@
package org.apache.gossip.manager; package org.apache.gossip.manager;
import java.util.List; import java.util.List;
import java.util.Random;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
@ -52,7 +51,6 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
private ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue; private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService; private ThreadPoolExecutor threadService;
private final Random random;
public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
MetricRegistry registry) { MetricRegistry registry) {
@ -61,7 +59,6 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
workQueue = new ArrayBlockingQueue<Runnable>(1024); workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy()); new ThreadPoolExecutor.DiscardOldestPolicy());
random = new Random();
try { try {
sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
.getActiveGossipProperties().get("sameRackGossipIntervalMs")); .getActiveGossipProperties().get("sameRackGossipIntervalMs"));
@ -216,19 +213,32 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
} }
@Override @Override
public void shutdown() { public void shutdown() {
super.shutdown(); super.shutdown();
scheduledExecutorService.shutdown();
try {
scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e);
}
sendShutdownMessage();
threadService.shutdown();
try {
threadService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e);
}
} }
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) { /**
LocalGossipMember member = null; * sends an optimistic shutdown message to several cluster nodes
if (memberList.size() > 0) { */
int randomNeighborIndex = random.nextInt(memberList.size()); protected void sendShutdownMessage(){
member = memberList.get(randomNeighborIndex); List<LocalGossipMember> l = gossipManager.getLiveMembers();
int sendTo = l.size() < 3 ? 1 : l.size() / 3;
for (int i = 0; i < sendTo; i++) {
threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
} }
return member;
} }
} }

View File

@ -44,6 +44,7 @@ import org.apache.gossip.model.Base;
import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.Response; import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.udp.Trackable; import org.apache.gossip.udp.Trackable;
import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk; import org.apache.gossip.udp.UdpActiveGossipOk;
@ -72,6 +73,10 @@ public class GossipCore implements GossipCoreConstants {
private final Meter tranmissionException; private final Meter tranmissionException;
private final Meter tranmissionSuccess; private final Meter tranmissionSuccess;
// Instance initializer: executed each time a GossipCore is constructed; it calls
// enableDefaultTyping() on MAPPER.
// NOTE(review): MAPPER's declaration is not visible in this hunk. If it is a shared
// (static) mapper, this probably belongs in a static initializer so it is configured
// only once - confirm. Presumably MAPPER is a Jackson ObjectMapper; default typing on
// data received from the network deserves a security review.
{
MAPPER.enableDefaultTyping();
}
public GossipCore(GossipManager manager, MetricRegistry metrics){ public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager; this.gossipManager = manager;
requests = new ConcurrentHashMap<>(); requests = new ConcurrentHashMap<>();
@ -128,10 +133,11 @@ public class GossipCore implements GossipCoreConstants {
public void shutdown(){ public void shutdown(){
service.shutdown(); service.shutdown();
try { try {
service.awaitTermination(5, TimeUnit.SECONDS); service.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOGGER.warn(e); LOGGER.warn(e);
} }
service.shutdownNow();
} }
public void receive(Base base){ public void receive(Base base){
@ -141,6 +147,16 @@ public class GossipCore implements GossipCoreConstants {
requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t); requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
} }
} }
if (base instanceof ShutdownMessage){
ShutdownMessage s = (ShutdownMessage) base;
GossipDataMessage m = new GossipDataMessage();
m.setKey(ShutdownMessage.PER_NODE_KEY);
m.setNodeId(s.getNodeId());
m.setPayload(base);
m.setTimestamp(System.currentTimeMillis());
m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
addPerNodeData(m);
}
if (base instanceof GossipDataMessage) { if (base instanceof GossipDataMessage) {
UdpGossipDataMessage message = (UdpGossipDataMessage) base; UdpGossipDataMessage message = (UdpGossipDataMessage) base;
addPerNodeData(message); addPerNodeData(message);

View File

@ -47,6 +47,7 @@ import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage; import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
public abstract class GossipManager { public abstract class GossipManager {
@ -159,6 +160,9 @@ public abstract class GossipManager {
scheduledServiced.scheduleAtFixedRate(() -> { scheduledServiced.scheduleAtFixedRate(() -> {
try { try {
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) { for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
boolean userDown = processOptomisticShutdown(entry);
if (userDown)
continue;
Double result = null; Double result = null;
try { try {
result = entry.getKey().detect(clock.nanoTime()); result = entry.getKey().detect(clock.nanoTime());
@ -190,6 +194,30 @@ public abstract class GossipManager {
LOGGER.debug("The GossipManager is started."); LOGGER.debug("The GossipManager is started.");
} }
/**
 * Checks whether the given member has announced its own shutdown. A per-node data entry
 * stored under {@link ShutdownMessage#PER_NODE_KEY} means the node has sent us a pre-emptive
 * shutdown message; we process it so the node is seen as down sooner.
 *
 * @param l member (with its currently recorded state) to consider
 * @return true if node forced down
 */
public boolean processOptomisticShutdown(Entry<LocalGossipMember, GossipState> l){
  final LocalGossipMember node = l.getKey();
  GossipDataMessage stored = findPerNodeGossipData(node.getId(), ShutdownMessage.PER_NODE_KEY);
  if (stored == null){
    return false;
  }
  ShutdownMessage notice = (ShutdownMessage) stored.getPayload();
  if (notice.getShutdownAtNanos() <= node.getHeartbeat()){
    // the shutdown notice is not newer than the member's heartbeat value: do not force it down
    return false;
  }
  final boolean wasUp = l.getValue() == GossipState.UP;
  members.put(node, GossipState.DOWN);
  if (wasUp){
    // the listener is only told about a transition out of UP
    listener.gossipEvent(node, GossipState.DOWN);
  }
  return true;
}
private void readSavedRingState() { private void readSavedRingState() {
for (LocalGossipMember l : ringState.readFromDisk()){ for (LocalGossipMember l : ringState.readFromDisk()){
LocalGossipMember member = new LocalGossipMember(l.getClusterName(), LocalGossipMember member = new LocalGossipMember(l.getClusterName(),
@ -226,7 +254,7 @@ public abstract class GossipManager {
activeGossipThread.shutdown(); activeGossipThread.shutdown();
} }
try { try {
boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
if (!result) { if (!result) {
LOGGER.error("executor shutdown timed out"); LOGGER.error("executor shutdown timed out");
} }
@ -299,4 +327,8 @@ public abstract class GossipManager {
return userDataState; return userDataState;
} }
/**
 * @return the clock instance held by this manager
 */
public Clock getClock() {
  return this.clock;
}
} }

View File

@ -47,10 +47,14 @@ abstract public class PassiveGossipThread implements Runnable {
private final String cluster; private final String cluster;
private final ObjectMapper MAPPER = new ObjectMapper(); private final static ObjectMapper MAPPER = new ObjectMapper();
private final GossipCore gossipCore; private final GossipCore gossipCore;
// MAPPER is static and therefore shared by every PassiveGossipThread, so it is configured
// exactly once at class-initialization time. An instance initializer would re-apply the
// setting to the shared mapper on every construction.
// NOTE(review): default typing lets the payload choose which classes get instantiated;
// since this mapper presumably parses data received from the network, consider
// restricting the permitted types.
static {
  MAPPER.enableDefaultTyping();
}
public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipCore = gossipCore; this.gossipCore = gossipCore;
try { try {

View File

@ -18,7 +18,6 @@
package org.apache.gossip.manager; package org.apache.gossip.manager;
import java.util.List; import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -39,7 +38,6 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper {
private ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue; private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService; private ThreadPoolExecutor threadService;
private final Random random;
public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore, public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
MetricRegistry registry) { MetricRegistry registry) {
@ -48,7 +46,6 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper {
workQueue = new ArrayBlockingQueue<Runnable>(1024); workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy()); new ThreadPoolExecutor.DiscardOldestPolicy());
random = new Random();
} }
@Override @Override
@ -81,6 +78,13 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper {
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e); LOGGER.debug("Issue during shutdown", e);
} }
sendShutdownMessage();
threadService.shutdown();
try {
threadService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e);
}
} }
protected void sendToALiveMember(){ protected void sendToALiveMember(){
@ -94,18 +98,13 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper {
} }
/** /**
* * sends an optimistic shutdown message to several cluster nodes
* @param memberList
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with.
*/ */
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) { protected void sendShutdownMessage(){
//TODO this selection is racey what if the list size changes? List<LocalGossipMember> l = gossipManager.getLiveMembers();
LocalGossipMember member = null; int sendTo = l.size() < 3 ? 1 : l.size() / 2;
if (memberList.size() > 0) { for (int i = 0; i < sendTo; i++) {
int randomNeighborIndex = random.nextInt(memberList.size()); threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));
member = memberList.get(randomNeighborIndex);
} }
return member;
} }
} }

View File

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.model;
/**
 * Message a node sends to announce that it is shutting down. It carries the id of the
 * departing node and a nano-time stamp taken when the message was created.
 */
public class ShutdownMessage extends Message {

  /** Key under which a received shutdown message is stored as per-node data. */
  public static final String PER_NODE_KEY = "gossipcore.shutdowmessage";

  private long shutdownAtNanos;
  private String nodeId;

  /** Creates an empty message; populate it through the setters. */
  public ShutdownMessage(){
  }

  /** @return id of the node that is shutting down */
  public String getNodeId() {
    return this.nodeId;
  }

  /** @param nodeId id of the node that is shutting down */
  public void setNodeId(String nodeId) {
    this.nodeId = nodeId;
  }

  /** @return nano-time stamp recorded for the shutdown */
  public long getShutdownAtNanos() {
    return this.shutdownAtNanos;
  }

  /** @param shutdownAtNanos nano-time stamp recorded for the shutdown */
  public void setShutdownAtNanos(long shutdownAtNanos) {
    this.shutdownAtNanos = shutdownAtNanos;
  }

  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("ShutdownMessage [shutdownAtNanos=");
    text.append(this.shutdownAtNanos);
    text.append(", nodeId=");
    text.append(this.nodeId);
    text.append("]");
    return text.toString();
  }

}

View File

@ -47,7 +47,7 @@ public class ShutdownDeadtimeTest {
@Test @Test
public void DeadNodesDoNotComeAliveAgain() public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException { throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential"); GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 2.0, "exponential");
settings.setPersistRingState(false); settings.setPersistRingState(false);
settings.setPersistDataState(false); settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString(); String cluster = UUID.randomUUID().toString();
@ -75,7 +75,6 @@ public class ShutdownDeadtimeTest {
return total; return total;
} }
}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); }).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
// shutdown one client and verify that one client is lost. // shutdown one client and verify that one client is lost.
Random r = new Random(); Random r = new Random();
int randomClientId = r.nextInt(clusterMembers); int randomClientId = r.nextInt(clusterMembers);
@ -124,7 +123,12 @@ public class ShutdownDeadtimeTest {
}).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20); }).afterWaitingAtMost(60, TimeUnit.SECONDS).isEqualTo(20);
for (int i = 0; i < clusterMembers; ++i) { for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown(); final int j = i;
new Thread() {
public void run(){
clients.get(j).shutdown();
}
}.start();
} }
} }
} }

View File

@ -47,7 +47,7 @@ public class TenNodeThreeSeedTest {
} }
public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{ public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings(); GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential");
settings.setPersistRingState(false); settings.setPersistRingState(false);
settings.setPersistDataState(false); settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString(); String cluster = UUID.randomUUID().toString();
@ -76,7 +76,12 @@ public class TenNodeThreeSeedTest {
}}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); }}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
for (int i = 0; i < clusterMembers; ++i) { for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown(); int j = i;
new Thread(){
public void run(){
clients.get(j).shutdown();
}
}.start();
} }
} }
} }