GOSSIP-53 CRDT types

This commit is contained in:
Edward Capriolo
2017-02-19 16:14:30 -05:00
parent 400cb40cba
commit b71be5e16a
9 changed files with 439 additions and 34 deletions

View File

@ -131,4 +131,5 @@ public class GossipService {
public SharedGossipDataMessage findSharedData(String key){
return getGossipManager().findSharedGossipData(key);
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
/**
*
* Immutable type
*
* @param <SetType>
* @param <MergeReturnType>
*/
public interface Crdt<SetType, MergeReturnType extends Crdt<SetType, MergeReturnType>> {
MergeReturnType merge(MergeReturnType other);
SetType value();
/**
* Called to self optimize. Some CRDTs may use some mechanism to clean up be
* removing obsolete data outside the scope of merging. IE this could clean up
* temporal values, old copies etc.
* @return the Crdt structure optimized
*/
MergeReturnType optimize();
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import java.util.function.BiFunction;
@SuppressWarnings("rawtypes")
public class CrdtBiFunctionMerge implements BiFunction<Crdt,Crdt,Crdt> {
@SuppressWarnings("unchecked")
@Override
public Crdt apply(Crdt t, Crdt u) {
if (t == null && u == null){
return null;
} else if (t == null){
return u;
} else if (u == null){
return t;
}
if (! u.getClass().equals(t.getClass())){
throw new IllegalArgumentException( "Can not merge " + t.getClass() + " "+ u.getClass());
}
return t.merge(u);
}
@SuppressWarnings("unchecked")
public static Crdt applyStatic(Crdt t, Crdt u){
if (t == null && u == null){
return null;
} else if (t == null){
return u;
} else if (u == null){
return t;
}
if (! u.getClass().equals(t.getClass())){
throw new IllegalArgumentException( "Can not merge " + t.getClass() + " "+ u.getClass());
}
return t.merge(u);
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import java.util.Set;
public interface CrdtSet<ElementType, SetType extends Set<ElementType>, R extends CrdtSet<ElementType, SetType, R>>
extends Crdt<SetType, R>, Set<ElementType> {
}

View File

@ -0,0 +1,168 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
public class GrowOnlySet<ElementType> implements CrdtSet<ElementType, Set<ElementType>, GrowOnlySet<ElementType>>{
private final Set<ElementType> hidden = new LinkedHashSet<>();
@SuppressWarnings("unused")
/*
* Used by SerDe
*/
private GrowOnlySet(){
}
public GrowOnlySet(Set<ElementType> c){
hidden.addAll(c);
}
public GrowOnlySet(Collection<ElementType> c){
hidden.addAll(c);
}
public GrowOnlySet(GrowOnlySet<ElementType> first, GrowOnlySet<ElementType> second){
hidden.addAll(first.value());
hidden.addAll(second.value());
}
@Override
public GrowOnlySet<ElementType> merge(GrowOnlySet<ElementType> other) {
return new GrowOnlySet<>(this, other);
}
@Override
public Set<ElementType> value() {
Set<ElementType> copy = new LinkedHashSet<>();
copy.addAll(hidden);
return Collections.unmodifiableSet(copy);
}
@Override
public GrowOnlySet<ElementType> optimize() {
return new GrowOnlySet<>(hidden);
}
@Override
public int size() {
return hidden.size();
}
@Override
public boolean isEmpty() {
return hidden.isEmpty();
}
@Override
public boolean contains(Object o) {
return hidden.contains(o);
}
@Override
public Iterator<ElementType> iterator() {
Set<ElementType> copy = new HashSet<>();
copy.addAll(hidden);
return copy.iterator();
}
@Override
public Object[] toArray() {
return hidden.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return hidden.toArray(a);
}
@Override
public boolean add(ElementType e) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(Collection<?> c) {
return hidden.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends ElementType> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
throw new UnsupportedOperationException();
}
@Override
public String toString() {
return "GrowOnlySet [hidden=" + hidden + "]";
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((hidden == null) ? 0 : hidden.hashCode());
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
@SuppressWarnings("rawtypes")
GrowOnlySet other = (GrowOnlySet) obj;
if (hidden == null) {
if (other.hidden != null)
return false;
} else if (!hidden.equals(other.hidden))
return false;
return true;
}
}

View File

@ -23,6 +23,7 @@ import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.GossipMember;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.*;
import org.apache.gossip.udp.Trackable;
@ -111,15 +112,27 @@ public class GossipCore implements GossipCoreConstants {
}
}
public void addSharedData(SharedGossipDataMessage message){
SharedGossipDataMessage previous = sharedData.get(message.getKey());
if (previous == null){
sharedData.putIfAbsent(message.getKey(), message);
} else {
if (previous.getTimestamp() < message.getTimestamp()){
sharedData.replace(message.getKey(), previous, message);
}
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void addSharedData(SharedGossipDataMessage message) {
SharedGossipDataMessage previous = sharedData.get(message.getKey());
if (previous == null) {
sharedData.putIfAbsent(message.getKey(), message);
} else {
if (message.getPayload() instanceof Crdt){
SharedGossipDataMessage m = sharedData.get(message.getKey());
SharedGossipDataMessage merged = new SharedGossipDataMessage();
merged.setExpireAt(message.getExpireAt());
merged.setKey(m.getKey());
merged.setNodeId(message.getNodeId());
merged.setTimestamp(message.getTimestamp());
merged.setPayload( ((Crdt) message.getPayload()).merge((Crdt)m.getPayload()));
sharedData.put(m.getKey(), merged);
} else {
if (previous.getTimestamp() < message.getTimestamp()) {
sharedData.replace(message.getKey(), previous, message);
}
}
}
}
public void addPerNodeData(GossipDataMessage message){
@ -345,4 +358,25 @@ public class GossipCore implements GossipCoreConstants {
"Dead " + gossipManager.getDeadMembers()+ "\n" +
"=======================");
}
@SuppressWarnings("rawtypes")
public Crdt merge(SharedGossipDataMessage message) {
for (;;){
SharedGossipDataMessage ret = sharedData.putIfAbsent(message.getKey(), message);
if (ret == null){
return (Crdt) message.getPayload();
}
SharedGossipDataMessage copy = new SharedGossipDataMessage();
copy.setExpireAt(message.getExpireAt());
copy.setKey(message.getKey());
copy.setNodeId(message.getNodeId());
@SuppressWarnings("unchecked")
Crdt merged = ((Crdt) ret.getPayload()).merge((Crdt) message.getPayload());
message.setPayload(merged);
boolean replaced = sharedData.replace(message.getKey(), ret, copy);
if (replaced){
return merged;
}
}
}
}

View File

@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.handlers.MessageInvoker;
@ -291,6 +292,31 @@ public abstract class GossipManager {
gossipCore.addSharedData(message);
}
@SuppressWarnings("rawtypes")
public Crdt findCrdt(String key){
SharedGossipDataMessage l = gossipCore.getSharedData().get(key);
if (l == null){
return null;
}
if (l.getExpireAt() < clock.currentTimeMillis()){
return null;
} else {
return (Crdt) l.getPayload();
}
}
@SuppressWarnings("rawtypes")
public Crdt merge(SharedGossipDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
Objects.nonNull(message.getPayload());
message.setNodeId(me.getId());
if (! (message.getPayload() instanceof Crdt)){
throw new IllegalArgumentException("Not a subclass of CRDT " + message.getPayload());
}
return gossipCore.merge(message);
}
public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
if (j == null){

View File

@ -22,12 +22,13 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.crdt.GrowOnlySet;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.junit.Test;
@ -58,43 +59,60 @@ public class DataTest {
clients.add(gossipService);
gossipService.start();
}
TUnit.assertThat(new Callable<Integer> (){
public Integer call() throws Exception {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
total += clients.get(i).getGossipManager().getLiveMembers().size();
}
return total;
}}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
TUnit.assertThat(() -> {
int total = 0;
for (int i = 0; i < clusterMembers; ++i) {
total += clients.get(i).getGossipManager().getLiveMembers().size();
}
return total;
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
clients.get(0).gossipPerNodeData(msg());
clients.get(0).gossipSharedData(sharedMsg());
TUnit.assertThat(new Callable<Object>() {
public Object call() throws Exception {
GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
if (x == null)
return "";
else
return x.getPayload();
}
TUnit.assertThat(()-> {
GossipDataMessage x = clients.get(1).findPerNodeData(1 + "", "a");
if (x == null)
return "";
else
return x.getPayload();
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
TUnit.assertThat(new Callable<Object>() {
public Object call() throws Exception {
SharedGossipDataMessage x = clients.get(1).findSharedData("a");
if (x == null)
return "";
else
return x.getPayload();
}
TUnit.assertThat(() -> {
SharedGossipDataMessage x = clients.get(1).findSharedData("a");
if (x == null)
return "";
else
return x.getPayload();
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
givenDifferentDatumsInSet(clients);
assertThatListIsMerged(clients);
for (int i = 0; i < clusterMembers; ++i) {
clients.get(i).shutdown();
}
}
private void givenDifferentDatumsInSet(final List<GossipService> clients){
clients.get(0).getGossipManager().merge(CrdtMessage("1"));
clients.get(1).getGossipManager().merge(CrdtMessage("2"));
}
private void assertThatListIsMerged(final List<GossipService> clients){
TUnit.assertThat(() -> {
return clients.get(0).getGossipManager().findCrdt("cr");
}).afterWaitingAtMost(10, TimeUnit.SECONDS).equals(new GrowOnlySet<String>(Arrays.asList("1","2")));
}
private SharedGossipDataMessage CrdtMessage(String item){
SharedGossipDataMessage d = new SharedGossipDataMessage();
d.setKey("cr");
d.setPayload(new GrowOnlySet<String>( Arrays.asList(item)));
d.setExpireAt(Long.MAX_VALUE);
d.setTimestamp(System.currentTimeMillis());
return d;
}
private GossipDataMessage msg(){
GossipDataMessage g = new GossipDataMessage();
g.setExpireAt(Long.MAX_VALUE);

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.crdt;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.Assert;
import org.junit.Test;
public class GrowOnlySetTest {
@SuppressWarnings("rawtypes")
@Test
public void mergeTest(){
ConcurrentHashMap<String, Crdt> a = new ConcurrentHashMap<>();
GrowOnlySet<String> gset = new GrowOnlySet<>(Arrays.asList("a", "b"));
Assert.assertEquals(gset, a.merge("a", gset, new CrdtBiFunctionMerge()));
GrowOnlySet<String> over = new GrowOnlySet<>(Arrays.asList("b", "d"));
Assert.assertEquals(new GrowOnlySet<>(Arrays.asList("a", "b", "d")),
a.merge("a", over, CrdtBiFunctionMerge::applyStatic));
}
}