This commit is contained in:
Edward Capriolo
2017-06-20 08:56:39 -04:00
10 changed files with 786 additions and 6 deletions

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.event.data;
public class DataEventConstants {
// MetricRegistry
public static final String PER_NODE_DATA_SUBSCRIBERS_SIZE
= "gossip.event.data.pernode.subscribers.size";
public static final String PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE
= "gossip.event.data.pernode.subscribers.queue.size";
public static final String SHARED_DATA_SUBSCRIBERS_SIZE
= "gossip.event.data.shared.subscribers.size";
public static final String SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE
= "gossip.event.data.shared.subscribers.queue.size";
// Thread pool
public static final int PER_NODE_DATA_QUEUE_SIZE = 64;
public static final int PER_NODE_DATA_CORE_POOL_SIZE = 1;
public static final int PER_NODE_DATA_MAX_POOL_SIZE = 30;
public static final int PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS = 1;
public static final int SHARED_DATA_QUEUE_SIZE = 64;
public static final int SHARED_DATA_CORE_POOL_SIZE = 1;
public static final int SHARED_DATA_MAX_POOL_SIZE = 30;
public static final int SHARED_DATA_KEEP_ALIVE_TIME_SECONDS = 1;
}

View File

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.event.data;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class DataEventManager {
private final List<UpdateNodeDataEventHandler> perNodeDataHandlers;
private final BlockingQueue<Runnable> perNodeDataHandlerQueue;
private final ExecutorService perNodeDataEventExecutor;
private final List<UpdateSharedDataEventHandler> sharedDataHandlers;
private final BlockingQueue<Runnable> sharedDataHandlerQueue;
private final ExecutorService sharedDataEventExecutor;
public DataEventManager(MetricRegistry metrics) {
perNodeDataHandlers = new CopyOnWriteArrayList<>();
perNodeDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.PER_NODE_DATA_QUEUE_SIZE);
perNodeDataEventExecutor = new ThreadPoolExecutor(
DataEventConstants.PER_NODE_DATA_CORE_POOL_SIZE,
DataEventConstants.PER_NODE_DATA_MAX_POOL_SIZE,
DataEventConstants.PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS,
perNodeDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
sharedDataHandlers = new CopyOnWriteArrayList<>();
sharedDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.SHARED_DATA_QUEUE_SIZE);
sharedDataEventExecutor = new ThreadPoolExecutor(DataEventConstants.SHARED_DATA_CORE_POOL_SIZE,
DataEventConstants.SHARED_DATA_MAX_POOL_SIZE,
DataEventConstants.SHARED_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS,
sharedDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy());
metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE,
(Gauge<Integer>) () -> perNodeDataHandlers.size());
metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE,
(Gauge<Integer>) () -> perNodeDataHandlerQueue.size());
metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE,
(Gauge<Integer>) () -> sharedDataHandlers.size());
metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE,
(Gauge<Integer>) () -> sharedDataHandlerQueue.size());
}
public void notifySharedData(final String key, final Object newValue, final Object oldValue) {
sharedDataHandlers.forEach(handler -> sharedDataEventExecutor
.execute(() -> handler.onUpdate(key, oldValue, newValue)));
}
public void notifyPerNodeData(final String nodeId, final String key, final Object newValue,
final Object oldValue) {
perNodeDataHandlers.forEach(handler -> perNodeDataEventExecutor
.execute(() -> handler.onUpdate(nodeId, key, oldValue, newValue)));
}
public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) {
perNodeDataHandlers.add(handler);
}
public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) {
perNodeDataHandlers.remove(handler);
}
public int getPerNodeSubscribersSize() {
return perNodeDataHandlers.size();
}
public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler) {
sharedDataHandlers.add(handler);
}
public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler) {
sharedDataHandlers.remove(handler);
}
public int getSharedDataSubscribersSize() {
return sharedDataHandlers.size();
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.event.data;
/**
* Event handler interface for the per node data items.
* Classes which implement this interface get notifications when per node data item get changed.
*/
public interface UpdateNodeDataEventHandler {
/**
* This method get called when a per node datum get changed.
*
* @param nodeId id of the node that change the value
* @param key key of the datum
* @param oldValue previous value of the datum or null if the datum is discovered
* for the first time
* @param newValue updated value of the datum
*/
void onUpdate(String nodeId, String key, Object oldValue, Object newValue);
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.event.data;
/**
* Event handler interface for shared data items.
* Classes which implement this interface get notifications when shared data get changed.
*/
public interface UpdateSharedDataEventHandler {
/**
* This method get called when shared data get changed.
*
* @param key key of the shared data item
* @param oldValue previous value or null if the data is discovered for the first time
* @param newValue updated value of the data item
*/
void onUpdate(String key, Object oldValue, Object newValue);
}

View File

@ -20,12 +20,18 @@ package org.apache.gossip.manager;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.Member;
import org.apache.gossip.LocalMember;
import org.apache.gossip.Member;
import org.apache.gossip.RemoteMember;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.*;
import org.apache.gossip.event.data.DataEventManager;
import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.udp.Trackable;
import org.apache.log4j.Logger;
@ -55,13 +61,15 @@ public class GossipCore implements GossipCoreConstants {
private final Meter messageSerdeException;
private final Meter tranmissionException;
private final Meter tranmissionSuccess;
private final DataEventManager eventManager;
public GossipCore(GossipManager manager, MetricRegistry metrics){
this.gossipManager = manager;
requests = new ConcurrentHashMap<>();
workQueue = new ArrayBlockingQueue<>(1024);
perNodeData = new ConcurrentHashMap<>();
sharedData = new ConcurrentHashMap<>();
eventManager = new DataEventManager(metrics);
metrics.register(WORKQUEUE_SIZE, (Gauge<Integer>)() -> workQueue.size());
metrics.register(PER_NODE_DATA_SIZE, (Gauge<Integer>)() -> perNodeData.size());
metrics.register(SHARED_DATA_SIZE, (Gauge<Integer>)() -> sharedData.size());
@ -76,6 +84,7 @@ public class GossipCore implements GossipCoreConstants {
while (true){
SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
if (previous == null){
eventManager.notifySharedData(message.getKey(), message.getPayload(), null);
return;
}
if (message.getPayload() instanceof Crdt){
@ -88,12 +97,17 @@ public class GossipCore implements GossipCoreConstants {
merged.setPayload(mergedCrdt);
boolean replaced = sharedData.replace(message.getKey(), previous, merged);
if (replaced){
if(!merged.getPayload().equals(previous.getPayload())) {
eventManager
.notifySharedData(message.getKey(), merged.getPayload(), previous.getPayload());
}
return;
}
} else {
if (previous.getTimestamp() < message.getTimestamp()){
boolean result = sharedData.replace(message.getKey(), previous, message);
if (result){
eventManager.notifySharedData(message.getKey(), message.getPayload(), previous.getPayload());
return;
}
} else {
@ -102,7 +116,7 @@ public class GossipCore implements GossipCoreConstants {
}
}
}
public void addPerNodeData(PerNodeDataMessage message){
ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new ConcurrentHashMap<>();
nodeMap.put(message.getKey(), message);
@ -111,11 +125,16 @@ public class GossipCore implements GossipCoreConstants {
PerNodeDataMessage current = nodeMap.get(message.getKey());
if (current == null){
nodeMap.putIfAbsent(message.getKey(), message);
eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null);
} else {
if (current.getTimestamp() < message.getTimestamp()){
nodeMap.replace(message.getKey(), current, message);
eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(),
current.getPayload());
}
}
} else {
eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null);
}
}
@ -178,7 +197,7 @@ public class GossipCore implements GossipCoreConstants {
sendInternal(message, uri);
if (latchAndBase == null){
return null;
}
}
try {
boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS);
@ -297,4 +316,20 @@ public class GossipCore implements GossipCoreConstants {
}
}
}
void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
eventManager.registerPerNodeDataSubscriber(handler);
}
void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){
eventManager.registerSharedDataSubscriber(handler);
}
void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
eventManager.unregisterPerNodeDataSubscriber(handler);
}
void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
eventManager.unregisterSharedDataSubscriber(handler);
}
}

