GOSSIP-59 OrSet implementation

This commit is contained in:
Edward Capriolo
2017-02-22 23:35:14 -05:00
parent b71be5e16a
commit 026b8bb488
9 changed files with 551 additions and 22 deletions

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.module.SimpleModule;
abstract class OrSetMixin<E> {
@JsonCreator
OrSetMixin(@JsonProperty("elements") Map<E, Set<UUID>> w, @JsonProperty("tombstones") Map<E, Set<UUID>> h) { }
@JsonProperty("elements") abstract Map<E, Set<UUID>> getElements();
@JsonProperty("tombstones") abstract Map<E, Set<UUID>> getTombstones();
@JsonIgnore abstract boolean isEmpty();
}
abstract class GrowOnlySetMixin<E>{
@JsonCreator
GrowOnlySetMixin(@JsonProperty("elements") Set<E> elements){ }
@JsonProperty("elements") abstract Set<E> getElements();
@JsonIgnore abstract boolean isEmpty();
}
//If anyone wants to take a stab at this. please have at it
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
public class CrdtModule extends SimpleModule {
private static final long serialVersionUID = 6134836523275023418L;
public CrdtModule() {
super("CrdtModule", new Version(0, 0, 0, "0.0.0", "org.apache.gossip", "gossip"));
}
@Override
public void setupModule(SetupContext context) {
context.setMixInAnnotations(OrSet.class, OrSetMixin.class);
context.setMixInAnnotations(GrowOnlySet.class, GrowOnlySetMixin.class);
}
}

View File

@ -20,7 +20,7 @@ package org.apache.gossip.crdt;
import java.util.Set;
public interface CrdtSet<ElementType, SetType extends Set<ElementType>, R extends CrdtSet<ElementType, SetType, R>>
extends Crdt<SetType, R>, Set<ElementType> {
extends Crdt<SetType, R> {
}

View File

@ -66,72 +66,58 @@ public class GrowOnlySet<ElementType> implements CrdtSet<ElementType, Set<Elemen
return new GrowOnlySet<>(hidden);
}
@Override
public int size() {
return hidden.size();
}
@Override
public boolean isEmpty() {
return hidden.isEmpty();
}
@Override
public boolean contains(Object o) {
return hidden.contains(o);
}
@Override
public Iterator<ElementType> iterator() {
Set<ElementType> copy = new HashSet<>();
copy.addAll(hidden);
return copy.iterator();
}
@Override
public Object[] toArray() {
return hidden.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return hidden.toArray(a);
}
@Override
public boolean add(ElementType e) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(Collection<?> c) {
return hidden.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends ElementType> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
throw new UnsupportedOperationException();
}
@Override
@ -165,4 +151,7 @@ public class GrowOnlySet<ElementType> implements CrdtSet<ElementType, Set<Elemen
return true;
}
Set<ElementType> getElements(){
return hidden;
}
}

View File

