GOSSIP-80 Sundry cleanups

* remove redundant parameter from method call.
* remove uncessary threadpool.
* Simplify `GossipCore.sendOneWay()`
* Cleanup useage of `MessageInvoker`
  * `DefaultMessageInvoker` replaced by a factory
  * `MessageInvokerCombiner` replaced by same factory
  * Alter `MessageInvokerTest` to not rely on specific types
  * Remove type assertion from `GossipManagerBuilderTest`
* Merge `MessageInvoker` with `MessageHandler`
  * This required changing method signature return type from `void` to `boolean`.
This commit is contained in:
Gary Dusbabek
2017-04-13 13:02:48 -05:00
parent 298b1ae3ae
commit c544b8bf16
16 changed files with 202 additions and 242 deletions

View File

@ -57,7 +57,6 @@ public class GossipCore implements GossipCoreConstants {
public static final Logger LOGGER = Logger.getLogger(GossipCore.class); public static final Logger LOGGER = Logger.getLogger(GossipCore.class);
private final GossipManager gossipManager; private final GossipManager gossipManager;
private ConcurrentHashMap<String, LatchAndBase> requests; private ConcurrentHashMap<String, LatchAndBase> requests;
private ThreadPoolExecutor service;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData; private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedDataMessage> sharedData; private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue; private final BlockingQueue<Runnable> workQueue;
@ -71,15 +70,12 @@ public class GossipCore implements GossipCoreConstants {
this.gossipManager = manager; this.gossipManager = manager;
requests = new ConcurrentHashMap<>(); requests = new ConcurrentHashMap<>();
workQueue = new ArrayBlockingQueue<>(1024); workQueue = new ArrayBlockingQueue<>(1024);
service = new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, workQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
perNodeData = new ConcurrentHashMap<>(); perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>(); sharedData = new ConcurrentHashMap<>();
metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size()); metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size()); metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size()); metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size()); metrics.register(REQUEST_SIZE, (Gauge<Integer>)() -> requests.size());
metrics.register(THREADPOOL_ACTIVE, (Gauge<Integer>)() -> service.getActiveCount());
metrics.register(THREADPOOL_SIZE, (Gauge<Integer>)() -> service.getPoolSize());
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
@ -178,17 +174,10 @@ public class GossipCore implements GossipCoreConstants {
} }
public void shutdown(){ public void shutdown(){
service.shutdown();
try {
service.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.warn(e);
}
service.shutdownNow();
} }
public void receive(Base base) { public void receive(Base base) {
if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) { if (!gossipManager.getMessageHandler().invoke(this, gossipManager, base)) {
LOGGER.warn("received message can not be handled"); LOGGER.warn("received message can not be handled");
} }
} }
@ -269,28 +258,9 @@ public class GossipCore implements GossipCoreConstants {
* @param u the uri to send it to * @param u the uri to send it to
*/ */
public void sendOneWay(Base message, URI u) { public void sendOneWay(Base message, URI u) {
byte[] json_bytes;
try { try {
if (privKey == null){ sendInternal(message, u);
json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message); } catch (RuntimeException ex) {
} else {
SignedPayload p = new SignedPayload();
p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
p.setSignature(sign(p.getData()));
json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
}
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
InetAddress dest = InetAddress.getByName(u.getHost());
DatagramPacket datagramPacket = new DatagramPacket(json_bytes, json_bytes.length, dest, u.getPort());
socket.send(datagramPacket);
tranmissionSuccess.mark();
} catch (IOException ex) {
tranmissionException.mark();
LOGGER.debug("Send one way failed", ex); LOGGER.debug("Send one way failed", ex);
} }
} }
@ -304,13 +274,11 @@ public class GossipCore implements GossipCoreConstants {
/** /**
* Merge lists from remote members and update heartbeats * Merge lists from remote members and update heartbeats
* *
* @param gossipManager
* @param senderMember * @param senderMember
* @param remoteList * @param remoteList
* *
*/ */
public void mergeLists(GossipManager gossipManager, RemoteMember senderMember, public void mergeLists(RemoteMember senderMember, List<Member> remoteList) {
List<Member> remoteList) {
if (LOGGER.isDebugEnabled()){ if (LOGGER.isDebugEnabled()){
debugState(senderMember, remoteList); debugState(senderMember, remoteList);
} }

View File

@ -25,7 +25,7 @@ import org.apache.gossip.Member;
import org.apache.gossip.crdt.Crdt; import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState; import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.handlers.MessageInvoker; import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage; import org.apache.gossip.model.SharedDataMessage;
@ -64,14 +64,14 @@ public abstract class GossipManager {
private final GossipMemberStateRefresher memberStateRefresher; private final GossipMemberStateRefresher memberStateRefresher;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final MessageInvoker messageInvoker; private final MessageHandler messageHandler;
public GossipManager(String cluster, public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings, URI uri, String id, Map<String, String> properties, GossipSettings settings,
List<Member> gossipMembers, GossipListener listener, MetricRegistry registry, List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
ObjectMapper objectMapper, MessageInvoker messageInvoker) { ObjectMapper objectMapper, MessageHandler messageHandler) {
this.settings = settings; this.settings = settings;
this.messageInvoker = messageInvoker; this.messageHandler = messageHandler;
clock = new SystemClock(); clock = new SystemClock();
me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties, me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
@ -101,8 +101,8 @@ public abstract class GossipManager {
readSavedDataState(); readSavedDataState();
} }
public MessageInvoker getMessageInvoker() { public MessageHandler getMessageHandler() {
return messageInvoker; return messageHandler;
} }
public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() { public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() {

View File

@ -25,8 +25,8 @@ import org.apache.gossip.GossipSettings;
import org.apache.gossip.StartupSettings; import org.apache.gossip.StartupSettings;
import org.apache.gossip.crdt.CrdtModule; import org.apache.gossip.crdt.CrdtModule;
import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.handlers.DefaultMessageInvoker; import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.handlers.MessageInvoker; import org.apache.gossip.manager.handlers.MessageHandlerFactory;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
@ -50,7 +50,7 @@ public class GossipManagerBuilder {
private MetricRegistry registry; private MetricRegistry registry;
private Map<String,String> properties; private Map<String,String> properties;
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
private MessageInvoker messageInvoker; private MessageHandler messageHandler;
private ManagerBuilder() {} private ManagerBuilder() {}
@ -114,8 +114,8 @@ public class GossipManagerBuilder {
return this; return this;
} }
public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) { public ManagerBuilder messageHandler(MessageHandler messageHandler) {
this.messageInvoker = messageInvoker; this.messageHandler = messageHandler;
return this; return this;
} }
@ -142,10 +142,10 @@ public class GossipManagerBuilder {
objectMapper.registerModule(new CrdtModule()); objectMapper.registerModule(new CrdtModule());
objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false); objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
} }
if (messageInvoker == null) { if (messageHandler == null) {
messageInvoker = new DefaultMessageInvoker(); messageHandler = MessageHandlerFactory.defaultHandler();
} }
return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ; return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageHandler) {} ;
} }
} }

