diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java new file mode 100644 index 0000000..ccfa951 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -0,0 +1,266 @@ +package org.apache.gossip.manager; + +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.gossip.GossipMember; +import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.Response; +import org.apache.gossip.udp.Trackable; +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.gossip.udp.UdpActiveGossipOk; +import org.apache.gossip.udp.UdpNotAMemberFault; +import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; + +public class GossipCore { + + public static final Logger LOGGER = Logger.getLogger(GossipCore.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private final GossipManager gossipManager; + + private ConcurrentHashMap requests; + + private ExecutorService service; + + public GossipCore(GossipManager manager){ + this.gossipManager = manager; + requests = new ConcurrentHashMap<>(); + service = Executors.newFixedThreadPool(500); + } + + public void shutdown(){ + service.shutdown(); + try { + service.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.warn(e); + } + } + + public void recieve(Base base){ + if (base instanceof Response){ + if (base instanceof Trackable){ + Trackable t = (Trackable) base; + requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t); + } + } + if (base instanceof ActiveGossipMessage){ + List remoteGossipMembers = new ArrayList<>(); + RemoteGossipMember senderMember = null; + UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; + for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { + URI u = null; + try { + u = new URI(activeGossipMessage.getMembers().get(i).getUri()); + } catch (URISyntaxException e) { + LOGGER.debug("Gossip message with faulty URI", e); + continue; + } + RemoteGossipMember member = new RemoteGossipMember( + activeGossipMessage.getMembers().get(i).getCluster(), + u, + activeGossipMessage.getMembers().get(i).getId(), + activeGossipMessage.getMembers().get(i).getHeartbeat()); + if (i == 0) { + senderMember = member; + } + if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))){ + UdpNotAMemberFault f = new UdpNotAMemberFault(); + f.setException("Not a member of this cluster " + i); + f.setUriFrom(activeGossipMessage.getUriFrom()); + f.setUuid(activeGossipMessage.getUuid()); + LOGGER.warn(f); + sendOneWay(f, member.getUri()); + continue; + } + remoteGossipMembers.add(member); + } + UdpActiveGossipOk o = new UdpActiveGossipOk(); + o.setUriFrom(activeGossipMessage.getUriFrom()); + o.setUuid(activeGossipMessage.getUuid()); + sendOneWay(o, senderMember.getUri()); + mergeLists(gossipManager, senderMember, remoteGossipMembers); + } + } + + private void sendInternal(Base message, URI uri){ + byte[] json_bytes; + try { + json_bytes = MAPPER.writeValueAsString(message).getBytes(); + } catch (IOException e) { + throw new RuntimeException(e); + } + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes); + try (DatagramSocket socket = new DatagramSocket()) { + InetAddress dest = InetAddress.getByName(uri.getHost()); + DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort()); + socket.send(datagramPacket); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public Response send(Base message, URI uri){ + final Trackable t; + if (message instanceof Trackable){ + t = (Trackable) message; + } else { + t = null; + } + sendInternal(message, uri); + if (t == null){ + return null; + } + Future response = service.submit( new Callable(){ + @Override + public Response call() throws Exception { + while(true){ + Base b = requests.remove(t.getUuid() + "/" + t.getUriFrom()); + if (b != null){ + return (Response) b; + } + try { + Thread.sleep(0, 1000); + } catch (InterruptedException e) { + + } + } + } + }); + + try { + return response.get(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + System.err.println(e); + return null; + } catch (TimeoutException e) { + System.err.println(e); + return null; + } finally { + if (t != null){ + requests.remove(t.getUuid() + "/" + t.getUriFrom()); + } + } + + } + + public void sendOneWay(Base message, URI u){ + byte[] json_bytes; + try { + json_bytes = MAPPER.writeValueAsString(message).getBytes(); + } catch (IOException e) { + throw new RuntimeException(e); + } + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes); + try (DatagramSocket socket = new DatagramSocket()) { + InetAddress dest = InetAddress.getByName(u.getHost()); + DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort()); + socket.send(datagramPacket); + } catch (IOException ex) { } + } + } + + + /** + * Merge remote list (received from peer), and our local member list. Simply, we must update the + * heartbeats that the remote list has with our list. Also, some additional logic is needed to + * make sure we have not timed out a member and then immediately received a list with that member. + * + * @param gossipManager + * @param senderMember + * @param remoteList + * + * COPIED FROM PASSIVE GOSSIP THREAD + */ + protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, + List remoteList) { + + // if the person sending to us is in the dead list consider them up + for (LocalGossipMember i : gossipManager.getDeadList()) { + if (i.getId().equals(senderMember.getId())) { + LOGGER.info(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri()); + LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(), + senderMember.getUri(), senderMember.getId(), + senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings() + .getCleanupInterval()); + gossipManager.revivieMember(newLocalMember); + newLocalMember.startTimeoutTimer(); + } + } + for (GossipMember remoteMember : remoteList) { + if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { + continue; + } + if (gossipManager.getMemberList().contains(remoteMember)) { + LocalGossipMember localMember = gossipManager.getMemberList().get( + gossipManager.getMemberList().indexOf(remoteMember)); + if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) { + localMember.setHeartbeat(remoteMember.getHeartbeat()); + localMember.resetTimeoutTimer(); + } + } else if (!gossipManager.getMemberList().contains(remoteMember) + && !gossipManager.getDeadList().contains(remoteMember)) { + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), + remoteMember.getUri(), remoteMember.getId(), + remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() + .getCleanupInterval()); + gossipManager.createOrRevivieMember(newLocalMember); + newLocalMember.startTimeoutTimer(); + } else { + if (gossipManager.getDeadList().contains(remoteMember)) { + LocalGossipMember localDeadMember = gossipManager.getDeadList().get( + gossipManager.getDeadList().indexOf(remoteMember)); + if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) { + LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(), + remoteMember.getUri(), remoteMember.getId(), + remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings() + .getCleanupInterval()); + gossipManager.revivieMember(newLocalMember); + newLocalMember.startTimeoutTimer(); + LOGGER.debug("Removed remote member " + remoteMember.getAddress() + + " from dead list and added to local member list."); + } else { + LOGGER.debug("me " + gossipManager.getMyself()); + LOGGER.debug("sender " + senderMember); + LOGGER.debug("remote " + remoteList); + LOGGER.debug("live " + gossipManager.getMemberList()); + LOGGER.debug("dead " + gossipManager.getDeadList()); + } + } else { + LOGGER.debug("me " + gossipManager.getMyself()); + LOGGER.debug("sender " + senderMember); + LOGGER.debug("remote " + remoteList); + LOGGER.debug("live " + gossipManager.getMemberList()); + LOGGER.debug("dead " + gossipManager.getDeadList()); + // throw new IllegalArgumentException("wtf"); + } + } + } + } + + +} diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 363a4a9..7a10c91 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -17,8 +17,6 @@ */ package org.apache.gossip.manager; -import java.lang.reflect.InvocationTargetException; -import java.net.BindException; import java.net.URI; import java.util.ArrayList; import java.util.Collections; @@ -41,6 +39,8 @@ import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; +import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; +import org.apache.gossip.manager.random.RandomActiveGossipThread; public abstract class GossipManager extends Thread implements NotificationListener { @@ -56,10 +56,6 @@ public abstract class GossipManager extends Thread implements NotificationListen private final AtomicBoolean gossipServiceRunning; - private final Class passiveGossipThreadClass; - - private final Class activeGossipThreadClass; - private final GossipListener listener; private ActiveGossipThread activeGossipThread; @@ -67,14 +63,15 @@ public abstract class GossipManager extends Thread implements NotificationListen private PassiveGossipThread passiveGossipThread; private ExecutorService gossipThreadExecutor; + + private GossipCore gossipCore; - public GossipManager(Class passiveGossipThreadClass, - Class activeGossipThreadClass, String cluster, + public GossipManager(String cluster, URI uri, String id, GossipSettings settings, List gossipMembers, GossipListener listener) { - this.passiveGossipThreadClass = passiveGossipThreadClass; - this.activeGossipThreadClass = activeGossipThreadClass; + this.settings = settings; + this.gossipCore = new GossipCore(this); me = new LocalGossipMember(cluster, uri, id, System.currentTimeMillis(), this, settings.getCleanupInterval()); members = new ConcurrentSkipListMap<>(); @@ -173,20 +170,10 @@ public abstract class GossipManager extends Thread implements NotificationListen member.startTimeoutTimer(); } } - try { - passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class) - .newInstance(this); - gossipThreadExecutor.execute(passiveGossipThread); - activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class) - .newInstance(this); - gossipThreadExecutor.execute(activeGossipThread); - } catch (InstantiationException | IllegalAccessException | IllegalArgumentException - | InvocationTargetException | NoSuchMethodException | SecurityException e1) { - if (e1 instanceof BindException){ - LOGGER.fatal("could not bind to "+ me.getUri() + " " + me.getAddress()); - } - throw new RuntimeException(e1); - } + passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); + gossipThreadExecutor.execute(passiveGossipThread); + activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore); + gossipThreadExecutor.execute(activeGossipThread); GossipService.LOGGER.debug("The GossipService is started."); while (gossipServiceRunning.get()) { try { @@ -204,6 +191,7 @@ public abstract class GossipManager extends Thread implements NotificationListen public void shutdown() { gossipServiceRunning.set(false); gossipThreadExecutor.shutdown(); + gossipCore.shutdown(); if (passiveGossipThread != null) { passiveGossipThread.shutdown(); } diff --git a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index 0b12ee4..6d440de 100644 --- a/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -23,15 +23,12 @@ import java.net.DatagramSocket; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.gossip.GossipMember; import org.apache.gossip.GossipService; -import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.gossip.model.Base; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; import org.apache.gossip.RemoteGossipMember; @@ -49,16 +46,16 @@ abstract public class PassiveGossipThread implements Runnable { /** The socket used for the passive thread of the gossip service. */ private final DatagramSocket server; - private final GossipManager gossipManager; - private final AtomicBoolean keepRunning; private final String cluster; private final ObjectMapper MAPPER = new ObjectMapper(); + + private final GossipCore gossipCore; - public PassiveGossipThread(GossipManager gossipManager) { - this.gossipManager = gossipManager; + public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { + this.gossipCore = gossipCore; try { SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), gossipManager.getMyself().getUri().getPort()); @@ -84,57 +81,21 @@ abstract public class PassiveGossipThread implements Runnable { byte[] buf = new byte[server.getReceiveBufferSize()]; DatagramPacket p = new DatagramPacket(buf, buf.length); server.receive(p); - int packet_length = 0; - for (int i = 0; i < 4; i++) { - int shift = (4 - 1 - i) * 8; - packet_length += (buf[i] & 0x000000FF) << shift; - } + int packet_length = UdpUtil.readPacketLengthFromBuffer(buf); if (packet_length <= GossipManager.MAX_PACKET_SIZE) { byte[] json_bytes = new byte[packet_length]; for (int i = 0; i < packet_length; i++) { json_bytes[i] = buf[i + 4]; } - if (GossipService.LOGGER.isDebugEnabled()){ - String receivedMessage = new String(json_bytes); - GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): " - + receivedMessage); - } + debug(packet_length, json_bytes); try { - List remoteGossipMembers = new ArrayList<>(); - RemoteGossipMember senderMember = null; - ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes, - ActiveGossipMessage.class); - for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { - URI u = null; - try { - u = new URI(activeGossipMessage.getMembers().get(i).getUri()); - } catch (URISyntaxException e) { - LOGGER.debug("Gossip message with faulty URI", e); - continue; - } - RemoteGossipMember member = new RemoteGossipMember( - activeGossipMessage.getMembers().get(i).getCluster(), - u, - activeGossipMessage.getMembers().get(i).getId(), - activeGossipMessage.getMembers().get(i).getHeartbeat()); - if (!(member.getClusterName().equals(cluster))){ - LOGGER.warn("Note a member of this cluster " + i); - continue; - } - // This is the first member found, so this should be the member who is communicating - // with me. - if (i == 0) { - senderMember = member; - } - remoteGossipMembers.add(member); - } - mergeLists(gossipManager, senderMember, remoteGossipMembers); - } catch (RuntimeException ex) { + Base activeGossipMessage = MAPPER.readValue(json_bytes, Base.class); + gossipCore.recieve(activeGossipMessage); + } catch (RuntimeException ex) {//TODO trap json exception LOGGER.error("Unable to process message", ex); } } else { - LOGGER - .error("The received message is not of the expected size, it has been dropped."); + LOGGER.error("The received message is not of the expected size, it has been dropped."); } } catch (IOException e) { @@ -145,6 +106,14 @@ abstract public class PassiveGossipThread implements Runnable { shutdown(); } + private void debug(int packetLength, byte[] jsonBytes) { + if (GossipService.LOGGER.isDebugEnabled()){ + String receivedMessage = new String(jsonBytes); + GossipService.LOGGER.debug("Received message (" + packetLength + " bytes): " + + receivedMessage); + } + } + public void shutdown() { try { server.close(); diff --git a/src/main/java/org/apache/gossip/manager/Transport.java b/src/main/java/org/apache/gossip/manager/Transport.java new file mode 100644 index 0000000..72b90df --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/Transport.java @@ -0,0 +1,5 @@ +package org.apache.gossip.manager; + +public class Transport { + +} diff --git a/src/main/java/org/apache/gossip/manager/UdpUtil.java b/src/main/java/org/apache/gossip/manager/UdpUtil.java new file mode 100644 index 0000000..a6a0174 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/UdpUtil.java @@ -0,0 +1,28 @@ +package org.apache.gossip.manager; + +import java.nio.ByteBuffer; + +public class UdpUtil { + + public static int readPacketLengthFromBuffer(byte [] buffer){ + int packetLength = 0; + for (int i = 0; i < 4; i++) { + int shift = (4 - 1 - i) * 8; + packetLength += (buffer[i] & 0x000000FF) << shift; + } + return packetLength; + } + + public static byte[] createBuffer(int packetLength, byte[] jsonBytes) { + byte[] lengthBytes = new byte[4]; + lengthBytes[0] = (byte) (packetLength >> 24); + lengthBytes[1] = (byte) ((packetLength << 8) >> 24); + lengthBytes[2] = (byte) ((packetLength << 16) >> 24); + lengthBytes[3] = (byte) ((packetLength << 24) >> 24); + ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length); + byteBuffer.put(lengthBytes); + byteBuffer.put(jsonBytes); + byte[] buf = byteBuffer.array(); + return buf; + } +} diff --git a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java index bde497f..79b04ce 100644 --- a/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/impl/OnlyProcessReceivedPassiveGossipThread.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.gossip.GossipMember; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.PassiveGossipThread; import org.apache.log4j.Logger; @@ -30,8 +31,8 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread public static final Logger LOGGER = Logger.getLogger(OnlyProcessReceivedPassiveGossipThread.class); - public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) { - super(gossipManager); + public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { + super(gossipManager, gossipCore); } /** @@ -110,21 +111,3 @@ public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread } } - -/** - * old comment section: // If a member is restarted the heartbeat will restart from 1, so we should - * check // that here. // So a member can become from the dead when it is either larger than a - * previous // heartbeat (due to network failure) // or when the heartbeat is 1 (after a restart of - * the service). // TODO: What if the first message of a gossip service is sent to a dead node? The - * // second member will receive a heartbeat of two. // TODO: The above does happen. Maybe a special - * message for a revived member? // TODO: Or maybe when a member is declared dead for more than // - * _settings.getCleanupInterval() ms, reset the heartbeat to 0. // It will then accept a revived - * member. // The above is now handle by checking whether the heartbeat differs // - * _settings.getCleanupInterval(), it must be restarted. - */ - -/* - * // The remote member is back from the dead. // Remove it from the dead list. // - * gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the - * member list. - */ \ No newline at end of file diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java index 53885b6..da8ed22 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java @@ -18,31 +18,38 @@ package org.apache.gossip.manager.random; import java.io.IOException; -import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.List; import java.util.Random; +import java.util.UUID; import org.apache.gossip.GossipService; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.manager.ActiveGossipThread; +import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.gossip.model.ActiveGossipOk; import org.apache.gossip.model.GossipMember; +import org.apache.gossip.model.Response; +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; public class RandomActiveGossipThread extends ActiveGossipThread { - protected ObjectMapper om = new ObjectMapper(); + public static final Logger LOGGER = Logger.getLogger(RandomActiveGossipThread.class); + protected ObjectMapper MAPPER = new ObjectMapper(); /** The Random used for choosing a member to gossip with. */ private final Random random; + private final GossipCore gossipCore; - public RandomActiveGossipThread(GossipManager gossipManager) { + public RandomActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { super(gossipManager); random = new Random(); + this.gossipCore = gossipCore; } /** @@ -71,18 +78,22 @@ public class RandomActiveGossipThread extends ActiveGossipThread { } try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); - InetAddress dest = InetAddress.getByName(member.getUri().getHost()); - ActiveGossipMessage message = new ActiveGossipMessage(); + UdpActiveGossipMessage message = new UdpActiveGossipMessage(); + message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString()); + message.setUuid(UUID.randomUUID().toString()); message.getMembers().add(convert(me)); for (LocalGossipMember other : memberList) { message.getMembers().add(convert(other)); } - byte[] json_bytes = om.writeValueAsString(message).getBytes(); + byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes(); int packet_length = json_bytes.length; if (packet_length < GossipManager.MAX_PACKET_SIZE) { - byte[] buf = createBuffer(packet_length, json_bytes); - DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getUri().getPort()); - socket.send(datagramPacket); + Response r = gossipCore.send(message, member.getUri()); + if (r instanceof ActiveGossipOk){ + //maybe count metrics here + } else { + LOGGER.warn("Message "+ message + " generated response "+ r); + } } else { GossipService.LOGGER.error("The length of the to be send message is too large (" + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); @@ -91,19 +102,6 @@ public class RandomActiveGossipThread extends ActiveGossipThread { GossipService.LOGGER.warn(e1); } } - - private byte[] createBuffer(int packetLength, byte[] jsonBytes) { - byte[] lengthBytes = new byte[4]; - lengthBytes[0] = (byte) (packetLength >> 24); - lengthBytes[1] = (byte) ((packetLength << 8) >> 24); - lengthBytes[2] = (byte) ((packetLength << 16) >> 24); - lengthBytes[3] = (byte) ((packetLength << 24) >> 24); - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length); - byteBuffer.put(lengthBytes); - byteBuffer.put(jsonBytes); - byte[] buf = byteBuffer.array(); - return buf; - } private GossipMember convert(LocalGossipMember member){ GossipMember gm = new GossipMember(); diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java index fa2b1c5..e7e19da 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java @@ -85,18 +85,15 @@ public class RandomGossipManager extends GossipManager { checkArgument(cluster != null, "You must specify a cluster name"); checkArgument(settings != null, "You must specify gossip settings"); checkArgument(uri != null, "You must specify a uri"); - if (this.gossipMembers == null) { this.gossipMembers = new ArrayList<>(); } - return new RandomGossipManager(cluster, uri, id, settings, gossipMembers, listener); } } private RandomGossipManager(String cluster, URI uri, String id, GossipSettings settings, List gossipMembers, GossipListener listener) { - super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster, - uri, id, settings, gossipMembers, listener); + super(cluster, uri, id, settings, gossipMembers, listener); } } diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java index ac940d8..1927371 100644 --- a/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java +++ b/src/main/java/org/apache/gossip/model/ActiveGossipMessage.java @@ -3,7 +3,7 @@ package org.apache.gossip.model; import java.util.ArrayList; import java.util.List; -public class ActiveGossipMessage { +public class ActiveGossipMessage extends Base { private List members = new ArrayList<>(); diff --git a/src/main/java/org/apache/gossip/model/ActiveGossipOk.java b/src/main/java/org/apache/gossip/model/ActiveGossipOk.java new file mode 100644 index 0000000..256ccd6 --- /dev/null +++ b/src/main/java/org/apache/gossip/model/ActiveGossipOk.java @@ -0,0 +1,5 @@ +package org.apache.gossip.model; + +public class ActiveGossipOk extends Response { + +} diff --git a/src/main/java/org/apache/gossip/model/Base.java b/src/main/java/org/apache/gossip/model/Base.java new file mode 100644 index 0000000..ebb3215 --- /dev/null +++ b/src/main/java/org/apache/gossip/model/Base.java @@ -0,0 +1,24 @@ +package org.apache.gossip.model; + +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.gossip.udp.UdpActiveGossipOk; +import org.apache.gossip.udp.UdpNotAMemberFault; +import org.codehaus.jackson.annotate.JsonSubTypes; +import org.codehaus.jackson.annotate.JsonSubTypes.Type; +import org.codehaus.jackson.annotate.JsonTypeInfo; + +@JsonTypeInfo( + use = JsonTypeInfo.Id.CLASS, + include = JsonTypeInfo.As.PROPERTY, + property = "type") +@JsonSubTypes({ + @Type(value = ActiveGossipMessage.class, name = "ActiveGossipMessage"), + @Type(value = Fault.class, name = "Fault"), + @Type(value = ActiveGossipOk.class, name = "ActiveGossipOk"), + @Type(value = UdpActiveGossipOk.class, name = "UdpActiveGossipOk"), + @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"), + @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault") + }) +public class Base { + +} diff --git a/src/main/java/org/apache/gossip/model/Fault.java b/src/main/java/org/apache/gossip/model/Fault.java new file mode 100644 index 0000000..ea00ea0 --- /dev/null +++ b/src/main/java/org/apache/gossip/model/Fault.java @@ -0,0 +1,23 @@ +package org.apache.gossip.model; + +public abstract class Fault extends Response { + + private String exception; + + public Fault(){} + + public String getException() { + return exception; + } + + public void setException(String exception) { + this.exception = exception; + } + + @Override + public String toString() { + return "Fault [exception=" + exception + "]"; + } + +} + diff --git a/src/main/java/org/apache/gossip/model/Message.java b/src/main/java/org/apache/gossip/model/Message.java new file mode 100644 index 0000000..5eb59fa --- /dev/null +++ b/src/main/java/org/apache/gossip/model/Message.java @@ -0,0 +1,5 @@ +package org.apache.gossip.model; + +public class Message extends Base{ + +} diff --git a/src/main/java/org/apache/gossip/model/NotAMemberFault.java b/src/main/java/org/apache/gossip/model/NotAMemberFault.java new file mode 100644 index 0000000..e7badc1 --- /dev/null +++ b/src/main/java/org/apache/gossip/model/NotAMemberFault.java @@ -0,0 +1,12 @@ +package org.apache.gossip.model; + +public class NotAMemberFault extends Fault { + + public NotAMemberFault(){ + + } + + public NotAMemberFault(String message){ + this.setException(message); + } +} diff --git a/src/main/java/org/apache/gossip/model/Response.java b/src/main/java/org/apache/gossip/model/Response.java new file mode 100644 index 0000000..ab46b48 --- /dev/null +++ b/src/main/java/org/apache/gossip/model/Response.java @@ -0,0 +1,5 @@ +package org.apache.gossip.model; + +public abstract class Response extends Base { + +} diff --git a/src/main/java/org/apache/gossip/udp/Trackable.java b/src/main/java/org/apache/gossip/udp/Trackable.java new file mode 100644 index 0000000..e76e2c3 --- /dev/null +++ b/src/main/java/org/apache/gossip/udp/Trackable.java @@ -0,0 +1,13 @@ +package org.apache.gossip.udp; + +public interface Trackable { + + public String getUriFrom(); + + public void setUriFrom(String uriFrom); + + public String getUuid(); + + public void setUuid(String uuid); + +} diff --git a/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java new file mode 100644 index 0000000..1532294 --- /dev/null +++ b/src/main/java/org/apache/gossip/udp/UdpActiveGossipMessage.java @@ -0,0 +1,31 @@ +package org.apache.gossip.udp; + +import org.apache.gossip.model.ActiveGossipMessage; + +public class UdpActiveGossipMessage extends ActiveGossipMessage implements Trackable { + + private String uriFrom; + private String uuid; + + public String getUriFrom() { + return uriFrom; + } + + public void setUriFrom(String uriFrom) { + this.uriFrom = uriFrom; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override + public String toString() { + return "UdpActiveGossipMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]"; + } + +} diff --git a/src/main/java/org/apache/gossip/udp/UdpActiveGossipOk.java b/src/main/java/org/apache/gossip/udp/UdpActiveGossipOk.java new file mode 100644 index 0000000..119bc50 --- /dev/null +++ b/src/main/java/org/apache/gossip/udp/UdpActiveGossipOk.java @@ -0,0 +1,27 @@ +package org.apache.gossip.udp; + +import org.apache.gossip.model.ActiveGossipOk; + +public class UdpActiveGossipOk extends ActiveGossipOk implements Trackable { + + + private String uriFrom; + private String uuid; + + public String getUriFrom() { + return uriFrom; + } + + public void setUriFrom(String uriFrom) { + this.uriFrom = uriFrom; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + +} diff --git a/src/main/java/org/apache/gossip/udp/UdpNotAMemberFault.java b/src/main/java/org/apache/gossip/udp/UdpNotAMemberFault.java new file mode 100644 index 0000000..7b4d5ba --- /dev/null +++ b/src/main/java/org/apache/gossip/udp/UdpNotAMemberFault.java @@ -0,0 +1,29 @@ +package org.apache.gossip.udp; + +import org.apache.gossip.model.NotAMemberFault; + +public class UdpNotAMemberFault extends NotAMemberFault implements Trackable{ + + public UdpNotAMemberFault(){ + + } + private String uriFrom; + private String uuid; + + public String getUriFrom() { + return uriFrom; + } + + public void setUriFrom(String uriFrom) { + this.uriFrom = uriFrom; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + +}