Added log4j logger and removed obvious comments
This commit is contained in:
@ -1,11 +1,10 @@
|
|||||||
package com.google.code.gossip;
|
package com.google.code.gossip;
|
||||||
|
|
||||||
import java.io.PrintStream;
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.SocketException;
|
import java.net.SocketException;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
import com.google.code.gossip.manager.GossipManager;
|
import com.google.code.gossip.manager.GossipManager;
|
||||||
import com.google.code.gossip.manager.random.RandomGossipManager;
|
import com.google.code.gossip.manager.random.RandomGossipManager;
|
||||||
@ -16,10 +15,8 @@ import com.google.code.gossip.manager.random.RandomGossipManager;
|
|||||||
* @author joshclemm, harmenw
|
* @author joshclemm, harmenw
|
||||||
*/
|
*/
|
||||||
public class GossipService {
|
public class GossipService {
|
||||||
|
|
||||||
/** A instance variable holding the log level. */
|
public static final Logger LOGGER = Logger.getLogger(GossipService.class);
|
||||||
private int _logLevel = LogLevel.INFO;
|
|
||||||
|
|
||||||
private GossipManager _gossipManager;
|
private GossipManager _gossipManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -38,9 +35,6 @@ public class GossipService {
|
|||||||
* @throws UnknownHostException
|
* @throws UnknownHostException
|
||||||
*/
|
*/
|
||||||
public GossipService(String ipAddress, int port, int logLevel, ArrayList<GossipMember> gossipMembers, GossipSettings settings) throws InterruptedException, UnknownHostException {
|
public GossipService(String ipAddress, int port, int logLevel, ArrayList<GossipMember> gossipMembers, GossipSettings settings) throws InterruptedException, UnknownHostException {
|
||||||
// Set the logging level.
|
|
||||||
_logLevel = logLevel;
|
|
||||||
|
|
||||||
_gossipManager = new RandomGossipManager(ipAddress, port, settings, gossipMembers);
|
_gossipManager = new RandomGossipManager(ipAddress, port, settings, gossipMembers);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,29 +46,6 @@ public class GossipService {
|
|||||||
_gossipManager.shutdown();
|
_gossipManager.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void error(Object message) {
|
|
||||||
//if (_logLevel >= LogLevel.ERROR) printMessage(message, System.err);
|
|
||||||
printMessage(message, System.err);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void info(Object message) {
|
|
||||||
//if (_logLevel >= LogLevel.INFO) printMessage(message, System.out);
|
|
||||||
printMessage(message, System.out);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void debug(Object message) {
|
|
||||||
//if (_logLevel >= LogLevel.DEBUG) printMessage(message, System.out);
|
|
||||||
printMessage(message, System.out);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void printMessage(Object message, PrintStream out) {
|
|
||||||
/**String addressString = "unknown";
|
|
||||||
if (_me != null)
|
|
||||||
addressString = _me.getAddress();
|
|
||||||
out.println("[" + addressString + "][" + new Date().toString() + "] " + message);*/
|
|
||||||
out.println("[" + new Date().toString() + "] " + message);
|
|
||||||
}
|
|
||||||
|
|
||||||
public GossipManager get_gossipManager() {
|
public GossipManager get_gossipManager() {
|
||||||
return _gossipManager;
|
return _gossipManager;
|
||||||
}
|
}
|
||||||
|
@ -22,7 +22,6 @@ abstract public class ActiveGossipThread implements Runnable {
|
|||||||
|
|
||||||
public ActiveGossipThread(GossipManager gossipManager) {
|
public ActiveGossipThread(GossipManager gossipManager) {
|
||||||
_gossipManager = gossipManager;
|
_gossipManager = gossipManager;
|
||||||
|
|
||||||
_keepRunning = new AtomicBoolean(true);
|
_keepRunning = new AtomicBoolean(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -33,9 +32,7 @@ abstract public class ActiveGossipThread implements Runnable {
|
|||||||
TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval());
|
TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval());
|
||||||
sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList());
|
sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList());
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
// This membership thread was interrupted externally, shutdown
|
GossipService.LOGGER.error(e);
|
||||||
GossipService.debug("The ActiveGossipThread was interrupted externally, shutdown.");
|
|
||||||
e.printStackTrace();
|
|
||||||
_keepRunning.set(false);
|
_keepRunning.set(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -42,45 +42,28 @@ public abstract class GossipManager extends Thread implements NotificationListen
|
|||||||
private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
|
private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
|
||||||
|
|
||||||
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass, Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
|
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass, Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
|
||||||
// Set the active and passive gossip thread classes to use.
|
|
||||||
_passiveGossipThreadClass = passiveGossipThreadClass;
|
_passiveGossipThreadClass = passiveGossipThreadClass;
|
||||||
_activeGossipThreadClass = activeGossipThreadClass;
|
_activeGossipThreadClass = activeGossipThreadClass;
|
||||||
|
|
||||||
// Assign the GossipSettings to the instance variable.
|
|
||||||
_settings = settings;
|
_settings = settings;
|
||||||
|
|
||||||
// Create the local gossip member which I am representing.
|
|
||||||
_me = new LocalGossipMember(address, port, 0, this, settings.getCleanupInterval());
|
_me = new LocalGossipMember(address, port, 0, this, settings.getCleanupInterval());
|
||||||
|
|
||||||
// Initialize the gossip members list.
|
|
||||||
_memberList = new ArrayList<LocalGossipMember>();
|
_memberList = new ArrayList<LocalGossipMember>();
|
||||||
|
_deadList = new ArrayList<LocalGossipMember>();
|
||||||
// Initialize the dead gossip members list.
|
GossipService.LOGGER.debug("Startup member list:");
|
||||||
_deadList = new ArrayList<LocalGossipMember>();
|
GossipService.LOGGER.debug("---------------------");
|
||||||
|
GossipService.LOGGER.debug(_me);
|
||||||
// Print the startup member list when the service is in debug mode.
|
|
||||||
GossipService.debug("Startup member list:");
|
|
||||||
GossipService.debug("---------------------");
|
|
||||||
// First print out myself.
|
|
||||||
GossipService.debug(_me);
|
|
||||||
// Copy the list with members given to the local member list and print the member when in debug mode.
|
|
||||||
for (GossipMember startupMember : gossipMembers) {
|
for (GossipMember startupMember : gossipMembers) {
|
||||||
if (!startupMember.equals(_me)) {
|
if (!startupMember.equals(_me)) {
|
||||||
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), 0, this, settings.getCleanupInterval());
|
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), 0, this, settings.getCleanupInterval());
|
||||||
_memberList.add(member);
|
_memberList.add(member);
|
||||||
GossipService.debug(member);
|
GossipService.LOGGER.debug(member);
|
||||||
} else {
|
}
|
||||||
GossipService.info("Found myself in the members section of the configuration, you should not add the host itself to the members section.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the boolean for running the gossip service to true.
|
|
||||||
_gossipServiceRunning = new AtomicBoolean(true);
|
_gossipServiceRunning = new AtomicBoolean(true);
|
||||||
|
|
||||||
// Add a shutdown hook so we can see when the service has been shutdown.
|
|
||||||
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
||||||
public void run() {
|
public void run() {
|
||||||
GossipService.info("Service has been shutdown...");
|
GossipService.LOGGER.info("Service has been shutdown...");
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
@ -92,18 +75,11 @@ public abstract class GossipManager extends Thread implements NotificationListen
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void handleNotification(Notification notification, Object handback) {
|
public void handleNotification(Notification notification, Object handback) {
|
||||||
|
|
||||||
// Get the local gossip member associated with the notification.
|
|
||||||
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
|
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
|
||||||
|
GossipService.LOGGER.info("Dead member detected: " + deadMember);
|
||||||
GossipService.info("Dead member detected: " + deadMember);
|
|
||||||
|
|
||||||
// Remove the member from the active member list.
|
|
||||||
synchronized (this._memberList) {
|
synchronized (this._memberList) {
|
||||||
this._memberList.remove(deadMember);
|
this._memberList.remove(deadMember);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add the member to the dead member list.
|
|
||||||
synchronized (this._deadList) {
|
synchronized (this._deadList) {
|
||||||
this._deadList.add(deadMember);
|
this._deadList.add(deadMember);
|
||||||
}
|
}
|
||||||
@ -135,46 +111,27 @@ public abstract class GossipManager extends Thread implements NotificationListen
|
|||||||
* @throws InterruptedException
|
* @throws InterruptedException
|
||||||
*/
|
*/
|
||||||
public void run() {
|
public void run() {
|
||||||
// Start all timers except for me
|
|
||||||
for (LocalGossipMember member : _memberList) {
|
for (LocalGossipMember member : _memberList) {
|
||||||
if (member != _me) {
|
if (member != _me) {
|
||||||
member.startTimeoutTimer();
|
member.startTimeoutTimer();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
_gossipThreadExecutor = Executors.newCachedThreadPool();
|
||||||
try {
|
try {
|
||||||
_gossipThreadExecutor = Executors.newCachedThreadPool();
|
_gossipThreadExecutor.execute(_passiveGossipThreadClass.getConstructor(GossipManager.class)
|
||||||
// The receiver thread is a passive player that handles
|
.newInstance(this));
|
||||||
// merging incoming membership lists from other neighbors.
|
_gossipThreadExecutor.execute(_activeGossipThreadClass.getConstructor(GossipManager.class)
|
||||||
_gossipThreadExecutor.execute(_passiveGossipThreadClass.getConstructor(GossipManager.class).newInstance(this));
|
.newInstance(this));
|
||||||
// The gossiper thread is an active player that
|
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
|
||||||
// selects a neighbor to share its membership list
|
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
|
||||||
_gossipThreadExecutor.execute(_activeGossipThreadClass.getConstructor(GossipManager.class).newInstance(this));
|
throw new RuntimeException(e1);
|
||||||
} catch (IllegalArgumentException e1) {
|
}
|
||||||
e1.printStackTrace();
|
GossipService.LOGGER.info("The GossipService is started.");
|
||||||
} catch (SecurityException e1) {
|
|
||||||
e1.printStackTrace();
|
|
||||||
} catch (InstantiationException e1) {
|
|
||||||
e1.printStackTrace();
|
|
||||||
} catch (IllegalAccessException e1) {
|
|
||||||
e1.printStackTrace();
|
|
||||||
} catch (InvocationTargetException e1) {
|
|
||||||
e1.printStackTrace();
|
|
||||||
} catch (NoSuchMethodException e1) {
|
|
||||||
e1.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Potentially, you could kick off more threads here
|
|
||||||
// that could perform additional data synching
|
|
||||||
|
|
||||||
GossipService.info("The GossipService is started.");
|
|
||||||
|
|
||||||
// keep the main thread around
|
|
||||||
while(_gossipServiceRunning.get()) {
|
while(_gossipServiceRunning.get()) {
|
||||||
try {
|
try {
|
||||||
TimeUnit.SECONDS.sleep(10);
|
TimeUnit.SECONDS.sleep(10);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
GossipService.info("The GossipClient was interrupted.");
|
GossipService.LOGGER.info("The GossipClient was interrupted.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -39,97 +39,92 @@ abstract public class PassiveGossipThread implements Runnable {
|
|||||||
try {
|
try {
|
||||||
SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), _gossipManager.getMyself().getPort());
|
SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), _gossipManager.getMyself().getPort());
|
||||||
_server = new DatagramSocket(socketAddress);
|
_server = new DatagramSocket(socketAddress);
|
||||||
GossipService.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort());
|
GossipService.LOGGER.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort());
|
||||||
GossipService.debug("I am " + _gossipManager.getMyself());
|
GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
|
||||||
} catch (SocketException ex) {
|
} catch (SocketException ex) {
|
||||||
System.err.println(ex);
|
GossipService.LOGGER.error(ex);
|
||||||
_server = null;
|
_server = null;
|
||||||
throw new RuntimeException(ex);
|
throw new RuntimeException(ex);
|
||||||
}
|
}
|
||||||
_keepRunning = new AtomicBoolean(true);
|
_keepRunning = new AtomicBoolean(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
while(_keepRunning.get()) {
|
while (_keepRunning.get()) {
|
||||||
try {
|
try {
|
||||||
// Create a byte array with the size of the buffer.
|
byte[] buf = new byte[_server.getReceiveBufferSize()];
|
||||||
byte[] buf = new byte[_server.getReceiveBufferSize()];
|
DatagramPacket p = new DatagramPacket(buf, buf.length);
|
||||||
DatagramPacket p = new DatagramPacket(buf, buf.length);
|
_server.receive(p);
|
||||||
_server.receive(p);
|
GossipService.LOGGER.debug("A message has been received from " + p.getAddress() + ":"
|
||||||
GossipService.debug("A message has been received from " + p.getAddress() + ":" + p.getPort() + ".");
|
+ p.getPort() + ".");
|
||||||
|
|
||||||
int packet_length = 0;
|
|
||||||
for (int i = 0; i < 4; i++) {
|
|
||||||
int shift = (4 - 1 - i) * 8;
|
|
||||||
packet_length += (buf[i] & 0x000000FF) << shift;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check whether the package is smaller than the maximal packet length.
|
|
||||||
// A package larger than this would not be possible to be send from a GossipService,
|
|
||||||
// since this is check before sending the message.
|
|
||||||
// This could normally only occur when the list of members is very big,
|
|
||||||
// or when the packet is misformed, and the first 4 bytes is not the right in anymore.
|
|
||||||
// For this reason we regards the message.
|
|
||||||
if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
|
|
||||||
|
|
||||||
byte[] json_bytes = new byte[packet_length];
|
|
||||||
for (int i=0; i<packet_length; i++) {
|
|
||||||
json_bytes[i] = buf[i+4];
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract the members out of the packet
|
int packet_length = 0;
|
||||||
String receivedMessage = new String(json_bytes);
|
for (int i = 0; i < 4; i++) {
|
||||||
GossipService.debug("Received message (" + packet_length + " bytes): " + receivedMessage);
|
int shift = (4 - 1 - i) * 8;
|
||||||
|
packet_length += (buf[i] & 0x000000FF) << shift;
|
||||||
try {
|
}
|
||||||
|
|
||||||
ArrayList<GossipMember> remoteGossipMembers = new ArrayList<GossipMember>();
|
|
||||||
|
|
||||||
RemoteGossipMember senderMember = null;
|
|
||||||
|
|
||||||
GossipService.debug("Received member list:");
|
|
||||||
// Convert the received JSON message to a JSON array.
|
|
||||||
JSONArray jsonArray = new JSONArray(receivedMessage);
|
|
||||||
// The JSON array should contain all members.
|
|
||||||
// Let's iterate over them.
|
|
||||||
for (int i = 0; i < jsonArray.length(); i++) {
|
|
||||||
JSONObject memberJSONObject = jsonArray.getJSONObject(i);
|
|
||||||
// Now the array should contain 3 objects (hostname, port and heartbeat).
|
|
||||||
if (memberJSONObject.length() == 3) {
|
|
||||||
// Ok, now let's create the member object.
|
|
||||||
RemoteGossipMember member = new RemoteGossipMember(memberJSONObject.getString(GossipMember.JSON_HOST), memberJSONObject.getInt(GossipMember.JSON_PORT), memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT));
|
|
||||||
GossipService.debug(member.toString());
|
|
||||||
|
|
||||||
// This is the first member found, so this should be the member who is communicating with me.
|
|
||||||
if (i == 0) {
|
|
||||||
senderMember = member;
|
|
||||||
}
|
|
||||||
|
|
||||||
remoteGossipMembers.add(member);
|
|
||||||
} else {
|
|
||||||
GossipService.error("The received member object does not contain 3 objects:\n" + memberJSONObject.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// Merge our list with the one we just received
|
|
||||||
mergeLists(_gossipManager, senderMember, remoteGossipMembers);
|
|
||||||
|
|
||||||
} catch (JSONException e) {
|
|
||||||
GossipService.error("The received message is not well-formed JSON. The following message has been dropped:\n" + receivedMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
GossipService.error("The received message is not of the expected size, it has been dropped.");
|
|
||||||
}
|
|
||||||
|
|
||||||
} catch (IOException e) {
|
// Check whether the package is smaller than the maximal packet length.
|
||||||
e.printStackTrace();
|
// A package larger than this would not be possible to be send from a GossipService,
|
||||||
_keepRunning.set(false);
|
// since this is check before sending the message.
|
||||||
}
|
// This could normally only occur when the list of members is very big,
|
||||||
}
|
// or when the packet is misformed, and the first 4 bytes is not the right in anymore.
|
||||||
}
|
// For this reason we regards the message.
|
||||||
|
if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
|
||||||
|
byte[] json_bytes = new byte[packet_length];
|
||||||
|
for (int i = 0; i < packet_length; i++) {
|
||||||
|
json_bytes[i] = buf[i + 4];
|
||||||
|
}
|
||||||
|
String receivedMessage = new String(json_bytes);
|
||||||
|
GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
|
||||||
|
+ receivedMessage);
|
||||||
|
try {
|
||||||
|
ArrayList<GossipMember> remoteGossipMembers = new ArrayList<GossipMember>();
|
||||||
|
RemoteGossipMember senderMember = null;
|
||||||
|
GossipService.LOGGER.debug("Received member list:");
|
||||||
|
JSONArray jsonArray = new JSONArray(receivedMessage);
|
||||||
|
for (int i = 0; i < jsonArray.length(); i++) {
|
||||||
|
JSONObject memberJSONObject = jsonArray.getJSONObject(i);
|
||||||
|
if (memberJSONObject.length() == 3) {
|
||||||
|
RemoteGossipMember member = new RemoteGossipMember(
|
||||||
|
memberJSONObject.getString(GossipMember.JSON_HOST),
|
||||||
|
memberJSONObject.getInt(GossipMember.JSON_PORT),
|
||||||
|
memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT));
|
||||||
|
GossipService.LOGGER.debug(member.toString());
|
||||||
|
// This is the first member found, so this should be the member who is communicating
|
||||||
|
// with me.
|
||||||
|
if (i == 0) {
|
||||||
|
senderMember = member;
|
||||||
|
}
|
||||||
|
remoteGossipMembers.add(member);
|
||||||
|
} else {
|
||||||
|
GossipService.LOGGER
|
||||||
|
.error("The received member object does not contain 3 objects:\n"
|
||||||
|
+ memberJSONObject.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Merge our list with the one we just received
|
||||||
|
mergeLists(_gossipManager, senderMember, remoteGossipMembers);
|
||||||
|
} catch (JSONException e) {
|
||||||
|
GossipService.LOGGER
|
||||||
|
.error("The received message is not well-formed JSON. The following message has been dropped:\n"
|
||||||
|
+ receivedMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
GossipService.LOGGER
|
||||||
|
.error("The received message is not of the expected size, it has been dropped.");
|
||||||
|
}
|
||||||
|
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
_keepRunning.set(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstract method for merging the local and remote list.
|
* Abstract method for merging the local and remote list.
|
||||||
|
@ -33,7 +33,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
|||||||
// Skip myself. We don't want ourselves in the local member list.
|
// Skip myself. We don't want ourselves in the local member list.
|
||||||
if (!remoteMember.equals(gossipManager.getMyself())) {
|
if (!remoteMember.equals(gossipManager.getMyself())) {
|
||||||
if (gossipManager.getMemberList().contains(remoteMember)) {
|
if (gossipManager.getMemberList().contains(remoteMember)) {
|
||||||
GossipService.debug("The local list already contains the remote member (" + remoteMember + ").");
|
GossipService.LOGGER.debug("The local list already contains the remote member (" + remoteMember + ").");
|
||||||
// The local memberlist contains the remote member.
|
// The local memberlist contains the remote member.
|
||||||
LocalGossipMember localMember = gossipManager.getMemberList().get(gossipManager.getMemberList().indexOf(remoteMember));
|
LocalGossipMember localMember = gossipManager.getMemberList().get(gossipManager.getMemberList().indexOf(remoteMember));
|
||||||
|
|
||||||
@ -47,13 +47,13 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
|||||||
// TODO: Otherwise, should we inform the other when the heartbeat is already higher?
|
// TODO: Otherwise, should we inform the other when the heartbeat is already higher?
|
||||||
} else {
|
} else {
|
||||||
// The local list does not contain the remote member.
|
// The local list does not contain the remote member.
|
||||||
GossipService.debug("The local list does not contain the remote member (" + remoteMember + ").");
|
GossipService.LOGGER.debug("The local list does not contain the remote member (" + remoteMember + ").");
|
||||||
|
|
||||||
// The remote member is either brand new, or a previously declared dead member.
|
// The remote member is either brand new, or a previously declared dead member.
|
||||||
// If its dead, check the heartbeat because it may have come back from the dead.
|
// If its dead, check the heartbeat because it may have come back from the dead.
|
||||||
if (gossipManager.getDeadList().contains(remoteMember)) {
|
if (gossipManager.getDeadList().contains(remoteMember)) {
|
||||||
// The remote member is known here as a dead member.
|
// The remote member is known here as a dead member.
|
||||||
GossipService.debug("The remote member is known here as a dead member.");
|
GossipService.LOGGER.debug("The remote member is known here as a dead member.");
|
||||||
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(gossipManager.getDeadList().indexOf(remoteMember));
|
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(gossipManager.getDeadList().indexOf(remoteMember));
|
||||||
// If a member is restarted the heartbeat will restart from 1, so we should check that here.
|
// If a member is restarted the heartbeat will restart from 1, so we should check that here.
|
||||||
// So a member can become from the dead when it is either larger than a previous heartbeat (due to network failure)
|
// So a member can become from the dead when it is either larger than a previous heartbeat (due to network failure)
|
||||||
@ -66,7 +66,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
|||||||
if (remoteMember.getHeartbeat() == 1
|
if (remoteMember.getHeartbeat() == 1
|
||||||
|| ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager.getSettings().getCleanupInterval() / 1000)
|
|| ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager.getSettings().getCleanupInterval() / 1000)
|
||||||
|| remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
|
|| remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
|
||||||
GossipService.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
|
GossipService.LOGGER.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
|
||||||
// The remote member is back from the dead.
|
// The remote member is back from the dead.
|
||||||
// Remove it from the dead list.
|
// Remove it from the dead list.
|
||||||
gossipManager.getDeadList().remove(localDeadMember);
|
gossipManager.getDeadList().remove(localDeadMember);
|
||||||
@ -74,14 +74,14 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
|
|||||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
|
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
|
||||||
gossipManager.getMemberList().add(newLocalMember);
|
gossipManager.getMemberList().add(newLocalMember);
|
||||||
newLocalMember.startTimeoutTimer();
|
newLocalMember.startTimeoutTimer();
|
||||||
GossipService.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list.");
|
GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list.");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Brand spanking new member - welcome.
|
// Brand spanking new member - welcome.
|
||||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
|
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
|
||||||
gossipManager.getMemberList().add(newLocalMember);
|
gossipManager.getMemberList().add(newLocalMember);
|
||||||
newLocalMember.startTimeoutTimer();
|
newLocalMember.startTimeoutTimer();
|
||||||
GossipService.info("Added new remote member " + remoteMember.getAddress() + " to local member list.");
|
GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -25,7 +25,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
|||||||
* incremented our own heartbeat.
|
* incremented our own heartbeat.
|
||||||
*/
|
*/
|
||||||
protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) {
|
protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) {
|
||||||
GossipService.debug("Send sendMembershipList() is called.");
|
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
|
||||||
|
|
||||||
// Increase the heartbeat of myself by 1.
|
// Increase the heartbeat of myself by 1.
|
||||||
me.setHeartbeat(me.getHeartbeat() + 1);
|
me.setHeartbeat(me.getHeartbeat() + 1);
|
||||||
@ -39,21 +39,21 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
|||||||
|
|
||||||
// Create a StringBuffer for the JSON message.
|
// Create a StringBuffer for the JSON message.
|
||||||
JSONArray jsonArray = new JSONArray();
|
JSONArray jsonArray = new JSONArray();
|
||||||
GossipService.debug("Sending memberlist to " + dest + ":" + member.getPort());
|
GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort());
|
||||||
GossipService.debug("---------------------");
|
GossipService.LOGGER.debug("---------------------");
|
||||||
|
|
||||||
// First write myself, append the JSON representation of the member to the buffer.
|
// First write myself, append the JSON representation of the member to the buffer.
|
||||||
jsonArray.put(me.toJSONObject());
|
jsonArray.put(me.toJSONObject());
|
||||||
GossipService.debug(me);
|
GossipService.LOGGER.debug(me);
|
||||||
|
|
||||||
// Then write the others.
|
// Then write the others.
|
||||||
for (int i=0; i<memberList.size(); i++) {
|
for (int i=0; i<memberList.size(); i++) {
|
||||||
LocalGossipMember other = memberList.get(i);
|
LocalGossipMember other = memberList.get(i);
|
||||||
// Append the JSON representation of the member to the buffer.
|
// Append the JSON representation of the member to the buffer.
|
||||||
jsonArray.put(other.toJSONObject());
|
jsonArray.put(other.toJSONObject());
|
||||||
GossipService.debug(other);
|
GossipService.LOGGER.debug(other);
|
||||||
}
|
}
|
||||||
GossipService.debug("---------------------");
|
GossipService.LOGGER.debug("---------------------");
|
||||||
|
|
||||||
// Write the objects to a byte array.
|
// Write the objects to a byte array.
|
||||||
byte[] json_bytes = jsonArray.toString().getBytes();
|
byte[] json_bytes = jsonArray.toString().getBytes();
|
||||||
@ -70,7 +70,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
|||||||
length_bytes[3] =(byte)( (packet_length << 24) >> 24 );
|
length_bytes[3] =(byte)( (packet_length << 24) >> 24 );
|
||||||
|
|
||||||
|
|
||||||
GossipService.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString());
|
GossipService.LOGGER.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString());
|
||||||
|
|
||||||
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length);
|
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length);
|
||||||
byteBuffer.put(length_bytes);
|
byteBuffer.put(length_bytes);
|
||||||
@ -82,7 +82,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
|||||||
socket.send(datagramPacket);
|
socket.send(datagramPacket);
|
||||||
socket.close();
|
socket.close();
|
||||||
} else {
|
} else {
|
||||||
GossipService.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
|
GossipService.LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -15,8 +15,6 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
|
|||||||
|
|
||||||
public RandomActiveGossipThread(GossipManager gossipManager) {
|
public RandomActiveGossipThread(GossipManager gossipManager) {
|
||||||
super(gossipManager);
|
super(gossipManager);
|
||||||
|
|
||||||
// Initialize the random used for deciding on which gossip member to gossip with.
|
|
||||||
_random = new Random();
|
_random = new Random();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,16 +26,12 @@ public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
|
|||||||
*/
|
*/
|
||||||
protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) {
|
protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) {
|
||||||
LocalGossipMember member = null;
|
LocalGossipMember member = null;
|
||||||
|
|
||||||
// We can only send a message if there are actually other members.
|
|
||||||
if (memberList.size() > 0) {
|
if (memberList.size() > 0) {
|
||||||
// Get the index of the random member.
|
|
||||||
int randomNeighborIndex = _random.nextInt(memberList.size());
|
int randomNeighborIndex = _random.nextInt(memberList.size());
|
||||||
member = memberList.get(randomNeighborIndex);
|
member = memberList.get(randomNeighborIndex);
|
||||||
} else {
|
} else {
|
||||||
GossipService.debug("I am alone in this world.");
|
GossipService.LOGGER.debug("I am alone in this world.");
|
||||||
}
|
}
|
||||||
|
|
||||||
return member;
|
return member;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -19,13 +19,11 @@ public class TenNodeThreeSeedTest {
|
|||||||
@Test
|
@Test
|
||||||
public void test() throws UnknownHostException, InterruptedException{
|
public void test() throws UnknownHostException, InterruptedException{
|
||||||
GossipSettings settings = new GossipSettings();
|
GossipSettings settings = new GossipSettings();
|
||||||
|
|
||||||
int seedNodes = 3;
|
int seedNodes = 3;
|
||||||
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
|
ArrayList<GossipMember> startupMembers = new ArrayList<GossipMember>();
|
||||||
for (int i = 1; i < seedNodes+1; ++i) {
|
for (int i = 1; i < seedNodes+1; ++i) {
|
||||||
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000));
|
startupMembers.add(new RemoteGossipMember("127.0.0." + i, 2000));
|
||||||
}
|
}
|
||||||
|
|
||||||
ArrayList<GossipService> clients = new ArrayList<GossipService>();
|
ArrayList<GossipService> clients = new ArrayList<GossipService>();
|
||||||
int clusterMembers = 10;
|
int clusterMembers = 10;
|
||||||
for (int i = 1; i < clusterMembers+1; ++i) {
|
for (int i = 1; i < clusterMembers+1; ++i) {
|
||||||
@ -36,6 +34,7 @@ public class TenNodeThreeSeedTest {
|
|||||||
}
|
}
|
||||||
Thread.sleep(10000);
|
Thread.sleep(10000);
|
||||||
for (int i = 0; i < clusterMembers; ++i) {
|
for (int i = 0; i < clusterMembers; ++i) {
|
||||||
|
System.out.println(clients.get(i).get_gossipManager().getMemberList());
|
||||||
Assert.assertEquals(9, clients.get(i).get_gossipManager().getMemberList().size());
|
Assert.assertEquals(9, clients.get(i).get_gossipManager().getMemberList().size());
|
||||||
}
|
}
|
||||||
for (int i = 0; i < clusterMembers; ++i) {
|
for (int i = 0; i < clusterMembers; ++i) {
|
||||||
|
Reference in New Issue
Block a user