View File

@ -26,6 +26,8 @@ import org.apache.gossip.Member;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
@ -348,4 +350,20 @@ public abstract class GossipManager {
return new File(manager.getSettings().getPathToDataState(), "pernodedata."
+ manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json");
}
public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
gossipCore.registerPerNodeDataSubscriber(handler);
}
public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){
gossipCore.registerSharedDataSubscriber(handler);
}
public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){
gossipCore.unregisterPerNodeDataSubscriber(handler);
}
public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){
gossipCore.unregisterSharedDataSubscriber(handler);
}
}

View File

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.event.data;
import com.codahale.metrics.MetricRegistry;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@RunWith(JUnitPlatform.class)
public class DataEventManagerTest {
private static Semaphore semaphore;
private String receivedNodeId;
private String receivedKey;
private Object receivedNewValue;
private Object receivedOldValue;
@BeforeClass
public static void setup() {
semaphore = new Semaphore(0);
}
@Test
public void perNodeDataEventHandlerAddRemoveTest() {
DataEventManager eventManager = new DataEventManager(new MetricRegistry());
UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> {
};
eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler);
Assert.assertEquals(1, eventManager.getPerNodeSubscribersSize());
eventManager.unregisterPerNodeDataSubscriber(nodeDataEventHandler);
Assert.assertEquals(0, eventManager.getPerNodeSubscribersSize());
}
// Test whether the per node data events are fired for matching key
@Test
public void perNodeDataEventHandlerTest() throws InterruptedException {
DataEventManager eventManager = new DataEventManager(new MetricRegistry());
resetData();
// A new subscriber "Juliet" is like to notified when per node data change for the key "Romeo"
UpdateNodeDataEventHandler juliet = (nodeId, key, oldValue, newValue) -> {
if(!key.equals("Romeo")) return;
receivedNodeId = nodeId;
receivedKey = key;
receivedNewValue = newValue;
receivedOldValue = oldValue;
semaphore.release();
};
// Juliet register with eventManager
eventManager.registerPerNodeDataSubscriber(juliet);
// Romeo is going to sleep after having dinner
eventManager.notifyPerNodeData("Montague", "Romeo", "sleeping", "eating");
// Juliet should notified
semaphore.tryAcquire(2, TimeUnit.SECONDS);
Assert.assertEquals("Montague", receivedNodeId);
Assert.assertEquals("Romeo", receivedKey);
Assert.assertEquals("sleeping", receivedNewValue);
Assert.assertEquals("eating", receivedOldValue);
eventManager.unregisterPerNodeDataSubscriber(juliet);
}
@Test
public void sharedDataEventHandlerAddRemoveTest() {
DataEventManager eventManager = new DataEventManager(new MetricRegistry());
UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> {
};
eventManager.registerSharedDataSubscriber(sharedDataEventHandler);
Assert.assertEquals(1, eventManager.getSharedDataSubscribersSize());
eventManager.unregisterSharedDataSubscriber(sharedDataEventHandler);
Assert.assertEquals(0, eventManager.getSharedDataSubscribersSize());
}
// Test whether the shared data events are fired
@Test
public void sharedDataEventHandlerTest() throws InterruptedException {
DataEventManager eventManager = new DataEventManager(new MetricRegistry());
resetData();
// A new subscriber "Alice" is like to notified when shared data change for the key "technology"
UpdateSharedDataEventHandler alice = (key, oldValue, newValue) -> {
if(!key.equals("technology")) return;
receivedKey = key;
receivedNewValue = newValue;
receivedOldValue = oldValue;
semaphore.release();
};
// Alice register with eventManager
eventManager.registerSharedDataSubscriber(alice);
// technology key get changed
eventManager.notifySharedData("technology", "Java has lambda", "Java is fast");
// Alice should notified
semaphore.tryAcquire(2, TimeUnit.SECONDS);
Assert.assertEquals("technology", receivedKey);
Assert.assertEquals("Java has lambda", receivedNewValue);
Assert.assertEquals("Java is fast", receivedOldValue);
eventManager.unregisterSharedDataSubscriber(alice);
}
// Test the MetricRegistry
@Test
public void metricRegistryTest() {
MetricRegistry registry = new MetricRegistry();
DataEventManager eventManager = new DataEventManager(registry);
UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> {
};
UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> {
};
eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler);
eventManager.registerSharedDataSubscriber(sharedDataEventHandler);
Assert.assertEquals(1,
registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE).getValue());
Assert.assertEquals(0,
registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE)
.getValue());
Assert.assertEquals(1,
registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE).getValue());
Assert.assertEquals(0,
registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE)
.getValue());
}
private void resetData() {
receivedNodeId = null;
receivedKey = null;
receivedNewValue = null;
receivedOldValue = null;
}
}

