GOSSIP-39 User Defined Active Gossip (review by Dorian Ellerbe)

This commit is contained in:
Edward Capriolo
2017-01-22 21:32:18 -05:00
parent b2af449074
commit 3f1882fbcf
27 changed files with 740 additions and 353 deletions

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import static com.codahale.metrics.MetricRegistry.name;
/**
* The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
*/
public abstract class AbstractActiveGossiper {
protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);
protected final GossipManager gossipManager;
protected final GossipCore gossipCore;
private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram;
private final Histogram sendMembershipHistorgram;
public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
this.gossipManager = gossipManager;
this.gossipCore = gossipCore;
sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
}
public void init() {
}
public void shutdown() {
}
public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
gossipCore.sendOneWay(message, member.getUri());
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
public final void sendPerNodeData(LocalGossipMember me, LocalGossipMember member){
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
UdpGossipDataMessage message = new UdpGossipDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
gossipCore.sendOneWay(message, member.getUri());
}
}
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
}
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
me.setHeartbeat(System.nanoTime());
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
Response r = gossipCore.send(message, member.getUri());
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
LOGGER.debug("Message " + message + " generated response " + r);
}
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
}
protected final GossipMember convert(LocalGossipMember member){
GossipMember gm = new GossipMember();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
gm.setUri(member.getUri().toASCIIString());
gm.setId(member.getId());
gm.setProperties(member.getProperties());
return gm;
}
}

View File

@ -1,218 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import java.util.List;
import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import static com.codahale.metrics.MetricRegistry.name;
/**
* The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
*/
public class ActiveGossipThread {
private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
private final GossipManager gossipManager;
private final Random random;
private final GossipCore gossipCore;
private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService;
private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram;
private final Histogram sendMembershipHistorgram;
public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
this.gossipManager = gossipManager;
random = new Random();
this.gossipCore = gossipCore;
scheduledExecutorService = Executors.newScheduledThreadPool(2);
workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
sharedDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sharedDataHistogram-time"));
sendPerNodeDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sendPerNodeDataHistogram-time"));
sendMembershipHistorgram = registry.histogram(name(ActiveGossipThread.class, "sendMembershipHistorgram-time"));
}
public void init() {
scheduledExecutorService.scheduleAtFixedRate(
() -> {
threadService.execute( () -> { sendToALiveMember(); });
}, 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> { this.sendToDeadMember(); }, 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
public void shutdown() {
scheduledExecutorService.shutdown();
try {
scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.debug("Issue during shurdown" + e);
}
}
public void sendSharedData(LocalGossipMember me, List<LocalGossipMember> memberList){
long startTime = System.currentTimeMillis();
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
return;
}
for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
gossipCore.sendOneWay(message, member.getUri());
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
public void sendPerNodeData(LocalGossipMember me, List<LocalGossipMember> memberList){
long startTime = System.currentTimeMillis();
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
return;
}
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
UdpGossipDataMessage message = new UdpGossipDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
gossipCore.sendOneWay(message, member.getUri());
}
}
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
}
protected void sendToALiveMember(){
LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
System.out.println("send" );
sendMembershipList(gossipManager.getMyself(), member);
}
protected void sendToDeadMember(){
LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
long startTime = System.currentTimeMillis();
me.setHeartbeat(System.nanoTime());
if (member == null) {
LOGGER.debug("Send sendMembershipList() is called without action");
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
return;
} else {
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
Response r = gossipCore.send(message, member.getUri());
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
LOGGER.debug("Message " + message + " generated response " + r);
}
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
}
/**
*
* @param memberList
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with.
*/
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
} else {
LOGGER.debug("I am alone in this world.");
}
return member;
}
private GossipMember convert(LocalGossipMember member){
GossipMember gm = new GossipMember();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
gm.setUri(member.getUri().toASCIIString());
gm.setId(member.getId());
return gm;
}
}

View File

