From 89af0ac11289e7448a00382a0a93c460d9bfce5c Mon Sep 17 00:00:00 2001 From: Maxim Rusak Date: Fri, 30 Jun 2017 10:15:26 +0300 Subject: [PATCH] GOSSIP-66 Implement Crdt 2P-Set --- .../org/apache/gossip/crdt/CrdtModule.java | 8 ++ .../org/apache/gossip/crdt/TwoPhaseSet.java | 115 ++++++++++++++++++ ...tTest.java => AddRemoveStringSetTest.java} | 8 +- .../org/apache/gossip/crdt/LwwSetTest.java | 2 +- .../apache/gossip/crdt/MaxChangeSetTest.java | 2 +- .../org/apache/gossip/crdt/OrSetTest.java | 2 +- .../apache/gossip/crdt/TwoPhaseSetTest.java | 101 +++++++++++++++ .../test/java/org/apache/gossip/DataTest.java | 6 + .../gossip/protocol/json/JacksonTest.java | 12 +- 9 files changed, 248 insertions(+), 8 deletions(-) create mode 100644 gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java rename gossip-base/src/test/java/org/apache/gossip/crdt/{AbstractCRDTStringSetTest.java => AddRemoveStringSetTest.java} (94%) create mode 100644 gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java index 7ec96e7..ab5cefa 100644 --- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -54,6 +54,13 @@ abstract class MaxChangeSetMixin { @JsonProperty("data") abstract Map getStruct(); } +abstract class TwoPhaseSetMixin { + @JsonCreator + TwoPhaseSetMixin(@JsonProperty("added") Set added, @JsonProperty("removed") Set removed) { } + @JsonProperty("added") abstract Set getAdded(); + @JsonProperty("removed") abstract Set getRemoved(); +} + abstract class GrowOnlySetMixin{ @JsonCreator GrowOnlySetMixin(@JsonProperty("elements") Set elements){ } @@ -93,6 +100,7 @@ public class CrdtModule extends SimpleModule { context.setMixInAnnotations(LwwSet.class, LWWSetMixin.class); context.setMixInAnnotations(LwwSet.Timestamps.class, LWWSetTimestampsMixin.class); context.setMixInAnnotations(MaxChangeSet.class, MaxChangeSetMixin.class); + context.setMixInAnnotations(TwoPhaseSet.class, TwoPhaseSetMixin.class); } } diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java new file mode 100644 index 0000000..a1f44a9 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/TwoPhaseSet.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/* + Two-Phase CrdtSet. + You can add element only once and remove only once. + You cannot remove element which is not present. + + Read more: https://github.com/aphyr/meangirls#2p-set + You can view examples of usage in tests: + TwoPhaseSetTest - unit tests + DataTest - integration test with 2 nodes, TwoPhaseSet was serialized/deserialized, sent between nodes, merged +*/ + +public class TwoPhaseSet implements CrdtAddRemoveSet, TwoPhaseSet> { + private final Set added; + private final Set removed; + + public TwoPhaseSet(){ + added = new HashSet<>(); + removed = new HashSet<>(); + } + + @SafeVarargs + public TwoPhaseSet(ElementType... elements){ + this(new HashSet<>(Arrays.asList(elements))); + } + + public TwoPhaseSet(Set set){ + this(); + for (ElementType e : set){ + added.add(e); + } + } + + public TwoPhaseSet(TwoPhaseSet first, TwoPhaseSet second){ + BiFunction, Set, Set> mergeSets = (f, s) -> + Stream.concat(f.stream(), s.stream()).collect(Collectors.toSet()); + + added = mergeSets.apply(first.added, second.added); + removed = mergeSets.apply(first.removed, second.removed); + } + + TwoPhaseSet(Set added, Set removed){ + this.added = added; + this.removed = removed; + } + + Set getAdded(){ + return added; + } + + Set getRemoved(){ + return removed; + } + + public TwoPhaseSet add(ElementType e){ + if (removed.contains(e) || added.contains(e)){ + return this; + } + return this.merge(new TwoPhaseSet<>(e)); + } + + public TwoPhaseSet remove(ElementType e){ + if (removed.contains(e) || !added.contains(e)){ + return this; + } + Set eSet = new HashSet<>(Collections.singletonList(e)); + return this.merge(new TwoPhaseSet<>(eSet, eSet)); + } + + @Override + public TwoPhaseSet merge(TwoPhaseSet other){ + return new TwoPhaseSet<>(this, other); + } + + @Override + public Set value(){ + return added.stream().filter(e -> !removed.contains(e)).collect(Collectors.toSet()); + } + + @Override + public TwoPhaseSet optimize(){ + return new TwoPhaseSet<>(value(), removed); + } + + @Override + public boolean equals(Object obj){ + return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((TwoPhaseSet) obj).value())); + } +} \ No newline at end of file diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/AddRemoveStringSetTest.java similarity index 94% rename from gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java rename to gossip-base/src/test/java/org/apache/gossip/crdt/AddRemoveStringSetTest.java index d4db4ce..6dac9df 100644 --- a/gossip-base/src/test/java/org/apache/gossip/crdt/AbstractCRDTStringSetTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/AddRemoveStringSetTest.java @@ -19,8 +19,9 @@ package org.apache.gossip.crdt; import org.junit.Assert; import org.junit.Before; -import org.junit.Test; import org.junit.Ignore; +import org.junit.Test; + import java.util.HashSet; import java.util.Set; import java.util.stream.Collectors; @@ -28,6 +29,8 @@ import java.util.stream.Stream; /* Abstract test suit to test CrdtSets with Add and Remove operations. + You can use this suite only if your set supports multiple additions/deletions + and has behavior similar to Set in single-threaded environment. It compares them with simple sets, validates add, remove, equals, value, etc. operations To use it you should: 1. subclass this and implement constructors @@ -36,7 +39,8 @@ import java.util.stream.Stream; */ @Ignore -public abstract class AbstractCRDTStringSetTest, SetType>> { +public abstract class AddRemoveStringSetTest, SetType>> { + abstract SetType construct(Set set); abstract SetType construct(); diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java index 8200b15..c4da83d 100644 --- a/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/LwwSetTest.java @@ -27,7 +27,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -public class LwwSetTest extends AbstractCRDTStringSetTest> { +public class LwwSetTest extends AddRemoveStringSetTest> { static private Clock clock = new SystemClock(); LwwSet construct(Set set){ diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java index 2ba3f09..3828747 100644 --- a/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/MaxChangeSetTest.java @@ -25,7 +25,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -public class MaxChangeSetTest extends AbstractCRDTStringSetTest> { +public class MaxChangeSetTest extends AddRemoveStringSetTest> { MaxChangeSet construct(Set set){ return new MaxChangeSet<>(set); } diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java index bdaada9..8b21360 100644 --- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java @@ -25,7 +25,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; -public class OrSetTest extends AbstractCRDTStringSetTest> { +public class OrSetTest extends AddRemoveStringSetTest> { OrSet construct(){ return new OrSet<>(); } diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java new file mode 100644 index 0000000..3af1920 --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/TwoPhaseSetTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; +import java.util.function.BiConsumer; + +public class TwoPhaseSetTest { + + private Set sampleSet; + + @Before + public void setup(){ + sampleSet = new HashSet<>(); + sampleSet.add("a"); + sampleSet.add("b"); + sampleSet.add("d"); + } + + @Test + public void setConstructorTest(){ + Assert.assertEquals(new TwoPhaseSet<>(sampleSet).value(), sampleSet); + } + + @Test + public void valueTest(){ + Set added = new HashSet<>(); + added.add('a'); + added.add('b'); + Set removed = new HashSet<>(); + removed.add('b'); + Assert.assertEquals(new TwoPhaseSet<>(added, removed), new TwoPhaseSet<>('a')); + } + + @Test + public void optimizeTest(){ + TwoPhaseSet set = new TwoPhaseSet<>(sampleSet); + set = set.remove("b"); + Assert.assertEquals(set.optimize(), set); + // check that optimize in this case actually works + Assert.assertTrue(set.optimize().getAdded().size() < set.getAdded().size()); + } + + @Test + public void immutabilityTest(){ + TwoPhaseSet set = new TwoPhaseSet<>(sampleSet); + TwoPhaseSet newSet = set.remove("b"); + Assert.assertNotEquals(set, newSet); + Assert.assertEquals(set, new TwoPhaseSet<>(sampleSet)); + } + + @Test + public void removeMissingAddExistingLimitsTest(){ + BiConsumer, TwoPhaseSet> checkInternals = (f, s) -> { + Assert.assertEquals(s, f); + Assert.assertEquals(s.getRemoved(), f.getRemoved()); + Assert.assertEquals(s.getAdded(), f.getAdded()); + }; + TwoPhaseSet set = new TwoPhaseSet<>(sampleSet); + // remove missing + checkInternals.accept(set, set.remove("e")); + // add existing + checkInternals.accept(set, set.add("a")); + // limits + TwoPhaseSet newSet = set.remove("a"); // allow this remove + Assert.assertEquals(newSet.add("a"), new TwoPhaseSet<>("b", "d")); // discard this add, "a" was added and removed + } + + @Test + public void mergeTest(){ + TwoPhaseSet f = new TwoPhaseSet<>(sampleSet); + TwoPhaseSet s = new TwoPhaseSet<>("a", "c"); + s = s.remove("a"); + TwoPhaseSet res = f.merge(s); + Assert.assertEquals(res, new TwoPhaseSet<>(f, s)); // check two-sets constructor + + // "a" was both added and deleted in second set => it's deleted in result + // "b" and "d" comes from first set and "c" comes from second + Assert.assertEquals(res, new TwoPhaseSet<>("b", "c", "d")); + } +} diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java index df078aa..c16174f 100644 --- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java @@ -25,6 +25,7 @@ import org.apache.gossip.crdt.LwwSet; import org.apache.gossip.crdt.MaxChangeSet; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.crdt.PNCounter; +import org.apache.gossip.crdt.TwoPhaseSet; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.model.PerNodeDataMessage; @@ -147,6 +148,11 @@ public class DataTest { crdtSetTest("crmcs", MaxChangeSet::new); } + @Test + public void TwoPhaseSetTest(){ + crdtSetTest("crtps", TwoPhaseSet::new); + } + @Test public void GrowOnlyCounterTest(){ Consumer assertCountUpdated = count -> { diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java index d391fa1..2a5239c 100644 --- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java @@ -25,6 +25,7 @@ import org.apache.gossip.Member; import org.apache.gossip.crdt.LwwSet; import org.apache.gossip.crdt.MaxChangeSet; import org.apache.gossip.crdt.OrSet; +import org.apache.gossip.crdt.TwoPhaseSet; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.protocol.ProtocolManager; @@ -98,17 +99,22 @@ public class JacksonTest { @Test public void jacksonOrSetTest(){ - jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3"), OrSet.class); + jacksonCrdtSeDeTest(new OrSet<>("1", "2", "3").remove("2"), OrSet.class); } @Test public void jacksonLWWSetTest(){ - jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3"), LwwSet.class); + jacksonCrdtSeDeTest(new LwwSet<>("1", "2", "3").remove("2"), LwwSet.class); } @Test public void jacksonMaxChangeSetTest(){ - jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3"), MaxChangeSet.class); + jacksonCrdtSeDeTest(new MaxChangeSet<>("1", "2", "3").remove("2"), MaxChangeSet.class); + } + + @Test + public void jacksonTwoPhaseSetTest(){ + jacksonCrdtSeDeTest(new TwoPhaseSet<>("1", "2", "3").remove("2"), TwoPhaseSet.class); } @Test