diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index e3dcb21..f53419d 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -45,9 +45,18 @@ import java.util.concurrent.*; public class GossipCore implements GossipCoreConstants { + class LatchAndBase { + private final CountDownLatch latch; + private volatile Base base; + + LatchAndBase(){ + latch = new CountDownLatch(1); + } + + } public static final Logger LOGGER = Logger.getLogger(GossipCore.class); private final GossipManager gossipManager; - private ConcurrentHashMap requests; + private ConcurrentHashMap requests; private ThreadPoolExecutor service; private final ConcurrentHashMap> perNodeData; private final ConcurrentHashMap sharedData; @@ -224,46 +233,30 @@ public class GossipCore implements GossipCoreConstants { } final Trackable t; + LatchAndBase latchAndBase = null; if (message instanceof Trackable){ t = (Trackable) message; + latchAndBase = new LatchAndBase(); + requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase); } else { t = null; } sendInternal(message, uri); - if (t == null){ + if (latchAndBase == null){ return null; - } - final Future response = service.submit( new Callable(){ - @Override - public Response call() throws Exception { - while(true){ - Base b = requests.remove(t.getUuid() + "/" + t.getUriFrom()); - if (b != null){ - return (Response) b; - } - try { - Thread.sleep(0, 555555); - } catch (InterruptedException e) { - - } - } - } - }); - + } + try { - //TODO this needs to be a setting base on attempts/second - return response.get(1, TimeUnit.SECONDS); + boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS); + if (complete){ + return (Response) latchAndBase.base; + } else{ + return null; + } } catch (InterruptedException e) { throw new RuntimeException(e); - } catch (ExecutionException e) { - LOGGER.debug(e.getMessage(), e); - return null; - } catch (TimeoutException e) { - boolean cancelled = response.cancel(true); - LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled)); - return null; } finally { - if (t != null){ + if (latchAndBase != null){ requests.remove(t.getUuid() + "/" + t.getUriFrom()); } } @@ -302,8 +295,10 @@ public class GossipCore implements GossipCoreConstants { } } - public void addRequest(String k, Base v) { - requests.put(k, v); + public void handleResponse(String k, Base v) { + LatchAndBase latch = requests.get(k); + latch.base = v; + latch.latch.countDown(); } /** diff --git a/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java index 36102d5..2f33b01 100644 --- a/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java +++ b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java @@ -27,7 +27,7 @@ public class ResponseHandler implements MessageHandler { public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { if (base instanceof Trackable) { Trackable t = (Trackable) base; - gossipCore.addRequest(t.getUuid() + "/" + t.getUriFrom(), (Base) t); + gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t); } } }