Move to URI in model and configuration
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
package org.apache.gossip;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
|
||||
/**
|
||||
* A abstract class representing a gossip member.
|
||||
@ -27,9 +28,7 @@ import java.net.InetSocketAddress;
|
||||
public abstract class GossipMember implements Comparable<GossipMember> {
|
||||
|
||||
|
||||
protected final String host;
|
||||
|
||||
protected final int port;
|
||||
protected final URI uri;
|
||||
|
||||
protected volatile long heartbeat;
|
||||
|
||||
@ -54,12 +53,11 @@ public abstract class GossipMember implements Comparable<GossipMember> {
|
||||
* @param id
|
||||
* an id that may be replaced after contact
|
||||
*/
|
||||
public GossipMember(String clusterName, String host, int port, String id, long heartbeat) {
|
||||
public GossipMember(String clusterName, URI uri, String id, long heartbeat) {
|
||||
this.clusterName = clusterName;
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
this.id = id;
|
||||
this.heartbeat = heartbeat;
|
||||
this.uri = uri;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -71,30 +69,13 @@ public abstract class GossipMember implements Comparable<GossipMember> {
|
||||
return clusterName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the hostname or IP address of the remote gossip member.
|
||||
*
|
||||
* @return The hostname or IP address.
|
||||
*/
|
||||
public String getHost() {
|
||||
return host;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the port number of the remote gossip member.
|
||||
*
|
||||
* @return The port number.
|
||||
*/
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
/**
|
||||
* The member address in the form IP/host:port Similar to the toString in
|
||||
* {@link InetSocketAddress}
|
||||
*/
|
||||
public String getAddress() {
|
||||
return host + ":" + port;
|
||||
return uri.getHost() + ":" + uri.getPort();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -141,6 +122,10 @@ public abstract class GossipMember implements Comparable<GossipMember> {
|
||||
return result;
|
||||
}
|
||||
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see java.lang.Object#equals(java.lang.Object)
|
||||
*/
|
||||
|
@ -20,12 +20,13 @@ package org.apache.gossip;
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
import org.json.JSONException;
|
||||
|
||||
public class GossipRunner {
|
||||
|
||||
public static void main(String[] args) {
|
||||
public static void main(String[] args) throws URISyntaxException {
|
||||
File configFile;
|
||||
if (args.length == 1) {
|
||||
configFile = new File("./" + args[0]);
|
||||
@ -35,7 +36,7 @@ public class GossipRunner {
|
||||
new GossipRunner(configFile);
|
||||
}
|
||||
|
||||
public GossipRunner(File configFile) {
|
||||
public GossipRunner(File configFile) throws URISyntaxException {
|
||||
if (configFile != null && configFile.exists()) {
|
||||
try {
|
||||
System.out.println("Parsing the configuration file...");
|
||||
|
@ -18,6 +18,7 @@
|
||||
package org.apache.gossip;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.List;
|
||||
|
||||
@ -45,8 +46,8 @@ public class GossipService {
|
||||
*/
|
||||
public GossipService(StartupSettings startupSettings) throws InterruptedException,
|
||||
UnknownHostException {
|
||||
this(startupSettings.getCluster(), InetAddress.getLocalHost().getHostAddress(), startupSettings
|
||||
.getPort(), startupSettings.getId(), startupSettings.getGossipMembers(),
|
||||
this(startupSettings.getCluster(), startupSettings.getUri()
|
||||
, startupSettings.getId(), startupSettings.getGossipMembers(),
|
||||
startupSettings.getGossipSettings(), null);
|
||||
}
|
||||
|
||||
@ -56,18 +57,15 @@ public class GossipService {
|
||||
* @throws InterruptedException
|
||||
* @throws UnknownHostException
|
||||
*/
|
||||
public GossipService(String cluster, String ipAddress, int port, String id,
|
||||
public GossipService(String cluster, URI uri, String id,
|
||||
List<GossipMember> gossipMembers, GossipSettings settings, GossipListener listener)
|
||||
throws InterruptedException, UnknownHostException {
|
||||
gossipManager = new RandomGossipManager(cluster, ipAddress, port, id, settings, gossipMembers,
|
||||
gossipManager = new RandomGossipManager(cluster, uri, id, settings, gossipMembers,
|
||||
listener);
|
||||
}
|
||||
|
||||
public void start() {
|
||||
String address = get_gossipManager().getMyself().getHost() + ":"
|
||||
+ get_gossipManager().getMyself().getPort();
|
||||
LOGGER.debug("Starting: " + gossipManager.getName() + " - " + address);
|
||||
|
||||
LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri());
|
||||
gossipManager.start();
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.gossip;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import javax.management.NotificationListener;
|
||||
|
||||
/**
|
||||
@ -32,10 +34,8 @@ public class LocalGossipMember extends GossipMember {
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param hostname
|
||||
* The hostname or IP address.
|
||||
* @param port
|
||||
* The port number.
|
||||
* @param uri
|
||||
* The uri of the member
|
||||
* @param id
|
||||
* @param heartbeat
|
||||
* The current heartbeat.
|
||||
@ -43,10 +43,9 @@ public class LocalGossipMember extends GossipMember {
|
||||
* @param cleanupTimeout
|
||||
* The cleanup timeout for this gossip member.
|
||||
*/
|
||||
public LocalGossipMember(String clusterName, String hostname, int port, String id,
|
||||
public LocalGossipMember(String clusterName, URI uri, String id,
|
||||
long heartbeat, NotificationListener notificationListener, int cleanupTimeout) {
|
||||
super(clusterName, hostname, port, id, heartbeat);
|
||||
|
||||
super(clusterName, uri, id, heartbeat);
|
||||
timeoutTimer = new GossipTimeoutTimer(cleanupTimeout, notificationListener, this);
|
||||
}
|
||||
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.gossip;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
/**
|
||||
* The object represents a gossip member with the properties as received from a remote gossip
|
||||
* member.
|
||||
@ -35,19 +37,12 @@ public class RemoteGossipMember extends GossipMember {
|
||||
* @param heartbeat
|
||||
* The current heartbeat.
|
||||
*/
|
||||
public RemoteGossipMember(String clusterName, String hostname, int port, String id, long heartbeat) {
|
||||
super(clusterName, hostname, port, id, heartbeat);
|
||||
public RemoteGossipMember(String clusterName, URI uri, String id, long heartbeat) {
|
||||
super(clusterName, uri, id, heartbeat);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a RemoteGossipMember with a heartbeat of 0.
|
||||
*
|
||||
* @param hostname
|
||||
* The hostname or IP address.
|
||||
* @param port
|
||||
* The port number.
|
||||
*/
|
||||
public RemoteGossipMember(String clusterName, String hostname, int port, String id) {
|
||||
super(clusterName, hostname, port, id, System.currentTimeMillis());
|
||||
public RemoteGossipMember(String clusterName, URI uri, String id) {
|
||||
super(clusterName, uri, id, System.currentTimeMillis());
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -22,6 +22,8 @@ import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@ -41,8 +43,7 @@ public class StartupSettings {
|
||||
/** The id to use fo the service */
|
||||
private String id;
|
||||
|
||||
/** The port to start the gossip service on. */
|
||||
private int port;
|
||||
private URI uri;
|
||||
|
||||
private String cluster;
|
||||
|
||||
@ -62,8 +63,16 @@ public class StartupSettings {
|
||||
* @param logLevel
|
||||
* unused
|
||||
*/
|
||||
public StartupSettings(String id, int port, int logLevel, String cluster) {
|
||||
this(id, port, new GossipSettings(), cluster);
|
||||
public StartupSettings(String id, URI uri, int logLevel, String cluster) {
|
||||
this(id, uri, new GossipSettings(), cluster);
|
||||
}
|
||||
|
||||
public URI getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
public void setUri(URI uri) {
|
||||
this.uri = uri;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -74,9 +83,9 @@ public class StartupSettings {
|
||||
* @param port
|
||||
* The port to start the service on.
|
||||
*/
|
||||
public StartupSettings(String id, int port, GossipSettings gossipSettings, String cluster) {
|
||||
public StartupSettings(String id, URI uri, GossipSettings gossipSettings, String cluster) {
|
||||
this.id = id;
|
||||
this.port = port;
|
||||
this.uri = uri;
|
||||
this.gossipSettings = gossipSettings;
|
||||
this.setCluster(cluster);
|
||||
gossipMembers = new ArrayList<>();
|
||||
@ -109,25 +118,6 @@ public class StartupSettings {
|
||||
return id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the port of the gossip service.
|
||||
*
|
||||
* @param port
|
||||
* The port for the gossip service.
|
||||
*/
|
||||
public void setPort(int port) {
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the port for the gossip service.
|
||||
*
|
||||
* @return The port of the gossip service.
|
||||
*/
|
||||
public int getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the GossipSettings.
|
||||
*
|
||||
@ -168,9 +158,10 @@ public class StartupSettings {
|
||||
* Thrown when the file cannot be found.
|
||||
* @throws IOException
|
||||
* Thrown when reading the file gives problems.
|
||||
* @throws URISyntaxException
|
||||
*/
|
||||
public static StartupSettings fromJSONFile(File jsonFile) throws JSONException,
|
||||
FileNotFoundException, IOException {
|
||||
FileNotFoundException, IOException, URISyntaxException {
|
||||
// Read the file to a String.
|
||||
StringBuffer buffer = new StringBuffer();
|
||||
try (BufferedReader br = new BufferedReader(new FileReader(jsonFile)) ){
|
||||
@ -181,7 +172,7 @@ public class StartupSettings {
|
||||
}
|
||||
|
||||
JSONObject jsonObject = new JSONArray(buffer.toString()).getJSONObject(0);
|
||||
int port = jsonObject.getInt("port");
|
||||
String uri = jsonObject.getString("uri");
|
||||
String id = jsonObject.getString("id");
|
||||
int gossipInterval = jsonObject.getInt("gossip_interval");
|
||||
int cleanupInterval = jsonObject.getInt("cleanup_interval");
|
||||
@ -189,7 +180,8 @@ public class StartupSettings {
|
||||
if (cluster == null){
|
||||
throw new IllegalArgumentException("cluster was null. It is required");
|
||||
}
|
||||
StartupSettings settings = new StartupSettings(id, port, new GossipSettings(gossipInterval,
|
||||
URI uri2 = new URI(uri);
|
||||
StartupSettings settings = new StartupSettings(id, uri2, new GossipSettings(gossipInterval,
|
||||
cleanupInterval), cluster);
|
||||
|
||||
// Now iterate over the members from the config file and add them to the settings.
|
||||
@ -197,8 +189,9 @@ public class StartupSettings {
|
||||
JSONArray membersJSON = jsonObject.getJSONArray("members");
|
||||
for (int i = 0; i < membersJSON.length(); i++) {
|
||||
JSONObject memberJSON = membersJSON.getJSONObject(i);
|
||||
URI uri3 = new URI(memberJSON.getString("uri"));
|
||||
RemoteGossipMember member = new RemoteGossipMember(memberJSON.getString("cluster"),
|
||||
memberJSON.getString("host"), memberJSON.getInt("port"), "");
|
||||
uri3, "", 0);
|
||||
settings.addGossipMember(member);
|
||||
configMembersDetails += member.getAddress();
|
||||
if (i < (membersJSON.length() - 1))
|
||||
|
@ -18,6 +18,8 @@
|
||||
package org.apache.gossip.examples;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -57,26 +59,28 @@ public class GossipExample extends Thread {
|
||||
public void run() {
|
||||
try {
|
||||
GossipSettings settings = new GossipSettings();
|
||||
|
||||
List<GossipService> clients = new ArrayList<>();
|
||||
|
||||
// Get my ip address.
|
||||
String myIpAddress = InetAddress.getLocalHost().getHostAddress();
|
||||
|
||||
String cluster = "My Gossip Cluster";
|
||||
|
||||
// Create the gossip members and put them in a list and give them a port number starting with
|
||||
// 2000.
|
||||
List<GossipMember> startupMembers = new ArrayList<>();
|
||||
for (int i = 0; i < NUMBER_OF_CLIENTS; ++i) {
|
||||
startupMembers.add(new RemoteGossipMember(cluster, myIpAddress, 2000 + i, ""));
|
||||
URI u;
|
||||
try {
|
||||
u = new URI("udp://" + myIpAddress + ":" + (2000 + i));
|
||||
} catch (URISyntaxException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
startupMembers.add(new RemoteGossipMember(cluster, u, "", 0 ));
|
||||
}
|
||||
|
||||
// Lets start the gossip clients.
|
||||
// Start the clients, waiting cleaning-interval + 1 second between them which will show the
|
||||
// dead list handling.
|
||||
for (GossipMember member : startupMembers) {
|
||||
GossipService gossipService = new GossipService(cluster, myIpAddress, member.getPort(), "",
|
||||
GossipService gossipService = new GossipService(cluster, member.getUri(), "",
|
||||
startupMembers, settings, null);
|
||||
clients.add(gossipService);
|
||||
gossipService.start();
|
||||
|
@ -18,6 +18,8 @@
|
||||
package org.apache.gossip.manager;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.net.BindException;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@ -68,18 +70,18 @@ public abstract class GossipManager extends Thread implements NotificationListen
|
||||
|
||||
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
|
||||
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
|
||||
String address, int port, String id, GossipSettings settings,
|
||||
URI uri, String id, GossipSettings settings,
|
||||
List<GossipMember> gossipMembers, GossipListener listener) {
|
||||
this.passiveGossipThreadClass = passiveGossipThreadClass;
|
||||
this.activeGossipThreadClass = activeGossipThreadClass;
|
||||
this.settings = settings;
|
||||
me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this,
|
||||
me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
|
||||
settings.getCleanupInterval());
|
||||
members = new ConcurrentSkipListMap<>();
|
||||
for (GossipMember startupMember : gossipMembers) {
|
||||
if (!startupMember.equals(me)) {
|
||||
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
|
||||
startupMember.getHost(), startupMember.getPort(), startupMember.getId(),
|
||||
startupMember.getUri(), startupMember.getId(),
|
||||
System.currentTimeMillis(), this, settings.getCleanupInterval());
|
||||
members.put(member, GossipState.UP);
|
||||
GossipService.LOGGER.debug(member);
|
||||
@ -180,6 +182,9 @@ public abstract class GossipManager extends Thread implements NotificationListen
|
||||
gossipThreadExecutor.execute(activeGossipThread);
|
||||
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
|
||||
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
|
||||
if (e1 instanceof BindException){
|
||||
LOGGER.fatal("could not bind to "+ me.getUri() + " " + me.getAddress());
|
||||
}
|
||||
throw new RuntimeException(e1);
|
||||
}
|
||||
GossipService.LOGGER.debug("The GossipService is started.");
|
||||
|
@ -23,6 +23,8 @@ import java.net.DatagramSocket;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
@ -58,18 +60,18 @@ abstract public class PassiveGossipThread implements Runnable {
|
||||
public PassiveGossipThread(GossipManager gossipManager) {
|
||||
this.gossipManager = gossipManager;
|
||||
try {
|
||||
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(),
|
||||
gossipManager.getMyself().getPort());
|
||||
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
|
||||
gossipManager.getMyself().getUri().getPort());
|
||||
server = new DatagramSocket(socketAddress);
|
||||
GossipService.LOGGER.debug("Gossip service successfully initialized on port "
|
||||
+ gossipManager.getMyself().getPort());
|
||||
GossipService.LOGGER.debug("I am " + gossipManager.getMyself());
|
||||
LOGGER.debug("Gossip service successfully initialized on port "
|
||||
+ gossipManager.getMyself().getUri().getPort());
|
||||
LOGGER.debug("I am " + gossipManager.getMyself());
|
||||
cluster = gossipManager.getMyself().getClusterName();
|
||||
if (cluster == null){
|
||||
throw new IllegalArgumentException("cluster was null");
|
||||
}
|
||||
} catch (SocketException ex) {
|
||||
GossipService.LOGGER.warn(ex);
|
||||
LOGGER.warn(ex);
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
keepRunning = new AtomicBoolean(true);
|
||||
@ -103,14 +105,20 @@ abstract public class PassiveGossipThread implements Runnable {
|
||||
ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
|
||||
ActiveGossipMessage.class);
|
||||
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
|
||||
URI u = null;
|
||||
try {
|
||||
u = new URI(activeGossipMessage.getMembers().get(i).getUri());
|
||||
} catch (URISyntaxException e) {
|
||||
LOGGER.debug("Gossip message with faulty URI", e);
|
||||
continue;
|
||||
}
|
||||
RemoteGossipMember member = new RemoteGossipMember(
|
||||
activeGossipMessage.getMembers().get(i).getCluster(),
|
||||
activeGossipMessage.getMembers().get(i).getHost(),
|
||||
activeGossipMessage.getMembers().get(i).getPort(),
|
||||
u,
|
||||
activeGossipMessage.getMembers().get(i).getId(),
|
||||
activeGossipMessage.getMembers().get(i).getHeartbeat());
|
||||
if (!(member.getClusterName().equals(cluster))){
|
||||
GossipService.LOGGER.warn("Note a member of this cluster " + i);
|
||||
LOGGER.warn("Note a member of this cluster " + i);
|
||||
continue;
|
||||
}
|
||||
// This is the first member found, so this should be the member who is communicating
|
||||
@ -122,16 +130,15 @@ abstract public class PassiveGossipThread implements Runnable {
|
||||
}
|
||||
mergeLists(gossipManager, senderMember, remoteGossipMembers);
|
||||
} catch (RuntimeException ex) {
|
||||
GossipService.LOGGER.error("Unable to process message", ex);
|
||||
LOGGER.error("Unable to process message", ex);
|
||||
}
|
||||
} else {
|
||||
GossipService.LOGGER
|
||||
LOGGER
|
||||
.error("The received message is not of the expected size, it has been dropped.");
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
GossipService.LOGGER.error(e);
|
||||
System.out.println(e);
|
||||
LOGGER.error(e);
|
||||
keepRunning.set(false);
|
||||
}
|
||||
}
|
||||
|
@ -25,9 +25,12 @@ import org.apache.gossip.LocalGossipMember;
|
||||
import org.apache.gossip.RemoteGossipMember;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.PassiveGossipThread;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
|
||||
|
||||
public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class);
|
||||
|
||||
public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
|
||||
super(gossipManager);
|
||||
}
|
||||
@ -47,9 +50,9 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
||||
// if the person sending to us is in the dead list consider them up
|
||||
for (LocalGossipMember i : gossipManager.getDeadList()) {
|
||||
if (i.getId().equals(senderMember.getId())) {
|
||||
System.out.println(gossipManager.getMyself() + " caught a live one!");
|
||||
LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
|
||||
senderMember.getHost(), senderMember.getPort(), senderMember.getId(),
|
||||
senderMember.getUri(), senderMember.getId(),
|
||||
senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
|
||||
.getCleanupInterval());
|
||||
gossipManager.revivieMember(newLocalMember);
|
||||
@ -70,7 +73,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
||||
} else if (!gossipManager.getMemberList().contains(remoteMember)
|
||||
&& !gossipManager.getDeadList().contains(remoteMember)) {
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
|
||||
remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
|
||||
remoteMember.getUri(), remoteMember.getId(),
|
||||
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
|
||||
.getCleanupInterval());
|
||||
gossipManager.createOrRevivieMember(newLocalMember);
|
||||
@ -81,26 +84,26 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
||||
gossipManager.getDeadList().indexOf(remoteMember));
|
||||
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
|
||||
remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
|
||||
remoteMember.getUri(), remoteMember.getId(),
|
||||
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
|
||||
.getCleanupInterval());
|
||||
gossipManager.revivieMember(newLocalMember);
|
||||
newLocalMember.startTimeoutTimer();
|
||||
GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
|
||||
LOGGER.debug("Removed remote member " + remoteMember.getAddress()
|
||||
+ " from dead list and added to local member list.");
|
||||
} else {
|
||||
GossipService.LOGGER.debug("me " + gossipManager.getMyself());
|
||||
GossipService.LOGGER.debug("sender " + senderMember);
|
||||
GossipService.LOGGER.debug("remote " + remoteList);
|
||||
GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
|
||||
GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
|
||||
LOGGER.debug("me " + gossipManager.getMyself());
|
||||
LOGGER.debug("sender " + senderMember);
|
||||
LOGGER.debug("remote " + remoteList);
|
||||
LOGGER.debug("live " + gossipManager.getMemberList());
|
||||
LOGGER.debug("dead " + gossipManager.getDeadList());
|
||||
}
|
||||
} else {
|
||||
GossipService.LOGGER.debug("me " + gossipManager.getMyself());
|
||||
GossipService.LOGGER.debug("sender " + senderMember);
|
||||
GossipService.LOGGER.debug("remote " + remoteList);
|
||||
GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
|
||||
GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
|
||||
LOGGER.debug("me " + gossipManager.getMyself());
|
||||
LOGGER.debug("sender " + senderMember);
|
||||
LOGGER.debug("remote " + remoteList);
|
||||
LOGGER.debug("live " + gossipManager.getMemberList());
|
||||
LOGGER.debug("dead " + gossipManager.getDeadList());
|
||||
// throw new IllegalArgumentException("wtf");
|
||||
}
|
||||
}
|
||||
|
@ -44,9 +44,8 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
||||
GossipMember gm = new GossipMember();
|
||||
gm.setCluster(member.getClusterName());
|
||||
gm.setHeartbeat(member.getHeartbeat());
|
||||
gm.setHost(member.getHost());
|
||||
gm.setUri(member.getUri().toASCIIString());
|
||||
gm.setId(member.getId());
|
||||
gm.setPort(member.getPort());
|
||||
return gm;
|
||||
}
|
||||
|
||||
@ -62,7 +61,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
||||
}
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
|
||||
InetAddress dest = InetAddress.getByName(member.getHost());
|
||||
InetAddress dest = InetAddress.getByName(member.getUri().getHost());
|
||||
ActiveGossipMessage message = new ActiveGossipMessage();
|
||||
message.getMembers().add(convert(me));
|
||||
for (LocalGossipMember other : memberList) {
|
||||
@ -72,7 +71,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
||||
int packet_length = json_bytes.length;
|
||||
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
||||
byte[] buf = createBuffer(packet_length, json_bytes);
|
||||
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
|
||||
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort());
|
||||
socket.send(datagramPacket);
|
||||
} else {
|
||||
GossipService.LOGGER.error("The length of the to be send message is too large ("
|
||||
|
@ -23,12 +23,13 @@ import org.apache.gossip.event.GossipListener;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.List;
|
||||
|
||||
public class RandomGossipManager extends GossipManager {
|
||||
public RandomGossipManager(String cluster, String address, int port, String id,
|
||||
public RandomGossipManager(String cluster, URI uri, String id,
|
||||
GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
|
||||
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
|
||||
address, port, id, settings, gossipMembers, listener);
|
||||
uri, id, settings, gossipMembers, listener);
|
||||
}
|
||||
}
|
||||
|
@ -3,8 +3,7 @@ package org.apache.gossip.model;
|
||||
public class GossipMember {
|
||||
|
||||
private String cluster;
|
||||
private String host;
|
||||
private Integer port;
|
||||
private String uri;
|
||||
private String id;
|
||||
private Long heartbeat;
|
||||
|
||||
@ -12,12 +11,11 @@ public class GossipMember {
|
||||
|
||||
}
|
||||
|
||||
public GossipMember(String cluster, String host, Integer port, String id, Long heartbeat){
|
||||
public GossipMember(String cluster, String uri, String id, Long heartbeat){
|
||||
this.cluster = cluster;
|
||||
this.host= host;
|
||||
this.port = port;
|
||||
this.uri = uri;
|
||||
this.id = id;
|
||||
|
||||
this.heartbeat = heartbeat;
|
||||
}
|
||||
|
||||
public String getCluster() {
|
||||
@ -28,20 +26,12 @@ public class GossipMember {
|
||||
this.cluster = cluster;
|
||||
}
|
||||
|
||||
public String getHost() {
|
||||
return host;
|
||||
public String getUri() {
|
||||
return uri;
|
||||
}
|
||||
|
||||
public void setHost(String host) {
|
||||
this.host = host;
|
||||
}
|
||||
|
||||
public Integer getPort() {
|
||||
return port;
|
||||
}
|
||||
|
||||
public void setPort(Integer port) {
|
||||
this.port = port;
|
||||
public void setUri(String uri) {
|
||||
this.uri = uri;
|
||||
}
|
||||
|
||||
public String getId() {
|
||||
|
@ -19,6 +19,8 @@ package io.teknek.gossip;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -43,7 +45,7 @@ public class ShutdownDeadtimeTest {
|
||||
private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class );
|
||||
@Test
|
||||
//@Ignore
|
||||
public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException {
|
||||
public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||
GossipSettings settings = new GossipSettings(1000, 10000);
|
||||
String cluster = UUID.randomUUID().toString();
|
||||
|
||||
@ -51,7 +53,8 @@ public class ShutdownDeadtimeTest {
|
||||
int seedNodes = 3;
|
||||
List<GossipMember> startupMembers = new ArrayList<>();
|
||||
for (int i = 1; i < seedNodes + 1; ++i) {
|
||||
startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||
startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
|
||||
}
|
||||
|
||||
log.info( "Adding clients" );
|
||||
@ -59,7 +62,8 @@ public class ShutdownDeadtimeTest {
|
||||
final int clusterMembers = 5;
|
||||
for (int i = 1; i < clusterMembers+1; ++i) {
|
||||
final int j = i;
|
||||
GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||
GossipService gossipService = new GossipService(cluster, uri, i + "",
|
||||
startupMembers, settings,
|
||||
new GossipListener(){
|
||||
@Override
|
||||
@ -83,7 +87,7 @@ public class ShutdownDeadtimeTest {
|
||||
Random r = new Random();
|
||||
int randomClientId = r.nextInt(clusterMembers);
|
||||
log.info( "shutting down " + randomClientId );
|
||||
final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getPort();
|
||||
final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getUri().getPort();
|
||||
final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
|
||||
clients.get(randomClientId).shutdown();
|
||||
TUnit.assertThat(new Callable<Integer> (){
|
||||
@ -105,8 +109,9 @@ public class ShutdownDeadtimeTest {
|
||||
return total;
|
||||
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(4);
|
||||
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + shutdownPort);
|
||||
// start client again
|
||||
GossipService gossipService = new GossipService(cluster, "127.0.0.1", shutdownPort, shutdownId + "",
|
||||
GossipService gossipService = new GossipService(cluster, uri, shutdownId + "",
|
||||
startupMembers, settings,
|
||||
new GossipListener(){
|
||||
@Override
|
||||
|
@ -30,6 +30,8 @@ import io.teknek.tunit.TUnit;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
@ -44,13 +46,14 @@ public class StartupSettingsTest {
|
||||
private static final String CLUSTER = UUID.randomUUID().toString();
|
||||
|
||||
@Test
|
||||
public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException {
|
||||
public void testUsingSettingsFile() throws IOException, InterruptedException, JSONException, URISyntaxException {
|
||||
File settingsFile = File.createTempFile("gossipTest",".json");
|
||||
log.debug( "Using settings file: " + settingsFile.getAbsolutePath() );
|
||||
settingsFile.deleteOnExit();
|
||||
writeSettingsFile(settingsFile);
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
|
||||
final GossipService firstService = new GossipService(
|
||||
CLUSTER, "127.0.0.1", 50000, UUID.randomUUID().toString(),
|
||||
CLUSTER, uri, UUID.randomUUID().toString(),
|
||||
new ArrayList<GossipMember>(), new GossipSettings(), null);
|
||||
|
||||
firstService.start();
|
||||
@ -76,11 +79,11 @@ public class StartupSettingsTest {
|
||||
"[{\n" + // It is odd that this is meant to be in an array, but oh well.
|
||||
" \"cluster\":\"" + CLUSTER + "\",\n" +
|
||||
" \"id\":\"" + UUID.randomUUID() + "\",\n" +
|
||||
" \"port\":50001,\n" +
|
||||
" \"uri\":\"udp://127.0.0.1:50001\",\n" +
|
||||
" \"gossip_interval\":1000,\n" +
|
||||
" \"cleanup_interval\":10000,\n" +
|
||||
" \"members\":[\n" +
|
||||
" {\"cluster\": \"" + CLUSTER + "\",\"host\":\"127.0.0.1\", \"port\":50000}\n" +
|
||||
" {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" +
|
||||
" ]\n" +
|
||||
"}]";
|
||||
|
||||
|
@ -19,6 +19,8 @@ package io.teknek.gossip;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -41,16 +43,16 @@ public class TenNodeThreeSeedTest {
|
||||
private static final Logger log = Logger.getLogger( TenNodeThreeSeedTest.class );
|
||||
|
||||
@Test
|
||||
public void test() throws UnknownHostException, InterruptedException{
|
||||
public void test() throws UnknownHostException, InterruptedException, URISyntaxException{
|
||||
abc();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAgain() throws UnknownHostException, InterruptedException{
|
||||
public void testAgain() throws UnknownHostException, InterruptedException, URISyntaxException{
|
||||
abc();
|
||||
}
|
||||
|
||||
public void abc() throws InterruptedException, UnknownHostException{
|
||||
public void abc() throws InterruptedException, UnknownHostException, URISyntaxException{
|
||||
GossipSettings settings = new GossipSettings();
|
||||
String cluster = UUID.randomUUID().toString();
|
||||
|
||||
@ -58,14 +60,16 @@ public class TenNodeThreeSeedTest {
|
||||
int seedNodes = 3;
|
||||
List<GossipMember> startupMembers = new ArrayList<>();
|
||||
for (int i = 1; i < seedNodes+1; ++i) {
|
||||
startupMembers.add(new RemoteGossipMember(cluster, "127.0.0.1", 50000 + i, i + ""));
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||
startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
|
||||
}
|
||||
|
||||
log.info( "Adding clients" );
|
||||
final List<GossipService> clients = new ArrayList<>();
|
||||
final int clusterMembers = 5;
|
||||
for (int i = 1; i < clusterMembers+1; ++i) {
|
||||
GossipService gossipService = new GossipService(cluster, "127.0.0.1", 50000 + i, i + "",
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||
GossipService gossipService = new GossipService(cluster, uri, i + "",
|
||||
startupMembers, settings,
|
||||
new GossipListener(){
|
||||
@Override
|
||||
@ -75,7 +79,6 @@ public class TenNodeThreeSeedTest {
|
||||
});
|
||||
clients.add(gossipService);
|
||||
gossipService.start();
|
||||
gossipService.get_gossipManager().getMemberList();
|
||||
}
|
||||
TUnit.assertThat(new Callable<Integer> (){
|
||||
public Integer call() throws Exception {
|
||||
@ -84,7 +87,7 @@ public class TenNodeThreeSeedTest {
|
||||
total += clients.get(i).get_gossipManager().getMemberList().size();
|
||||
}
|
||||
return total;
|
||||
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(20);
|
||||
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(20);
|
||||
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
clients.get(i).shutdown();
|
||||
|
Reference in New Issue
Block a user