@ -0,0 +1,311 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import org.apache.gossip.crdt.OrSet.Builder.Operation;
/*
* A immutable set
*/
public class OrSet<E> implements Crdt<Set<E>, OrSet<E>> {
private final Map<E, Set<UUID>> elements = new HashMap<>();
private final Map<E, Set<UUID>> tombstones = new HashMap<>();
private final transient Set<E> val;
public OrSet(){
val = computeValue();
}
OrSet(Map<E, Set<UUID>> elements, Map<E, Set<UUID>> tombstones){
this.elements.putAll(elements);
this.tombstones.putAll(tombstones);
val = computeValue();
}
@SafeVarargs
public OrSet(E ... elements){
for (E e: elements){
internalAdd(e);
}
val = computeValue();
}
public OrSet(Builder<E>builder){
for (Builder<E>.OrSetElement<E> e: builder.elements){
if (e.operation == Operation.ADD){
internalAdd(e.element);
} else {
internalRemove(e.element);
}
}
val = computeValue();
}
/**
* This constructor is the way to remove elements from an existing set
* @param set
* @param builder
*/
public OrSet(OrSet<E> set, Builder<E> builder){
elements.putAll(set.elements);
tombstones.putAll(set.tombstones);
for (Builder<E>.OrSetElement<E> e: builder.elements){
if (e.operation == Operation.ADD){
internalAdd(e.element);
} else {
internalRemove(e.element);
}
}
val = computeValue();
}
public OrSet(OrSet<E> left, OrSet<E> right){
elements.putAll(left.elements);
elements.putAll(right.elements);
tombstones.putAll(left.tombstones);
tombstones.putAll(right.tombstones);
val = computeValue();
}
public OrSet.Builder<E> builder(){
return new OrSet.Builder<>();
}
@Override
public OrSet<E> merge(OrSet<E> other) {
return new OrSet<E>(this, other);
}
private void internalAdd(E element){
Set<UUID> l = elements.get(element);
if (l == null){
Set<UUID> d = new HashSet<UUID>();
d.add(UUID.randomUUID());
elements.put(element, d);
} else {
l.add(UUID.randomUUID());
}
}
private void internalRemove(E element){
Set<UUID> elementIds = elements.get(element);
if (elementIds == null){
//deleting elements not in the list
return;
}
Set<UUID> current = tombstones.get(element);
if (current != null){
current.addAll(elementIds);
} else {
tombstones.put(element, elementIds);
}
}
/*
* Computes the live values by analyzing the elements and tombstones
*/
private Set<E> computeValue(){
Set<E> values = new HashSet<>();
for (Entry<E, Set<UUID>> entry: elements.entrySet()){
if (entry.getValue() == null || entry.getValue().size() == 0){
continue;
}
Set<UUID> deleteIds = tombstones.get(entry.getKey());
if (deleteIds == null){
values.add(entry.getKey());
} else {
if (!deleteIds.containsAll(entry.getValue())){
values.add(entry.getKey());
} else {
//if all the entry uuid is deleted the entry is deleted
}
}
}
return values;
}
@Override
public Set<E> value() {
return val;
}
@Override
public OrSet<E> optimize() {
return this;
}
public static class Builder<E> {
public static enum Operation {
ADD, REMOVE
};
private class OrSetElement<EL> {
EL element;
Operation operation;
private OrSetElement(EL element, Operation operation) {
this.element = element;
this.operation = operation;
}
}
private List<OrSetElement<E>> elements = new ArrayList<>();
public Builder<E> add(E element) {
elements.add(new OrSetElement<E>(element, Operation.ADD));
return this;
}
public Builder<E> remove(E element) {
elements.add(new OrSetElement<E>(element, Operation.REMOVE));
return this;
}
public Builder<E> mutate(E element, Operation operation) {
elements.add(new OrSetElement<E>(element, operation));
return this;
}
}
public int size() {
return value().size();
}
public boolean isEmpty() {
return value().size() == 0;
}
public boolean contains(Object o) {
return value().contains(o);
}
public Iterator<E> iterator() {
Iterator<E> managed = value().iterator();
return new Iterator<E>() {
@Override
public void remove() {
throw new IllegalArgumentException();
}
@Override
public boolean hasNext() {
return managed.hasNext();
}
@Override
public E next() {
return managed.next();
}
};
}
public Object[] toArray() {
return value().toArray();
}
public <T> T[] toArray(T[] a) {
return value().toArray(a);
}
public boolean add(E e) {
throw new IllegalArgumentException("Can not add");
}
public boolean remove(Object o) {
throw new IllegalArgumentException();
}
public boolean containsAll(Collection<?> c) {
return this.value().containsAll(c);
}
public boolean addAll(Collection<? extends E> c) {
throw new IllegalArgumentException();
}
public boolean retainAll(Collection<?> c) {
throw new IllegalArgumentException();
}
public boolean removeAll(Collection<?> c) {
throw new IllegalArgumentException();
}
public void clear() {
throw new IllegalArgumentException();
}
@Override
public String toString() {
return "OrSet [elements=" + elements + ", tombstones=" + tombstones + "]" ;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((value() == null) ? 0 : value().hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
@SuppressWarnings("rawtypes")
OrSet other = (OrSet) obj;
if (elements == null) {
if (other.elements != null)
return false;
} else if (!value().equals(other.value()))
return false;
return true;
}
Map<E, Set<UUID>> getElements() {
return elements;
}
Map<E, Set<UUID>> getTombstones() {
return tombstones;
}
}

View File

@ -119,14 +119,15 @@ public class GossipCore implements GossipCoreConstants {
sharedData.putIfAbsent(message.getKey(), message);
} else {
if (message.getPayload() instanceof Crdt){
SharedGossipDataMessage m = sharedData.get(message.getKey());
SharedGossipDataMessage curretnt = sharedData.get(message.getKey());
SharedGossipDataMessage merged = new SharedGossipDataMessage();
merged.setExpireAt(message.getExpireAt());
merged.setKey(m.getKey());
merged.setKey(curretnt.getKey());
merged.setNodeId(message.getNodeId());
merged.setTimestamp(message.getTimestamp());
merged.setPayload( ((Crdt) message.getPayload()).merge((Crdt)m.getPayload()));
sharedData.put(m.getKey(), merged);
Crdt mergedCrdt = ((Crdt) message.getPayload()).merge((Crdt)curretnt.getPayload());
merged.setPayload( mergedCrdt );
sharedData.put(curretnt.getKey(), merged);
} else {
if (previous.getTimestamp() < message.getTimestamp()) {
sharedData.replace(message.getKey(), previous, message);
@ -370,9 +371,10 @@ public class GossipCore implements GossipCoreConstants {
copy.setExpireAt(message.getExpireAt());
copy.setKey(message.getKey());
copy.setNodeId(message.getNodeId());
copy.setTimestamp(message.getTimestamp());
@SuppressWarnings("unchecked")
Crdt merged = ((Crdt) ret.getPayload()).merge((Crdt) message.getPayload());
message.setPayload(merged);
copy.setPayload(merged);
boolean replaced = sharedData.replace(message.getKey(), ret, copy);
if (replaced){
return merged;

View File

@ -18,9 +18,11 @@
package org.apache.gossip.manager.random;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.crdt.CrdtModule;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
@ -126,6 +128,8 @@ public class RandomGossipManager extends GossipManager {
if (objectMapper == null) {
objectMapper = new ObjectMapper();
objectMapper.enableDefaultTyping();
objectMapper.registerModule(new CrdtModule());
objectMapper.configure(Feature.WRITE_NUMBERS_AS_STRINGS, false);
}
if (messageInvoker == null) {
messageInvoker = new DefaultMessageInvoker();

View File

@ -42,7 +42,9 @@ public class UdpSharedGossipDataMessage extends SharedGossipDataMessage implemen
@Override
public String toString() {
return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + "]";
return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getNodeId()="
+ getNodeId() + ", getKey()=" + getKey() + ", getPayload()=" + getPayload()
+ ", getTimestamp()=" + getTimestamp() + ", getExpireAt()=" + getExpireAt() + "]";
}
}

View File

@ -29,6 +29,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.crdt.GrowOnlySet;
import org.apache.gossip.crdt.OrSet;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.junit.Test;
@ -37,6 +38,8 @@ import io.teknek.tunit.TUnit;
public class DataTest {
private String orSetKey = "cror";
@Test
public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
@ -88,11 +91,62 @@ public class DataTest {
givenDifferentDatumsInSet(clients);
assertThatListIsMerged(clients);
givenOrs(clients);
assertThatOrSetIsMerged(clients);
dropIt(clients);
assertThatOrSetDelIsMerged(clients);
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
}
}
private void givenOrs(List<GossipService> clients) {
{
SharedGossipDataMessage d = new SharedGossipDataMessage();
d.setKey(orSetKey);
d.setPayload(new OrSet<String>("1", "2"));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
clients.get(0).getGossipManager().merge(d);
}
{
SharedGossipDataMessage d = new SharedGossipDataMessage();
d.setKey(orSetKey);
d.setPayload(new OrSet<String>("3", "4"));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
clients.get(1).getGossipManager().merge(d);
}
}
private void dropIt(List<GossipService> clients) {
@SuppressWarnings("unchecked")
OrSet<String> o = (OrSet<String>) clients.get(0).getGossipManager().findCrdt(orSetKey);
OrSet<String> o2 = new OrSet<String>(o, new OrSet.Builder<String>().remove("3"));
SharedGossipDataMessage d = new SharedGossipDataMessage();
d.setKey(orSetKey);
d.setPayload(o2);
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
clients.get(0).getGossipManager().merge(d);
}
private void assertThatOrSetIsMerged(final List<GossipService> clients){
TUnit.assertThat(() -> {
return clients.get(0).getGossipManager().findCrdt(orSetKey).value();
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
TUnit.assertThat(() -> {
return clients.get(1).getGossipManager().findCrdt(orSetKey).value();
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new OrSet<String>("1", "2", "3", "4").value());
}
private void assertThatOrSetDelIsMerged(final List<GossipService> clients){
TUnit.assertThat(() -> {
return clients.get(0).getGossipManager().findCrdt(orSetKey);
}).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new OrSet<String>("1", "2", "4"));
}
private void givenDifferentDatumsInSet(final List<GossipService> clients){
clients.get(0).getGossipManager().merge(CrdtMessage("1"));
clients.get(1).getGossipManager().merge(CrdtMessage("2"));
@ -101,7 +155,7 @@ public class DataTest {
private void assertThatListIsMerged(final List<GossipService> clients){
TUnit.assertThat(() -> {
return clients.get(0).getGossipManager().findCrdt("cr");
}).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new GrowOnlySet<String>(Arrays.asList("1","2")));
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlySet<String>(Arrays.asList("1","2")));
}
private SharedGossipDataMessage CrdtMessage(String item){

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.RemoteGossipMember;
import org.junit.Assert;
import org.junit.Test;
import com.codahale.metrics.MetricRegistry;
public class OrSetTest {
@Test
public void atest() {
OrSet<Integer> i = new OrSet<>(new OrSet.Builder<Integer>().add(4).add(5).add(6).remove(5));
Assert.assertArrayEquals(Arrays.asList(4, 6).toArray(), i.value().toArray());
}
@Test
public void mergeTest(){
OrSet<Integer> i = new OrSet<>(new OrSet.Builder<Integer>().add(4).add(5).add(6).remove(5));
Assert.assertArrayEquals(Arrays.asList(4, 6).toArray(), i.value().toArray());
OrSet<Integer> j = new OrSet<>(new OrSet.Builder<Integer>().add(9).add(4).add(5).remove(6));
OrSet<Integer> h = i.merge(j);
Assert.assertEquals(new OrSet<Integer>(4,6,9,5), h);
}
@Test
public void mergeTest2(){
OrSet<Integer> i = new OrSet<>(new OrSet.Builder<Integer>().add(5).add(4).remove(4).add(6));
Assert.assertEquals(new OrSet<Integer>(5,6), i);
SortedSet<Integer> tree = new TreeSet<>();
for (Integer in: i.value()){
tree.add(in);
}
TreeSet<Integer> compare = new TreeSet<>();
compare.add(5);
compare.add(6);
Assert.assertEquals(tree, compare);
}
@Test
public void mergeTest4() {
Assert.assertArrayEquals(new Integer[] {},
new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1)).toArray());
}
@Test
public void mergeTest3(){
OrSet<Integer> i = new OrSet<>(1);
OrSet<Integer> j = new OrSet<>(2);
OrSet<Integer> k = new OrSet<>(i.merge(j), new OrSet.Builder<Integer>().remove(1));
Assert.assertArrayEquals(new Integer[] { 2 }, i.merge(j).merge(k).toArray());
Assert.assertArrayEquals(new Integer[] { 2 }, j.merge(i).merge(k).toArray());
Assert.assertArrayEquals(new Integer[] { 2 }, k.merge(i).merge(j).toArray());
Assert.assertArrayEquals(new Integer[] { 2 }, k.merge(j).merge(i).toArray());
Assert.assertEquals(j , i.merge(j.merge(k)));
}
@Test
public void mergeTest9(){
OrSet<Integer> i = new OrSet<>(19);
OrSet<Integer> j = i.merge(i);
Assert.assertEquals(i.value(), j.value());
}
@Test
public void serialTest() throws InterruptedException, URISyntaxException, IOException {
GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap<>(),
Arrays.asList(new RemoteGossipMember("a",
new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0")),
new GossipSettings(), (a, b) -> { }, new MetricRegistry());
OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
String s = gossipService2.getGossipManager().getObjectMapper().writeValueAsString(i);
@SuppressWarnings("unchecked")
OrSet<Integer> back = gossipService2.getGossipManager().getObjectMapper().readValue(s, OrSet.class);
Assert.assertEquals(back, i);
}
}