GOSSIP-56 GossipCore should allow registration of handlers

MessageInvoker idea. Returns true when it managed to invoke one of
handlers. User can build any structure of handlers.
See tests: MessageInvokerTest.
This commit is contained in:
Maxim Rusak
2017-02-16 12:35:56 +03:00
parent 7106cc400a
commit 2133cb0549
15 changed files with 701 additions and 159 deletions

View File

@ -17,6 +17,17 @@
*/ */
package org.apache.gossip.manager; package org.apache.gossip.manager;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.GossipMember;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.*;
import org.apache.gossip.udp.Trackable;
import org.apache.log4j.Logger;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
@ -24,50 +35,12 @@ import java.net.DatagramPacket;
import java.net.DatagramSocket; import java.net.DatagramSocket;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.security.*;
import java.security.InvalidKeyException;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.SignatureException;
import java.security.spec.InvalidKeySpecException; import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec; import java.security.spec.PKCS8EncodedKeySpec;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.gossip.GossipMember;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.model.SignedPayload;
import org.apache.gossip.udp.Trackable;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.log4j.Logger;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
public class GossipCore implements GossipCoreConstants { public class GossipCore implements GossipCoreConstants {
@ -100,6 +73,7 @@ public class GossipCore implements GossipCoreConstants {
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION); messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION); tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS); tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
if (manager.getSettings().isSignMessages()){ if (manager.getSettings().isSignMessages()){
File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId()); File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId());
File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub"); File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub");
@ -183,67 +157,8 @@ public class GossipCore implements GossipCoreConstants {
} }
public void receive(Base base) { public void receive(Base base) {
if (base instanceof Response){ if (!gossipManager.getMessageInvoker().invoke(this, gossipManager, base)) {
if (base instanceof Trackable){ LOGGER.warn("received message can not be handled");
Trackable t = (Trackable) base;
requests.put(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
}
}
if (base instanceof ShutdownMessage){
ShutdownMessage s = (ShutdownMessage) base;
GossipDataMessage m = new GossipDataMessage();
m.setKey(ShutdownMessage.PER_NODE_KEY);
m.setNodeId(s.getNodeId());
m.setPayload(base);
m.setTimestamp(System.currentTimeMillis());
m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
addPerNodeData(m);
}
if (base instanceof GossipDataMessage) {
UdpGossipDataMessage message = (UdpGossipDataMessage) base;
addPerNodeData(message);
}
if (base instanceof SharedGossipDataMessage){
UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base;
addSharedData(message);
}
if (base instanceof ActiveGossipMessage){
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;
UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
URI u = null;
try {
u = new URI(activeGossipMessage.getMembers().get(i).getUri());
} catch (URISyntaxException e) {
LOGGER.debug("Gossip message with faulty URI", e);
continue;
}
RemoteGossipMember member = new RemoteGossipMember(
activeGossipMessage.getMembers().get(i).getCluster(),
u,
activeGossipMessage.getMembers().get(i).getId(),
activeGossipMessage.getMembers().get(i).getHeartbeat(),
activeGossipMessage.getMembers().get(i).getProperties());
if (i == 0) {
senderMember = member;
}
if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))){
UdpNotAMemberFault f = new UdpNotAMemberFault();
f.setException("Not a member of this cluster " + i);
f.setUriFrom(activeGossipMessage.getUriFrom());
f.setUuid(activeGossipMessage.getUuid());
LOGGER.warn(f);
sendOneWay(f, member.getUri());
continue;
}
remoteGossipMembers.add(member);
}
UdpActiveGossipOk o = new UdpActiveGossipOk();
o.setUriFrom(activeGossipMessage.getUriFrom());
o.setUuid(activeGossipMessage.getUuid());
sendOneWay(o, senderMember.getUri());
mergeLists(gossipManager, senderMember, remoteGossipMembers);
} }
} }
@ -365,6 +280,10 @@ public class GossipCore implements GossipCoreConstants {
} }
} }
public void addRequest(String k, Base v) {
requests.put(k, v);
}
/** /**
* Merge lists from remote members and update heartbeats * Merge lists from remote members and update heartbeats
* *
@ -373,7 +292,7 @@ public class GossipCore implements GossipCoreConstants {
* @param remoteList * @param remoteList
* *
*/ */
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember, public void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList) { List<GossipMember> remoteList) {
if (LOGGER.isDebugEnabled()){ if (LOGGER.isDebugEnabled()){
debugState(senderMember, remoteList); debugState(senderMember, remoteList);
@ -426,5 +345,4 @@ public class GossipCore implements GossipCoreConstants {
"Dead " + gossipManager.getDeadMembers()+ "\n" + "Dead " + gossipManager.getDeadMembers()+ "\n" +
"======================="); "=======================");
} }
} }

