periodically attempt to contact dead members

This commit is contained in:
Gary Dusbabek
2016-07-18 14:09:43 -05:00
parent d2e8c62b29
commit c4cb0d7c6c
4 changed files with 29 additions and 3 deletions

View File

@ -45,7 +45,13 @@ abstract public class ActiveGossipThread implements Runnable {
while (keepRunning.get()) { while (keepRunning.get()) {
try { try {
TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval()); TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval());
// contact a live member.
sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()); sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers());
// contact a dead member.
sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers());
} catch (InterruptedException e) { } catch (InterruptedException e) {
GossipService.LOGGER.error(e); GossipService.LOGGER.error(e);
keepRunning.set(false); keepRunning.set(false);

View File

@ -131,7 +131,7 @@ public class GossipCore {
if (t == null){ if (t == null){
return null; return null;
} }
Future<Response> response = service.submit( new Callable<Response>(){ final Future<Response> response = service.submit( new Callable<Response>(){
@Override @Override
public Response call() throws Exception { public Response call() throws Exception {
while(true){ while(true){
@ -156,7 +156,8 @@ public class GossipCore {
LOGGER.error(e.getMessage(), e); LOGGER.error(e.getMessage(), e);
return null; return null;
} catch (TimeoutException e) { } catch (TimeoutException e) {
LOGGER.error(e.getMessage(), e); boolean cancelled = response.cancel(true);
LOGGER.error(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
return null; return null;
} finally { } finally {
if (t != null){ if (t != null){

View File

@ -132,6 +132,21 @@ public abstract class GossipManager extends Thread implements NotificationListen
return settings; return settings;
} }
// TODO: Use some java 8 goodness for these functions.
/**
* @return a read only list of members found in the DOWN state.
*/
public List<LocalGossipMember> getDeadMembers() {
List<LocalGossipMember> down = new ArrayList<>();
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
if (GossipState.DOWN.equals(entry.getValue())) {
down.add(entry.getKey());
}
}
return Collections.unmodifiableList(down);
}
/** /**
* *
* @return a read only list of members found in the UP state * @return a read only list of members found in the UP state

View File

@ -71,12 +71,16 @@ public class RandomActiveGossipThread extends ActiveGossipThread {
} }
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) { protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
me.setHeartbeat(System.currentTimeMillis()); me.setHeartbeat(System.currentTimeMillis());
LocalGossipMember member = selectPartner(memberList); LocalGossipMember member = selectPartner(memberList);
if (member == null) { if (member == null) {
GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
return; return;
} else {
GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
} }
try (DatagramSocket socket = new DatagramSocket()) { try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval()); socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
UdpActiveGossipMessage message = new UdpActiveGossipMessage(); UdpActiveGossipMessage message = new UdpActiveGossipMessage();