diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java new file mode 100644 index 0000000..217087f --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventConstants.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +public class DataEventConstants { + + // MetricRegistry + public static final String PER_NODE_DATA_SUBSCRIBERS_SIZE + = "gossip.event.data.pernode.subscribers.size"; + public static final String PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE + = "gossip.event.data.pernode.subscribers.queue.size"; + public static final String SHARED_DATA_SUBSCRIBERS_SIZE + = "gossip.event.data.shared.subscribers.size"; + public static final String SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE + = "gossip.event.data.shared.subscribers.queue.size"; + + // Thread pool + public static final int PER_NODE_DATA_QUEUE_SIZE = 64; + public static final int PER_NODE_DATA_CORE_POOL_SIZE = 1; + public static final int PER_NODE_DATA_MAX_POOL_SIZE = 30; + public static final int PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS = 1; + public static final int SHARED_DATA_QUEUE_SIZE = 64; + public static final int SHARED_DATA_CORE_POOL_SIZE = 1; + public static final int SHARED_DATA_MAX_POOL_SIZE = 30; + public static final int SHARED_DATA_KEEP_ALIVE_TIME_SECONDS = 1; + +} diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java new file mode 100644 index 0000000..3124df1 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/data/DataEventManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +import com.codahale.metrics.Gauge; +import com.codahale.metrics.MetricRegistry; + +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class DataEventManager { + + private final List perNodeDataHandlers; + private final BlockingQueue perNodeDataHandlerQueue; + private final ExecutorService perNodeDataEventExecutor; + private final List sharedDataHandlers; + private final BlockingQueue sharedDataHandlerQueue; + private final ExecutorService sharedDataEventExecutor; + + public DataEventManager(MetricRegistry metrics) { + perNodeDataHandlers = new CopyOnWriteArrayList<>(); + perNodeDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.PER_NODE_DATA_QUEUE_SIZE); + perNodeDataEventExecutor = new ThreadPoolExecutor( + DataEventConstants.PER_NODE_DATA_CORE_POOL_SIZE, + DataEventConstants.PER_NODE_DATA_MAX_POOL_SIZE, + DataEventConstants.PER_NODE_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS, + perNodeDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); + + sharedDataHandlers = new CopyOnWriteArrayList<>(); + sharedDataHandlerQueue = new ArrayBlockingQueue<>(DataEventConstants.SHARED_DATA_QUEUE_SIZE); + sharedDataEventExecutor = new ThreadPoolExecutor(DataEventConstants.SHARED_DATA_CORE_POOL_SIZE, + DataEventConstants.SHARED_DATA_MAX_POOL_SIZE, + DataEventConstants.SHARED_DATA_KEEP_ALIVE_TIME_SECONDS, TimeUnit.SECONDS, + sharedDataHandlerQueue, new ThreadPoolExecutor.DiscardOldestPolicy()); + + metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE, + (Gauge) () -> perNodeDataHandlers.size()); + metrics.register(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE, + (Gauge) () -> perNodeDataHandlerQueue.size()); + metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE, + (Gauge) () -> sharedDataHandlers.size()); + metrics.register(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE, + (Gauge) () -> sharedDataHandlerQueue.size()); + + } + + public void notifySharedData(final String key, final Object newValue, final Object oldValue) { + sharedDataHandlers.forEach(handler -> sharedDataEventExecutor + .execute(() -> handler.onUpdate(key, oldValue, newValue))); + } + + public void notifyPerNodeData(final String nodeId, final String key, final Object newValue, + final Object oldValue) { + perNodeDataHandlers.forEach(handler -> perNodeDataEventExecutor + .execute(() -> handler.onUpdate(nodeId, key, oldValue, newValue))); + } + + public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) { + perNodeDataHandlers.add(handler); + } + + public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler) { + perNodeDataHandlers.remove(handler); + } + + public int getPerNodeSubscribersSize() { + return perNodeDataHandlers.size(); + } + + public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler) { + sharedDataHandlers.add(handler); + } + + public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler) { + sharedDataHandlers.remove(handler); + } + + public int getSharedDataSubscribersSize() { + return sharedDataHandlers.size(); + } + +} diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java new file mode 100644 index 0000000..ca88c17 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateNodeDataEventHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +/** + * Event handler interface for the per node data items. + * Classes which implement this interface get notifications when per node data item get changed. + */ +public interface UpdateNodeDataEventHandler { + + /** + * This method get called when a per node datum get changed. + * + * @param nodeId id of the node that change the value + * @param key key of the datum + * @param oldValue previous value of the datum or null if the datum is discovered + * for the first time + * @param newValue updated value of the datum + */ + void onUpdate(String nodeId, String key, Object oldValue, Object newValue); + +} diff --git a/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java new file mode 100644 index 0000000..5655732 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/event/data/UpdateSharedDataEventHandler.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +/** + * Event handler interface for shared data items. + * Classes which implement this interface get notifications when shared data get changed. + */ +public interface UpdateSharedDataEventHandler { + /** + * This method get called when shared data get changed. + * + * @param key key of the shared data item + * @param oldValue previous value or null if the data is discovered for the first time + * @param newValue updated value of the data item + */ + void onUpdate(String key, Object oldValue, Object newValue); + +} diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java index e034432..4167664 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -20,12 +20,18 @@ package org.apache.gossip.manager; import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; -import org.apache.gossip.Member; import org.apache.gossip.LocalMember; +import org.apache.gossip.Member; import org.apache.gossip.RemoteMember; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipState; -import org.apache.gossip.model.*; +import org.apache.gossip.event.data.DataEventManager; +import org.apache.gossip.event.data.UpdateNodeDataEventHandler; +import org.apache.gossip.event.data.UpdateSharedDataEventHandler; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.model.Response; +import org.apache.gossip.model.SharedDataMessage; import org.apache.gossip.udp.Trackable; import org.apache.log4j.Logger; @@ -55,13 +61,15 @@ public class GossipCore implements GossipCoreConstants { private final Meter messageSerdeException; private final Meter tranmissionException; private final Meter tranmissionSuccess; - + private final DataEventManager eventManager; + public GossipCore(GossipManager manager, MetricRegistry metrics){ this.gossipManager = manager; requests = new ConcurrentHashMap<>(); workQueue = new ArrayBlockingQueue<>(1024); perNodeData = new ConcurrentHashMap<>(); sharedData = new ConcurrentHashMap<>(); + eventManager = new DataEventManager(metrics); metrics.register(WORKQUEUE_SIZE, (Gauge)() -> workQueue.size()); metrics.register(PER_NODE_DATA_SIZE, (Gauge)() -> perNodeData.size()); metrics.register(SHARED_DATA_SIZE, (Gauge)() -> sharedData.size()); @@ -76,6 +84,7 @@ public class GossipCore implements GossipCoreConstants { while (true){ SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); if (previous == null){ + eventManager.notifySharedData(message.getKey(), message.getPayload(), null); return; } if (message.getPayload() instanceof Crdt){ @@ -88,12 +97,17 @@ public class GossipCore implements GossipCoreConstants { merged.setPayload(mergedCrdt); boolean replaced = sharedData.replace(message.getKey(), previous, merged); if (replaced){ + if(!merged.getPayload().equals(previous.getPayload())) { + eventManager + .notifySharedData(message.getKey(), merged.getPayload(), previous.getPayload()); + } return; } } else { if (previous.getTimestamp() < message.getTimestamp()){ boolean result = sharedData.replace(message.getKey(), previous, message); if (result){ + eventManager.notifySharedData(message.getKey(), message.getPayload(), previous.getPayload()); return; } } else { @@ -102,7 +116,7 @@ public class GossipCore implements GossipCoreConstants { } } } - + public void addPerNodeData(PerNodeDataMessage message){ ConcurrentHashMap nodeMap = new ConcurrentHashMap<>(); nodeMap.put(message.getKey(), message); @@ -111,11 +125,16 @@ public class GossipCore implements GossipCoreConstants { PerNodeDataMessage current = nodeMap.get(message.getKey()); if (current == null){ nodeMap.putIfAbsent(message.getKey(), message); + eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null); } else { if (current.getTimestamp() < message.getTimestamp()){ nodeMap.replace(message.getKey(), current, message); + eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), + current.getPayload()); } } + } else { + eventManager.notifyPerNodeData(message.getNodeId(), message.getKey(), message.getPayload(), null); } } @@ -178,7 +197,7 @@ public class GossipCore implements GossipCoreConstants { sendInternal(message, uri); if (latchAndBase == null){ return null; - } + } try { boolean complete = latchAndBase.latch.await(1, TimeUnit.SECONDS); @@ -297,4 +316,20 @@ public class GossipCore implements GossipCoreConstants { } } } + + void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){ + eventManager.registerPerNodeDataSubscriber(handler); + } + + void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){ + eventManager.registerSharedDataSubscriber(handler); + } + + void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){ + eventManager.unregisterPerNodeDataSubscriber(handler); + } + + void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){ + eventManager.unregisterSharedDataSubscriber(handler); + } } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java index 133a79f..d839b2e 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -26,6 +26,8 @@ import org.apache.gossip.Member; import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; +import org.apache.gossip.event.data.UpdateNodeDataEventHandler; +import org.apache.gossip.event.data.UpdateSharedDataEventHandler; import org.apache.gossip.manager.handlers.MessageHandler; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; @@ -348,4 +350,20 @@ public abstract class GossipManager { return new File(manager.getSettings().getPathToDataState(), "pernodedata." + manager.getMyself().getClusterName() + "." + manager.getMyself().getId() + ".json"); } + + public void registerPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){ + gossipCore.registerPerNodeDataSubscriber(handler); + } + + public void registerSharedDataSubscriber(UpdateSharedDataEventHandler handler){ + gossipCore.registerSharedDataSubscriber(handler); + } + + public void unregisterPerNodeDataSubscriber(UpdateNodeDataEventHandler handler){ + gossipCore.unregisterPerNodeDataSubscriber(handler); + } + + public void unregisterSharedDataSubscriber(UpdateSharedDataEventHandler handler){ + gossipCore.unregisterSharedDataSubscriber(handler); + } } diff --git a/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java b/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java new file mode 100644 index 0000000..d9d778f --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/event/data/DataEventManagerTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.event.data; + +import com.codahale.metrics.MetricRegistry; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +@RunWith(JUnitPlatform.class) +public class DataEventManagerTest { + + private static Semaphore semaphore; + private String receivedNodeId; + private String receivedKey; + private Object receivedNewValue; + private Object receivedOldValue; + + @BeforeClass + public static void setup() { + semaphore = new Semaphore(0); + } + + @Test + public void perNodeDataEventHandlerAddRemoveTest() { + DataEventManager eventManager = new DataEventManager(new MetricRegistry()); + + UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> { + }; + + eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler); + Assert.assertEquals(1, eventManager.getPerNodeSubscribersSize()); + eventManager.unregisterPerNodeDataSubscriber(nodeDataEventHandler); + Assert.assertEquals(0, eventManager.getPerNodeSubscribersSize()); + } + + // Test whether the per node data events are fired for matching key + @Test + public void perNodeDataEventHandlerTest() throws InterruptedException { + DataEventManager eventManager = new DataEventManager(new MetricRegistry()); + resetData(); + + // A new subscriber "Juliet" is like to notified when per node data change for the key "Romeo" + UpdateNodeDataEventHandler juliet = (nodeId, key, oldValue, newValue) -> { + if(!key.equals("Romeo")) return; + receivedNodeId = nodeId; + receivedKey = key; + receivedNewValue = newValue; + receivedOldValue = oldValue; + semaphore.release(); + }; + // Juliet register with eventManager + eventManager.registerPerNodeDataSubscriber(juliet); + // Romeo is going to sleep after having dinner + eventManager.notifyPerNodeData("Montague", "Romeo", "sleeping", "eating"); + + // Juliet should notified + semaphore.tryAcquire(2, TimeUnit.SECONDS); + Assert.assertEquals("Montague", receivedNodeId); + Assert.assertEquals("Romeo", receivedKey); + Assert.assertEquals("sleeping", receivedNewValue); + Assert.assertEquals("eating", receivedOldValue); + + eventManager.unregisterPerNodeDataSubscriber(juliet); + } + + @Test + public void sharedDataEventHandlerAddRemoveTest() { + DataEventManager eventManager = new DataEventManager(new MetricRegistry()); + + UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> { + + }; + eventManager.registerSharedDataSubscriber(sharedDataEventHandler); + Assert.assertEquals(1, eventManager.getSharedDataSubscribersSize()); + eventManager.unregisterSharedDataSubscriber(sharedDataEventHandler); + Assert.assertEquals(0, eventManager.getSharedDataSubscribersSize()); + + } + + // Test whether the shared data events are fired + @Test + public void sharedDataEventHandlerTest() throws InterruptedException { + DataEventManager eventManager = new DataEventManager(new MetricRegistry()); + resetData(); + + // A new subscriber "Alice" is like to notified when shared data change for the key "technology" + UpdateSharedDataEventHandler alice = (key, oldValue, newValue) -> { + if(!key.equals("technology")) return; + receivedKey = key; + receivedNewValue = newValue; + receivedOldValue = oldValue; + semaphore.release(); + }; + // Alice register with eventManager + eventManager.registerSharedDataSubscriber(alice); + + // technology key get changed + eventManager.notifySharedData("technology", "Java has lambda", "Java is fast"); + + // Alice should notified + semaphore.tryAcquire(2, TimeUnit.SECONDS); + Assert.assertEquals("technology", receivedKey); + Assert.assertEquals("Java has lambda", receivedNewValue); + Assert.assertEquals("Java is fast", receivedOldValue); + + eventManager.unregisterSharedDataSubscriber(alice); + } + + // Test the MetricRegistry + @Test + public void metricRegistryTest() { + MetricRegistry registry = new MetricRegistry(); + + DataEventManager eventManager = new DataEventManager(registry); + + UpdateNodeDataEventHandler nodeDataEventHandler = (nodeId, key, oldValue, newValue) -> { + }; + + UpdateSharedDataEventHandler sharedDataEventHandler = (key, oldValue, newValue) -> { + }; + + eventManager.registerPerNodeDataSubscriber(nodeDataEventHandler); + eventManager.registerSharedDataSubscriber(sharedDataEventHandler); + + Assert.assertEquals(1, + registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_SIZE).getValue()); + Assert.assertEquals(0, + registry.getGauges().get(DataEventConstants.PER_NODE_DATA_SUBSCRIBERS_QUEUE_SIZE) + .getValue()); + Assert.assertEquals(1, + registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_SIZE).getValue()); + Assert.assertEquals(0, + registry.getGauges().get(DataEventConstants.SHARED_DATA_SUBSCRIBERS_QUEUE_SIZE) + .getValue()); + + } + + private void resetData() { + receivedNodeId = null; + receivedKey = null; + receivedNewValue = null; + receivedOldValue = null; + } + +} diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java index 7d4db93..78c7782 100644 --- a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java +++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java @@ -79,10 +79,21 @@ public class StandAloneNodeCrdtOrSet { } else if (op == 'g') { gcount(val, gossipService); } + if (op == 'l') { + listen(val, gossipService); + } } } } - + + private static void listen(String val, GossipManager gossipManager) { + gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> { + if (key.equals(val)) { + System.out.println("Event Handler fired! " + oldValue + " " + newValue); + } + }); + } + private static void gcount(String val, GossipManager gossipManager) { GrowOnlyCounter c = (GrowOnlyCounter) gossipManager.findCrdt("def"); Long l = Long.valueOf(val); diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java new file mode 100644 index 0000000..59136d1 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import io.teknek.tunit.TUnit; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.PerNodeDataMessage; +import org.junit.Assert; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +@RunWith(JUnitPlatform.class) +public class PerNodeDataEventTest extends AbstractIntegrationBase { + + private String receivedKey = ""; + private String receivingNodeId = ""; + private Object receivingNodeDataNewValue = ""; + private Object receivingNodeDataOldValue = ""; + private Semaphore lock = new Semaphore(0); + + + @Test + public void perNodeDataEventTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List clients = new ArrayList<>(); + final int clusterMembers = 2; + for (int i = 1; i < clusterMembers + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + clients.add(gossipService); + gossipService.init(); + register(gossipService); + } + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + + // Adding new data to Node 1 + clients.get(0).gossipPerNodeData(getPerNodeData("category", "distributed")); + + // Node 2 is interested in data changes for the key "organization" and "category" + clients.get(1).registerPerNodeDataSubscriber((nodeId, key, oldValue, newValue) -> { + if (!key.equals("organization") && !key.equals("category")) return; + receivingNodeId = nodeId; + receivedKey = key; + receivingNodeDataOldValue = oldValue; + receivingNodeDataNewValue = newValue; + lock.release(); + }); + + // Node 2 first time adds Node 1 data + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("1", receivingNodeId); + Assert.assertEquals("category", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertEquals("distributed", receivingNodeDataNewValue); + + // Node 1 adds new per node data + clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache")); + // Node 2 adds new data key from Node 1 + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("1", receivingNodeId); + Assert.assertEquals("organization", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertEquals("apache", receivingNodeDataNewValue); + + // Node 1 updates its value + clients.get(0).gossipPerNodeData(getPerNodeData("organization", "apache-gossip")); + // Node 2 updates existing value + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("1", receivingNodeId); + Assert.assertEquals("organization", receivedKey); + Assert.assertEquals("apache", receivingNodeDataOldValue); + Assert.assertEquals("apache-gossip", receivingNodeDataNewValue); + + } + + private PerNodeDataMessage getPerNodeData(String key, String value) { + PerNodeDataMessage g = new PerNodeDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey(key); + g.setPayload(value); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + +} diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java new file mode 100644 index 0000000..56f1657 --- /dev/null +++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip; + +import io.teknek.tunit.TUnit; +import org.apache.gossip.crdt.GrowOnlyCounter; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.SharedDataMessage; +import org.junit.Assert; +import org.junit.Test; +import org.junit.platform.runner.JUnitPlatform; +import org.junit.runner.RunWith; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +@RunWith(JUnitPlatform.class) +public class SharedDataEventTest extends AbstractIntegrationBase { + + private String receivedKey = ""; + private Object receivingNodeDataNewValue = ""; + private Object receivingNodeDataOldValue = ""; + private String gCounterKey = "gCounter"; + private Semaphore lock = new Semaphore(0); + + @Test + public void sharedDataEventTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List clients = new ArrayList<>(); + final int clusterMembers = 2; + for (int i = 1; i < clusterMembers + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + clients.add(gossipService); + gossipService.init(); + register(gossipService); + } + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + + // Adding new data to Node 1 + clients.get(0).gossipSharedData(sharedNodeData("category", "distributed")); + + // Node 2 is interested in data changes for the key "organization" and "category" + clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> { + if (!key.equals("organization") && !key.equals("category")) + return; + receivedKey = key; + receivingNodeDataOldValue = oldValue; + receivingNodeDataNewValue = newValue; + lock.release(); + }); + + // Node 2 first time gets shared data + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("category", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertEquals("distributed", receivingNodeDataNewValue); + + // Node 1 adds new per node data + clients.get(0).gossipSharedData(sharedNodeData("organization", "apache")); + // Node 2 adds new shared data + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("organization", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertEquals("apache", receivingNodeDataNewValue); + + // Node 1 updates its value + clients.get(0).gossipSharedData(sharedNodeData("organization", "apache-gossip")); + + // Node 2 updates existing value + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("organization", receivedKey); + Assert.assertEquals("apache", receivingNodeDataOldValue); + Assert.assertEquals("apache-gossip", receivingNodeDataNewValue); + + } + + @Test + public void CrdtDataChangeEventTest() + throws InterruptedException, UnknownHostException, URISyntaxException { + GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); + String cluster = UUID.randomUUID().toString(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + startupMembers.add(new RemoteMember(cluster, uri, i + "")); + } + final List clients = new ArrayList<>(); + final int clusterMembers = 3; + for (int i = 1; i < clusterMembers + 1; ++i) { + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + clients.add(gossipService); + gossipService.init(); + register(gossipService); + } + + // check whether the members are discovered + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + + clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> { + receivedKey = key; + receivingNodeDataOldValue = oldValue; + receivingNodeDataNewValue = newValue; + lock.release(); + }); + + // Add initial gCounter to Node 1 + SharedDataMessage d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(0).merge(d); + + // Check if initial Crdt received + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("gCounter", receivedKey); + Assert.assertEquals(null, receivingNodeDataOldValue); + Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter); + Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue()); + + // Node 3 Updates the gCounter by 4 + GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(2).findCrdt(gCounterKey); + GrowOnlyCounter gcNew = new GrowOnlyCounter(gc, + new GrowOnlyCounter.Builder(clients.get(2)).increment(4L)); + + d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(gcNew); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(2).merge(d); + + // Check if Node 3's Crdt update is received in Node 2 event handler + lock.tryAcquire(10, TimeUnit.SECONDS); + Assert.assertEquals("gCounter", receivedKey); + Assert.assertTrue(receivingNodeDataOldValue instanceof GrowOnlyCounter); + Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataOldValue).value().longValue()); + Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter); + Assert.assertEquals(5, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue()); + + } + + private SharedDataMessage sharedNodeData(String key, String value) { + SharedDataMessage g = new SharedDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey(key); + g.setPayload(value); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + +}