Move to URI in model and configuration

This commit is contained in:
Edward Capriolo
2016-06-02 09:23:55 -04:00
parent 3ca8e0f9c8
commit 5532585e67
16 changed files with 153 additions and 162 deletions

View File

@ -18,6 +18,8 @@
package org.apache.gossip.manager;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -68,18 +70,18 @@ public abstract class GossipManager extends Thread implements NotificationListen
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
String address, int port, String id, GossipSettings settings,
URI uri, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
this.passiveGossipThreadClass = passiveGossipThreadClass;
this.activeGossipThreadClass = activeGossipThreadClass;
this.settings = settings;
me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this,
me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
startupMember.getHost(), startupMember.getPort(), startupMember.getId(),
startupMember.getUri(), startupMember.getId(),
System.currentTimeMillis(), this, settings.getCleanupInterval());
members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member);
@ -180,6 +182,9 @@ public abstract class GossipManager extends Thread implements NotificationListen
gossipThreadExecutor.execute(activeGossipThread);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
if (e1 instanceof BindException){
LOGGER.fatal("could not bind to "+ me.getUri() + " " + me.getAddress());
}
throw new RuntimeException(e1);
}
GossipService.LOGGER.debug("The GossipService is started.");

View File

@ -23,6 +23,8 @@ import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@ -58,18 +60,18 @@ abstract public class PassiveGossipThread implements Runnable {
public PassiveGossipThread(GossipManager gossipManager) {
this.gossipManager = gossipManager;
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(),
gossipManager.getMyself().getPort());
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
server = new DatagramSocket(socketAddress);
GossipService.LOGGER.debug("Gossip service successfully initialized on port "
+ gossipManager.getMyself().getPort());
GossipService.LOGGER.debug("I am " + gossipManager.getMyself());
LOGGER.debug("Gossip service successfully initialized on port "
+ gossipManager.getMyself().getUri().getPort());
LOGGER.debug("I am " + gossipManager.getMyself());
cluster = gossipManager.getMyself().getClusterName();
if (cluster == null){
throw new IllegalArgumentException("cluster was null");
}
} catch (SocketException ex) {
GossipService.LOGGER.warn(ex);
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
keepRunning = new AtomicBoolean(true);
@ -103,14 +105,20 @@ abstract public class PassiveGossipThread implements Runnable {
ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
ActiveGossipMessage.class);
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
URI u = null;
try {
u = new URI(activeGossipMessage.getMembers().get(i).getUri());
} catch (URISyntaxException e) {
LOGGER.debug("Gossip message with faulty URI", e);
continue;
}
RemoteGossipMember member = new RemoteGossipMember(
activeGossipMessage.getMembers().get(i).getCluster(),
activeGossipMessage.getMembers().get(i).getHost(),
activeGossipMessage.getMembers().get(i).getPort(),
u,
activeGossipMessage.getMembers().get(i).getId(),
activeGossipMessage.getMembers().get(i).getHeartbeat());
if (!(member.getClusterName().equals(cluster))){
GossipService.LOGGER.warn("Note a member of this cluster " + i);
LOGGER.warn("Note a member of this cluster " + i);
continue;
}
// This is the first member found, so this should be the member who is communicating
@ -122,16 +130,15 @@ abstract public class PassiveGossipThread implements Runnable {
}
mergeLists(gossipManager, senderMember, remoteGossipMembers);
} catch (RuntimeException ex) {
GossipService.LOGGER.error("Unable to process message", ex);
LOGGER.error("Unable to process message", ex);
}
} else {
GossipService.LOGGER
LOGGER
.error("The received message is not of the expected size, it has been dropped.");
}
} catch (IOException e) {
GossipService.LOGGER.error(e);
System.out.println(e);
LOGGER.error(e);
keepRunning.set(false);
}
}

View File

@ -25,8 +25,11 @@ import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
import org.apache.log4j.Logger;
public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class);
public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
super(gossipManager);
@ -47,9 +50,9 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
// if the person sending to us is in the dead list consider them up
for (LocalGossipMember i : gossipManager.getDeadList()) {
if (i.getId().equals(senderMember.getId())) {
System.out.println(gossipManager.getMyself() + " caught a live one!");
LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
senderMember.getHost(), senderMember.getPort(), senderMember.getId(),
senderMember.getUri(), senderMember.getId(),
senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
@ -70,7 +73,7 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
} else if (!gossipManager.getMemberList().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember)) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.createOrRevivieMember(newLocalMember);
@ -81,26 +84,26 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread
gossipManager.getDeadList().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
remoteMember.getUri(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list.");
} else {
GossipService.LOGGER.debug("me " + gossipManager.getMyself());
GossipService.LOGGER.debug("sender " + senderMember);
GossipService.LOGGER.debug("remote " + remoteList);
GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
LOGGER.debug("me " + gossipManager.getMyself());
LOGGER.debug("sender " + senderMember);
LOGGER.debug("remote " + remoteList);
LOGGER.debug("live " + gossipManager.getMemberList());
LOGGER.debug("dead " + gossipManager.getDeadList());
}
} else {
GossipService.LOGGER.debug("me " + gossipManager.getMyself());
GossipService.LOGGER.debug("sender " + senderMember);
GossipService.LOGGER.debug("remote " + remoteList);
GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
LOGGER.debug("me " + gossipManager.getMyself());
LOGGER.debug("sender " + senderMember);
LOGGER.debug("remote " + remoteList);
LOGGER.debug("live " + gossipManager.getMemberList());
LOGGER.debug("dead " + gossipManager.getDeadList());
// throw new IllegalArgumentException("wtf");
}
}

View File

@ -44,9 +44,8 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
GossipMember gm = new GossipMember();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
gm.setHost(member.getHost());
gm.setUri(member.getUri().toASCIIString());
gm.setId(member.getId());
gm.setPort(member.getPort());
return gm;
}
@ -62,7 +61,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
InetAddress dest = InetAddress.getByName(member.getHost());
InetAddress dest = InetAddress.getByName(member.getUri().getHost());
ActiveGossipMessage message = new ActiveGossipMessage();
message.getMembers().add(convert(me));
for (LocalGossipMember other : memberList) {
@ -72,7 +71,7 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
byte[] buf = createBuffer(packet_length, json_bytes);
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort());
socket.send(datagramPacket);
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("

View File

@ -23,12 +23,13 @@ import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import java.net.URI;
import java.util.List;
public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String cluster, String address, int port, String id,
public RandomGossipManager(String cluster, URI uri, String id,
GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
address, port, id, settings, gossipMembers, listener);
uri, id, settings, gossipMembers, listener);
}
}