Better socket cleanup for ActiveThread
This commit is contained in:
@ -14,7 +14,7 @@ import com.google.code.gossip.LocalGossipMember;
|
|||||||
*/
|
*/
|
||||||
abstract public class ActiveGossipThread implements Runnable {
|
abstract public class ActiveGossipThread implements Runnable {
|
||||||
|
|
||||||
private final GossipManager _gossipManager;
|
protected final GossipManager _gossipManager;
|
||||||
|
|
||||||
private final AtomicBoolean _keepRunning;
|
private final AtomicBoolean _keepRunning;
|
||||||
|
|
||||||
|
@ -92,6 +92,10 @@ public abstract class GossipManager extends Thread implements NotificationListen
|
|||||||
return _settings;
|
return _settings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @return a read only list of members found in the UP state
|
||||||
|
*/
|
||||||
public List<LocalGossipMember> getMemberList() {
|
public List<LocalGossipMember> getMemberList() {
|
||||||
List<LocalGossipMember> up = new ArrayList<>();
|
List<LocalGossipMember> up = new ArrayList<>();
|
||||||
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
|
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()){
|
||||||
|
@ -26,15 +26,15 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
|||||||
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
|
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
|
||||||
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
|
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
|
||||||
me.setHeartbeat(me.getHeartbeat() + 1);
|
me.setHeartbeat(me.getHeartbeat() + 1);
|
||||||
synchronized (memberList) {
|
|
||||||
try {
|
|
||||||
LocalGossipMember member = selectPartner(memberList);
|
LocalGossipMember member = selectPartner(memberList);
|
||||||
if (member != null) {
|
if (member == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try (DatagramSocket socket = new DatagramSocket()){
|
||||||
|
socket.setSoTimeout(_gossipManager.getSettings().getGossipInterval());
|
||||||
InetAddress dest = InetAddress.getByName(member.getHost());
|
InetAddress dest = InetAddress.getByName(member.getHost());
|
||||||
JSONArray jsonArray = new JSONArray();
|
JSONArray jsonArray = new JSONArray();
|
||||||
GossipService.LOGGER.debug("Sending memberlist to " + dest + ":" + member.getPort());
|
|
||||||
jsonArray.put(me.toJSONObject());
|
jsonArray.put(me.toJSONObject());
|
||||||
GossipService.LOGGER.debug(me);
|
|
||||||
for (LocalGossipMember other : memberList) {
|
for (LocalGossipMember other : memberList) {
|
||||||
jsonArray.put(other.toJSONObject());
|
jsonArray.put(other.toJSONObject());
|
||||||
GossipService.LOGGER.debug(other);
|
GossipService.LOGGER.debug(other);
|
||||||
@ -42,35 +42,30 @@ abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
|
|||||||
byte[] json_bytes = jsonArray.toString().getBytes();
|
byte[] json_bytes = jsonArray.toString().getBytes();
|
||||||
int packet_length = json_bytes.length;
|
int packet_length = json_bytes.length;
|
||||||
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
||||||
// Convert the packet length to the byte representation of the int.
|
byte[] buf = createBuffer(packet_length, json_bytes);
|
||||||
byte[] length_bytes = new byte[4];
|
|
||||||
length_bytes[0] = (byte) (packet_length >> 24);
|
|
||||||
length_bytes[1] = (byte) ((packet_length << 8) >> 24);
|
|
||||||
length_bytes[2] = (byte) ((packet_length << 16) >> 24);
|
|
||||||
length_bytes[3] = (byte) ((packet_length << 24) >> 24);
|
|
||||||
|
|
||||||
GossipService.LOGGER.debug("Sending message (" + packet_length + " bytes): "
|
|
||||||
+ jsonArray.toString());
|
|
||||||
|
|
||||||
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + json_bytes.length);
|
|
||||||
byteBuffer.put(length_bytes);
|
|
||||||
byteBuffer.put(json_bytes);
|
|
||||||
byte[] buf = byteBuffer.array();
|
|
||||||
|
|
||||||
DatagramSocket socket = new DatagramSocket();
|
|
||||||
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest,
|
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest,
|
||||||
member.getPort());
|
member.getPort());
|
||||||
socket.send(datagramPacket);
|
socket.send(datagramPacket);
|
||||||
socket.close();
|
|
||||||
} else {
|
} else {
|
||||||
GossipService.LOGGER.error("The length of the to be send message is too large ("
|
GossipService.LOGGER.error("The length of the to be send message is too large ("
|
||||||
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
|
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
|
||||||
}
|
}
|
||||||
|
} catch (IOException e1) {
|
||||||
|
GossipService.LOGGER.warn(e1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} catch (IOException e1) {
|
private byte[] createBuffer(int packetLength, byte [] jsonBytes){
|
||||||
e1.printStackTrace();
|
byte[] lengthBytes = new byte[4];
|
||||||
}
|
lengthBytes[0] = (byte) (packetLength >> 24);
|
||||||
}
|
lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
|
||||||
|
lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
|
||||||
|
lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
|
||||||
|
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
|
||||||
|
byteBuffer.put(lengthBytes);
|
||||||
|
byteBuffer.put(jsonBytes);
|
||||||
|
byte[] buf = byteBuffer.array();
|
||||||
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
103
src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
Normal file
103
src/test/java/io/teknek/gossip/ShutdownDeadtimeTest.java
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
package io.teknek.gossip;
|
||||||
|
|
||||||
|
import io.teknek.tunit.TUnit;
|
||||||
|
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import com.google.code.gossip.GossipMember;
|
||||||
|
import com.google.code.gossip.GossipService;
|
||||||
|
import com.google.code.gossip.GossipSettings;
|
||||||
|
import com.google.code.gossip.RemoteGossipMember;
|
||||||
|
import com.google.code.gossip.event.GossipListener;
|
||||||
|
import com.google.code.gossip.event.GossipState;
|
||||||
|
|
||||||
|
public class ShutdownDeadtimeTest {
|
||||||
|
|
||||||
|
private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class );
|
||||||
|
//@Test
|
||||||
|
@Ignore
|
||||||
|
public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException {
|
||||||
|
GossipSettings settings = new GossipSettings(10,10000);
|
||||||
|
|
||||||
|
log.info( "Adding seed nodes" );
|
||||||
|
int seedNodes = 3;
|
||||||
|
List<GossipMember> startupMembers = new ArrayList<>();
|
||||||
|
for (int i = 1; i < seedNodes + 1; ++i) {
|
||||||
|
startupMembers.add(new RemoteGossipMember("127.0.0.1", 50000 + i, i + ""));
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info( "Adding clients" );
|
||||||
|
final List<GossipService> clients = new ArrayList<>();
|
||||||
|
final int clusterMembers = 5;
|
||||||
|
for (int i = 1; i < clusterMembers+1; ++i) {
|
||||||
|
final int j = i;
|
||||||
|
GossipService gossipService = new GossipService("127.0.0.1", 50000 + i, i + "",
|
||||||
|
startupMembers, settings,
|
||||||
|
new GossipListener(){
|
||||||
|
@Override
|
||||||
|
public void gossipEvent(GossipMember member, GossipState state) {
|
||||||
|
System.out.println(System.currentTimeMillis() + " Member "+j + " reports "+ member+" "+ state);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
clients.add(gossipService);
|
||||||
|
gossipService.start();
|
||||||
|
}
|
||||||
|
TUnit.assertThat(new Callable<Integer> (){
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
int total = 0;
|
||||||
|
for (int i = 0; i < clusterMembers; ++i) {
|
||||||
|
total += clients.get(i).get_gossipManager().getMemberList().size();
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(20);
|
||||||
|
|
||||||
|
// shutdown one client and verify that one client is lost.
|
||||||
|
Random r = new Random();
|
||||||
|
int randomClientId = r.nextInt(clusterMembers);
|
||||||
|
log.info( "shutting down " + randomClientId );
|
||||||
|
final int shutdownPort = clients.get(randomClientId).get_gossipManager().getMyself().getPort();
|
||||||
|
final String shutdownId = clients.get(randomClientId).get_gossipManager().getMyself().getId();
|
||||||
|
clients.get(randomClientId).shutdown();
|
||||||
|
TUnit.assertThat(new Callable<Integer> (){
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
int total = 0;
|
||||||
|
for (int i = 0; i < clusterMembers; ++i) {
|
||||||
|
total += clients.get(i).get_gossipManager().getMemberList().size();
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(16);
|
||||||
|
|
||||||
|
// start client again
|
||||||
|
GossipService gossipService = new GossipService("127.0.0.1", shutdownPort, shutdownId + "",
|
||||||
|
startupMembers, settings,
|
||||||
|
new GossipListener(){
|
||||||
|
@Override
|
||||||
|
public void gossipEvent(GossipMember member, GossipState state) {
|
||||||
|
//System.out.println("revived " + member+" "+ state);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
clients.add(gossipService);
|
||||||
|
gossipService.start();
|
||||||
|
|
||||||
|
// verify that the client is alive again for every node
|
||||||
|
TUnit.assertThat(new Callable<Integer> (){
|
||||||
|
public Integer call() throws Exception {
|
||||||
|
int total = 0;
|
||||||
|
for (int i = 0; i < clusterMembers; ++i) {
|
||||||
|
total += clients.get(i).get_gossipManager().getMemberList().size();
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}}).afterWaitingAtMost(70, TimeUnit.SECONDS).isEqualTo(20);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user