diff --git a/README.md b/README.md index 5bdc620..86f179a 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ To gossip you need one or more seed nodes. Seed is just a list of places to init int seedNodes = 3; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes+1; ++i) { - startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000, i + "")); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); } ``` @@ -24,10 +25,9 @@ Here we start five gossip processes and check that they discover each other. (No List clients = new ArrayList<>(); int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { - GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", - LogLevel.DEBUG, startupMembers, settings, null); - clients.add(gossipService); - gossipService.start(); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipService gossipService = new GossipService(cluster, uri, i + "", + startupMembers, settings, (a,b) -> {}); } ``` @@ -47,22 +47,24 @@ For a very simple client setup with a settings file you first need a JSON file s ```json [{ - "id":"419af818-0114-4c7b-8fdb-952915335ce4", - "port":50001, + "cluster":"9f1e6ddf-8e1c-4026-8fc6-8585d0132f77", + "id":"447c5bec-f112-492d-968b-f64c8e36dfd7", + "uri":"udp://127.0.0.1:50001", "gossip_interval":1000, "cleanup_interval":10000, "members":[ - {"host":"127.0.0.1", "port":50000} + {"cluster": "9f1e6ddf-8e1c-4026-8fc6-8585d0132f77","uri":"udp://127.0.0.1:5000"} ] }] ``` where: +* `cluster` - is the name of the cluster * `id` - is a unique id for this node (you can use any string, but above we use a UUID) -* `port` - the port to use on the default adapter on the node's machine +* `uri` - is a URI object containing IP/hostname and port to use on the default adapter on the node's machine * `gossip_interval` - how often (in milliseconds) to gossip list of members to other node(s) -* `cleanup_interval` - when to remove 'dead' nodes (in milliseconds) +* `cleanup_interval` - when to remove 'dead' nodes (in milliseconds) (deprecated may be coming back) * `members` - initial seed nodes Then starting a local node is as simple as: @@ -96,13 +98,14 @@ These can be accessed from the `GossipManager` on your `GossipService`, e.g: Users can also attach an event listener: ```java - GossipService gossipService = new GossipService("127.0.0." + i, 2000, i + "", LogLevel.DEBUG, - startupMembers, settings, - new GossipListener(){ - @Override - public void gossipEvent(GossipMember member, GossipState state) { - System.out.println(member+" "+ state); - } + GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, + settings, new GossipListener() { + @Override + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(System.currentTimeMillis() + " Member " + j + " reports " + + member + " " + state); + } }); + //The lambda syntax is (a,b) -> { } //NICE! ``` diff --git a/src/main/java/org/apache/gossip/GossipTimeoutTimer.java b/src/main/java/org/apache/gossip/GossipTimeoutTimer.java deleted file mode 100644 index 2fa09c0..0000000 --- a/src/main/java/org/apache/gossip/GossipTimeoutTimer.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip; - -import java.util.Date; - -import javax.management.NotificationListener; -import javax.management.timer.Timer; - -/** - * This object represents a timer for a gossip member. When the timer has elapsed without being - * reset in the meantime, it will inform the GossipService about this who in turn will put the - * gossip member on the dead list, because it is apparantly not alive anymore. - * - * @author joshclemm, harmenw - */ -public class GossipTimeoutTimer extends Timer { - - private final long sleepTime; - - private final LocalGossipMember source; - - /** - * Constructor. Creates a reset-able timer that wakes up after millisecondsSleepTime. - * - * @param millisecondsSleepTime - * The time for this timer to wait before an event. - * @param notificationListener - * @param member - */ - public GossipTimeoutTimer(long millisecondsSleepTime, NotificationListener notificationListener, - LocalGossipMember member) { - super(); - sleepTime = millisecondsSleepTime; - source = member; - addNotificationListener(notificationListener, null, null); - } - - /** - * @see javax.management.timer.Timer#start() - */ - public void start() { - this.reset(); - super.start(); - } - - /** - * Resets timer to start counting down from original time. - */ - public void reset() { - removeAllNotifications(); - setWakeupTime(sleepTime); - } - - /** - * Adds a new wake-up time for this timer. - * - * @param milliseconds - */ - private void setWakeupTime(long milliseconds) { - addNotification("type", "message", source, new Date(System.currentTimeMillis() + milliseconds)); - } -} diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index c09cfe9..731b019 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -18,7 +18,6 @@ package org.apache.gossip.manager; import java.io.IOException; -import java.net.DatagramSocket; import java.util.List; import java.util.Map.Entry; @@ -116,8 +115,7 @@ public class ActiveGossipThread { sharedDataHistogram.update(System.currentTimeMillis() - startTime); return; } - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + try { for (Entry innerEntry : this.gossipCore.getSharedData().entrySet()){ UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage(); message.setUuid(UUID.randomUUID().toString()); @@ -127,7 +125,6 @@ public class ActiveGossipThread { message.setNodeId(innerEntry.getValue().getNodeId()); message.setTimestamp(innerEntry.getValue().getTimestamp()); message.setPayload(innerEntry.getValue().getPayload()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); int packet_length = json_bytes.length; if (packet_length < GossipManager.MAX_PACKET_SIZE) { @@ -152,8 +149,7 @@ public class ActiveGossipThread { sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); return; } - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + try { for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ for (Entry innerEntry : entry.getValue().entrySet()){ UdpGossipDataMessage message = new UdpGossipDataMessage(); @@ -164,7 +160,6 @@ public class ActiveGossipThread { message.setNodeId(innerEntry.getValue().getNodeId()); message.setTimestamp(innerEntry.getValue().getTimestamp()); message.setPayload(innerEntry.getValue().getPayload()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); int packet_length = json_bytes.length; if (packet_length < GossipManager.MAX_PACKET_SIZE) { @@ -190,12 +185,12 @@ public class ActiveGossipThread { LocalGossipMember member = selectPartner(gossipManager.getDeadMembers()); sendMembershipList(gossipManager.getMyself(), member); } + /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) { long startTime = System.currentTimeMillis(); - me.setHeartbeat(System.nanoTime()); if (member == null) { LOGGER.debug("Send sendMembershipList() is called without action"); @@ -204,8 +199,7 @@ public class ActiveGossipThread { } else { LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); } - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); + try { UdpActiveGossipMessage message = new UdpActiveGossipMessage(); message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); message.setUuid(UUID.randomUUID().toString()); diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index eaea8f6..de940c6 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -67,7 +67,6 @@ public class GossipCore { private final ConcurrentHashMap sharedData; private final BlockingQueue workQueue; - public GossipCore(GossipManager manager){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); @@ -175,6 +174,11 @@ public class GossipCore { } } + /** + * Sends a blocking message. Throws exception when tranmission fails + * @param message + * @param uri + */ private void sendInternal(Base message, URI uri){ byte[] json_bytes; try { @@ -186,6 +190,7 @@ public class GossipCore { if (packet_length < GossipManager.MAX_PACKET_SIZE) { byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes); try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); InetAddress dest = InetAddress.getByName(uri.getHost()); DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort()); socket.send(datagramPacket); @@ -245,9 +250,14 @@ public class GossipCore { requests.remove(t.getUuid() + "/" + t.getUriFrom()); } } - } + /** + * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used + * when the protocol for the message is not to wait for a response + * @param message the message to send + * @param u the uri to send it to + */ public void sendOneWay(Base message, URI u){ byte[] json_bytes; try { @@ -259,13 +269,13 @@ public class GossipCore { if (packet_length < GossipManager.MAX_PACKET_SIZE) { byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes); try (DatagramSocket socket = new DatagramSocket()) { + socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); InetAddress dest = InetAddress.getByName(u.getHost()); DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort()); socket.send(datagramPacket); } catch (IOException ex) { } } } - /** * Merge lists from remote members and update heartbeats @@ -280,36 +290,31 @@ public class GossipCore { if (LOGGER.isDebugEnabled()){ debugState(senderMember, remoteList); } - // if the person sending to us is in the dead list consider them up for (LocalGossipMember i : gossipManager.getDeadMembers()) { if (i.getId().equals(senderMember.getId())) { LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); i.recordHeartbeat(senderMember.getHeartbeat()); i.setHeartbeat(senderMember.getHeartbeat()); - //TODO set node to UP here - + //TODO consider forcing an UP here } } for (GossipMember remoteMember : remoteList) { if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { continue; } - LocalGossipMember m = new LocalGossipMember(remoteMember.getClusterName(), + LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(), remoteMember.getUri(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager.getSettings().getWindowSize(), gossipManager.getSettings().getMinimumSamples()); - m.recordHeartbeat(remoteMember.getHeartbeat()); - - Object result = gossipManager.getMembers().putIfAbsent(m, GossipState.UP); + aNewMember.recordHeartbeat(remoteMember.getHeartbeat()); + Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP); if (result != null){ - for (Entry l : gossipManager.getMembers().entrySet()){ - if (l.getKey().getId().equals(remoteMember.getId())){ - //if (l.getKey().getHeartbeat() < remoteMember.getHeartbeat()){ - l.getKey().recordHeartbeat(remoteMember.getHeartbeat()); - l.getKey().setHeartbeat(remoteMember.getHeartbeat()); - //} + for (Entry localMember : gossipManager.getMembers().entrySet()){ + if (localMember.getKey().getId().equals(remoteMember.getId())){ + localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat()); + localMember.getKey().setHeartbeat(remoteMember.getHeartbeat()); } } } diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index a5d57f5..bb8c7fa 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -31,6 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.apache.log4j.Logger; @@ -86,7 +87,7 @@ public abstract class GossipManager { clock = new SystemClock(); dataReaper = new DataReaper(gossipCore, clock); me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), - + settings.getWindowSize(), settings.getMinimumSamples()); + settings.getWindowSize(), settings.getMinimumSamples()); members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(me)) { @@ -112,19 +113,15 @@ public abstract class GossipManager { return settings; } - // TODO: Use some java 8 goodness for these functions. - /** * @return a read only list of members found in the DOWN state. */ public List getDeadMembers() { - List down = new ArrayList<>(); - for (Entry entry : members.entrySet()) { - if (GossipState.DOWN.equals(entry.getValue())) { - down.add(entry.getKey()); - } - } - return Collections.unmodifiableList(down); + return Collections.unmodifiableList( + members.entrySet() + .stream() + .filter(entry -> GossipState.DOWN.equals(entry.getValue())) + .map(Entry::getKey).collect(Collectors.toList())); } /** @@ -132,13 +129,11 @@ public abstract class GossipManager { * @return a read only list of members found in the UP state */ public List getLiveMembers() { - List up = new ArrayList<>(); - for (Entry entry : members.entrySet()) { - if (GossipState.UP.equals(entry.getValue())) { - up.add(entry.getKey()); - } - } - return Collections.unmodifiableList(up); + return Collections.unmodifiableList( + members.entrySet() + .stream() + .filter(entry -> GossipState.UP.equals(entry.getValue())) + .map(Entry::getKey).collect(Collectors.toList())); } public LocalGossipMember getMyself() {