diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java deleted file mode 100644 index f216c33..0000000 --- a/src/main/java/org/apache/gossip/GossipService.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import com.codahale.metrics.MetricRegistry; -import java.net.URI; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.codahale.metrics.JmxReporter; -import org.apache.gossip.event.GossipListener; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.random.RandomGossipManager; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; -import org.apache.log4j.Logger; - -/** - * This object represents the service which is responsible for gossiping with other gossip members. - * - */ -public class GossipService { - - public static final Logger LOGGER = Logger.getLogger(GossipService.class); - private final JmxReporter jmxReporter; - - private final GossipManager gossipManager; - - /** - * Constructor with the default settings. - * - * @throws InterruptedException - * @throws UnknownHostException - */ - public GossipService(StartupSettings startupSettings) throws InterruptedException, - UnknownHostException { - this(startupSettings.getCluster(), startupSettings.getUri() - , startupSettings.getId(), new HashMap (),startupSettings.getGossipMembers(), - startupSettings.getGossipSettings(), null, new MetricRegistry()); - } - - /** - * Setup the client's lists, gossiping parameters, and parse the startup config file. - * - * @throws InterruptedException - * @throws UnknownHostException - */ - public GossipService(String cluster, URI uri, String id, Map properties, - List gossipMembers, GossipSettings settings, GossipListener listener, MetricRegistry registry) - throws InterruptedException, UnknownHostException { - jmxReporter = JmxReporter.forRegistry(registry).build(); - jmxReporter.start(); - gossipManager = RandomGossipManager.newBuilder() - .withId(id) - .cluster(cluster) - .uri(uri) - .settings(settings) - .gossipMembers(gossipMembers) - .listener(listener) - .registry(registry) - .properties(properties) - .build(); - } - - public void start() { - gossipManager.init(); - } - - public void shutdown() { - gossipManager.shutdown(); - } - - public GossipManager getGossipManager() { - return gossipManager; - } - - /** - * Gossip data in a namespace that is per-node { node-id { key, value } } - * @param message - * message to be gossip'ed across the cluster - */ - public void gossipPerNodeData(GossipDataMessage message){ - gossipManager.gossipPerNodeData(message); - } - - /** - * Retrieve per-node gossip data by key - * - * @param nodeId - * the id of the node that owns the data - * @param key - * the key in the per-node map to find the data - * @return the value if found or null if not found or expired - */ - public GossipDataMessage findPerNodeData(String nodeId, String key){ - return getGossipManager().findPerNodeGossipData(nodeId, key); - } - - /** - * - * @param message - * Shared data to gossip around the cluster - */ - public void gossipSharedData(SharedGossipDataMessage message){ - gossipManager.gossipSharedData(message); - } - - /** - * - * @param key - * the key to search for - * @return the value associated with given key - */ - public SharedGossipDataMessage findSharedData(String key){ - return getGossipManager().findSharedGossipData(key); - } - -} diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalMember.java similarity index 93% rename from src/main/java/org/apache/gossip/LocalGossipMember.java rename to src/main/java/org/apache/gossip/LocalMember.java index 05874f5..db6e3f7 100644 --- a/src/main/java/org/apache/gossip/LocalGossipMember.java +++ b/src/main/java/org/apache/gossip/LocalMember.java @@ -27,7 +27,7 @@ import org.apache.gossip.accrual.FailureDetector; * in the local list of gossip members. * */ -public class LocalGossipMember extends GossipMember { +public class LocalMember extends Member { /** The failure detector for this member */ private transient FailureDetector detector; @@ -40,13 +40,13 @@ public class LocalGossipMember extends GossipMember { * @param heartbeat * The current heartbeat */ - public LocalGossipMember(String clusterName, URI uri, String id, + public LocalMember(String clusterName, URI uri, String id, long heartbeat, Map properties, int windowSize, int minSamples, String distribution) { super(clusterName, uri, id, heartbeat, properties ); detector = new FailureDetector(this, minSamples, windowSize, distribution); } - protected LocalGossipMember(){ + protected LocalMember(){ } diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/Member.java similarity index 89% rename from src/main/java/org/apache/gossip/GossipMember.java rename to src/main/java/org/apache/gossip/Member.java index 703ac55..d04a7b6 100644 --- a/src/main/java/org/apache/gossip/GossipMember.java +++ b/src/main/java/org/apache/gossip/Member.java @@ -25,7 +25,7 @@ import java.util.Map; * A abstract class representing a gossip member. * */ -public abstract class GossipMember implements Comparable { +public abstract class Member implements Comparable { protected URI uri; @@ -56,7 +56,7 @@ public abstract class GossipMember implements Comparable { * @param id * An id that may be replaced after contact */ - public GossipMember(String clusterName, URI uri, String id, long heartbeat, Map properties) { + public Member(String clusterName, URI uri, String id, long heartbeat, Map properties) { this.clusterName = clusterName; this.id = id; this.heartbeat = heartbeat; @@ -64,7 +64,7 @@ public abstract class GossipMember implements Comparable { this.properties = properties; } - protected GossipMember(){} + protected Member(){} /** * Get the name of the cluster the member belongs to. * @@ -151,16 +151,16 @@ public abstract class GossipMember implements Comparable { System.err.println("equals(): obj is null."); return false; } - if (!(obj instanceof GossipMember)) { + if (!(obj instanceof Member)) { System.err.println("equals(): obj is not of type GossipMember."); return false; } // The object is the same of they both have the same address (hostname and port). - return computeAddress().equals(((LocalGossipMember) obj).computeAddress()) - && getClusterName().equals(((LocalGossipMember) obj).getClusterName()); + return computeAddress().equals(((LocalMember) obj).computeAddress()) + && getClusterName().equals(((LocalMember) obj).getClusterName()); } - public int compareTo(GossipMember other) { + public int compareTo(Member other) { return this.computeAddress().compareTo(other.computeAddress()); } } diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteMember.java similarity index 84% rename from src/main/java/org/apache/gossip/RemoteGossipMember.java rename to src/main/java/org/apache/gossip/RemoteMember.java index e3f6620..6b42da2 100644 --- a/src/main/java/org/apache/gossip/RemoteGossipMember.java +++ b/src/main/java/org/apache/gossip/RemoteMember.java @@ -26,7 +26,7 @@ import java.util.Map; * member. * */ -public class RemoteGossipMember extends GossipMember { +public class RemoteMember extends Member { /** * Constructor. @@ -36,11 +36,11 @@ public class RemoteGossipMember extends GossipMember { * @param heartbeat * The current heartbeat */ - public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat, Map properties) { + public RemoteMember(String clusterName, URI uri, String id, long heartbeat, Map properties) { super(clusterName, uri, id, heartbeat, properties); } - public RemoteGossipMember(String clusterName, URI uri, String id) { + public RemoteMember(String clusterName, URI uri, String id) { super(clusterName, uri, id, System.nanoTime(), new HashMap()); } diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java index ab5f764..17eaaf2 100644 --- a/src/main/java/org/apache/gossip/StartupSettings.java +++ b/src/main/java/org/apache/gossip/StartupSettings.java @@ -53,7 +53,7 @@ public class StartupSettings { private final GossipSettings gossipSettings; /** The list with gossip members to start with. */ - private final List gossipMembers; + private final List gossipMembers; /** * Constructor. @@ -135,7 +135,7 @@ public class StartupSettings { * @param member * The member to add. */ - public void addGossipMember(GossipMember member) { + public void addGossipMember(Member member) { gossipMembers.add(member); } @@ -144,7 +144,7 @@ public class StartupSettings { * * @return The gossip members. */ - public List getGossipMembers() { + public List getGossipMembers() { return gossipMembers; } @@ -195,7 +195,7 @@ public class StartupSettings { while (it.hasNext()){ JsonNode child = it.next(); URI uri3 = new URI(child.get("uri").textValue()); - RemoteGossipMember member = new RemoteGossipMember(child.get("cluster").asText(), + RemoteMember member = new RemoteMember(child.get("cluster").asText(), uri3, "", 0, new HashMap()); settings.addGossipMember(member); configMembersDetails += member.computeAddress(); diff --git a/src/main/java/org/apache/gossip/accrual/FailureDetector.java b/src/main/java/org/apache/gossip/accrual/FailureDetector.java index 10d66a9..22e73db 100644 --- a/src/main/java/org/apache/gossip/accrual/FailureDetector.java +++ b/src/main/java/org/apache/gossip/accrual/FailureDetector.java @@ -21,7 +21,7 @@ import org.apache.commons.math.MathException; import org.apache.commons.math.distribution.ExponentialDistributionImpl; import org.apache.commons.math.distribution.NormalDistributionImpl; import org.apache.commons.math.stat.descriptive.DescriptiveStatistics; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.apache.log4j.Logger; public class FailureDetector { @@ -30,10 +30,10 @@ public class FailureDetector { private final DescriptiveStatistics descriptiveStatistics; private final long minimumSamples; private volatile long latestHeartbeatMs = -1; - private final LocalGossipMember parent; + private final LocalMember parent; private final String distribution; - public FailureDetector(LocalGossipMember parent, long minimumSamples, int windowSize, String distribution){ + public FailureDetector(LocalMember parent, long minimumSamples, int windowSize, String distribution){ this.parent = parent; descriptiveStatistics = new DescriptiveStatistics(windowSize); this.minimumSamples = minimumSamples; diff --git a/src/main/java/org/apache/gossip/event/GossipListener.java b/src/main/java/org/apache/gossip/event/GossipListener.java index 2e882f6..9b33dab 100644 --- a/src/main/java/org/apache/gossip/event/GossipListener.java +++ b/src/main/java/org/apache/gossip/event/GossipListener.java @@ -17,8 +17,8 @@ */ package org.apache.gossip.event; -import org.apache.gossip.GossipMember; +import org.apache.gossip.Member; public interface GossipListener { - void gossipEvent(GossipMember member, GossipState state); + void gossipEvent(Member member, GossipState state); } diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java deleted file mode 100644 index 8236d46..0000000 --- a/src/main/java/org/apache/gossip/examples/GossipExample.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.examples; - -import com.codahale.metrics.MetricRegistry; -import java.net.InetAddress; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.apache.gossip.GossipMember; -import org.apache.gossip.GossipService; -import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; - -/** - * This class is an example of how one could use the gossip service. Here we start multiple gossip - * clients on this host as specified in the config file. - * - * @author harmenw - */ -public class GossipExample extends Thread { - /** The number of clients to start. */ - private static final int NUMBER_OF_CLIENTS = 4; - - /** - * @param args - */ - public static void main(String[] args) { - new GossipExample(); - } - - /** - * Constructor. This will start the this thread. - */ - public GossipExample() { - start(); - } - - /** - * @see java.lang.Thread#run() - */ - public void run() { - try { - GossipSettings settings = new GossipSettings(); - List clients = new ArrayList<>(); - String myIpAddress = InetAddress.getLocalHost().getHostAddress(); - String cluster = "My Gossip Cluster"; - - // Create the gossip members and put them in a list and give them a port number starting with - // 2000. - List startupMembers = new ArrayList<>(); - for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) { - URI u; - try { - u = new URI("udp://" + myIpAddress + ":" + (2000 + i)); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - startupMembers.add(new RemoteGossipMember(cluster, u, "", 0, new HashMap())); - } - - // Lets start the gossip clients. - // Start the clients, waiting cleaning-interval + 1 second between them which will show the - // dead list handling. - for (GossipMember member : startupMembers) { - GossipService gossipService = new GossipService(cluster, member.getUri(), "", new HashMap(), - startupMembers, settings, null, new MetricRegistry()); - clients.add(gossipService); - gossipService.start(); - sleep(settings.getCleanupInterval() + 1000); - } - - // After starting all gossip clients, first wait 10 seconds and then shut them down. - sleep(10000); - System.err.println("Going to shutdown all services..."); - // Since they all run in the same virtual machine and share the same executor, if one is - // shutdown they will all stop. - clients.get(0).shutdown(); - - } catch (UnknownHostException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java index dfeabd7..357e316 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java @@ -24,12 +24,11 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.RemoteMember; import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; - -import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; public class StandAloneDatacenterAndRack { @@ -43,18 +42,21 @@ public class StandAloneDatacenterAndRack { gossipProps.put("sameRackGossipIntervalMs", "2000"); gossipProps.put("differentDatacenterGossipIntervalMs", "10000"); s.setActiveGossipProperties(gossipProps); - - Map props = new HashMap<>(); props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]); props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]); - GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], - props, Arrays.asList(new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), - s, (a, b) -> { }, new MetricRegistry()); - gossipService.start(); + GossipManager manager = GossipManagerBuilder.newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])) + .id(args[1]) + .gossipSettings(s) + .gossipMembers(Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .properties(props) + .build(); + manager.init(); while (true){ - System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers()); - System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers()); + System.out.println("Live: " + manager.getLiveMembers()); + System.out.println("Dead: " + manager.getDeadMembers()); Thread.sleep(2000); } } diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNode.java b/src/main/java/org/apache/gossip/examples/StandAloneNode.java index 3564943..b38865e 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneNode.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneNode.java @@ -17,15 +17,13 @@ */ package org.apache.gossip.examples; -import com.codahale.metrics.MetricRegistry; import java.net.URI; import java.net.UnknownHostException; import java.util.Arrays; -import java.util.HashMap; - -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.RemoteMember; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; public class StandAloneNode { public static void main (String [] args) throws UnknownHostException, InterruptedException{ @@ -33,12 +31,17 @@ public class StandAloneNode { s.setWindowSize(10); s.setConvictThreshold(1.0); s.setGossipInterval(10); - GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap(), - Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry()); - gossipService.start(); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])) + .id(args[1]) + .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .gossipSettings(s) + .build(); + gossipService.init(); while (true){ - System.out.println( "Live: " + gossipService.getGossipManager().getLiveMembers()); - System.out.println( "Dead: " + gossipService.getGossipManager().getDeadMembers()); + System.out.println("Live: " + gossipService.getLiveMembers()); + System.out.println("Dead: " + gossipService.getDeadMembers()); Thread.sleep(2000); } } diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java index d1c1751..00f1279 100644 --- a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java +++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java @@ -17,20 +17,17 @@ */ package org.apache.gossip.examples; -import com.codahale.metrics.MetricRegistry; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.Arrays; -import java.util.HashMap; - -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.RemoteMember; import org.apache.gossip.crdt.OrSet; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.SharedDataMessage; public class StandAloneNodeCrdtOrSet { public static void main (String [] args) throws InterruptedException, IOException{ @@ -38,17 +35,22 @@ public class StandAloneNodeCrdtOrSet { s.setWindowSize(10); s.setConvictThreshold(1.0); s.setGossipInterval(10); - GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap(), - Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry()); - gossipService.start(); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])) + .id(args[1]) + .gossipMembers(Arrays.asList( new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .gossipSettings(s) + .build(); + gossipService.init(); new Thread(() -> { while (true){ - System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers()); - System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers()); - System.out.println("---------- " + (gossipService.getGossipManager().findCrdt("abc") == null ? "": - gossipService.getGossipManager().findCrdt("abc").value())); - System.out.println("********** " + gossipService.getGossipManager().findCrdt("abc")); + System.out.println("Live: " + gossipService.getLiveMembers()); + System.out.println("Dead: " + gossipService.getDeadMembers()); + System.out.println("---------- " + (gossipService.findCrdt("abc") == null ? "": + gossipService.findCrdt("abc").value())); + System.out.println("********** " + gossipService.findCrdt("abc")); try { Thread.sleep(2000); } catch (Exception e) {} @@ -70,22 +72,23 @@ public class StandAloneNodeCrdtOrSet { } } - private static void removeData(String val, GossipService gossipService){ - OrSet s = (OrSet) gossipService.getGossipManager().findCrdt("abc"); - SharedGossipDataMessage m = new SharedGossipDataMessage(); + private static void removeData(String val, GossipManager gossipService){ + @SuppressWarnings("unchecked") + OrSet s = (OrSet) gossipService.findCrdt("abc"); + SharedDataMessage m = new SharedDataMessage(); m.setExpireAt(Long.MAX_VALUE); m.setKey("abc"); m.setPayload(new OrSet(s , new OrSet.Builder().remove(val))); m.setTimestamp(System.currentTimeMillis()); - gossipService.getGossipManager().merge(m); + gossipService.merge(m); } - private static void addData(String val, GossipService gossipService){ - SharedGossipDataMessage m = new SharedGossipDataMessage(); + private static void addData(String val, GossipManager gossipService){ + SharedDataMessage m = new SharedDataMessage(); m.setExpireAt(Long.MAX_VALUE); m.setKey("abc"); m.setPayload(new OrSet(val)); m.setTimestamp(System.currentTimeMillis()); - gossipService.getGossipManager().merge(m); + gossipService.merge(m); } } diff --git a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java index 9fea30b..b73550e 100644 --- a/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java +++ b/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java @@ -24,16 +24,16 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.apache.gossip.model.ActiveGossipOk; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.GossipMember; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.Member; import org.apache.gossip.model.Response; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.SharedDataMessage; import org.apache.gossip.model.ShutdownMessage; import org.apache.gossip.udp.UdpActiveGossipMessage; -import org.apache.gossip.udp.UdpGossipDataMessage; -import org.apache.gossip.udp.UdpSharedGossipDataMessage; +import org.apache.gossip.udp.UdpPerNodeDataMessage; +import org.apache.gossip.udp.UdpSharedDataMessage; import org.apache.log4j.Logger; import static com.codahale.metrics.MetricRegistry.name; @@ -69,7 +69,7 @@ public abstract class AbstractActiveGossiper { } - public final void sendShutdownMessage(LocalGossipMember me, LocalGossipMember target){ + public final void sendShutdownMessage(LocalMember me, LocalMember target){ if (target == null){ return; } @@ -79,13 +79,13 @@ public abstract class AbstractActiveGossiper { gossipCore.sendOneWay(m, target.getUri()); } - public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){ + public final void sendSharedData(LocalMember me, LocalMember member){ if (member == null){ return; } long startTime = System.currentTimeMillis(); - for (Entry innerEntry : gossipCore.getSharedData().entrySet()){ - UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage(); + for (Entry innerEntry : gossipCore.getSharedData().entrySet()){ + UdpSharedDataMessage message = new UdpSharedDataMessage(); message.setUuid(UUID.randomUUID().toString()); message.setUriFrom(me.getId()); message.setExpireAt(innerEntry.getValue().getExpireAt()); @@ -98,14 +98,14 @@ public abstract class AbstractActiveGossiper { sharedDataHistogram.update(System.currentTimeMillis() - startTime); } - public final void sendPerNodeData(LocalGossipMember me, LocalGossipMember member){ + public final void sendPerNodeData(LocalMember me, LocalMember member){ if (member == null){ return; } long startTime = System.currentTimeMillis(); - for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ - for (Entry innerEntry : entry.getValue().entrySet()){ - UdpGossipDataMessage message = new UdpGossipDataMessage(); + for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ + for (Entry innerEntry : entry.getValue().entrySet()){ + UdpPerNodeDataMessage message = new UdpPerNodeDataMessage(); message.setUuid(UUID.randomUUID().toString()); message.setUriFrom(me.getId()); message.setExpireAt(innerEntry.getValue().getExpireAt()); @@ -122,7 +122,7 @@ public abstract class AbstractActiveGossiper { /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ - protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) { + protected void sendMembershipList(LocalMember me, LocalMember member) { if (member == null){ return; } @@ -132,7 +132,7 @@ public abstract class AbstractActiveGossiper { message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); message.setUuid(UUID.randomUUID().toString()); message.getMembers().add(convert(me)); - for (LocalGossipMember other : gossipManager.getMembers().keySet()) { + for (LocalMember other : gossipManager.getMembers().keySet()) { message.getMembers().add(convert(other)); } Response r = gossipCore.send(message, member.getUri()); @@ -144,8 +144,8 @@ public abstract class AbstractActiveGossiper { sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); } - protected final GossipMember convert(LocalGossipMember member){ - GossipMember gm = new GossipMember(); + protected final Member convert(LocalMember member){ + Member gm = new Member(); gm.setCluster(member.getClusterName()); gm.setHeartbeat(member.getHeartbeat()); gm.setUri(member.getUri().toASCIIString()); @@ -160,8 +160,8 @@ public abstract class AbstractActiveGossiper { * An immutable list * @return The chosen LocalGossipMember to gossip with. */ - protected LocalGossipMember selectPartner(List memberList) { - LocalGossipMember member = null; + protected LocalMember selectPartner(List memberList) { + LocalMember member = null; if (memberList.size() > 0) { int randomNeighborIndex = random.nextInt(memberList.size()); member = memberList.get(randomNeighborIndex); diff --git a/src/main/java/org/apache/gossip/manager/DataReaper.java b/src/main/java/org/apache/gossip/manager/DataReaper.java index f165239..8175a1b 100644 --- a/src/main/java/org/apache/gossip/manager/DataReaper.java +++ b/src/main/java/org/apache/gossip/manager/DataReaper.java @@ -23,8 +23,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; /** * We wish to periodically sweep user data and remove entries past their timestamp. This @@ -53,7 +53,7 @@ public class DataReaper { } void runSharedOnce(){ - for (Entry entry : gossipCore.getSharedData().entrySet()){ + for (Entry entry : gossipCore.getSharedData().entrySet()){ if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ gossipCore.getSharedData().remove(entry.getKey(), entry.getValue()); } @@ -61,13 +61,13 @@ public class DataReaper { } void runPerNodeOnce(){ - for (Entry> node : gossipCore.getPerNodeData().entrySet()){ + for (Entry> node : gossipCore.getPerNodeData().entrySet()){ reapData(node.getValue()); } } - void reapData(ConcurrentHashMap concurrentHashMap){ - for (Entry entry : concurrentHashMap.entrySet()){ + void reapData(ConcurrentHashMap concurrentHashMap){ + for (Entry entry : concurrentHashMap.entrySet()){ if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){ concurrentHashMap.remove(entry.getKey(), entry.getValue()); } diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java index c66e332..2f489a2 100644 --- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java +++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java @@ -27,7 +27,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import com.codahale.metrics.MetricRegistry; @@ -130,14 +130,14 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers())); } - private List differentDataCenter(){ + private List differentDataCenter(){ String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); String rack = gossipManager.getMyself().getProperties().get(RACK); if (myDc == null|| rack == null){ return Collections.emptyList(); } - List notMyDc = new ArrayList(10); - for (LocalGossipMember i : gossipManager.getLiveMembers()){ + List notMyDc = new ArrayList(10); + for (LocalMember i : gossipManager.getLiveMembers()){ if (!myDc.equals(i.getProperties().get(DATACENTER))){ notMyDc.add(i); } @@ -145,14 +145,14 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { return notMyDc; } - private List sameDatacenterDifferentRack(){ + private List sameDatacenterDifferentRack(){ String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); String rack = gossipManager.getMyself().getProperties().get(RACK); if (myDc == null|| rack == null){ return Collections.emptyList(); } - List notMyDc = new ArrayList(10); - for (LocalGossipMember i : gossipManager.getLiveMembers()){ + List notMyDc = new ArrayList(10); + for (LocalMember i : gossipManager.getLiveMembers()){ if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){ notMyDc.add(i); } @@ -160,14 +160,14 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { return notMyDc; } - private List sameRackNodes(){ + private List sameRackNodes(){ String myDc = gossipManager.getMyself().getProperties().get(DATACENTER); String rack = gossipManager.getMyself().getProperties().get(RACK); if (myDc == null|| rack == null){ return Collections.emptyList(); } - List sameDcAndRack = new ArrayList(10); - for (LocalGossipMember i : gossipManager.getLiveMembers()){ + List sameDcAndRack = new ArrayList(10); + for (LocalMember i : gossipManager.getLiveMembers()){ if (myDc.equals(i.getProperties().get(DATACENTER)) && rack.equals(i.getProperties().get(RACK))){ sameDcAndRack.add(i); @@ -177,7 +177,7 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { } private void sendToSameRackMember() { - LocalGossipMember i = selectPartner(sameRackNodes()); + LocalMember i = selectPartner(sameRackNodes()); sendMembershipList(gossipManager.getMyself(), i); } @@ -235,7 +235,7 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { * sends an optimistic shutdown message to several clusters nodes */ protected void sendShutdownMessage(){ - List l = gossipManager.getLiveMembers(); + List l = gossipManager.getLiveMembers(); int sendTo = l.size() < 3 ? 1 : l.size() / 3; for (int i = 0; i < sendTo; i++) { threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index a24b125..e3dcb21 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -20,9 +20,9 @@ package org.apache.gossip.manager; import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.GossipMember; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.Member; +import org.apache.gossip.LocalMember; +import org.apache.gossip.RemoteMember; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipState; import org.apache.gossip.model.*; @@ -49,8 +49,8 @@ public class GossipCore implements GossipCoreConstants { private final GossipManager gossipManager; private ConcurrentHashMap requests; private ThreadPoolExecutor service; - private final ConcurrentHashMap> perNodeData; - private final ConcurrentHashMap sharedData; + private final ConcurrentHashMap> perNodeData; + private final ConcurrentHashMap sharedData; private final BlockingQueue workQueue; private final PKCS8EncodedKeySpec privKeySpec; private final PrivateKey privKey; @@ -113,14 +113,14 @@ public class GossipCore implements GossipCoreConstants { } @SuppressWarnings({ "unchecked", "rawtypes" }) - public void addSharedData(SharedGossipDataMessage message) { + public void addSharedData(SharedDataMessage message) { while (true){ - SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); + SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); if (previous == null){ return; } if (message.getPayload() instanceof Crdt){ - SharedGossipDataMessage merged = new SharedGossipDataMessage(); + SharedDataMessage merged = new SharedDataMessage(); merged.setExpireAt(message.getExpireAt()); merged.setKey(message.getKey()); merged.setNodeId(message.getNodeId()); @@ -144,12 +144,12 @@ public class GossipCore implements GossipCoreConstants { } } - public void addPerNodeData(GossipDataMessage message){ - ConcurrentHashMap nodeMap = new ConcurrentHashMap<>(); + public void addPerNodeData(PerNodeDataMessage message){ + ConcurrentHashMap nodeMap = new ConcurrentHashMap<>(); nodeMap.put(message.getKey(), message); nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap); if (nodeMap != null){ - GossipDataMessage current = nodeMap.get(message.getKey()); + PerNodeDataMessage current = nodeMap.get(message.getKey()); if (current == null){ nodeMap.putIfAbsent(message.getKey(), message); } else { @@ -160,11 +160,11 @@ public class GossipCore implements GossipCoreConstants { } } - public ConcurrentHashMap> getPerNodeData(){ + public ConcurrentHashMap> getPerNodeData(){ return perNodeData; } - public ConcurrentHashMap getSharedData() { + public ConcurrentHashMap getSharedData() { return sharedData; } @@ -314,12 +314,12 @@ public class GossipCore implements GossipCoreConstants { * @param remoteList * */ - public void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, - List remoteList) { + public void mergeLists(GossipManager gossipManager, RemoteMember senderMember, + List remoteList) { if (LOGGER.isDebugEnabled()){ debugState(senderMember, remoteList); } - for (LocalGossipMember i : gossipManager.getDeadMembers()) { + for (LocalMember i : gossipManager.getDeadMembers()) { if (i.getId().equals(senderMember.getId())) { LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); i.recordHeartbeat(senderMember.getHeartbeat()); @@ -327,11 +327,11 @@ public class GossipCore implements GossipCoreConstants { //TODO consider forcing an UP here } } - for (GossipMember remoteMember : remoteList) { + for (Member remoteMember : remoteList) { if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { continue; } - LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(), + LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(), remoteMember.getUri(), remoteMember.getId(), remoteMember.getHeartbeat(), @@ -342,7 +342,7 @@ public class GossipCore implements GossipCoreConstants { aNewMember.recordHeartbeat(remoteMember.getHeartbeat()); Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP); if (result != null){ - for (Entry localMember : gossipManager.getMembers().entrySet()){ + for (Entry localMember : gossipManager.getMembers().entrySet()){ if (localMember.getKey().getId().equals(remoteMember.getId())){ localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat()); localMember.getKey().setHeartbeat(remoteMember.getHeartbeat()); @@ -356,8 +356,8 @@ public class GossipCore implements GossipCoreConstants { } } - private void debugState(RemoteGossipMember senderMember, - List remoteList){ + private void debugState(RemoteMember senderMember, + List remoteList){ LOGGER.warn( "-----------------------\n" + "Me " + gossipManager.getMyself() + "\n" + @@ -369,13 +369,13 @@ public class GossipCore implements GossipCoreConstants { } @SuppressWarnings("rawtypes") - public Crdt merge(SharedGossipDataMessage message) { + public Crdt merge(SharedDataMessage message) { for (;;){ - SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); + SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); if (previous == null){ return (Crdt) message.getPayload(); } - SharedGossipDataMessage copy = new SharedGossipDataMessage(); + SharedDataMessage copy = new SharedDataMessage(); copy.setExpireAt(message.getExpireAt()); copy.setKey(message.getKey()); copy.setNodeId(message.getNodeId()); diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 4b28f2f..ba8517b 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -19,16 +19,16 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.gossip.GossipMember; +import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.handlers.MessageInvoker; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; import org.apache.gossip.model.ShutdownMessage; import org.apache.log4j.Logger; @@ -44,13 +44,12 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; - public abstract class GossipManager { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); - private final ConcurrentSkipListMap members; - private final LocalGossipMember me; + private final ConcurrentSkipListMap members; + private final LocalMember me; private final GossipSettings settings; private final AtomicBoolean gossipServiceRunning; private final GossipListener listener; @@ -70,19 +69,19 @@ public abstract class GossipManager { public GossipManager(String cluster, URI uri, String id, Map properties, GossipSettings settings, - List gossipMembers, GossipListener listener, MetricRegistry registry, + List gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper, MessageInvoker messageInvoker) { this.settings = settings; this.messageInvoker = messageInvoker; clock = new SystemClock(); - me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties, + me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties, settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); gossipCore = new GossipCore(this, registry); dataReaper = new DataReaper(gossipCore, clock); members = new ConcurrentSkipListMap<>(); - for (GossipMember startupMember : gossipMembers) { + for (Member startupMember : gossipMembers) { if (!startupMember.equals(me)) { - LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), + LocalMember member = new LocalMember(startupMember.getClusterName(), startupMember.getUri(), startupMember.getId(), clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); @@ -106,7 +105,7 @@ public abstract class GossipManager { return messageInvoker; } - public ConcurrentSkipListMap getMembers() { + public ConcurrentSkipListMap getMembers() { return members; } @@ -117,7 +116,7 @@ public abstract class GossipManager { /** * @return a read only list of members found in the DOWN state. */ - public List getDeadMembers() { + public List getDeadMembers() { return Collections.unmodifiableList( members.entrySet() .stream() @@ -129,7 +128,7 @@ public abstract class GossipManager { * * @return a read only list of members found in the UP state */ - public List getLiveMembers() { + public List getLiveMembers() { return Collections.unmodifiableList( members.entrySet() .stream() @@ -137,7 +136,7 @@ public abstract class GossipManager { .map(Entry::getKey).collect(Collectors.toList())); } - public LocalGossipMember getMyself() { + public LocalMember getMyself() { return me; } @@ -164,7 +163,7 @@ public abstract class GossipManager { scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); scheduledServiced.scheduleAtFixedRate(() -> { try { - for (Entry entry : members.entrySet()) { + for (Entry entry : members.entrySet()) { boolean userDown = processOptomisticShutdown(entry); if (userDown) continue; @@ -205,8 +204,8 @@ public abstract class GossipManager { * @param l member to consider * @return true if node forced down */ - public boolean processOptomisticShutdown(Entry l){ - GossipDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY); + public boolean processOptomisticShutdown(Entry l){ + PerNodeDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY); if (m == null){ return false; } @@ -224,8 +223,8 @@ public abstract class GossipManager { } private void readSavedRingState() { - for (LocalGossipMember l : ringState.readFromDisk()){ - LocalGossipMember member = new LocalGossipMember(l.getClusterName(), + for (LocalMember l : ringState.readFromDisk()){ + LocalMember member = new LocalMember(l.getClusterName(), l.getUri(), l.getId(), clock.nanoTime(), l.getProperties(), settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); @@ -234,12 +233,12 @@ public abstract class GossipManager { } private void readSavedDataState() { - for (Entry> l : userDataState.readPerNodeFromDisk().entrySet()){ - for (Entry j : l.getValue().entrySet()){ + for (Entry> l : userDataState.readPerNodeFromDisk().entrySet()){ + for (Entry j : l.getValue().entrySet()){ gossipCore.addPerNodeData(j.getValue()); } } - for (Entry l: userDataState.readSharedDataFromDisk().entrySet()){ + for (Entry l: userDataState.readSharedDataFromDisk().entrySet()){ gossipCore.addSharedData(l.getValue()); } } @@ -276,7 +275,7 @@ public abstract class GossipManager { scheduledServiced.shutdownNow(); } - public void gossipPerNodeData(GossipDataMessage message){ + public void gossipPerNodeData(PerNodeDataMessage message){ Objects.nonNull(message.getKey()); Objects.nonNull(message.getTimestamp()); Objects.nonNull(message.getPayload()); @@ -284,7 +283,7 @@ public abstract class GossipManager { gossipCore.addPerNodeData(message); } - public void gossipSharedData(SharedGossipDataMessage message){ + public void gossipSharedData(SharedDataMessage message){ Objects.nonNull(message.getKey()); Objects.nonNull(message.getTimestamp()); Objects.nonNull(message.getPayload()); @@ -295,7 +294,7 @@ public abstract class GossipManager { @SuppressWarnings("rawtypes") public Crdt findCrdt(String key){ - SharedGossipDataMessage l = gossipCore.getSharedData().get(key); + SharedDataMessage l = gossipCore.getSharedData().get(key); if (l == null){ return null; } @@ -307,7 +306,7 @@ public abstract class GossipManager { } @SuppressWarnings("rawtypes") - public Crdt merge(SharedGossipDataMessage message){ + public Crdt merge(SharedDataMessage message){ Objects.nonNull(message.getKey()); Objects.nonNull(message.getTimestamp()); Objects.nonNull(message.getPayload()); @@ -318,12 +317,12 @@ public abstract class GossipManager { return gossipCore.merge(message); } - public GossipDataMessage findPerNodeGossipData(String nodeId, String key){ - ConcurrentHashMap j = gossipCore.getPerNodeData().get(nodeId); + public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){ + ConcurrentHashMap j = gossipCore.getPerNodeData().get(nodeId); if (j == null){ return null; } else { - GossipDataMessage l = j.get(key); + PerNodeDataMessage l = j.get(key); if (l == null){ return null; } @@ -334,8 +333,8 @@ public abstract class GossipManager { } } - public SharedGossipDataMessage findSharedGossipData(String key){ - SharedGossipDataMessage l = gossipCore.getSharedData().get(key); + public SharedDataMessage findSharedGossipData(String key){ + SharedDataMessage l = gossipCore.getSharedData().get(key); if (l == null){ return null; } diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java similarity index 78% rename from src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java rename to src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java index 3ac237a..b87045b 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java @@ -15,16 +15,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gossip.manager.random; +package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.core.JsonGenerator.Feature; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.gossip.GossipMember; +import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; +import org.apache.gossip.StartupSettings; import org.apache.gossip.crdt.CrdtModule; import org.apache.gossip.event.GossipListener; -import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.handlers.DefaultMessageInvoker; import org.apache.gossip.manager.handlers.MessageInvoker; @@ -34,7 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class RandomGossipManager extends GossipManager { +public class GossipManagerBuilder { public static ManagerBuilder newBuilder() { return new ManagerBuilder(); @@ -45,7 +45,7 @@ public class RandomGossipManager extends GossipManager { private URI uri; private String id; private GossipSettings settings; - private List gossipMembers; + private List gossipMembers; private GossipListener listener; private MetricRegistry registry; private Map properties; @@ -70,17 +70,26 @@ public class RandomGossipManager extends GossipManager { return this; } - public ManagerBuilder withId(String id) { + public ManagerBuilder id(String id) { this.id = id; return this; } - public ManagerBuilder settings(GossipSettings settings) { + public ManagerBuilder gossipSettings(GossipSettings settings) { this.settings = settings; return this; } + + public ManagerBuilder startupSettings(StartupSettings startupSettings) { + this.cluster = startupSettings.getCluster(); + this.id = startupSettings.getId(); + this.settings = startupSettings.getGossipSettings(); + this.gossipMembers = startupSettings.getGossipMembers(); + this.uri = startupSettings.getUri(); + return this; + } - public ManagerBuilder gossipMembers(List members) { + public ManagerBuilder gossipMembers(List members) { this.gossipMembers = members; return this; } @@ -110,12 +119,14 @@ public class RandomGossipManager extends GossipManager { return this; } - public RandomGossipManager build() { + public GossipManager build() { checkArgument(id != null, "You must specify an id"); checkArgument(cluster != null, "You must specify a cluster name"); checkArgument(settings != null, "You must specify gossip settings"); checkArgument(uri != null, "You must specify a uri"); - checkArgument(registry != null, "You must specify a MetricRegistry"); + if (registry == null){ + registry = new MetricRegistry(); + } if (properties == null){ properties = new HashMap(); } @@ -133,13 +144,9 @@ public class RandomGossipManager extends GossipManager { } if (messageInvoker == null) { messageInvoker = new DefaultMessageInvoker(); - } - return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker); + } + return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ; } } - private RandomGossipManager(String cluster, URI uri, String id, Map properties, GossipSettings settings, - List gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper, MessageInvoker messageInvoker) { - super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker); - } } diff --git a/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/src/main/java/org/apache/gossip/manager/RingStatePersister.java index 6f724e0..7e42562 100644 --- a/src/main/java/org/apache/gossip/manager/RingStatePersister.java +++ b/src/main/java/org/apache/gossip/manager/RingStatePersister.java @@ -26,7 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.NavigableSet; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.apache.log4j.Logger; public class RingStatePersister implements Runnable { @@ -52,7 +52,7 @@ public class RingStatePersister implements Runnable { if (!parent.getSettings().isPersistRingState()){ return; } - NavigableSet i = parent.getMembers().keySet(); + NavigableSet i = parent.getMembers().keySet(); try (FileOutputStream fos = new FileOutputStream(computeTarget())){ parent.getObjectMapper().writeValue(fos, i); } catch (IOException e) { @@ -61,7 +61,7 @@ public class RingStatePersister implements Runnable { } @SuppressWarnings("unchecked") - List readFromDisk(){ + List readFromDisk(){ if (!parent.getSettings().isPersistRingState()){ return Collections.emptyList(); } diff --git a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java index 839d796..e47fe2a 100644 --- a/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java +++ b/src/main/java/org/apache/gossip/manager/SimpleActiveGossipper.java @@ -25,7 +25,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import com.codahale.metrics.MetricRegistry; @@ -88,12 +88,12 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper { } protected void sendToALiveMember(){ - LocalGossipMember member = selectPartner(gossipManager.getLiveMembers()); + LocalMember member = selectPartner(gossipManager.getLiveMembers()); sendMembershipList(gossipManager.getMyself(), member); } protected void sendToDeadMember(){ - LocalGossipMember member = selectPartner(gossipManager.getDeadMembers()); + LocalMember member = selectPartner(gossipManager.getDeadMembers()); sendMembershipList(gossipManager.getMyself(), member); } @@ -101,7 +101,7 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper { * sends an optimistic shutdown message to several clusters nodes */ protected void sendShutdownMessage(){ - List l = gossipManager.getLiveMembers(); + List l = gossipManager.getLiveMembers(); int sendTo = l.size() < 3 ? 1 : l.size() / 2; for (int i = 0; i < sendTo; i++) { threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l))); diff --git a/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/src/main/java/org/apache/gossip/manager/UserDataPersister.java index 4a8a415..3b9eafa 100644 --- a/src/main/java/org/apache/gossip/manager/UserDataPersister.java +++ b/src/main/java/org/apache/gossip/manager/UserDataPersister.java @@ -23,8 +23,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; import org.apache.log4j.Logger; public class UserDataPersister implements Runnable { @@ -49,16 +49,16 @@ public class UserDataPersister implements Runnable { } @SuppressWarnings("unchecked") - ConcurrentHashMap> readPerNodeFromDisk(){ + ConcurrentHashMap> readPerNodeFromDisk(){ if (!parent.getSettings().isPersistDataState()){ - return new ConcurrentHashMap>(); + return new ConcurrentHashMap>(); } try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){ return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); } catch (IOException e) { LOGGER.debug(e); } - return new ConcurrentHashMap>(); + return new ConcurrentHashMap>(); } void writePerNodeToDisk(){ @@ -84,16 +84,16 @@ public class UserDataPersister implements Runnable { } @SuppressWarnings("unchecked") - ConcurrentHashMap readSharedDataFromDisk(){ + ConcurrentHashMap readSharedDataFromDisk(){ if (!parent.getSettings().isPersistRingState()){ - return new ConcurrentHashMap(); + return new ConcurrentHashMap(); } try (FileInputStream fos = new FileInputStream(computeSharedTarget())){ return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class); } catch (IOException e) { LOGGER.debug(e); } - return new ConcurrentHashMap(); + return new ConcurrentHashMap(); } /** diff --git a/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java index 54aa40c..f5e568e 100644 --- a/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java +++ b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java @@ -17,8 +17,8 @@ */ package org.apache.gossip.manager.handlers; -import org.apache.gossip.GossipMember; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.Member; +import org.apache.gossip.RemoteMember; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.model.Base; @@ -34,8 +34,8 @@ import java.util.List; public class ActiveGossipMessageHandler implements MessageHandler { @Override public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - List remoteGossipMembers = new ArrayList<>(); - RemoteGossipMember senderMember = null; + List remoteGossipMembers = new ArrayList<>(); + RemoteMember senderMember = null; UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { URI u; @@ -45,7 +45,7 @@ public class ActiveGossipMessageHandler implements MessageHandler { GossipCore.LOGGER.debug("Gossip message with faulty URI", e); continue; } - RemoteGossipMember member = new RemoteGossipMember( + RemoteMember member = new RemoteMember( activeGossipMessage.getMembers().get(i).getCluster(), u, activeGossipMessage.getMembers().get(i).getId(), diff --git a/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java b/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java index 034691d..5b78ce3 100644 --- a/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java +++ b/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java @@ -29,8 +29,8 @@ public class DefaultMessageInvoker implements MessageInvoker { mic = new MessageInvokerCombiner(); mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler())); mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler())); - mic.add(new SimpleMessageInvoker(GossipDataMessage.class, new GossipDataMessageHandler())); - mic.add(new SimpleMessageInvoker(SharedGossipDataMessage.class, new SharedGossipDataMessageHandler())); + mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new PerNodeDataMessageHandler())); + mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new SharedDataMessageHandler())); mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler())); } diff --git a/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java similarity index 85% rename from src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java rename to src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java index edf2579..b3a785e 100644 --- a/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java +++ b/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java @@ -20,12 +20,12 @@ package org.apache.gossip.manager.handlers; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.model.Base; -import org.apache.gossip.udp.UdpGossipDataMessage; +import org.apache.gossip.udp.UdpPerNodeDataMessage; -public class GossipDataMessageHandler implements MessageHandler { +public class PerNodeDataMessageHandler implements MessageHandler { @Override public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - UdpGossipDataMessage message = (UdpGossipDataMessage) base; + UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base; gossipCore.addPerNodeData(message); } } diff --git a/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java similarity index 84% rename from src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java rename to src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java index e9d5343..89ca4a0 100644 --- a/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java +++ b/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java @@ -20,12 +20,12 @@ package org.apache.gossip.manager.handlers; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.model.Base; -import org.apache.gossip.udp.UdpSharedGossipDataMessage; +import org.apache.gossip.udp.UdpSharedDataMessage; -public class SharedGossipDataMessageHandler implements MessageHandler{ +public class SharedDataMessageHandler implements MessageHandler{ @Override public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base; + UdpSharedDataMessage message = (UdpSharedDataMessage) base; gossipCore.addSharedData(message); } } diff --git a/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java index c4adea2..a40c7a1 100644 --- a/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java +++ b/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java @@ -20,14 +20,14 @@ package org.apache.gossip.manager.handlers; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.model.Base; -import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.ShutdownMessage; public class ShutdownMessageHandler implements MessageHandler { @Override public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { ShutdownMessage s = (ShutdownMessage) base; - GossipDataMessage m = new GossipDataMessage(); + PerNodeDataMessage m = new PerNodeDataMessage(); m.setKey(ShutdownMessage.PER_NODE_KEY); m.setNodeId(s.getNodeId()); m.setPayload(base); diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java index 438f42a..a3c45b8 100644 --- a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java +++ b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java @@ -22,17 +22,17 @@ import java.util.List; public class ActiveGossipMessage extends Base { - private List members = new ArrayList<>(); + private List members = new ArrayList<>(); public ActiveGossipMessage(){ } - public List getMembers() { + public List getMembers() { return members; } - public void setMembers(List members) { + public void setMembers(List members) { this.members = members; } diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java index 4551f2a..1b66310 100644 --- a/src/main/java/org/apache/gossip/model/Base.java +++ b/src/main/java/org/apache/gossip/model/Base.java @@ -19,9 +19,9 @@ package org.apache.gossip.model; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; -import org.apache.gossip.udp.UdpGossipDataMessage; +import org.apache.gossip.udp.UdpPerNodeDataMessage; import org.apache.gossip.udp.UdpNotAMemberFault; -import org.apache.gossip.udp.UdpSharedGossipDataMessage; +import org.apache.gossip.udp.UdpSharedDataMessage; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonSubTypes; @@ -39,10 +39,10 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type; @Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"), @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"), @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"), - @Type(value = GossipDataMessage.class, name = "GossipDataMessage"), - @Type(value = UdpGossipDataMessage.class, name = "UdpGossipDataMessage"), - @Type(value = SharedGossipDataMessage.class, name = "SharedGossipDataMessage"), - @Type(value = UdpSharedGossipDataMessage.class, name = "UdpSharedGossipDataMessage") + @Type(value = PerNodeDataMessage.class, name = "PerNodeDataMessage"), + @Type(value = UdpPerNodeDataMessage.class, name = "UdpPerNodeDataMessage"), + @Type(value = SharedDataMessage.class, name = "SharedDataMessage"), + @Type(value = UdpSharedDataMessage.class, name = "UdpSharedDataMessage") }) public class Base { diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/Member.java similarity index 89% rename from src/main/java/org/apache/gossip/model/GossipMember.java rename to src/main/java/org/apache/gossip/model/Member.java index a318776..d86aad8 100644 --- a/src/main/java/org/apache/gossip/model/GossipMember.java +++ b/src/main/java/org/apache/gossip/model/Member.java @@ -19,7 +19,7 @@ package org.apache.gossip.model; import java.util.Map; -public class GossipMember { +public class Member { private String cluster; private String uri; @@ -27,11 +27,11 @@ public class GossipMember { private Long heartbeat; private Map properties; - public GossipMember(){ + public Member(){ } - public GossipMember(String cluster, String uri, String id, Long heartbeat){ + public Member(String cluster, String uri, String id, Long heartbeat){ this.cluster = cluster; this.uri = uri; this.id = id; @@ -80,7 +80,7 @@ public class GossipMember { @Override public String toString() { - return "GossipMember [cluster=" + cluster + ", uri=" + uri + ", id=" + id + ", heartbeat=" + return "Member [cluster=" + cluster + ", uri=" + uri + ", id=" + id + ", heartbeat=" + heartbeat + ", properties=" + properties + "]"; } diff --git a/src/main/java/org/apache/gossip/model/GossipDataMessage.java b/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java similarity index 97% rename from src/main/java/org/apache/gossip/model/GossipDataMessage.java rename to src/main/java/org/apache/gossip/model/PerNodeDataMessage.java index e9aae61..2d1cdef 100644 --- a/src/main/java/org/apache/gossip/model/GossipDataMessage.java +++ b/src/main/java/org/apache/gossip/model/PerNodeDataMessage.java @@ -17,7 +17,7 @@ */ package org.apache.gossip.model; -public class GossipDataMessage extends Base { +public class PerNodeDataMessage extends Base { private String nodeId; private String key; diff --git a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java b/src/main/java/org/apache/gossip/model/SharedDataMessage.java similarity index 97% rename from src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java rename to src/main/java/org/apache/gossip/model/SharedDataMessage.java index dc80f05..e423be8 100644 --- a/src/main/java/org/apache/gossip/model/SharedGossipDataMessage.java +++ b/src/main/java/org/apache/gossip/model/SharedDataMessage.java @@ -17,7 +17,7 @@ */ package org.apache.gossip.model; -public class SharedGossipDataMessage extends Base { +public class SharedDataMessage extends Base { private String nodeId; private String key; diff --git a/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java similarity index 90% rename from src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java rename to src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java index f2042a5..6eb170a 100644 --- a/src/main/java/org/apache/gossip/udp/UdpGossipDataMessage.java +++ b/src/main/java/org/apache/gossip/udp/UdpPerNodeDataMessage.java @@ -17,9 +17,9 @@ */ package org.apache.gossip.udp; -import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; -public class UdpGossipDataMessage extends GossipDataMessage implements Trackable { +public class UdpPerNodeDataMessage extends PerNodeDataMessage implements Trackable { private String uriFrom; private String uuid; diff --git a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java b/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java similarity index 90% rename from src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java rename to src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java index 6020328..1658503 100644 --- a/src/main/java/org/apache/gossip/udp/UdpSharedGossipDataMessage.java +++ b/src/main/java/org/apache/gossip/udp/UdpSharedDataMessage.java @@ -17,9 +17,9 @@ */ package org.apache.gossip.udp; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.SharedDataMessage; -public class UdpSharedGossipDataMessage extends SharedGossipDataMessage implements Trackable { +public class UdpSharedDataMessage extends SharedDataMessage implements Trackable { private String uriFrom; private String uuid; diff --git a/src/test/java/org/apache/gossip/AbstractIntegrationBase.java b/src/test/java/org/apache/gossip/AbstractIntegrationBase.java new file mode 100644 index 0000000..896157f --- /dev/null +++ b/src/test/java/org/apache/gossip/AbstractIntegrationBase.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.gossip.manager.GossipManager; +import org.junit.After; +import org.junit.Before; + +public abstract class AbstractIntegrationBase { + + List nodes = new ArrayList(); + + public void register(GossipManager manager){ + nodes.add(manager); + } + + @Before + public void before(){ + nodes = new ArrayList(); + } + + @After + public void after(){ + for (GossipManager node: nodes){ + if (node !=null){ + node.shutdown(); + } + } + } + +} diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 3892e9b..147702d 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -17,26 +17,26 @@ */ package org.apache.gossip; -import com.codahale.metrics.MetricRegistry; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.gossip.crdt.GrowOnlySet; import org.apache.gossip.crdt.OrSet; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; import org.junit.Test; import io.teknek.tunit.TUnit; -public class DataTest { +public class DataTest extends AbstractIntegrationBase { private String orSetKey = "cror"; @@ -47,25 +47,25 @@ public class DataTest { settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; - List startupMembers = new ArrayList<>(); + List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes+1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); } - final List clients = new ArrayList<>(); + final List clients = new ArrayList<>(); final int clusterMembers = 2; - for (int i = 1; i < clusterMembers+1; ++i) { + for (int i = 1; i < clusterMembers + 1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - GossipService gossipService = new GossipService(cluster, uri, i + "", - new HashMap(), startupMembers, settings, - (a,b) -> {}, new MetricRegistry()); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); clients.add(gossipService); - gossipService.start(); + gossipService.init(); + register(gossipService); } TUnit.assertThat(() -> { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getGossipManager().getLiveMembers().size(); + total += clients.get(i).getLiveMembers().size(); } return total; }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); @@ -73,7 +73,7 @@ public class DataTest { clients.get(0).gossipSharedData(sharedMsg()); TUnit.assertThat(()-> { - GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a"); + PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a"); if (x == null) return ""; else @@ -81,7 +81,7 @@ public class DataTest { }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); TUnit.assertThat(() -> { - SharedGossipDataMessage x = clients.get(1).findSharedData("a"); + SharedDataMessage x = clients.get(1).findSharedGossipData("a"); if (x == null) return ""; else @@ -95,71 +95,67 @@ public class DataTest { assertThatOrSetIsMerged(clients); dropIt(clients); assertThatOrSetDelIsMerged(clients); - - for (int i = 0; i < clusterMembers; ++i) { - clients.get(i).shutdown(); - } } - private void givenOrs(List clients) { + private void givenOrs(List clients) { { - SharedGossipDataMessage d = new SharedGossipDataMessage(); + SharedDataMessage d = new SharedDataMessage(); d.setKey(orSetKey); d.setPayload(new OrSet("1", "2")); d.setExpireAt(Long.MAX_VALUE); d.setTimestamp(System.currentTimeMillis()); - clients.get(0).getGossipManager().merge(d); + clients.get(0).merge(d); } { - SharedGossipDataMessage d = new SharedGossipDataMessage(); + SharedDataMessage d = new SharedDataMessage(); d.setKey(orSetKey); d.setPayload(new OrSet("3", "4")); d.setExpireAt(Long.MAX_VALUE); d.setTimestamp(System.currentTimeMillis()); - clients.get(1).getGossipManager().merge(d); + clients.get(1).merge(d); } } - private void dropIt(List clients) { + private void dropIt(List clients) { @SuppressWarnings("unchecked") - OrSet o = (OrSet) clients.get(0).getGossipManager().findCrdt(orSetKey); + OrSet o = (OrSet) clients.get(0).findCrdt(orSetKey); OrSet o2 = new OrSet(o, new OrSet.Builder().remove("3")); - SharedGossipDataMessage d = new SharedGossipDataMessage(); + SharedDataMessage d = new SharedDataMessage(); d.setKey(orSetKey); d.setPayload(o2); d.setExpireAt(Long.MAX_VALUE); d.setTimestamp(System.currentTimeMillis()); - clients.get(0).getGossipManager().merge(d); + clients.get(0).merge(d); } - private void assertThatOrSetIsMerged(final List clients){ + private void assertThatOrSetIsMerged(final List clients){ TUnit.assertThat(() -> { - return clients.get(0).getGossipManager().findCrdt(orSetKey).value(); + return clients.get(0).findCrdt(orSetKey).value(); }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet("1", "2", "3", "4").value()); TUnit.assertThat(() -> { - return clients.get(1).getGossipManager().findCrdt(orSetKey).value(); + return clients.get(1).findCrdt(orSetKey).value(); }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet("1", "2", "3", "4").value()); } - private void assertThatOrSetDelIsMerged(final List clients){ + private void assertThatOrSetDelIsMerged(final List clients){ TUnit.assertThat(() -> { - return clients.get(0).getGossipManager().findCrdt(orSetKey); + return clients.get(0).findCrdt(orSetKey); }).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new OrSet("1", "2", "4")); } - private void givenDifferentDatumsInSet(final List clients){ - clients.get(0).getGossipManager().merge(CrdtMessage("1")); - clients.get(1).getGossipManager().merge(CrdtMessage("2")); + private void givenDifferentDatumsInSet(final List clients){ + clients.get(0).merge(CrdtMessage("1")); + clients.get(1).merge(CrdtMessage("2")); } - private void assertThatListIsMerged(final List clients){ + private void assertThatListIsMerged(final List clients){ TUnit.assertThat(() -> { - return clients.get(0).getGossipManager().findCrdt("cr"); + return clients.get(0).findCrdt("cr"); }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet(Arrays.asList("1","2"))); } - private SharedGossipDataMessage CrdtMessage(String item){ - SharedGossipDataMessage d = new SharedGossipDataMessage(); + private SharedDataMessage CrdtMessage(String item){ + SharedDataMessage d = new SharedDataMessage(); d.setKey("cr"); d.setPayload(new GrowOnlySet( Arrays.asList(item))); d.setExpireAt(Long.MAX_VALUE); @@ -167,8 +163,8 @@ public class DataTest { return d; } - private GossipDataMessage msg(){ - GossipDataMessage g = new GossipDataMessage(); + private PerNodeDataMessage msg(){ + PerNodeDataMessage g = new PerNodeDataMessage(); g.setExpireAt(Long.MAX_VALUE); g.setKey("a"); g.setPayload("b"); @@ -176,8 +172,8 @@ public class DataTest { return g; } - private SharedGossipDataMessage sharedMsg(){ - SharedGossipDataMessage g = new SharedGossipDataMessage(); + private SharedDataMessage sharedMsg(){ + SharedDataMessage g = new SharedDataMessage(); g.setExpireAt(Long.MAX_VALUE); g.setKey("a"); g.setPayload("c"); diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java index 1eb0aee..7c02d2d 100644 --- a/src/test/java/org/apache/gossip/IdAndPropertyTest.java +++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java @@ -28,43 +28,54 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; -import com.codahale.metrics.MetricRegistry; - import io.teknek.tunit.TUnit; @RunWith(JUnitPlatform.class) -public class IdAndPropertyTest { +public class IdAndPropertyTest extends AbstractIntegrationBase { @Test - public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException{ + public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException { GossipSettings settings = new GossipSettings(); settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); - List startupMembers = new ArrayList<>(); + List startupMembers = new ArrayList<>(); Map x = new HashMap<>(); x.put("a", "b"); x.put("datacenter", "dc1"); x.put("rack", "rack1"); - GossipService gossipService1 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0", x, startupMembers, settings, - (a, b) -> {}, new MetricRegistry()); - gossipService1.start(); + GossipManager gossipService1 = GossipManagerBuilder.newBuilder() + .cluster("a") + .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0))) + .id("0") + .properties(x) + .gossipMembers(startupMembers) + .gossipSettings(settings).build(); + gossipService1.init(); + register(gossipService1); Map y = new HashMap<>(); y.put("a", "c"); y.put("datacenter", "dc2"); y.put("rack", "rack2"); - GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", y, - Arrays.asList(new RemoteGossipMember("a", - new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")), - settings, (a, b) -> { }, new MetricRegistry()); - gossipService2.start(); + GossipManager gossipService2 = GossipManagerBuilder.newBuilder().cluster("a") + .uri( new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) + .id("1") + .properties(y) + .gossipMembers(Arrays.asList(new RemoteMember("a", + new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"))) + .gossipSettings(settings).build(); + gossipService2.init(); + register(gossipService2); + TUnit.assertThat(() -> { String value = ""; try { - value = gossipService1.getGossipManager().getLiveMembers().get(0).getProperties().get("a"); + value = gossipService1.getLiveMembers().get(0).getProperties().get("a"); } catch (RuntimeException e){ } return value; }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c"); @@ -72,12 +83,9 @@ public class IdAndPropertyTest { TUnit.assertThat(() -> { String value = ""; try { - value = gossipService2.getGossipManager().getLiveMembers().get(0).getProperties().get("a"); + value = gossipService2.getLiveMembers().get(0).getProperties().get("a"); } catch (RuntimeException e){ } return value; - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b"); - gossipService1.shutdown(); - gossipService2.shutdown(); - + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b"); } } diff --git a/src/test/java/org/apache/gossip/GossipMemberTest.java b/src/test/java/org/apache/gossip/MemberTest.java similarity index 79% rename from src/test/java/org/apache/gossip/GossipMemberTest.java rename to src/test/java/org/apache/gossip/MemberTest.java index 272c7fb..5f0d18a 100644 --- a/src/test/java/org/apache/gossip/GossipMemberTest.java +++ b/src/test/java/org/apache/gossip/MemberTest.java @@ -27,14 +27,14 @@ import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; @RunWith(JUnitPlatform.class) -public class GossipMemberTest { +public class MemberTest { @Test public void testHashCodeFromGossip40() throws URISyntaxException { Assert.assertNotEquals( - new LocalGossipMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, new HashMap(), 10, 5, "exponential") + new LocalMember("mycluster", new URI("udp://4.4.4.4:1000"), "myid", 1, new HashMap(), 10, 5, "exponential") .hashCode(), - new LocalGossipMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, new HashMap(), 11, 6, "exponential") + new LocalMember("mycluster", new URI("udp://4.4.4.5:1005"), "yourid", 11, new HashMap(), 11, 6, "exponential") .hashCode()); } } diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 6a0765b..48fb2cb 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -17,7 +17,6 @@ */ package org.apache.gossip; -import com.codahale.metrics.MetricRegistry; import io.teknek.tunit.TUnit; import java.net.URI; @@ -25,13 +24,14 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.log4j.Logger; import org.junit.platform.runner.JUnitPlatform; @@ -52,25 +52,31 @@ public class ShutdownDeadtimeTest { settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); int seedNodes = 3; - List startupMembers = new ArrayList<>(); + List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes + 1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); - startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); } - final List clients = Collections.synchronizedList(new ArrayList()); + final List clients = Collections.synchronizedList(new ArrayList()); final int clusterMembers = 5; for (int i = 1; i < clusterMembers + 1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30300 + i)); - GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap(), startupMembers, - settings, (a,b) -> {}, new MetricRegistry()); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster(cluster) + .uri(uri) + .id(i + "") + .gossipMembers(startupMembers) + .gossipSettings(settings) + .build(); clients.add(gossipService); - gossipService.start(); + gossipService.init(); + } TUnit.assertThat(new Callable() { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getGossipManager().getLiveMembers().size(); + total += clients.get(i).getLiveMembers().size(); } return total; } @@ -79,15 +85,15 @@ public class ShutdownDeadtimeTest { Random r = new Random(); int randomClientId = r.nextInt(clusterMembers); log.info("shutting down " + randomClientId); - final int shutdownPort = clients.get(randomClientId).getGossipManager().getMyself().getUri() + final int shutdownPort = clients.get(randomClientId).getMyself().getUri() .getPort(); - final String shutdownId = clients.get(randomClientId).getGossipManager().getMyself().getId(); + final String shutdownId = clients.get(randomClientId).getMyself().getId(); clients.get(randomClientId).shutdown(); TUnit.assertThat(new Callable() { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getGossipManager().getLiveMembers().size(); + total += clients.get(i).getLiveMembers().size(); } return total; } @@ -98,7 +104,7 @@ public class ShutdownDeadtimeTest { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers - 1; ++i) { - total += clients.get(i).getGossipManager().getDeadMembers().size(); + total += clients.get(i).getDeadMembers().size(); } return total; } @@ -106,17 +112,22 @@ public class ShutdownDeadtimeTest { URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); // start client again - GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", new HashMap(), startupMembers, - settings, (a,b) -> {}, new MetricRegistry()); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .gossipSettings(settings) + .cluster(cluster) + .uri(uri) + .id(shutdownId+"") + .gossipMembers(startupMembers) + .build(); clients.add(gossipService); - gossipService.start(); + gossipService.init(); // verify that the client is alive again for every node TUnit.assertThat(new Callable() { public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getGossipManager().getLiveMembers().size(); + total += clients.get(i).getLiveMembers().size(); } return total; } diff --git a/src/test/java/org/apache/gossip/SignedMessageTest.java b/src/test/java/org/apache/gossip/SignedMessageTest.java index 6bea974..50e3cb5 100644 --- a/src/test/java/org/apache/gossip/SignedMessageTest.java +++ b/src/test/java/org/apache/gossip/SignedMessageTest.java @@ -25,33 +25,36 @@ import java.net.UnknownHostException; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.manager.PassiveGossipConstants; import org.apache.gossip.secure.KeyTool; import org.junit.Assert; import org.junit.Test; -import com.codahale.metrics.MetricRegistry; - import io.teknek.tunit.TUnit; -public class SignedMessageTest { +public class SignedMessageTest extends AbstractIntegrationBase { - @Test(expected=IllegalArgumentException.class) + @Test(expected = IllegalArgumentException.class) public void ifSignMustHaveKeys() throws URISyntaxException, UnknownHostException, InterruptedException { String cluster = UUID.randomUUID().toString(); GossipSettings settings = gossiperThatSigns(); - List startupMembers = new ArrayList<>(); + List startupMembers = new ArrayList<>(); URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1)); - GossipService gossipService = new GossipService(cluster, uri, 1 + "", - new HashMap(), startupMembers, settings, (a, b) -> { }, - new MetricRegistry()); - gossipService.start(); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster(cluster) + .uri(uri) + .id(1 + "") + .gossipMembers(startupMembers) + .gossipSettings(settings) + .build(); + gossipService.init(); } private GossipSettings gossiperThatSigns(){ @@ -63,48 +66,52 @@ public class SignedMessageTest { } @Test - public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException{ + public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException { String keys = "./keys"; GossipSettings settings = gossiperThatSigns(); setup(keys); String cluster = UUID.randomUUID().toString(); - List startupMembers = new ArrayList<>(); + List startupMembers = new ArrayList<>(); for (int i = 1; i < 2; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i)); - startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); } - final List clients = new ArrayList<>(); + final List clients = new ArrayList<>(); for (int i = 1; i < 3; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i)); - GossipService gossipService = new GossipService(cluster, uri, i + "", - new HashMap(), startupMembers, settings, - (a,b) -> {}, new MetricRegistry()); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster(cluster) + .uri(uri) + .id(i + "") + .gossipMembers(startupMembers) + .gossipSettings(settings) + .build(); + gossipService.init(); clients.add(gossipService); - gossipService.start(); } assertTwoAlive(clients); assertOnlySignedMessages(clients); cleanup(keys, clients); } - private void assertTwoAlive(List clients){ + private void assertTwoAlive(List clients){ TUnit.assertThat(() -> { int total = 0; for (int i = 0; i < clients.size(); ++i) { - total += clients.get(i).getGossipManager().getLiveMembers().size(); + total += clients.get(i).getLiveMembers().size(); } return total; }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); } - private void assertOnlySignedMessages(List clients){ - Assert.assertEquals(0, clients.get(0).getGossipManager().getRegistry() + private void assertOnlySignedMessages(List clients){ + Assert.assertEquals(0, clients.get(0).getRegistry() .meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount()); - Assert.assertTrue(clients.get(0).getGossipManager().getRegistry() + Assert.assertTrue(clients.get(0).getRegistry() .meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0); } - private void cleanup(String keys, List clients){ + private void cleanup(String keys, List clients){ new File(keys, "1").delete(); new File(keys, "2").delete(); new File(keys).delete(); diff --git a/src/test/java/org/apache/gossip/StartupSettingsTest.java b/src/test/java/org/apache/gossip/StartupSettingsTest.java index aa4d255..d6c4a1e 100644 --- a/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -17,7 +17,8 @@ */ package org.apache.gossip; -import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.log4j.Logger; import org.junit.jupiter.api.Test; @@ -27,8 +28,6 @@ import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.HashMap; import java.util.UUID; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; @@ -39,7 +38,7 @@ import org.junit.runner.RunWith; */ @RunWith(JUnitPlatform.class) public class StartupSettingsTest { - private static final Logger log = Logger.getLogger( StartupSettingsTest.class ); + private static final Logger log = Logger.getLogger(StartupSettingsTest.class); private static final String CLUSTER = UUID.randomUUID().toString(); @Test @@ -48,15 +47,17 @@ public class StartupSettingsTest { settingsFile.deleteOnExit(); writeSettingsFile(settingsFile); URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000); - final GossipService firstService = new GossipService( - CLUSTER, uri, "1", new HashMap(), - new ArrayList(), new GossipSettings(), null, new MetricRegistry()); - firstService.start(); - final GossipService serviceUnderTest = new GossipService( - StartupSettings.fromJSONFile(settingsFile)); - serviceUnderTest.start(); + GossipManager firstService = GossipManagerBuilder.newBuilder() + .cluster(CLUSTER) + .uri(uri) + .id("1") + .gossipSettings(new GossipSettings()).build(); + firstService.init(); + GossipManager manager = GossipManagerBuilder.newBuilder() + .startupSettings(StartupSettings.fromJSONFile(settingsFile)).build(); + manager.init(); firstService.shutdown(); - serviceUnderTest.shutdown(); + manager.shutdown(); } private void writeSettingsFile( File target ) throws IOException { diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index 8a9a9ab..8ae783e 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -17,20 +17,20 @@ */ package org.apache.gossip; -import com.codahale.metrics.MetricRegistry; import io.teknek.tunit.TUnit; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; import org.junit.jupiter.api.Test; @RunWith(JUnitPlatform.class) @@ -42,35 +42,40 @@ public class TenNodeThreeSeedTest { } @Test - public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException{ + public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException { abc(30100); } - public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{ - GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential"); + public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential"); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); int seedNodes = 3; - List startupMembers = new ArrayList<>(); + List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes+1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); - startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); } - final List clients = new ArrayList<>(); + final List clients = new ArrayList<>(); final int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); - GossipService gossipService = new GossipService(cluster, uri, i + "", new HashMap(), - startupMembers, settings, (a,b) -> {}, new MetricRegistry()); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster(cluster) + .uri(uri) + .id(i + "") + .gossipSettings(settings) + .gossipMembers(startupMembers) + .build(); + gossipService.init(); clients.add(gossipService); - gossipService.start(); } TUnit.assertThat(new Callable (){ public Integer call() throws Exception { int total = 0; for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getGossipManager().getLiveMembers().size(); + total += clients.get(i).getLiveMembers().size(); } return total; }}).afterWaitingAtMost(40, TimeUnit.SECONDS).isEqualTo(20); diff --git a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java index 69d46b8..b00c0c3 100644 --- a/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java +++ b/src/test/java/org/apache/gossip/accrual/FailureDetectorTest.java @@ -19,7 +19,7 @@ package org.apache.gossip.accrual; import java.net.URI; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.junit.Assert; import org.junit.Ignore; import org.junit.jupiter.api.Test; @@ -33,7 +33,7 @@ public class FailureDetectorTest { public void aNormalTest(){ int samples = 1; int windowSize = 1000; - LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), + LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), "", 0L, null, windowSize, samples, "normal"); member.recordHeartbeat(5); member.recordHeartbeat(10); @@ -44,7 +44,7 @@ public class FailureDetectorTest { public void aTest(){ int samples = 1; int windowSize = 1000; - LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), + LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), "", 0L, null, windowSize, samples, "exponential"); member.recordHeartbeat(5); member.recordHeartbeat(10); @@ -64,7 +64,7 @@ public class FailureDetectorTest { public void sameHeartbeatTest(){ int samples = 1; int windowSize = 1000; - LocalGossipMember member = new LocalGossipMember("", URI.create("udp://127.0.0.1:1000"), + LocalMember member = new LocalMember("", URI.create("udp://127.0.0.1:1000"), "", 0L, null, windowSize, samples, "exponential"); member.recordHeartbeat(5); member.recordHeartbeat(5); diff --git a/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/src/test/java/org/apache/gossip/crdt/OrSetTest.java index e576764..b3d8af3 100644 --- a/src/test/java/org/apache/gossip/crdt/OrSetTest.java +++ b/src/test/java/org/apache/gossip/crdt/OrSetTest.java @@ -21,18 +21,15 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; -import java.util.HashMap; import java.util.SortedSet; import java.util.TreeSet; -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; import org.junit.Assert; import org.junit.Test; -import com.codahale.metrics.MetricRegistry; - public class OrSetTest { @Test @@ -91,14 +88,16 @@ public class OrSetTest { @Test public void serialTest() throws InterruptedException, URISyntaxException, IOException { - GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<>(), - Arrays.asList(new RemoteGossipMember("a", - new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")), - new GossipSettings(), (a, b) -> { }, new MetricRegistry()); + GossipManager gossipService2 = GossipManagerBuilder.newBuilder() + .cluster("a") + .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) + .id("1") + .gossipSettings(new GossipSettings()) + .build(); OrSet i = new OrSet(new OrSet.Builder().add(1).remove(1)); - String s = gossipService2.getGossipManager().getObjectMapper().writeValueAsString(i); + String s = gossipService2.getObjectMapper().writeValueAsString(i); @SuppressWarnings("unchecked") - OrSet back = gossipService2.getGossipManager().getObjectMapper().readValue(s, OrSet.class); + OrSet back = gossipService2.getObjectMapper().readValue(s, OrSet.class); Assert.assertEquals(back, i); } diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java index a9c861c..e328c24 100644 --- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java +++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -21,9 +21,8 @@ import com.codahale.metrics.MetricRegistry; import java.net.URI; import org.apache.gossip.GossipSettings; -import org.apache.gossip.manager.random.RandomGossipManager; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; import org.junit.Assert; import org.junit.Test; @@ -41,8 +40,8 @@ public class DataReaperTest { GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); - GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) - .withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build(); + GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings) + .id(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build(); gm.init(); gm.gossipPerNodeData(perNodeDatum(key, value)); gm.gossipSharedData(sharedDatum(key, value)); @@ -65,8 +64,8 @@ public class DataReaperTest { TUnit.assertThat(() -> gm.findSharedGossipData(key)).equals(null); } - private GossipDataMessage perNodeDatum(String key, String value) { - GossipDataMessage m = new GossipDataMessage(); + private PerNodeDataMessage perNodeDatum(String key, String value) { + PerNodeDataMessage m = new PerNodeDataMessage(); m.setExpireAt(System.currentTimeMillis() + 5L); m.setKey(key); m.setPayload(value); @@ -74,8 +73,8 @@ public class DataReaperTest { return m; } - private SharedGossipDataMessage sharedDatum(String key, String value) { - SharedGossipDataMessage m = new SharedGossipDataMessage(); + private SharedDataMessage sharedDatum(String key, String value) { + SharedDataMessage m = new SharedDataMessage(); m.setExpireAt(System.currentTimeMillis() + 5L); m.setKey(key); m.setPayload(value); @@ -89,11 +88,11 @@ public class DataReaperTest { String key = "key"; String value = "a"; GossipSettings settings = new GossipSettings(); - GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) - .withId(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build(); + GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings) + .id(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build(); gm.init(); - GossipDataMessage before = perNodeDatum(key, value); - GossipDataMessage after = perNodeDatum(key, "b"); + PerNodeDataMessage before = perNodeDatum(key, value); + PerNodeDataMessage after = perNodeDatum(key, "b"); after.setTimestamp(after.getTimestamp() - 1); gm.gossipPerNodeData(before); Assert.assertEquals(value, gm.findPerNodeGossipData(myId, key).getPayload()); diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java similarity index 77% rename from src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java rename to src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java index 2d04087..8842643 100644 --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java @@ -18,14 +18,13 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.GossipMember; +import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; -import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.LocalMember; import org.apache.gossip.manager.handlers.DefaultMessageInvoker; import org.apache.gossip.manager.handlers.MessageInvoker; import org.apache.gossip.manager.handlers.ResponseHandler; import org.apache.gossip.manager.handlers.SimpleMessageInvoker; -import org.apache.gossip.manager.random.RandomGossipManager; import org.junit.Assert; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; @@ -43,47 +42,47 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.expectThrows; @RunWith(JUnitPlatform.class) -public class RandomGossipManagerBuilderTest { +public class GossipManagerBuilderTest { @Test public void idShouldNotBeNull() { expectThrows(IllegalArgumentException.class,() -> { - RandomGossipManager.newBuilder().cluster("aCluster").build(); + GossipManagerBuilder.newBuilder().cluster("aCluster").build(); }); } @Test public void clusterShouldNotBeNull() { expectThrows(IllegalArgumentException.class,() -> { - RandomGossipManager.newBuilder().withId("id").build(); + GossipManagerBuilder.newBuilder().id("id").build(); }); } @Test public void settingsShouldNotBeNull() { expectThrows(IllegalArgumentException.class,() -> { - RandomGossipManager.newBuilder().withId("id").cluster("aCluster").build(); + GossipManagerBuilder.newBuilder().id("id").cluster("aCluster").build(); }); } @Test public void createMembersListIfNull() throws URISyntaxException { - RandomGossipManager gossipManager = RandomGossipManager.newBuilder() - .withId("id") + GossipManager gossipManager = GossipManagerBuilder.newBuilder() + .id("id") .cluster("aCluster") .uri(new URI("udp://localhost:2000")) - .settings(new GossipSettings()) + .gossipSettings(new GossipSettings()) .gossipMembers(null).registry(new MetricRegistry()).build(); assertNotNull(gossipManager.getLiveMembers()); } @Test public void createDefaultMessageInvokerIfNull() throws URISyntaxException { - RandomGossipManager gossipManager = RandomGossipManager.newBuilder() - .withId("id") + GossipManager gossipManager = GossipManagerBuilder.newBuilder() + .id("id") .cluster("aCluster") .uri(new URI("udp://localhost:2000")) - .settings(new GossipSettings()) + .gossipSettings(new GossipSettings()) .messageInvoker(null).registry(new MetricRegistry()).build(); assertNotNull(gossipManager.getMessageInvoker()); Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass()); @@ -92,11 +91,11 @@ public class RandomGossipManagerBuilderTest { @Test public void testMessageInvokerKeeping() throws URISyntaxException { MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler()); - RandomGossipManager gossipManager = RandomGossipManager.newBuilder() - .withId("id") + GossipManager gossipManager = GossipManagerBuilder.newBuilder() + .id("id") .cluster("aCluster") .uri(new URI("udp://localhost:2000")) - .settings(new GossipSettings()) + .gossipSettings(new GossipSettings()) .messageInvoker(mi).registry(new MetricRegistry()).build(); assertNotNull(gossipManager.getMessageInvoker()); Assert.assertEquals(gossipManager.getMessageInvoker(), mi); @@ -104,15 +103,15 @@ public class RandomGossipManagerBuilderTest { @Test public void useMemberListIfProvided() throws URISyntaxException { - LocalGossipMember member = new LocalGossipMember( + LocalMember member = new LocalMember( "aCluster", new URI("udp://localhost:2000"), "aGossipMember", System.nanoTime(), new HashMap(), 1000, 1, "exponential"); - List memberList = new ArrayList<>(); + List memberList = new ArrayList<>(); memberList.add(member); - RandomGossipManager gossipManager = RandomGossipManager.newBuilder() - .withId("id") + GossipManager gossipManager = GossipManagerBuilder.newBuilder() + .id("id") .cluster("aCluster") - .settings(new GossipSettings()) + .gossipSettings(new GossipSettings()) .uri(new URI("udp://localhost:8000")) .gossipMembers(memberList).registry(new MetricRegistry()).build(); assertEquals(1, gossipManager.getDeadMembers().size()); diff --git a/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java index 6e41bdc..d448b98 100644 --- a/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java +++ b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java @@ -22,15 +22,11 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.Arrays; -import java.util.HashMap; -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; -import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.RemoteMember; import org.junit.Assert; import org.junit.Test; -import com.codahale.metrics.MetricRegistry; - public class RingPersistenceTest { @Test @@ -43,21 +39,26 @@ public class RingPersistenceTest { } private File aGossiperPersists(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException { - GossipService gossipService = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap(), - Arrays.asList( - new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"), - new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2") - ), - settings, (a, b) -> { }, new MetricRegistry()); - gossipService.getGossipManager().getRingState().writeToDisk(); - return gossipService.getGossipManager().getRingState().computeTarget(); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster("a") + .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) + .id("1") + .gossipSettings(settings) + .gossipMembers( + Arrays.asList( + new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"), + new RemoteMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2"))).build(); + gossipService.getRingState().writeToDisk(); + return gossipService.getRingState().computeTarget(); } - private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException{ - GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap(), - Arrays.asList(), - settings, (a, b) -> { }, new MetricRegistry()); - Assert.assertEquals(2, gossipService2.getGossipManager().getMembers().size()); + private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException { + GossipManager gossipService2 = GossipManagerBuilder.newBuilder() + .cluster("a") + .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) + .id("1") + .gossipSettings(settings).build(); + Assert.assertEquals(2, gossipService2.getMembers().size()); } } diff --git a/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java index e0cbcf4..7b17e41 100644 --- a/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java +++ b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java @@ -21,64 +21,65 @@ import java.io.File; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; -import java.util.Arrays; -import java.util.HashMap; import java.util.concurrent.ConcurrentHashMap; -import org.apache.gossip.GossipService; import org.apache.gossip.GossipSettings; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.SharedDataMessage; import org.junit.Assert; import org.junit.Test; -import com.codahale.metrics.MetricRegistry; - public class UserDataPersistenceTest { + String nodeId = "1"; + + private GossipManager sameService() throws URISyntaxException { + GossipSettings settings = new GossipSettings(); + return GossipManagerBuilder.newBuilder() + .cluster("a") + .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) + .id(nodeId) + .gossipSettings(settings).build(); + } + @Test public void givenThatRingIsPersisted() throws UnknownHostException, InterruptedException, URISyntaxException { - String nodeId = "1"; - GossipSettings settings = new GossipSettings(); + { //Create a gossip service and force it to persist its user data - GossipService gossipService = new GossipService("a", - new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap(), - Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry()); - gossipService.start(); + GossipManager gossipService = sameService(); + gossipService.init(); gossipService.gossipPerNodeData(getToothpick()); gossipService.gossipSharedData(getAnotherToothpick()); - gossipService.getGossipManager().getUserDataState().writePerNodeToDisk(); - gossipService.getGossipManager().getUserDataState().writeSharedToDisk(); + gossipService.getUserDataState().writePerNodeToDisk(); + gossipService.getUserDataState().writeSharedToDisk(); { //read the raw data and confirm - ConcurrentHashMap> l = gossipService.getGossipManager().getUserDataState().readPerNodeFromDisk(); + ConcurrentHashMap> l = gossipService.getUserDataState().readPerNodeFromDisk(); Assert.assertEquals("red", ((AToothpick) l.get(nodeId).get("a").getPayload()).getColor()); } { - ConcurrentHashMap l = - gossipService.getGossipManager().getUserDataState().readSharedDataFromDisk(); + ConcurrentHashMap l = + gossipService.getUserDataState().readSharedDataFromDisk(); Assert.assertEquals("blue", ((AToothpick) l.get("a").getPayload()).getColor()); } gossipService.shutdown(); } { //recreate the service and see that the data is read back in - GossipService gossipService = new GossipService("a", - new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap(), - Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry()); - gossipService.start(); - Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeData(nodeId, "a").getPayload()).getColor()); - Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedData("a").getPayload()).getColor()); - File f = gossipService.getGossipManager().getUserDataState().computeSharedTarget(); - File g = gossipService.getGossipManager().getUserDataState().computePerNodeTarget(); + GossipManager gossipService = sameService(); + gossipService.init(); + Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeGossipData(nodeId, "a").getPayload()).getColor()); + Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedGossipData("a").getPayload()).getColor()); + File f = gossipService.getUserDataState().computeSharedTarget(); + File g = gossipService.getUserDataState().computePerNodeTarget(); gossipService.shutdown(); f.delete(); g.delete(); } } - public GossipDataMessage getToothpick(){ + public PerNodeDataMessage getToothpick(){ AToothpick a = new AToothpick(); a.setColor("red"); - GossipDataMessage d = new GossipDataMessage(); + PerNodeDataMessage d = new PerNodeDataMessage(); d.setExpireAt(Long.MAX_VALUE); d.setKey("a"); d.setPayload(a); @@ -86,10 +87,10 @@ public class UserDataPersistenceTest { return d; } - public SharedGossipDataMessage getAnotherToothpick(){ + public SharedDataMessage getAnotherToothpick(){ AToothpick a = new AToothpick(); a.setColor("blue"); - SharedGossipDataMessage d = new SharedGossipDataMessage(); + SharedDataMessage d = new SharedDataMessage(); d.setExpireAt(Long.MAX_VALUE); d.setKey("a"); d.setPayload(a); diff --git a/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java b/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java index d402d59..571d7ba 100644 --- a/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java +++ b/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java @@ -21,7 +21,7 @@ import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.model.ActiveGossipMessage; import org.apache.gossip.model.Base; -import org.apache.gossip.udp.UdpSharedGossipDataMessage; +import org.apache.gossip.udp.UdpSharedDataMessage; import org.junit.Assert; import org.junit.Test; @@ -166,7 +166,7 @@ public class MessageInvokerTest { //UdpSharedGossipDataMessage with null gossipCore -> exception boolean thrown = false; try { - mi.invoke(null, null, new UdpSharedGossipDataMessage()); + mi.invoke(null, null, new UdpSharedDataMessage()); } catch (NullPointerException e) { thrown = true; }