GOSSIP-77 better send()

This commit is contained in:
Edward Capriolo
2017-03-20 22:04:07 -04:00
parent 4aafc3ba3b
commit 6a4d50cae7
2 changed files with 28 additions and 33 deletions

View File

@ -45,9 +45,18 @@ import java.util.concurrent.*;
public class GossipCore implements GossipCoreConstants { public class GossipCore implements GossipCoreConstants {
class LatchAndBase {
private final CountDownLatch latch;
private volatile Base base;
LatchAndBase(){
latch = new CountDownLatch(1);
}
}
public static final Logger LOGGER = Logger.getLogger(GossipCore.class); public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private final GossipManager gossipManager; private final GossipManager gossipManager;
private ConcurrentHashMap<String, Base> requests; private ConcurrentHashMap<String, LatchAndBase> requests;
private ThreadPoolExecutor service; private ThreadPoolExecutor service;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData; private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedDataMessage> sharedData; private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
@ -224,46 +233,30 @@ public class GossipCore implements GossipCoreConstants {
} }
final Trackable t; final Trackable t;
LatchAndBase latchAndBase = null;
if (message instanceof Trackable){ if (message instanceof Trackable){
t = (Trackable) message; t = (Trackable) message;
latchAndBase = new LatchAndBase();
requests.put(t.getUuid() + "/" + t.getUriFrom(), latchAndBase);
} else { } else {
t = null; t = null;
} }
sendInternal(message, uri); sendInternal(message, uri);
if (t == null){ if (latchAndBase == null){
return null; return null;
} }
final Future<Response> response = service.submit( new Callable<Response>(){
@Override
public Response call() throws Exception {
while(true){
Base b = requests.remove(t.getUuid() + "/" + t.getUriFrom());
if (b != null){
return (Response) b;
}
try {
Thread.sleep(0, 555555);
} catch (InterruptedException e) {
}
}
}
});
try { try {
//TODO this needs to be a setting base on attempts/second boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
return response.get(1, TimeUnit.SECONDS); if (complete){
return (Response) latchAndBase.base;
} else{
return null;
}
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (ExecutionException e) {
LOGGER.debug(e.getMessage(), e);
return null;
} catch (TimeoutException e) {
boolean cancelled = response.cancel(true);
LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled));
return null;
} finally { } finally {
if (t != null){ if (latchAndBase != null){
requests.remove(t.getUuid() + "/" + t.getUriFrom()); requests.remove(t.getUuid() + "/" + t.getUriFrom());
} }
} }
@ -302,8 +295,10 @@ public class GossipCore implements GossipCoreConstants {
} }
} }
public void addRequest(String k, Base v) { public void handleResponse(String k, Base v) {
requests.put(k, v); LatchAndBase latch = requests.get(k);
latch.base = v;
latch.latch.countDown();
} }
/** /**

View File

@ -27,7 +27,7 @@ public class ResponseHandler implements MessageHandler {
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (base instanceof Trackable) { if (base instanceof Trackable) {
Trackable t = (Trackable) base; Trackable t = (Trackable) base;
gossipCore.addRequest(t.getUuid() + "/" + t.getUriFrom(), (Base) t); gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
} }
} }
} }