GOSSIP-66 Implement Crdt 2P-Set

This commit is contained in:
Maxim Rusak
2017-06-30 10:15:26 +03:00
parent f71460ab3d
commit 89af0ac112
9 changed files with 248 additions and 8 deletions

View File

@ -54,6 +54,13 @@ abstract class MaxChangeSetMixin<E> {
@JsonProperty("data") abstract Map<E, Integer> getStruct(); @JsonProperty("data") abstract Map<E, Integer> getStruct();
} }
abstract class TwoPhaseSetMixin<E> {
@JsonCreator
TwoPhaseSetMixin(@JsonProperty("added") Set<E> added, @JsonProperty("removed") Set<E> removed) { }
@JsonProperty("added") abstract Set<E> getAdded();
@JsonProperty("removed") abstract Set<E> getRemoved();
}
abstract class GrowOnlySetMixin<E>{ abstract class GrowOnlySetMixin<E>{
@JsonCreator @JsonCreator
GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ } GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ }
@ -93,6 +100,7 @@ public class CrdtModule extends SimpleModule {
context.setMixInAnnotations(LwwSet.class, LWWSetMixin.class); context.setMixInAnnotations(LwwSet.class, LWWSetMixin.class);
context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class); context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class);
context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class); context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class);
context.setMixInAnnotations(TwoPhaseSet.class, TwoPhaseSetMixin.class);
} }
} }

View File

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/*
Two-Phase CrdtSet.
You can add element only once and remove only once.
You cannot remove element which is not present.
Read more: https://github.com/aphyr/meangirls#2p-set
You can view examples of usage in tests:
TwoPhaseSetTest - unit tests
DataTest - integration test with 2 nodes, TwoPhaseSet was serialized/deserialized, sent between nodes, merged
*/
public class TwoPhaseSet<ElementType> implements CrdtAddRemoveSet<ElementType, Set<ElementType>, TwoPhaseSet<ElementType>> {
private final Set<ElementType> added;
private final Set<ElementType> removed;
public TwoPhaseSet(){
added = new HashSet<>();
removed = new HashSet<>();
}
@SafeVarargs
public TwoPhaseSet(ElementType... elements){
this(new HashSet<>(Arrays.asList(elements)));
}
public TwoPhaseSet(Set<ElementType> set){
this();
for (ElementType e : set){
added.add(e);
}
}
public TwoPhaseSet(TwoPhaseSet<ElementType> first, TwoPhaseSet<ElementType> second){
BiFunction<Set<ElementType>, Set<ElementType>, Set<ElementType>> mergeSets = (f, s) ->
Stream.concat(f.stream(), s.stream()).collect(Collectors.toSet());
added = mergeSets.apply(first.added, second.added);
removed = mergeSets.apply(first.removed, second.removed);
}
TwoPhaseSet(Set<ElementType> added, Set<ElementType> removed){
this.added = added;
this.removed = removed;
}
Set<ElementType> getAdded(){
return added;
}
Set<ElementType> getRemoved(){
return removed;
}
public TwoPhaseSet<ElementType> add(ElementType e){
if (removed.contains(e) || added.contains(e)){
return this;
}
return this.merge(new TwoPhaseSet<>(e));
}
public TwoPhaseSet<ElementType> remove(ElementType e){
if (removed.contains(e) || !added.contains(e)){
return this;
}
Set<ElementType> eSet = new HashSet<>(Collections.singletonList(e));
return this.merge(new TwoPhaseSet<>(eSet, eSet));
}
@Override
public TwoPhaseSet<ElementType> merge(TwoPhaseSet<ElementType> other){
return new TwoPhaseSet<>(this, other);
}
@Override
public Set<ElementType> value(){
return added.stream().filter(e -> !removed.contains(e)).collect(Collectors.toSet());
}
@Override
public TwoPhaseSet<ElementType> optimize(){
return new TwoPhaseSet<>(value(), removed);
}
@Override
public boolean equals(Object obj){
return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((TwoPhaseSet) obj).value()));
}
}

View File