@ -0,0 +1,233 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import java.util.List;
import java.util.Random;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.LocalGossipMember;
import com.codahale.metrics.MetricRegistry;
/**
* Sends gossip traffic at different rates to other racks and data-centers.
* This implementation controls the rate at which gossip traffic is shared.
* There are two constructs Datacenter and Rack. It is assumed that bandwidth and latency is higher
* in the rack than in the the datacenter. We can adjust the rate at which we send messages to each group.
*
*/
public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
public static final String DATACENTER = "datacenter";
public static final String RACK = "rack";
private int sameRackGossipIntervalMs = 100;
private int sameDcGossipIntervalMs = 500;
private int differentDatacenterGossipIntervalMs = 1000;
private int randomDeadMemberSendIntervalMs = 250;
private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService;
private final Random random;
public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore,
MetricRegistry registry) {
super(gossipManager, gossipCore, registry);
scheduledExecutorService = Executors.newScheduledThreadPool(2);
workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy());
random = new Random();
try {
sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
.getActiveGossipProperties().get("sameRackGossipIntervalMs"));
} catch (RuntimeException ex) { }
try {
sameDcGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
.getActiveGossipProperties().get("sameDcGossipIntervalMs"));
} catch (RuntimeException ex) { }
try {
differentDatacenterGossipIntervalMs = Integer.parseInt(gossipManager.getSettings()
.getActiveGossipProperties().get("differentDatacenterGossipIntervalMs"));
} catch (RuntimeException ex) { }
try {
randomDeadMemberSendIntervalMs = Integer.parseInt(gossipManager.getSettings()
.getActiveGossipProperties().get("randomDeadMemberSendIntervalMs"));
} catch (RuntimeException ex) { }
}
@Override
public void init() {
super.init();
//same rack
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> sendToSameRackMember()),
0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> sendToSameRackMemberPerNode()),
0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> sendToSameRackShared()),
0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS);
//same dc different rack
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> sameDcDiffernetRackMember()),
0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> sameDcDiffernetRackPerNode()),
0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> sameDcDiffernetRackShared()),
0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS);
//different dc
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> differentDcMember()),
0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> differentDcPerNode()),
0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> differentDcShared()),
0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS);
//the dead
scheduledExecutorService.scheduleAtFixedRate(() ->
threadService.execute(() -> sendToDeadMember()),
0, randomDeadMemberSendIntervalMs, TimeUnit.MILLISECONDS);
}
private void sendToDeadMember() {
sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers()));
}
private List<LocalGossipMember> differentDataCenter(){
String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
String rack = gossipManager.getMyself().getProperties().get(RACK);
if (myDc == null|| rack == null){
return Collections.emptyList();
}
List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
for (LocalGossipMember i : gossipManager.getLiveMembers()){
if (!myDc.equals(i.getProperties().get(DATACENTER))){
notMyDc.add(i);
}
}
return notMyDc;
}
private List<LocalGossipMember> sameDatacenterDifferentRack(){
String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
String rack = gossipManager.getMyself().getProperties().get(RACK);
if (myDc == null|| rack == null){
return Collections.emptyList();
}
List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
for (LocalGossipMember i : gossipManager.getLiveMembers()){
if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){
notMyDc.add(i);
}
}
return notMyDc;
}
private List<LocalGossipMember> sameRackNodes(){
String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
String rack = gossipManager.getMyself().getProperties().get(RACK);
if (myDc == null|| rack == null){
return Collections.emptyList();
}
List<LocalGossipMember> sameDcAndRack = new ArrayList<LocalGossipMember>(10);
for (LocalGossipMember i : gossipManager.getLiveMembers()){
if (myDc.equals(i.getProperties().get(DATACENTER))
&& rack.equals(i.getProperties().get(RACK))){
sameDcAndRack.add(i);
}
}
return sameDcAndRack;
}
private void sendToSameRackMember() {
sendMembershipList(gossipManager.getMyself(), selectPartner(sameRackNodes()));
}
private void sendToSameRackMemberPerNode() {
sendPerNodeData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
}
private void sendToSameRackShared() {
sendSharedData(gossipManager.getMyself(), selectPartner(sameRackNodes()));
}
private void differentDcMember() {
sendMembershipList(gossipManager.getMyself(), selectPartner(differentDataCenter()));
}
private void differentDcPerNode() {
sendPerNodeData(gossipManager.getMyself(), selectPartner(differentDataCenter()));
}
private void differentDcShared() {
sendSharedData(gossipManager.getMyself(), selectPartner(differentDataCenter()));
}
private void sameDcDiffernetRackMember() {
sendMembershipList(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
}
private void sameDcDiffernetRackPerNode() {
sendPerNodeData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
}
private void sameDcDiffernetRackShared() {
sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack()));
}
@Override
public void shutdown() {
super.shutdown();
}
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
}
return member;
}
}

View File

@ -165,7 +165,8 @@ public class GossipCore implements GossipCoreConstants {
activeGossipMessage.getMembers().get(i).getCluster(),
u,
activeGossipMessage.getMembers().get(i).getId(),
activeGossipMessage.getMembers().get(i).getHeartbeat());
activeGossipMessage.getMembers().get(i).getHeartbeat(),
activeGossipMessage.getMembers().get(i).getProperties());
if (i == 0) {
senderMember = member;
}
@ -321,6 +322,7 @@ public class GossipCore implements GossipCoreConstants {
remoteMember.getUri(),
remoteMember.getId(),
remoteMember.getHeartbeat(),
remoteMember.getProperties(),
gossipManager.getSettings().getWindowSize(),
gossipManager.getSettings().getMinimumSamples(),
gossipManager.getSettings().getDistribution());
@ -331,6 +333,7 @@ public class GossipCore implements GossipCoreConstants {
if (localMember.getKey().getId().equals(remoteMember.getId())){
localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
localMember.getKey().setProperties(remoteMember.getProperties());
}
}
}

