From 22b9e756d4b16f6108f563ca764934220be3e452 Mon Sep 17 00:00:00 2001 From: Edward Capriolo Date: Mon, 27 Feb 2017 23:54:43 -0500 Subject: [PATCH] GOSSIP-71 not merging correctly (egc & maxim) --- .../java/org/apache/gossip/crdt/OrSet.java | 79 ++++++++-------- .../examples/StandAloneNodeCrdtOrSet.java | 91 +++++++++++++++++++ .../org/apache/gossip/manager/GossipCore.java | 40 ++++---- .../apache/gossip/manager/GossipManager.java | 1 + .../org/apache/gossip/crdt/OrSetTest.java | 13 ++- 5 files changed, 164 insertions(+), 60 deletions(-) create mode 100644 src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java diff --git a/src/main/java/org/apache/gossip/crdt/OrSet.java b/src/main/java/org/apache/gossip/crdt/OrSet.java index 972377f..f84dbc7 100644 --- a/src/main/java/org/apache/gossip/crdt/OrSet.java +++ b/src/main/java/org/apache/gossip/crdt/OrSet.java @@ -17,16 +17,9 @@ */ package org.apache.gossip.crdt; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; +import java.util.function.BiConsumer; import org.apache.gossip.crdt.OrSet.Builder.Operation; @@ -86,11 +79,34 @@ public class OrSet implements Crdt, OrSet> { val = computeValue(); } + static Set mergeSets(Set a, Set b) { + if ((a == null || a.isEmpty()) && (b == null || b.isEmpty())) { + return null; + } + Set res = new HashSet<>(a); + res.addAll(b); + return res; + } + + private void internalSetMerge(Map> map, E key, Set value) { + if (value == null) { + return; + } + map.merge(key, value, OrSet::mergeSets); + } + public OrSet(OrSet left, OrSet right){ - elements.putAll(left.elements); - elements.putAll(right.elements); - tombstones.putAll(left.tombstones); - tombstones.putAll(right.tombstones); + BiConsumer>, Map>> internalMerge = (items, other) -> { + for (Entry> l : other.entrySet()){ + internalSetMerge(items, l.getKey(), l.getValue()); + } + }; + + internalMerge.accept(elements, left.elements); + internalMerge.accept(elements, right.elements); + internalMerge.accept(tombstones, left.tombstones); + internalMerge.accept(tombstones, right.tombstones); + val = computeValue(); } @@ -103,29 +119,14 @@ public class OrSet implements Crdt, OrSet> { return new OrSet(this, other); } - private void internalAdd(E element){ - Set l = elements.get(element); - if (l == null){ - Set d = new HashSet(); - d.add(UUID.randomUUID()); - elements.put(element, d); - } else { - l.add(UUID.randomUUID()); - } + private void internalAdd(E element) { + Set toMerge = new HashSet<>(); + toMerge.add(UUID.randomUUID()); + internalSetMerge(elements, element, toMerge); } private void internalRemove(E element){ - Set elementIds = elements.get(element); - if (elementIds == null){ - //deleting elements not in the list - return; - } - Set current = tombstones.get(element); - if (current != null){ - current.addAll(elementIds); - } else { - tombstones.put(element, elementIds); - } + internalSetMerge(tombstones, element, elements.get(element)); } /* @@ -134,18 +135,10 @@ public class OrSet implements Crdt, OrSet> { private Set computeValue(){ Set values = new HashSet<>(); for (Entry> entry: elements.entrySet()){ - if (entry.getValue() == null || entry.getValue().size() == 0){ - continue; - } Set deleteIds = tombstones.get(entry.getKey()); - if (deleteIds == null){ + // if not all tokens for current element are in tombstones + if (deleteIds == null || !deleteIds.containsAll(entry.getValue())) { values.add(entry.getKey()); - } else { - if (!deleteIds.containsAll(entry.getValue())){ - values.add(entry.getKey()); - } else { - //if all the entry uuid is deleted the entry is deleted - } } } return values; diff --git a/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java new file mode 100644 index 0000000..d1c1751 --- /dev/null +++ b/src/main/java/org/apache/gossip/examples/StandAloneNodeCrdtOrSet.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.examples; + +import com.codahale.metrics.MetricRegistry; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.util.Arrays; +import java.util.HashMap; + +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.crdt.OrSet; +import org.apache.gossip.model.SharedGossipDataMessage; + +public class StandAloneNodeCrdtOrSet { + public static void main (String [] args) throws InterruptedException, IOException{ + GossipSettings s = new GossipSettings(); + s.setWindowSize(10); + s.setConvictThreshold(1.0); + s.setGossipInterval(10); + GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap(), + Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry()); + gossipService.start(); + + new Thread(() -> { + while (true){ + System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers()); + System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers()); + System.out.println("---------- " + (gossipService.getGossipManager().findCrdt("abc") == null ? "": + gossipService.getGossipManager().findCrdt("abc").value())); + System.out.println("********** " + gossipService.getGossipManager().findCrdt("abc")); + try { + Thread.sleep(2000); + } catch (Exception e) {} + } + }).start(); + + String line = null; + try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){ + while ( (line = br.readLine()) != null){ + System.out.println(line); + char op = line.charAt(0); + String val = line.substring(2); + if (op == 'a'){ + addData(val, gossipService); + } else { + removeData(val, gossipService); + } + } + } + } + + private static void removeData(String val, GossipService gossipService){ + OrSet s = (OrSet) gossipService.getGossipManager().findCrdt("abc"); + SharedGossipDataMessage m = new SharedGossipDataMessage(); + m.setExpireAt(Long.MAX_VALUE); + m.setKey("abc"); + m.setPayload(new OrSet(s , new OrSet.Builder().remove(val))); + m.setTimestamp(System.currentTimeMillis()); + gossipService.getGossipManager().merge(m); + } + + private static void addData(String val, GossipService gossipService){ + SharedGossipDataMessage m = new SharedGossipDataMessage(); + m.setExpireAt(Long.MAX_VALUE); + m.setKey("abc"); + m.setPayload(new OrSet(val)); + m.setTimestamp(System.currentTimeMillis()); + gossipService.getGossipManager().merge(m); + } +} diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index dff6413..a24b125 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -114,28 +114,36 @@ public class GossipCore implements GossipCoreConstants { @SuppressWarnings({ "unchecked", "rawtypes" }) public void addSharedData(SharedGossipDataMessage message) { - SharedGossipDataMessage previous = sharedData.get(message.getKey()); - if (previous == null) { - sharedData.putIfAbsent(message.getKey(), message); - } else { + while (true){ + SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); + if (previous == null){ + return; + } if (message.getPayload() instanceof Crdt){ - SharedGossipDataMessage curretnt = sharedData.get(message.getKey()); SharedGossipDataMessage merged = new SharedGossipDataMessage(); merged.setExpireAt(message.getExpireAt()); - merged.setKey(curretnt.getKey()); + merged.setKey(message.getKey()); merged.setNodeId(message.getNodeId()); merged.setTimestamp(message.getTimestamp()); - Crdt mergedCrdt = ((Crdt) message.getPayload()).merge((Crdt)curretnt.getPayload()); - merged.setPayload( mergedCrdt ); - sharedData.put(curretnt.getKey(), merged); + Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload()); + merged.setPayload(mergedCrdt); + boolean replaced = sharedData.replace(message.getKey(), previous, merged); + if (replaced){ + return; + } } else { - if (previous.getTimestamp() < message.getTimestamp()) { - sharedData.replace(message.getKey(), previous, message); + if (previous.getTimestamp() < message.getTimestamp()){ + boolean result = sharedData.replace(message.getKey(), previous, message); + if (result){ + return; + } + } else { + return; } } } } - + public void addPerNodeData(GossipDataMessage message){ ConcurrentHashMap nodeMap = new ConcurrentHashMap<>(); nodeMap.put(message.getKey(), message); @@ -363,8 +371,8 @@ public class GossipCore implements GossipCoreConstants { @SuppressWarnings("rawtypes") public Crdt merge(SharedGossipDataMessage message) { for (;;){ - SharedGossipDataMessage ret = sharedData.putIfAbsent(message.getKey(), message); - if (ret == null){ + SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message); + if (previous == null){ return (Crdt) message.getPayload(); } SharedGossipDataMessage copy = new SharedGossipDataMessage(); @@ -373,9 +381,9 @@ public class GossipCore implements GossipCoreConstants { copy.setNodeId(message.getNodeId()); copy.setTimestamp(message.getTimestamp()); @SuppressWarnings("unchecked") - Crdt merged = ((Crdt) ret.getPayload()).merge((Crdt) message.getPayload()); + Crdt merged = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload()); copy.setPayload(merged); - boolean replaced = sharedData.replace(message.getKey(), ret, copy); + boolean replaced = sharedData.replace(message.getKey(), previous, copy); if (replaced){ return merged; } diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 0140f00..4b28f2f 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -317,6 +317,7 @@ public abstract class GossipManager { } return gossipCore.merge(message); } + public GossipDataMessage findPerNodeGossipData(String nodeId, String key){ ConcurrentHashMap j = gossipCore.getPerNodeData().get(nodeId); if (j == null){ diff --git a/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/src/test/java/org/apache/gossip/crdt/OrSetTest.java index 8b8766a..e576764 100644 --- a/src/test/java/org/apache/gossip/crdt/OrSetTest.java +++ b/src/test/java/org/apache/gossip/crdt/OrSetTest.java @@ -102,4 +102,15 @@ public class OrSetTest { Assert.assertEquals(back, i); } -} + @Test + public void mergeTestSame() { + OrSet i = new OrSet<>(19); + OrSet j = new OrSet<>(19); + OrSet k = i.merge(j); + Assert.assertEquals(2, k.getElements().get(19).size()); + OrSet y = new OrSet<>(k, new OrSet.Builder().remove(19)); + Assert.assertEquals(2, y.getTombstones().get(19).size()); + Assert.assertEquals(2, y.getElements().get(19).size()); + Assert.assertEquals(new OrSet().value(), y.value()); + } +} \ No newline at end of file