diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java index cfb3f47..bb1a052 100644 --- a/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/CrdtModule.java @@ -35,6 +35,19 @@ abstract class OrSetMixin { @JsonIgnore abstract boolean isEmpty(); } +abstract class LWWSetMixin { + @JsonCreator + LWWSetMixin(@JsonProperty("data") Map struct) { } + @JsonProperty("data") abstract Map getStruct(); +} + +abstract class LWWSetTimestampsMixin { + @JsonCreator + LWWSetTimestampsMixin(@JsonProperty("add") long latestAdd, @JsonProperty("remove") long latestRemove) { } + @JsonProperty("add") abstract long getLatestAdd(); + @JsonProperty("remove") abstract long getLatestRemove(); +} + abstract class GrowOnlySetMixin{ @JsonCreator GrowOnlySetMixin(@JsonProperty("elements") Set elements){ } @@ -63,6 +76,8 @@ public class CrdtModule extends SimpleModule { context.setMixInAnnotations(OrSet.class, OrSetMixin.class); context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class); context.setMixInAnnotations(GrowOnlyCounter.class, GrowOnlyCounterMixin.class); + context.setMixInAnnotations(LWWSet.class, LWWSetMixin.class); + context.setMixInAnnotations(LWWSet.Timestamps.class, LWWSetTimestampsMixin.class); } } diff --git a/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java b/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java new file mode 100644 index 0000000..b51ce7a --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/crdt/LWWSet.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.apache.gossip.manager.Clock; +import org.apache.gossip.manager.SystemClock; + +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class LWWSet implements CrdtSet, LWWSet> { + static private Clock clock = new SystemClock(); + + private final Map struct; + + static class Timestamps { + private final long latestAdd; + private final long latestRemove; + + Timestamps(){ + latestAdd = 0; + latestRemove = 0; + } + + Timestamps(long add, long remove){ + latestAdd = add; + latestRemove = remove; + } + + long getLatestAdd() { + return latestAdd; + } + + long getLatestRemove() { + return latestRemove; + } + + // consider element present when addTime >= removeTime, so we prefer add to remove + boolean isPresent(){ + return latestAdd >= latestRemove; + } + + Timestamps updateAdd(){ + return new Timestamps(clock.nanoTime(), latestRemove); + } + + Timestamps updateRemove(){ + return new Timestamps(latestAdd, clock.nanoTime()); + } + + Timestamps merge(Timestamps other){ + if (other == null){ + return this; + } + return new Timestamps(Math.max(latestAdd, other.latestAdd), Math.max(latestRemove, other.latestRemove)); + } + } + + + public LWWSet(){ + struct = new HashMap<>(); + } + + @SafeVarargs + public LWWSet(ElementType... elements){ + this(new HashSet<>(Arrays.asList(elements))); + } + + public LWWSet(Set set){ + struct = new HashMap<>(); + for (ElementType e : set){ + struct.put(e, new Timestamps().updateAdd()); + } + } + + public LWWSet(LWWSet first, LWWSet second){ + Function timestampsFor = p -> { + Timestamps firstTs = first.struct.get(p); + Timestamps secondTs = second.struct.get(p); + if (firstTs == null){ + return secondTs; + } + return firstTs.merge(secondTs); + }; + struct = Stream.concat(first.struct.keySet().stream(), second.struct.keySet().stream()) + .distinct().collect(Collectors.toMap(p -> p, timestampsFor)); + } + + public LWWSet add(ElementType e){ + return this.merge(new LWWSet<>(e)); + } + + // for serialization + LWWSet(Map struct){ + this.struct = struct; + } + + Map getStruct() { + return struct; + } + + + public LWWSet remove(ElementType e){ + Timestamps eTimestamps = struct.get(e); + if (eTimestamps == null || !eTimestamps.isPresent()){ + return this; + } + Map changeMap = new HashMap<>(); + changeMap.put(e, eTimestamps.updateRemove()); + return this.merge(new LWWSet<>(changeMap)); + } + + @Override + public LWWSet merge(LWWSet other){ + return new LWWSet<>(this, other); + } + + @Override + public Set value(){ + return struct.entrySet().stream() + .filter(entry -> entry.getValue().isPresent()) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + } + + @Override + public LWWSet optimize(){ + return this; + } + + @Override + public boolean equals(Object obj){ + return this == obj || (obj != null && getClass() == obj.getClass() && value().equals(((LWWSet) obj).value())); + } +} \ No newline at end of file diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java new file mode 100644 index 0000000..bdd3258 --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/LWWSetTest.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import org.apache.gossip.manager.Clock; +import org.apache.gossip.manager.SystemClock; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class LWWSetTest { + static private Clock clock = new SystemClock(); + private Set sampleSet; + + @Before + public void setup(){ + sampleSet = new HashSet<>(); + sampleSet.add(4); + sampleSet.add(5); + sampleSet.add(12); + } + + @Test + public void setConstructorTest(){ + Assert.assertEquals(new LWWSet<>(sampleSet).value(), sampleSet); + } + + @Test + public void stressWithSetTest(){ + Set set = new HashSet<>(); + LWWSet lww = new LWWSet<>(); + for (int it = 0; it < 100; it++){ + LWWSet newLww; + if (it % 5 == 1){ + //deleting existing + Integer forDelete = set.stream().skip((long) (set.size() * Math.random())).findFirst().get(); + newLww = lww.remove(forDelete); + Assert.assertEquals(lww.value(), set); // check old version is immutable + set.remove(forDelete); + } else { + //adding + Integer forAdd = (int) (10000 * Math.random()); + newLww = lww.add(forAdd); + Assert.assertEquals(lww.value(), set); // check old version is immutable + set.add(forAdd); + } + lww = newLww; + Assert.assertEquals(lww.value(), set); + } + } + + @Test + public void equalsTest(){ + LWWSet lww = new LWWSet<>(sampleSet); + Assert.assertFalse(lww.equals(sampleSet)); + LWWSet newLww = lww.add(25); + sampleSet.add(25); + Assert.assertFalse(newLww.equals(lww)); + Assert.assertEquals(new LWWSet<>(sampleSet), newLww); + } + + @Test + public void valueTest() { + Map map = new HashMap<>(); + map.put('a', new LWWSet.Timestamps(1, 0)); + map.put('b', new LWWSet.Timestamps(1, 2)); + map.put('c', new LWWSet.Timestamps(3, 3)); + Set toTest = new HashSet<>(); + toTest.add('a'); // for 'a' addTime > removeTime + toTest.add('c'); // for 'c' times are equal, we prefer add to remove + Assert.assertEquals(new LWWSet<>(map).value(), toTest); + Assert.assertEquals(new LWWSet<>(map), new LWWSet<>('a', 'c')); + } + + @Test + public void removeMissingTest(){ + LWWSet lww = new LWWSet<>(sampleSet); + lww = lww.add(25); + lww = lww.remove(25); + Assert.assertEquals(lww.value(), sampleSet); + lww = lww.remove(25); + lww = lww.add(25); + sampleSet.add(25); + Assert.assertEquals(lww.value(), sampleSet); + } + + @Test + public void stressMergeTest(){ + // in one-process context, add, remove and merge operations of lww are equal to operations of Set + // we've already checked it. Now just check merge + Set set1 = new HashSet<>(), set2 = new HashSet<>(); + LWWSet lww1 = new LWWSet<>(), lww2 = new LWWSet<>(); + + for (int it = 0; it < 100; it++){ + Integer forAdd = (int) (10000 * Math.random()); + if (it % 2 == 0){ + set1.add(forAdd); + lww1 = lww1.add(forAdd); + } else { + set2.add(forAdd); + lww2 = lww2.add(forAdd); + } + } + Assert.assertEquals(lww1.value(), set1); + Assert.assertEquals(lww2.value(), set2); + Set mergedSet = Stream.concat(set1.stream(), set2.stream()).collect(Collectors.toSet()); + Assert.assertEquals(lww1.merge(lww2).value(), mergedSet); + } + + @Test + public void fakeTimeMergeTest(){ + // try to create LWWSet with time from future (simulate other process with its own clock) and validate result + // check remove from the future + Map map = new HashMap<>(); + map.put(25, new LWWSet.Timestamps(clock.nanoTime(), clock.nanoTime() + 100000)); + LWWSet lww = new LWWSet<>(map); + Assert.assertEquals(lww, new LWWSet()); + //create new LWWSet with element 25, and merge with other LWW which has remove in future + Assert.assertEquals(new LWWSet<>(25).merge(lww), new LWWSet()); + + // add in future + map.put(25, new LWWSet.Timestamps(clock.nanoTime() + 100000, 0)); + lww = new LWWSet<>(map); + lww = lww.remove(25); + Assert.assertEquals(lww, new LWWSet<>(25)); // 25 is still here + } + + @Test + public void optimizeTest(){ + Assert.assertEquals(new LWWSet<>(sampleSet).value(), sampleSet); + Assert.assertEquals(new LWWSet<>(sampleSet).optimize().value(), sampleSet); + } +} diff --git a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java index 9fe9aa9..53408f8 100644 --- a/gossip-itest/src/test/java/org/apache/gossip/DataTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java @@ -17,222 +17,191 @@ */ package org.apache.gossip; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.TimeUnit; - +import io.teknek.tunit.TUnit; import org.apache.gossip.crdt.GrowOnlyCounter; import org.apache.gossip.crdt.GrowOnlySet; +import org.apache.gossip.crdt.LWWSet; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.model.PerNodeDataMessage; import org.apache.gossip.model.SharedDataMessage; +import org.junit.Assert; import org.junit.Test; -import io.teknek.tunit.TUnit; - +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.*; +import java.util.concurrent.TimeUnit; + public class DataTest extends AbstractIntegrationBase { - + private String orSetKey = "cror"; + private String lwwSetKey = "crlww"; private String gCounterKey = "crdtgc"; - + @Test public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); - settings.setPersistRingState(false); + settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes+1; ++i) { + for (int i = 1; i < seedNodes + 1; ++i){ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); startupMembers.add(new RemoteMember(cluster, uri, i + "")); } final List clients = new ArrayList<>(); final int clusterMembers = 2; - for (int i = 1; i < clusterMembers + 1; ++i) { + for (int i = 1; i < clusterMembers + 1; ++i){ URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) - .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); + .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); clients.add(gossipService); gossipService.init(); register(gossipService); } TUnit.assertThat(() -> { int total = 0; - for (int i = 0; i < clusterMembers; ++i) { + for (int i = 0; i < clusterMembers; ++i){ total += clients.get(i).getLiveMembers().size(); } return total; }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); - clients.get(0).gossipPerNodeData(msg()); - clients.get(0).gossipSharedData(sharedMsg()); + clients.get(0).gossipPerNodeData(generatePerNodeMsg("a", "b")); + clients.get(0).gossipSharedData(generateSharedMsg("a", "c")); - TUnit.assertThat(()-> { + TUnit.assertThat(() -> { PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a"); if (x == null) return ""; else return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); - - TUnit.assertThat(() -> { + + TUnit.assertThat(() -> { SharedDataMessage x = clients.get(1).findSharedGossipData("a"); if (x == null) return ""; else return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); - - + + givenDifferentDatumsInSet(clients); assertThatListIsMerged(clients); - - givenOrs(clients); - assertThatOrSetIsMerged(clients); - dropIt(clients); - assertThatOrSetDelIsMerged(clients); - + testOrSet(clients); + testLWWSet(clients); + // test g counter givenDifferentIncrement(clients); assertThatCountIsUpdated(clients, 3); givenIncreaseOther(clients); assertThatCountIsUpdated(clients, 7); - for (int i = 0; i < clusterMembers; ++i) { + for (int i = 0; i < clusterMembers; ++i){ clients.get(i).shutdown(); } } - - private void givenDifferentIncrement(final List clients) { - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(gCounterKey); - d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); - } - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(gCounterKey); - d.setPayload(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L))); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } - } - private void givenIncreaseOther(final List clients) { - GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey); - GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, - new GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); + private void testOrSet(final List clients){ + //populate + clients.get(0).merge(generateSharedMsg(orSetKey, new OrSet<>("1", "2"))); + clients.get(1).merge(generateSharedMsg(orSetKey, new OrSet<>("3", "4"))); - SharedDataMessage d = new SharedDataMessage(); - d.setKey(gCounterKey); - d.setPayload(gc2); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } + //assert merge + assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "3", "4").value()); + assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "3", "4").value()); - private void givenOrs(List clients) { - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(orSetKey); - d.setPayload(new OrSet("1", "2")); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); - } - { - SharedDataMessage d = new SharedDataMessage(); - d.setKey(orSetKey); - d.setPayload(new OrSet("3", "4")); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(1).merge(d); - } - } - - private void dropIt(List clients) { + //drop element @SuppressWarnings("unchecked") OrSet o = (OrSet) clients.get(0).findCrdt(orSetKey); - OrSet o2 = new OrSet(o, new OrSet.Builder().remove("3")); - SharedDataMessage d = new SharedDataMessage(); - d.setKey(orSetKey); - d.setPayload(o2); - d.setExpireAt(Long.MAX_VALUE); - d.setTimestamp(System.currentTimeMillis()); - clients.get(0).merge(d); + OrSet o2 = new OrSet<>(o, new OrSet.Builder().remove("3")); + clients.get(0).merge(generateSharedMsg(orSetKey, o2)); + + //assert deletion + assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "4").value()); + assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "4").value()); } - - private void assertThatOrSetIsMerged(final List clients){ - TUnit.assertThat(() -> { - return clients.get(0).findCrdt(orSetKey).value(); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet("1", "2", "3", "4").value()); - TUnit.assertThat(() -> { - return clients.get(1).findCrdt(orSetKey).value(); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet("1", "2", "3", "4").value()); + + private void testLWWSet(final List clients){ + //populate + clients.get(0).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("1", "2"))); + clients.get(1).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("3", "4"))); + + //assert merge + assertMerged(clients.get(0), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value()); + assertMerged(clients.get(1), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value()); + + //drop element + @SuppressWarnings("unchecked") + LWWSet lww = (LWWSet) clients.get(0).findCrdt(lwwSetKey); + clients.get(0).merge(generateSharedMsg(lwwSetKey, lww.remove("3"))); + + //assert deletion + assertMerged(clients.get(0), lwwSetKey, new OrSet<>("1", "2", "4").value()); + assertMerged(clients.get(1), lwwSetKey, new OrSet<>("1", "2", "4").value()); } - - private void assertThatOrSetDelIsMerged(final List clients){ - TUnit.assertThat(() -> { - return clients.get(0).findCrdt(orSetKey); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new OrSet("1", "2", "4")); + + private void givenDifferentIncrement(final List clients){ + Object payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)); + clients.get(0).merge(generateSharedMsg(gCounterKey, payload)); + payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L)); + clients.get(1).merge(generateSharedMsg(gCounterKey, payload)); + } + + private void givenIncreaseOther(final List clients){ + GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey); + GrowOnlyCounter gc2 = new GrowOnlyCounter(gc, + new GrowOnlyCounter.Builder(clients.get(1)).increment(4L)); + + clients.get(1).merge(generateSharedMsg(gCounterKey, gc2)); + } + + private void assertMerged(final GossipManager client, String key, final Set expected){ + TUnit.assertThat(() -> client.findCrdt(key).value()) + .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(expected); } private void givenDifferentDatumsInSet(final List clients){ clients.get(0).merge(CrdtMessage("1")); clients.get(1).merge(CrdtMessage("2")); } - - private void assertThatCountIsUpdated(final List clients, long finalCount) { - TUnit.assertThat(() -> { - return clients.get(0).findCrdt(gCounterKey); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter( - new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); + + private void assertThatCountIsUpdated(final List clients, long finalCount){ + TUnit.assertThat(() -> clients.get(0).findCrdt(gCounterKey)) + .afterWaitingAtMost(10, TimeUnit.SECONDS) + .isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount))); } private void assertThatListIsMerged(final List clients){ - TUnit.assertThat(() -> { - return clients.get(0).findCrdt("cr"); - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet(Arrays.asList("1","2"))); + TUnit.assertThat(() -> clients.get(0).findCrdt("cr")) + .afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2"))); } - + private SharedDataMessage CrdtMessage(String item){ + return generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList(item))); + } + + private PerNodeDataMessage generatePerNodeMsg(String key, Object payload){ + PerNodeDataMessage g = new PerNodeDataMessage(); + g.setExpireAt(Long.MAX_VALUE); + g.setKey(key); + g.setPayload(payload); + g.setTimestamp(System.currentTimeMillis()); + return g; + } + + private SharedDataMessage generateSharedMsg(String key, Object payload){ SharedDataMessage d = new SharedDataMessage(); - d.setKey("cr"); - d.setPayload(new GrowOnlySet( Arrays.asList(item))); + d.setKey(key); + d.setPayload(payload); d.setExpireAt(Long.MAX_VALUE); d.setTimestamp(System.currentTimeMillis()); return d; } - - private PerNodeDataMessage msg(){ - PerNodeDataMessage g = new PerNodeDataMessage(); - g.setExpireAt(Long.MAX_VALUE); - g.setKey("a"); - g.setPayload("b"); - g.setTimestamp(System.currentTimeMillis()); - return g; - } - - private SharedDataMessage sharedMsg(){ - SharedDataMessage g = new SharedDataMessage(); - g.setExpireAt(Long.MAX_VALUE); - g.setKey("a"); - g.setPayload("c"); - g.setTimestamp(System.currentTimeMillis()); - return g; - } - -} +} \ No newline at end of file diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java index cbac460..3c90ea1 100644 --- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java @@ -22,6 +22,7 @@ import com.codahale.metrics.MetricRegistry; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.GossipSettings; import org.apache.gossip.Member; +import org.apache.gossip.crdt.LWWSet; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; @@ -80,6 +81,22 @@ public class JacksonTest { OrSet back = objectMapper.readValue(s, OrSet.class); Assert.assertEquals(back, i); } + + @Test + public void jacksonCrdtLWWSetTest() { + ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(simpleSettings(new GossipSettings())); + + LWWSet lww = new LWWSet<>("a", "b", "c"); + + try { + String lwwS = objectMapper.writeValueAsString(lww); + @SuppressWarnings("unchecked") + LWWSet parsedLww = objectMapper.readValue(lwwS, LWWSet.class); + Assert.assertEquals(lww, parsedLww); + } catch (Exception e) { + Assert.fail("LWWSet se/de error"); + } + } @Test public void testMessageEqualityAssumptions() {