Dos2unix this joint
This commit is contained in:
@ -1,58 +1,58 @@
|
||||
package com.google.code.gossip.manager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
|
||||
/**
|
||||
* [The active thread: periodically send gossip request.]
|
||||
* The class handles gossiping the membership list.
|
||||
* This information is important to maintaining a common
|
||||
* state among all the nodes, and is important for detecting
|
||||
* failures.
|
||||
*/
|
||||
abstract public class ActiveGossipThread implements Runnable {
|
||||
|
||||
private GossipManager _gossipManager;
|
||||
|
||||
private final AtomicBoolean _keepRunning;
|
||||
|
||||
public ActiveGossipThread(GossipManager gossipManager) {
|
||||
_gossipManager = gossipManager;
|
||||
_keepRunning = new AtomicBoolean(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while(_keepRunning.get()) {
|
||||
try {
|
||||
TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval());
|
||||
sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList());
|
||||
} catch (InterruptedException e) {
|
||||
GossipService.LOGGER.error(e);
|
||||
_keepRunning.set(false);
|
||||
}
|
||||
}
|
||||
shutdown();
|
||||
}
|
||||
|
||||
public void shutdown(){
|
||||
_keepRunning.set(false);
|
||||
}
|
||||
/**
|
||||
* Performs the sending of the membership list, after we have
|
||||
* incremented our own heartbeat.
|
||||
*/
|
||||
abstract protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList);
|
||||
|
||||
/**
|
||||
* Abstract method which should be implemented by a subclass.
|
||||
* This method should return a member of the list to gossip with.
|
||||
* @param memberList The list of members which are stored in the local list of members.
|
||||
* @return The chosen LocalGossipMember to gossip with.
|
||||
*/
|
||||
abstract protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList);
|
||||
}
|
||||
package com.google.code.gossip.manager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
|
||||
/**
|
||||
* [The active thread: periodically send gossip request.]
|
||||
* The class handles gossiping the membership list.
|
||||
* This information is important to maintaining a common
|
||||
* state among all the nodes, and is important for detecting
|
||||
* failures.
|
||||
*/
|
||||
abstract public class ActiveGossipThread implements Runnable {
|
||||
|
||||
private GossipManager _gossipManager;
|
||||
|
||||
private final AtomicBoolean _keepRunning;
|
||||
|
||||
public ActiveGossipThread(GossipManager gossipManager) {
|
||||
_gossipManager = gossipManager;
|
||||
_keepRunning = new AtomicBoolean(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while(_keepRunning.get()) {
|
||||
try {
|
||||
TimeUnit.MILLISECONDS.sleep(_gossipManager.getSettings().getGossipInterval());
|
||||
sendMembershipList(_gossipManager.getMyself(), _gossipManager.getMemberList());
|
||||
} catch (InterruptedException e) {
|
||||
GossipService.LOGGER.error(e);
|
||||
_keepRunning.set(false);
|
||||
}
|
||||
}
|
||||
shutdown();
|
||||
}
|
||||
|
||||
public void shutdown(){
|
||||
_keepRunning.set(false);
|
||||
}
|
||||
/**
|
||||
* Performs the sending of the membership list, after we have
|
||||
* incremented our own heartbeat.
|
||||
*/
|
||||
abstract protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList);
|
||||
|
||||
/**
|
||||
* Abstract method which should be implemented by a subclass.
|
||||
* This method should return a member of the list to gossip with.
|
||||
* @param memberList The list of members which are stored in the local list of members.
|
||||
* @return The chosen LocalGossipMember to gossip with.
|
||||
*/
|
||||
abstract protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList);
|
||||
}
|
||||
|
@ -1,157 +1,157 @@
|
||||
package com.google.code.gossip.manager;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.management.Notification;
|
||||
import javax.management.NotificationListener;
|
||||
|
||||
import com.google.code.gossip.GossipMember;
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.GossipSettings;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
|
||||
public abstract class GossipManager extends Thread implements NotificationListener {
|
||||
/** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */
|
||||
public static final int MAX_PACKET_SIZE = 102400;
|
||||
|
||||
/** The list of members which are in the gossip group (not including myself). */
|
||||
private ArrayList<LocalGossipMember> _memberList;
|
||||
|
||||
/** The list of members which are known to be dead. */
|
||||
private ArrayList<LocalGossipMember> _deadList;
|
||||
|
||||
/** The member I am representing. */
|
||||
private LocalGossipMember _me;
|
||||
|
||||
/** The settings for gossiping. */
|
||||
private GossipSettings _settings;
|
||||
|
||||
/** A boolean whether the gossip service should keep running. */
|
||||
private AtomicBoolean _gossipServiceRunning;
|
||||
|
||||
/** A ExecutorService used for executing the active and passive gossip threads. */
|
||||
private ExecutorService _gossipThreadExecutor;
|
||||
|
||||
private Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
|
||||
private PassiveGossipThread passiveGossipThread;
|
||||
|
||||
private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
|
||||
private ActiveGossipThread activeGossipThread;
|
||||
|
||||
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
|
||||
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
|
||||
String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
|
||||
_passiveGossipThreadClass = passiveGossipThreadClass;
|
||||
_activeGossipThreadClass = activeGossipThreadClass;
|
||||
_settings = settings;
|
||||
_me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval());
|
||||
_memberList = new ArrayList<LocalGossipMember>();
|
||||
_deadList = new ArrayList<LocalGossipMember>();
|
||||
for (GossipMember startupMember : gossipMembers) {
|
||||
if (!startupMember.equals(_me)) {
|
||||
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
|
||||
startupMember.getPort(), startupMember.getId(), 0, this,
|
||||
settings.getCleanupInterval());
|
||||
_memberList.add(member);
|
||||
GossipService.LOGGER.debug(member);
|
||||
}
|
||||
}
|
||||
|
||||
_gossipServiceRunning = new AtomicBoolean(true);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
||||
public void run() {
|
||||
GossipService.LOGGER.info("Service has been shutdown...");
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* All timers associated with a member will trigger this method when it goes
|
||||
* off. The timer will go off if we have not heard from this member in
|
||||
* <code> _settings.T_CLEANUP </code> time.
|
||||
*/
|
||||
@Override
|
||||
public void handleNotification(Notification notification, Object handback) {
|
||||
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
|
||||
GossipService.LOGGER.info("Dead member detected: " + deadMember);
|
||||
synchronized (this._memberList) {
|
||||
this._memberList.remove(deadMember);
|
||||
}
|
||||
synchronized (this._deadList) {
|
||||
this._deadList.add(deadMember);
|
||||
}
|
||||
}
|
||||
|
||||
public GossipSettings getSettings() {
|
||||
return _settings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a clone of the memberlist.
|
||||
* @return
|
||||
*/
|
||||
public ArrayList<LocalGossipMember> getMemberList() {
|
||||
return _memberList;
|
||||
}
|
||||
|
||||
public LocalGossipMember getMyself() {
|
||||
return _me;
|
||||
}
|
||||
|
||||
public ArrayList<LocalGossipMember> getDeadList() {
|
||||
return _deadList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the client. Specifically, start the various cycles for this protocol.
|
||||
* Start the gossip thread and start the receiver thread.
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void run() {
|
||||
for (LocalGossipMember member : _memberList) {
|
||||
if (member != _me) {
|
||||
member.startTimeoutTimer();
|
||||
}
|
||||
}
|
||||
_gossipThreadExecutor = Executors.newCachedThreadPool();
|
||||
try {
|
||||
passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class).newInstance(this);
|
||||
_gossipThreadExecutor.execute(passiveGossipThread);
|
||||
activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class).newInstance(this);
|
||||
_gossipThreadExecutor.execute(activeGossipThread);
|
||||
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
|
||||
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
|
||||
throw new RuntimeException(e1);
|
||||
}
|
||||
GossipService.LOGGER.info("The GossipService is started.");
|
||||
while(_gossipServiceRunning.get()) {
|
||||
try {
|
||||
//TODO
|
||||
TimeUnit.MILLISECONDS.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
GossipService.LOGGER.info("The GossipClient was interrupted.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the gossip service.
|
||||
*/
|
||||
public void shutdown() {
|
||||
_gossipThreadExecutor.shutdown();
|
||||
passiveGossipThread.shutdown();
|
||||
activeGossipThread.shutdown();
|
||||
try {
|
||||
boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
System.err.println("Terminate retuned " + result);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
_gossipServiceRunning.set(false);
|
||||
}
|
||||
}
|
||||
package com.google.code.gossip.manager;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import javax.management.Notification;
|
||||
import javax.management.NotificationListener;
|
||||
|
||||
import com.google.code.gossip.GossipMember;
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.GossipSettings;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
|
||||
public abstract class GossipManager extends Thread implements NotificationListener {
|
||||
/** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */
|
||||
public static final int MAX_PACKET_SIZE = 102400;
|
||||
|
||||
/** The list of members which are in the gossip group (not including myself). */
|
||||
private ArrayList<LocalGossipMember> _memberList;
|
||||
|
||||
/** The list of members which are known to be dead. */
|
||||
private ArrayList<LocalGossipMember> _deadList;
|
||||
|
||||
/** The member I am representing. */
|
||||
private LocalGossipMember _me;
|
||||
|
||||
/** The settings for gossiping. */
|
||||
private GossipSettings _settings;
|
||||
|
||||
/** A boolean whether the gossip service should keep running. */
|
||||
private AtomicBoolean _gossipServiceRunning;
|
||||
|
||||
/** A ExecutorService used for executing the active and passive gossip threads. */
|
||||
private ExecutorService _gossipThreadExecutor;
|
||||
|
||||
private Class<? extends PassiveGossipThread> _passiveGossipThreadClass;
|
||||
private PassiveGossipThread passiveGossipThread;
|
||||
|
||||
private Class<? extends ActiveGossipThread> _activeGossipThreadClass;
|
||||
private ActiveGossipThread activeGossipThread;
|
||||
|
||||
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
|
||||
Class<? extends ActiveGossipThread> activeGossipThreadClass, String address, int port,
|
||||
String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
|
||||
_passiveGossipThreadClass = passiveGossipThreadClass;
|
||||
_activeGossipThreadClass = activeGossipThreadClass;
|
||||
_settings = settings;
|
||||
_me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval());
|
||||
_memberList = new ArrayList<LocalGossipMember>();
|
||||
_deadList = new ArrayList<LocalGossipMember>();
|
||||
for (GossipMember startupMember : gossipMembers) {
|
||||
if (!startupMember.equals(_me)) {
|
||||
LocalGossipMember member = new LocalGossipMember(startupMember.getHost(),
|
||||
startupMember.getPort(), startupMember.getId(), 0, this,
|
||||
settings.getCleanupInterval());
|
||||
_memberList.add(member);
|
||||
GossipService.LOGGER.debug(member);
|
||||
}
|
||||
}
|
||||
|
||||
_gossipServiceRunning = new AtomicBoolean(true);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
|
||||
public void run() {
|
||||
GossipService.LOGGER.info("Service has been shutdown...");
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* All timers associated with a member will trigger this method when it goes
|
||||
* off. The timer will go off if we have not heard from this member in
|
||||
* <code> _settings.T_CLEANUP </code> time.
|
||||
*/
|
||||
@Override
|
||||
public void handleNotification(Notification notification, Object handback) {
|
||||
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
|
||||
GossipService.LOGGER.info("Dead member detected: " + deadMember);
|
||||
synchronized (this._memberList) {
|
||||
this._memberList.remove(deadMember);
|
||||
}
|
||||
synchronized (this._deadList) {
|
||||
this._deadList.add(deadMember);
|
||||
}
|
||||
}
|
||||
|
||||
public GossipSettings getSettings() {
|
||||
return _settings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a clone of the memberlist.
|
||||
* @return
|
||||
*/
|
||||
public ArrayList<LocalGossipMember> getMemberList() {
|
||||
return _memberList;
|
||||
}
|
||||
|
||||
public LocalGossipMember getMyself() {
|
||||
return _me;
|
||||
}
|
||||
|
||||
public ArrayList<LocalGossipMember> getDeadList() {
|
||||
return _deadList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the client. Specifically, start the various cycles for this protocol.
|
||||
* Start the gossip thread and start the receiver thread.
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void run() {
|
||||
for (LocalGossipMember member : _memberList) {
|
||||
if (member != _me) {
|
||||
member.startTimeoutTimer();
|
||||
}
|
||||
}
|
||||
_gossipThreadExecutor = Executors.newCachedThreadPool();
|
||||
try {
|
||||
passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class).newInstance(this);
|
||||
_gossipThreadExecutor.execute(passiveGossipThread);
|
||||
activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class).newInstance(this);
|
||||
_gossipThreadExecutor.execute(activeGossipThread);
|
||||
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
|
||||
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
|
||||
throw new RuntimeException(e1);
|
||||
}
|
||||
GossipService.LOGGER.info("The GossipService is started.");
|
||||
while(_gossipServiceRunning.get()) {
|
||||
try {
|
||||
//TODO
|
||||
TimeUnit.MILLISECONDS.sleep(1);
|
||||
} catch (InterruptedException e) {
|
||||
GossipService.LOGGER.info("The GossipClient was interrupted.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the gossip service.
|
||||
*/
|
||||
public void shutdown() {
|
||||
_gossipThreadExecutor.shutdown();
|
||||
passiveGossipThread.shutdown();
|
||||
activeGossipThread.shutdown();
|
||||
try {
|
||||
boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
System.err.println("Terminate retuned " + result);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
_gossipServiceRunning.set(false);
|
||||
}
|
||||
}
|
||||
|
@ -1,142 +1,142 @@
|
||||
package com.google.code.gossip.manager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.json.JSONArray;
|
||||
import org.json.JSONException;
|
||||
import org.json.JSONObject;
|
||||
|
||||
import com.google.code.gossip.GossipMember;
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.RemoteGossipMember;
|
||||
|
||||
/**
|
||||
* [The passive thread: reply to incoming gossip request.]
|
||||
* This class handles the passive cycle, where this client
|
||||
* has received an incoming message. For now, this message
|
||||
* is always the membership list, but if you choose to gossip
|
||||
* additional information, you will need some logic to determine
|
||||
* the incoming message.
|
||||
*/
|
||||
abstract public class PassiveGossipThread implements Runnable {
|
||||
|
||||
/** The socket used for the passive thread of the gossip service. */
|
||||
private DatagramSocket _server;
|
||||
|
||||
private GossipManager _gossipManager;
|
||||
|
||||
private AtomicBoolean _keepRunning;
|
||||
|
||||
public PassiveGossipThread(GossipManager gossipManager) {
|
||||
_gossipManager = gossipManager;
|
||||
try {
|
||||
SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), _gossipManager.getMyself().getPort());
|
||||
_server = new DatagramSocket(socketAddress);
|
||||
GossipService.LOGGER.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort());
|
||||
GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
|
||||
} catch (SocketException ex) {
|
||||
GossipService.LOGGER.error(ex);
|
||||
_server = null;
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
_keepRunning = new AtomicBoolean(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (_keepRunning.get()) {
|
||||
try {
|
||||
byte[] buf = new byte[_server.getReceiveBufferSize()];
|
||||
DatagramPacket p = new DatagramPacket(buf, buf.length);
|
||||
_server.receive(p);
|
||||
GossipService.LOGGER.debug("A message has been received from " + p.getAddress() + ":"
|
||||
+ p.getPort() + ".");
|
||||
|
||||
int packet_length = 0;
|
||||
for (int i = 0; i < 4; i++) {
|
||||
int shift = (4 - 1 - i) * 8;
|
||||
packet_length += (buf[i] & 0x000000FF) << shift;
|
||||
}
|
||||
|
||||
// Check whether the package is smaller than the maximal packet length.
|
||||
// A package larger than this would not be possible to be send from a GossipService,
|
||||
// since this is check before sending the message.
|
||||
// This could normally only occur when the list of members is very big,
|
||||
// or when the packet is misformed, and the first 4 bytes is not the right in anymore.
|
||||
// For this reason we regards the message.
|
||||
if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
|
||||
byte[] json_bytes = new byte[packet_length];
|
||||
for (int i = 0; i < packet_length; i++) {
|
||||
json_bytes[i] = buf[i + 4];
|
||||
}
|
||||
String receivedMessage = new String(json_bytes);
|
||||
GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
|
||||
+ receivedMessage);
|
||||
try {
|
||||
ArrayList<GossipMember> remoteGossipMembers = new ArrayList<GossipMember>();
|
||||
RemoteGossipMember senderMember = null;
|
||||
GossipService.LOGGER.debug("Received member list:");
|
||||
JSONArray jsonArray = new JSONArray(receivedMessage);
|
||||
for (int i = 0; i < jsonArray.length(); i++) {
|
||||
JSONObject memberJSONObject = jsonArray.getJSONObject(i);
|
||||
if (memberJSONObject.length() == 4) {
|
||||
RemoteGossipMember member = new RemoteGossipMember(
|
||||
memberJSONObject.getString(GossipMember.JSON_HOST),
|
||||
memberJSONObject.getInt(GossipMember.JSON_PORT),
|
||||
memberJSONObject.getString(GossipMember.JSON_ID),
|
||||
memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT));
|
||||
GossipService.LOGGER.debug(member.toString());
|
||||
// This is the first member found, so this should be the member who is communicating
|
||||
// with me.
|
||||
if (i == 0) {
|
||||
senderMember = member;
|
||||
}
|
||||
remoteGossipMembers.add(member);
|
||||
} else {
|
||||
GossipService.LOGGER
|
||||
.error("The received member object does not contain 4 objects:\n"
|
||||
+ memberJSONObject.toString());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Merge our list with the one we just received
|
||||
mergeLists(_gossipManager, senderMember, remoteGossipMembers);
|
||||
} catch (JSONException e) {
|
||||
GossipService.LOGGER
|
||||
.error("The received message is not well-formed JSON. The following message has been dropped:\n"
|
||||
+ receivedMessage);
|
||||
}
|
||||
|
||||
} else {
|
||||
GossipService.LOGGER
|
||||
.error("The received message is not of the expected size, it has been dropped.");
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
_keepRunning.set(false);
|
||||
}
|
||||
}
|
||||
shutdown();
|
||||
}
|
||||
|
||||
public void shutdown(){
|
||||
_server.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract method for merging the local and remote list.
|
||||
* @param gossipManager The GossipManager for retrieving the local members and dead members list.
|
||||
* @param senderMember The member who is sending this list, this could be used to send a response if the remote list contains out-dated information.
|
||||
* @param remoteList The list of members known at the remote side.
|
||||
*/
|
||||
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList<GossipMember> remoteList);
|
||||
package com.google.code.gossip.manager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.json.JSONArray;
|
||||
import org.json.JSONException;
|
||||
import org.json.JSONObject;
|
||||
|
||||
import com.google.code.gossip.GossipMember;
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.RemoteGossipMember;
|
||||
|
||||
/**
|
||||
* [The passive thread: reply to incoming gossip request.]
|
||||
* This class handles the passive cycle, where this client
|
||||
* has received an incoming message. For now, this message
|
||||
* is always the membership list, but if you choose to gossip
|
||||
* additional information, you will need some logic to determine
|
||||
* the incoming message.
|
||||
*/
|
||||
abstract public class PassiveGossipThread implements Runnable {
|
||||
|
||||
/** The socket used for the passive thread of the gossip service. */
|
||||
private DatagramSocket _server;
|
||||
|
||||
private GossipManager _gossipManager;
|
||||
|
||||
private AtomicBoolean _keepRunning;
|
||||
|
||||
public PassiveGossipThread(GossipManager gossipManager) {
|
||||
_gossipManager = gossipManager;
|
||||
try {
|
||||
SocketAddress socketAddress = new InetSocketAddress(_gossipManager.getMyself().getHost(), _gossipManager.getMyself().getPort());
|
||||
_server = new DatagramSocket(socketAddress);
|
||||
GossipService.LOGGER.info("Gossip service successfully initialized on port " + _gossipManager.getMyself().getPort());
|
||||
GossipService.LOGGER.debug("I am " + _gossipManager.getMyself());
|
||||
} catch (SocketException ex) {
|
||||
GossipService.LOGGER.error(ex);
|
||||
_server = null;
|
||||
throw new RuntimeException(ex);
|
||||
}
|
||||
_keepRunning = new AtomicBoolean(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
while (_keepRunning.get()) {
|
||||
try {
|
||||
byte[] buf = new byte[_server.getReceiveBufferSize()];
|
||||
DatagramPacket p = new DatagramPacket(buf, buf.length);
|
||||
_server.receive(p);
|
||||
GossipService.LOGGER.debug("A message has been received from " + p.getAddress() + ":"
|
||||
+ p.getPort() + ".");
|
||||
|
||||
int packet_length = 0;
|
||||
for (int i = 0; i < 4; i++) {
|
||||
int shift = (4 - 1 - i) * 8;
|
||||
packet_length += (buf[i] & 0x000000FF) << shift;
|
||||
}
|
||||
|
||||
// Check whether the package is smaller than the maximal packet length.
|
||||
// A package larger than this would not be possible to be send from a GossipService,
|
||||
// since this is check before sending the message.
|
||||
// This could normally only occur when the list of members is very big,
|
||||
// or when the packet is misformed, and the first 4 bytes is not the right in anymore.
|
||||
// For this reason we regards the message.
|
||||
if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
|
||||
byte[] json_bytes = new byte[packet_length];
|
||||
for (int i = 0; i < packet_length; i++) {
|
||||
json_bytes[i] = buf[i + 4];
|
||||
}
|
||||
String receivedMessage = new String(json_bytes);
|
||||
GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
|
||||
+ receivedMessage);
|
||||
try {
|
||||
ArrayList<GossipMember> remoteGossipMembers = new ArrayList<GossipMember>();
|
||||
RemoteGossipMember senderMember = null;
|
||||
GossipService.LOGGER.debug("Received member list:");
|
||||
JSONArray jsonArray = new JSONArray(receivedMessage);
|
||||
for (int i = 0; i < jsonArray.length(); i++) {
|
||||
JSONObject memberJSONObject = jsonArray.getJSONObject(i);
|
||||
if (memberJSONObject.length() == 4) {
|
||||
RemoteGossipMember member = new RemoteGossipMember(
|
||||
memberJSONObject.getString(GossipMember.JSON_HOST),
|
||||
memberJSONObject.getInt(GossipMember.JSON_PORT),
|
||||
memberJSONObject.getString(GossipMember.JSON_ID),
|
||||
memberJSONObject.getInt(GossipMember.JSON_HEARTBEAT));
|
||||
GossipService.LOGGER.debug(member.toString());
|
||||
// This is the first member found, so this should be the member who is communicating
|
||||
// with me.
|
||||
if (i == 0) {
|
||||
senderMember = member;
|
||||
}
|
||||
remoteGossipMembers.add(member);
|
||||
} else {
|
||||
GossipService.LOGGER
|
||||
.error("The received member object does not contain 4 objects:\n"
|
||||
+ memberJSONObject.toString());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Merge our list with the one we just received
|
||||
mergeLists(_gossipManager, senderMember, remoteGossipMembers);
|
||||
} catch (JSONException e) {
|
||||
GossipService.LOGGER
|
||||
.error("The received message is not well-formed JSON. The following message has been dropped:\n"
|
||||
+ receivedMessage);
|
||||
}
|
||||
|
||||
} else {
|
||||
GossipService.LOGGER
|
||||
.error("The received message is not of the expected size, it has been dropped.");
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
_keepRunning.set(false);
|
||||
}
|
||||
}
|
||||
shutdown();
|
||||
}
|
||||
|
||||
public void shutdown(){
|
||||
_server.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract method for merging the local and remote list.
|
||||
* @param gossipManager The GossipManager for retrieving the local members and dead members list.
|
||||
* @param senderMember The member who is sending this list, this could be used to send a response if the remote list contains out-dated information.
|
||||
* @param remoteList The list of members known at the remote side.
|
||||
*/
|
||||
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList<GossipMember> remoteList);
|
||||
}
|
@ -1,93 +1,93 @@
|
||||
package com.google.code.gossip.manager.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import com.google.code.gossip.GossipMember;
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
import com.google.code.gossip.RemoteGossipMember;
|
||||
import com.google.code.gossip.manager.GossipManager;
|
||||
import com.google.code.gossip.manager.PassiveGossipThread;
|
||||
|
||||
public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
|
||||
|
||||
public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
|
||||
super(gossipManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge remote list (received from peer), and our local member list.
|
||||
* Simply, we must update the heartbeats that the remote list has with
|
||||
* our list. Also, some additional logic is needed to make sure we have
|
||||
* not timed out a member and then immediately received a list with that
|
||||
* member.
|
||||
* @param remoteList
|
||||
*/
|
||||
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList<GossipMember> remoteList) {
|
||||
|
||||
synchronized (gossipManager.getDeadList()) {
|
||||
|
||||
synchronized (gossipManager.getMemberList()) {
|
||||
|
||||
for (GossipMember remoteMember : remoteList) {
|
||||
// Skip myself. We don't want ourselves in the local member list.
|
||||
if (!remoteMember.equals(gossipManager.getMyself())) {
|
||||
if (gossipManager.getMemberList().contains(remoteMember)) {
|
||||
GossipService.LOGGER.debug("The local list already contains the remote member (" + remoteMember + ").");
|
||||
// The local memberlist contains the remote member.
|
||||
LocalGossipMember localMember = gossipManager.getMemberList().get(gossipManager.getMemberList().indexOf(remoteMember));
|
||||
|
||||
// Let's synchronize it's heartbeat.
|
||||
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
|
||||
// update local list with latest heartbeat
|
||||
localMember.setHeartbeat(remoteMember.getHeartbeat());
|
||||
// and reset the timeout of that member
|
||||
localMember.resetTimeoutTimer();
|
||||
}
|
||||
// TODO: Otherwise, should we inform the other when the heartbeat is already higher?
|
||||
} else {
|
||||
// The local list does not contain the remote member.
|
||||
GossipService.LOGGER.debug("The local list does not contain the remote member (" + remoteMember + ").");
|
||||
|
||||
// The remote member is either brand new, or a previously declared dead member.
|
||||
// If its dead, check the heartbeat because it may have come back from the dead.
|
||||
if (gossipManager.getDeadList().contains(remoteMember)) {
|
||||
// The remote member is known here as a dead member.
|
||||
GossipService.LOGGER.debug("The remote member is known here as a dead member.");
|
||||
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(gossipManager.getDeadList().indexOf(remoteMember));
|
||||
// If a member is restarted the heartbeat will restart from 1, so we should check that here.
|
||||
// So a member can become from the dead when it is either larger than a previous heartbeat (due to network failure)
|
||||
// or when the heartbeat is 1 (after a restart of the service).
|
||||
// TODO: What if the first message of a gossip service is sent to a dead node? The second member will receive a heartbeat of two.
|
||||
// TODO: The above does happen. Maybe a special message for a revived member?
|
||||
// TODO: Or maybe when a member is declared dead for more than _settings.getCleanupInterval() ms, reset the heartbeat to 0.
|
||||
// It will then accept a revived member.
|
||||
// The above is now handle by checking whether the heartbeat differs _settings.getCleanupInterval(), it must be restarted.
|
||||
if (remoteMember.getHeartbeat() == 1
|
||||
|| ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager.getSettings().getCleanupInterval() / 1000)
|
||||
|| remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
|
||||
GossipService.LOGGER.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
|
||||
// The remote member is back from the dead.
|
||||
// Remove it from the dead list.
|
||||
gossipManager.getDeadList().remove(localDeadMember);
|
||||
// Add it as a new member and add it to the member list.
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
|
||||
gossipManager.getMemberList().add(newLocalMember);
|
||||
newLocalMember.startTimeoutTimer();
|
||||
GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list.");
|
||||
}
|
||||
} else {
|
||||
// Brand spanking new member - welcome.
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
|
||||
gossipManager.getMemberList().add(newLocalMember);
|
||||
newLocalMember.startTimeoutTimer();
|
||||
GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
package com.google.code.gossip.manager.impl;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import com.google.code.gossip.GossipMember;
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
import com.google.code.gossip.RemoteGossipMember;
|
||||
import com.google.code.gossip.manager.GossipManager;
|
||||
import com.google.code.gossip.manager.PassiveGossipThread;
|
||||
|
||||
public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
|
||||
|
||||
public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
|
||||
super(gossipManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge remote list (received from peer), and our local member list.
|
||||
* Simply, we must update the heartbeats that the remote list has with
|
||||
* our list. Also, some additional logic is needed to make sure we have
|
||||
* not timed out a member and then immediately received a list with that
|
||||
* member.
|
||||
* @param remoteList
|
||||
*/
|
||||
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, ArrayList<GossipMember> remoteList) {
|
||||
|
||||
synchronized (gossipManager.getDeadList()) {
|
||||
|
||||
synchronized (gossipManager.getMemberList()) {
|
||||
|
||||
for (GossipMember remoteMember : remoteList) {
|
||||
// Skip myself. We don't want ourselves in the local member list.
|
||||
if (!remoteMember.equals(gossipManager.getMyself())) {
|
||||
if (gossipManager.getMemberList().contains(remoteMember)) {
|
||||
GossipService.LOGGER.debug("The local list already contains the remote member (" + remoteMember + ").");
|
||||
// The local memberlist contains the remote member.
|
||||
LocalGossipMember localMember = gossipManager.getMemberList().get(gossipManager.getMemberList().indexOf(remoteMember));
|
||||
|
||||
// Let's synchronize it's heartbeat.
|
||||
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
|
||||
// update local list with latest heartbeat
|
||||
localMember.setHeartbeat(remoteMember.getHeartbeat());
|
||||
// and reset the timeout of that member
|
||||
localMember.resetTimeoutTimer();
|
||||
}
|
||||
// TODO: Otherwise, should we inform the other when the heartbeat is already higher?
|
||||
} else {
|
||||
// The local list does not contain the remote member.
|
||||
GossipService.LOGGER.debug("The local list does not contain the remote member (" + remoteMember + ").");
|
||||
|
||||
// The remote member is either brand new, or a previously declared dead member.
|
||||
// If its dead, check the heartbeat because it may have come back from the dead.
|
||||
if (gossipManager.getDeadList().contains(remoteMember)) {
|
||||
// The remote member is known here as a dead member.
|
||||
GossipService.LOGGER.debug("The remote member is known here as a dead member.");
|
||||
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(gossipManager.getDeadList().indexOf(remoteMember));
|
||||
// If a member is restarted the heartbeat will restart from 1, so we should check that here.
|
||||
// So a member can become from the dead when it is either larger than a previous heartbeat (due to network failure)
|
||||
// or when the heartbeat is 1 (after a restart of the service).
|
||||
// TODO: What if the first message of a gossip service is sent to a dead node? The second member will receive a heartbeat of two.
|
||||
// TODO: The above does happen. Maybe a special message for a revived member?
|
||||
// TODO: Or maybe when a member is declared dead for more than _settings.getCleanupInterval() ms, reset the heartbeat to 0.
|
||||
// It will then accept a revived member.
|
||||
// The above is now handle by checking whether the heartbeat differs _settings.getCleanupInterval(), it must be restarted.
|
||||
if (remoteMember.getHeartbeat() == 1
|
||||
|| ((localDeadMember.getHeartbeat() - remoteMember.getHeartbeat()) * -1) > (gossipManager.getSettings().getCleanupInterval() / 1000)
|
||||
|| remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
|
||||
GossipService.LOGGER.debug("The remote member is back from the dead. We will remove it from the dead list and add it as a new member.");
|
||||
// The remote member is back from the dead.
|
||||
// Remove it from the dead list.
|
||||
gossipManager.getDeadList().remove(localDeadMember);
|
||||
// Add it as a new member and add it to the member list.
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
|
||||
gossipManager.getMemberList().add(newLocalMember);
|
||||
newLocalMember.startTimeoutTimer();
|
||||
GossipService.LOGGER.info("Removed remote member " + remoteMember.getAddress() + " from dead list and added to local member list.");
|
||||
}
|
||||
} else {
|
||||
// Brand spanking new member - welcome.
|
||||
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(), remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings().getCleanupInterval());
|
||||
gossipManager.getMemberList().add(newLocalMember);
|
||||
newLocalMember.startTimeoutTimer();
|
||||
GossipService.LOGGER.info("Added new remote member " + remoteMember.getAddress() + " to local member list.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,94 +1,94 @@
|
||||
package com.google.code.gossip.manager.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.json.JSONArray;
|
||||
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
import com.google.code.gossip.manager.ActiveGossipThread;
|
||||
import com.google.code.gossip.manager.GossipManager;
|
||||
|
||||
abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
||||
|
||||
public SendMembersActiveGossipThread(GossipManager gossipManager) {
|
||||
super(gossipManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs the sending of the membership list, after we have
|
||||
* incremented our own heartbeat.
|
||||
*/
|
||||
protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) {
|
||||
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
|
||||
|
||||
// Increase the heartbeat of myself by 1.
|
||||
me.setHeartbeat(me.getHeartbeat() + 1);
|
||||
|
||||
synchronized (memberList) {
|
||||
try {
|
||||
LocalGossipMember member = selectPartner(memberList);
|
||||
|
||||
if (member != null) {
|
||||
InetAddress dest = InetAddress.getByName(member.getHost());
|
||||
|
||||
// Create a StringBuffer for the JSON message.
|
||||
JSONArray jsonArray = new JSONArray();
|
||||
GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort());
|
||||
GossipService.LOGGER.debug("---------------------");
|
||||
|
||||
// First write myself, append the JSON representation of the member to the buffer.
|
||||
jsonArray.put(me.toJSONObject());
|
||||
GossipService.LOGGER.debug(me);
|
||||
|
||||
// Then write the others.
|
||||
for (int i=0; i<memberList.size(); i++) {
|
||||
LocalGossipMember other = memberList.get(i);
|
||||
// Append the JSON representation of the member to the buffer.
|
||||
jsonArray.put(other.toJSONObject());
|
||||
GossipService.LOGGER.debug(other);
|
||||
}
|
||||
GossipService.LOGGER.debug("---------------------");
|
||||
|
||||
// Write the objects to a byte array.
|
||||
byte[] json_bytes = jsonArray.toString().getBytes();
|
||||
|
||||
int packet_length = json_bytes.length;
|
||||
|
||||
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
||||
|
||||
// Convert the packet length to the byte representation of the int.
|
||||
byte[] length_bytes = new byte[4];
|
||||
length_bytes[0] =(byte)( packet_length >> 24 );
|
||||
length_bytes[1] =(byte)( (packet_length << 8) >> 24 );
|
||||
length_bytes[2] =(byte)( (packet_length << 16) >> 24 );
|
||||
length_bytes[3] =(byte)( (packet_length << 24) >> 24 );
|
||||
|
||||
|
||||
GossipService.LOGGER.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString());
|
||||
|
||||
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length);
|
||||
byteBuffer.put(length_bytes);
|
||||
byteBuffer.put(json_bytes);
|
||||
byte[] buf = byteBuffer.array();
|
||||
|
||||
DatagramSocket socket = new DatagramSocket();
|
||||
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
|
||||
socket.send(datagramPacket);
|
||||
socket.close();
|
||||
} else {
|
||||
GossipService.LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
|
||||
}
|
||||
}
|
||||
|
||||
} catch (IOException e1) {
|
||||
e1.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
package com.google.code.gossip.manager.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.net.InetAddress;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import org.json.JSONArray;
|
||||
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
import com.google.code.gossip.manager.ActiveGossipThread;
|
||||
import com.google.code.gossip.manager.GossipManager;
|
||||
|
||||
abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
||||
|
||||
public SendMembersActiveGossipThread(GossipManager gossipManager) {
|
||||
super(gossipManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs the sending of the membership list, after we have
|
||||
* incremented our own heartbeat.
|
||||
*/
|
||||
protected void sendMembershipList(LocalGossipMember me, ArrayList<LocalGossipMember> memberList) {
|
||||
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
|
||||
|
||||
// Increase the heartbeat of myself by 1.
|
||||
me.setHeartbeat(me.getHeartbeat() + 1);
|
||||
|
||||
synchronized (memberList) {
|
||||
try {
|
||||
LocalGossipMember member = selectPartner(memberList);
|
||||
|
||||
if (member != null) {
|
||||
InetAddress dest = InetAddress.getByName(member.getHost());
|
||||
|
||||
// Create a StringBuffer for the JSON message.
|
||||
JSONArray jsonArray = new JSONArray();
|
||||
GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort());
|
||||
GossipService.LOGGER.debug("---------------------");
|
||||
|
||||
// First write myself, append the JSON representation of the member to the buffer.
|
||||
jsonArray.put(me.toJSONObject());
|
||||
GossipService.LOGGER.debug(me);
|
||||
|
||||
// Then write the others.
|
||||
for (int i=0; i<memberList.size(); i++) {
|
||||
LocalGossipMember other = memberList.get(i);
|
||||
// Append the JSON representation of the member to the buffer.
|
||||
jsonArray.put(other.toJSONObject());
|
||||
GossipService.LOGGER.debug(other);
|
||||
}
|
||||
GossipService.LOGGER.debug("---------------------");
|
||||
|
||||
// Write the objects to a byte array.
|
||||
byte[] json_bytes = jsonArray.toString().getBytes();
|
||||
|
||||
int packet_length = json_bytes.length;
|
||||
|
||||
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
||||
|
||||
// Convert the packet length to the byte representation of the int.
|
||||
byte[] length_bytes = new byte[4];
|
||||
length_bytes[0] =(byte)( packet_length >> 24 );
|
||||
length_bytes[1] =(byte)( (packet_length << 8) >> 24 );
|
||||
length_bytes[2] =(byte)( (packet_length << 16) >> 24 );
|
||||
length_bytes[3] =(byte)( (packet_length << 24) >> 24 );
|
||||
|
||||
|
||||
GossipService.LOGGER.debug("Sending message ("+packet_length+" bytes): " + jsonArray.toString());
|
||||
|
||||
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length);
|
||||
byteBuffer.put(length_bytes);
|
||||
byteBuffer.put(json_bytes);
|
||||
byte[] buf = byteBuffer.array();
|
||||
|
||||
DatagramSocket socket = new DatagramSocket();
|
||||
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
|
||||
socket.send(datagramPacket);
|
||||
socket.close();
|
||||
} else {
|
||||
GossipService.LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
|
||||
}
|
||||
}
|
||||
|
||||
} catch (IOException e1) {
|
||||
e1.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,38 +1,38 @@
|
||||
package com.google.code.gossip.manager.random;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Random;
|
||||
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
import com.google.code.gossip.manager.GossipManager;
|
||||
import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread;
|
||||
|
||||
public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
|
||||
|
||||
/** The Random used for choosing a member to gossip with. */
|
||||
private Random _random;
|
||||
|
||||
public RandomActiveGossipThread(GossipManager gossipManager) {
|
||||
super(gossipManager);
|
||||
_random = new Random();
|
||||
}
|
||||
|
||||
/**
|
||||
* [The selectToSend() function.]
|
||||
* Find a random peer from the local membership list.
|
||||
* In the case where this client is the only member in the list, this method will return null.
|
||||
* @return Member random member if list is greater than 1, null otherwise
|
||||
*/
|
||||
protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) {
|
||||
LocalGossipMember member = null;
|
||||
if (memberList.size() > 0) {
|
||||
int randomNeighborIndex = _random.nextInt(memberList.size());
|
||||
member = memberList.get(randomNeighborIndex);
|
||||
} else {
|
||||
GossipService.LOGGER.debug("I am alone in this world.");
|
||||
}
|
||||
return member;
|
||||
}
|
||||
|
||||
}
|
||||
package com.google.code.gossip.manager.random;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Random;
|
||||
|
||||
import com.google.code.gossip.GossipService;
|
||||
import com.google.code.gossip.LocalGossipMember;
|
||||
import com.google.code.gossip.manager.GossipManager;
|
||||
import com.google.code.gossip.manager.impl.SendMembersActiveGossipThread;
|
||||
|
||||
public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
|
||||
|
||||
/** The Random used for choosing a member to gossip with. */
|
||||
private Random _random;
|
||||
|
||||
public RandomActiveGossipThread(GossipManager gossipManager) {
|
||||
super(gossipManager);
|
||||
_random = new Random();
|
||||
}
|
||||
|
||||
/**
|
||||
* [The selectToSend() function.]
|
||||
* Find a random peer from the local membership list.
|
||||
* In the case where this client is the only member in the list, this method will return null.
|
||||
* @return Member random member if list is greater than 1, null otherwise
|
||||
*/
|
||||
protected LocalGossipMember selectPartner(ArrayList<LocalGossipMember> memberList) {
|
||||
LocalGossipMember member = null;
|
||||
if (memberList.size() > 0) {
|
||||
int randomNeighborIndex = _random.nextInt(memberList.size());
|
||||
member = memberList.get(randomNeighborIndex);
|
||||
} else {
|
||||
GossipService.LOGGER.debug("I am alone in this world.");
|
||||
}
|
||||
return member;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,14 +1,14 @@
|
||||
package com.google.code.gossip.manager.random;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import com.google.code.gossip.GossipMember;
|
||||
import com.google.code.gossip.GossipSettings;
|
||||
import com.google.code.gossip.manager.GossipManager;
|
||||
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
|
||||
|
||||
public class RandomGossipManager extends GossipManager {
|
||||
public RandomGossipManager(String address, int port, String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
|
||||
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, port, id, settings, gossipMembers);
|
||||
}
|
||||
}
|
||||
package com.google.code.gossip.manager.random;
|
||||
|
||||
import java.util.ArrayList;
|
||||
|
||||
import com.google.code.gossip.GossipMember;
|
||||
import com.google.code.gossip.GossipSettings;
|
||||
import com.google.code.gossip.manager.GossipManager;
|
||||
import com.google.code.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
|
||||
|
||||
public class RandomGossipManager extends GossipManager {
|
||||
public RandomGossipManager(String address, int port, String id, GossipSettings settings, ArrayList<GossipMember> gossipMembers) {
|
||||
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, address, port, id, settings, gossipMembers);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user