diff --git a/src/main/java/org/apache/gossip/GossipService.java b/src/main/java/org/apache/gossip/GossipService.java index f32eb35..f216c33 100644 --- a/src/main/java/org/apache/gossip/GossipService.java +++ b/src/main/java/org/apache/gossip/GossipService.java @@ -131,4 +131,5 @@ public class GossipService { public SharedGossipDataMessage findSharedData(String key){ return getGossipManager().findSharedGossipData(key); } + } diff --git a/src/main/java/org/apache/gossip/crdt/Crdt.java b/src/main/java/org/apache/gossip/crdt/Crdt.java new file mode 100644 index 0000000..8edfa8c --- /dev/null +++ b/src/main/java/org/apache/gossip/crdt/Crdt.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; +/** + * + * Immutable type + * + * @param + * @param + */ +public interface Crdt> { + + + MergeReturnType merge(MergeReturnType other); + SetType value(); + /** + * Called to self optimize. Some CRDTs may use some mechanism to clean up be + * removing obsolete data outside the scope of merging. IE this could clean up + * temporal values, old copies etc. + * @return the Crdt structure optimized + */ + MergeReturnType optimize(); + +} diff --git a/src/main/java/org/apache/gossip/crdt/CrdtBiFunctionMerge.java b/src/main/java/org/apache/gossip/crdt/CrdtBiFunctionMerge.java new file mode 100644 index 0000000..1ac7a30 --- /dev/null +++ b/src/main/java/org/apache/gossip/crdt/CrdtBiFunctionMerge.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.function.BiFunction; + +@SuppressWarnings("rawtypes") +public class CrdtBiFunctionMerge implements BiFunction { + + @SuppressWarnings("unchecked") + @Override + public Crdt apply(Crdt t, Crdt u) { + if (t == null && u == null){ + return null; + } else if (t == null){ + return u; + } else if (u == null){ + return t; + } + if (! u.getClass().equals(t.getClass())){ + throw new IllegalArgumentException( "Can not merge " + t.getClass() + " "+ u.getClass()); + } + return t.merge(u); + } + + @SuppressWarnings("unchecked") + public static Crdt applyStatic(Crdt t, Crdt u){ + if (t == null && u == null){ + return null; + } else if (t == null){ + return u; + } else if (u == null){ + return t; + } + if (! u.getClass().equals(t.getClass())){ + throw new IllegalArgumentException( "Can not merge " + t.getClass() + " "+ u.getClass()); + } + return t.merge(u); + } +} diff --git a/src/main/java/org/apache/gossip/crdt/CrdtSet.java b/src/main/java/org/apache/gossip/crdt/CrdtSet.java new file mode 100644 index 0000000..3a5fbca --- /dev/null +++ b/src/main/java/org/apache/gossip/crdt/CrdtSet.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.Set; + +public interface CrdtSet, R extends CrdtSet> +extends Crdt, Set { + +} + diff --git a/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java b/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java new file mode 100644 index 0000000..0b1771b --- /dev/null +++ b/src/main/java/org/apache/gossip/crdt/GrowOnlySet.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.Set; + +public class GrowOnlySet implements CrdtSet, GrowOnlySet>{ + + private final Set hidden = new LinkedHashSet<>(); + + @SuppressWarnings("unused") + /* + * Used by SerDe + */ + private GrowOnlySet(){ + + } + + public GrowOnlySet(Set c){ + hidden.addAll(c); + } + + public GrowOnlySet(Collection c){ + hidden.addAll(c); + } + + public GrowOnlySet(GrowOnlySet first, GrowOnlySet second){ + hidden.addAll(first.value()); + hidden.addAll(second.value()); + } + + @Override + public GrowOnlySet merge(GrowOnlySet other) { + return new GrowOnlySet<>(this, other); + } + + @Override + public Set value() { + Set copy = new LinkedHashSet<>(); + copy.addAll(hidden); + return Collections.unmodifiableSet(copy); + } + + @Override + public GrowOnlySet optimize() { + return new GrowOnlySet<>(hidden); + } + + @Override + public int size() { + return hidden.size(); + } + + @Override + public boolean isEmpty() { + return hidden.isEmpty(); + } + + @Override + public boolean contains(Object o) { + return hidden.contains(o); + } + + @Override + public Iterator iterator() { + Set copy = new HashSet<>(); + copy.addAll(hidden); + return copy.iterator(); + } + + @Override + public Object[] toArray() { + return hidden.toArray(); + } + + @Override + public T[] toArray(T[] a) { + return hidden.toArray(a); + } + + @Override + public boolean add(ElementType e) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(Collection c) { + return hidden.containsAll(c); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + + } + + @Override + public String toString() { + return "GrowOnlySet [hidden=" + hidden + "]"; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((hidden == null) ? 0 : hidden.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + @SuppressWarnings("rawtypes") + GrowOnlySet other = (GrowOnlySet) obj; + if (hidden == null) { + if (other.hidden != null) + return false; + } else if (!hidden.equals(other.hidden)) + return false; + return true; + } + +} diff --git a/src/main/java/org/apache/gossip/manager/GossipCore.java b/src/main/java/org/apache/gossip/manager/GossipCore.java index 403acf4..de54597 100644 --- a/src/main/java/org/apache/gossip/manager/GossipCore.java +++ b/src/main/java/org/apache/gossip/manager/GossipCore.java @@ -23,6 +23,7 @@ import com.codahale.metrics.MetricRegistry; import org.apache.gossip.GossipMember; import org.apache.gossip.LocalGossipMember; import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipState; import org.apache.gossip.model.*; import org.apache.gossip.udp.Trackable; @@ -111,15 +112,27 @@ public class GossipCore implements GossipCoreConstants { } } - public void addSharedData(SharedGossipDataMessage message){ - SharedGossipDataMessage previous = sharedData.get(message.getKey()); - if (previous == null){ - sharedData.putIfAbsent(message.getKey(), message); - } else { - if (previous.getTimestamp() < message.getTimestamp()){ - sharedData.replace(message.getKey(), previous, message); - } - } + @SuppressWarnings({ "unchecked", "rawtypes" }) + public void addSharedData(SharedGossipDataMessage message) { + SharedGossipDataMessage previous = sharedData.get(message.getKey()); + if (previous == null) { + sharedData.putIfAbsent(message.getKey(), message); + } else { + if (message.getPayload() instanceof Crdt){ + SharedGossipDataMessage m = sharedData.get(message.getKey()); + SharedGossipDataMessage merged = new SharedGossipDataMessage(); + merged.setExpireAt(message.getExpireAt()); + merged.setKey(m.getKey()); + merged.setNodeId(message.getNodeId()); + merged.setTimestamp(message.getTimestamp()); + merged.setPayload( ((Crdt) message.getPayload()).merge((Crdt)m.getPayload())); + sharedData.put(m.getKey(), merged); + } else { + if (previous.getTimestamp() < message.getTimestamp()) { + sharedData.replace(message.getKey(), previous, message); + } + } + } } public void addPerNodeData(GossipDataMessage message){ @@ -345,4 +358,25 @@ public class GossipCore implements GossipCoreConstants { "Dead " + gossipManager.getDeadMembers()+ "\n" + "======================="); } + + @SuppressWarnings("rawtypes") + public Crdt merge(SharedGossipDataMessage message) { + for (;;){ + SharedGossipDataMessage ret = sharedData.putIfAbsent(message.getKey(), message); + if (ret == null){ + return (Crdt) message.getPayload(); + } + SharedGossipDataMessage copy = new SharedGossipDataMessage(); + copy.setExpireAt(message.getExpireAt()); + copy.setKey(message.getKey()); + copy.setNodeId(message.getNodeId()); + @SuppressWarnings("unchecked") + Crdt merged = ((Crdt) ret.getPayload()).merge((Crdt) message.getPayload()); + message.setPayload(merged); + boolean replaced = sharedData.replace(message.getKey(), ret, copy); + if (replaced){ + return merged; + } + } + } } diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index ab8e4ae..0140f00 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.GossipMember; import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalGossipMember; +import org.apache.gossip.crdt.Crdt; import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipState; import org.apache.gossip.manager.handlers.MessageInvoker; @@ -291,6 +292,31 @@ public abstract class GossipManager { gossipCore.addSharedData(message); } + + @SuppressWarnings("rawtypes") + public Crdt findCrdt(String key){ + SharedGossipDataMessage l = gossipCore.getSharedData().get(key); + if (l == null){ + return null; + } + if (l.getExpireAt() < clock.currentTimeMillis()){ + return null; + } else { + return (Crdt) l.getPayload(); + } + } + + @SuppressWarnings("rawtypes") + public Crdt merge(SharedGossipDataMessage message){ + Objects.nonNull(message.getKey()); + Objects.nonNull(message.getTimestamp()); + Objects.nonNull(message.getPayload()); + message.setNodeId(me.getId()); + if (! (message.getPayload() instanceof Crdt)){ + throw new IllegalArgumentException("Not a subclass of CRDT " + message.getPayload()); + } + return gossipCore.merge(message); + } public GossipDataMessage findPerNodeGossipData(String nodeId, String key){ ConcurrentHashMap j = gossipCore.getPerNodeData().get(nodeId); if (j == null){ diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index b5fa705..f05636b 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -22,12 +22,13 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.UUID; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import org.apache.gossip.crdt.GrowOnlySet; import org.apache.gossip.model.GossipDataMessage; import org.apache.gossip.model.SharedGossipDataMessage; import org.junit.Test; @@ -58,43 +59,60 @@ public class DataTest { clients.add(gossipService); gossipService.start(); } - TUnit.assertThat(new Callable (){ - public Integer call() throws Exception { - int total = 0; - for (int i = 0; i < clusterMembers; ++i) { - total += clients.get(i).getGossipManager().getLiveMembers().size(); - } - return total; - }}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); + TUnit.assertThat(() -> { + int total = 0; + for (int i = 0; i < clusterMembers; ++i) { + total += clients.get(i).getGossipManager().getLiveMembers().size(); + } + return total; + }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2); clients.get(0).gossipPerNodeData(msg()); clients.get(0).gossipSharedData(sharedMsg()); - TUnit.assertThat(new Callable() { - public Object call() throws Exception { - GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a"); - if (x == null) - return ""; - else - return x.getPayload(); - } + TUnit.assertThat(()-> { + GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a"); + if (x == null) + return ""; + else + return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b"); - TUnit.assertThat(new Callable() { - public Object call() throws Exception { - SharedGossipDataMessage x = clients.get(1).findSharedData("a"); - if (x == null) - return ""; - else - return x.getPayload(); - } + TUnit.assertThat(() -> { + SharedGossipDataMessage x = clients.get(1).findSharedData("a"); + if (x == null) + return ""; + else + return x.getPayload(); }).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c"); + givenDifferentDatumsInSet(clients); + assertThatListIsMerged(clients); for (int i = 0; i < clusterMembers; ++i) { clients.get(i).shutdown(); } } + private void givenDifferentDatumsInSet(final List clients){ + clients.get(0).getGossipManager().merge(CrdtMessage("1")); + clients.get(1).getGossipManager().merge(CrdtMessage("2")); + } + + private void assertThatListIsMerged(final List clients){ + TUnit.assertThat(() -> { + return clients.get(0).getGossipManager().findCrdt("cr"); + }).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new GrowOnlySet(Arrays.asList("1","2"))); + } + + private SharedGossipDataMessage CrdtMessage(String item){ + SharedGossipDataMessage d = new SharedGossipDataMessage(); + d.setKey("cr"); + d.setPayload(new GrowOnlySet( Arrays.asList(item))); + d.setExpireAt(Long.MAX_VALUE); + d.setTimestamp(System.currentTimeMillis()); + return d; + } + private GossipDataMessage msg(){ GossipDataMessage g = new GossipDataMessage(); g.setExpireAt(Long.MAX_VALUE); diff --git a/src/test/java/org/apache/gossip/crdt/GrowOnlySetTest.java b/src/test/java/org/apache/gossip/crdt/GrowOnlySetTest.java new file mode 100644 index 0000000..d4f12b6 --- /dev/null +++ b/src/test/java/org/apache/gossip/crdt/GrowOnlySetTest.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.crdt; + +import java.util.Arrays; +import java.util.concurrent.ConcurrentHashMap; + +import org.junit.Assert; +import org.junit.Test; + +public class GrowOnlySetTest { + + @SuppressWarnings("rawtypes") + @Test + public void mergeTest(){ + ConcurrentHashMap a = new ConcurrentHashMap<>(); + GrowOnlySet gset = new GrowOnlySet<>(Arrays.asList("a", "b")); + Assert.assertEquals(gset, a.merge("a", gset, new CrdtBiFunctionMerge())); + GrowOnlySet over = new GrowOnlySet<>(Arrays.asList("b", "d")); + Assert.assertEquals(new GrowOnlySet<>(Arrays.asList("a", "b", "d")), + a.merge("a", over, CrdtBiFunctionMerge::applyStatic)); + } +}