View File

@ -19,6 +19,17 @@ package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.handlers.MessageInvoker;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.log4j.Logger;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
@ -28,28 +39,10 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.apache.log4j.Logger;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
public abstract class GossipManager { public abstract class GossipManager {
@ -72,10 +65,14 @@ public abstract class GossipManager {
private final UserDataPersister userDataState; private final UserDataPersister userDataState;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
private final MessageInvoker messageInvoker;
public GossipManager(String cluster, public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings, URI uri, String id, Map<String, String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) { List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry,
ObjectMapper objectMapper, MessageInvoker messageInvoker) {
this.settings = settings; this.settings = settings;
this.messageInvoker = messageInvoker;
clock = new SystemClock(); clock = new SystemClock();
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties, me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution()); settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
@ -104,6 +101,10 @@ public abstract class GossipManager {
readSavedDataState(); readSavedDataState();
} }
public MessageInvoker getMessageInvoker() {
return messageInvoker;
}
public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() { public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
return members; return members;
} }

View File

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.GossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
import org.apache.gossip.udp.UdpNotAMemberFault;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
public class ActiveGossipMessageHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;
UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
URI u;
try {
u = new URI(activeGossipMessage.getMembers().get(i).getUri());
} catch (URISyntaxException e) {
GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
continue;
}
RemoteGossipMember member = new RemoteGossipMember(
activeGossipMessage.getMembers().get(i).getCluster(),
u,
activeGossipMessage.getMembers().get(i).getId(),
activeGossipMessage.getMembers().get(i).getHeartbeat(),
activeGossipMessage.getMembers().get(i).getProperties());
if (i == 0) {
senderMember = member;
}
if (!(member.getClusterName().equals(gossipManager.getMyself().getClusterName()))) {
UdpNotAMemberFault f = new UdpNotAMemberFault();
f.setException("Not a member of this cluster " + i);
f.setUriFrom(activeGossipMessage.getUriFrom());
f.setUuid(activeGossipMessage.getUuid());
GossipCore.LOGGER.warn(f);
gossipCore.sendOneWay(f, member.getUri());
continue;
}
remoteGossipMembers.add(member);
}
UdpActiveGossipOk o = new UdpActiveGossipOk();
o.setUriFrom(activeGossipMessage.getUriFrom());
o.setUuid(activeGossipMessage.getUuid());
gossipCore.sendOneWay(o, senderMember.getUri());
gossipCore.mergeLists(gossipManager, senderMember, remoteGossipMembers);
}
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.*;
public class DefaultMessageInvoker implements MessageInvoker {
private final MessageInvokerCombiner mic;
public DefaultMessageInvoker() {
mic = new MessageInvokerCombiner();
mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler()));
mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler()));
mic.add(new SimpleMessageInvoker(GossipDataMessage.class, new GossipDataMessageHandler()));
mic.add(new SimpleMessageInvoker(SharedGossipDataMessage.class, new SharedGossipDataMessageHandler()));
mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler()));
}
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
return mic.invoke(gossipCore, gossipManager, base);
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.udp.UdpGossipDataMessage;
public class GossipDataMessageHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpGossipDataMessage message = (UdpGossipDataMessage) base;
gossipCore.addPerNodeData(message);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
public interface MessageHandler {
void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
public interface MessageInvoker {
boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base);
}

