diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java index a2f416e..f2834bd 100644 --- a/src/main/java/org/apache/gossip/GossipMember.java +++ b/src/main/java/org/apache/gossip/GossipMember.java @@ -19,6 +19,7 @@ package org.apache.gossip; import java.net.InetSocketAddress; import java.net.URI; +import java.util.Map; /** * A abstract class representing a gossip member. @@ -40,6 +41,9 @@ public abstract class GossipMember implements Comparable { */ protected String id; + /* properties provided at startup time */ + protected Map properties; + /** * Constructor. * @@ -52,11 +56,12 @@ public abstract class GossipMember implements Comparable { * @param id * An id that may be replaced after contact */ - public GossipMember(String clusterName, URI uri, String id, long heartbeat) { + public GossipMember(String clusterName, URI uri, String id, long heartbeat, Map properties) { this.clusterName = clusterName; this.id = id; this.heartbeat = heartbeat; this.uri = uri; + this.properties = properties; } /** @@ -104,6 +109,14 @@ public abstract class GossipMember implements Comparable { this.id = _id; } + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + public String toString() { return "Member [address=" + getAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]"; } diff --git a/src/main/java/org/apache/gossip/GossipRunner.java b/src/main/java/org/apache/gossip/GossipRunner.java deleted file mode 100644 index c8a1f13..0000000 --- a/src/main/java/org/apache/gossip/GossipRunner.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.net.URISyntaxException; - -public class GossipRunner { - - public static void main(String[] args) throws URISyntaxException { - File configFile; - if (args.length == 1) { - configFile = new File("./" + args[0]); - } else { - configFile = new File("gossip.conf"); - } - new GossipRunner(configFile); - } - - public GossipRunner(File configFile) throws URISyntaxException { - if (configFile != null && configFile.exists()) { - try { - System.out.println("Parsing the configuration file..."); - StartupSettings _settings = StartupSettings.fromJSONFile(configFile); - GossipService gossipService = new GossipService(_settings); - System.out.println("Gossip service successfully initialized, let's start it..."); - gossipService.start(); - } catch (FileNotFoundException e) { - System.err.println("The given file is not found!"); - } catch (IOException e) { - System.err.println("Could not read the configuration file: " + e.getMessage()); - } catch (InterruptedException e) { - System.err.println("Error while starting the gossip service: " + e.getMessage()); - } - } else { - System.out - .println("The gossip.conf file is not found.\n\nEither specify the path to the startup settings file or place the gossip.json file in the same folder as the JAR file."); - } - } -} diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index fca9f28..f32eb35 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -20,7 +20,9 @@ package org.apache.gossip; import com.codahale.metrics.MetricRegistry; import java.net.URI; import java.net.UnknownHostException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import com.codahale.metrics.JmxReporter; import org.apache.gossip.event.GossipListener; @@ -50,7 +52,7 @@ public class GossipService { public GossipService(StartupSettings startupSettings) throws InterruptedException, UnknownHostException { this(startupSettings.getCluster(), startupSettings.getUri() - , startupSettings.getId(), startupSettings.getGossipMembers(), + , startupSettings.getId(), new HashMap (),startupSettings.getGossipMembers(), startupSettings.getGossipSettings(), null, new MetricRegistry()); } @@ -60,7 +62,7 @@ public class GossipService { * @throws InterruptedException * @throws UnknownHostException */ - public GossipService(String cluster, URI uri, String id, + public GossipService(String cluster, URI uri, String id, Map properties, List gossipMembers, GossipSettings settings, GossipListener listener, MetricRegistry registry) throws InterruptedException, UnknownHostException { jmxReporter = JmxReporter.forRegistry(registry).build(); @@ -73,6 +75,7 @@ public class GossipService { .gossipMembers(gossipMembers) .listener(listener) .registry(registry) + .properties(properties) .build(); } diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java index 36fabb6..1fed914 100644 --- a/src/main/java/org/apache/gossip/GossipSettings.java +++ b/src/main/java/org/apache/gossip/GossipSettings.java @@ -17,6 +17,9 @@ */ package org.apache.gossip; +import java.util.HashMap; +import java.util.Map; + /** * In this object the settings used by the GossipService are held. * @@ -41,6 +44,10 @@ public class GossipSettings { private String distribution = "exponential"; + private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; + + private Map activeGossipProperties = new HashMap<>(); + /** * Construct GossipSettings with default settings. */ @@ -139,5 +146,21 @@ public class GossipSettings { public void setDistribution(String distribution) { this.distribution = distribution; } + + public String getActiveGossipClass() { + return activeGossipClass; + } + + public void setActiveGossipClass(String activeGossipClass) { + this.activeGossipClass = activeGossipClass; + } + + public Map getActiveGossipProperties() { + return activeGossipProperties; + } + + public void setActiveGossipProperties(Map activeGossipProperties) { + this.activeGossipProperties = activeGossipProperties; + } } diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java index 83a13df..557ffcb 100644 --- a/src/main/java/org/apache/gossip/LocalGossipMember.java +++ b/src/main/java/org/apache/gossip/LocalGossipMember.java @@ -18,6 +18,7 @@ package org.apache.gossip; import java.net.URI; +import java.util.Map; import org.apache.gossip.accrual.FailureDetector; @@ -40,8 +41,8 @@ public class LocalGossipMember extends GossipMember { * The current heartbeat */ public LocalGossipMember(String clusterName, URI uri, String id, - long heartbeat, int windowSize, int minSamples, String distribution) { - super(clusterName, uri, id, heartbeat ); + long heartbeat, Map properties, int windowSize, int minSamples, String distribution) { + super(clusterName, uri, id, heartbeat, properties ); detector = new FailureDetector(this, minSamples, windowSize, distribution); } diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java index a9e6a76..e3f6620 100644 --- a/src/main/java/org/apache/gossip/RemoteGossipMember.java +++ b/src/main/java/org/apache/gossip/RemoteGossipMember.java @@ -18,12 +18,13 @@ package org.apache.gossip; import java.net.URI; +import java.util.HashMap; +import java.util.Map; /** * The object represents a gossip member with the properties as received from a remote gossip * member. * - * @author harmenw */ public class RemoteGossipMember extends GossipMember { @@ -35,12 +36,12 @@ public class RemoteGossipMember extends GossipMember { * @param heartbeat * The current heartbeat */ - public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat) { - super(clusterName, uri, id, heartbeat); + public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat, Map properties) { + super(clusterName, uri, id, heartbeat, properties); } public RemoteGossipMember(String clusterName, URI uri, String id) { - super(clusterName, uri, id, System.currentTimeMillis()); + super(clusterName, uri, id, System.nanoTime(), new HashMap()); } } diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java index de63c66..0117be7 100644 --- a/src/main/java/org/apache/gossip/StartupSettings.java +++ b/src/main/java/org/apache/gossip/StartupSettings.java @@ -23,8 +23,11 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import org.apache.log4j.Logger; @@ -164,6 +167,13 @@ public class StartupSettings { JsonNode jsonObject = root.get(0); String uri = jsonObject.get("uri").textValue(); String id = jsonObject.get("id").textValue(); + Map properties = new HashMap(); + JsonNode n = jsonObject.get("properties"); + Iterator> l = n.fields(); + while (l.hasNext()){ + Entry i = l.next(); + properties.put(i.getKey(), i.getValue().asText()); + } //TODO constants as defaults? int gossipInterval = jsonObject.get("gossip_interval").intValue(); int cleanupInterval = jsonObject.get("cleanup_interval").intValue(); @@ -186,7 +196,7 @@ public class StartupSettings { JsonNode child = it.next(); URI uri3 = new URI(child.get("uri").textValue()); RemoteGossipMember member = new RemoteGossipMember(child.get("cluster").asText(), - uri3, "", 0); + uri3, "", 0, new HashMap()); settings.addGossipMember(member); configMembersDetails += member.getAddress(); configMembersDetails += ", "; diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java index b6a12b6..10d66a9 100644 --- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java +++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java @@ -48,6 +48,9 @@ public class FailureDetector { public void recordHeartbeat(long now){ if (now < latestHeartbeatMs) return; + if (now - latestHeartbeatMs == 0){ + return; + } synchronized (descriptiveStatistics) { if (latestHeartbeatMs != -1){ descriptiveStatistics.addValue(now - latestHeartbeatMs); @@ -77,7 +80,11 @@ public class FailureDetector { } return -1.0d * Math.log10(probability); } catch (MathException | IllegalArgumentException e) { - e.printStackTrace(); + StringBuilder sb = new StringBuilder(); + for ( double d: descriptiveStatistics.getSortedValues()){ + sb.append(d + " "); + } + LOGGER.debug(e.getMessage() +" "+ sb.toString() +" "+ descriptiveStatistics); throw new IllegalArgumentException(e); } } diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java index 01cd3e3..8236d46 100644 --- a/src/main/java/org/apache/gossip/examples/GossipExample.java +++ b/src/main/java/org/apache/gossip/examples/GossipExample.java @@ -23,6 +23,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import org.apache.gossip.GossipMember; @@ -74,14 +75,14 @@ public class GossipExample extends Thread { } catch (URISyntaxException e) { throw new RuntimeException(e); } - startupMembers.add(new RemoteGossipMember(cluster, u, "", 0 )); + startupMembers.add(new RemoteGossipMember(cluster, u, "", 0, new HashMap())); } // Lets start the gossip clients. // Start the clients, waiting cleaning-interval + 1 second between them which will show the // dead list handling. for (GossipMember member : startupMembers) { - GossipService gossipService = new GossipService(cluster, member.getUri(), "", + GossipService gossipService = new GossipService(cluster, member.getUri(), "", new HashMap(), startupMembers, settings, null, new MetricRegistry()); clients.add(gossipService); gossipService.start(); diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java index d24c0fa..3564943 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java @@ -21,6 +21,7 @@ import com.codahale.metrics.MetricRegistry; import java.net.URI; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.HashMap; import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; @@ -31,8 +32,8 @@ public class StandAloneNode { GossipSettings s = new GossipSettings(); s.setWindowSize(10); s.setConvictThreshold(1.0); - s.setGossipInterval(1000); - GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], + s.setGossipInterval(10); + GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap(), Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry()); gossipService.start(); while (true){ diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java new file mode 100644 index 0000000..d58aeb9 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.util.Map.Entry; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.model.ActiveGossipOk; +import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.GossipMember; +import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.gossip.udp.UdpGossipDataMessage; +import org.apache.gossip.udp.UdpSharedGossipDataMessage; +import org.apache.log4j.Logger; + +import static com.codahale.metrics.MetricRegistry.name; + +/** + * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner + */ +public abstract class AbstractActiveGossiper { + + protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class); + + protected final GossipManager gossipManager; + protected final GossipCore gossipCore; + private final Histogram sharedDataHistogram; + private final Histogram sendPerNodeDataHistogram; + private final Histogram sendMembershipHistorgram; + + public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { + this.gossipManager = gossipManager; + this.gossipCore = gossipCore; + sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time")); + sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time")); + sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time")); + } + + public void init() { + + } + + public void shutdown() { + + } + + public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){ + if (member == null){ + return; + } + long startTime = System.currentTimeMillis(); + for (Entry innerEntry : gossipCore.getSharedData().entrySet()){ + UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage(); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + gossipCore.sendOneWay(message, member.getUri()); + } + sharedDataHistogram.update(System.currentTimeMillis() - startTime); + } + + public final void sendPerNodeData(LocalGossipMember me, LocalGossipMember member){ + if (member == null){ + return; + } + long startTime = System.currentTimeMillis(); + for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ + for (Entry innerEntry : entry.getValue().entrySet()){ + UdpGossipDataMessage message = new UdpGossipDataMessage(); + message.setUuid(UUID.randomUUID().toString()); + message.setUriFrom(me.getId()); + message.setExpireAt(innerEntry.getValue().getExpireAt()); + message.setKey(innerEntry.getValue().getKey()); + message.setNodeId(innerEntry.getValue().getNodeId()); + message.setTimestamp(innerEntry.getValue().getTimestamp()); + message.setPayload(innerEntry.getValue().getPayload()); + gossipCore.sendOneWay(message, member.getUri()); + } + } + sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); + } + + /** + * Performs the sending of the membership list, after we have incremented our own heartbeat. + */ + protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) { + if (member == null){ + return; + } + long startTime = System.currentTimeMillis(); + me.setHeartbeat(System.nanoTime()); + UdpActiveGossipMessage message = new UdpActiveGossipMessage(); + message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); + message.setUuid(UUID.randomUUID().toString()); + message.getMembers().add(convert(me)); + for (LocalGossipMember other : gossipManager.getMembers().keySet()) { + message.getMembers().add(convert(other)); + } + Response r = gossipCore.send(message, member.getUri()); + if (r instanceof ActiveGossipOk){ + //maybe count metrics here + } else { + LOGGER.debug("Message " + message + " generated response " + r); + } + sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); + } + + protected final GossipMember convert(LocalGossipMember member){ + GossipMember gm = new GossipMember(); + gm.setCluster(member.getClusterName()); + gm.setHeartbeat(member.getHeartbeat()); + gm.setUri(member.getUri().toASCIIString()); + gm.setId(member.getId()); + gm.setProperties(member.getProperties()); + return gm; + } +} diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java deleted file mode 100644 index f81565b..0000000 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager; - -import java.util.List; - -import java.util.Map.Entry; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import com.codahale.metrics.Histogram; -import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.model.ActiveGossipOk; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.GossipMember; -import org.apache.gossip.model.Response; -import org.apache.gossip.model.SharedGossipDataMessage; -import org.apache.gossip.udp.UdpActiveGossipMessage; -import org.apache.gossip.udp.UdpGossipDataMessage; -import org.apache.gossip.udp.UdpSharedGossipDataMessage; -import org.apache.log4j.Logger; - -import static com.codahale.metrics.MetricRegistry.name; - -/** - * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner - */ -public class ActiveGossipThread { - - private static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class); - - private final GossipManager gossipManager; - private final Random random; - private final GossipCore gossipCore; - private ScheduledExecutorService scheduledExecutorService; - private final BlockingQueue workQueue; - private ThreadPoolExecutor threadService; - - private final Histogram sharedDataHistogram; - private final Histogram sendPerNodeDataHistogram; - private final Histogram sendMembershipHistorgram; - - public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { - this.gossipManager = gossipManager; - random = new Random(); - this.gossipCore = gossipCore; - scheduledExecutorService = Executors.newScheduledThreadPool(2); - workQueue = new ArrayBlockingQueue(1024); - threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); - sharedDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sharedDataHistogram-time")); - sendPerNodeDataHistogram = registry.histogram(name(ActiveGossipThread.class, "sendPerNodeDataHistogram-time")); - sendMembershipHistorgram = registry.histogram(name(ActiveGossipThread.class, "sendMembershipHistorgram-time")); - } - - - public void init() { - scheduledExecutorService.scheduleAtFixedRate( - () -> { - threadService.execute( () -> { sendToALiveMember(); }); - }, 0, - gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - scheduledExecutorService.scheduleAtFixedRate( - () -> { this.sendToDeadMember(); }, 0, - gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - scheduledExecutorService.scheduleAtFixedRate( - () -> sendPerNodeData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, - gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - scheduledExecutorService.scheduleAtFixedRate( - () -> sendSharedData(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0, - gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); - } - - public void shutdown() { - scheduledExecutorService.shutdown(); - try { - scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.debug("Issue during shurdown" + e); - } - } - - public void sendSharedData(LocalGossipMember me, List memberList){ - long startTime = System.currentTimeMillis(); - - LocalGossipMember member = selectPartner(memberList); - if (member == null) { - LOGGER.debug("Send sendMembershipList() is called without action"); - sharedDataHistogram.update(System.currentTimeMillis() - startTime); - return; - } - for (Entry innerEntry : gossipCore.getSharedData().entrySet()){ - UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage(); - message.setUuid(UUID.randomUUID().toString()); - message.setUriFrom(me.getId()); - message.setExpireAt(innerEntry.getValue().getExpireAt()); - message.setKey(innerEntry.getValue().getKey()); - message.setNodeId(innerEntry.getValue().getNodeId()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); - message.setPayload(innerEntry.getValue().getPayload()); - gossipCore.sendOneWay(message, member.getUri()); - } - sharedDataHistogram.update(System.currentTimeMillis() - startTime); - } - - public void sendPerNodeData(LocalGossipMember me, List memberList){ - long startTime = System.currentTimeMillis(); - - LocalGossipMember member = selectPartner(memberList); - if (member == null) { - LOGGER.debug("Send sendMembershipList() is called without action"); - sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); - return; - } - for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ - for (Entry innerEntry : entry.getValue().entrySet()){ - UdpGossipDataMessage message = new UdpGossipDataMessage(); - message.setUuid(UUID.randomUUID().toString()); - message.setUriFrom(me.getId()); - message.setExpireAt(innerEntry.getValue().getExpireAt()); - message.setKey(innerEntry.getValue().getKey()); - message.setNodeId(innerEntry.getValue().getNodeId()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); - message.setPayload(innerEntry.getValue().getPayload()); - gossipCore.sendOneWay(message, member.getUri()); - } - } - sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); - } - - protected void sendToALiveMember(){ - LocalGossipMember member = selectPartner(gossipManager.getLiveMembers()); - System.out.println("send" ); - sendMembershipList(gossipManager.getMyself(), member); - } - - protected void sendToDeadMember(){ - LocalGossipMember member = selectPartner(gossipManager.getDeadMembers()); - sendMembershipList(gossipManager.getMyself(), member); - } - - /** - * Performs the sending of the membership list, after we have incremented our own heartbeat. - */ - protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) { - long startTime = System.currentTimeMillis(); - me.setHeartbeat(System.nanoTime()); - if (member == null) { - LOGGER.debug("Send sendMembershipList() is called without action"); - sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); - return; - } else { - LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); - } - UdpActiveGossipMessage message = new UdpActiveGossipMessage(); - message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); - message.setUuid(UUID.randomUUID().toString()); - message.getMembers().add(convert(me)); - for (LocalGossipMember other : gossipManager.getMembers().keySet()) { - message.getMembers().add(convert(other)); - } - Response r = gossipCore.send(message, member.getUri()); - if (r instanceof ActiveGossipOk){ - //maybe count metrics here - } else { - LOGGER.debug("Message " + message + " generated response " + r); - } - sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); - } - - /** - * - * @param memberList - * The list of members which are stored in the local list of members. - * @return The chosen LocalGossipMember to gossip with. - */ - protected LocalGossipMember selectPartner(List memberList) { - LocalGossipMember member = null; - if (memberList.size() > 0) { - int randomNeighborIndex = random.nextInt(memberList.size()); - member = memberList.get(randomNeighborIndex); - } else { - LOGGER.debug("I am alone in this world."); - } - return member; - } - - private GossipMember convert(LocalGossipMember member){ - GossipMember gm = new GossipMember(); - gm.setCluster(member.getClusterName()); - gm.setHeartbeat(member.getHeartbeat()); - gm.setUri(member.getUri().toASCIIString()); - gm.setId(member.getId()); - return gm; - } -} diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java new file mode 100644 index 0000000..40b9c28 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.util.List; +import java.util.Random; +import java.util.ArrayList; +import java.util.Collections; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.LocalGossipMember; + +import com.codahale.metrics.MetricRegistry; + +/** + * Sends gossip traffic at different rates to other racks and data-centers. + * This implementation controls the rate at which gossip traffic is shared. + * There are two constructs Datacenter and Rack. It is assumed that bandwidth and latency is higher + * in the rack than in the the datacenter. We can adjust the rate at which we send messages to each group. + * + */ +public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { + + public static final String DATACENTER = "datacenter"; + public static final String RACK = "rack"; + + private int sameRackGossipIntervalMs = 100; + private int sameDcGossipIntervalMs = 500; + private int differentDatacenterGossipIntervalMs = 1000; + private int randomDeadMemberSendIntervalMs = 250; + + private ScheduledExecutorService scheduledExecutorService; + private final BlockingQueue workQueue; + private ThreadPoolExecutor threadService; + private final Random random; + + public DatacenterRackAwareActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, + MetricRegistry registry) { + super(gossipManager, gossipCore, registry); + scheduledExecutorService = Executors.newScheduledThreadPool(2); + workQueue = new ArrayBlockingQueue(1024); + threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, + new ThreadPoolExecutor.DiscardOldestPolicy()); + random = new Random(); + try { + sameRackGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() + .getActiveGossipProperties().get("sameRackGossipIntervalMs")); + } catch (RuntimeException ex) { } + try { + sameDcGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() + .getActiveGossipProperties().get("sameDcGossipIntervalMs")); + } catch (RuntimeException ex) { } + try { + differentDatacenterGossipIntervalMs = Integer.parseInt(gossipManager.getSettings() + .getActiveGossipProperties().get("differentDatacenterGossipIntervalMs")); + } catch (RuntimeException ex) { } + try { + randomDeadMemberSendIntervalMs = Integer.parseInt(gossipManager.getSettings() + .getActiveGossipProperties().get("randomDeadMemberSendIntervalMs")); + } catch (RuntimeException ex) { } + } + + @Override + public void init() { + super.init(); + //same rack + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sendToSameRackMember()), + 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sendToSameRackMemberPerNode()), + 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sendToSameRackShared()), + 0, sameRackGossipIntervalMs, TimeUnit.MILLISECONDS); + + //same dc different rack + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sameDcDiffernetRackMember()), + 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sameDcDiffernetRackPerNode()), + 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sameDcDiffernetRackShared()), + 0, sameDcGossipIntervalMs, TimeUnit.MILLISECONDS); + + //different dc + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> differentDcMember()), + 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> differentDcPerNode()), + 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS); + + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> differentDcShared()), + 0, differentDatacenterGossipIntervalMs, TimeUnit.MILLISECONDS); + + //the dead + scheduledExecutorService.scheduleAtFixedRate(() -> + threadService.execute(() -> sendToDeadMember()), + 0, randomDeadMemberSendIntervalMs, TimeUnit.MILLISECONDS); + + } + + private void sendToDeadMember() { + sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers())); + } + + private List differentDataCenter(){ + String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); + String rack = gossipManager.getMyself().getProperties().get(RACK); + if (myDc == null|| rack == null){ + return Collections.emptyList(); + } + List notMyDc = new ArrayList(10); + for (LocalGossipMember i : gossipManager.getLiveMembers()){ + if (!myDc.equals(i.getProperties().get(DATACENTER))){ + notMyDc.add(i); + } + } + return notMyDc; + } + + private List sameDatacenterDifferentRack(){ + String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); + String rack = gossipManager.getMyself().getProperties().get(RACK); + if (myDc == null|| rack == null){ + return Collections.emptyList(); + } + List notMyDc = new ArrayList(10); + for (LocalGossipMember i : gossipManager.getLiveMembers()){ + if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){ + notMyDc.add(i); + } + } + return notMyDc; + } + + private List sameRackNodes(){ + String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); + String rack = gossipManager.getMyself().getProperties().get(RACK); + if (myDc == null|| rack == null){ + return Collections.emptyList(); + } + List sameDcAndRack = new ArrayList(10); + for (LocalGossipMember i : gossipManager.getLiveMembers()){ + if (myDc.equals(i.getProperties().get(DATACENTER)) + && rack.equals(i.getProperties().get(RACK))){ + sameDcAndRack.add(i); + } + } + return sameDcAndRack; + } + + private void sendToSameRackMember() { + sendMembershipList(gossipManager.getMyself(), selectPartner(sameRackNodes())); + } + + private void sendToSameRackMemberPerNode() { + sendPerNodeData(gossipManager.getMyself(), selectPartner(sameRackNodes())); + } + + private void sendToSameRackShared() { + sendSharedData(gossipManager.getMyself(), selectPartner(sameRackNodes())); + } + + private void differentDcMember() { + sendMembershipList(gossipManager.getMyself(), selectPartner(differentDataCenter())); + } + + private void differentDcPerNode() { + sendPerNodeData(gossipManager.getMyself(), selectPartner(differentDataCenter())); + } + + private void differentDcShared() { + sendSharedData(gossipManager.getMyself(), selectPartner(differentDataCenter())); + } + + private void sameDcDiffernetRackMember() { + sendMembershipList(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); + } + + private void sameDcDiffernetRackPerNode() { + sendPerNodeData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); + } + + private void sameDcDiffernetRackShared() { + sendSharedData(gossipManager.getMyself(), selectPartner(sameDatacenterDifferentRack())); + } + + + @Override + public void shutdown() { + super.shutdown(); + } + + protected LocalGossipMember selectPartner(List memberList) { + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } + return member; + } + +} diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 5d561c3..31bd447 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -165,7 +165,8 @@ public class GossipCore implements GossipCoreConstants { activeGossipMessage.getMembers().get(i).getCluster(), u, activeGossipMessage.getMembers().get(i).getId(), - activeGossipMessage.getMembers().get(i).getHeartbeat()); + activeGossipMessage.getMembers().get(i).getHeartbeat(), + activeGossipMessage.getMembers().get(i).getProperties()); if (i == 0) { senderMember = member; } @@ -321,6 +322,7 @@ public class GossipCore implements GossipCoreConstants { remoteMember.getUri(), remoteMember.getId(), remoteMember.getHeartbeat(), + remoteMember.getProperties(), gossipManager.getSettings().getWindowSize(), gossipManager.getSettings().getMinimumSamples(), gossipManager.getSettings().getDistribution()); @@ -331,6 +333,7 @@ public class GossipCore implements GossipCoreConstants { if (localMember.getKey().getId().equals(remoteMember.getId())){ localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat()); localMember.getKey().setHeartbeat(remoteMember.getHeartbeat()); + localMember.getKey().setProperties(remoteMember.getProperties()); } } } diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index cf67c9c..840efb9 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -18,9 +18,13 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Map.Entry; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -59,7 +63,7 @@ public abstract class GossipManager { private final GossipListener listener; - private ActiveGossipThread activeGossipThread; + private AbstractActiveGossiper activeGossipThread; private PassiveGossipThread passiveGossipThread; @@ -76,21 +80,22 @@ public abstract class GossipManager { private MetricRegistry registry; public GossipManager(String cluster, - URI uri, String id, GossipSettings settings, + URI uri, String id, Map properties, GossipSettings settings, List gossipMembers, GossipListener listener, MetricRegistry registry) { this.settings = settings; gossipCore = new GossipCore(this, registry); clock = new SystemClock(); dataReaper = new DataReaper(gossipCore, clock); - me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), + me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties, settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), startupMember.getUri(), startupMember.getId(), - clock.nanoTime(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); + clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(), + settings.getMinimumSamples(), settings.getDistribution()); //TODO should members start in down state? members.put(member, GossipState.DOWN); } @@ -137,6 +142,14 @@ public abstract class GossipManager { return me; } + private AbstractActiveGossiper constructActiveGossiper(){ + try { + Constructor c = Class.forName(settings.getActiveGossipClass()).getConstructor(GossipManager.class, GossipCore.class, MetricRegistry.class); + return (AbstractActiveGossiper) c.newInstance(this, gossipCore, registry); + } catch (NoSuchMethodException | SecurityException | ClassNotFoundException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } /** * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * thread and start the receiver thread. @@ -144,7 +157,7 @@ public abstract class GossipManager { public void init() { passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); gossipThreadExecutor.execute(passiveGossipThread); - activeGossipThread = new ActiveGossipThread(this, this.gossipCore, registry); + activeGossipThread = constructActiveGossiper(); activeGossipThread.init(); dataReaper.init(); scheduledServiced.scheduleAtFixedRate(() -> { diff --git a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java new file mode 100644 index 0000000..43237fb --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.util.List; +import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.LocalGossipMember; + +import com.codahale.metrics.MetricRegistry; + +/** + * Base implementation gossips randomly to live nodes periodically gossips to dead ones + * + */ +public class SimpleActiveGossipper extends AbstractActiveGossiper { + + private ScheduledExecutorService scheduledExecutorService; + private final BlockingQueue workQueue; + private ThreadPoolExecutor threadService; + private final Random random; + + public SimpleActiveGossipper(GossipManager gossipManager, GossipCore gossipCore, + MetricRegistry registry) { + super(gossipManager, gossipCore, registry); + scheduledExecutorService = Executors.newScheduledThreadPool(2); + workQueue = new ArrayBlockingQueue(1024); + threadService = new ThreadPoolExecutor(1, 30, 1, TimeUnit.SECONDS, workQueue, + new ThreadPoolExecutor.DiscardOldestPolicy()); + random = new Random(); + } + + @Override + public void init() { + super.init(); + scheduledExecutorService.scheduleAtFixedRate(() -> { + threadService.execute(() -> { + sendToALiveMember(); + }); + }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate(() -> { + sendToDeadMember(); + }, 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendPerNodeData(gossipManager.getMyself(), + selectPartner(gossipManager.getLiveMembers())), + 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + scheduledExecutorService.scheduleAtFixedRate( + () -> sendSharedData(gossipManager.getMyself(), + selectPartner(gossipManager.getLiveMembers())), + 0, gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS); + } + + @Override + public void shutdown() { + super.shutdown(); + scheduledExecutorService.shutdown(); + try { + scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.debug("Issue during shutdown", e); + } + } + + protected void sendToALiveMember(){ + LocalGossipMember member = selectPartner(gossipManager.getLiveMembers()); + sendMembershipList(gossipManager.getMyself(), member); + } + + protected void sendToDeadMember(){ + LocalGossipMember member = selectPartner(gossipManager.getDeadMembers()); + sendMembershipList(gossipManager.getMyself(), member); + } + + /** + * + * @param memberList + * The list of members which are stored in the local list of members. + * @return The chosen LocalGossipMember to gossip with. + */ + protected LocalGossipMember selectPartner(List memberList) { + //TODO this selection is racey what if the list size changes? + LocalGossipMember member = null; + if (memberList.size() > 0) { + int randomNeighborIndex = random.nextInt(memberList.size()); + member = memberList.get(randomNeighborIndex); + } + return member; + } +} diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java index fd936f1..4a150be 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java @@ -25,8 +25,9 @@ import org.apache.gossip.manager.GossipManager; import java.net.URI; import java.util.List; - +import java.util.Map; import java.util.ArrayList; +import java.util.HashMap; public class RandomGossipManager extends GossipManager { @@ -42,6 +43,7 @@ public class RandomGossipManager extends GossipManager { private List gossipMembers; private GossipListener listener; private MetricRegistry registry; + private Map properties; private ManagerBuilder() {} @@ -55,6 +57,11 @@ public class RandomGossipManager extends GossipManager { this.cluster = cluster; return this; } + + public ManagerBuilder properties(Map properties) { + this.properties = properties; + return this; + } public ManagerBuilder withId(String id) { this.id = id; @@ -75,6 +82,7 @@ public class RandomGossipManager extends GossipManager { this.listener = listener; return this; } + public ManagerBuilder registry(MetricRegistry registry) { this.registry = registry; return this; @@ -91,18 +99,21 @@ public class RandomGossipManager extends GossipManager { checkArgument(settings != null, "You must specify gossip settings"); checkArgument(uri != null, "You must specify a uri"); checkArgument(registry != null, "You must specify a MetricRegistry"); + if (properties == null){ + properties = new HashMap(); + } if (listener == null){ listener((a,b) -> {}); } if (gossipMembers == null) { gossipMembers = new ArrayList<>(); } - return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener, registry); + return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry); } } - private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings, + private RandomGossipManager(String cluster, URI uri, String id, Map properties, GossipSettings settings, List gossipMembers, GossipListener listener, MetricRegistry registry) { - super(cluster, uri, id, settings, gossipMembers, listener, registry); + super(cluster, uri, id, properties, settings, gossipMembers, listener, registry); } } diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/GossipMember.java index 03ec13d..a318776 100644 --- a/src/main/java/org/apache/gossip/model/GossipMember.java +++ b/src/main/java/org/apache/gossip/model/GossipMember.java @@ -17,12 +17,15 @@ */ package org.apache.gossip.model; +import java.util.Map; + public class GossipMember { private String cluster; private String uri; private String id; private Long heartbeat; + private Map properties; public GossipMember(){ @@ -66,5 +69,19 @@ public class GossipMember { public void setHeartbeat(Long heartbeat) { this.heartbeat = heartbeat; } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + @Override + public String toString() { + return "GossipMember [cluster=" + cluster + ", uri=" + uri + ", id=" + id + ", heartbeat=" + + heartbeat + ", properties=" + properties + "]"; + } } diff --git a/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java index 424d1ca..b6e8101 100644 --- a/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java +++ b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java @@ -42,7 +42,8 @@ public class UdpActiveGossipMessage extends ActiveGossipMessage implements Track @Override public String toString() { - return "UdpActiveGossipMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]"; + return "UdpActiveGossipMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getMembers()=" + + getMembers() + "]"; } - + } diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 766d72b..83879f9 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -22,6 +22,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; @@ -50,7 +51,7 @@ public class DataTest { for (int i = 1; i < clusterMembers+1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); GossipService gossipService = new GossipService(cluster, uri, i + "", - startupMembers, settings, + new HashMap(), startupMembers, settings, (a,b) -> {}, new MetricRegistry()); clients.add(gossipService); gossipService.start(); @@ -65,29 +66,27 @@ public class DataTest { }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); clients.get(0).gossipPerNodeData(msg()); clients.get(0).gossipSharedData(sharedMsg()); - Thread.sleep(10000); - TUnit.assertThat( - new Callable() { - public Object call() throws Exception { - GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a"); - if (x == null) - return ""; - else - return x.getPayload(); - } - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); + + TUnit.assertThat(new Callable() { + public Object call() throws Exception { + GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a"); + if (x == null) + return ""; + else + return x.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); - TUnit.assertThat( - new Callable() { - public Object call() throws Exception { - SharedGossipDataMessage x = clients.get(1).findSharedData("a"); - if (x == null) - return ""; - else - return x.getPayload(); - } - }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); + TUnit.assertThat(new Callable() { + public Object call() throws Exception { + SharedGossipDataMessage x = clients.get(1).findSharedData("a"); + if (x == null) + return ""; + else + return x.getPayload(); + } + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); for (int i = 0; i < clusterMembers; ++i) { diff --git a/src/test/java/org/apache/gossip/GossipMemberTest.java b/src/test/java/org/apache/gossip/GossipMemberTest.java index e15259e..272c7fb 100644 --- a/src/test/java/org/apache/gossip/GossipMemberTest.java +++ b/src/test/java/org/apache/gossip/GossipMemberTest.java @@ -19,6 +19,7 @@ package org.apache.gossip; import java.net.URI; import java.net.URISyntaxException; +import java.util.HashMap; import org.junit.Assert; import org.junit.jupiter.api.Test; @@ -31,9 +32,9 @@ public class GossipMemberTest { @Test public void testHashCodeFromGossip40() throws URISyntaxException { Assert.assertNotEquals( - new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, 10, 5, "exponential") + new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, new HashMap(), 10, 5, "exponential") .hashCode(), - new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, 11, 6, "exponential") + new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, new HashMap(), 11, 6, "exponential") .hashCode()); } } diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java new file mode 100644 index 0000000..2a98f01 --- /dev/null +++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; +import org.junit.jupiter.api.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import com.codahale.metrics.MetricRegistry; + +import io.teknek.tunit.TUnit; + +@RunWith(JUnitPlatform.class) +public class IdAndPropertyTest { + + @Test + public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException{ + GossipSettings settings = new GossipSettings(); + settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); + List startupMembers = new ArrayList<>(); + Map x = new HashMap<>(); + x.put("a", "b"); + x.put("datacenter", "dc1"); + x.put("rack", "rack1"); + GossipService gossipService1 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0", x, startupMembers, settings, + (a, b) -> {}, new MetricRegistry()); + gossipService1.start(); + + Map y = new HashMap<>(); + y.put("a", "c"); + y.put("datacenter", "dc2"); + y.put("rack", "rack2"); + GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", y, + Arrays.asList(new RemoteGossipMember("a", + new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")), + settings, (a, b) -> { }, new MetricRegistry()); + gossipService2.start(); + TUnit.assertThat(() -> { + String value = ""; + try { + value = gossipService1.getGossipManager().getLiveMembers().get(0).getProperties().get("a"); + } catch (RuntimeException e){ } + return value; + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c"); + + TUnit.assertThat(() -> { + String value = ""; + try { + value = gossipService2.getGossipManager().getLiveMembers().get(0).getProperties().get("a"); + } catch (RuntimeException e){ } + return value; + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b"); + + gossipService1.shutdown(); + gossipService2.shutdown(); + + } +} diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index f5e34ba..9d02556 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -25,6 +25,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Random; import java.util.UUID; @@ -58,7 +59,7 @@ public class ShutdownDeadtimeTest { final int clusterMembers = 5; for (int i = 1; i < clusterMembers + 1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); - GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, + GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap(), startupMembers, settings, (a,b) -> {}, new MetricRegistry()); clients.add(gossipService); gossipService.start(); @@ -100,11 +101,11 @@ public class ShutdownDeadtimeTest { } return total; } - }).afterWaitingAtMost(30, TimeUnit.SECONDS).isEqualTo(4); + }).afterWaitingAtMost(50, TimeUnit.SECONDS).isEqualTo(4); URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); // start client again - GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers, + GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", new HashMap(), startupMembers, settings, (a,b) -> {}, new MetricRegistry()); clients.add(gossipService); gossipService.start(); diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java index f0acabf..aa4d255 100644 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashMap; import java.util.UUID; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; @@ -48,7 +49,7 @@ public class StartupSettingsTest { writeSettingsFile(settingsFile); URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000); final GossipService firstService = new GossipService( - CLUSTER, uri, "1", + CLUSTER, uri, "1", new HashMap(), new ArrayList(), new GossipSettings(), null, new MetricRegistry()); firstService.start(); final GossipService serviceUnderTest = new GossipService( @@ -70,6 +71,7 @@ public class StartupSettingsTest { " \"cleanup_interval\":10000,\n" + " \"convict_threshold\":2.6,\n" + " \"distribution\":\"exponential\",\n" + + " \"properties\":{},\n" + " \"members\":[\n" + " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" + " ]\n" + @@ -77,7 +79,7 @@ public class StartupSettingsTest { log.info( "Using settings file with contents of:\n---\n" + settings + "\n---" ); FileOutputStream output = new FileOutputStream(target); - output.write( settings.getBytes() ); + output.write(settings.getBytes()); output.close(); } } diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index af7a117..bc4004d 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; @@ -58,7 +59,7 @@ public class TenNodeThreeSeedTest { final int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); - GossipService gossipService = new GossipService(cluster, uri, i + "", + GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap(), startupMembers, settings, (a,b) -> {}, new MetricRegistry()); clients.add(gossipService); gossipService.start(); diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java index 99cf9c8..69d46b8 100644 --- a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java +++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java @@ -34,7 +34,7 @@ public class FailureDetectorTest { int samples = 1; int windowSize = 1000; LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), - "", 0L, windowSize, samples, "normal"); + "", 0L, null, windowSize, samples, "normal"); member.recordHeartbeat(5); member.recordHeartbeat(10); Assert.assertEquals(new Double(0.3010299956639812), member.detect(10)); @@ -45,7 +45,7 @@ public class FailureDetectorTest { int samples = 1; int windowSize = 1000; LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), - "", 0L, windowSize, samples, "exponential"); + "", 0L, null, windowSize, samples, "exponential"); member.recordHeartbeat(5); member.recordHeartbeat(10); Assert.assertEquals(new Double(0.4342944819032518), member.detect(10)); @@ -65,7 +65,7 @@ public class FailureDetectorTest { int samples = 1; int windowSize = 1000; LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), - "", 0L, windowSize, samples, "exponential"); + "", 0L, null, windowSize, samples, "exponential"); member.recordHeartbeat(5); member.recordHeartbeat(5); member.recordHeartbeat(5); diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java index 1ef3a5b..cf38492 100644 --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java @@ -21,19 +21,15 @@ import com.codahale.metrics.MetricRegistry; import org.apache.gossip.GossipMember; import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.event.GossipListener; -import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.random.RandomGossipManager; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; -import javax.management.Notification; -import javax.management.NotificationListener; - import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -43,18 +39,6 @@ import static org.junit.jupiter.api.Assertions.expectThrows; @RunWith(JUnitPlatform.class) public class RandomGossipManagerBuilderTest { - public static class TestGossipListener implements GossipListener { - @Override - public void gossipEvent(GossipMember member, GossipState state) { - } - } - - public static class TestNotificationListener implements NotificationListener { - @Override - public void handleNotification(Notification notification, Object o) { - } - } - @Test public void idShouldNotBeNull() { expectThrows(IllegalArgumentException.class,() -> { @@ -91,7 +75,7 @@ public class RandomGossipManagerBuilderTest { public void useMemberListIfProvided() throws URISyntaxException { LocalGossipMember member = new LocalGossipMember( "aCluster", new URI("udp://localhost:2000"), "aGossipMember", - System.nanoTime(), 1000, 1, "exponential"); + System.nanoTime(), new HashMap(), 1000, 1, "exponential"); List memberList = new ArrayList<>(); memberList.add(member); RandomGossipManager gossipManager = RandomGossipManager.newBuilder()