GOSSIP-63 Added CRDT G-Counter implementation

This commit is contained in:
Mirage Abeysekara
2017-03-08 10:21:03 +05:30
parent 22b9e756d4
commit da1aba9c71
5 changed files with 254 additions and 3 deletions

View File

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
public interface CrdtCounter<ValueType extends Number, R extends CrdtCounter<ValueType, R>>
extends Crdt<ValueType, R> {
}

View File

@ -42,6 +42,12 @@ abstract class GrowOnlySetMixin<E>{
@JsonIgnore abstract boolean isEmpty();
}
abstract class GrowOnlyCounterMixin {
@JsonCreator
GrowOnlyCounterMixin(@JsonProperty("counters") Map<String, Long> counters) { }
@JsonProperty("counters") abstract Map<String, Long> getCounters();
}
//If anyone wants to take a stab at this. please have at it
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
public class CrdtModule extends SimpleModule {
@ -56,6 +62,7 @@ public class CrdtModule extends SimpleModule {
public void setupModule(SetupContext context) {
context.setMixInAnnotations(OrSet.class, OrSetMixin.class);
context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class);
}
}

View File

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import org.apache.gossip.manager.GossipManager;
import java.util.HashMap;
import java.util.Map;
public class GrowOnlyCounter implements CrdtCounter<Long, GrowOnlyCounter> {
private final Map<String, Long> counters = new HashMap<>();
GrowOnlyCounter(Map<String, Long> counters) {
this.counters.putAll(counters);
}
public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, Builder builder) {
counters.putAll(growOnlyCounter.counters);
if (counters.containsKey(builder.myId)) {
Long newValue = counters.get(builder.myId) + builder.counter;
counters.replace(builder.myId, newValue);
} else {
counters.put(builder.myId, builder.counter);
}
}
public GrowOnlyCounter(Builder builder) {
counters.put(builder.myId, builder.counter);
}
public GrowOnlyCounter(GossipManager manager) {
counters.put(manager.getMyself().getId(), 0L);
}
public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, GrowOnlyCounter other) {
counters.putAll(growOnlyCounter.counters);
for (Map.Entry<String, Long> entry : other.counters.entrySet()) {
String otherKey = entry.getKey();
Long otherValue = entry.getValue();
if (counters.containsKey(otherKey)) {
Long newValue = Math.max(counters.get(otherKey), otherValue);
counters.replace(otherKey, newValue);
} else {
counters.put(otherKey, otherValue);
}
}
}
@Override
public GrowOnlyCounter merge(GrowOnlyCounter other) {
return new GrowOnlyCounter(this, other);
}
@Override
public Long value() {
Long globalCount = 0L;
for (Long increment : counters.values()) {
globalCount += increment;
}
return globalCount;
}
@Override
public GrowOnlyCounter optimize() {
return new GrowOnlyCounter(counters);
}
@Override
public boolean equals(Object obj) {
if (getClass() != obj.getClass())
return false;
GrowOnlyCounter other = (GrowOnlyCounter) obj;
return value().longValue() == other.value().longValue();
}
@Override
public String toString() {
return "GrowOnlyCounter [counters= " + counters + ", Value=" + value() + "]";
}
Map<String, Long> getCounters() {
return counters;
}
public static class Builder {
private final String myId;
private Long counter;
public Builder(GossipManager gossipManager) {
myId = gossipManager.getMyself().getId();
counter = 0L;
}
public GrowOnlyCounter.Builder increment(Integer count) {
counter += count;
return this;
}
}
}

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.crdt.GrowOnlyCounter;
import org.apache.gossip.crdt.GrowOnlySet;
import org.apache.gossip.crdt.OrSet;
import org.apache.gossip.model.GossipDataMessage;
@ -39,6 +40,7 @@ import io.teknek.tunit.TUnit;
public class DataTest {
private String orSetKey = "cror";
private String gCounterKey = "crdtgc";
@Test
public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
@ -72,7 +74,7 @@ public class DataTest {
clients.get(0).gossipPerNodeData(msg());
clients.get(0).gossipSharedData(sharedMsg());
TUnit.assertThat(()-> {
TUnit.assertThat(()-> {
GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
if (x == null)
return "";
@ -80,7 +82,7 @@ public class DataTest {
return x.getPayload();
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
TUnit.assertThat(() -> {
TUnit.assertThat(() -> {
SharedGossipDataMessage x = clients.get(1).findSharedData("a");
if (x == null)
return "";
@ -96,11 +98,49 @@ public class DataTest {
dropIt(clients);
assertThatOrSetDelIsMerged(clients);
// test g counter
givenDifferentIncrement(clients);
assertThatCountIsUpdated(clients, 3);
givenIncreaseOther(clients);
assertThatCountIsUpdated(clients, 7);
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
}
}
private void givenDifferentIncrement(final List<GossipService> clients) {
{
SharedGossipDataMessage d = new SharedGossipDataMessage();
d.setKey(gCounterKey);
d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0).getGossipManager()).increment(1)));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
clients.get(0).getGossipManager().merge(d);
}
{
SharedGossipDataMessage d = new SharedGossipDataMessage();
d.setKey(gCounterKey);
d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1).getGossipManager()).increment(2)));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
clients.get(1).getGossipManager().merge(d);
}
}
private void givenIncreaseOther(final List<GossipService> clients) {
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).getGossipManager().findCrdt(gCounterKey);
GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
new GrowOnlyCounter.Builder(clients.get(1).getGossipManager()).increment(4));
SharedGossipDataMessage d = new SharedGossipDataMessage();
d.setKey(gCounterKey);
d.setPayload(gc2);
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
clients.get(1).getGossipManager().merge(d);
}
private void givenOrs(List<GossipService> clients) {
{
SharedGossipDataMessage d = new SharedGossipDataMessage();
@ -152,6 +192,13 @@ public class DataTest {
clients.get(1).getGossipManager().merge(CrdtMessage("2"));
}
private void assertThatCountIsUpdated(final List<GossipService> clients, int finalCount) {
TUnit.assertThat(() -> {
return clients.get(0).getGossipManager().findCrdt(gCounterKey);
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
new GrowOnlyCounter.Builder(clients.get(0).getGossipManager()).increment(finalCount)));
}
private void assertThatListIsMerged(final List<GossipService> clients){
TUnit.assertThat(() -> {
return clients.get(0).getGossipManager().findCrdt("cr");
@ -164,7 +211,7 @@ public class DataTest {
d.setPayload(new GrowOnlySet<String>( Arrays.asList(item)));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
return d;
return d;
}
private GossipDataMessage msg(){

View File

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
public class GrowOnlyCounterTest {
@Test
public void mergeTest() {
Map<String, Long> node1Counter = new HashMap<>();
node1Counter.put("1", 3L);
Map<String, Long> node2Counter = new HashMap<>();
node2Counter.put("2", 1L);
Map<String, Long> node3Counter = new HashMap<>();
node3Counter.put("3", 2L);
GrowOnlyCounter gCounter1 = new GrowOnlyCounter(node1Counter);
GrowOnlyCounter gCounter2 = new GrowOnlyCounter(node2Counter);
GrowOnlyCounter gCounter3 = new GrowOnlyCounter(node3Counter);
// After node 2 receive from node 1
gCounter2 = gCounter2.merge(gCounter1);
Assert.assertEquals(4, (long) gCounter2.value());
// After node 3 receive from node 1
gCounter3 = gCounter3.merge(gCounter1);
Assert.assertEquals(5, (long) gCounter3.value());
// After node 3 receive from node 2
gCounter3 = gCounter3.merge(gCounter2);
Assert.assertEquals(6, (long) gCounter3.value());
}
}