From c544b8bf167a099ec4cac6a94b059fa71ce8a8cc Mon Sep 17 00:00:00 2001 From: Gary Dusbabek Date: Thu, 13 Apr 2017 13:02:48 -0500 Subject: [PATCH] GOSSIP-80 Sundry cleanups * remove redundant parameter from method call. * remove uncessary threadpool. * Simplify `GossipCore.sendOneWay()` * Cleanup useage of `MessageInvoker` * `DefaultMessageInvoker` replaced by a factory * `MessageInvokerCombiner` replaced by same factory * Alter `MessageInvokerTest` to not rely on specific types * Remove type assertion from `GossipManagerBuilderTest` * Merge `MessageInvoker` with `MessageHandler` * This required changing method signature return type from `void` to `boolean`. --- .../org/apache/gossip/manager/GossipCore.java | 42 +------- .../apache/gossip/manager/GossipManager.java | 12 +-- .../gossip/manager/GossipManagerBuilder.java | 16 +-- .../handlers/ActiveGossipMessageHandler.java | 12 ++- .../handlers/DefaultMessageInvoker.java | 40 ------- .../manager/handlers/MessageHandler.java | 8 +- .../handlers/MessageHandlerFactory.java | 58 ++++++++++ .../manager/handlers/MessageInvoker.java | 33 ------ .../handlers/MessageInvokerCombiner.java | 48 --------- .../handlers/PerNodeDataMessageHandler.java | 10 +- .../manager/handlers/ResponseHandler.java | 11 +- .../handlers/SharedDataMessageHandler.java | 10 +- .../handlers/ShutdownMessageHandler.java | 10 +- ...eInvoker.java => TypedMessageHandler.java} | 10 +- .../manager/GossipManagerBuilderTest.java | 22 ++-- ...vokerTest.java => MessageHandlerTest.java} | 102 +++++++++--------- 16 files changed, 202 insertions(+), 242 deletions(-) delete mode 100644 gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java create mode 100644 gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java delete mode 100644 gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java delete mode 100644 gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java rename gossip-base/src/main/java/org/apache/gossip/manager/handlers/{SimpleMessageInvoker.java => TypedMessageHandler.java} (83%) rename gossip-base/src/test/java/org/apache/gossip/manager/handlers/{MessageInvokerTest.java => MessageHandlerTest.java} (58%) diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java index f53419d..d01a84c 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -57,7 +57,6 @@ public class GossipCore implements GossipCoreConstants { public static final Logger LOGGER = Logger.getLogger(GossipCore.class); private final GossipManager gossipManager; private ConcurrentHashMap requests; - private ThreadPoolExecutor service; private final ConcurrentHashMap> perNodeData; private final ConcurrentHashMap sharedData; private final BlockingQueue workQueue; @@ -71,15 +70,12 @@ public class GossipCore implements GossipCoreConstants { this.gossipManager = manager; requests = new ConcurrentHashMap<>(); workQueue = new ArrayBlockingQueue<>(1024); - service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); perNodeData = new ConcurrentHashMap<>(); sharedData = new ConcurrentHashMap<>(); metrics.register(WORKQUEUE_SIZE, (Gauge)() -> workQueue.size()); metrics.register(PER_NODE_DATA_SIZE, (Gauge)() -> perNodeData.size()); metrics.register(SHARED_DATA_SIZE, (Gauge)() -> sharedData.size()); metrics.register(REQUEST_SIZE, (Gauge)() -> requests.size()); - metrics.register(THREADPOOL_ACTIVE, (Gauge)() -> service.getActiveCount()); - metrics.register(THREADPOOL_SIZE, (Gauge)() -> service.getPoolSize()); messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); @@ -178,17 +174,10 @@ public class GossipCore implements GossipCoreConstants { } public void shutdown(){ - service.shutdown(); - try { - service.awaitTermination(1, TimeUnit.SECONDS); - } catch (InterruptedException e) { - LOGGER.warn(e); - } - service.shutdownNow(); } public void receive(Base base) { - if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) { + if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) { LOGGER.warn("received message can not be handled"); } } @@ -268,29 +257,10 @@ public class GossipCore implements GossipCoreConstants { * @param message the message to send * @param u the uri to send it to */ - public void sendOneWay(Base message, URI u){ - byte[] json_bytes; + public void sendOneWay(Base message, URI u) { try { - if (privKey == null){ - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); - } else { - SignedPayload p = new SignedPayload(); - p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes()); - p.setSignature(sign(p.getData())); - json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p); - } - } catch (IOException e) { - messageSerdeException.mark(); - throw new RuntimeException(e); - } - try (DatagramSocket socket = new DatagramSocket()) { - socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2); - InetAddress dest = InetAddress.getByName(u.getHost()); - DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort()); - socket.send(datagramPacket); - tranmissionSuccess.mark(); - } catch (IOException ex) { - tranmissionException.mark(); + sendInternal(message, u); + } catch (RuntimeException ex) { LOGGER.debug("Send one way failed", ex); } } @@ -304,13 +274,11 @@ public class GossipCore implements GossipCoreConstants { /** * Merge lists from remote members and update heartbeats * - * @param gossipManager * @param senderMember * @param remoteList * */ - public void mergeLists(GossipManager gossipManager, RemoteMember senderMember, - List remoteList) { + public void mergeLists(RemoteMember senderMember, List remoteList) { if (LOGGER.isDebugEnabled()){ debugState(senderMember, remoteList); } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java index c2b50ae..ff70ccc 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -25,7 +25,7 @@ import org.apache.gossip.Member; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; -import org.apache.gossip.manager.handlers.MessageInvoker; +import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; @@ -64,14 +64,14 @@ public abstract class GossipManager { private final GossipMemberStateRefresher memberStateRefresher; private final ObjectMapper objectMapper; - private final MessageInvoker messageInvoker; + private final MessageHandler messageHandler; public GossipManager(String cluster, URI uri, String id, Map properties, GossipSettings settings, List gossipMembers, GossipListener listener, MetricRegistry registry, - ObjectMapper objectMapper, MessageInvoker messageInvoker) { + ObjectMapper objectMapper, MessageHandler messageHandler) { this.settings = settings; - this.messageInvoker = messageInvoker; + this.messageHandler = messageHandler; clock = new SystemClock(); me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties, @@ -101,8 +101,8 @@ public abstract class GossipManager { readSavedDataState(); } - public MessageInvoker getMessageInvoker() { - return messageInvoker; + public MessageHandler getMessageHandler() { + return messageHandler; } public ConcurrentSkipListMap getMembers() { diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java index b87045b..bb73177 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManagerBuilder.java @@ -25,8 +25,8 @@ import org.apache.gossip.GossipSettings; import org.apache.gossip.StartupSettings; import org.apache.gossip.crdt.CrdtModule; import org.apache.gossip.event.GossipListener; -import org.apache.gossip.manager.handlers.DefaultMessageInvoker; -import org.apache.gossip.manager.handlers.MessageInvoker; +import org.apache.gossip.manager.handlers.MessageHandler; +import org.apache.gossip.manager.handlers.MessageHandlerFactory; import java.net.URI; import java.util.ArrayList; @@ -50,7 +50,7 @@ public class GossipManagerBuilder { private MetricRegistry registry; private Map properties; private ObjectMapper objectMapper; - private MessageInvoker messageInvoker; + private MessageHandler messageHandler; private ManagerBuilder() {} @@ -114,8 +114,8 @@ public class GossipManagerBuilder { return this; } - public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) { - this.messageInvoker = messageInvoker; + public ManagerBuilder messageHandler(MessageHandler messageHandler) { + this.messageHandler = messageHandler; return this; } @@ -142,10 +142,10 @@ public class GossipManagerBuilder { objectMapper.registerModule(new CrdtModule()); objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false); } - if (messageInvoker == null) { - messageInvoker = new DefaultMessageInvoker(); + if (messageHandler == null) { + messageHandler = MessageHandlerFactory.defaultHandler(); } - return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ; + return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageHandler) {} ; } } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java index f5e568e..e89179b 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java @@ -32,8 +32,15 @@ import java.util.ArrayList; import java.util.List; public class ActiveGossipMessageHandler implements MessageHandler { + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. + */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { List remoteGossipMembers = new ArrayList<>(); RemoteMember senderMember = null; UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; @@ -69,6 +76,7 @@ public class ActiveGossipMessageHandler implements MessageHandler { o.setUriFrom(activeGossipMessage.getUriFrom()); o.setUuid(activeGossipMessage.getUuid()); gossipCore.sendOneWay(o, senderMember.getUri()); - gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers); + gossipCore.mergeLists(senderMember, remoteGossipMembers); + return true; } } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java deleted file mode 100644 index 5b78ce3..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gossip.manager.handlers; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.*; - -public class DefaultMessageInvoker implements MessageInvoker { - private final MessageInvokerCombiner mic; - - public DefaultMessageInvoker() { - mic = new MessageInvokerCombiner(); - mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler())); - mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler())); - mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new PerNodeDataMessageHandler())); - mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new SharedDataMessageHandler())); - mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler())); - } - - public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - return mic.invoke(gossipCore, gossipManager, base); - } -} diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java index 4b5d49d..5af9b14 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java @@ -22,5 +22,11 @@ import org.apache.gossip.manager.GossipManager; import org.apache.gossip.model.Base; public interface MessageHandler { - void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base); + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. + */ + boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base); } \ No newline at end of file diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java new file mode 100644 index 0000000..fff9430 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedDataMessage; +import org.apache.gossip.model.ShutdownMessage; + +import java.util.Arrays; + +public class MessageHandlerFactory { + + public static MessageHandler defaultHandler() { + return concurrentHandler( + new TypedMessageHandler(Response.class, new ResponseHandler()), + new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()), + new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()), + new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()), + new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler()) + ); + } + + public static MessageHandler concurrentHandler(MessageHandler... handlers) { + if (handlers == null) throw new NullPointerException("handlers cannot be null"); + if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) { + throw new NullPointerException("found at least one null handler"); + } + return new MessageHandler() { + @Override + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + // return true if at least one of the component handlers return true. + return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0; + } + }; + } +} + diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java deleted file mode 100644 index 70be408..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager.handlers; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.Base; - -public interface MessageInvoker { - /** - * - * @param gossipCore - * @param gossipManager - * @param base - * @return true if the invoker processed the message type - */ - boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base); -} diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java deleted file mode 100644 index 5faf6a5..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gossip.manager.handlers; - -import org.apache.gossip.manager.GossipCore; -import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.Base; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -public class MessageInvokerCombiner implements MessageInvoker { - private final List invokers = new CopyOnWriteArrayList<>(); - - public MessageInvokerCombiner() { - } - - public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { - return invokers.stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0; - } - - public void clear() { - invokers.clear(); - } - - public void add(MessageInvoker mi) { - if (mi == null) { - throw new NullPointerException(); - } - invokers.add(mi); - } -} diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java index b3a785e..0ad0d91 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java @@ -23,9 +23,17 @@ import org.apache.gossip.model.Base; import org.apache.gossip.udp.UdpPerNodeDataMessage; public class PerNodeDataMessageHandler implements MessageHandler { + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. + */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base; gossipCore.addPerNodeData(message); + return true; } } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java index 2f33b01..1070ff7 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java @@ -23,11 +23,20 @@ import org.apache.gossip.model.Base; import org.apache.gossip.udp.Trackable; public class ResponseHandler implements MessageHandler { + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. + */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { if (base instanceof Trackable) { Trackable t = (Trackable) base; gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t); + return true; } + return false; } } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java index 89ca4a0..3fe3033 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataMessageHandler.java @@ -23,9 +23,17 @@ import org.apache.gossip.model.Base; import org.apache.gossip.udp.UdpSharedDataMessage; public class SharedDataMessageHandler implements MessageHandler{ + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. + */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { UdpSharedDataMessage message = (UdpSharedDataMessage) base; gossipCore.addSharedData(message); + return true; } } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java index a40c7a1..40e4c07 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java @@ -24,8 +24,15 @@ import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.ShutdownMessage; public class ShutdownMessageHandler implements MessageHandler { + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. + */ @Override - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { ShutdownMessage s = (ShutdownMessage) base; PerNodeDataMessage m = new PerNodeDataMessage(); m.setKey(ShutdownMessage.PER_NODE_KEY); @@ -34,5 +41,6 @@ public class ShutdownMessageHandler implements MessageHandler { m.setTimestamp(System.currentTimeMillis()); m.setExpireAt(System.currentTimeMillis() + 30L * 1000L); gossipCore.addPerNodeData(m); + return true; } } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java similarity index 83% rename from gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java rename to gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java index 0f410d2..b40461d 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/TypedMessageHandler.java @@ -21,11 +21,11 @@ import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.model.Base; -public class SimpleMessageInvoker implements MessageInvoker { +public class TypedMessageHandler implements MessageHandler { final private Class messageClass; final private MessageHandler messageHandler; - public SimpleMessageInvoker(Class messageClass, MessageHandler messageHandler) { + public TypedMessageHandler(Class messageClass, MessageHandler messageHandler) { if (messageClass == null || messageHandler == null) { throw new NullPointerException(); } @@ -33,6 +33,12 @@ public class SimpleMessageInvoker implements MessageInvoker { this.messageHandler = messageHandler; } + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return true if types match, false otherwise. + */ @Override public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { if (messageClass.isAssignableFrom(base.getClass())) { diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java index 8842643..959f818 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/GossipManagerBuilderTest.java @@ -21,10 +21,9 @@ import com.codahale.metrics.MetricRegistry; import org.apache.gossip.Member; import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalMember; -import org.apache.gossip.manager.handlers.DefaultMessageInvoker; -import org.apache.gossip.manager.handlers.MessageInvoker; +import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.manager.handlers.ResponseHandler; -import org.apache.gossip.manager.handlers.SimpleMessageInvoker; +import org.apache.gossip.manager.handlers.TypedMessageHandler; import org.junit.Assert; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; @@ -77,28 +76,27 @@ public class GossipManagerBuilderTest { } @Test - public void createDefaultMessageInvokerIfNull() throws URISyntaxException { + public void createDefaultMessageHandlerIfNull() throws URISyntaxException { GossipManager gossipManager = GossipManagerBuilder.newBuilder() .id("id") .cluster("aCluster") .uri(new URI("udp://localhost:2000")) .gossipSettings(new GossipSettings()) - .messageInvoker(null).registry(new MetricRegistry()).build(); - assertNotNull(gossipManager.getMessageInvoker()); - Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass()); + .messageHandler(null).registry(new MetricRegistry()).build(); + assertNotNull(gossipManager.getMessageHandler()); } @Test - public void testMessageInvokerKeeping() throws URISyntaxException { - MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler()); + public void testMessageHandlerKeeping() throws URISyntaxException { + MessageHandler mi = new TypedMessageHandler(Response.class, new ResponseHandler()); GossipManager gossipManager = GossipManagerBuilder.newBuilder() .id("id") .cluster("aCluster") .uri(new URI("udp://localhost:2000")) .gossipSettings(new GossipSettings()) - .messageInvoker(mi).registry(new MetricRegistry()).build(); - assertNotNull(gossipManager.getMessageInvoker()); - Assert.assertEquals(gossipManager.getMessageInvoker(), mi); + .messageHandler(mi).registry(new MetricRegistry()).build(); + assertNotNull(gossipManager.getMessageHandler()); + Assert.assertEquals(gossipManager.getMessageHandler(), mi); } @Test diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java similarity index 58% rename from gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java rename to gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java index 571d7ba..c035d21 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java @@ -25,7 +25,7 @@ import org.apache.gossip.udp.UdpSharedDataMessage; import org.junit.Assert; import org.junit.Test; -public class MessageInvokerTest { +public class MessageHandlerTest { private class FakeMessage extends Base { public FakeMessage() { } @@ -46,8 +46,9 @@ public class MessageInvokerTest { data = 0; } - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { data = ((FakeMessageData) base).data; + return true; } } @@ -58,32 +59,33 @@ public class MessageInvokerTest { counter = 0; } - public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { counter++; + return true; } } @Test - public void testSimpleInvoker() { - MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()); + public void testSimpleHandler() { + MessageHandler mi = new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()); Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage())); } @Test(expected = NullPointerException.class) - public void testSimpleInvokerNullClassConstructor() { - new SimpleMessageInvoker(null, new FakeMessageHandler()); + public void testSimpleHandlerNullClassConstructor() { + new TypedMessageHandler(null, new FakeMessageHandler()); } @Test(expected = NullPointerException.class) - public void testSimpleInvokerNullHandlerConstructor() { - new SimpleMessageInvoker(FakeMessage.class, null); + public void testSimpleHandlerNullHandlerConstructor() { + new TypedMessageHandler(FakeMessage.class, null); } @Test - public void testCallCountSimpleInvoker() { + public void testCallCountSimpleHandler() { FakeMessageHandler h = new FakeMessageHandler(); - MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, h); + MessageHandler mi = new TypedMessageHandler(FakeMessage.class, h); mi.invoke(null, null, new FakeMessage()); Assert.assertEquals(1, h.counter); mi.invoke(null, null, new ActiveGossipMessage()); @@ -93,76 +95,78 @@ public class MessageInvokerTest { } @Test(expected = NullPointerException.class) - public void cantAddNullInvoker() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); - mi.add(null); + @SuppressWarnings("all") + public void cantAddNullHandler() { + MessageHandler handler = MessageHandlerFactory.concurrentHandler(null); + } + + @Test(expected = NullPointerException.class) + public void cantAddNullHandler2() { + MessageHandler handler = MessageHandlerFactory.concurrentHandler( + new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()), + null, + new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()) + ); } @Test - public void testCombinerClear() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); - mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler())); - Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); - - mi.clear(); - Assert.assertFalse(mi.invoke(null, null, new FakeMessage())); - } - - @Test - public void testMessageInvokerCombiner() { + public void testMessageHandlerCombiner() { //Empty combiner - false result - MessageInvokerCombiner mi = new MessageInvokerCombiner(); + MessageHandler mi = MessageHandlerFactory.concurrentHandler(); Assert.assertFalse(mi.invoke(null, null, new Base())); FakeMessageHandler h = new FakeMessageHandler(); - mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); - mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); + mi = MessageHandlerFactory.concurrentHandler( + new TypedMessageHandler(FakeMessage.class, h), + new TypedMessageHandler(FakeMessage.class, h) + ); Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage())); Assert.assertEquals(2, h.counter); - + //Increase size in runtime. Should be 3 calls: 2+3 = 5 - mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); + mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessage.class, h)); Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); Assert.assertEquals(5, h.counter); } @Test - public void testMessageInvokerCombiner2levels() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); + public void testMessageHandlerCombiner2levels() { FakeMessageHandler h = new FakeMessageHandler(); - MessageInvokerCombiner mi1 = new MessageInvokerCombiner(); - mi1.add(new SimpleMessageInvoker(FakeMessage.class, h)); - mi1.add(new SimpleMessageInvoker(FakeMessage.class, h)); + MessageHandler mi1 = MessageHandlerFactory.concurrentHandler( + new TypedMessageHandler(FakeMessage.class, h), + new TypedMessageHandler(FakeMessage.class, h) + ); - MessageInvokerCombiner mi2 = new MessageInvokerCombiner(); - mi2.add(new SimpleMessageInvoker(FakeMessage.class, h)); - mi2.add(new SimpleMessageInvoker(FakeMessage.class, h)); - - mi.add(mi1); - mi.add(mi2); + MessageHandler mi2 = MessageHandlerFactory.concurrentHandler( + new TypedMessageHandler(FakeMessage.class, h), + new TypedMessageHandler(FakeMessage.class, h) + ); + MessageHandler mi = MessageHandlerFactory.concurrentHandler(mi1, mi2); + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); Assert.assertEquals(4, h.counter); } @Test - public void testMessageInvokerCombinerDataShipping() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); + public void testMessageHandlerCombinerDataShipping() { + MessageHandler mi = MessageHandlerFactory.concurrentHandler(); FakeMessageDataHandler h = new FakeMessageDataHandler(); - mi.add(new SimpleMessageInvoker(FakeMessageData.class, h)); + mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessageData.class, h)); Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101))); Assert.assertEquals(101, h.data); } @Test - public void testCombiningDefaultInvoker() { - MessageInvokerCombiner mi = new MessageInvokerCombiner(); - mi.add(new DefaultMessageInvoker()); - mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler())); + public void testCombiningDefaultHandler() { + MessageHandler mi = MessageHandlerFactory.concurrentHandler( + MessageHandlerFactory.defaultHandler(), + new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()) + ); //UdpSharedGossipDataMessage with null gossipCore -> exception boolean thrown = false; try { @@ -171,7 +175,7 @@ public class MessageInvokerTest { thrown = true; } Assert.assertTrue(thrown); - //DefaultInvoker skips FakeMessage and FakeHandler works ok + //skips FakeMessage and FakeHandler works ok Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); }