From 49cdac62a2fe7b26b5a070c62793d3ec4e2c42b5 Mon Sep 17 00:00:00 2001 From: Terry Weymouth Date: Sat, 10 Jun 2017 17:07:33 -0400 Subject: [PATCH] GOSSIP-62 Implement Crdt PN-Counter restored layout of pom.xml for minimal changes in PR Snapshot - I think I have the basic framework in place. No tests are passing and nothing works, but most of the calls and the builder are in place. capture working code starting working on example code Working examples GOSSIP-65 Implement crdt LWW-Element-Set LWWSet implemented + se/de + unit tests + jackson tests + DataTests GOSSIP-55 Added event handlers to notify share data and per node data changes Reformat code to match apache standard Fixed DataTest errors WRT PNCounter --- gossip-base/pom.xml | 7 + .../org/apache/gossip/crdt/CrdtModule.java | 8 + .../org/apache/gossip/crdt/PNCounter.java | 139 ++++++++++++++ .../org/apache/gossip/crdt/PNCounterTest.java | 137 ++++++++++++++ .../gossip/examples/StandAlonePNCounter.java | 170 +++++++++++++++++ .../test/java/org/apache/gossip/DataTest.java | 172 +++++++++++++----- pom.xml | 11 +- 7 files changed, 596 insertions(+), 48 deletions(-) create mode 100644 gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java create mode 100644 gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java create mode 100644 gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml index 34f346c..e72a455 100644 --- a/gossip-base/pom.xml +++ b/gossip-base/pom.xml @@ -73,6 +73,13 @@ + + org.mockito + mockito-core + ${mockito.version} + test + + diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java index bb1a052..1c95b28 100644 --- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -61,6 +61,13 @@ abstract class GrowOnlyCounterMixin { @JsonProperty("counters") abstract Map getCounters(); } +abstract class PNCounterMixin { + @JsonCreator + PNCounterMixin(@JsonProperty("p-counters") Map up, @JsonProperty("n-counters") Map down) { } + @JsonProperty("p-counters") abstract Map getPCounters(); + @JsonProperty("n-counters") abstract Map getNCounters(); +} + //If anyone wants to take a stab at this. please have at it //https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java public class CrdtModule extends SimpleModule { @@ -76,6 +83,7 @@ public class CrdtModule extends SimpleModule { context.setMixInAnnotations(OrSet.class, OrSetMixin.class); context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class); context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class); + context.setMixInAnnotations(PNCounter.class, PNCounterMixin.class); context.setMixInAnnotations(LWWSet.class, LWWSetMixin.class); context.setMixInAnnotations(LWWSet.Timestamps.class, LWWSetTimestampsMixin.class); } diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java b/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java new file mode 100644 index 0000000..f00a5f1 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/PNCounter.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.Map; + +import org.apache.gossip.manager.GossipManager; + +public class PNCounter implements CrdtCounter { + + private final GrowOnlyCounter pCount; + + private final GrowOnlyCounter nCount; + + PNCounter(Map pCounters, Map nCounters) { + pCount = new GrowOnlyCounter(pCounters); + nCount = new GrowOnlyCounter(nCounters); + } + + public PNCounter(PNCounter starter, Builder builder) { + GrowOnlyCounter.Builder pBuilder = builder.makeGrowOnlyCounterBuilder(builder.pCount()); + pCount = new GrowOnlyCounter(starter.pCount, pBuilder); + GrowOnlyCounter.Builder nBuilder = builder.makeGrowOnlyCounterBuilder(builder.nCount()); + nCount = new GrowOnlyCounter(starter.nCount, nBuilder); + } + + public PNCounter(Builder builder) { + GrowOnlyCounter.Builder pBuilder = builder.makeGrowOnlyCounterBuilder(builder.pCount()); + pCount = new GrowOnlyCounter(pBuilder); + GrowOnlyCounter.Builder nBuilder = builder.makeGrowOnlyCounterBuilder(builder.nCount()); + nCount = new GrowOnlyCounter(nBuilder); + } + + public PNCounter(GossipManager manager) { + pCount = new GrowOnlyCounter(manager); + nCount = new GrowOnlyCounter(manager); + } + + public PNCounter(PNCounter starter, PNCounter other) { + pCount = new GrowOnlyCounter(starter.pCount, other.pCount); + nCount = new GrowOnlyCounter(starter.nCount, other.nCount); + } + + @Override + public PNCounter merge(PNCounter other) { + return new PNCounter(this, other); + } + + @Override + public Long value() { + long pValue = (long) pCount.value(); + long nValue = (long) nCount.value(); + return pValue - nValue; + } + + @Override + public PNCounter optimize() { + return new PNCounter(pCount.getCounters(), nCount.getCounters()); + } + + @Override + public boolean equals(Object obj) { + if (getClass() != obj.getClass()) + return false; + PNCounter other = (PNCounter) obj; + return value().longValue() == other.value().longValue(); + } + + @Override + public String toString() { + return "PnCounter [pCount=" + pCount + ", nCount=" + nCount + ", value=" + value() + "]"; + } + + Map getPCounters() { + return pCount.getCounters(); + } + + Map getNCounters() { + return nCount.getCounters(); + } + + public static class Builder { + + private final GossipManager myManager; + + private long value = 0L; + + public Builder(GossipManager gossipManager) { + myManager = gossipManager; + } + + public long pCount() { + if (value > 0) { + return value; + } + return 0; + } + + public long nCount() { + if (value < 0) { + return -value; + } + return 0; + } + + public org.apache.gossip.crdt.GrowOnlyCounter.Builder makeGrowOnlyCounterBuilder(long value) { + org.apache.gossip.crdt.GrowOnlyCounter.Builder ret = new org.apache.gossip.crdt.GrowOnlyCounter.Builder( + myManager); + ret.increment(value); + return ret; + } + + public PNCounter.Builder increment(long delta) { + value += delta; + return this; + } + + public PNCounter.Builder decrement(long delta) { + value -= delta; + return this; + } + } + +} diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java new file mode 100644 index 0000000..8128cde --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/PNCounterTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.gossip.LocalMember; +import org.apache.gossip.manager.GossipManager; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class PNCounterTest { + + private List mockManagers; + + @Before + public void setupMocks() { + GossipManager manager1 = mock(GossipManager.class); + LocalMember mockMember1 = mock(LocalMember.class); + when(mockMember1.getId()).thenReturn("x"); + when(manager1.getMyself()).thenReturn(mockMember1); + + GossipManager manager2 = mock(GossipManager.class); + LocalMember mockMember2 = mock(LocalMember.class); + when(mockMember2.getId()).thenReturn("y"); + when(manager2.getMyself()).thenReturn(mockMember2); + + GossipManager manager3 = mock(GossipManager.class); + LocalMember mockMember3 = mock(LocalMember.class); + when(mockMember3.getId()).thenReturn("z"); + when(manager3.getMyself()).thenReturn(mockMember3); + + mockManagers = new ArrayList(); + mockManagers.add(manager1); + mockManagers.add(manager2); + mockManagers.add(manager3); + } + + @Test + public void existanceTest() { + PNCounter counter = new PNCounter(mockManagers.get(0)); + Assert.assertEquals(0, (long) counter.value()); + } + + @Test + public void localOperationTest() { + PNCounter counter = new PNCounter(mockManagers.get(0)); + Assert.assertEquals(0, (long) counter.value()); + + counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(5L)); + Assert.assertEquals(5, (long) counter.value()); + + counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(4L)); + Assert.assertEquals(9, (long) counter.value()); + + counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(3L)); + Assert.assertEquals(6, (long) counter.value()); + + counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(12L)); + Assert.assertEquals(-6, (long) counter.value()); + } + + @Test + public void oddballLocalOperationTest() { + PNCounter counter = new PNCounter(mockManagers.get(0)); + Assert.assertEquals(0, (long) counter.value()); + + counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(-5L)); + Assert.assertEquals(-5, (long) counter.value()); + + counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).increment(4L)); + Assert.assertEquals(-1, (long) counter.value()); + + counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(-3L)); + Assert.assertEquals(2, (long) counter.value()); + + counter = new PNCounter(counter, new PNCounter.Builder(mockManagers.get(0)).decrement(-12L)); + Assert.assertEquals(14, (long) counter.value()); + } + + @Test + public void networkLikeOperations() { + PNCounter counter1 = new PNCounter(mockManagers.get(0)); + PNCounter counter2 = new PNCounter(mockManagers.get(1)); + PNCounter counter3 = new PNCounter(mockManagers.get(2)); + + Assert.assertEquals(0, (long) counter1.value()); + Assert.assertEquals(0, (long) counter2.value()); + Assert.assertEquals(0, (long) counter3.value()); + + counter1 = new PNCounter(counter1, new PNCounter.Builder(mockManagers.get(0)).increment(3L)); + Assert.assertEquals(3, (long) counter1.value()); + + counter2 = new PNCounter(counter2, new PNCounter.Builder(mockManagers.get(1)).increment(5L)); + Assert.assertEquals(5, (long) counter2.value()); + + counter3 = new PNCounter(counter3, new PNCounter.Builder(mockManagers.get(2)).decrement(7L)); + Assert.assertEquals(-7, (long) counter3.value()); + + // 2 becomes 2 and 1 + counter2 = counter2.merge(counter1); + Assert.assertEquals(8, (long) counter2.value()); + + // 3 becomes 3 and 1 + counter3 = counter3.merge(counter1); + Assert.assertEquals(-4, (long) counter3.value()); + + // 3 becomes all + counter3 = counter3.merge(counter2); + Assert.assertEquals(1, (long) counter3.value()); + + // 2 becomes all - different order + counter2 = counter2.merge(counter3); + Assert.assertEquals(1, (long) counter3.value()); + } + +} diff --git a/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java new file mode 100644 index 0000000..b0015be --- /dev/null +++ b/gossip-examples/src/main/java/org/apache/gossip/examples/StandAlonePNCounter.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.examples; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.Arrays; +import java.util.List; + +import org.apache.gossip.GossipSettings; +import org.apache.gossip.LocalMember; +import org.apache.gossip.RemoteMember; +import org.apache.gossip.crdt.PNCounter; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.SharedDataMessage; + +public class StandAlonePNCounter { + private static ExampleCommon common = new ExampleCommon(); + private static String lastInput = "{None}"; + + public static void main(String[] args) throws InterruptedException, IOException { + args = common.checkArgsForClearFlag(args); + GossipSettings s = new GossipSettings(); + s.setWindowSize(1000); + s.setGossipInterval(100); + GossipManager gossipService = GossipManagerBuilder + .newBuilder() + .cluster("mycluster") + .uri(URI.create(args[0])).id(args[1]) + .gossipMembers( + Arrays.asList(new RemoteMember("mycluster", URI.create(args[2]), args[3]))) + .gossipSettings(s) + .build(); + gossipService.init(); + + new Thread(() -> { + while (true) { + common.optionallyClearTerminal(); + printLiveMembers(gossipService); + printDeadMambers(gossipService); + printValues(gossipService); + try { + Thread.sleep(2000); + } catch (Exception ignore) { + } + } + }).start(); + + String line = null; + try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))) { + while ((line = br.readLine()) != null) { + System.out.println(line); + char op = line.charAt(0); + char blank = line.charAt(1); + String val = line.substring(2); + Long l = null; + boolean valid = true; + try { + l = Long.valueOf(val); + } catch (NumberFormatException ex) { + valid = false; + } + valid = valid && + ( + (blank == ' ') && + ((op == 'i') || (op == 'd')) + ); + if (valid) { + if (op == 'i') { + increment(l, gossipService); + } else if (op == 'd') { + decrement(l, gossipService); + } + } + setLastInput(line,valid); + } + } + } + + private static void printValues(GossipManager gossipService) { + System.out.println("Last Input: " + getLastInput()); + System.out.println("---------- " + (gossipService.findCrdt("myPNCounter") == null ? "" + : gossipService.findCrdt("myPNCounter").value())); + System.out.println("********** " + gossipService.findCrdt("myPNCounter")); + } + + private static void printDeadMambers(GossipManager gossipService) { + List members = gossipService.getDeadMembers(); + if (members.isEmpty()) { + System.out.println("Dead: (none)"); + return; + } + System.out.println("Dead: " + members.get(0)); + for (int i = 1; i < members.size(); i++) { + System.out.println(" : " + members.get(i)); + } + } + + private static void printLiveMembers(GossipManager gossipService) { + List members = gossipService.getLiveMembers(); + if (members.isEmpty()) { + System.out.println("Live: (none)"); + return; + } + System.out.println("Live: " + members.get(0)); + for (int i = 1; i < members.size(); i++) { + System.out.println(" : " + members.get(i)); + } + } + + private static void increment(Long l, GossipManager gossipManager) { + PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter"); + if (c == null) { + c = new PNCounter(new PNCounter.Builder(gossipManager).increment((l))); + } else { + c = new PNCounter(c, new PNCounter.Builder(gossipManager).increment((l))); + } + SharedDataMessage m = new SharedDataMessage(); + m.setExpireAt(Long.MAX_VALUE); + m.setKey("myPNCounter"); + m.setPayload(c); + m.setTimestamp(System.currentTimeMillis()); + gossipManager.merge(m); + } + + private static void decrement(Long l, GossipManager gossipManager) { + PNCounter c = (PNCounter) gossipManager.findCrdt("myPNCounter"); + if (c == null) { + c = new PNCounter(new PNCounter.Builder(gossipManager).decrement((l))); + } else { + c = new PNCounter(c, new PNCounter.Builder(gossipManager).decrement((l))); + } + SharedDataMessage m = new SharedDataMessage(); + m.setExpireAt(Long.MAX_VALUE); + m.setKey("myPNCounter"); + m.setPayload(c); + m.setTimestamp(System.currentTimeMillis()); + gossipManager.merge(m); + } + + private static void setLastInput(String input, boolean valid) { + lastInput = input; + if (! valid) { + lastInput += " (invalid)"; + } + } + + private static String getLastInput() { + return lastInput; + } + +} \ No newline at end of file diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java index 53408f8..e91426c 100644 --- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java @@ -17,55 +17,65 @@ */ package org.apache.gossip; -import io.teknek.tunit.TUnit; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + import org.apache.gossip.crdt.GrowOnlyCounter; import org.apache.gossip.crdt.GrowOnlySet; import org.apache.gossip.crdt.LWWSet; import org.apache.gossip.crdt.OrSet; +import org.apache.gossip.crdt.PNCounter; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; -import org.junit.Assert; import org.junit.Test; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.*; -import java.util.concurrent.TimeUnit; +import io.teknek.tunit.TUnit; public class DataTest extends AbstractIntegrationBase { private String orSetKey = "cror"; private String lwwSetKey = "crlww"; private String gCounterKey = "crdtgc"; + private String pnCounterKey = "crdtpn"; @Test - public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ + public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException { GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes + 1; ++i){ + for (int i = 1; i < seedNodes + 1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); startupMembers.add(new RemoteMember(cluster, uri, i + "")); } final List clients = new ArrayList<>(); final int clusterMembers = 2; - for (int i = 1; i < clusterMembers + 1; ++i){ + for (int i = 1; i < clusterMembers + 1; ++i) { URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); - GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) - .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + GossipManager gossipService = GossipManagerBuilder + .newBuilder() + .cluster(cluster).uri(uri) + .id(i + "") + .gossipMembers(startupMembers) + .gossipSettings(settings).build(); clients.add(gossipService); gossipService.init(); register(gossipService); } TUnit.assertThat(() -> { int total = 0; - for (int i = 0; i < clusterMembers; ++i){ + for (int i = 0; i < clusterMembers; ++i) { total += clients.get(i).getLiveMembers().size(); } return total; @@ -89,105 +99,173 @@ public class DataTest extends AbstractIntegrationBase { return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); - givenDifferentDatumsInSet(clients); assertThatListIsMerged(clients); testOrSet(clients); testLWWSet(clients); - // test g counter - givenDifferentIncrement(clients); - assertThatCountIsUpdated(clients, 3); - givenIncreaseOther(clients); - assertThatCountIsUpdated(clients, 7); + testGrowOnlyCounter(clients); + testPNCounter(clients); - for (int i = 0; i < clusterMembers; ++i){ + for (int i = 0; i < clusterMembers; ++i) { clients.get(i).shutdown(); } } - private void testOrSet(final List clients){ - //populate + private void testOrSet(final List clients) { + // populate clients.get(0).merge(generateSharedMsg(orSetKey, new OrSet<>("1", "2"))); clients.get(1).merge(generateSharedMsg(orSetKey, new OrSet<>("3", "4"))); - //assert merge + // assert merge assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "3", "4").value()); assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "3", "4").value()); - //drop element + // drop element @SuppressWarnings("unchecked") OrSet o = (OrSet) clients.get(0).findCrdt(orSetKey); OrSet o2 = new OrSet<>(o, new OrSet.Builder().remove("3")); clients.get(0).merge(generateSharedMsg(orSetKey, o2)); - //assert deletion + // assert deletion assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "4").value()); assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "4").value()); } - private void testLWWSet(final List clients){ - //populate + private void testLWWSet(final List clients) { + // populate clients.get(0).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("1", "2"))); clients.get(1).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("3", "4"))); - //assert merge + // assert merge assertMerged(clients.get(0), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value()); assertMerged(clients.get(1), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value()); - //drop element + // drop element @SuppressWarnings("unchecked") LWWSet lww = (LWWSet) clients.get(0).findCrdt(lwwSetKey); clients.get(0).merge(generateSharedMsg(lwwSetKey, lww.remove("3"))); - //assert deletion + // assert deletion assertMerged(clients.get(0), lwwSetKey, new OrSet<>("1", "2", "4").value()); assertMerged(clients.get(1), lwwSetKey, new OrSet<>("1", "2", "4").value()); } - private void givenDifferentIncrement(final List clients){ + private void testGrowOnlyCounter(List clients) { + givenDifferentIncrement(clients); + assertThatCountIsUpdated(clients, 3); + givenIncreaseOther(clients); + assertThatCountIsUpdated(clients, 7); + } + + private void testPNCounter(List clients) { + givenPNCounter(clients); + assertThatPNCounterSettlesAt(clients, 0); + int[] delta1 = { 2, 3 }; + givenPNCounterUpdate(clients, delta1); + assertThatPNCounterSettlesAt(clients, 5); + int[] delta2 = { -3, 5 }; + givenPNCounterUpdate(clients, delta2); + assertThatPNCounterSettlesAt(clients, 7); + int[] delta3 = { 1, 1 }; + givenPNCounterUpdate(clients, delta3); + assertThatPNCounterSettlesAt(clients, 9); + int[] delta4 = { 1, -7 }; + givenPNCounterUpdate(clients, delta4); + assertThatPNCounterSettlesAt(clients, 3); + } + + private void givenDifferentIncrement(final List clients) { Object payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)); clients.get(0).merge(generateSharedMsg(gCounterKey, payload)); payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L)); clients.get(1).merge(generateSharedMsg(gCounterKey, payload)); } - private void givenIncreaseOther(final List clients){ + private void givenIncreaseOther(final List clients) { GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey); GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, - new GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); + new GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); clients.get(1).merge(generateSharedMsg(gCounterKey, gc2)); } - private void assertMerged(final GossipManager client, String key, final Set expected){ - TUnit.assertThat(() -> client.findCrdt(key).value()) - .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(expected); + private void assertMerged(final GossipManager client, String key, final Set expected) { + TUnit.assertThat(() -> client.findCrdt(key).value()).afterWaitingAtMost(10, TimeUnit.SECONDS) + .isEqualTo(expected); } - private void givenDifferentDatumsInSet(final List clients){ + private void givenDifferentDatumsInSet(final List clients) { clients.get(0).merge(CrdtMessage("1")); clients.get(1).merge(CrdtMessage("2")); } - - private void assertThatCountIsUpdated(final List clients, long finalCount){ + private void assertThatCountIsUpdated(final List clients, long finalCount) { TUnit.assertThat(() -> clients.get(0).findCrdt(gCounterKey)) - .afterWaitingAtMost(10, TimeUnit.SECONDS) - .isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); + .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter( + new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); } - private void assertThatListIsMerged(final List clients){ - TUnit.assertThat(() -> clients.get(0).findCrdt("cr")) - .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2"))); + private void assertThatListIsMerged(final List clients) { + TUnit.assertThat(() -> clients.get(0).findCrdt("cr")).afterWaitingAtMost(10, TimeUnit.SECONDS) + .isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2"))); + } + + private void givenPNCounter(List clients) { + { + SharedDataMessage d = new SharedDataMessage(); + d.setKey(pnCounterKey); + d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(0)))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(0).merge(d); + } + { + SharedDataMessage d = new SharedDataMessage(); + d.setKey(pnCounterKey); + d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(1)))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(1).merge(d); + } } - private SharedDataMessage CrdtMessage(String item){ + private void givenPNCounterUpdate(List clients, int[] deltaArray) { + int clientIndex = 0; + for (int delta: deltaArray) { + PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey); + c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long)delta))); + SharedDataMessage d = new SharedDataMessage(); + d.setKey(pnCounterKey); + d.setPayload(c); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(clientIndex).merge(d); + clientIndex = (clientIndex + 1) % clients.size(); + } + } + + private void assertThatPNCounterSettlesAt(List clients, long expectedValue) { + for (GossipManager client: clients) { + TUnit.assertThat(() -> { + long value = 0; + Object o = client.findCrdt(pnCounterKey); + if (o != null) { + PNCounter c = (PNCounter)o; + value = c.value(); + } + return value; + }).afterWaitingAtMost(10, TimeUnit.SECONDS) + .isEqualTo(expectedValue); + } + } + + private SharedDataMessage CrdtMessage(String item) { return generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList(item))); } - private PerNodeDataMessage generatePerNodeMsg(String key, Object payload){ + private PerNodeDataMessage generatePerNodeMsg(String key, Object payload) { PerNodeDataMessage g = new PerNodeDataMessage(); g.setExpireAt(Long.MAX_VALUE); g.setKey(key); @@ -196,7 +274,7 @@ public class DataTest extends AbstractIntegrationBase { return g; } - private SharedDataMessage generateSharedMsg(String key, Object payload){ + private SharedDataMessage generateSharedMsg(String key, Object payload) { SharedDataMessage d = new SharedDataMessage(); d.setKey(key); d.setPayload(payload); diff --git a/pom.xml b/pom.xml index 8740319..75a54a4 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,8 @@ 4.12.0-M2 1.2.17 0.0.0 - + 2.8.9 + 3.5.1 2.10 @@ -116,6 +117,12 @@ ${tunit.version} test + + org.mockito + mockito-core + ${mockito.version} + test + @@ -200,6 +207,8 @@ eclipse_template.xml **/*.mycluster.*.json + + **/*.log