@ -19,8 +19,9 @@ package org.apache.gossip.crdt;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test;
import org.junit.Ignore; import org.junit.Ignore;
import org.junit.Test;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -28,6 +29,8 @@ import java.util.stream.Stream;
/* /*
Abstract test suit to test CrdtSets with Add and Remove operations. Abstract test suit to test CrdtSets with Add and Remove operations.
You can use this suite only if your set supports multiple additions/deletions
and has behavior similar to Set in single-threaded environment.
It compares them with simple sets, validates add, remove, equals, value, etc. operations It compares them with simple sets, validates add, remove, equals, value, etc. operations
To use it you should: To use it you should:
1. subclass this and implement constructors 1. subclass this and implement constructors
@ -36,7 +39,8 @@ import java.util.stream.Stream;
*/ */
@Ignore @Ignore
public abstract class AbstractCRDTStringSetTest<SetType extends CrdtAddRemoveSet<String, Set<String>, SetType>> { public abstract class AddRemoveStringSetTest<SetType extends CrdtAddRemoveSet<String, Set<String>, SetType>> {
abstract SetType construct(Set<String> set); abstract SetType construct(Set<String> set);
abstract SetType construct(); abstract SetType construct();

View File

@ -27,7 +27,7 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
public class LwwSetTest extends AbstractCRDTStringSetTest<LwwSet<String>> { public class LwwSetTest extends AddRemoveStringSetTest<LwwSet<String>> {
static private Clock clock = new SystemClock(); static private Clock clock = new SystemClock();
LwwSet<String> construct(Set<String> set){ LwwSet<String> construct(Set<String> set){

View File

@ -25,7 +25,7 @@ import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
public class MaxChangeSetTest extends AbstractCRDTStringSetTest<MaxChangeSet<String>> { public class MaxChangeSetTest extends AddRemoveStringSetTest<MaxChangeSet<String>> {
MaxChangeSet<String> construct(Set<String> set){ MaxChangeSet<String> construct(Set<String> set){
return new MaxChangeSet<>(set); return new MaxChangeSet<>(set);
} }

View File

@ -25,7 +25,7 @@ import java.util.Set;
import java.util.SortedSet; import java.util.SortedSet;
import java.util.TreeSet; import java.util.TreeSet;
public class OrSetTest extends AbstractCRDTStringSetTest<OrSet<String>> { public class OrSetTest extends AddRemoveStringSetTest<OrSet<String>> {
OrSet<String> construct(){ OrSet<String> construct(){
return new OrSet<>(); return new OrSet<>();
} }

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiConsumer;
public class TwoPhaseSetTest {
private Set<String> sampleSet;
@Before
public void setup(){
sampleSet = new HashSet<>();
sampleSet.add("a");
sampleSet.add("b");
sampleSet.add("d");
}
@Test
public void setConstructorTest(){
Assert.assertEquals(new TwoPhaseSet<>(sampleSet).value(), sampleSet);
}
@Test
public void valueTest(){
Set<Character> added = new HashSet<>();
added.add('a');
added.add('b');
Set<Character> removed = new HashSet<>();
removed.add('b');
Assert.assertEquals(new TwoPhaseSet<>(added, removed), new TwoPhaseSet<>('a'));
}
@Test
public void optimizeTest(){
TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
set = set.remove("b");
Assert.assertEquals(set.optimize(), set);
// check that optimize in this case actually works
Assert.assertTrue(set.optimize().getAdded().size() < set.getAdded().size());
}
@Test
public void immutabilityTest(){
TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
TwoPhaseSet<String> newSet = set.remove("b");
Assert.assertNotEquals(set, newSet);
Assert.assertEquals(set, new TwoPhaseSet<>(sampleSet));
}
@Test
public void removeMissingAddExistingLimitsTest(){
BiConsumer<TwoPhaseSet<?>, TwoPhaseSet<?>> checkInternals = (f, s) -> {
Assert.assertEquals(s, f);
Assert.assertEquals(s.getRemoved(), f.getRemoved());
Assert.assertEquals(s.getAdded(), f.getAdded());
};
TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
// remove missing
checkInternals.accept(set, set.remove("e"));
// add existing
checkInternals.accept(set, set.add("a"));
// limits
TwoPhaseSet<String> newSet = set.remove("a"); // allow this remove
Assert.assertEquals(newSet.add("a"), new TwoPhaseSet<>("b", "d")); // discard this add, "a" was added and removed
}
@Test
public void mergeTest(){
TwoPhaseSet<String> f = new TwoPhaseSet<>(sampleSet);
TwoPhaseSet<String> s = new TwoPhaseSet<>("a", "c");
s = s.remove("a");
TwoPhaseSet<String> res = f.merge(s);
Assert.assertEquals(res, new TwoPhaseSet<>(f, s)); // check two-sets constructor
// "a" was both added and deleted in second set => it's deleted in result
// "b" and "d" comes from first set and "c" comes from second
Assert.assertEquals(res, new TwoPhaseSet<>("b", "c", "d"));
}
}

View File

@ -25,6 +25,7 @@ import org.apache.gossip.crdt.LwwSet;
import org.apache.gossip.crdt.MaxChangeSet; import org.apache.gossip.crdt.MaxChangeSet;
import org.apache.gossip.crdt.OrSet; import org.apache.gossip.crdt.OrSet;
import org.apache.gossip.crdt.PNCounter; import org.apache.gossip.crdt.PNCounter;
import org.apache.gossip.crdt.TwoPhaseSet;
import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.PerNodeDataMessage;
@ -147,6 +148,11 @@ public class DataTest {
crdtSetTest("crmcs", MaxChangeSet::new); crdtSetTest("crmcs", MaxChangeSet::new);
} }
@Test
public void TwoPhaseSetTest(){
crdtSetTest("crtps", TwoPhaseSet::new);
}
@Test @Test
public void GrowOnlyCounterTest(){ public void GrowOnlyCounterTest(){
Consumer<Long> assertCountUpdated = count -> { Consumer<Long> assertCountUpdated = count -> {

View File

@ -25,6 +25,7 @@ import org.apache.gossip.Member;
import org.apache.gossip.crdt.LwwSet; import org.apache.gossip.crdt.LwwSet;
import org.apache.gossip.crdt.MaxChangeSet; import org.apache.gossip.crdt.MaxChangeSet;
import org.apache.gossip.crdt.OrSet; import org.apache.gossip.crdt.OrSet;
import org.apache.gossip.crdt.TwoPhaseSet;
import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.protocol.ProtocolManager; import org.apache.gossip.protocol.ProtocolManager;
@ -98,17 +99,22 @@ public class JacksonTest {
@Test @Test
public void jacksonOrSetTest(){ public void jacksonOrSetTest(){
jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3"), OrSet.class); jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3").remove("2"), OrSet.class);
} }
@Test @Test
public void jacksonLWWSetTest(){ public void jacksonLWWSetTest(){
jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3"), LwwSet.class); jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3").remove("2"), LwwSet.class);
} }
@Test @Test
public void jacksonMaxChangeSetTest(){ public void jacksonMaxChangeSetTest(){
jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3"), MaxChangeSet.class); jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3").remove("2"), MaxChangeSet.class);
}
@Test
public void jacksonTwoPhaseSetTest(){
jacksonCrdtSeDeTest(new TwoPhaseSet<>("1", "2", "3").remove("2"), TwoPhaseSet.class);
} }
@Test @Test