GOSSIP-63 merge cleanup
This commit is contained in:
24
src/main/java/org/apache/gossip/crdt/CrdtCounter.java
Normal file
24
src/main/java/org/apache/gossip/crdt/CrdtCounter.java
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.gossip.crdt;
|
||||||
|
|
||||||
|
public interface CrdtCounter<ValueType extends Number, R extends CrdtCounter<ValueType, R>>
|
||||||
|
extends Crdt<ValueType, R> {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -42,6 +42,12 @@ abstract class GrowOnlySetMixin<E>{
|
|||||||
@JsonIgnore abstract boolean isEmpty();
|
@JsonIgnore abstract boolean isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract class GrowOnlyCounterMixin {
|
||||||
|
@JsonCreator
|
||||||
|
GrowOnlyCounterMixin(@JsonProperty("counters") Map<String, Long> counters) { }
|
||||||
|
@JsonProperty("counters") abstract Map<String, Long> getCounters();
|
||||||
|
}
|
||||||
|
|
||||||
//If anyone wants to take a stab at this. please have at it
|
//If anyone wants to take a stab at this. please have at it
|
||||||
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
|
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
|
||||||
public class CrdtModule extends SimpleModule {
|
public class CrdtModule extends SimpleModule {
|
||||||
@ -56,6 +62,7 @@ public class CrdtModule extends SimpleModule {
|
|||||||
public void setupModule(SetupContext context) {
|
public void setupModule(SetupContext context) {
|
||||||
context.setMixInAnnotations(OrSet.class, OrSetMixin.class);
|
context.setMixInAnnotations(OrSet.class, OrSetMixin.class);
|
||||||
context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
|
context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
|
||||||
|
context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
119
src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java
Normal file
119
src/main/java/org/apache/gossip/crdt/GrowOnlyCounter.java
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.gossip.crdt;
|
||||||
|
|
||||||
|
import org.apache.gossip.manager.GossipManager;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class GrowOnlyCounter implements CrdtCounter<Long, GrowOnlyCounter> {
|
||||||
|
|
||||||
|
private final Map<String, Long> counters = new HashMap<>();
|
||||||
|
|
||||||
|
GrowOnlyCounter(Map<String, Long> counters) {
|
||||||
|
this.counters.putAll(counters);
|
||||||
|
}
|
||||||
|
|
||||||
|
public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, Builder builder) {
|
||||||
|
counters.putAll(growOnlyCounter.counters);
|
||||||
|
if (counters.containsKey(builder.myId)) {
|
||||||
|
Long newValue = counters.get(builder.myId) + builder.counter;
|
||||||
|
counters.replace(builder.myId, newValue);
|
||||||
|
} else {
|
||||||
|
counters.put(builder.myId, builder.counter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public GrowOnlyCounter(Builder builder) {
|
||||||
|
counters.put(builder.myId, builder.counter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public GrowOnlyCounter(GossipManager manager) {
|
||||||
|
counters.put(manager.getMyself().getId(), 0L);
|
||||||
|
}
|
||||||
|
|
||||||
|
public GrowOnlyCounter(GrowOnlyCounter growOnlyCounter, GrowOnlyCounter other) {
|
||||||
|
counters.putAll(growOnlyCounter.counters);
|
||||||
|
for (Map.Entry<String, Long> entry : other.counters.entrySet()) {
|
||||||
|
String otherKey = entry.getKey();
|
||||||
|
Long otherValue = entry.getValue();
|
||||||
|
|
||||||
|
if (counters.containsKey(otherKey)) {
|
||||||
|
Long newValue = Math.max(counters.get(otherKey), otherValue);
|
||||||
|
counters.replace(otherKey, newValue);
|
||||||
|
} else {
|
||||||
|
counters.put(otherKey, otherValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GrowOnlyCounter merge(GrowOnlyCounter other) {
|
||||||
|
return new GrowOnlyCounter(this, other);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Long value() {
|
||||||
|
Long globalCount = 0L;
|
||||||
|
for (Long increment : counters.values()) {
|
||||||
|
globalCount += increment;
|
||||||
|
}
|
||||||
|
return globalCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GrowOnlyCounter optimize() {
|
||||||
|
return new GrowOnlyCounter(counters);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (getClass() != obj.getClass())
|
||||||
|
return false;
|
||||||
|
GrowOnlyCounter other = (GrowOnlyCounter) obj;
|
||||||
|
return value().longValue() == other.value().longValue();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "GrowOnlyCounter [counters= " + counters + ", Value=" + value() + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Long> getCounters() {
|
||||||
|
return counters;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class Builder {
|
||||||
|
|
||||||
|
private final String myId;
|
||||||
|
|
||||||
|
private Long counter;
|
||||||
|
|
||||||
|
public Builder(GossipManager gossipManager) {
|
||||||
|
myId = gossipManager.getMyself().getId();
|
||||||
|
counter = 0L;
|
||||||
|
}
|
||||||
|
|
||||||
|
public GrowOnlyCounter.Builder increment(Integer count) {
|
||||||
|
counter += count;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -26,6 +26,7 @@ import java.util.List;
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.gossip.crdt.GrowOnlyCounter;
|
||||||
import org.apache.gossip.crdt.GrowOnlySet;
|
import org.apache.gossip.crdt.GrowOnlySet;
|
||||||
import org.apache.gossip.crdt.OrSet;
|
import org.apache.gossip.crdt.OrSet;
|
||||||
import org.apache.gossip.manager.GossipManager;
|
import org.apache.gossip.manager.GossipManager;
|
||||||
@ -39,6 +40,7 @@ import io.teknek.tunit.TUnit;
|
|||||||
public class DataTest extends AbstractIntegrationBase {
|
public class DataTest extends AbstractIntegrationBase {
|
||||||
|
|
||||||
private String orSetKey = "cror";
|
private String orSetKey = "cror";
|
||||||
|
private String gCounterKey = "crdtgc";
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
|
public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
|
||||||
@ -88,6 +90,7 @@ public class DataTest extends AbstractIntegrationBase {
|
|||||||
return x.getPayload();
|
return x.getPayload();
|
||||||
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
|
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
|
||||||
|
|
||||||
|
|
||||||
givenDifferentDatumsInSet(clients);
|
givenDifferentDatumsInSet(clients);
|
||||||
assertThatListIsMerged(clients);
|
assertThatListIsMerged(clients);
|
||||||
|
|
||||||
@ -95,6 +98,49 @@ public class DataTest extends AbstractIntegrationBase {
|
|||||||
assertThatOrSetIsMerged(clients);
|
assertThatOrSetIsMerged(clients);
|
||||||
dropIt(clients);
|
dropIt(clients);
|
||||||
assertThatOrSetDelIsMerged(clients);
|
assertThatOrSetDelIsMerged(clients);
|
||||||
|
|
||||||
|
|
||||||
|
// test g counter
|
||||||
|
givenDifferentIncrement(clients);
|
||||||
|
assertThatCountIsUpdated(clients, 3);
|
||||||
|
givenIncreaseOther(clients);
|
||||||
|
assertThatCountIsUpdated(clients, 7);
|
||||||
|
|
||||||
|
for (int i = 0; i < clusterMembers; ++i) {
|
||||||
|
clients.get(i).shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void givenDifferentIncrement(final List<GossipManager> clients) {
|
||||||
|
{
|
||||||
|
SharedDataMessage d = new SharedDataMessage();
|
||||||
|
d.setKey(gCounterKey);
|
||||||
|
d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1)));
|
||||||
|
d.setExpireAt(Long.MAX_VALUE);
|
||||||
|
d.setTimestamp(System.currentTimeMillis());
|
||||||
|
clients.get(0).merge(d);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
SharedDataMessage d = new SharedDataMessage();
|
||||||
|
d.setKey(gCounterKey);
|
||||||
|
d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2)));
|
||||||
|
d.setExpireAt(Long.MAX_VALUE);
|
||||||
|
d.setTimestamp(System.currentTimeMillis());
|
||||||
|
clients.get(1).merge(d);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void givenIncreaseOther(final List<GossipManager> clients) {
|
||||||
|
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey);
|
||||||
|
GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
|
||||||
|
new GrowOnlyCounter.Builder(clients.get(1)).increment(4));
|
||||||
|
|
||||||
|
SharedDataMessage d = new SharedDataMessage();
|
||||||
|
d.setKey(gCounterKey);
|
||||||
|
d.setPayload(gc2);
|
||||||
|
d.setExpireAt(Long.MAX_VALUE);
|
||||||
|
d.setTimestamp(System.currentTimeMillis());
|
||||||
|
clients.get(1).merge(d);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void givenOrs(List<GossipManager> clients) {
|
private void givenOrs(List<GossipManager> clients) {
|
||||||
@ -148,6 +194,14 @@ public class DataTest extends AbstractIntegrationBase {
|
|||||||
clients.get(1).merge(CrdtMessage("2"));
|
clients.get(1).merge(CrdtMessage("2"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void assertThatCountIsUpdated(final List<GossipManager> clients, int finalCount) {
|
||||||
|
TUnit.assertThat(() -> {
|
||||||
|
return clients.get(0).findCrdt(gCounterKey);
|
||||||
|
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
|
||||||
|
new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount)));
|
||||||
|
}
|
||||||
|
|
||||||
private void assertThatListIsMerged(final List<GossipManager> clients){
|
private void assertThatListIsMerged(final List<GossipManager> clients){
|
||||||
TUnit.assertThat(() -> {
|
TUnit.assertThat(() -> {
|
||||||
return clients.get(0).findCrdt("cr");
|
return clients.get(0).findCrdt("cr");
|
||||||
|
@ -0,0 +1,54 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.gossip.crdt;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class GrowOnlyCounterTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mergeTest() {
|
||||||
|
|
||||||
|
Map<String, Long> node1Counter = new HashMap<>();
|
||||||
|
node1Counter.put("1", 3L);
|
||||||
|
Map<String, Long> node2Counter = new HashMap<>();
|
||||||
|
node2Counter.put("2", 1L);
|
||||||
|
Map<String, Long> node3Counter = new HashMap<>();
|
||||||
|
node3Counter.put("3", 2L);
|
||||||
|
|
||||||
|
GrowOnlyCounter gCounter1 = new GrowOnlyCounter(node1Counter);
|
||||||
|
GrowOnlyCounter gCounter2 = new GrowOnlyCounter(node2Counter);
|
||||||
|
GrowOnlyCounter gCounter3 = new GrowOnlyCounter(node3Counter);
|
||||||
|
|
||||||
|
// After node 2 receive from node 1
|
||||||
|
gCounter2 = gCounter2.merge(gCounter1);
|
||||||
|
Assert.assertEquals(4, (long) gCounter2.value());
|
||||||
|
|
||||||
|
// After node 3 receive from node 1
|
||||||
|
gCounter3 = gCounter3.merge(gCounter1);
|
||||||
|
Assert.assertEquals(5, (long) gCounter3.value());
|
||||||
|
|
||||||
|
// After node 3 receive from node 2
|
||||||
|
gCounter3 = gCounter3.merge(gCounter2);
|
||||||
|
Assert.assertEquals(6, (long) gCounter3.value());
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user