diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml
index 34f346c..e72a455 100644
--- a/gossip-base/pom.xml
+++ b/gossip-base/pom.xml
@@ -73,6 +73,13 @@
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
+
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
index bb1a052..1c95b28 100644
--- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java
@@ -61,6 +61,13 @@ abstract class GrowOnlyCounterMixin {
@JsonProperty("counters") abstract Map getCounters();
}
+abstract class PNCounterMixin {
+ @JsonCreator
+ PNCounterMixin(@JsonProperty("p-counters") Map up, @JsonProperty("n-counters") Map down) { }
+ @JsonProperty("p-counters") abstract Map getPCounters();
+ @JsonProperty("n-counters") abstract Map getNCounters();
+}
+
//If anyone wants to take a stab at this. please have at it
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
public class CrdtModule extends SimpleModule {
@@ -76,6 +83,7 @@ public class CrdtModule extends SimpleModule {
context.setMixInAnnotations(OrSet.class, OrSetMixin.class);
context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class);
+ context.setMixInAnnotations(PNCounter.class, PNCounterMixin.class);
context.setMixInAnnotations(LWWSet.class, LWWSetMixin.class);
context.setMixInAnnotations(LWWSet.Timestamps.class, LWWSetTimestampsMixin.class);
}
diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java b/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java
new file mode 100644
index 0000000..f00a5f1
--- /dev/null
+++ b/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import java.util.Map;
+
+import org.apache.gossip.manager.GossipManager;
+
+public class PNCounter implements CrdtCounter {
+
+ private final GrowOnlyCounter pCount;
+
+ private final GrowOnlyCounter nCount;
+
+ PNCounter(Map pCounters, Map nCounters) {
+ pCount = new GrowOnlyCounter(pCounters);
+ nCount = new GrowOnlyCounter(nCounters);
+ }
+
+ public PNCounter(PNCounter starter, Builder builder) {
+ GrowOnlyCounter.Builder pBuilder = builder.makeGrowOnlyCounterBuilder(builder.pCount());
+ pCount = new GrowOnlyCounter(starter.pCount, pBuilder);
+ GrowOnlyCounter.Builder nBuilder = builder.makeGrowOnlyCounterBuilder(builder.nCount());
+ nCount = new GrowOnlyCounter(starter.nCount, nBuilder);
+ }
+
+ public PNCounter(Builder builder) {
+ GrowOnlyCounter.Builder pBuilder = builder.makeGrowOnlyCounterBuilder(builder.pCount());
+ pCount = new GrowOnlyCounter(pBuilder);
+ GrowOnlyCounter.Builder nBuilder = builder.makeGrowOnlyCounterBuilder(builder.nCount());
+ nCount = new GrowOnlyCounter(nBuilder);
+ }
+
+ public PNCounter(GossipManager manager) {
+ pCount = new GrowOnlyCounter(manager);
+ nCount = new GrowOnlyCounter(manager);
+ }
+
+ public PNCounter(PNCounter starter, PNCounter other) {
+ pCount = new GrowOnlyCounter(starter.pCount, other.pCount);
+ nCount = new GrowOnlyCounter(starter.nCount, other.nCount);
+ }
+
+ @Override
+ public PNCounter merge(PNCounter other) {
+ return new PNCounter(this, other);
+ }
+
+ @Override
+ public Long value() {
+ long pValue = (long) pCount.value();
+ long nValue = (long) nCount.value();
+ return pValue - nValue;
+ }
+
+ @Override
+ public PNCounter optimize() {
+ return new PNCounter(pCount.getCounters(), nCount.getCounters());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (getClass() != obj.getClass())
+ return false;
+ PNCounter other = (PNCounter) obj;
+ return value().longValue() == other.value().longValue();
+ }
+
+ @Override
+ public String toString() {
+ return "PnCounter [pCount=" + pCount + ", nCount=" + nCount + ", value=" + value() + "]";
+ }
+
+ Map getPCounters() {
+ return pCount.getCounters();
+ }
+
+ Map getNCounters() {
+ return nCount.getCounters();
+ }
+
+ public static class Builder {
+
+ private final GossipManager myManager;
+
+ private long value = 0L;
+
+ public Builder(GossipManager gossipManager) {
+ myManager = gossipManager;
+ }
+
+ public long pCount() {
+ if (value > 0) {
+ return value;
+ }
+ return 0;
+ }
+
+ public long nCount() {
+ if (value < 0) {
+ return -value;
+ }
+ return 0;
+ }
+
+ public org.apache.gossip.crdt.GrowOnlyCounter.Builder makeGrowOnlyCounterBuilder(long value) {
+ org.apache.gossip.crdt.GrowOnlyCounter.Builder ret = new org.apache.gossip.crdt.GrowOnlyCounter.Builder(
+ myManager);
+ ret.increment(value);
+ return ret;
+ }
+
+ public PNCounter.Builder increment(long delta) {
+ value += delta;
+ return this;
+ }
+
+ public PNCounter.Builder decrement(long delta) {
+ value -= delta;
+ return this;
+ }
+ }
+
+}
diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java
new file mode 100644
index 0000000..8128cde
--- /dev/null
+++ b/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.crdt;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.manager.GossipManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PNCounterTest {
+
+ private List mockManagers;
+
+ @Before
+ public void setupMocks() {
+ GossipManager manager1 = mock(GossipManager.class);
+ LocalMember mockMember1 = mock(LocalMember.class);
+ when(mockMember1.getId()).thenReturn("x");
+ when(manager1.getMyself()).thenReturn(mockMember1);
+
+ GossipManager manager2 = mock(GossipManager.class);
+ LocalMember mockMember2 = mock(LocalMember.class);
+ when(mockMember2.getId()).thenReturn("y");
+ when(manager2.getMyself()).thenReturn(mockMember2);
+
+ GossipManager manager3 = mock(GossipManager.class);
+ LocalMember mockMember3 = mock(LocalMember.class);
+ when(mockMember3.getId()).thenReturn("z");
+ when(manager3.getMyself()).thenReturn(mockMember3);
+
+ mockManagers = new ArrayList();
+ mockManagers.add(manager1);
+ mockManagers.add(manager2);
+ mockManagers.add(manager3);
+ }
+
+ @Test
+ public void existanceTest() {
+ PNCounter counter = new PNCounter(mockManagers.get(0));
+ Assert.assertEquals(0, (long) counter.value());
+ }
+
+ @Test
+ public void localOperationTest() {
+ PNCounter counter = new PNCounter(mockManagers.get(0));
+ Assert.assertEquals(0, (long) counter.value());
+
+ counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(5L));
+ Assert.assertEquals(5, (long) counter.value());
+
+ counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(4L));
+ Assert.assertEquals(9, (long) counter.value());
+
+ counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(3L));
+ Assert.assertEquals(6, (long) counter.value());
+
+ counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(12L));
+ Assert.assertEquals(-6, (long) counter.value());
+ }
+
+ @Test
+ public void oddballLocalOperationTest() {
+ PNCounter counter = new PNCounter(mockManagers.get(0));
+ Assert.assertEquals(0, (long) counter.value());
+
+ counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(-5L));
+ Assert.assertEquals(-5, (long) counter.value());
+
+ counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(4L));
+ Assert.assertEquals(-1, (long) counter.value());
+
+ counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(-3L));
+ Assert.assertEquals(2, (long) counter.value());
+
+ counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(-12L));
+ Assert.assertEquals(14, (long) counter.value());
+ }
+
+ @Test
+ public void networkLikeOperations() {
+ PNCounter counter1 = new PNCounter(mockManagers.get(0));
+ PNCounter counter2 = new PNCounter(mockManagers.get(1));
+ PNCounter counter3 = new PNCounter(mockManagers.get(2));
+
+ Assert.assertEquals(0, (long) counter1.value());
+ Assert.assertEquals(0, (long) counter2.value());
+ Assert.assertEquals(0, (long) counter3.value());
+
+ counter1 = new PNCounter(counter1, new PNCounter.Builder(mockManagers.get(0)).increment(3L));
+ Assert.assertEquals(3, (long) counter1.value());
+
+ counter2 = new PNCounter(counter2, new PNCounter.Builder(mockManagers.get(1)).increment(5L));
+ Assert.assertEquals(5, (long) counter2.value());
+
+ counter3 = new PNCounter(counter3, new PNCounter.Builder(mockManagers.get(2)).decrement(7L));
+ Assert.assertEquals(-7, (long) counter3.value());
+
+ // 2 becomes 2 and 1
+ counter2 = counter2.merge(counter1);
+ Assert.assertEquals(8, (long) counter2.value());
+
+ // 3 becomes 3 and 1
+ counter3 = counter3.merge(counter1);
+ Assert.assertEquals(-4, (long) counter3.value());
+
+ // 3 becomes all
+ counter3 = counter3.merge(counter2);
+ Assert.assertEquals(1, (long) counter3.value());
+
+ // 2 becomes all - different order
+ counter2 = counter2.merge(counter3);
+ Assert.assertEquals(1, (long) counter3.value());
+ }
+
+}
diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
new file mode 100644
index 0000000..b0015be
--- /dev/null
+++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gossip.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.gossip.GossipSettings;
+import org.apache.gossip.LocalMember;
+import org.apache.gossip.RemoteMember;
+import org.apache.gossip.crdt.PNCounter;
+import org.apache.gossip.manager.GossipManager;
+import org.apache.gossip.manager.GossipManagerBuilder;
+import org.apache.gossip.model.SharedDataMessage;
+
+public class StandAlonePNCounter {
+ private static ExampleCommon common = new ExampleCommon();
+ private static String lastInput = "{None}";
+
+ public static void main(String[] args) throws InterruptedException, IOException {
+ args = common.checkArgsForClearFlag(args);
+ GossipSettings s = new GossipSettings();
+ s.setWindowSize(1000);
+ s.setGossipInterval(100);
+ GossipManager gossipService = GossipManagerBuilder
+ .newBuilder()
+ .cluster("mycluster")
+ .uri(URI.create(args[0])).id(args[1])
+ .gossipMembers(
+ Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3])))
+ .gossipSettings(s)
+ .build();
+ gossipService.init();
+
+ new Thread(() -> {
+ while (true) {
+ common.optionallyClearTerminal();
+ printLiveMembers(gossipService);
+ printDeadMambers(gossipService);
+ printValues(gossipService);
+ try {
+ Thread.sleep(2000);
+ } catch (Exception ignore) {
+ }
+ }
+ }).start();
+
+ String line = null;
+ try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) {
+ while ((line = br.readLine()) != null) {
+ System.out.println(line);
+ char op = line.charAt(0);
+ char blank = line.charAt(1);
+ String val = line.substring(2);
+ Long l = null;
+ boolean valid = true;
+ try {
+ l = Long.valueOf(val);
+ } catch (NumberFormatException ex) {
+ valid = false;
+ }
+ valid = valid &&
+ (
+ (blank == ' ') &&
+ ((op == 'i') || (op == 'd'))
+ );
+ if (valid) {
+ if (op == 'i') {
+ increment(l, gossipService);
+ } else if (op == 'd') {
+ decrement(l, gossipService);
+ }
+ }
+ setLastInput(line,valid);
+ }
+ }
+ }
+
+ private static void printValues(GossipManager gossipService) {
+ System.out.println("Last Input: " + getLastInput());
+ System.out.println("---------- " + (gossipService.findCrdt("myPNCounter") == null ? ""
+ : gossipService.findCrdt("myPNCounter").value()));
+ System.out.println("********** " + gossipService.findCrdt("myPNCounter"));
+ }
+
+ private static void printDeadMambers(GossipManager gossipService) {
+ List members = gossipService.getDeadMembers();
+ if (members.isEmpty()) {
+ System.out.println("Dead: (none)");
+ return;
+ }
+ System.out.println("Dead: " + members.get(0));
+ for (int i = 1; i < members.size(); i++) {
+ System.out.println(" : " + members.get(i));
+ }
+ }
+
+ private static void printLiveMembers(GossipManager gossipService) {
+ List members = gossipService.getLiveMembers();
+ if (members.isEmpty()) {
+ System.out.println("Live: (none)");
+ return;
+ }
+ System.out.println("Live: " + members.get(0));
+ for (int i = 1; i < members.size(); i++) {
+ System.out.println(" : " + members.get(i));
+ }
+ }
+
+ private static void increment(Long l, GossipManager gossipManager) {
+ PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter");
+ if (c == null) {
+ c = new PNCounter(new PNCounter.Builder(gossipManager).increment((l)));
+ } else {
+ c = new PNCounter(c, new PNCounter.Builder(gossipManager).increment((l)));
+ }
+ SharedDataMessage m = new SharedDataMessage();
+ m.setExpireAt(Long.MAX_VALUE);
+ m.setKey("myPNCounter");
+ m.setPayload(c);
+ m.setTimestamp(System.currentTimeMillis());
+ gossipManager.merge(m);
+ }
+
+ private static void decrement(Long l, GossipManager gossipManager) {
+ PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter");
+ if (c == null) {
+ c = new PNCounter(new PNCounter.Builder(gossipManager).decrement((l)));
+ } else {
+ c = new PNCounter(c, new PNCounter.Builder(gossipManager).decrement((l)));
+ }
+ SharedDataMessage m = new SharedDataMessage();
+ m.setExpireAt(Long.MAX_VALUE);
+ m.setKey("myPNCounter");
+ m.setPayload(c);
+ m.setTimestamp(System.currentTimeMillis());
+ gossipManager.merge(m);
+ }
+
+ private static void setLastInput(String input, boolean valid) {
+ lastInput = input;
+ if (! valid) {
+ lastInput += " (invalid)";
+ }
+ }
+
+ private static String getLastInput() {
+ return lastInput;
+ }
+
+}
\ No newline at end of file
diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
index 53408f8..e91426c 100644
--- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
+++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java
@@ -17,55 +17,65 @@
*/
package org.apache.gossip;
-import io.teknek.tunit.TUnit;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.gossip.crdt.GrowOnlyCounter;
import org.apache.gossip.crdt.GrowOnlySet;
import org.apache.gossip.crdt.LWWSet;
import org.apache.gossip.crdt.OrSet;
+import org.apache.gossip.crdt.PNCounter;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
-import org.junit.Assert;
import org.junit.Test;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
+import io.teknek.tunit.TUnit;
public class DataTest extends AbstractIntegrationBase {
private String orSetKey = "cror";
private String lwwSetKey = "crlww";
private String gCounterKey = "crdtgc";
+ private String pnCounterKey = "crdtpn";
@Test
- public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
+ public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List startupMembers = new ArrayList<>();
- for (int i = 1; i < seedNodes + 1; ++i){
+ for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List clients = new ArrayList<>();
final int clusterMembers = 2;
- for (int i = 1; i < clusterMembers + 1; ++i){
+ for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
- GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
- .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
+ GossipManager gossipService = GossipManagerBuilder
+ .newBuilder()
+ .cluster(cluster).uri(uri)
+ .id(i + "")
+ .gossipMembers(startupMembers)
+ .gossipSettings(settings).build();
clients.add(gossipService);
gossipService.init();
register(gossipService);
}
TUnit.assertThat(() -> {
int total = 0;
- for (int i = 0; i < clusterMembers; ++i){
+ for (int i = 0; i < clusterMembers; ++i) {
total += clients.get(i).getLiveMembers().size();
}
return total;
@@ -89,105 +99,173 @@ public class DataTest extends AbstractIntegrationBase {
return x.getPayload();
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
-
givenDifferentDatumsInSet(clients);
assertThatListIsMerged(clients);
testOrSet(clients);
testLWWSet(clients);
- // test g counter
- givenDifferentIncrement(clients);
- assertThatCountIsUpdated(clients, 3);
- givenIncreaseOther(clients);
- assertThatCountIsUpdated(clients, 7);
+ testGrowOnlyCounter(clients);
+ testPNCounter(clients);
- for (int i = 0; i < clusterMembers; ++i){
+ for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
}
}
- private void testOrSet(final List clients){
- //populate
+ private void testOrSet(final List clients) {
+ // populate
clients.get(0).merge(generateSharedMsg(orSetKey, new OrSet<>("1", "2")));
clients.get(1).merge(generateSharedMsg(orSetKey, new OrSet<>("3", "4")));
- //assert merge
+ // assert merge
assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "3", "4").value());
assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "3", "4").value());
- //drop element
+ // drop element
@SuppressWarnings("unchecked")
OrSet o = (OrSet) clients.get(0).findCrdt(orSetKey);
OrSet o2 = new OrSet<>(o, new OrSet.Builder().remove("3"));
clients.get(0).merge(generateSharedMsg(orSetKey, o2));
- //assert deletion
+ // assert deletion
assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "4").value());
assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "4").value());
}
- private void testLWWSet(final List clients){
- //populate
+ private void testLWWSet(final List clients) {
+ // populate
clients.get(0).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("1", "2")));
clients.get(1).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("3", "4")));
- //assert merge
+ // assert merge
assertMerged(clients.get(0), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value());
assertMerged(clients.get(1), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value());
- //drop element
+ // drop element
@SuppressWarnings("unchecked")
LWWSet lww = (LWWSet) clients.get(0).findCrdt(lwwSetKey);
clients.get(0).merge(generateSharedMsg(lwwSetKey, lww.remove("3")));
- //assert deletion
+ // assert deletion
assertMerged(clients.get(0), lwwSetKey, new OrSet<>("1", "2", "4").value());
assertMerged(clients.get(1), lwwSetKey, new OrSet<>("1", "2", "4").value());
}
- private void givenDifferentIncrement(final List clients){
+ private void testGrowOnlyCounter(List clients) {
+ givenDifferentIncrement(clients);
+ assertThatCountIsUpdated(clients, 3);
+ givenIncreaseOther(clients);
+ assertThatCountIsUpdated(clients, 7);
+ }
+
+ private void testPNCounter(List clients) {
+ givenPNCounter(clients);
+ assertThatPNCounterSettlesAt(clients, 0);
+ int[] delta1 = { 2, 3 };
+ givenPNCounterUpdate(clients, delta1);
+ assertThatPNCounterSettlesAt(clients, 5);
+ int[] delta2 = { -3, 5 };
+ givenPNCounterUpdate(clients, delta2);
+ assertThatPNCounterSettlesAt(clients, 7);
+ int[] delta3 = { 1, 1 };
+ givenPNCounterUpdate(clients, delta3);
+ assertThatPNCounterSettlesAt(clients, 9);
+ int[] delta4 = { 1, -7 };
+ givenPNCounterUpdate(clients, delta4);
+ assertThatPNCounterSettlesAt(clients, 3);
+ }
+
+ private void givenDifferentIncrement(final List clients) {
Object payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L));
clients.get(0).merge(generateSharedMsg(gCounterKey, payload));
payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L));
clients.get(1).merge(generateSharedMsg(gCounterKey, payload));
}
- private void givenIncreaseOther(final List clients){
+ private void givenIncreaseOther(final List clients) {
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey);
GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
- new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
+ new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
clients.get(1).merge(generateSharedMsg(gCounterKey, gc2));
}
- private void assertMerged(final GossipManager client, String key, final Set expected){
- TUnit.assertThat(() -> client.findCrdt(key).value())
- .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(expected);
+ private void assertMerged(final GossipManager client, String key, final Set expected) {
+ TUnit.assertThat(() -> client.findCrdt(key).value()).afterWaitingAtMost(10, TimeUnit.SECONDS)
+ .isEqualTo(expected);
}
- private void givenDifferentDatumsInSet(final List clients){
+ private void givenDifferentDatumsInSet(final List clients) {
clients.get(0).merge(CrdtMessage("1"));
clients.get(1).merge(CrdtMessage("2"));
}
-
- private void assertThatCountIsUpdated(final List clients, long finalCount){
+ private void assertThatCountIsUpdated(final List clients, long finalCount) {
TUnit.assertThat(() -> clients.get(0).findCrdt(gCounterKey))
- .afterWaitingAtMost(10, TimeUnit.SECONDS)
- .isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount)));
+ .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
+ new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount)));
}
- private void assertThatListIsMerged(final List clients){
- TUnit.assertThat(() -> clients.get(0).findCrdt("cr"))
- .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2")));
+ private void assertThatListIsMerged(final List clients) {
+ TUnit.assertThat(() -> clients.get(0).findCrdt("cr")).afterWaitingAtMost(10, TimeUnit.SECONDS)
+ .isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2")));
+ }
+
+ private void givenPNCounter(List clients) {
+ {
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey(pnCounterKey);
+ d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(0))));
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(0).merge(d);
+ }
+ {
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey(pnCounterKey);
+ d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(1))));
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(1).merge(d);
+ }
}
- private SharedDataMessage CrdtMessage(String item){
+ private void givenPNCounterUpdate(List clients, int[] deltaArray) {
+ int clientIndex = 0;
+ for (int delta: deltaArray) {
+ PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey);
+ c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long)delta)));
+ SharedDataMessage d = new SharedDataMessage();
+ d.setKey(pnCounterKey);
+ d.setPayload(c);
+ d.setExpireAt(Long.MAX_VALUE);
+ d.setTimestamp(System.currentTimeMillis());
+ clients.get(clientIndex).merge(d);
+ clientIndex = (clientIndex + 1) % clients.size();
+ }
+ }
+
+ private void assertThatPNCounterSettlesAt(List clients, long expectedValue) {
+ for (GossipManager client: clients) {
+ TUnit.assertThat(() -> {
+ long value = 0;
+ Object o = client.findCrdt(pnCounterKey);
+ if (o != null) {
+ PNCounter c = (PNCounter)o;
+ value = c.value();
+ }
+ return value;
+ }).afterWaitingAtMost(10, TimeUnit.SECONDS)
+ .isEqualTo(expectedValue);
+ }
+ }
+
+ private SharedDataMessage CrdtMessage(String item) {
return generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList(item)));
}
- private PerNodeDataMessage generatePerNodeMsg(String key, Object payload){
+ private PerNodeDataMessage generatePerNodeMsg(String key, Object payload) {
PerNodeDataMessage g = new PerNodeDataMessage();
g.setExpireAt(Long.MAX_VALUE);
g.setKey(key);
@@ -196,7 +274,7 @@ public class DataTest extends AbstractIntegrationBase {
return g;
}
- private SharedDataMessage generateSharedMsg(String key, Object payload){
+ private SharedDataMessage generateSharedMsg(String key, Object payload) {
SharedDataMessage d = new SharedDataMessage();
d.setKey(key);
d.setPayload(payload);
diff --git a/pom.xml b/pom.xml
index 8740319..75a54a4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -32,7 +32,8 @@
4.12.0-M2
1.2.17
0.0.0
-
+ 2.8.9
+
3.5.1
2.10
@@ -116,6 +117,12 @@
${tunit.version}
test
+
+ org.mockito
+ mockito-core
+ ${mockito.version}
+ test
+
@@ -200,6 +207,8 @@
eclipse_template.xml
**/*.mycluster.*.json
+
+ **/*.log