GOSSIP-49 Refactor Failure detector Lambda into named class

This commit is contained in:
Maxim Rusak
2017-03-16 18:06:49 +03:00
parent 4eafd58ecc
commit 0136dd9395
5 changed files with 150 additions and 88 deletions

View File

@ -19,9 +19,9 @@ package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember;
import org.apache.gossip.Member;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
@ -29,7 +29,6 @@ import org.apache.gossip.manager.handlers.MessageInvoker;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.log4j.Logger;
import java.lang.reflect.Constructor;
@ -63,6 +62,7 @@ public abstract class GossipManager {
private final MetricRegistry registry;
private final RingStatePersister ringState;
private final UserDataPersister userDataState;
private final GossipMemberStateRefresher memberStateRefresher;
private final ObjectMapper objectMapper;
private final MessageInvoker messageInvoker;
@ -73,7 +73,8 @@ public abstract class GossipManager {
ObjectMapper objectMapper, MessageInvoker messageInvoker) {
this.settings = settings;
this.messageInvoker = messageInvoker;
clock = new SystemClock();
clock = new SystemClock();
me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
gossipCore = new GossipCore(this, registry);
@ -83,7 +84,7 @@ public abstract class GossipManager {
if (!startupMember.equals(me)) {
LocalMember member = new LocalMember(startupMember.getClusterName(),
startupMember.getUri(), startupMember.getId(),
clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
//TODO should members start in down state?
members.put(member, GossipState.DOWN);
@ -96,6 +97,7 @@ public abstract class GossipManager {
this.registry = registry;
this.ringState = new RingStatePersister(this);
this.userDataState = new UserDataPersister(this, this.gossipCore);
this.memberStateRefresher = new GossipMemberStateRefresher(members, settings, listener, this::findPerNodeGossipData);
this.objectMapper = objectMapper;
readSavedRingState();
readSavedDataState();
@ -119,21 +121,21 @@ public abstract class GossipManager {
public List<LocalMember> getDeadMembers() {
return Collections.unmodifiableList(
members.entrySet()
.stream()
.filter(entry -> GossipState.DOWN.equals(entry.getValue()))
.map(Entry::getKey).collect(Collectors.toList()));
.stream()
.filter(entry -> GossipState.DOWN.equals(entry.getValue()))
.map(Entry::getKey).collect(Collectors.toList()));
}
/**
*
*
* @return a read only list of members found in the UP state
*/
public List<LocalMember> getLiveMembers() {
return Collections.unmodifiableList(
members.entrySet()
.stream()
.filter(entry -> GossipState.UP.equals(entry.getValue()))
.map(Entry::getKey).collect(Collectors.toList()));
.stream()
.filter(entry -> GossipState.UP.equals(entry.getValue()))
.map(Entry::getKey).collect(Collectors.toList()));
}
public LocalMember getMyself() {
@ -148,7 +150,7 @@ public abstract class GossipManager {
throw new RuntimeException(e);
}
}
/**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
@ -161,77 +163,20 @@ public abstract class GossipManager {
dataReaper.init();
scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
scheduledServiced.scheduleAtFixedRate(() -> {
try {
for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
boolean userDown = processOptomisticShutdown(entry);
if (userDown)
continue;
Double result = null;
try {
result = entry.getKey().detect(clock.nanoTime());
if (result != null) {
if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) {
members.put(entry.getKey(), GossipState.DOWN);
listener.gossipEvent(entry.getKey(), GossipState.DOWN);
}
if (result <= settings.getConvictThreshold() && entry.getValue() == GossipState.DOWN) {
members.put(entry.getKey(), GossipState.UP);
listener.gossipEvent(entry.getKey(), GossipState.UP);
}
}
} catch (IllegalArgumentException ex) {
//0.0 returns throws exception computing the mean.
long now = clock.nanoTime();
long nowInMillis = TimeUnit.MILLISECONDS.convert(now,TimeUnit.NANOSECONDS);
if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP){
LOGGER.warn("Marking down");
members.put(entry.getKey(), GossipState.DOWN);
listener.gossipEvent(entry.getKey(), GossipState.DOWN);
}
} //end catch
} // end for
} catch (RuntimeException ex) {
LOGGER.warn("scheduled state had exception", ex);
}
}, 0, 100, TimeUnit.MILLISECONDS);
scheduledServiced.scheduleAtFixedRate(memberStateRefresher, 0, 100, TimeUnit.MILLISECONDS);
LOGGER.debug("The GossipManager is started.");
}
/**
* If we have a special key the per-node data that means that the node has sent us
* a pre-emptive shutdown message. We process this so node is seen down sooner
* @param l member to consider
* @return true if node forced down
*/
public boolean processOptomisticShutdown(Entry<LocalMember, GossipState> l){
PerNodeDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
if (m == null){
return false;
}
ShutdownMessage s = (ShutdownMessage) m.getPayload();
if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()){
if (l.getValue() == GossipState.UP){
members.put(l.getKey(), GossipState.DOWN);
listener.gossipEvent(l.getKey(), GossipState.DOWN);
} else {
members.put(l.getKey(), GossipState.DOWN);
}
return true;
}
return false;
}
private void readSavedRingState() {
for (LocalMember l : ringState.readFromDisk()){
LocalMember member = new LocalMember(l.getClusterName(),
l.getUri(), l.getId(),
clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
members.putIfAbsent(member, GossipState.DOWN);
}
}
private void readSavedDataState() {
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
@ -274,7 +219,7 @@ public abstract class GossipManager {
}
scheduledServiced.shutdownNow();
}
public void gossipPerNodeData(PerNodeDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
@ -282,7 +227,7 @@ public abstract class GossipManager {
message.setNodeId(me.getId());
gossipCore.addPerNodeData(message);
}
public void gossipSharedData(SharedDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
@ -290,7 +235,7 @@ public abstract class GossipManager {
message.setNodeId(me.getId());
gossipCore.addSharedData(message);
}
@SuppressWarnings("rawtypes")
public Crdt findCrdt(String key){
@ -304,7 +249,7 @@ public abstract class GossipManager {
return (Crdt) l.getPayload();
}
}
@SuppressWarnings("rawtypes")
public Crdt merge(SharedDataMessage message){
Objects.nonNull(message.getKey());
@ -316,7 +261,7 @@ public abstract class GossipManager {
}
return gossipCore.merge(message);
}
public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){
ConcurrentHashMap<String, PerNodeDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
if (j == null){
@ -332,7 +277,7 @@ public abstract class GossipManager {
return l;
}
}
public SharedDataMessage findSharedGossipData(String key){
SharedDataMessage l = gossipCore.getSharedData().get(key);
if (l == null){
@ -352,11 +297,15 @@ public abstract class GossipManager {
public RingStatePersister getRingState() {
return ringState;
}
public UserDataPersister getUserDataState() {
return userDataState;
}
public GossipMemberStateRefresher getMemberStateRefresher() {
return memberStateRefresher;
}
public Clock getClock() {
return clock;
}
@ -368,5 +317,5 @@ public abstract class GossipManager {
public MetricRegistry getRegistry() {
return registry;
}
}

View File

@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.log4j.Logger;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
public class GossipMemberStateRefresher implements Runnable {
public static final Logger LOGGER = Logger.getLogger(GossipMemberStateRefresher.class);
private final Map<LocalMember, GossipState> members;
private final GossipSettings settings;
private final GossipListener listener;
private final Clock clock;
private final BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData;
public GossipMemberStateRefresher(Map<LocalMember, GossipState> members, GossipSettings settings,
GossipListener listener, BiFunction<String, String, PerNodeDataMessage> findPerNodeGossipData) {
this.members = members;
this.settings = settings;
this.listener = listener;
this.findPerNodeGossipData = findPerNodeGossipData;
clock = new SystemClock();
}
public void run() {
try {
runOnce();
} catch (RuntimeException ex) {
LOGGER.warn("scheduled state had exception", ex);
}
}
public void runOnce() {
for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
boolean userDown = processOptimisticShutdown(entry);
if (userDown)
continue;
try {
Double phiMeasure = entry.getKey().detect(clock.nanoTime());
if (phiMeasure != null) {
GossipState requiredState = calcRequiredState(phiMeasure);
if (entry.getValue() != requiredState) {
members.put(entry.getKey(), requiredState);
listener.gossipEvent(entry.getKey(), requiredState);
}
}
} catch (IllegalArgumentException ex) {
//0.0 returns throws exception computing the mean.
long now = clock.nanoTime();
long nowInMillis = TimeUnit.MILLISECONDS.convert(now, TimeUnit.NANOSECONDS);
if (nowInMillis - settings.getCleanupInterval() > entry.getKey().getHeartbeat() && entry.getValue() == GossipState.UP) {
LOGGER.warn("Marking down");
members.put(entry.getKey(), GossipState.DOWN);
listener.gossipEvent(entry.getKey(), GossipState.DOWN);
}
} //end catch
} // end for
}
public GossipState calcRequiredState(Double phiMeasure) {
if (phiMeasure > settings.getConvictThreshold())
return GossipState.DOWN;
else
return GossipState.UP;
}
/**
* If we have a special key the per-node data that means that the node has sent us
* a pre-emptive shutdown message. We process this so node is seen down sooner
*
* @param l member to consider
* @return true if node forced down
*/
public boolean processOptimisticShutdown(Entry<LocalMember, GossipState> l) {
PerNodeDataMessage m = findPerNodeGossipData.apply(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
if (m == null) {
return false;
}
ShutdownMessage s = (ShutdownMessage) m.getPayload();
if (s.getShutdownAtNanos() > l.getKey().getHeartbeat()) {
members.put(l.getKey(), GossipState.DOWN);
if (l.getValue() == GossipState.UP) {
listener.gossipEvent(l.getKey(), GossipState.DOWN);
}
return true;
}
return false;
}
}

View File

@ -85,6 +85,7 @@ abstract public class PassiveGossipThread implements Runnable {
gossipCore.receive(activeGossipMessage);
unsigned.mark();
}
gossipManager.getMemberStateRefresher().run();
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}

View File

@ -24,6 +24,7 @@ import org.apache.gossip.model.Base;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
public class MessageInvokerCombiner implements MessageInvoker {
private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>();
@ -32,14 +33,7 @@ public class MessageInvokerCombiner implements MessageInvoker {
}
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (invokers == null) {
return false;
}
boolean result = false;
for (MessageInvoker mi : invokers) {
result = mi.invoke(gossipCore, gossipManager, base) || result;
}
return result;
return invokers.stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
}
public void clear() {

View File

@ -81,6 +81,7 @@ public class ShutdownDeadtimeTest {
return total;
}
}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20);
// shutdown one client and verify that one client is lost.
Random r = new Random();
int randomClientId = r.nextInt(clusterMembers);