diff --git a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java index 1dd8837..cbc6051 100644 --- a/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/ActiveGossipThread.java @@ -14,7 +14,7 @@ import com.google.code.gossip.LocalGossipMember; */ abstract public class ActiveGossipThread implements Runnable { - private final GossipManager _gossipManager; + protected final GossipManager _gossipManager; private final AtomicBoolean _keepRunning; diff --git a/src/main/java/com/google/code/gossip/manager/GossipManager.java b/src/main/java/com/google/code/gossip/manager/GossipManager.java index 377188d..81afaa3 100644 --- a/src/main/java/com/google/code/gossip/manager/GossipManager.java +++ b/src/main/java/com/google/code/gossip/manager/GossipManager.java @@ -92,6 +92,10 @@ public abstract class GossipManager extends Thread implements NotificationListen return _settings; } + /** + * + * @return a read only list of members found in the UP state + */ public List getMemberList() { List up = new ArrayList<>(); for (Entry entry : members.entrySet()){ diff --git a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java index d788f79..88159f2 100644 --- a/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java +++ b/src/main/java/com/google/code/gossip/manager/impl/SendMembersActiveGossipThread.java @@ -26,51 +26,46 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread { protected void sendMembershipList(LocalGossipMember me, List memberList) { GossipService.LOGGER.debug("Send sendMembershipList() is called."); me.setHeartbeat(me.getHeartbeat() + 1); - synchronized (memberList) { - try { - LocalGossipMember member = selectPartner(memberList); - if (member != null) { - InetAddress dest = InetAddress.getByName(member.getHost()); - JSONArray jsonArray = new JSONArray(); - GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort()); - jsonArray.put(me.toJSONObject()); - GossipService.LOGGER.debug(me); - for (LocalGossipMember other : memberList) { - jsonArray.put(other.toJSONObject()); - GossipService.LOGGER.debug(other); - } - byte[] json_bytes = jsonArray.toString().getBytes(); - int packet_length = json_bytes.length; - if (packet_length < GossipManager.MAX_PACKET_SIZE) { - // Convert the packet length to the byte representation of the int. - byte[] length_bytes = new byte[4]; - length_bytes[0] = (byte) (packet_length >> 24); - length_bytes[1] = (byte) ((packet_length << 8) >> 24); - length_bytes[2] = (byte) ((packet_length << 16) >> 24); - length_bytes[3] = (byte) ((packet_length << 24) >> 24); - - GossipService.LOGGER.debug("Sending message (" + packet_length + " bytes): " - + jsonArray.toString()); - - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length); - byteBuffer.put(length_bytes); - byteBuffer.put(json_bytes); - byte[] buf = byteBuffer.array(); - - DatagramSocket socket = new DatagramSocket(); - DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, - member.getPort()); - socket.send(datagramPacket); - socket.close(); - } else { - GossipService.LOGGER.error("The length of the to be send message is too large (" - + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); - } - } - - } catch (IOException e1) { - e1.printStackTrace(); + LocalGossipMember member = selectPartner(memberList); + if (member == null) { + return; + } + try (DatagramSocket socket = new DatagramSocket()){ + socket.setSoTimeout(_gossipManager.getSettings().getGossipInterval()); + InetAddress dest = InetAddress.getByName(member.getHost()); + JSONArray jsonArray = new JSONArray(); + jsonArray.put(me.toJSONObject()); + for (LocalGossipMember other : memberList) { + jsonArray.put(other.toJSONObject()); + GossipService.LOGGER.debug(other); } + byte[] json_bytes = jsonArray.toString().getBytes(); + int packet_length = json_bytes.length; + if (packet_length < GossipManager.MAX_PACKET_SIZE) { + byte[] buf = createBuffer(packet_length, json_bytes); + DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, + member.getPort()); + socket.send(datagramPacket); + } else { + GossipService.LOGGER.error("The length of the to be send message is too large (" + + packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ")."); + } + } catch (IOException e1) { + GossipService.LOGGER.warn(e1); } } + + private byte[] createBuffer(int packetLength, byte [] jsonBytes){ + byte[] lengthBytes = new byte[4]; + lengthBytes[0] = (byte) (packetLength >> 24); + lengthBytes[1] = (byte) ((packetLength << 8) >> 24); + lengthBytes[2] = (byte) ((packetLength << 16) >> 24); + lengthBytes[3] = (byte) ((packetLength << 24) >> 24); + ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length); + byteBuffer.put(lengthBytes); + byteBuffer.put(jsonBytes); + byte[] buf = byteBuffer.array(); + return buf; + } + } diff --git a/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java new file mode 100644 index 0000000..27e34ba --- /dev/null +++ b/src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java @@ -0,0 +1,103 @@ +package io.teknek.gossip; + +import io.teknek.tunit.TUnit; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + + +import org.apache.log4j.Logger; +import org.junit.Ignore; +import org.junit.Test; + +import com.google.code.gossip.GossipMember; +import com.google.code.gossip.GossipService; +import com.google.code.gossip.GossipSettings; +import com.google.code.gossip.RemoteGossipMember; +import com.google.code.gossip.event.GossipListener; +import com.google.code.gossip.event.GossipState; + +public class ShutdownDeadtimeTest { + + private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class ); + //@Test + @Ignore + public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException { + GossipSettings settings = new GossipSettings(10,10000); + + log.info( "Adding seed nodes" ); + int seedNodes = 3; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + startupMembers.add(new RemoteGossipMember("127.0.0.1", 50000 + i, i + "")); + } + + log.info( "Adding clients" ); + final List clients = new ArrayList<>(); + final int clusterMembers = 5; + for (int i = 1; i < clusterMembers+1; ++i) { + final int j = i; + GossipService gossipService = new GossipService("127.0.0.1", 50000 + i, i + "", + startupMembers, settings, + new GossipListener(){ + @Override + public void gossipEvent(GossipMember member, GossipState state) { + System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state); + } + }); + clients.add(gossipService); + gossipService.start(); + } + TUnit.assertThat(new Callable (){ + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getMemberList().size(); + } + return total; + }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(20); + + // shutdown one client and verify that one client is lost. + Random r = new Random(); + int randomClientId = r.nextInt(clusterMembers); + log.info( "shutting down " + randomClientId ); + final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getPort(); + final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId(); + clients.get(randomClientId).shutdown(); + TUnit.assertThat(new Callable (){ + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getMemberList().size(); + } + return total; + }}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(16); + + // start client again + GossipService gossipService = new GossipService("127.0.0.1", shutdownPort, shutdownId + "", + startupMembers, settings, + new GossipListener(){ + @Override + public void gossipEvent(GossipMember member, GossipState state) { + //System.out.println("revived " + member+" "+ state); + } + }); + clients.add(gossipService); + gossipService.start(); + + // verify that the client is alive again for every node + TUnit.assertThat(new Callable (){ + public Integer call() throws Exception { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).get_gossipManager().getMemberList().size(); + } + return total; + }}).afterWaitingAtMost(70, TimeUnit.SECONDS).isEqualTo(20); + + } +}