View File

@ -32,8 +32,15 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
public class ActiveGossipMessageHandler implements MessageHandler { public class ActiveGossipMessageHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override @Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
List<Member> remoteGossipMembers = new ArrayList<>(); List<Member> remoteGossipMembers = new ArrayList<>();
RemoteMember senderMember = null; RemoteMember senderMember = null;
UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base; UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
@ -69,6 +76,7 @@ public class ActiveGossipMessageHandler implements MessageHandler {
o.setUriFrom(activeGossipMessage.getUriFrom()); o.setUriFrom(activeGossipMessage.getUriFrom());
o.setUuid(activeGossipMessage.getUuid()); o.setUuid(activeGossipMessage.getUuid());
gossipCore.sendOneWay(o, senderMember.getUri()); gossipCore.sendOneWay(o, senderMember.getUri());
gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers); gossipCore.mergeLists(senderMember, remoteGossipMembers);
return true;
} }
} }

View File

@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.*;
public class DefaultMessageInvoker implements MessageInvoker {
private final MessageInvokerCombiner mic;
public DefaultMessageInvoker() {
mic = new MessageInvokerCombiner();
mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler()));
mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler()));
mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new PerNodeDataMessageHandler()));
mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new SharedDataMessageHandler()));
mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler()));
}
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
return mic.invoke(gossipCore, gossipManager, base);
}
}

View File

@ -22,5 +22,11 @@ import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base; import org.apache.gossip.model.Base;
public interface MessageHandler { public interface MessageHandler {
void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base); /**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
} }

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import java.util.Arrays;
public class MessageHandlerFactory {
public static MessageHandler defaultHandler() {
return concurrentHandler(
new TypedMessageHandler(Response.class, new ResponseHandler()),
new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()),
new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()),
new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()),
new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler())
);
}
public static MessageHandler concurrentHandler(MessageHandler... handlers) {
if (handlers == null) throw new NullPointerException("handlers cannot be null");
if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) {
throw new NullPointerException("found at least one null handler");
}
return new MessageHandler() {
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
// return true if at least one of the component handlers return true.
return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
}
};
}
}

View File

@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
public interface MessageInvoker {
/**
*
* @param gossipCore
* @param gossipManager
* @param base
* @return true if the invoker processed the message type
*/
boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
}

