From ddc9a67dd60341c3f2326163810b2e4ede935f4e Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Thu, 2 Jun 2016 09:23:55 -0400 Subject: [PATCH] Move to URI in model and configuration --- .../java/org/apache/gossip/GossipMember.java | 35 ++++-------- .../java/org/apache/gossip/GossipRunner.java | 5 +- .../java/org/apache/gossip/GossipService.java | 14 +++-- .../org/apache/gossip/LocalGossipMember.java | 13 +++-- .../org/apache/gossip/RemoteGossipMember.java | 19 +++---- .../org/apache/gossip/StartupSettings.java | 53 ++++++++----------- .../apache/gossip/examples/GossipExample.java | 16 +++--- .../apache/gossip/manager/GossipManager.java | 11 ++-- .../gossip/manager/PassiveGossipThread.java | 33 +++++++----- ...nlyProcessReceivedPassiveGossipThread.java | 33 ++++++------ .../impl/SendMembersActiveGossipThread.java | 7 ++- .../manager/random/RandomGossipManager.java | 5 +- .../org/apache/gossip/model/GossipMember.java | 28 ++++------ .../teknek/gossip/ShutdownDeadtimeTest.java | 15 ++++-- .../io/teknek/gossip/StartupSettingsTest.java | 11 ++-- .../teknek/gossip/TenNodeThreeSeedTest.java | 17 +++--- 16 files changed, 153 insertions(+), 162 deletions(-) diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java index fd44ddd..dbc84b2 100644 --- a/src/main/java/org/apache/gossip/GossipMember.java +++ b/src/main/java/org/apache/gossip/GossipMember.java @@ -18,6 +18,7 @@ package org.apache.gossip; import java.net.InetSocketAddress; +import java.net.URI; /** * A abstract class representing a gossip member. @@ -27,9 +28,7 @@ import java.net.InetSocketAddress; public abstract class GossipMember implements Comparable { - protected final String host; - - protected final int port; + protected final URI uri; protected volatile long heartbeat; @@ -54,12 +53,11 @@ public abstract class GossipMember implements Comparable { * @param id * an id that may be replaced after contact */ - public GossipMember(String clusterName, String host, int port, String id, long heartbeat) { + public GossipMember(String clusterName, URI uri, String id, long heartbeat) { this.clusterName = clusterName; - this.host = host; - this.port = port; this.id = id; this.heartbeat = heartbeat; + this.uri = uri; } /** @@ -71,30 +69,13 @@ public abstract class GossipMember implements Comparable { return clusterName; } - /** - * Get the hostname or IP address of the remote gossip member. - * - * @return The hostname or IP address. - */ - public String getHost() { - return host; - } - - /** - * Get the port number of the remote gossip member. - * - * @return The port number. - */ - public int getPort() { - return port; - } - + /** * The member address in the form IP/host:port Similar to the toString in * {@link InetSocketAddress} */ public String getAddress() { - return host + ":" + port; + return uri.getHost() + ":" + uri.getPort(); } /** @@ -141,6 +122,10 @@ public abstract class GossipMember implements Comparable { return result; } + public URI getUri() { + return uri; + } + /** * @see java.lang.Object#equals(java.lang.Object) */ diff --git a/src/main/java/org/apache/gossip/GossipRunner.java b/src/main/java/org/apache/gossip/GossipRunner.java index d995cce..c765ed6 100644 --- a/src/main/java/org/apache/gossip/GossipRunner.java +++ b/src/main/java/org/apache/gossip/GossipRunner.java @@ -20,12 +20,13 @@ package org.apache.gossip; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.net.URISyntaxException; import org.json.JSONException; public class GossipRunner { - public static void main(String[] args) { + public static void main(String[] args) throws URISyntaxException { File configFile; if (args.length == 1) { configFile = new File("./" + args[0]); @@ -35,7 +36,7 @@ public class GossipRunner { new GossipRunner(configFile); } - public GossipRunner(File configFile) { + public GossipRunner(File configFile) throws URISyntaxException { if (configFile != null && configFile.exists()) { try { System.out.println("Parsing the configuration file..."); diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index 9db740e..3175706 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -18,6 +18,7 @@ package org.apache.gossip; import java.net.InetAddress; +import java.net.URI; import java.net.UnknownHostException; import java.util.List; @@ -45,8 +46,8 @@ public class GossipService { */ public GossipService(StartupSettings startupSettings) throws InterruptedException, UnknownHostException { - this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings - .getPort(), startupSettings.getId(), startupSettings.getGossipMembers(), + this(startupSettings.getCluster(), startupSettings.getUri() + , startupSettings.getId(), startupSettings.getGossipMembers(), startupSettings.getGossipSettings(), null); } @@ -56,18 +57,15 @@ public class GossipService { * @throws InterruptedException * @throws UnknownHostException */ - public GossipService(String cluster, String ipAddress, int port, String id, + public GossipService(String cluster, URI uri, String id, List gossipMembers, GossipSettings settings, GossipListener listener) throws InterruptedException, UnknownHostException { - gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers, + gossipManager = new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener); } public void start() { - String address = get_gossipManager().getMyself().getHost() + ":" - + get_gossipManager().getMyself().getPort(); - LOGGER.debug("Starting: " + gossipManager.getName() + " - " + address); - + LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri()); gossipManager.start(); } diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java index 55ce257..d7e9f4e 100644 --- a/src/main/java/org/apache/gossip/LocalGossipMember.java +++ b/src/main/java/org/apache/gossip/LocalGossipMember.java @@ -17,6 +17,8 @@ */ package org.apache.gossip; +import java.net.URI; + import javax.management.NotificationListener; /** @@ -32,10 +34,8 @@ public class LocalGossipMember extends GossipMember { /** * Constructor. * - * @param hostname - * The hostname or IP address. - * @param port - * The port number. + * @param uri + * The uri of the member * @param id * @param heartbeat * The current heartbeat. @@ -43,10 +43,9 @@ public class LocalGossipMember extends GossipMember { * @param cleanupTimeout * The cleanup timeout for this gossip member. */ - public LocalGossipMember(String clusterName, String hostname, int port, String id, + public LocalGossipMember(String clusterName, URI uri, String id, long heartbeat, NotificationListener notificationListener, int cleanupTimeout) { - super(clusterName, hostname, port, id, heartbeat); - + super(clusterName, uri, id, heartbeat); timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this); } diff --git a/src/main/java/org/apache/gossip/RemoteGossipMember.java b/src/main/java/org/apache/gossip/RemoteGossipMember.java index 899da93..88c568a 100644 --- a/src/main/java/org/apache/gossip/RemoteGossipMember.java +++ b/src/main/java/org/apache/gossip/RemoteGossipMember.java @@ -17,6 +17,8 @@ */ package org.apache.gossip; +import java.net.URI; + /** * The object represents a gossip member with the properties as received from a remote gossip * member. @@ -35,19 +37,12 @@ public class RemoteGossipMember extends GossipMember { * @param heartbeat * The current heartbeat. */ - public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) { - super(clusterName, hostname, port, id, heartbeat); + public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat) { + super(clusterName, uri, id, heartbeat); } - /** - * Construct a RemoteGossipMember with a heartbeat of 0. - * - * @param hostname - * The hostname or IP address. - * @param port - * The port number. - */ - public RemoteGossipMember(String clusterName, String hostname, int port, String id) { - super(clusterName, hostname, port, id, System.currentTimeMillis()); + public RemoteGossipMember(String clusterName, URI uri, String id) { + super(clusterName, uri, id, System.currentTimeMillis()); } + } diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java index 176a79b..9475536 100644 --- a/src/main/java/org/apache/gossip/StartupSettings.java +++ b/src/main/java/org/apache/gossip/StartupSettings.java @@ -22,6 +22,8 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; @@ -41,9 +43,8 @@ public class StartupSettings { /** The id to use fo the service */ private String id; - /** The port to start the gossip service on. */ - private int port; - + private URI uri; + private String cluster; /** The gossip settings used at startup. */ @@ -62,8 +63,16 @@ public class StartupSettings { * @param logLevel * unused */ - public StartupSettings(String id, int port, int logLevel, String cluster) { - this(id, port, new GossipSettings(), cluster); + public StartupSettings(String id, URI uri, int logLevel, String cluster) { + this(id, uri, new GossipSettings(), cluster); + } + + public URI getUri() { + return uri; + } + + public void setUri(URI uri) { + this.uri = uri; } /** @@ -74,9 +83,9 @@ public class StartupSettings { * @param port * The port to start the service on. */ - public StartupSettings(String id, int port, GossipSettings gossipSettings, String cluster) { + public StartupSettings(String id, URI uri, GossipSettings gossipSettings, String cluster) { this.id = id; - this.port = port; + this.uri = uri; this.gossipSettings = gossipSettings; this.setCluster(cluster); gossipMembers = new ArrayList<>(); @@ -109,25 +118,6 @@ public class StartupSettings { return id; } - /** - * Set the port of the gossip service. - * - * @param port - * The port for the gossip service. - */ - public void setPort(int port) { - this.port = port; - } - - /** - * Get the port for the gossip service. - * - * @return The port of the gossip service. - */ - public int getPort() { - return port; - } - /** * Get the GossipSettings. * @@ -168,9 +158,10 @@ public class StartupSettings { * Thrown when the file cannot be found. * @throws IOException * Thrown when reading the file gives problems. + * @throws URISyntaxException */ public static StartupSettings fromJSONFile(File jsonFile) throws JSONException, - FileNotFoundException, IOException { + FileNotFoundException, IOException, URISyntaxException { // Read the file to a String. StringBuffer buffer = new StringBuffer(); try (BufferedReader br = new BufferedReader(new FileReader(jsonFile)) ){ @@ -181,7 +172,7 @@ public class StartupSettings { } JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0); - int port = jsonObject.getInt("port"); + String uri = jsonObject.getString("uri"); String id = jsonObject.getString("id"); int gossipInterval = jsonObject.getInt("gossip_interval"); int cleanupInterval = jsonObject.getInt("cleanup_interval"); @@ -189,7 +180,8 @@ public class StartupSettings { if (cluster == null){ throw new IllegalArgumentException("cluster was null. It is required"); } - StartupSettings settings = new StartupSettings(id, port, new GossipSettings(gossipInterval, + URI uri2 = new URI(uri); + StartupSettings settings = new StartupSettings(id, uri2, new GossipSettings(gossipInterval, cleanupInterval), cluster); // Now iterate over the members from the config file and add them to the settings. @@ -197,8 +189,9 @@ public class StartupSettings { JSONArray membersJSON = jsonObject.getJSONArray("members"); for (int i = 0; i < membersJSON.length(); i++) { JSONObject memberJSON = membersJSON.getJSONObject(i); + URI uri3 = new URI(memberJSON.getString("uri")); RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"), - memberJSON.getString("host"), memberJSON.getInt("port"), ""); + uri3, "", 0); settings.addGossipMember(member); configMembersDetails += member.getAddress(); if (i < (membersJSON.length() - 1)) diff --git a/src/main/java/org/apache/gossip/examples/GossipExample.java b/src/main/java/org/apache/gossip/examples/GossipExample.java index e953c77..cea59f4 100644 --- a/src/main/java/org/apache/gossip/examples/GossipExample.java +++ b/src/main/java/org/apache/gossip/examples/GossipExample.java @@ -18,6 +18,8 @@ package org.apache.gossip.examples; import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; @@ -57,26 +59,28 @@ public class GossipExample extends Thread { public void run() { try { GossipSettings settings = new GossipSettings(); - List clients = new ArrayList<>(); - - // Get my ip address. String myIpAddress = InetAddress.getLocalHost().getHostAddress(); - String cluster = "My Gossip Cluster"; // Create the gossip members and put them in a list and give them a port number starting with // 2000. List startupMembers = new ArrayList<>(); for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) { - startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, "")); + URI u; + try { + u = new URI("udp://" + myIpAddress + ":" + (2000 + i)); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + startupMembers.add(new RemoteGossipMember(cluster, u, "", 0 )); } // Lets start the gossip clients. // Start the clients, waiting cleaning-interval + 1 second between them which will show the // dead list handling. for (GossipMember member : startupMembers) { - GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "", + GossipService gossipService = new GossipService(cluster, member.getUri(), "", startupMembers, settings, null); clients.add(gossipService); gossipService.start(); diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 80cadf7..363a4a9 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -18,6 +18,8 @@ package org.apache.gossip.manager; import java.lang.reflect.InvocationTargetException; +import java.net.BindException; +import java.net.URI; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -68,18 +70,18 @@ public abstract class GossipManager extends Thread implements NotificationListen public GossipManager(Class passiveGossipThreadClass, Class activeGossipThreadClass, String cluster, - String address, int port, String id, GossipSettings settings, + URI uri, String id, GossipSettings settings, List gossipMembers, GossipListener listener) { this.passiveGossipThreadClass = passiveGossipThreadClass; this.activeGossipThreadClass = activeGossipThreadClass; this.settings = settings; - me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this, + me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this, settings.getCleanupInterval()); members = new ConcurrentSkipListMap<>(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(), - startupMember.getHost(), startupMember.getPort(), startupMember.getId(), + startupMember.getUri(), startupMember.getId(), System.currentTimeMillis(), this, settings.getCleanupInterval()); members.put(member, GossipState.UP); GossipService.LOGGER.debug(member); @@ -180,6 +182,9 @@ public abstract class GossipManager extends Thread implements NotificationListen gossipThreadExecutor.execute(activeGossipThread); } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e1) { + if (e1 instanceof BindException){ + LOGGER.fatal("could not bind to "+ me.getUri() + " " + me.getAddress()); + } throw new RuntimeException(e1); } GossipService.LOGGER.debug("The GossipService is started."); diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index bd7354e..a057e7d 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -23,6 +23,8 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; @@ -58,18 +60,18 @@ abstract public class PassiveGossipThread implements Runnable { public PassiveGossipThread(GossipManager gossipManager) { this.gossipManager = gossipManager; try { - SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(), - gossipManager.getMyself().getPort()); + SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), + gossipManager.getMyself().getUri().getPort()); server = new DatagramSocket(socketAddress); - GossipService.LOGGER.debug("Gossip service successfully initialized on port " - + gossipManager.getMyself().getPort()); - GossipService.LOGGER.debug("I am " + gossipManager.getMyself()); + LOGGER.debug("Gossip service successfully initialized on port " + + gossipManager.getMyself().getUri().getPort()); + LOGGER.debug("I am " + gossipManager.getMyself()); cluster = gossipManager.getMyself().getClusterName(); if (cluster == null){ throw new IllegalArgumentException("cluster was null"); } } catch (SocketException ex) { - GossipService.LOGGER.warn(ex); + LOGGER.warn(ex); throw new RuntimeException(ex); } keepRunning = new AtomicBoolean(true); @@ -103,14 +105,20 @@ abstract public class PassiveGossipThread implements Runnable { ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes, ActiveGossipMessage.class); for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { + URI u = null; + try { + u = new URI(activeGossipMessage.getMembers().get(i).getUri()); + } catch (URISyntaxException e) { + LOGGER.debug("Gossip message with faulty URI", e); + continue; + } RemoteGossipMember member = new RemoteGossipMember( activeGossipMessage.getMembers().get(i).getCluster(), - activeGossipMessage.getMembers().get(i).getHost(), - activeGossipMessage.getMembers().get(i).getPort(), + u, activeGossipMessage.getMembers().get(i).getId(), activeGossipMessage.getMembers().get(i).getHeartbeat()); if (!(member.getClusterName().equals(cluster))){ - GossipService.LOGGER.warn("Note a member of this cluster " + i); + LOGGER.warn("Note a member of this cluster " + i); continue; } // This is the first member found, so this should be the member who is communicating @@ -122,16 +130,15 @@ abstract public class PassiveGossipThread implements Runnable { } mergeLists(gossipManager, senderMember, remoteGossipMembers); } catch (RuntimeException ex) { - GossipService.LOGGER.error("Unable to process message", ex); + LOGGER.error("Unable to process message", ex); } } else { - GossipService.LOGGER + LOGGER .error("The received message is not of the expected size, it has been dropped."); } } catch (IOException e) { - GossipService.LOGGER.error(e); - System.out.println(e); + LOGGER.error(e); keepRunning.set(false); } } diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index edf21f3..d0acfc1 100644 --- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -25,8 +25,11 @@ import org.apache.gossip.LocalGossipMember; import org.apache.gossip.RemoteGossipMember; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.PassiveGossipThread; +import org.apache.log4j.Logger; public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread { + + public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class); public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) { super(gossipManager); @@ -47,9 +50,9 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread // if the person sending to us is in the dead list consider them up for (LocalGossipMember i : gossipManager.getDeadList()) { if (i.getId().equals(senderMember.getId())) { - System.out.println(gossipManager.getMyself() + " caught a live one!"); + LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), - senderMember.getHost(), senderMember.getPort(), senderMember.getId(), + senderMember.getUri(), senderMember.getId(), senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings() .getCleanupInterval()); gossipManager.revivieMember(newLocalMember); @@ -70,7 +73,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread } else if (!gossipManager.getMemberList().contains(remoteMember) && !gossipManager.getDeadList().contains(remoteMember)) { LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), - remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), + remoteMember.getUri(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() .getCleanupInterval()); gossipManager.createOrRevivieMember(newLocalMember); @@ -81,26 +84,26 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread gossipManager.getDeadList().indexOf(remoteMember)); if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), - remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), + remoteMember.getUri(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() .getCleanupInterval()); gossipManager.revivieMember(newLocalMember); newLocalMember.startTimeoutTimer(); - GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress() + LOGGER.debug("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list."); } else { - GossipService.LOGGER.debug("me " + gossipManager.getMyself()); - GossipService.LOGGER.debug("sender " + senderMember); - GossipService.LOGGER.debug("remote " + remoteList); - GossipService.LOGGER.debug("live " + gossipManager.getMemberList()); - GossipService.LOGGER.debug("dead " + gossipManager.getDeadList()); + LOGGER.debug("me " + gossipManager.getMyself()); + LOGGER.debug("sender " + senderMember); + LOGGER.debug("remote " + remoteList); + LOGGER.debug("live " + gossipManager.getMemberList()); + LOGGER.debug("dead " + gossipManager.getDeadList()); } } else { - GossipService.LOGGER.debug("me " + gossipManager.getMyself()); - GossipService.LOGGER.debug("sender " + senderMember); - GossipService.LOGGER.debug("remote " + remoteList); - GossipService.LOGGER.debug("live " + gossipManager.getMemberList()); - GossipService.LOGGER.debug("dead " + gossipManager.getDeadList()); + LOGGER.debug("me " + gossipManager.getMyself()); + LOGGER.debug("sender " + senderMember); + LOGGER.debug("remote " + remoteList); + LOGGER.debug("live " + gossipManager.getMemberList()); + LOGGER.debug("dead " + gossipManager.getDeadList()); // throw new IllegalArgumentException("wtf"); } } diff --git a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java index 16d0d32..c296156 100644 --- a/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -44,9 +44,8 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { GossipMember gm = new GossipMember(); gm.setCluster(member.getClusterName()); gm.setHeartbeat(member.getHeartbeat()); - gm.setHost(member.getHost()); + gm.setUri(member.getUri().toASCIIString()); gm.setId(member.getId()); - gm.setPort(member.getPort()); return gm; } @@ -62,7 +61,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { } try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); - InetAddress dest = InetAddress.getByName(member.getHost()); + InetAddress dest = InetAddress.getByName(member.getUri().getHost()); ActiveGossipMessage message = new ActiveGossipMessage(); message.getMembers().add(convert(me)); for (LocalGossipMember other : memberList) { @@ -72,7 +71,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { int packet_length = json_bytes.length; if (packet_length < GossipManager.MAX_PACKET_SIZE) { byte[] buf = createBuffer(packet_length, json_bytes); - DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort()); + DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort()); socket.send(datagramPacket); } else { GossipService.LOGGER.error("The length of the to be send message is too large (" diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java index 0122610..7aa4435 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java @@ -23,12 +23,13 @@ import org.apache.gossip.event.GossipListener; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; +import java.net.URI; import java.util.List; public class RandomGossipManager extends GossipManager { - public RandomGossipManager(String cluster, String address, int port, String id, + public RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings, List gossipMembers, GossipListener listener) { super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster, - address, port, id, settings, gossipMembers, listener); + uri, id, settings, gossipMembers, listener); } } diff --git a/src/main/java/org/apache/gossip/model/GossipMember.java b/src/main/java/org/apache/gossip/model/GossipMember.java index 8dc6bf7..413ab71 100644 --- a/src/main/java/org/apache/gossip/model/GossipMember.java +++ b/src/main/java/org/apache/gossip/model/GossipMember.java @@ -3,8 +3,7 @@ package org.apache.gossip.model; public class GossipMember { private String cluster; - private String host; - private Integer port; + private String uri; private String id; private Long heartbeat; @@ -12,12 +11,11 @@ public class GossipMember { } - public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){ - this.cluster=cluster; - this.host= host; - this.port = port; + public GossipMember(String cluster, String uri, String id, Long heartbeat){ + this.cluster = cluster; + this.uri = uri; this.id = id; - + this.heartbeat = heartbeat; } public String getCluster() { @@ -28,20 +26,12 @@ public class GossipMember { this.cluster = cluster; } - public String getHost() { - return host; + public String getUri() { + return uri; } - public void setHost(String host) { - this.host = host; - } - - public Integer getPort() { - return port; - } - - public void setPort(Integer port) { - this.port = port; + public void setUri(String uri) { + this.uri = uri; } public String getId() { diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java index 2d8190b..340886a 100644 --- a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java @@ -19,6 +19,8 @@ package io.teknek.gossip; import io.teknek.tunit.TUnit; +import java.net.URI; +import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; @@ -43,7 +45,7 @@ public class ShutdownDeadtimeTest { private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class ); @Test //@Ignore - public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException { + public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { GossipSettings settings = new GossipSettings(1000, 10000); String cluster = UUID.randomUUID().toString(); @@ -51,7 +53,8 @@ public class ShutdownDeadtimeTest { int seedNodes = 3; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes + 1; ++i) { - startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + "")); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); } log.info( "Adding clients" ); @@ -59,7 +62,8 @@ public class ShutdownDeadtimeTest { final int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { final int j = i; - GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "", + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, settings, new GossipListener(){ @Override @@ -83,7 +87,7 @@ public class ShutdownDeadtimeTest { Random r = new Random(); int randomClientId = r.nextInt(clusterMembers); log.info( "shutting down " + randomClientId ); - final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getPort(); + final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort(); final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId(); clients.get(randomClientId).shutdown(); TUnit.assertThat(new Callable (){ @@ -105,8 +109,9 @@ public class ShutdownDeadtimeTest { return total; }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort); // start client again - GossipService gossipService = new GossipService(cluster, "127.0.0.1", shutdownPort, shutdownId + "", + GossipService gossipService = new GossipService(cluster, uri, shutdownId + "", startupMembers, settings, new GossipListener(){ @Override diff --git a/src/test/java/io/teknek/gossip/StartupSettingsTest.java b/src/test/java/io/teknek/gossip/StartupSettingsTest.java index aa4e404..a4a9011 100644 --- a/src/test/java/io/teknek/gossip/StartupSettingsTest.java +++ b/src/test/java/io/teknek/gossip/StartupSettingsTest.java @@ -30,6 +30,8 @@ import io.teknek.tunit.TUnit; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.UUID; import java.util.concurrent.Callable; @@ -44,13 +46,14 @@ public class StartupSettingsTest { private static final String CLUSTER = UUID.randomUUID().toString(); @Test - public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException { + public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException { File settingsFile = File.createTempFile("gossipTest",".json"); log.debug( "Using settings file: " + settingsFile.getAbsolutePath() ); settingsFile.deleteOnExit(); writeSettingsFile(settingsFile); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000); final GossipService firstService = new GossipService( - CLUSTER, "127.0.0.1", 50000, UUID.randomUUID().toString(), + CLUSTER, uri, UUID.randomUUID().toString(), new ArrayList(), new GossipSettings(), null); firstService.start(); @@ -76,11 +79,11 @@ public class StartupSettingsTest { "[{\n" + // It is odd that this is meant to be in an array, but oh well. " \"cluster\":\"" + CLUSTER + "\",\n" + " \"id\":\"" + UUID.randomUUID() + "\",\n" + - " \"port\":50001,\n" + + " \"uri\":\"udp://127.0.0.1:50001\",\n" + " \"gossip_interval\":1000,\n" + " \"cleanup_interval\":10000,\n" + " \"members\":[\n" + - " {\"cluster\": \"" + CLUSTER + "\",\"host\":\"127.0.0.1\", \"port\":50000}\n" + + " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" + " ]\n" + "}]"; diff --git a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java index 4e731ae..2b5f7fe 100644 --- a/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/io/teknek/gossip/TenNodeThreeSeedTest.java @@ -19,6 +19,8 @@ package io.teknek.gossip; import io.teknek.tunit.TUnit; +import java.net.URI; +import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; @@ -41,16 +43,16 @@ public class TenNodeThreeSeedTest { private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class ); @Test - public void test() throws UnknownHostException, InterruptedException{ + public void test() throws UnknownHostException, InterruptedException, URISyntaxException{ abc(); } @Test - public void testAgain() throws UnknownHostException, InterruptedException{ + public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException{ abc(); } - public void abc() throws InterruptedException, UnknownHostException{ + public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); String cluster = UUID.randomUUID().toString(); @@ -58,14 +60,16 @@ public class TenNodeThreeSeedTest { int seedNodes = 3; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes+1; ++i) { - startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + "")); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteGossipMember(cluster, uri, i + "")); } log.info( "Adding clients" ); final List clients = new ArrayList<>(); final int clusterMembers = 5; for (int i = 1; i < clusterMembers+1; ++i) { - GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "", + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipService gossipService = new GossipService(cluster, uri, i + "", startupMembers, settings, new GossipListener(){ @Override @@ -75,7 +79,6 @@ public class TenNodeThreeSeedTest { }); clients.add(gossipService); gossipService.start(); - gossipService.get_gossipManager().getMemberList(); } TUnit.assertThat(new Callable (){ public Integer call() throws Exception { @@ -84,7 +87,7 @@ public class TenNodeThreeSeedTest { total += clients.get(i).get_gossipManager().getMemberList().size(); } return total; - }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(20); + }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20); for (int i = 0; i < clusterMembers; ++i) { clients.get(i).shutdown();