View File

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public class MessageInvokerCombiner implements MessageInvoker {
private List<MessageInvoker> invokers;
public MessageInvokerCombiner() {
}
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (invokers == null) {
return false;
}
boolean result = false;
for (MessageInvoker mi : invokers) {
result = mi.invoke(gossipCore, gossipManager, base) || result;
}
return result;
}
public void clear() {
invokers = null;
}
public void add(MessageInvoker mi) {
if (mi == null) {
throw new NullPointerException();
}
if (invokers == null) {
invokers = new CopyOnWriteArrayList<>();
}
invokers.add(mi);
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.udp.Trackable;
public class ResponseHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (base instanceof Trackable) {
Trackable t = (Trackable) base;
gossipCore.addRequest(t.getUuid() + "/" + t.getUriFrom(), (Base) t);
}
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
public class SharedGossipDataMessageHandler implements MessageHandler{
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base;
gossipCore.addSharedData(message);
}
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
public class ShutdownMessageHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
ShutdownMessage s = (ShutdownMessage) base;
GossipDataMessage m = new GossipDataMessage();
m.setKey(ShutdownMessage.PER_NODE_KEY);
m.setNodeId(s.getNodeId());
m.setPayload(base);
m.setTimestamp(System.currentTimeMillis());
m.setExpireAt(System.currentTimeMillis() + 30L * 1000L);
gossipCore.addPerNodeData(m);
}
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
public class SimpleMessageInvoker implements MessageInvoker {
final private Class<?> messageClass;
final private MessageHandler messageHandler;
public SimpleMessageInvoker(Class<?> messageClass, MessageHandler messageHandler) {
if (messageClass == null || messageHandler == null) {
throw new NullPointerException();
}
this.messageClass = messageClass;
this.messageHandler = messageHandler;
}
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
if (messageClass.isAssignableFrom(base.getClass())) {
messageHandler.invoke(gossipCore, gossipManager, base);
return true;
} else {
return false;
}
}
}

View File

@ -19,17 +19,18 @@ package org.apache.gossip.manager.random;
import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipMember; import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings; import org.apache.gossip.GossipSettings;
import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
import org.apache.gossip.manager.handlers.MessageInvoker;
import java.net.URI; import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class RandomGossipManager extends GossipManager { public class RandomGossipManager extends GossipManager {
@ -47,6 +48,7 @@ public class RandomGossipManager extends GossipManager {
private MetricRegistry registry; private MetricRegistry registry;
private Map<String,String> properties; private Map<String,String> properties;
private ObjectMapper objectMapper; private ObjectMapper objectMapper;
private MessageInvoker messageInvoker;
private ManagerBuilder() {} private ManagerBuilder() {}
@ -101,6 +103,11 @@ public class RandomGossipManager extends GossipManager {
return this; return this;
} }
public ManagerBuilder messageInvoker(MessageInvoker messageInvoker) {
this.messageInvoker = messageInvoker;
return this;
}
public RandomGossipManager build() { public RandomGossipManager build() {
checkArgument(id != null, "You must specify an id"); checkArgument(id != null, "You must specify an id");
checkArgument(cluster != null, "You must specify a cluster name"); checkArgument(cluster != null, "You must specify a cluster name");
@ -120,12 +127,15 @@ public class RandomGossipManager extends GossipManager {
objectMapper = new ObjectMapper(); objectMapper = new ObjectMapper();
objectMapper.enableDefaultTyping(); objectMapper.enableDefaultTyping();
} }
return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper); if (messageInvoker == null) {
messageInvoker = new DefaultMessageInvoker();
}
return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker);
} }
} }
private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties, GossipSettings settings, private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) { List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper, MessageInvoker messageInvoker) {
super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper); super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker);
} }
} }

View File