View File

@ -79,10 +79,21 @@ public class StandAloneNodeCrdtOrSet {
} else if (op == 'g') {
gcount(val, gossipService);
}
if (op == 'l') {
listen(val, gossipService);
}
}
}
}
private static void listen(String val, GossipManager gossipManager) {
gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
if (key.equals(val)) {
System.out.println("Event Handler fired! " + oldValue + " " + newValue);
}
});
}
private static void gcount(String val, GossipManager gossipManager) {
GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def");
Long l = Long.valueOf(val);

View File

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip;
import io.teknek.tunit.TUnit;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.PerNodeDataMessage;
import org.junit.Assert;
import org.junit.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@RunWith(JUnitPlatform.class)
public class PerNodeDataEventTest extends AbstractIntegrationBase {
private String receivedKey = "";
private String receivingNodeId = "";
private Object receivingNodeDataNewValue = "";
private Object receivingNodeDataOldValue = "";
private Semaphore lock = new Semaphore(0);
@Test
public void perNodeDataEventTest()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 2;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
gossipService.init();
register(gossipService);
}
// check whether the members are discovered
TUnit.assertThat(() -> {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
total += clients.get(i).getLiveMembers().size();
}
return total;
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
// Adding new data to Node 1
clients.get(0).gossipPerNodeData(getPerNodeData("category", "distributed"));
// Node 2 is interested in data changes for the key "organization" and "category"
clients.get(1).registerPerNodeDataSubscriber((nodeId, key, oldValue, newValue) -> {
if (!key.equals("organization") && !key.equals("category")) return;
receivingNodeId = nodeId;
receivedKey = key;
receivingNodeDataOldValue = oldValue;
receivingNodeDataNewValue = newValue;
lock.release();
});
// Node 2 first time adds Node 1 data
lock.tryAcquire(10, TimeUnit.SECONDS);
Assert.assertEquals("1", receivingNodeId);
Assert.assertEquals("category", receivedKey);
Assert.assertEquals(null, receivingNodeDataOldValue);
Assert.assertEquals("distributed", receivingNodeDataNewValue);
// Node 1 adds new per node data
clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache"));
// Node 2 adds new data key from Node 1
lock.tryAcquire(10, TimeUnit.SECONDS);
Assert.assertEquals("1", receivingNodeId);
Assert.assertEquals("organization", receivedKey);
Assert.assertEquals(null, receivingNodeDataOldValue);
Assert.assertEquals("apache", receivingNodeDataNewValue);
// Node 1 updates its value
clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache-gossip"));
// Node 2 updates existing value
lock.tryAcquire(10, TimeUnit.SECONDS);
Assert.assertEquals("1", receivingNodeId);
Assert.assertEquals("organization", receivedKey);
Assert.assertEquals("apache", receivingNodeDataOldValue);
Assert.assertEquals("apache-gossip", receivingNodeDataNewValue);
}
private PerNodeDataMessage getPerNodeData(String key, String value) {
PerNodeDataMessage g = new PerNodeDataMessage();
g.setExpireAt(Long.MAX_VALUE);
g.setKey(key);
g.setPayload(value);
g.setTimestamp(System.currentTimeMillis());
return g;
}
}

