From 2133cb0549f8e5e2ed1b24a76681368e620f750a Mon Sep 17 00:00:00 2001 From: Maxim Rusak Date: Thu, 16 Feb 2017 12:35:56 +0300 Subject: [PATCH] GOSSIP-56 GossipCore should allow registration of handlers MessageInvoker idea. Returns true when it managed to invoke one of handlers. User can build any structure of handlers. See tests: MessageInvokerTest. --- .../org/apache/gossip/manager/GossipCore.java | 178 +++++------------- .../apache/gossip/manager/GossipManager.java | 45 ++--- .../handlers/ActiveGossipMessageHandler.java | 74 ++++++++ .../handlers/DefaultMessageInvoker.java | 40 ++++ .../handlers/GossipDataMessageHandler.java | 31 +++ .../manager/handlers/MessageHandler.java | 26 +++ .../manager/handlers/MessageInvoker.java | 26 +++ .../handlers/MessageInvokerCombiner.java | 58 ++++++ .../manager/handlers/ResponseHandler.java | 35 ++++ .../SharedGossipDataMessageHandler.java | 31 +++ .../handlers/ShutdownMessageHandler.java | 38 ++++ .../handlers/SimpleMessageInvoker.java | 45 +++++ .../manager/random/RandomGossipManager.java | 24 ++- .../RandomGossipManagerBuilderTest.java | 31 +++ .../manager/handlers/MessageInvokerTest.java | 178 ++++++++++++++++++ 15 files changed, 701 insertions(+), 159 deletions(-) create mode 100644 src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java create mode 100644 src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java create mode 100644 src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java create mode 100644 src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java create mode 100644 src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java create mode 100644 src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java create mode 100644 src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java create mode 100644 src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java create mode 100644 src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java create mode 100644 src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java create mode 100644 src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 6f97a74..403acf4 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -17,6 +17,17 @@ */ package org.apache.gossip.manager; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.GossipMember; +import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.event.GossipState; +import org.apache.gossip.model.*; +import org.apache.gossip.udp.Trackable; +import org.apache.log4j.Logger; + import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -24,53 +35,15 @@ import java.net.DatagramPacket; import java.net.DatagramSocket; import java.net.InetAddress; import java.net.URI; -import java.net.URISyntaxException; -import java.security.InvalidKeyException; -import java.security.KeyFactory; -import java.security.NoSuchAlgorithmException; -import java.security.NoSuchProviderException; -import java.security.PrivateKey; -import java.security.Signature; -import java.security.SignatureException; +import java.security.*; import java.security.spec.InvalidKeySpecException; import java.security.spec.PKCS8EncodedKeySpec; -import java.util.ArrayList; import java.util.List; import java.util.Map.Entry; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.apache.gossip.GossipMember; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.RemoteGossipMember; -import org.apache.gossip.event.GossipState; -import org.apache.gossip.model.ActiveGossipMessage; -import org.apache.gossip.model.Base; -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.Response; -import org.apache.gossip.model.SharedGossipDataMessage; -import org.apache.gossip.model.ShutdownMessage; -import org.apache.gossip.model.SignedPayload; -import org.apache.gossip.udp.Trackable; -import org.apache.gossip.udp.UdpActiveGossipMessage; -import org.apache.gossip.udp.UdpActiveGossipOk; -import org.apache.gossip.udp.UdpGossipDataMessage; -import org.apache.gossip.udp.UdpNotAMemberFault; -import org.apache.gossip.udp.UdpSharedGossipDataMessage; -import org.apache.log4j.Logger; - -import com.codahale.metrics.Gauge; -import com.codahale.metrics.Meter; -import com.codahale.metrics.MetricRegistry; +import java.util.concurrent.*; public class GossipCore implements GossipCoreConstants { - + public static final Logger LOGGER = Logger.getLogger(GossipCore.class); private final GossipManager gossipManager; private ConcurrentHashMap requests; @@ -83,7 +56,7 @@ public class GossipCore implements GossipCoreConstants { private final Meter messageSerdeException; private final Meter tranmissionException; private final Meter tranmissionSuccess; - + public GossipCore(GossipManager manager, MetricRegistry metrics){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); @@ -100,6 +73,7 @@ public class GossipCore implements GossipCoreConstants { messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); + if (manager.getSettings().isSignMessages()){ File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId()); File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub"); @@ -124,7 +98,7 @@ public class GossipCore implements GossipCoreConstants { privKey = null; } } - + private byte [] sign(byte [] bytes){ Signature dsa; try { @@ -136,7 +110,7 @@ public class GossipCore implements GossipCoreConstants { throw new RuntimeException(e); } } - + public void addSharedData(SharedGossipDataMessage message){ SharedGossipDataMessage previous = sharedData.get(message.getKey()); if (previous == null){ @@ -163,11 +137,11 @@ public class GossipCore implements GossipCoreConstants { } } } - + public ConcurrentHashMap> getPerNodeData(){ return perNodeData; } - + public ConcurrentHashMap getSharedData() { return sharedData; } @@ -181,74 +155,15 @@ public class GossipCore implements GossipCoreConstants { } service.shutdownNow(); } - - public void receive(Base base){ - if (base instanceof Response){ - if (base instanceof Trackable){ - Trackable t = (Trackable) base; - requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t); - } - } - if (base instanceof ShutdownMessage){ - ShutdownMessage s = (ShutdownMessage) base; - GossipDataMessage m = new GossipDataMessage(); - m.setKey(ShutdownMessage.PER_NODE_KEY); - m.setNodeId(s.getNodeId()); - m.setPayload(base); - m.setTimestamp(System.currentTimeMillis()); - m.setExpireAt(System.currentTimeMillis() + 30L * 1000L); - addPerNodeData(m); - } - if (base instanceof GossipDataMessage) { - UdpGossipDataMessage message = (UdpGossipDataMessage) base; - addPerNodeData(message); - } - if (base instanceof SharedGossipDataMessage){ - UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base; - addSharedData(message); - } - if (base instanceof ActiveGossipMessage){ - List remoteGossipMembers = new ArrayList<>(); - RemoteGossipMember senderMember = null; - UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; - for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { - URI u = null; - try { - u = new URI(activeGossipMessage.getMembers().get(i).getUri()); - } catch (URISyntaxException e) { - LOGGER.debug("Gossip message with faulty URI", e); - continue; - } - RemoteGossipMember member = new RemoteGossipMember( - activeGossipMessage.getMembers().get(i).getCluster(), - u, - activeGossipMessage.getMembers().get(i).getId(), - activeGossipMessage.getMembers().get(i).getHeartbeat(), - activeGossipMessage.getMembers().get(i).getProperties()); - if (i == 0) { - senderMember = member; - } - if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))){ - UdpNotAMemberFault f = new UdpNotAMemberFault(); - f.setException("Not a member of this cluster " + i); - f.setUriFrom(activeGossipMessage.getUriFrom()); - f.setUuid(activeGossipMessage.getUuid()); - LOGGER.warn(f); - sendOneWay(f, member.getUri()); - continue; - } - remoteGossipMembers.add(member); - } - UdpActiveGossipOk o = new UdpActiveGossipOk(); - o.setUriFrom(activeGossipMessage.getUriFrom()); - o.setUuid(activeGossipMessage.getUuid()); - sendOneWay(o, senderMember.getUri()); - mergeLists(gossipManager, senderMember, remoteGossipMembers); + + public void receive(Base base) { + if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) { + LOGGER.warn("received message can not be handled"); } } - + /** - * Sends a blocking message. + * Sends a blocking message. * @param message * @param uri * @throws RuntimeException if data can not be serialized or in transmission error @@ -277,15 +192,15 @@ public class GossipCore implements GossipCoreConstants { } catch (IOException e) { tranmissionException.mark(); throw new RuntimeException(e); - } + } } - + public Response send(Base message, URI uri){ if (LOGGER.isDebugEnabled()){ - LOGGER.debug("Sending " + message); + LOGGER.debug("Sending " + message); LOGGER.debug("Current request queue " + requests); } - + final Trackable t; if (message instanceof Trackable){ t = (Trackable) message; @@ -307,12 +222,12 @@ public class GossipCore implements GossipCoreConstants { try { Thread.sleep(0, 555555); } catch (InterruptedException e) { - + } } } }); - + try { //TODO this needs to be a setting base on attempts/second return response.get(1, TimeUnit.SECONDS); @@ -324,14 +239,14 @@ public class GossipCore implements GossipCoreConstants { } catch (TimeoutException e) { boolean cancelled = response.cancel(true); LOGGER.debug(String.format("Threadpool timeout attempting to contact %s, cancelled ? %b", uri.toString(), cancelled)); - return null; + return null; } finally { if (t != null){ requests.remove(t.getUuid() + "/" + t.getUriFrom()); } } } - + /** * Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used * when the protocol for the message is not to wait for a response @@ -359,21 +274,25 @@ public class GossipCore implements GossipCoreConstants { DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort()); socket.send(datagramPacket); tranmissionSuccess.mark(); - } catch (IOException ex) { + } catch (IOException ex) { tranmissionException.mark(); LOGGER.debug("Send one way failed", ex); } } + public void addRequest(String k, Base v) { + requests.put(k, v); + } + /** * Merge lists from remote members and update heartbeats - * + * * @param gossipManager * @param senderMember * @param remoteList - * + * */ - protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, + public void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, List remoteList) { if (LOGGER.isDebugEnabled()){ debugState(senderMember, remoteList); @@ -390,10 +309,10 @@ public class GossipCore implements GossipCoreConstants { if (remoteMember.getId().equals(gossipManager.getMyself().getId())) { continue; } - LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(), - remoteMember.getUri(), - remoteMember.getId(), - remoteMember.getHeartbeat(), + LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(), + remoteMember.getUri(), + remoteMember.getId(), + remoteMember.getHeartbeat(), remoteMember.getProperties(), gossipManager.getSettings().getWindowSize(), gossipManager.getSettings().getMinimumSamples(), @@ -414,11 +333,11 @@ public class GossipCore implements GossipCoreConstants { debugState(senderMember, remoteList); } } - + private void debugState(RemoteGossipMember senderMember, List remoteList){ LOGGER.warn( - "-----------------------\n" + + "-----------------------\n" + "Me " + gossipManager.getMyself() + "\n" + "Sender " + senderMember + "\n" + "RemoteList " + remoteList + "\n" + @@ -426,5 +345,4 @@ public class GossipCore implements GossipCoreConstants { "Dead " + gossipManager.getDeadMembers()+ "\n" + "======================="); } - } diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 9221aa6..ab8e4ae 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -19,6 +19,17 @@ package org.apache.gossip.manager; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.gossip.GossipMember; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.event.GossipListener; +import org.apache.gossip.event.GossipState; +import org.apache.gossip.manager.handlers.MessageInvoker; +import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; +import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.gossip.model.ShutdownMessage; +import org.apache.log4j.Logger; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; @@ -28,28 +39,10 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; -import org.apache.log4j.Logger; - -import org.apache.gossip.GossipMember; -import org.apache.gossip.GossipSettings; -import org.apache.gossip.LocalGossipMember; -import org.apache.gossip.event.GossipListener; -import org.apache.gossip.event.GossipState; -import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; - -import org.apache.gossip.model.GossipDataMessage; -import org.apache.gossip.model.SharedGossipDataMessage; -import org.apache.gossip.model.ShutdownMessage; - public abstract class GossipManager { @@ -71,11 +64,15 @@ public abstract class GossipManager { private final RingStatePersister ringState; private final UserDataPersister userDataState; private final ObjectMapper objectMapper; - + + private final MessageInvoker messageInvoker; + public GossipManager(String cluster, - URI uri, String id, Map properties, GossipSettings settings, - List gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) { + URI uri, String id, Map properties, GossipSettings settings, + List gossipMembers, GossipListener listener, MetricRegistry registry, + ObjectMapper objectMapper, MessageInvoker messageInvoker) { this.settings = settings; + this.messageInvoker = messageInvoker; clock = new SystemClock(); me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties, settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); @@ -104,6 +101,10 @@ public abstract class GossipManager { readSavedDataState(); } + public MessageInvoker getMessageInvoker() { + return messageInvoker; + } + public ConcurrentSkipListMap getMembers() { return members; } diff --git a/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java new file mode 100644 index 0000000..54aa40c --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/ActiveGossipMessageHandler.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.GossipMember; +import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; +import org.apache.gossip.udp.UdpActiveGossipMessage; +import org.apache.gossip.udp.UdpActiveGossipOk; +import org.apache.gossip.udp.UdpNotAMemberFault; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.List; + +public class ActiveGossipMessageHandler implements MessageHandler { + @Override + public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + List remoteGossipMembers = new ArrayList<>(); + RemoteGossipMember senderMember = null; + UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; + for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) { + URI u; + try { + u = new URI(activeGossipMessage.getMembers().get(i).getUri()); + } catch (URISyntaxException e) { + GossipCore.LOGGER.debug("Gossip message with faulty URI", e); + continue; + } + RemoteGossipMember member = new RemoteGossipMember( + activeGossipMessage.getMembers().get(i).getCluster(), + u, + activeGossipMessage.getMembers().get(i).getId(), + activeGossipMessage.getMembers().get(i).getHeartbeat(), + activeGossipMessage.getMembers().get(i).getProperties()); + if (i == 0) { + senderMember = member; + } + if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) { + UdpNotAMemberFault f = new UdpNotAMemberFault(); + f.setException("Not a member of this cluster " + i); + f.setUriFrom(activeGossipMessage.getUriFrom()); + f.setUuid(activeGossipMessage.getUuid()); + GossipCore.LOGGER.warn(f); + gossipCore.sendOneWay(f, member.getUri()); + continue; + } + remoteGossipMembers.add(member); + } + UdpActiveGossipOk o = new UdpActiveGossipOk(); + o.setUriFrom(activeGossipMessage.getUriFrom()); + o.setUuid(activeGossipMessage.getUuid()); + gossipCore.sendOneWay(o, senderMember.getUri()); + gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers); + } +} diff --git a/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java b/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java new file mode 100644 index 0000000..034691d --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/DefaultMessageInvoker.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.*; + +public class DefaultMessageInvoker implements MessageInvoker { + private final MessageInvokerCombiner mic; + + public DefaultMessageInvoker() { + mic = new MessageInvokerCombiner(); + mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler())); + mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler())); + mic.add(new SimpleMessageInvoker(GossipDataMessage.class, new GossipDataMessageHandler())); + mic.add(new SimpleMessageInvoker(SharedGossipDataMessage.class, new SharedGossipDataMessageHandler())); + mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler())); + } + + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + return mic.invoke(gossipCore, gossipManager, base); + } +} diff --git a/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java new file mode 100644 index 0000000..edf2579 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/GossipDataMessageHandler.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; +import org.apache.gossip.udp.UdpGossipDataMessage; + +public class GossipDataMessageHandler implements MessageHandler { + @Override + public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + UdpGossipDataMessage message = (UdpGossipDataMessage) base; + gossipCore.addPerNodeData(message); + } +} diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java new file mode 100644 index 0000000..4b5d49d --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/MessageHandler.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; + +public interface MessageHandler { + void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base); +} \ No newline at end of file diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java b/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java new file mode 100644 index 0000000..b4a39e3 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/MessageInvoker.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; + +public interface MessageInvoker { + boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base); +} diff --git a/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java new file mode 100644 index 0000000..7b654f6 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/MessageInvokerCombiner.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public class MessageInvokerCombiner implements MessageInvoker { + private List invokers; + + public MessageInvokerCombiner() { + } + + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + if (invokers == null) { + return false; + } + boolean result = false; + for (MessageInvoker mi : invokers) { + result = mi.invoke(gossipCore, gossipManager, base) || result; + } + return result; + } + + public void clear() { + invokers = null; + } + + public void add(MessageInvoker mi) { + if (mi == null) { + throw new NullPointerException(); + } + if (invokers == null) { + invokers = new CopyOnWriteArrayList<>(); + } + invokers.add(mi); + } +} diff --git a/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java new file mode 100644 index 0000000..ad1c2aa --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/ResponseHandler.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.ShutdownMessage; +import org.apache.gossip.udp.Trackable; + +public class ResponseHandler implements MessageHandler { + @Override + public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + if (base instanceof Trackable) { + Trackable t = (Trackable) base; + gossipCore.addRequest(t.getUuid() + "/" + t.getUriFrom(), (Base) t); + } + } +} diff --git a/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java new file mode 100644 index 0000000..e9d5343 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/SharedGossipDataMessageHandler.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; +import org.apache.gossip.udp.UdpSharedGossipDataMessage; + +public class SharedGossipDataMessageHandler implements MessageHandler{ + @Override + public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base; + gossipCore.addSharedData(message); + } +} diff --git a/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java b/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java new file mode 100644 index 0000000..c4adea2 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/ShutdownMessageHandler.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.ShutdownMessage; + +public class ShutdownMessageHandler implements MessageHandler { + @Override + public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + ShutdownMessage s = (ShutdownMessage) base; + GossipDataMessage m = new GossipDataMessage(); + m.setKey(ShutdownMessage.PER_NODE_KEY); + m.setNodeId(s.getNodeId()); + m.setPayload(base); + m.setTimestamp(System.currentTimeMillis()); + m.setExpireAt(System.currentTimeMillis() + 30L * 1000L); + gossipCore.addPerNodeData(m); + } +} diff --git a/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java b/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java new file mode 100644 index 0000000..0f410d2 --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/handlers/SimpleMessageInvoker.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; + +public class SimpleMessageInvoker implements MessageInvoker { + final private Class messageClass; + final private MessageHandler messageHandler; + + public SimpleMessageInvoker(Class messageClass, MessageHandler messageHandler) { + if (messageClass == null || messageHandler == null) { + throw new NullPointerException(); + } + this.messageClass = messageClass; + this.messageHandler = messageHandler; + } + + @Override + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + if (messageClass.isAssignableFrom(base.getClass())) { + messageHandler.invoke(gossipCore, gossipManager, base); + return true; + } else { + return false; + } + } +} diff --git a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java index 00e3378..bf8a8c3 100644 --- a/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java +++ b/src/main/java/org/apache/gossip/manager/random/RandomGossipManager.java @@ -19,17 +19,18 @@ package org.apache.gossip.manager.random; import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.gossip.GossipMember; import org.apache.gossip.GossipSettings; import org.apache.gossip.event.GossipListener; import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.handlers.DefaultMessageInvoker; +import org.apache.gossip.manager.handlers.MessageInvoker; import java.net.URI; -import java.util.List; -import java.util.Map; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; public class RandomGossipManager extends GossipManager { @@ -47,6 +48,7 @@ public class RandomGossipManager extends GossipManager { private MetricRegistry registry; private Map properties; private ObjectMapper objectMapper; + private MessageInvoker messageInvoker; private ManagerBuilder() {} @@ -100,7 +102,12 @@ public class RandomGossipManager extends GossipManager { this.objectMapper = objectMapper; return this; } - + + public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) { + this.messageInvoker = messageInvoker; + return this; + } + public RandomGossipManager build() { checkArgument(id != null, "You must specify an id"); checkArgument(cluster != null, "You must specify a cluster name"); @@ -120,12 +127,15 @@ public class RandomGossipManager extends GossipManager { objectMapper = new ObjectMapper(); objectMapper.enableDefaultTyping(); } - return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper); + if (messageInvoker == null) { + messageInvoker = new DefaultMessageInvoker(); + } + return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker); } } private RandomGossipManager(String cluster, URI uri, String id, Map properties, GossipSettings settings, - List gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) { - super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper); + List gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper, MessageInvoker messageInvoker) { + super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker); } } diff --git a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java index cf38492..2d04087 100644 --- a/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java +++ b/src/test/java/org/apache/gossip/manager/RandomGossipManagerBuilderTest.java @@ -21,11 +21,17 @@ import com.codahale.metrics.MetricRegistry; import org.apache.gossip.GossipMember; import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.manager.handlers.DefaultMessageInvoker; +import org.apache.gossip.manager.handlers.MessageInvoker; +import org.apache.gossip.manager.handlers.ResponseHandler; +import org.apache.gossip.manager.handlers.SimpleMessageInvoker; import org.apache.gossip.manager.random.RandomGossipManager; +import org.junit.Assert; import org.junit.jupiter.api.Test; import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; +import javax.xml.ws.Response; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -71,6 +77,31 @@ public class RandomGossipManagerBuilderTest { assertNotNull(gossipManager.getLiveMembers()); } + @Test + public void createDefaultMessageInvokerIfNull() throws URISyntaxException { + RandomGossipManager gossipManager = RandomGossipManager.newBuilder() + .withId("id") + .cluster("aCluster") + .uri(new URI("udp://localhost:2000")) + .settings(new GossipSettings()) + .messageInvoker(null).registry(new MetricRegistry()).build(); + assertNotNull(gossipManager.getMessageInvoker()); + Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass()); + } + + @Test + public void testMessageInvokerKeeping() throws URISyntaxException { + MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler()); + RandomGossipManager gossipManager = RandomGossipManager.newBuilder() + .withId("id") + .cluster("aCluster") + .uri(new URI("udp://localhost:2000")) + .settings(new GossipSettings()) + .messageInvoker(mi).registry(new MetricRegistry()).build(); + assertNotNull(gossipManager.getMessageInvoker()); + Assert.assertEquals(gossipManager.getMessageInvoker(), mi); + } + @Test public void useMemberListIfProvided() throws URISyntaxException { LocalGossipMember member = new LocalGossipMember( diff --git a/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java b/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java new file mode 100644 index 0000000..d402d59 --- /dev/null +++ b/src/test/java/org/apache/gossip/manager/handlers/MessageInvokerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.ActiveGossipMessage; +import org.apache.gossip.model.Base; +import org.apache.gossip.udp.UdpSharedGossipDataMessage; +import org.junit.Assert; +import org.junit.Test; + +public class MessageInvokerTest { + private class FakeMessage extends Base { + public FakeMessage() { + } + } + + private class FakeMessageData extends Base { + public int data; + + public FakeMessageData(int data) { + this.data = data; + } + } + + private class FakeMessageDataHandler implements MessageHandler { + public int data; + + public FakeMessageDataHandler() { + data = 0; + } + + public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + data = ((FakeMessageData) base).data; + } + } + + private class FakeMessageHandler implements MessageHandler { + public int counter; + + public FakeMessageHandler() { + counter = 0; + } + + public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + counter++; + } + } + + @Test + public void testSimpleInvoker() { + MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()); + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage())); + } + + @Test(expected = NullPointerException.class) + public void testSimpleInvokerNullClassConstructor() { + new SimpleMessageInvoker(null, new FakeMessageHandler()); + } + + @Test(expected = NullPointerException.class) + public void testSimpleInvokerNullHandlerConstructor() { + new SimpleMessageInvoker(FakeMessage.class, null); + } + + @Test + public void testCallCountSimpleInvoker() { + FakeMessageHandler h = new FakeMessageHandler(); + MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, h); + mi.invoke(null, null, new FakeMessage()); + Assert.assertEquals(1, h.counter); + mi.invoke(null, null, new ActiveGossipMessage()); + Assert.assertEquals(1, h.counter); + mi.invoke(null, null, new FakeMessage()); + Assert.assertEquals(2, h.counter); + } + + @Test(expected = NullPointerException.class) + public void cantAddNullInvoker() { + MessageInvokerCombiner mi = new MessageInvokerCombiner(); + mi.add(null); + } + + @Test + public void testCombinerClear() { + MessageInvokerCombiner mi = new MessageInvokerCombiner(); + mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler())); + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + + mi.clear(); + Assert.assertFalse(mi.invoke(null, null, new FakeMessage())); + } + + @Test + public void testMessageInvokerCombiner() { + //Empty combiner - false result + MessageInvokerCombiner mi = new MessageInvokerCombiner(); + Assert.assertFalse(mi.invoke(null, null, new Base())); + + FakeMessageHandler h = new FakeMessageHandler(); + mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); + mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); + + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage())); + Assert.assertEquals(2, h.counter); + + //Increase size in runtime. Should be 3 calls: 2+3 = 5 + mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + Assert.assertEquals(5, h.counter); + } + + @Test + public void testMessageInvokerCombiner2levels() { + MessageInvokerCombiner mi = new MessageInvokerCombiner(); + FakeMessageHandler h = new FakeMessageHandler(); + + MessageInvokerCombiner mi1 = new MessageInvokerCombiner(); + mi1.add(new SimpleMessageInvoker(FakeMessage.class, h)); + mi1.add(new SimpleMessageInvoker(FakeMessage.class, h)); + + MessageInvokerCombiner mi2 = new MessageInvokerCombiner(); + mi2.add(new SimpleMessageInvoker(FakeMessage.class, h)); + mi2.add(new SimpleMessageInvoker(FakeMessage.class, h)); + + mi.add(mi1); + mi.add(mi2); + + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + Assert.assertEquals(4, h.counter); + } + + @Test + public void testMessageInvokerCombinerDataShipping() { + MessageInvokerCombiner mi = new MessageInvokerCombiner(); + FakeMessageDataHandler h = new FakeMessageDataHandler(); + mi.add(new SimpleMessageInvoker(FakeMessageData.class, h)); + + Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101))); + Assert.assertEquals(101, h.data); + } + + @Test + public void testCombiningDefaultInvoker() { + MessageInvokerCombiner mi = new MessageInvokerCombiner(); + mi.add(new DefaultMessageInvoker()); + mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler())); + //UdpSharedGossipDataMessage with null gossipCore -> exception + boolean thrown = false; + try { + mi.invoke(null, null, new UdpSharedGossipDataMessage()); + } catch (NullPointerException e) { + thrown = true; + } + Assert.assertTrue(thrown); + //DefaultInvoker skips FakeMessage and FakeHandler works ok + Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); + } + +}