@ -21,11 +21,17 @@ import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.GossipMember; import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings; import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember; import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
import org.apache.gossip.manager.handlers.MessageInvoker;
import org.apache.gossip.manager.handlers.ResponseHandler;
import org.apache.gossip.manager.handlers.SimpleMessageInvoker;
import org.apache.gossip.manager.random.RandomGossipManager; import org.apache.gossip.manager.random.RandomGossipManager;
import org.junit.Assert;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.platform.runner.JUnitPlatform; import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import javax.xml.ws.Response;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
@ -71,6 +77,31 @@ public class RandomGossipManagerBuilderTest {
assertNotNull(gossipManager.getLiveMembers()); assertNotNull(gossipManager.getLiveMembers());
} }
@Test
public void createDefaultMessageInvokerIfNull() throws URISyntaxException {
RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
.withId("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
.settings(new GossipSettings())
.messageInvoker(null).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageInvoker());
Assert.assertEquals(gossipManager.getMessageInvoker().getClass(), new DefaultMessageInvoker().getClass());
}
@Test
public void testMessageInvokerKeeping() throws URISyntaxException {
MessageInvoker mi = new SimpleMessageInvoker(Response.class, new ResponseHandler());
RandomGossipManager gossipManager = RandomGossipManager.newBuilder()
.withId("id")
.cluster("aCluster")
.uri(new URI("udp://localhost:2000"))
.settings(new GossipSettings())
.messageInvoker(mi).registry(new MetricRegistry()).build();
assertNotNull(gossipManager.getMessageInvoker());
Assert.assertEquals(gossipManager.getMessageInvoker(), mi);
}
@Test @Test
public void useMemberListIfProvided() throws URISyntaxException { public void useMemberListIfProvided() throws URISyntaxException {
LocalGossipMember member = new LocalGossipMember( LocalGossipMember member = new LocalGossipMember(

View File

@ -0,0 +1,178 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.junit.Assert;
import org.junit.Test;
public class MessageInvokerTest {
private class FakeMessage extends Base {
public FakeMessage() {
}
}
private class FakeMessageData extends Base {
public int data;
public FakeMessageData(int data) {
this.data = data;
}
}
private class FakeMessageDataHandler implements MessageHandler {
public int data;
public FakeMessageDataHandler() {
data = 0;
}
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
data = ((FakeMessageData) base).data;
}
}
private class FakeMessageHandler implements MessageHandler {
public int counter;
public FakeMessageHandler() {
counter = 0;
}
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
counter++;
}
}
@Test
public void testSimpleInvoker() {
MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler());
Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
}
@Test(expected = NullPointerException.class)
public void testSimpleInvokerNullClassConstructor() {
new SimpleMessageInvoker(null, new FakeMessageHandler());
}
@Test(expected = NullPointerException.class)
public void testSimpleInvokerNullHandlerConstructor() {
new SimpleMessageInvoker(FakeMessage.class, null);
}
@Test
public void testCallCountSimpleInvoker() {
FakeMessageHandler h = new FakeMessageHandler();
MessageInvoker mi = new SimpleMessageInvoker(FakeMessage.class, h);
mi.invoke(null, null, new FakeMessage());
Assert.assertEquals(1, h.counter);
mi.invoke(null, null, new ActiveGossipMessage());
Assert.assertEquals(1, h.counter);
mi.invoke(null, null, new FakeMessage());
Assert.assertEquals(2, h.counter);
}
@Test(expected = NullPointerException.class)
public void cantAddNullInvoker() {
MessageInvokerCombiner mi = new MessageInvokerCombiner();
mi.add(null);
}
@Test
public void testCombinerClear() {
MessageInvokerCombiner mi = new MessageInvokerCombiner();
mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()));
Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
mi.clear();
Assert.assertFalse(mi.invoke(null, null, new FakeMessage()));
}
@Test
public void testMessageInvokerCombiner() {
//Empty combiner - false result
MessageInvokerCombiner mi = new MessageInvokerCombiner();
Assert.assertFalse(mi.invoke(null, null, new Base()));
FakeMessageHandler h = new FakeMessageHandler();
mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
Assert.assertFalse(mi.invoke(null, null, new ActiveGossipMessage()));
Assert.assertEquals(2, h.counter);
//Increase size in runtime. Should be 3 calls: 2+3 = 5
mi.add(new SimpleMessageInvoker(FakeMessage.class, h));
Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
Assert.assertEquals(5, h.counter);
}
@Test
public void testMessageInvokerCombiner2levels() {
MessageInvokerCombiner mi = new MessageInvokerCombiner();
FakeMessageHandler h = new FakeMessageHandler();
MessageInvokerCombiner mi1 = new MessageInvokerCombiner();
mi1.add(new SimpleMessageInvoker(FakeMessage.class, h));
mi1.add(new SimpleMessageInvoker(FakeMessage.class, h));
MessageInvokerCombiner mi2 = new MessageInvokerCombiner();
mi2.add(new SimpleMessageInvoker(FakeMessage.class, h));
mi2.add(new SimpleMessageInvoker(FakeMessage.class, h));
mi.add(mi1);
mi.add(mi2);
Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
Assert.assertEquals(4, h.counter);
}
@Test
public void testMessageInvokerCombinerDataShipping() {
MessageInvokerCombiner mi = new MessageInvokerCombiner();
FakeMessageDataHandler h = new FakeMessageDataHandler();
mi.add(new SimpleMessageInvoker(FakeMessageData.class, h));
Assert.assertTrue(mi.invoke(null, null, new FakeMessageData(101)));
Assert.assertEquals(101, h.data);
}
@Test
public void testCombiningDefaultInvoker() {
MessageInvokerCombiner mi = new MessageInvokerCombiner();
mi.add(new DefaultMessageInvoker());
mi.add(new SimpleMessageInvoker(FakeMessage.class, new FakeMessageHandler()));
//UdpSharedGossipDataMessage with null gossipCore -> exception
boolean thrown = false;
try {
mi.invoke(null, null, new UdpSharedGossipDataMessage());
} catch (NullPointerException e) {
thrown = true;
}
Assert.assertTrue(thrown);
//DefaultInvoker skips FakeMessage and FakeHandler works ok
Assert.assertTrue(mi.invoke(null, null, new FakeMessage()));
}
}