GOSSIP-55 Added event handlers to notify share data and per node data changes
This commit is contained in:
committed by
Miraj Abeysekara
parent
9c9d96e564
commit
ade33a9e58
@ -0,0 +1,42 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.gossip.event.data;
|
||||||
|
|
||||||
|
public class DataEventConstants {
|
||||||
|
|
||||||
|
// MetricRegistry
|
||||||
|
public static final String PER_NODE_DATA_SUBSCRIBERS_SIZE
|
||||||
|
= "gossip.event.data.pernode.subscribers.size";
|
||||||
|
public static final String PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE
|
||||||
|
= "gossip.event.data.pernode.subscribers.queue.size";
|
||||||
|
public static final String SHARED_DATA_SUBSCRIBERS_SIZE
|
||||||
|
= "gossip.event.data.shared.subscribers.size";
|
||||||
|
public static final String SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE
|
||||||
|
= "gossip.event.data.shared.subscribers.queue.size";
|
||||||
|
|
||||||
|
// Thread pool
|
||||||
|
public static final int PER_NODE_DATA_QUEUE_SIZE = 64;
|
||||||
|
public static final int PER_NODE_DATA_CORE_POOL_SIZE = 1;
|
||||||
|
public static final int PER_NODE_DATA_MAX_POOL_SIZE = 30;
|
||||||
|
public static final int PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS = 1;
|
||||||
|
public static final int SHARED_DATA_QUEUE_SIZE = 64;
|
||||||
|
public static final int SHARED_DATA_CORE_POOL_SIZE = 1;
|
||||||
|
public static final int SHARED_DATA_MAX_POOL_SIZE = 30;
|
||||||
|
public static final int SHARED_DATA_KEEP_ALIVE_TIME_SECONDS = 1;
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,102 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.gossip.event.data;
|
||||||
|
|
||||||
|
import com.codahale.metrics.Gauge;
|
||||||
|
import com.codahale.metrics.MetricRegistry;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class DataEventManager {
|
||||||
|
|
||||||
|
private final List<UpdateNodeDataEventHandler> perNodeDataHandlers;
|
||||||
|
private final BlockingQueue<Runnable> perNodeDataHandlerQueue;
|
||||||
|
private final ExecutorService perNodeDataEventExecutor;
|
||||||
|
private final List<UpdateSharedDataEventHandler> sharedDataHandlers;
|
||||||
|
private final BlockingQueue<Runnable> sharedDataHandlerQueue;
|
||||||
|
private final ExecutorService sharedDataEventExecutor;
|
||||||
|
|
||||||
|
public DataEventManager(MetricRegistry metrics) {
|
||||||
|
perNodeDataHandlers = new CopyOnWriteArrayList<>();
|
||||||
|
perNodeDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.PER_NODE_DATA_QUEUE_SIZE);
|
||||||
|
perNodeDataEventExecutor = new ThreadPoolExecutor(
|
||||||
|
DataEventConstants.PER_NODE_DATA_CORE_POOL_SIZE,
|
||||||
|
DataEventConstants.PER_NODE_DATA_MAX_POOL_SIZE,
|
||||||
|
DataEventConstants.PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS,
|
||||||
|
perNodeDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||||
|
|
||||||
|
sharedDataHandlers = new CopyOnWriteArrayList<>();
|
||||||
|
sharedDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.SHARED_DATA_QUEUE_SIZE);
|
||||||
|
sharedDataEventExecutor = new ThreadPoolExecutor(DataEventConstants.SHARED_DATA_CORE_POOL_SIZE,
|
||||||
|
DataEventConstants.SHARED_DATA_MAX_POOL_SIZE,
|
||||||
|
DataEventConstants.SHARED_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS,
|
||||||
|
sharedDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
|
||||||
|
|
||||||
|
metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE,
|
||||||
|
(Gauge<Integer>) () -> perNodeDataHandlers.size());
|
||||||
|
metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE,
|
||||||
|
(Gauge<Integer>) () -> perNodeDataHandlerQueue.size());
|
||||||
|
metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE,
|
||||||
|
(Gauge<Integer>) () -> sharedDataHandlers.size());
|
||||||
|
metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE,
|
||||||
|
(Gauge<Integer>) () -> sharedDataHandlerQueue.size());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notifySharedData(final String key, final Object newValue, final Object oldValue) {
|
||||||
|
sharedDataHandlers.forEach(handler -> sharedDataEventExecutor
|
||||||
|
.execute(() -> handler.onUpdate(key, oldValue, newValue)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void notifyPerNodeData(final String nodeId, final String key, final Object newValue,
|
||||||
|
final Object oldValue) {
|
||||||
|
perNodeDataHandlers.forEach(handler -> perNodeDataEventExecutor
|
||||||
|
.execute(() -> handler.onUpdate(nodeId, key, oldValue, newValue)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) {
|
||||||
|
perNodeDataHandlers.add(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) {
|
||||||
|
perNodeDataHandlers.remove(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPerNodeSubscribersSize() {
|
||||||
|
return perNodeDataHandlers.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler) {
|
||||||
|
sharedDataHandlers.add(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler) {
|
||||||
|
sharedDataHandlers.remove(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getSharedDataSubscribersSize() {
|
||||||
|
return sharedDataHandlers.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,37 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.gossip.event.data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Event handler interface for the per node data items.
|
||||||
|
* Classes which implement this interface get notifications when per node data item get changed.
|
||||||
|
*/
|
||||||
|
public interface UpdateNodeDataEventHandler {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method get called when a per node datum get changed.
|
||||||
|
*
|
||||||
|
* @param nodeId id of the node that change the value
|
||||||
|
* @param key key of the datum
|
||||||
|
* @param oldValue previous value of the datum or null if the datum is discovered
|
||||||
|
* for the first time
|
||||||
|
* @param newValue updated value of the datum
|
||||||
|
*/
|
||||||
|
void onUpdate(String nodeId, String key, Object oldValue, Object newValue);
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,34 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.gossip.event.data;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Event handler interface for shared data items.
|
||||||
|
* Classes which implement this interface get notifications when shared data get changed.
|
||||||
|
*/
|
||||||
|
public interface UpdateSharedDataEventHandler {
|
||||||
|
/**
|
||||||
|
* This method get called when shared data get changed.
|
||||||
|
*
|
||||||
|
* @param key key of the shared data item
|
||||||
|
* @param oldValue previous value or null if the data is discovered for the first time
|
||||||
|
* @param newValue updated value of the data item
|
||||||
|
*/
|
||||||
|
void onUpdate(String key, Object oldValue, Object newValue);
|
||||||
|
|
||||||
|
}
|
@ -20,12 +20,18 @@ package org.apache.gossip.manager;
|
|||||||
import com.codahale.metrics.Gauge;
|
import com.codahale.metrics.Gauge;
|
||||||
import com.codahale.metrics.Meter;
|
import com.codahale.metrics.Meter;
|
||||||
import com.codahale.metrics.MetricRegistry;
|
import com.codahale.metrics.MetricRegistry;
|
||||||
import org.apache.gossip.Member;
|
|
||||||
import org.apache.gossip.LocalMember;
|
import org.apache.gossip.LocalMember;
|
||||||
|
import org.apache.gossip.Member;
|
||||||
import org.apache.gossip.RemoteMember;
|
import org.apache.gossip.RemoteMember;
|
||||||
import org.apache.gossip.crdt.Crdt;
|
import org.apache.gossip.crdt.Crdt;
|
||||||
import org.apache.gossip.event.GossipState;
|
import org.apache.gossip.event.GossipState;
|
||||||
import org.apache.gossip.model.*;
|
import org.apache.gossip.event.data.DataEventManager;
|
||||||
|
import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
|
||||||
|
import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
|
||||||
|
import org.apache.gossip.model.Base;
|
||||||
|
import org.apache.gossip.model.PerNodeDataMessage;
|
||||||
|
import org.apache.gossip.model.Response;
|
||||||
|
import org.apache.gossip.model.SharedDataMessage;
|
||||||
import org.apache.gossip.udp.Trackable;
|
import org.apache.gossip.udp.Trackable;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
@ -55,6 +61,7 @@ public class GossipCore implements GossipCoreConstants {
|
|||||||
private final Meter messageSerdeException;
|
private final Meter messageSerdeException;
|
||||||
private final Meter tranmissionException;
|
private final Meter tranmissionException;
|
||||||
private final Meter tranmissionSuccess;
|
private final Meter tranmissionSuccess;
|
||||||
|
private final DataEventManager eventManager;
|
||||||
|
|
||||||
public GossipCore(GossipManager manager, MetricRegistry metrics){
|
public GossipCore(GossipManager manager, MetricRegistry metrics){
|
||||||
this.gossipManager = manager;
|
this.gossipManager = manager;
|
||||||
@ -62,6 +69,7 @@ public class GossipCore implements GossipCoreConstants {
|
|||||||
workQueue = new ArrayBlockingQueue<>(1024);
|
workQueue = new ArrayBlockingQueue<>(1024);
|
||||||
perNodeData = new ConcurrentHashMap<>();
|
perNodeData = new ConcurrentHashMap<>();
|
||||||
sharedData = new ConcurrentHashMap<>();
|
sharedData = new ConcurrentHashMap<>();
|
||||||
|
eventManager = new DataEventManager(metrics);
|
||||||
metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
|
metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
|
||||||
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
|
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
|
||||||
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
|
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
|
||||||
@ -76,6 +84,7 @@ public class GossipCore implements GossipCoreConstants {
|
|||||||
while (true){
|
while (true){
|
||||||
SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
|
SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
|
||||||
if (previous == null){
|
if (previous == null){
|
||||||
|
eventManager.notifySharedData(message.getKey(), message.getPayload(), null);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (message.getPayload() instanceof Crdt){
|
if (message.getPayload() instanceof Crdt){
|
||||||
@ -88,12 +97,17 @@ public class GossipCore implements GossipCoreConstants {
|
|||||||
merged.setPayload(mergedCrdt);
|
merged.setPayload(mergedCrdt);
|
||||||
boolean replaced = sharedData.replace(message.getKey(), previous, merged);
|
boolean replaced = sharedData.replace(message.getKey(), previous, merged);
|
||||||
if (replaced){
|
if (replaced){
|
||||||
|
if(!merged.getPayload().equals(previous.getPayload())) {
|
||||||
|
eventManager
|
||||||
|
.notifySharedData(message.getKey(), merged.getPayload(), previous.getPayload());
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (previous.getTimestamp() < message.getTimestamp()){
|
if (previous.getTimestamp() < message.getTimestamp()){
|
||||||
boolean result = sharedData.replace(message.getKey(), previous, message);
|
boolean result = sharedData.replace(message.getKey(), previous, message);
|
||||||
if (result){
|
if (result){
|
||||||
|
eventManager.notifySharedData(message.getKey(), message.getPayload(), previous.getPayload());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -111,11 +125,16 @@ public class GossipCore implements GossipCoreConstants {
|
|||||||
PerNodeDataMessage current = nodeMap.get(message.getKey());
|
PerNodeDataMessage current = nodeMap.get(message.getKey());
|
||||||
if (current == null){
|
if (current == null){
|
||||||
nodeMap.putIfAbsent(message.getKey(), message);
|
nodeMap.putIfAbsent(message.getKey(), message);
|
||||||
|
eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null);
|
||||||
} else {
|
} else {
|
||||||
if (current.getTimestamp() < message.getTimestamp()){
|
if (current.getTimestamp() < message.getTimestamp()){
|
||||||
nodeMap.replace(message.getKey(), current, message);
|
nodeMap.replace(message.getKey(), current, message);
|
||||||
|
eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(),
|
||||||
|
current.getPayload());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -297,4 +316,20 @@ public class GossipCore implements GossipCoreConstants {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
|
||||||
|
eventManager.registerPerNodeDataSubscriber(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){
|
||||||
|
eventManager.registerSharedDataSubscriber(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
|
||||||
|
eventManager.unregisterPerNodeDataSubscriber(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
|
||||||
|
eventManager.unregisterSharedDataSubscriber(handler);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,8 @@ import org.apache.gossip.Member;
|
|||||||
import org.apache.gossip.crdt.Crdt;
|
import org.apache.gossip.crdt.Crdt;
|
||||||
import org.apache.gossip.event.GossipListener;
|
import org.apache.gossip.event.GossipListener;
|
||||||
import org.apache.gossip.event.GossipState;
|
import org.apache.gossip.event.GossipState;
|
||||||
|
import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
|
||||||
|
import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
|
||||||
import org.apache.gossip.manager.handlers.MessageHandler;
|
import org.apache.gossip.manager.handlers.MessageHandler;
|
||||||
import org.apache.gossip.model.PerNodeDataMessage;
|
import org.apache.gossip.model.PerNodeDataMessage;
|
||||||
import org.apache.gossip.model.SharedDataMessage;
|
import org.apache.gossip.model.SharedDataMessage;
|
||||||
@ -348,4 +350,20 @@ public abstract class GossipManager {
|
|||||||
return new File(manager.getSettings().getPathToDataState(), "pernodedata."
|
return new File(manager.getSettings().getPathToDataState(), "pernodedata."
|
||||||
+ manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json");
|
+ manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
|
||||||
|
gossipCore.registerPerNodeDataSubscriber(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){
|
||||||
|
gossipCore.registerSharedDataSubscriber(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
|
||||||
|
gossipCore.unregisterPerNodeDataSubscriber(handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
|
||||||
|
gossipCore.unregisterSharedDataSubscriber(handler);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,166 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.gossip.event.data;
|
||||||
|
|
||||||
|
import com.codahale.metrics.MetricRegistry;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.platform.runner.JUnitPlatform;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@RunWith(JUnitPlatform.class)
|
||||||
|
public class DataEventManagerTest {
|
||||||
|
|
||||||
|
private static Semaphore semaphore;
|
||||||
|
private String receivedNodeId;
|
||||||
|
private String receivedKey;
|
||||||
|
private Object receivedNewValue;
|
||||||
|
private Object receivedOldValue;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setup() {
|
||||||
|
semaphore = new Semaphore(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void perNodeDataEventHandlerAddRemoveTest() {
|
||||||
|
DataEventManager eventManager = new DataEventManager(new MetricRegistry());
|
||||||
|
|
||||||
|
UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> {
|
||||||
|
};
|
||||||
|
|
||||||
|
eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler);
|
||||||
|
Assert.assertEquals(1, eventManager.getPerNodeSubscribersSize());
|
||||||
|
eventManager.unregisterPerNodeDataSubscriber(nodeDataEventHandler);
|
||||||
|
Assert.assertEquals(0, eventManager.getPerNodeSubscribersSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test whether the per node data events are fired for matching key
|
||||||
|
@Test
|
||||||
|
public void perNodeDataEventHandlerTest() throws InterruptedException {
|
||||||
|
DataEventManager eventManager = new DataEventManager(new MetricRegistry());
|
||||||
|
resetData();
|
||||||
|
|
||||||
|
// A new subscriber "Juliet" is like to notified when per node data change for the key "Romeo"
|
||||||
|
UpdateNodeDataEventHandler juliet = (nodeId, key, oldValue, newValue) -> {
|
||||||
|
if(!key.equals("Romeo")) return;
|
||||||
|
receivedNodeId = nodeId;
|
||||||
|
receivedKey = key;
|
||||||
|
receivedNewValue = newValue;
|
||||||
|
receivedOldValue = oldValue;
|
||||||
|
semaphore.release();
|
||||||
|
};
|
||||||
|
// Juliet register with eventManager
|
||||||
|
eventManager.registerPerNodeDataSubscriber(juliet);
|
||||||
|
// Romeo is going to sleep after having dinner
|
||||||
|
eventManager.notifyPerNodeData("Montague", "Romeo", "sleeping", "eating");
|
||||||
|
|
||||||
|
// Juliet should notified
|
||||||
|
semaphore.tryAcquire(2, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("Montague", receivedNodeId);
|
||||||
|
Assert.assertEquals("Romeo", receivedKey);
|
||||||
|
Assert.assertEquals("sleeping", receivedNewValue);
|
||||||
|
Assert.assertEquals("eating", receivedOldValue);
|
||||||
|
|
||||||
|
eventManager.unregisterPerNodeDataSubscriber(juliet);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void sharedDataEventHandlerAddRemoveTest() {
|
||||||
|
DataEventManager eventManager = new DataEventManager(new MetricRegistry());
|
||||||
|
|
||||||
|
UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> {
|
||||||
|
|
||||||
|
};
|
||||||
|
eventManager.registerSharedDataSubscriber(sharedDataEventHandler);
|
||||||
|
Assert.assertEquals(1, eventManager.getSharedDataSubscribersSize());
|
||||||
|
eventManager.unregisterSharedDataSubscriber(sharedDataEventHandler);
|
||||||
|
Assert.assertEquals(0, eventManager.getSharedDataSubscribersSize());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test whether the shared data events are fired
|
||||||
|
@Test
|
||||||
|
public void sharedDataEventHandlerTest() throws InterruptedException {
|
||||||
|
DataEventManager eventManager = new DataEventManager(new MetricRegistry());
|
||||||
|
resetData();
|
||||||
|
|
||||||
|
// A new subscriber "Alice" is like to notified when shared data change for the key "technology"
|
||||||
|
UpdateSharedDataEventHandler alice = (key, oldValue, newValue) -> {
|
||||||
|
if(!key.equals("technology")) return;
|
||||||
|
receivedKey = key;
|
||||||
|
receivedNewValue = newValue;
|
||||||
|
receivedOldValue = oldValue;
|
||||||
|
semaphore.release();
|
||||||
|
};
|
||||||
|
// Alice register with eventManager
|
||||||
|
eventManager.registerSharedDataSubscriber(alice);
|
||||||
|
|
||||||
|
// technology key get changed
|
||||||
|
eventManager.notifySharedData("technology", "Java has lambda", "Java is fast");
|
||||||
|
|
||||||
|
// Alice should notified
|
||||||
|
semaphore.tryAcquire(2, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("technology", receivedKey);
|
||||||
|
Assert.assertEquals("Java has lambda", receivedNewValue);
|
||||||
|
Assert.assertEquals("Java is fast", receivedOldValue);
|
||||||
|
|
||||||
|
eventManager.unregisterSharedDataSubscriber(alice);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test the MetricRegistry
|
||||||
|
@Test
|
||||||
|
public void metricRegistryTest() {
|
||||||
|
MetricRegistry registry = new MetricRegistry();
|
||||||
|
|
||||||
|
DataEventManager eventManager = new DataEventManager(registry);
|
||||||
|
|
||||||
|
UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> {
|
||||||
|
};
|
||||||
|
|
||||||
|
UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> {
|
||||||
|
};
|
||||||
|
|
||||||
|
eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler);
|
||||||
|
eventManager.registerSharedDataSubscriber(sharedDataEventHandler);
|
||||||
|
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE).getValue());
|
||||||
|
Assert.assertEquals(0,
|
||||||
|
registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE)
|
||||||
|
.getValue());
|
||||||
|
Assert.assertEquals(1,
|
||||||
|
registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE).getValue());
|
||||||
|
Assert.assertEquals(0,
|
||||||
|
registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE)
|
||||||
|
.getValue());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void resetData() {
|
||||||
|
receivedNodeId = null;
|
||||||
|
receivedKey = null;
|
||||||
|
receivedNewValue = null;
|
||||||
|
receivedOldValue = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -79,10 +79,21 @@ public class StandAloneNodeCrdtOrSet {
|
|||||||
} else if (op == 'g') {
|
} else if (op == 'g') {
|
||||||
gcount(val, gossipService);
|
gcount(val, gossipService);
|
||||||
}
|
}
|
||||||
|
if (op == 'l') {
|
||||||
|
listen(val, gossipService);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void listen(String val, GossipManager gossipManager) {
|
||||||
|
gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
|
||||||
|
if (key.equals(val)) {
|
||||||
|
System.out.println("Event Handler fired! " + oldValue + " " + newValue);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
private static void gcount(String val, GossipManager gossipManager) {
|
private static void gcount(String val, GossipManager gossipManager) {
|
||||||
GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def");
|
GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def");
|
||||||
Long l = Long.valueOf(val);
|
Long l = Long.valueOf(val);
|
||||||
|
@ -0,0 +1,130 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.gossip;
|
||||||
|
|
||||||
|
import io.teknek.tunit.TUnit;
|
||||||
|
import org.apache.gossip.manager.GossipManager;
|
||||||
|
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||||
|
import org.apache.gossip.model.PerNodeDataMessage;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.platform.runner.JUnitPlatform;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@RunWith(JUnitPlatform.class)
|
||||||
|
public class PerNodeDataEventTest extends AbstractIntegrationBase {
|
||||||
|
|
||||||
|
private String receivedKey = "";
|
||||||
|
private String receivingNodeId = "";
|
||||||
|
private Object receivingNodeDataNewValue = "";
|
||||||
|
private Object receivingNodeDataOldValue = "";
|
||||||
|
private Semaphore lock = new Semaphore(0);
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void perNodeDataEventTest()
|
||||||
|
throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||||
|
GossipSettings settings = new GossipSettings();
|
||||||
|
settings.setPersistRingState(false);
|
||||||
|
settings.setPersistDataState(false);
|
||||||
|
String cluster = UUID.randomUUID().toString();
|
||||||
|
int seedNodes = 1;
|
||||||
|
List<Member> startupMembers = new ArrayList<>();
|
||||||
|
for (int i = 1; i < seedNodes + 1; ++i) {
|
||||||
|
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||||
|
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
|
||||||
|
}
|
||||||
|
final List<GossipManager> clients = new ArrayList<>();
|
||||||
|
final int clusterMembers = 2;
|
||||||
|
for (int i = 1; i < clusterMembers + 1; ++i) {
|
||||||
|
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||||
|
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
|
||||||
|
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
|
||||||
|
clients.add(gossipService);
|
||||||
|
gossipService.init();
|
||||||
|
register(gossipService);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check whether the members are discovered
|
||||||
|
TUnit.assertThat(() -> {
|
||||||
|
int total = 0;
|
||||||
|
for (int i = 0; i < clusterMembers; ++i) {
|
||||||
|
total += clients.get(i).getLiveMembers().size();
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
|
||||||
|
|
||||||
|
// Adding new data to Node 1
|
||||||
|
clients.get(0).gossipPerNodeData(getPerNodeData("category", "distributed"));
|
||||||
|
|
||||||
|
// Node 2 is interested in data changes for the key "organization" and "category"
|
||||||
|
clients.get(1).registerPerNodeDataSubscriber((nodeId, key, oldValue, newValue) -> {
|
||||||
|
if (!key.equals("organization") && !key.equals("category")) return;
|
||||||
|
receivingNodeId = nodeId;
|
||||||
|
receivedKey = key;
|
||||||
|
receivingNodeDataOldValue = oldValue;
|
||||||
|
receivingNodeDataNewValue = newValue;
|
||||||
|
lock.release();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Node 2 first time adds Node 1 data
|
||||||
|
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("1", receivingNodeId);
|
||||||
|
Assert.assertEquals("category", receivedKey);
|
||||||
|
Assert.assertEquals(null, receivingNodeDataOldValue);
|
||||||
|
Assert.assertEquals("distributed", receivingNodeDataNewValue);
|
||||||
|
|
||||||
|
// Node 1 adds new per node data
|
||||||
|
clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache"));
|
||||||
|
// Node 2 adds new data key from Node 1
|
||||||
|
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("1", receivingNodeId);
|
||||||
|
Assert.assertEquals("organization", receivedKey);
|
||||||
|
Assert.assertEquals(null, receivingNodeDataOldValue);
|
||||||
|
Assert.assertEquals("apache", receivingNodeDataNewValue);
|
||||||
|
|
||||||
|
// Node 1 updates its value
|
||||||
|
clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache-gossip"));
|
||||||
|
// Node 2 updates existing value
|
||||||
|
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("1", receivingNodeId);
|
||||||
|
Assert.assertEquals("organization", receivedKey);
|
||||||
|
Assert.assertEquals("apache", receivingNodeDataOldValue);
|
||||||
|
Assert.assertEquals("apache-gossip", receivingNodeDataNewValue);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private PerNodeDataMessage getPerNodeData(String key, String value) {
|
||||||
|
PerNodeDataMessage g = new PerNodeDataMessage();
|
||||||
|
g.setExpireAt(Long.MAX_VALUE);
|
||||||
|
g.setKey(key);
|
||||||
|
g.setPayload(value);
|
||||||
|
g.setTimestamp(System.currentTimeMillis());
|
||||||
|
return g;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,205 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.gossip;
|
||||||
|
|
||||||
|
import io.teknek.tunit.TUnit;
|
||||||
|
import org.apache.gossip.crdt.GrowOnlyCounter;
|
||||||
|
import org.apache.gossip.manager.GossipManager;
|
||||||
|
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||||
|
import org.apache.gossip.model.SharedDataMessage;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.platform.runner.JUnitPlatform;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@RunWith(JUnitPlatform.class)
|
||||||
|
public class SharedDataEventTest extends AbstractIntegrationBase {
|
||||||
|
|
||||||
|
private String receivedKey = "";
|
||||||
|
private Object receivingNodeDataNewValue = "";
|
||||||
|
private Object receivingNodeDataOldValue = "";
|
||||||
|
private String gCounterKey = "gCounter";
|
||||||
|
private Semaphore lock = new Semaphore(0);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void sharedDataEventTest()
|
||||||
|
throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||||
|
GossipSettings settings = new GossipSettings();
|
||||||
|
settings.setPersistRingState(false);
|
||||||
|
settings.setPersistDataState(false);
|
||||||
|
String cluster = UUID.randomUUID().toString();
|
||||||
|
int seedNodes = 1;
|
||||||
|
List<Member> startupMembers = new ArrayList<>();
|
||||||
|
for (int i = 1; i < seedNodes + 1; ++i) {
|
||||||
|
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||||
|
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
|
||||||
|
}
|
||||||
|
final List<GossipManager> clients = new ArrayList<>();
|
||||||
|
final int clusterMembers = 2;
|
||||||
|
for (int i = 1; i < clusterMembers + 1; ++i) {
|
||||||
|
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||||
|
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
|
||||||
|
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
|
||||||
|
clients.add(gossipService);
|
||||||
|
gossipService.init();
|
||||||
|
register(gossipService);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check whether the members are discovered
|
||||||
|
TUnit.assertThat(() -> {
|
||||||
|
int total = 0;
|
||||||
|
for (int i = 0; i < clusterMembers; ++i) {
|
||||||
|
total += clients.get(i).getLiveMembers().size();
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
|
||||||
|
|
||||||
|
// Adding new data to Node 1
|
||||||
|
clients.get(0).gossipSharedData(sharedNodeData("category", "distributed"));
|
||||||
|
|
||||||
|
// Node 2 is interested in data changes for the key "organization" and "category"
|
||||||
|
clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
|
||||||
|
if (!key.equals("organization") && !key.equals("category"))
|
||||||
|
return;
|
||||||
|
receivedKey = key;
|
||||||
|
receivingNodeDataOldValue = oldValue;
|
||||||
|
receivingNodeDataNewValue = newValue;
|
||||||
|
lock.release();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Node 2 first time gets shared data
|
||||||
|
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("category", receivedKey);
|
||||||
|
Assert.assertEquals(null, receivingNodeDataOldValue);
|
||||||
|
Assert.assertEquals("distributed", receivingNodeDataNewValue);
|
||||||
|
|
||||||
|
// Node 1 adds new per node data
|
||||||
|
clients.get(0).gossipSharedData(sharedNodeData("organization", "apache"));
|
||||||
|
// Node 2 adds new shared data
|
||||||
|
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("organization", receivedKey);
|
||||||
|
Assert.assertEquals(null, receivingNodeDataOldValue);
|
||||||
|
Assert.assertEquals("apache", receivingNodeDataNewValue);
|
||||||
|
|
||||||
|
// Node 1 updates its value
|
||||||
|
clients.get(0).gossipSharedData(sharedNodeData("organization", "apache-gossip"));
|
||||||
|
|
||||||
|
// Node 2 updates existing value
|
||||||
|
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("organization", receivedKey);
|
||||||
|
Assert.assertEquals("apache", receivingNodeDataOldValue);
|
||||||
|
Assert.assertEquals("apache-gossip", receivingNodeDataNewValue);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void CrdtDataChangeEventTest()
|
||||||
|
throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||||
|
GossipSettings settings = new GossipSettings();
|
||||||
|
settings.setPersistRingState(false);
|
||||||
|
settings.setPersistDataState(false);
|
||||||
|
String cluster = UUID.randomUUID().toString();
|
||||||
|
int seedNodes = 1;
|
||||||
|
List<Member> startupMembers = new ArrayList<>();
|
||||||
|
for (int i = 1; i < seedNodes + 1; ++i) {
|
||||||
|
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||||
|
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
|
||||||
|
}
|
||||||
|
final List<GossipManager> clients = new ArrayList<>();
|
||||||
|
final int clusterMembers = 3;
|
||||||
|
for (int i = 1; i < clusterMembers + 1; ++i) {
|
||||||
|
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||||
|
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
|
||||||
|
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
|
||||||
|
clients.add(gossipService);
|
||||||
|
gossipService.init();
|
||||||
|
register(gossipService);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check whether the members are discovered
|
||||||
|
TUnit.assertThat(() -> {
|
||||||
|
int total = 0;
|
||||||
|
for (int i = 0; i < clusterMembers; ++i) {
|
||||||
|
total += clients.get(i).getLiveMembers().size();
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
|
||||||
|
|
||||||
|
clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
|
||||||
|
receivedKey = key;
|
||||||
|
receivingNodeDataOldValue = oldValue;
|
||||||
|
receivingNodeDataNewValue = newValue;
|
||||||
|
lock.release();
|
||||||
|
});
|
||||||
|
|
||||||
|
// Add initial gCounter to Node 1
|
||||||
|
SharedDataMessage d = new SharedDataMessage();
|
||||||
|
d.setKey(gCounterKey);
|
||||||
|
d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)));
|
||||||
|
d.setExpireAt(Long.MAX_VALUE);
|
||||||
|
d.setTimestamp(System.currentTimeMillis());
|
||||||
|
clients.get(0).merge(d);
|
||||||
|
|
||||||
|
// Check if initial Crdt received
|
||||||
|
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("gCounter", receivedKey);
|
||||||
|
Assert.assertEquals(null, receivingNodeDataOldValue);
|
||||||
|
Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
|
||||||
|
Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
|
||||||
|
|
||||||
|
// Node 3 Updates the gCounter by 4
|
||||||
|
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(2).findCrdt(gCounterKey);
|
||||||
|
GrowOnlyCounter gcNew = new GrowOnlyCounter(gc,
|
||||||
|
new GrowOnlyCounter.Builder(clients.get(2)).increment(4L));
|
||||||
|
|
||||||
|
d = new SharedDataMessage();
|
||||||
|
d.setKey(gCounterKey);
|
||||||
|
d.setPayload(gcNew);
|
||||||
|
d.setExpireAt(Long.MAX_VALUE);
|
||||||
|
d.setTimestamp(System.currentTimeMillis());
|
||||||
|
clients.get(2).merge(d);
|
||||||
|
|
||||||
|
// Check if Node 3's Crdt update is received in Node 2 event handler
|
||||||
|
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||||
|
Assert.assertEquals("gCounter", receivedKey);
|
||||||
|
Assert.assertTrue(receivingNodeDataOldValue instanceof GrowOnlyCounter);
|
||||||
|
Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataOldValue).value().longValue());
|
||||||
|
Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
|
||||||
|
Assert.assertEquals(5, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private SharedDataMessage sharedNodeData(String key, String value) {
|
||||||
|
SharedDataMessage g = new SharedDataMessage();
|
||||||
|
g.setExpireAt(Long.MAX_VALUE);
|
||||||
|
g.setKey(key);
|
||||||
|
g.setPayload(value);
|
||||||
|
g.setTimestamp(System.currentTimeMillis());
|
||||||
|
return g;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Reference in New Issue
Block a user