View File

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class MessageInvokerCombiner implements MessageInvoker {
private final List<MessageInvoker> invokers = new CopyOnWriteArrayList<>();
public MessageInvokerCombiner() {
}
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
return invokers.stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
}
public void clear() {
invokers.clear();
}
public void add(MessageInvoker mi) {
if (mi == null) {
throw new NullPointerException();
}
invokers.add(mi);
}
}

View File

@ -23,9 +23,17 @@ import org.apache.gossip.model.Base;
import org.apache.gossip.udp.UdpPerNodeDataMessage; import org.apache.gossip.udp.UdpPerNodeDataMessage;
public class PerNodeDataMessageHandler implements MessageHandler { public class PerNodeDataMessageHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override @Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base; UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base;
gossipCore.addPerNodeData(message); gossipCore.addPerNodeData(message);
return true;
} }
} }

View File

@ -23,11 +23,20 @@ import org.apache.gossip.model.Base;
import org.apache.gossip.udp.Trackable; import org.apache.gossip.udp.Trackable;
public class ResponseHandler implements MessageHandler { public class ResponseHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override @Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (base instanceof Trackable) { if (base instanceof Trackable) {
Trackable t = (Trackable) base; Trackable t = (Trackable) base;
gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t); gossipCore.handleResponse(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
} return true;
}
return false;
} }
} }

View File

@ -23,9 +23,17 @@ import org.apache.gossip.model.Base;
import org.apache.gossip.udp.UdpSharedDataMessage; import org.apache.gossip.udp.UdpSharedDataMessage;
public class SharedDataMessageHandler implements MessageHandler{ public class SharedDataMessageHandler implements MessageHandler{
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override @Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpSharedDataMessage message = (UdpSharedDataMessage) base; UdpSharedDataMessage message = (UdpSharedDataMessage) base;
gossipCore.addSharedData(message); gossipCore.addSharedData(message);
return true;
} }
} }

View File

@ -24,8 +24,15 @@ import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.ShutdownMessage; import org.apache.gossip.model.ShutdownMessage;
public class ShutdownMessageHandler implements MessageHandler { public class ShutdownMessageHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override @Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
ShutdownMessage s = (ShutdownMessage) base; ShutdownMessage s = (ShutdownMessage) base;
PerNodeDataMessage m = new PerNodeDataMessage(); PerNodeDataMessage m = new PerNodeDataMessage();
m.setKey(ShutdownMessage.PER_NODE_KEY); m.setKey(ShutdownMessage.PER_NODE_KEY);
@ -34,5 +41,6 @@ public class ShutdownMessageHandler implements MessageHandler {
m.setTimestamp(System.currentTimeMillis()); m.setTimestamp(System.currentTimeMillis());
m.setExpireAt(System.currentTimeMillis() + 30L * 1000L); m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
gossipCore.addPerNodeData(m); gossipCore.addPerNodeData(m);
return true;
} }
} }

View File

