Merge branch 'GOSSIP-66' of https://github.com/makrusak/incubator-gossip
This commit is contained in:
@ -54,6 +54,13 @@ abstract class MaxChangeSetMixin<E> {
|
||||
@JsonProperty("data") abstract Map<E, Integer> getStruct();
|
||||
}
|
||||
|
||||
abstract class TwoPhaseSetMixin<E> {
|
||||
@JsonCreator
|
||||
TwoPhaseSetMixin(@JsonProperty("added") Set<E> added, @JsonProperty("removed") Set<E> removed) { }
|
||||
@JsonProperty("added") abstract Set<E> getAdded();
|
||||
@JsonProperty("removed") abstract Set<E> getRemoved();
|
||||
}
|
||||
|
||||
abstract class GrowOnlySetMixin<E>{
|
||||
@JsonCreator
|
||||
GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ }
|
||||
@ -93,6 +100,7 @@ public class CrdtModule extends SimpleModule {
|
||||
context.setMixInAnnotations(LwwSet.class, LWWSetMixin.class);
|
||||
context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class);
|
||||
context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class);
|
||||
context.setMixInAnnotations(TwoPhaseSet.class, TwoPhaseSetMixin.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,115 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.gossip.crdt;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/*
|
||||
Two-Phase CrdtSet.
|
||||
You can add element only once and remove only once.
|
||||
You cannot remove element which is not present.
|
||||
|
||||
Read more: https://github.com/aphyr/meangirls#2p-set
|
||||
You can view examples of usage in tests:
|
||||
TwoPhaseSetTest - unit tests
|
||||
DataTest - integration test with 2 nodes, TwoPhaseSet was serialized/deserialized, sent between nodes, merged
|
||||
*/
|
||||
|
||||
public class TwoPhaseSet<ElementType> implements CrdtAddRemoveSet<ElementType, Set<ElementType>, TwoPhaseSet<ElementType>> {
|
||||
private final Set<ElementType> added;
|
||||
private final Set<ElementType> removed;
|
||||
|
||||
public TwoPhaseSet(){
|
||||
added = new HashSet<>();
|
||||
removed = new HashSet<>();
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
public TwoPhaseSet(ElementType... elements){
|
||||
this(new HashSet<>(Arrays.asList(elements)));
|
||||
}
|
||||
|
||||
public TwoPhaseSet(Set<ElementType> set){
|
||||
this();
|
||||
for (ElementType e : set){
|
||||
added.add(e);
|
||||
}
|
||||
}
|
||||
|
||||
public TwoPhaseSet(TwoPhaseSet<ElementType> first, TwoPhaseSet<ElementType> second){
|
||||
BiFunction<Set<ElementType>, Set<ElementType>, Set<ElementType>> mergeSets = (f, s) ->
|
||||
Stream.concat(f.stream(), s.stream()).collect(Collectors.toSet());
|
||||
|
||||
added = mergeSets.apply(first.added, second.added);
|
||||
removed = mergeSets.apply(first.removed, second.removed);
|
||||
}
|
||||
|
||||
TwoPhaseSet(Set<ElementType> added, Set<ElementType> removed){
|
||||
this.added = added;
|
||||
this.removed = removed;
|
||||
}
|
||||
|
||||
Set<ElementType> getAdded(){
|
||||
return added;
|
||||
}
|
||||
|
||||
Set<ElementType> getRemoved(){
|
||||
return removed;
|
||||
}
|
||||
|
||||
public TwoPhaseSet<ElementType> add(ElementType e){
|
||||
if (removed.contains(e) || added.contains(e)){
|
||||
return this;
|
||||
}
|
||||
return this.merge(new TwoPhaseSet<>(e));
|
||||
}
|
||||
|
||||
public TwoPhaseSet<ElementType> remove(ElementType e){
|
||||
if (removed.contains(e) || !added.contains(e)){
|
||||
return this;
|
||||
}
|
||||
Set<ElementType> eSet = new HashSet<>(Collections.singletonList(e));
|
||||
return this.merge(new TwoPhaseSet<>(eSet, eSet));
|
||||
}
|
||||
|
||||
@Override
|
||||
public TwoPhaseSet<ElementType> merge(TwoPhaseSet<ElementType> other){
|
||||
return new TwoPhaseSet<>(this, other);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<ElementType> value(){
|
||||
return added.stream().filter(e -> !removed.contains(e)).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public TwoPhaseSet<ElementType> optimize(){
|
||||
return new TwoPhaseSet<>(value(), removed);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj){
|
||||
return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((TwoPhaseSet) obj).value()));
|
||||
}
|
||||
}
|
@ -19,8 +19,9 @@ package org.apache.gossip.crdt;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
@ -28,6 +29,8 @@ import java.util.stream.Stream;
|
||||
|
||||
/*
|
||||
Abstract test suit to test CrdtSets with Add and Remove operations.
|
||||
You can use this suite only if your set supports multiple additions/deletions
|
||||
and has behavior similar to Set in single-threaded environment.
|
||||
It compares them with simple sets, validates add, remove, equals, value, etc. operations
|
||||
To use it you should:
|
||||
1. subclass this and implement constructors
|
||||
@ -36,7 +39,8 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
|
||||
@Ignore
|
||||
public abstract class AbstractCRDTStringSetTest<SetType extends CrdtAddRemoveSet<String, Set<String>, SetType>> {
|
||||
public abstract class AddRemoveStringSetTest<SetType extends CrdtAddRemoveSet<String, Set<String>, SetType>> {
|
||||
|
||||
abstract SetType construct(Set<String> set);
|
||||
|
||||
abstract SetType construct();
|
@ -27,7 +27,7 @@ import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class LwwSetTest extends AbstractCRDTStringSetTest<LwwSet<String>> {
|
||||
public class LwwSetTest extends AddRemoveStringSetTest<LwwSet<String>> {
|
||||
static private Clock clock = new SystemClock();
|
||||
|
||||
LwwSet<String> construct(Set<String> set){
|
||||
|
@ -25,7 +25,7 @@ import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class MaxChangeSetTest extends AbstractCRDTStringSetTest<MaxChangeSet<String>> {
|
||||
public class MaxChangeSetTest extends AddRemoveStringSetTest<MaxChangeSet<String>> {
|
||||
MaxChangeSet<String> construct(Set<String> set){
|
||||
return new MaxChangeSet<>(set);
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ import java.util.Set;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
public class OrSetTest extends AbstractCRDTStringSetTest<OrSet<String>> {
|
||||
public class OrSetTest extends AddRemoveStringSetTest<OrSet<String>> {
|
||||
OrSet<String> construct(){
|
||||
return new OrSet<>();
|
||||
}
|
||||
|
@ -0,0 +1,101 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.gossip.crdt;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
public class TwoPhaseSetTest {
|
||||
|
||||
private Set<String> sampleSet;
|
||||
|
||||
@Before
|
||||
public void setup(){
|
||||
sampleSet = new HashSet<>();
|
||||
sampleSet.add("a");
|
||||
sampleSet.add("b");
|
||||
sampleSet.add("d");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void setConstructorTest(){
|
||||
Assert.assertEquals(new TwoPhaseSet<>(sampleSet).value(), sampleSet);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void valueTest(){
|
||||
Set<Character> added = new HashSet<>();
|
||||
added.add('a');
|
||||
added.add('b');
|
||||
Set<Character> removed = new HashSet<>();
|
||||
removed.add('b');
|
||||
Assert.assertEquals(new TwoPhaseSet<>(added, removed), new TwoPhaseSet<>('a'));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void optimizeTest(){
|
||||
TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
|
||||
set = set.remove("b");
|
||||
Assert.assertEquals(set.optimize(), set);
|
||||
// check that optimize in this case actually works
|
||||
Assert.assertTrue(set.optimize().getAdded().size() < set.getAdded().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void immutabilityTest(){
|
||||
TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
|
||||
TwoPhaseSet<String> newSet = set.remove("b");
|
||||
Assert.assertNotEquals(set, newSet);
|
||||
Assert.assertEquals(set, new TwoPhaseSet<>(sampleSet));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void removeMissingAddExistingLimitsTest(){
|
||||
BiConsumer<TwoPhaseSet<?>, TwoPhaseSet<?>> checkInternals = (f, s) -> {
|
||||
Assert.assertEquals(s, f);
|
||||
Assert.assertEquals(s.getRemoved(), f.getRemoved());
|
||||
Assert.assertEquals(s.getAdded(), f.getAdded());
|
||||
};
|
||||
TwoPhaseSet<String> set = new TwoPhaseSet<>(sampleSet);
|
||||
// remove missing
|
||||
checkInternals.accept(set, set.remove("e"));
|
||||
// add existing
|
||||
checkInternals.accept(set, set.add("a"));
|
||||
// limits
|
||||
TwoPhaseSet<String> newSet = set.remove("a"); // allow this remove
|
||||
Assert.assertEquals(newSet.add("a"), new TwoPhaseSet<>("b", "d")); // discard this add, "a" was added and removed
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergeTest(){
|
||||
TwoPhaseSet<String> f = new TwoPhaseSet<>(sampleSet);
|
||||
TwoPhaseSet<String> s = new TwoPhaseSet<>("a", "c");
|
||||
s = s.remove("a");
|
||||
TwoPhaseSet<String> res = f.merge(s);
|
||||
Assert.assertEquals(res, new TwoPhaseSet<>(f, s)); // check two-sets constructor
|
||||
|
||||
// "a" was both added and deleted in second set => it's deleted in result
|
||||
// "b" and "d" comes from first set and "c" comes from second
|
||||
Assert.assertEquals(res, new TwoPhaseSet<>("b", "c", "d"));
|
||||
}
|
||||
}
|
@ -25,6 +25,7 @@ import org.apache.gossip.crdt.LwwSet;
|
||||
import org.apache.gossip.crdt.MaxChangeSet;
|
||||
import org.apache.gossip.crdt.OrSet;
|
||||
import org.apache.gossip.crdt.PNCounter;
|
||||
import org.apache.gossip.crdt.TwoPhaseSet;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
import org.apache.gossip.model.PerNodeDataMessage;
|
||||
@ -147,6 +148,11 @@ public class DataTest {
|
||||
crdtSetTest("crmcs", MaxChangeSet::new);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void TwoPhaseSetTest(){
|
||||
crdtSetTest("crtps", TwoPhaseSet::new);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void GrowOnlyCounterTest(){
|
||||
Consumer<Long> assertCountUpdated = count -> {
|
||||
|
@ -25,6 +25,7 @@ import org.apache.gossip.Member;
|
||||
import org.apache.gossip.crdt.LwwSet;
|
||||
import org.apache.gossip.crdt.MaxChangeSet;
|
||||
import org.apache.gossip.crdt.OrSet;
|
||||
import org.apache.gossip.crdt.TwoPhaseSet;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
import org.apache.gossip.protocol.ProtocolManager;
|
||||
@ -98,17 +99,22 @@ public class JacksonTest {
|
||||
|
||||
@Test
|
||||
public void jacksonOrSetTest(){
|
||||
jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3"), OrSet.class);
|
||||
jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3").remove("2"), OrSet.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void jacksonLWWSetTest(){
|
||||
jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3"), LwwSet.class);
|
||||
jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3").remove("2"), LwwSet.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void jacksonMaxChangeSetTest(){
|
||||
jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3"), MaxChangeSet.class);
|
||||
jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3").remove("2"), MaxChangeSet.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void jacksonTwoPhaseSetTest(){
|
||||
jacksonCrdtSeDeTest(new TwoPhaseSet<>("1", "2", "3").remove("2"), TwoPhaseSet.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Reference in New Issue
Block a user