diff --git a/src/main/java/org/apache/gossip/crdt/CrdtCounter.java b/src/main/java/org/apache/gossip/crdt/CrdtCounter.java new file mode 100644 index 0000000..cdc9445 --- /dev/null +++ b/src/main/java/org/apache/gossip/crdt/CrdtCounter.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +public interface CrdtCounter> + extends Crdt { + +} + diff --git a/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/src/main/java/org/apache/gossip/crdt/CrdtModule.java index 0c8a787..cfb3f47 100644 --- a/src/main/java/org/apache/gossip/crdt/CrdtModule.java +++ b/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -42,6 +42,12 @@ abstract class GrowOnlySetMixin{ @JsonIgnore abstract boolean isEmpty(); } +abstract class GrowOnlyCounterMixin { + @JsonCreator + GrowOnlyCounterMixin(@JsonProperty("counters") Map counters) { } + @JsonProperty("counters") abstract Map getCounters(); +} + //If anyone wants to take a stab at this. please have at it //https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java public class CrdtModule extends SimpleModule { @@ -56,6 +62,7 @@ public class CrdtModule extends SimpleModule { public void setupModule(SetupContext context) { context.setMixInAnnotations(OrSet.class, OrSetMixin.class); context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class); + context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class); } } diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java new file mode 100644 index 0000000..9156142 --- /dev/null +++ b/src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.crdt; + +import org.apache.gossip.manager.GossipManager; + +import java.util.HashMap; +import java.util.Map; + +public class GrowOnlyCounter implements CrdtCounter { + + private final Map counters = new HashMap<>(); + + GrowOnlyCounter(Map counters) { + this.counters.putAll(counters); + } + + public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, Builder builder) { + counters.putAll(growOnlyCounter.counters); + if (counters.containsKey(builder.myId)) { + Long newValue = counters.get(builder.myId) + builder.counter; + counters.replace(builder.myId, newValue); + } else { + counters.put(builder.myId, builder.counter); + } + } + + public GrowOnlyCounter(Builder builder) { + counters.put(builder.myId, builder.counter); + } + + public GrowOnlyCounter(GossipManager manager) { + counters.put(manager.getMyself().getId(), 0L); + } + + public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, GrowOnlyCounter other) { + counters.putAll(growOnlyCounter.counters); + for (Map.Entry entry : other.counters.entrySet()) { + String otherKey = entry.getKey(); + Long otherValue = entry.getValue(); + + if (counters.containsKey(otherKey)) { + Long newValue = Math.max(counters.get(otherKey), otherValue); + counters.replace(otherKey, newValue); + } else { + counters.put(otherKey, otherValue); + } + } + } + + @Override + public GrowOnlyCounter merge(GrowOnlyCounter other) { + return new GrowOnlyCounter(this, other); + } + + @Override + public Long value() { + Long globalCount = 0L; + for (Long increment : counters.values()) { + globalCount += increment; + } + return globalCount; + } + + @Override + public GrowOnlyCounter optimize() { + return new GrowOnlyCounter(counters); + } + + @Override + public boolean equals(Object obj) { + if (getClass() != obj.getClass()) + return false; + GrowOnlyCounter other = (GrowOnlyCounter) obj; + return value().longValue() == other.value().longValue(); + } + + @Override + public String toString() { + return "GrowOnlyCounter [counters= " + counters + ", Value=" + value() + "]"; + } + + Map getCounters() { + return counters; + } + + public static class Builder { + + private final String myId; + + private Long counter; + + public Builder(GossipManager gossipManager) { + myId = gossipManager.getMyself().getId(); + counter = 0L; + } + + public GrowOnlyCounter.Builder increment(Integer count) { + counter += count; + return this; + } + } +} diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 147702d..5ca7be7 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; +import org.apache.gossip.crdt.GrowOnlyCounter; import org.apache.gossip.crdt.GrowOnlySet; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.manager.GossipManager; @@ -39,6 +40,7 @@ import io.teknek.tunit.TUnit; public class DataTest extends AbstractIntegrationBase { private String orSetKey = "cror"; + private String gCounterKey = "crdtgc"; @Test public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ @@ -72,15 +74,15 @@ public class DataTest extends AbstractIntegrationBase { clients.get(0).gossipPerNodeData(msg()); clients.get(0).gossipSharedData(sharedMsg()); - TUnit.assertThat(()-> { + TUnit.assertThat(()-> { PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a"); if (x == null) return ""; else return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); - - TUnit.assertThat(() -> { + + TUnit.assertThat(() -> { SharedDataMessage x = clients.get(1).findSharedGossipData("a"); if (x == null) return ""; @@ -88,6 +90,7 @@ public class DataTest extends AbstractIntegrationBase { return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); + givenDifferentDatumsInSet(clients); assertThatListIsMerged(clients); @@ -95,8 +98,51 @@ public class DataTest extends AbstractIntegrationBase { assertThatOrSetIsMerged(clients); dropIt(clients); assertThatOrSetDelIsMerged(clients); + + + // test g counter + givenDifferentIncrement(clients); + assertThatCountIsUpdated(clients, 3); + givenIncreaseOther(clients); + assertThatCountIsUpdated(clients, 7); + + for (int i = 0; i < clusterMembers; ++i) { + clients.get(i).shutdown(); + } } + private void givenDifferentIncrement(final List clients) { + { + SharedDataMessage d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(0).merge(d); + } + { + SharedDataMessage d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(1).merge(d); + } + } + + private void givenIncreaseOther(final List clients) { + GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey); + GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, + new GrowOnlyCounter.Builder(clients.get(1)).increment(4)); + + SharedDataMessage d = new SharedDataMessage(); + d.setKey(gCounterKey); + d.setPayload(gc2); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + clients.get(1).merge(d); + } + private void givenOrs(List clients) { { SharedDataMessage d = new SharedDataMessage(); @@ -148,6 +194,14 @@ public class DataTest extends AbstractIntegrationBase { clients.get(1).merge(CrdtMessage("2")); } + + private void assertThatCountIsUpdated(final List clients, int finalCount) { + TUnit.assertThat(() -> { + return clients.get(0).findCrdt(gCounterKey); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter( + new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); + } + private void assertThatListIsMerged(final List clients){ TUnit.assertThat(() -> { return clients.get(0).findCrdt("cr"); @@ -160,7 +214,7 @@ public class DataTest extends AbstractIntegrationBase { d.setPayload(new GrowOnlySet( Arrays.asList(item))); d.setExpireAt(Long.MAX_VALUE); d.setTimestamp(System.currentTimeMillis()); - return d; + return d; } private PerNodeDataMessage msg(){ diff --git a/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java b/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java new file mode 100644 index 0000000..3a134af --- /dev/null +++ b/src/test/java/org/apache/gossip/crdt/GrowOnlyCounterTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class GrowOnlyCounterTest { + + @Test + public void mergeTest() { + + Map node1Counter = new HashMap<>(); + node1Counter.put("1", 3L); + Map node2Counter = new HashMap<>(); + node2Counter.put("2", 1L); + Map node3Counter = new HashMap<>(); + node3Counter.put("3", 2L); + + GrowOnlyCounter gCounter1 = new GrowOnlyCounter(node1Counter); + GrowOnlyCounter gCounter2 = new GrowOnlyCounter(node2Counter); + GrowOnlyCounter gCounter3 = new GrowOnlyCounter(node3Counter); + + // After node 2 receive from node 1 + gCounter2 = gCounter2.merge(gCounter1); + Assert.assertEquals(4, (long) gCounter2.value()); + + // After node 3 receive from node 1 + gCounter3 = gCounter3.merge(gCounter1); + Assert.assertEquals(5, (long) gCounter3.value()); + + // After node 3 receive from node 2 + gCounter3 = gCounter3.merge(gCounter2); + Assert.assertEquals(6, (long) gCounter3.value()); + } +}