@ -21,11 +21,11 @@ import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base; import org.apache.gossip.model.Base;
public class SimpleMessageInvoker implements MessageInvoker { public class TypedMessageHandler implements MessageHandler {
final private Class<?> messageClass; final private Class<?> messageClass;
final private MessageHandler messageHandler; final private MessageHandler messageHandler;
public SimpleMessageInvoker(Class<?> messageClass, MessageHandler messageHandler) { public TypedMessageHandler(Class<?> messageClass, MessageHandler messageHandler) {
if (messageClass == null || messageHandler == null) { if (messageClass == null || messageHandler == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -33,6 +33,12 @@ public class SimpleMessageInvoker implements MessageInvoker {
this.messageHandler = messageHandler; this.messageHandler = messageHandler;
} }
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return true if types match, false otherwise.
*/
@Override @Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (messageClass.isAssignableFrom(base.getClass())) { if (messageClass.isAssignableFrom(base.getClass())) {

View File

@ -21,10 +21,9 @@ import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.Member; import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings; import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember; import org.apache.gossip.LocalMember;
import org.apache.gossip.manager.handlers.DefaultMessageInvoker; import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.manager.handlers.MessageInvoker;
import org.apache.gossip.manager.handlers.ResponseHandler; import org.apache.gossip.manager.handlers.ResponseHandler;
import org.apache.gossip.manager.handlers.SimpleMessageInvoker; import org.apache.gossip.manager.handlers.TypedMessageHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform; import org.junit.platform.runner.JUnitPlatform;
@ -77,28 +76,27 @@ public class GossipManagerBuilderTest {
} }
@Test @Test
public void createDefaultMessageInvokerIfNull() throws URISyntaxException { public void createDefaultMessageHandlerIfNull() throws URISyntaxException {
GossipManager gossipManager = GossipManagerBuilder.newBuilder() GossipManager gossipManager = GossipManagerBuilder.newBuilder()
.id("id") .id("id")
.cluster("aCluster") .cluster("aCluster")
.uri(new URI("udp://localhost:2000")) .uri(new URI("udp://localhost:2000"))
.gossipSettings(new GossipSettings()) .gossipSettings(new GossipSettings())
.messageInvoker(null).registry(new MetricRegistry()).build(); .messageHandler(null).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageInvoker()); assertNotNull(gossipManager.getMessageHandler());
Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass());
} }
@Test @Test
public void testMessageInvokerKeeping() throws URISyntaxException { public void testMessageHandlerKeeping() throws URISyntaxException {
MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler()); MessageHandler mi = new TypedMessageHandler(Response.class, new ResponseHandler());
GossipManager gossipManager = GossipManagerBuilder.newBuilder() GossipManager gossipManager = GossipManagerBuilder.newBuilder()
.id("id") .id("id")
.cluster("aCluster") .cluster("aCluster")
.uri(new URI("udp://localhost:2000")) .uri(new URI("udp://localhost:2000"))
.gossipSettings(new GossipSettings()) .gossipSettings(new GossipSettings())
.messageInvoker(mi).registry(new MetricRegistry()).build(); .messageHandler(mi).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageInvoker()); assertNotNull(gossipManager.getMessageHandler());
Assert.assertEquals(gossipManager.getMessageInvoker(), mi); Assert.assertEquals(gossipManager.getMessageHandler(), mi);
} }
@Test @Test

View File

@ -25,7 +25,7 @@ import org.apache.gossip.udp.UdpSharedDataMessage;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class MessageInvokerTest { public class MessageHandlerTest {
private class FakeMessage extends Base { private class FakeMessage extends Base {
public FakeMessage() { public FakeMessage() {
} }
@ -46,8 +46,9 @@ public class MessageInvokerTest {
data = 0; data = 0;
} }
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
data = ((FakeMessageData) base).data; data = ((FakeMessageData) base).data;
return true;
} }
} }
@ -58,32 +59,33 @@ public class MessageInvokerTest {
counter = 0; counter = 0;
} }
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
counter++; counter++;
return true;
} }
} }
@Test @Test
public void testSimpleInvoker() { public void testSimpleHandler() {
MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()); MessageHandler mi = new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler());
Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage())); Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
} }
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void testSimpleInvokerNullClassConstructor() { public void testSimpleHandlerNullClassConstructor() {
new SimpleMessageInvoker(null, new FakeMessageHandler()); new TypedMessageHandler(null, new FakeMessageHandler());
} }
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void testSimpleInvokerNullHandlerConstructor() { public void testSimpleHandlerNullHandlerConstructor() {
new SimpleMessageInvoker(FakeMessage.class, null); new TypedMessageHandler(FakeMessage.class, null);
} }
@Test @Test
public void testCallCountSimpleInvoker() { public void testCallCountSimpleHandler() {
FakeMessageHandler h = new FakeMessageHandler(); FakeMessageHandler h = new FakeMessageHandler();
MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, h); MessageHandler mi = new TypedMessageHandler(FakeMessage.class, h);
mi.invoke(null, null, new FakeMessage()); mi.invoke(null, null, new FakeMessage());
Assert.assertEquals(1, h.counter); Assert.assertEquals(1, h.counter);
mi.invoke(null, null, new ActiveGossipMessage()); mi.invoke(null, null, new ActiveGossipMessage());
@ -93,76 +95,78 @@ public class MessageInvokerTest {
} }
@Test(expected = NullPointerException.class) @Test(expected = NullPointerException.class)
public void cantAddNullInvoker() { @SuppressWarnings("all")
MessageInvokerCombiner mi = new MessageInvokerCombiner(); public void cantAddNullHandler() {
mi.add(null); MessageHandler handler = MessageHandlerFactory.concurrentHandler(null);
}
@Test(expected = NullPointerException.class)
public void cantAddNullHandler2() {
MessageHandler handler = MessageHandlerFactory.concurrentHandler(
new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()),
null,
new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler())
);
} }
@Test @Test
public void testCombinerClear() { public void testMessageHandlerCombiner() {
MessageInvokerCombiner mi = new MessageInvokerCombiner();
mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()));
Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
mi.clear();
Assert.assertFalse(mi.invoke(null, null, new FakeMessage()));
}
@Test
public void testMessageInvokerCombiner() {
//Empty combiner - false result //Empty combiner - false result
MessageInvokerCombiner mi = new MessageInvokerCombiner(); MessageHandler mi = MessageHandlerFactory.concurrentHandler();
Assert.assertFalse(mi.invoke(null, null, new Base())); Assert.assertFalse(mi.invoke(null, null, new Base()));
FakeMessageHandler h = new FakeMessageHandler(); FakeMessageHandler h = new FakeMessageHandler();
mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); mi = MessageHandlerFactory.concurrentHandler(
mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); new TypedMessageHandler(FakeMessage.class, h),
new TypedMessageHandler(FakeMessage.class, h)
);
Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage())); Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
Assert.assertEquals(2, h.counter); Assert.assertEquals(2, h.counter);
//Increase size in runtime. Should be 3 calls: 2+3 = 5 //Increase size in runtime. Should be 3 calls: 2+3 = 5
mi.add(new SimpleMessageInvoker(FakeMessage.class, h)); mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessage.class, h));
Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
Assert.assertEquals(5, h.counter); Assert.assertEquals(5, h.counter);
} }
@Test @Test
public void testMessageInvokerCombiner2levels() { public void testMessageHandlerCombiner2levels() {
MessageInvokerCombiner mi = new MessageInvokerCombiner();
FakeMessageHandler h = new FakeMessageHandler(); FakeMessageHandler h = new FakeMessageHandler();
MessageInvokerCombiner mi1 = new MessageInvokerCombiner(); MessageHandler mi1 = MessageHandlerFactory.concurrentHandler(
mi1.add(new SimpleMessageInvoker(FakeMessage.class, h)); new TypedMessageHandler(FakeMessage.class, h),
mi1.add(new SimpleMessageInvoker(FakeMessage.class, h)); new TypedMessageHandler(FakeMessage.class, h)
);
MessageInvokerCombiner mi2 = new MessageInvokerCombiner(); MessageHandler mi2 = MessageHandlerFactory.concurrentHandler(
mi2.add(new SimpleMessageInvoker(FakeMessage.class, h)); new TypedMessageHandler(FakeMessage.class, h),
mi2.add(new SimpleMessageInvoker(FakeMessage.class, h)); new TypedMessageHandler(FakeMessage.class, h)
);
mi.add(mi1); MessageHandler mi = MessageHandlerFactory.concurrentHandler(mi1, mi2);
mi.add(mi2);
Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
Assert.assertEquals(4, h.counter); Assert.assertEquals(4, h.counter);
} }
@Test @Test
public void testMessageInvokerCombinerDataShipping() { public void testMessageHandlerCombinerDataShipping() {
MessageInvokerCombiner mi = new MessageInvokerCombiner(); MessageHandler mi = MessageHandlerFactory.concurrentHandler();
FakeMessageDataHandler h = new FakeMessageDataHandler(); FakeMessageDataHandler h = new FakeMessageDataHandler();
mi.add(new SimpleMessageInvoker(FakeMessageData.class, h)); mi = MessageHandlerFactory.concurrentHandler(mi, new TypedMessageHandler(FakeMessageData.class, h));
Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101))); Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101)));
Assert.assertEquals(101, h.data); Assert.assertEquals(101, h.data);
} }
@Test @Test
public void testCombiningDefaultInvoker() { public void testCombiningDefaultHandler() {
MessageInvokerCombiner mi = new MessageInvokerCombiner(); MessageHandler mi = MessageHandlerFactory.concurrentHandler(
mi.add(new DefaultMessageInvoker()); MessageHandlerFactory.defaultHandler(),
mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler())); new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler())
);
//UdpSharedGossipDataMessage with null gossipCore -> exception //UdpSharedGossipDataMessage with null gossipCore -> exception
boolean thrown = false; boolean thrown = false;
try { try {
@ -171,7 +175,7 @@ public class MessageInvokerTest {
thrown = true; thrown = true;
} }
Assert.assertTrue(thrown); Assert.assertTrue(thrown);
//DefaultInvoker skips FakeMessage and FakeHandler works ok //skips FakeMessage and FakeHandler works ok
Assert.assertTrue(mi.invoke(null, null, new FakeMessage())); Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
} }