GOSSIP-71 not merging correctly (egc & maxim)
This commit is contained in:
@ -17,16 +17,9 @@
|
||||
*/
|
||||
package org.apache.gossip.crdt;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
import org.apache.gossip.crdt.OrSet.Builder.Operation;
|
||||
|
||||
@ -86,11 +79,34 @@ public class OrSet<E> implements Crdt<Set<E>, OrSet<E>> {
|
||||
val = computeValue();
|
||||
}
|
||||
|
||||
static Set<UUID> mergeSets(Set<UUID> a, Set<UUID> b) {
|
||||
if ((a == null || a.isEmpty()) && (b == null || b.isEmpty())) {
|
||||
return null;
|
||||
}
|
||||
Set<UUID> res = new HashSet<>(a);
|
||||
res.addAll(b);
|
||||
return res;
|
||||
}
|
||||
|
||||
private void internalSetMerge(Map<E, Set<UUID>> map, E key, Set<UUID> value) {
|
||||
if (value == null) {
|
||||
return;
|
||||
}
|
||||
map.merge(key, value, OrSet::mergeSets);
|
||||
}
|
||||
|
||||
public OrSet(OrSet<E> left, OrSet<E> right){
|
||||
elements.putAll(left.elements);
|
||||
elements.putAll(right.elements);
|
||||
tombstones.putAll(left.tombstones);
|
||||
tombstones.putAll(right.tombstones);
|
||||
BiConsumer<Map<E, Set<UUID>>, Map<E, Set<UUID>>> internalMerge = (items, other) -> {
|
||||
for (Entry<E, Set<UUID>> l : other.entrySet()){
|
||||
internalSetMerge(items, l.getKey(), l.getValue());
|
||||
}
|
||||
};
|
||||
|
||||
internalMerge.accept(elements, left.elements);
|
||||
internalMerge.accept(elements, right.elements);
|
||||
internalMerge.accept(tombstones, left.tombstones);
|
||||
internalMerge.accept(tombstones, right.tombstones);
|
||||
|
||||
val = computeValue();
|
||||
}
|
||||
|
||||
@ -103,29 +119,14 @@ public class OrSet<E> implements Crdt<Set<E>, OrSet<E>> {
|
||||
return new OrSet<E>(this, other);
|
||||
}
|
||||
|
||||
private void internalAdd(E element){
|
||||
Set<UUID> l = elements.get(element);
|
||||
if (l == null){
|
||||
Set<UUID> d = new HashSet<UUID>();
|
||||
d.add(UUID.randomUUID());
|
||||
elements.put(element, d);
|
||||
} else {
|
||||
l.add(UUID.randomUUID());
|
||||
}
|
||||
private void internalAdd(E element) {
|
||||
Set<UUID> toMerge = new HashSet<>();
|
||||
toMerge.add(UUID.randomUUID());
|
||||
internalSetMerge(elements, element, toMerge);
|
||||
}
|
||||
|
||||
private void internalRemove(E element){
|
||||
Set<UUID> elementIds = elements.get(element);
|
||||
if (elementIds == null){
|
||||
//deleting elements not in the list
|
||||
return;
|
||||
}
|
||||
Set<UUID> current = tombstones.get(element);
|
||||
if (current != null){
|
||||
current.addAll(elementIds);
|
||||
} else {
|
||||
tombstones.put(element, elementIds);
|
||||
}
|
||||
internalSetMerge(tombstones, element, elements.get(element));
|
||||
}
|
||||
|
||||
/*
|
||||
@ -134,18 +135,10 @@ public class OrSet<E> implements Crdt<Set<E>, OrSet<E>> {
|
||||
private Set<E> computeValue(){
|
||||
Set<E> values = new HashSet<>();
|
||||
for (Entry<E, Set<UUID>> entry: elements.entrySet()){
|
||||
if (entry.getValue() == null || entry.getValue().size() == 0){
|
||||
continue;
|
||||
}
|
||||
Set<UUID> deleteIds = tombstones.get(entry.getKey());
|
||||
if (deleteIds == null){
|
||||
// if not all tokens for current element are in tombstones
|
||||
if (deleteIds == null || !deleteIds.containsAll(entry.getValue())) {
|
||||
values.add(entry.getKey());
|
||||
} else {
|
||||
if (!deleteIds.containsAll(entry.getValue())){
|
||||
values.add(entry.getKey());
|
||||
} else {
|
||||
//if all the entry uuid is deleted the entry is deleted
|
||||
}
|
||||
}
|
||||
}
|
||||
return values;
|
||||
|
@ -0,0 +1,91 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.gossip.examples;
|
||||
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.gossip.GossipService;
|
||||
import org.apache.gossip.GossipSettings;
|
||||
import org.apache.gossip.RemoteGossipMember;
|
||||
import org.apache.gossip.crdt.OrSet;
|
||||
import org.apache.gossip.model.SharedGossipDataMessage;
|
||||
|
||||
public class StandAloneNodeCrdtOrSet {
|
||||
public static void main (String [] args) throws InterruptedException, IOException{
|
||||
GossipSettings s = new GossipSettings();
|
||||
s.setWindowSize(10);
|
||||
s.setConvictThreshold(1.0);
|
||||
s.setGossipInterval(10);
|
||||
GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], new HashMap<String, String>(),
|
||||
Arrays.asList( new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), s, (a,b) -> {}, new MetricRegistry());
|
||||
gossipService.start();
|
||||
|
||||
new Thread(() -> {
|
||||
while (true){
|
||||
System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers());
|
||||
System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers());
|
||||
System.out.println("---------- " + (gossipService.getGossipManager().findCrdt("abc") == null ? "":
|
||||
gossipService.getGossipManager().findCrdt("abc").value()));
|
||||
System.out.println("********** " + gossipService.getGossipManager().findCrdt("abc"));
|
||||
try {
|
||||
Thread.sleep(2000);
|
||||
} catch (Exception e) {}
|
||||
}
|
||||
}).start();
|
||||
|
||||
String line = null;
|
||||
try (BufferedReader br = new BufferedReader(new InputStreamReader(System.in))){
|
||||
while ( (line = br.readLine()) != null){
|
||||
System.out.println(line);
|
||||
char op = line.charAt(0);
|
||||
String val = line.substring(2);
|
||||
if (op == 'a'){
|
||||
addData(val, gossipService);
|
||||
} else {
|
||||
removeData(val, gossipService);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static void removeData(String val, GossipService gossipService){
|
||||
OrSet<String> s = (OrSet<String>) gossipService.getGossipManager().findCrdt("abc");
|
||||
SharedGossipDataMessage m = new SharedGossipDataMessage();
|
||||
m.setExpireAt(Long.MAX_VALUE);
|
||||
m.setKey("abc");
|
||||
m.setPayload(new OrSet<String>(s , new OrSet.Builder<String>().remove(val)));
|
||||
m.setTimestamp(System.currentTimeMillis());
|
||||
gossipService.getGossipManager().merge(m);
|
||||
}
|
||||
|
||||
private static void addData(String val, GossipService gossipService){
|
||||
SharedGossipDataMessage m = new SharedGossipDataMessage();
|
||||
m.setExpireAt(Long.MAX_VALUE);
|
||||
m.setKey("abc");
|
||||
m.setPayload(new OrSet<String>(val));
|
||||
m.setTimestamp(System.currentTimeMillis());
|
||||
gossipService.getGossipManager().merge(m);
|
||||
}
|
||||
}
|
@ -114,23 +114,31 @@ public class GossipCore implements GossipCoreConstants {
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
public void addSharedData(SharedGossipDataMessage message) {
|
||||
SharedGossipDataMessage previous = sharedData.get(message.getKey());
|
||||
if (previous == null) {
|
||||
sharedData.putIfAbsent(message.getKey(), message);
|
||||
} else {
|
||||
while (true){
|
||||
SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
|
||||
if (previous == null){
|
||||
return;
|
||||
}
|
||||
if (message.getPayload() instanceof Crdt){
|
||||
SharedGossipDataMessage curretnt = sharedData.get(message.getKey());
|
||||
SharedGossipDataMessage merged = new SharedGossipDataMessage();
|
||||
merged.setExpireAt(message.getExpireAt());
|
||||
merged.setKey(curretnt.getKey());
|
||||
merged.setKey(message.getKey());
|
||||
merged.setNodeId(message.getNodeId());
|
||||
merged.setTimestamp(message.getTimestamp());
|
||||
Crdt mergedCrdt = ((Crdt) message.getPayload()).merge((Crdt)curretnt.getPayload());
|
||||
merged.setPayload( mergedCrdt );
|
||||
sharedData.put(curretnt.getKey(), merged);
|
||||
Crdt mergedCrdt = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload());
|
||||
merged.setPayload(mergedCrdt);
|
||||
boolean replaced = sharedData.replace(message.getKey(), previous, merged);
|
||||
if (replaced){
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
if (previous.getTimestamp() < message.getTimestamp()) {
|
||||
sharedData.replace(message.getKey(), previous, message);
|
||||
if (previous.getTimestamp() < message.getTimestamp()){
|
||||
boolean result = sharedData.replace(message.getKey(), previous, message);
|
||||
if (result){
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -363,8 +371,8 @@ public class GossipCore implements GossipCoreConstants {
|
||||
@SuppressWarnings("rawtypes")
|
||||
public Crdt merge(SharedGossipDataMessage message) {
|
||||
for (;;){
|
||||
SharedGossipDataMessage ret = sharedData.putIfAbsent(message.getKey(), message);
|
||||
if (ret == null){
|
||||
SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
|
||||
if (previous == null){
|
||||
return (Crdt) message.getPayload();
|
||||
}
|
||||
SharedGossipDataMessage copy = new SharedGossipDataMessage();
|
||||
@ -373,9 +381,9 @@ public class GossipCore implements GossipCoreConstants {
|
||||
copy.setNodeId(message.getNodeId());
|
||||
copy.setTimestamp(message.getTimestamp());
|
||||
@SuppressWarnings("unchecked")
|
||||
Crdt merged = ((Crdt) ret.getPayload()).merge((Crdt) message.getPayload());
|
||||
Crdt merged = ((Crdt) previous.getPayload()).merge((Crdt) message.getPayload());
|
||||
copy.setPayload(merged);
|
||||
boolean replaced = sharedData.replace(message.getKey(), ret, copy);
|
||||
boolean replaced = sharedData.replace(message.getKey(), previous, copy);
|
||||
if (replaced){
|
||||
return merged;
|
||||
}
|
||||
|
@ -317,6 +317,7 @@ public abstract class GossipManager {
|
||||
}
|
||||
return gossipCore.merge(message);
|
||||
}
|
||||
|
||||
public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
|
||||
ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
|
||||
if (j == null){
|
||||
|
@ -102,4 +102,15 @@ public class OrSetTest {
|
||||
Assert.assertEquals(back, i);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mergeTestSame() {
|
||||
OrSet<Integer> i = new OrSet<>(19);
|
||||
OrSet<Integer> j = new OrSet<>(19);
|
||||
OrSet<Integer> k = i.merge(j);
|
||||
Assert.assertEquals(2, k.getElements().get(19).size());
|
||||
OrSet<Integer> y = new OrSet<>(k, new OrSet.Builder<Integer>().remove(19));
|
||||
Assert.assertEquals(2, y.getTombstones().get(19).size());
|
||||
Assert.assertEquals(2, y.getElements().get(19).size());
|
||||
Assert.assertEquals(new OrSet<Integer>().value(), y.value());
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user