diff --git a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java index 85d73d5..181d9ae 100644 --- a/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/ActiveGossipThread.java @@ -45,7 +45,13 @@ abstract public class ActiveGossipThread implements Runnable { while (keepRunning.get()) { try { TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval()); + + // contact a live member. sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()); + + // contact a dead member. + sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()); + } catch (InterruptedException e) { GossipService.LOGGER.error(e); keepRunning.set(false); diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 7cfe8a9..ab24621 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -131,7 +131,7 @@ public class GossipCore { if (t == null){ return null; } - Future response = service.submit( new Callable(){ + final Future response = service.submit( new Callable(){ @Override public Response call() throws Exception { while(true){ @@ -156,7 +156,8 @@ public class GossipCore { LOGGER.error(e.getMessage(), e); return null; } catch (TimeoutException e) { - LOGGER.error(e.getMessage(), e); + boolean cancelled = response.cancel(true); + LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled)); return null; } finally { if (t != null){ diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 59ad91f..79be431 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -132,6 +132,21 @@ public abstract class GossipManager extends Thread implements NotificationListen return settings; } + // TODO: Use some java 8 goodness for these functions. + + /** + * @return a read only list of members found in the DOWN state. + */ + public List getDeadMembers() { + List down = new ArrayList<>(); + for (Entry entry : members.entrySet()) { + if (GossipState.DOWN.equals(entry.getValue())) { + down.add(entry.getKey()); + } + } + return Collections.unmodifiableList(down); + } + /** * * @return a read only list of members found in the UP state diff --git a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java index b39a748..03d550c 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomActiveGossipThread.java @@ -71,12 +71,16 @@ public class RandomActiveGossipThread extends ActiveGossipThread { } protected void sendMembershipList(LocalGossipMember me, List memberList) { - GossipService.LOGGER.debug("Send sendMembershipList() is called."); + me.setHeartbeat(System.currentTimeMillis()); LocalGossipMember member = selectPartner(memberList); if (member == null) { + GossipService.LOGGER.debug("Send sendMembershipList() is called without action"); return; + } else { + GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString()); } + try (DatagramSocket socket = new DatagramSocket()) { socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); UdpActiveGossipMessage message = new UdpActiveGossipMessage();