View File

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip;
import io.teknek.tunit.TUnit;
import org.apache.gossip.crdt.GrowOnlyCounter;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.SharedDataMessage;
import org.junit.Assert;
import org.junit.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@RunWith(JUnitPlatform.class)
public class SharedDataEventTest extends AbstractIntegrationBase {
private String receivedKey = "";
private Object receivingNodeDataNewValue = "";
private Object receivingNodeDataOldValue = "";
private String gCounterKey = "gCounter";
private Semaphore lock = new Semaphore(0);
@Test
public void sharedDataEventTest()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 2;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
gossipService.init();
register(gossipService);
}
// check whether the members are discovered
TUnit.assertThat(() -> {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
total += clients.get(i).getLiveMembers().size();
}
return total;
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
// Adding new data to Node 1
clients.get(0).gossipSharedData(sharedNodeData("category", "distributed"));
// Node 2 is interested in data changes for the key "organization" and "category"
clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
if (!key.equals("organization") && !key.equals("category"))
return;
receivedKey = key;
receivingNodeDataOldValue = oldValue;
receivingNodeDataNewValue = newValue;
lock.release();
});
// Node 2 first time gets shared data
lock.tryAcquire(10, TimeUnit.SECONDS);
Assert.assertEquals("category", receivedKey);
Assert.assertEquals(null, receivingNodeDataOldValue);
Assert.assertEquals("distributed", receivingNodeDataNewValue);
// Node 1 adds new per node data
clients.get(0).gossipSharedData(sharedNodeData("organization", "apache"));
// Node 2 adds new shared data
lock.tryAcquire(10, TimeUnit.SECONDS);
Assert.assertEquals("organization", receivedKey);
Assert.assertEquals(null, receivingNodeDataOldValue);
Assert.assertEquals("apache", receivingNodeDataNewValue);
// Node 1 updates its value
clients.get(0).gossipSharedData(sharedNodeData("organization", "apache-gossip"));
// Node 2 updates existing value
lock.tryAcquire(10, TimeUnit.SECONDS);
Assert.assertEquals("organization", receivedKey);
Assert.assertEquals("apache", receivingNodeDataOldValue);
Assert.assertEquals("apache-gossip", receivingNodeDataNewValue);
}
@Test
public void CrdtDataChangeEventTest()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 3;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
gossipService.init();
register(gossipService);
}
// check whether the members are discovered
TUnit.assertThat(() -> {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
total += clients.get(i).getLiveMembers().size();
}
return total;
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
receivedKey = key;
receivingNodeDataOldValue = oldValue;
receivingNodeDataNewValue = newValue;
lock.release();
});
// Add initial gCounter to Node 1
SharedDataMessage d = new SharedDataMessage();
d.setKey(gCounterKey);
d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
clients.get(0).merge(d);
// Check if initial Crdt received
lock.tryAcquire(10, TimeUnit.SECONDS);
Assert.assertEquals("gCounter", receivedKey);
Assert.assertEquals(null, receivingNodeDataOldValue);
Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
// Node 3 Updates the gCounter by 4
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(2).findCrdt(gCounterKey);
GrowOnlyCounter gcNew = new GrowOnlyCounter(gc,
new GrowOnlyCounter.Builder(clients.get(2)).increment(4L));
d = new SharedDataMessage();
d.setKey(gCounterKey);
d.setPayload(gcNew);
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
clients.get(2).merge(d);
// Check if Node 3's Crdt update is received in Node 2 event handler
lock.tryAcquire(10, TimeUnit.SECONDS);
Assert.assertEquals("gCounter", receivedKey);
Assert.assertTrue(receivingNodeDataOldValue instanceof GrowOnlyCounter);
Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataOldValue).value().longValue());
Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
Assert.assertEquals(5, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
}
private SharedDataMessage sharedNodeData(String key, String value) {
SharedDataMessage g = new SharedDataMessage();
g.setExpireAt(Long.MAX_VALUE);
g.setKey(key);
g.setPayload(value);
g.setTimestamp(System.currentTimeMillis());
return g;
}
}