View File

@ -18,9 +18,13 @@
package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@ -59,7 +63,7 @@ public abstract class GossipManager {
private final GossipListener listener;
private ActiveGossipThread activeGossipThread;
private AbstractActiveGossiper activeGossipThread;
private PassiveGossipThread passiveGossipThread;
@ -76,21 +80,22 @@ public abstract class GossipManager {
private MetricRegistry registry;
public GossipManager(String cluster,
URI uri, String id, GossipSettings settings,
URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
this.settings = settings;
gossipCore = new GossipCore(this, registry);
clock = new SystemClock();
dataReaper = new DataReaper(gossipCore, clock);
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
startupMember.getUri(), startupMember.getId(),
clock.nanoTime(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
//TODO should members start in down state?
members.put(member, GossipState.DOWN);
}
@ -137,6 +142,14 @@ public abstract class GossipManager {
return me;
}
private AbstractActiveGossiper constructActiveGossiper(){
try {
Constructor<?> c = Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class, GossipCore.class, MetricRegistry.class);
return (AbstractActiveGossiper) c.newInstance(this, gossipCore, registry);
} catch (NoSuchMethodException | SecurityException | ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
/**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
@ -144,7 +157,7 @@ public abstract class GossipManager {
public void init() {
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = new ActiveGossipThread(this, this.gossipCore, registry);
activeGossipThread = constructActiveGossiper();
activeGossipThread.init();
dataReaper.init();
scheduledServiced.scheduleAtFixedRate(() -> {

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.LocalGossipMember;
import com.codahale.metrics.MetricRegistry;
/**
* Base implementation gossips randomly to live nodes periodically gossips to dead ones
*
*/
public class SimpleActiveGossipper extends AbstractActiveGossiper {
private ScheduledExecutorService scheduledExecutorService;
private final BlockingQueue<Runnable> workQueue;
private ThreadPoolExecutor threadService;
private final Random random;
public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore,
MetricRegistry registry) {
super(gossipManager, gossipCore, registry);
scheduledExecutorService = Executors.newScheduledThreadPool(2);
workQueue = new ArrayBlockingQueue<Runnable>(1024);
threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue,
new ThreadPoolExecutor.DiscardOldestPolicy());
random = new Random();
}
@Override
public void init() {
super.init();
scheduledExecutorService.scheduleAtFixedRate(() -> {
threadService.execute(() -> {
sendToALiveMember();
});
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(() -> {
sendToDeadMember();
}, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendPerNodeData(gossipManager.getMyself(),
selectPartner(gossipManager.getLiveMembers())),
0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
scheduledExecutorService.scheduleAtFixedRate(
() -> sendSharedData(gossipManager.getMyself(),
selectPartner(gossipManager.getLiveMembers())),
0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
}
@Override
public void shutdown() {
super.shutdown();
scheduledExecutorService.shutdown();
try {
scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.debug("Issue during shutdown", e);
}
}
protected void sendToALiveMember(){
LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
protected void sendToDeadMember(){
LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
/**
*
* @param memberList
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with.
*/
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
//TODO this selection is racey what if the list size changes?
LocalGossipMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
}
return member;
}
}

View File

@ -25,8 +25,9 @@ import org.apache.gossip.manager.GossipManager;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.HashMap;
public class RandomGossipManager extends GossipManager {
@ -42,6 +43,7 @@ public class RandomGossipManager extends GossipManager {
private List<GossipMember> gossipMembers;
private GossipListener listener;
private MetricRegistry registry;
private Map<String,String> properties;
private ManagerBuilder() {}
@ -55,6 +57,11 @@ public class RandomGossipManager extends GossipManager {
this.cluster = cluster;
return this;
}
public ManagerBuilder properties(Map<String,String> properties) {
this.properties = properties;
return this;
}
public ManagerBuilder withId(String id) {
this.id = id;
@ -75,6 +82,7 @@ public class RandomGossipManager extends GossipManager {
this.listener = listener;
return this;
}
public ManagerBuilder registry(MetricRegistry registry) {
this.registry = registry;
return this;
@ -91,18 +99,21 @@ public class RandomGossipManager extends GossipManager {
checkArgument(settings != null, "You must specify gossip settings");
checkArgument(uri != null, "You must specify a uri");
checkArgument(registry != null, "You must specify a MetricRegistry");
if (properties == null){
properties = new HashMap<String,String>();
}
if (listener == null){
listener((a,b) -> {});
}
if (gossipMembers == null) {
gossipMembers = new ArrayList<>();
}
return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener, registry);
return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry);
}
}
private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings,
private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
super(cluster, uri, id, settings, gossipMembers, listener, registry);
super(cluster, uri, id, properties, settings, gossipMembers, listener, registry);
}
}