GOSSIP-64 Implement Max-Change-Sets
This commit is contained in:
@ -17,69 +17,81 @@
|
||||
*/
|
||||
package org.apache.gossip;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
import org.apache.gossip.crdt.CrdtAddRemoveSet;
|
||||
import org.apache.gossip.crdt.GrowOnlyCounter;
|
||||
import org.apache.gossip.crdt.GrowOnlySet;
|
||||
import org.apache.gossip.crdt.LWWSet;
|
||||
import org.apache.gossip.crdt.LwwSet;
|
||||
import org.apache.gossip.crdt.MaxChangeSet;
|
||||
import org.apache.gossip.crdt.OrSet;
|
||||
import org.apache.gossip.crdt.PNCounter;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
import org.apache.gossip.model.PerNodeDataMessage;
|
||||
import org.apache.gossip.model.SharedDataMessage;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class DataTest extends AbstractIntegrationBase {
|
||||
public class DataTest {
|
||||
private final String gCounterKey = "crdtgc";
|
||||
private final String pnCounterKey = "crdtpn";
|
||||
|
||||
private String orSetKey = "cror";
|
||||
private String lwwSetKey = "crlww";
|
||||
private String gCounterKey = "crdtgc";
|
||||
private String pnCounterKey = "crdtpn";
|
||||
private static final List<GossipManager> clients = new ArrayList<>();
|
||||
|
||||
@BeforeClass
|
||||
public static void initializeMembers() throws InterruptedException, UnknownHostException, URISyntaxException{
|
||||
final int clusterMembers = 2;
|
||||
|
||||
@Test
|
||||
public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||
GossipSettings settings = new GossipSettings();
|
||||
settings.setPersistRingState(false);
|
||||
settings.setPersistDataState(false);
|
||||
String cluster = UUID.randomUUID().toString();
|
||||
int seedNodes = 1;
|
||||
List<Member> startupMembers = new ArrayList<>();
|
||||
for (int i = 1; i < seedNodes + 1; ++i) {
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
|
||||
for (int i = 0; i < clusterMembers; ++i){
|
||||
int id = i + 1;
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + id));
|
||||
startupMembers.add(new RemoteMember(cluster, uri, id + ""));
|
||||
}
|
||||
final List<GossipManager> clients = new ArrayList<>();
|
||||
final int clusterMembers = 2;
|
||||
for (int i = 1; i < clusterMembers + 1; ++i) {
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
|
||||
GossipManager gossipService = GossipManagerBuilder
|
||||
.newBuilder()
|
||||
.cluster(cluster).uri(uri)
|
||||
.id(i + "")
|
||||
.gossipMembers(startupMembers)
|
||||
.gossipSettings(settings).build();
|
||||
|
||||
for (Member member : startupMembers){
|
||||
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(member.getUri())
|
||||
.id(member.getId()).gossipMembers(startupMembers).gossipSettings(settings).build();
|
||||
clients.add(gossipService);
|
||||
gossipService.init();
|
||||
register(gossipService);
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdownMembers(){
|
||||
for (final GossipManager client : clients){
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void simpleDataTest(){
|
||||
TUnit.assertThat(() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
for (GossipManager client : clients){
|
||||
total += client.getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
|
||||
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(2);
|
||||
|
||||
clients.get(0).gossipPerNodeData(generatePerNodeMsg("a", "b"));
|
||||
clients.get(0).gossipSharedData(generateSharedMsg("a", "c"));
|
||||
|
||||
@ -89,7 +101,7 @@ public class DataTest extends AbstractIntegrationBase {
|
||||
return "";
|
||||
else
|
||||
return x.getPayload();
|
||||
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
|
||||
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
|
||||
|
||||
TUnit.assertThat(() -> {
|
||||
SharedDataMessage x = clients.get(1).findSharedGossipData("a");
|
||||
@ -97,175 +109,118 @@ public class DataTest extends AbstractIntegrationBase {
|
||||
return "";
|
||||
else
|
||||
return x.getPayload();
|
||||
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("c");
|
||||
|
||||
givenDifferentDatumsInSet(clients);
|
||||
assertThatListIsMerged(clients);
|
||||
|
||||
testOrSet(clients);
|
||||
testLWWSet(clients);
|
||||
|
||||
testGrowOnlyCounter(clients);
|
||||
testPNCounter(clients);
|
||||
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
clients.get(i).shutdown();
|
||||
}
|
||||
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c");
|
||||
}
|
||||
|
||||
private void testOrSet(final List<GossipManager> clients) {
|
||||
// populate
|
||||
clients.get(0).merge(generateSharedMsg(orSetKey, new OrSet<>("1", "2")));
|
||||
clients.get(1).merge(generateSharedMsg(orSetKey, new OrSet<>("3", "4")));
|
||||
Set<String> setFromList(String... elements){
|
||||
return new HashSet<>(Arrays.asList(elements));
|
||||
}
|
||||
|
||||
// assert merge
|
||||
assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "3", "4").value());
|
||||
assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "3", "4").value());
|
||||
void crdtSetTest(String key, Function<Set<String>, CrdtAddRemoveSet<String, Set<String>, ?>> construct){
|
||||
//populate
|
||||
clients.get(0).merge(generateSharedMsg(key, construct.apply(setFromList("1", "2"))));
|
||||
clients.get(1).merge(generateSharedMsg(key, construct.apply(setFromList("3", "4"))));
|
||||
|
||||
// drop element
|
||||
assertMergedCrdt(key, construct.apply(setFromList("1", "2", "3", "4")).value());
|
||||
|
||||
//drop element
|
||||
@SuppressWarnings("unchecked")
|
||||
OrSet<String> o = (OrSet<String>) clients.get(0).findCrdt(orSetKey);
|
||||
OrSet<String> o2 = new OrSet<>(o, new OrSet.Builder<String>().remove("3"));
|
||||
clients.get(0).merge(generateSharedMsg(orSetKey, o2));
|
||||
CrdtAddRemoveSet<String, ?, ?> set = (CrdtAddRemoveSet<String, ?, ?>) clients.get(0).findCrdt(key);
|
||||
clients.get(0).merge(generateSharedMsg(key, set.remove("3")));
|
||||
|
||||
// assert deletion
|
||||
assertMerged(clients.get(0), orSetKey, new OrSet<>("1", "2", "4").value());
|
||||
assertMerged(clients.get(1), orSetKey, new OrSet<>("1", "2", "4").value());
|
||||
//assert deletion
|
||||
assertMergedCrdt(key, construct.apply(setFromList("1", "2", "4")).value());
|
||||
}
|
||||
|
||||
private void testLWWSet(final List<GossipManager> clients) {
|
||||
// populate
|
||||
clients.get(0).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("1", "2")));
|
||||
clients.get(1).merge(generateSharedMsg(lwwSetKey, new LWWSet<>("3", "4")));
|
||||
|
||||
// assert merge
|
||||
assertMerged(clients.get(0), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value());
|
||||
assertMerged(clients.get(1), lwwSetKey, new LWWSet<>("1", "2", "3", "4").value());
|
||||
|
||||
// drop element
|
||||
@SuppressWarnings("unchecked")
|
||||
LWWSet<String> lww = (LWWSet<String>) clients.get(0).findCrdt(lwwSetKey);
|
||||
clients.get(0).merge(generateSharedMsg(lwwSetKey, lww.remove("3")));
|
||||
|
||||
// assert deletion
|
||||
assertMerged(clients.get(0), lwwSetKey, new OrSet<>("1", "2", "4").value());
|
||||
assertMerged(clients.get(1), lwwSetKey, new OrSet<>("1", "2", "4").value());
|
||||
@Test
|
||||
public void OrSetTest(){
|
||||
crdtSetTest("cror", OrSet::new);
|
||||
}
|
||||
|
||||
private void testGrowOnlyCounter(List<GossipManager> clients) {
|
||||
givenDifferentIncrement(clients);
|
||||
assertThatCountIsUpdated(clients, 3);
|
||||
givenIncreaseOther(clients);
|
||||
assertThatCountIsUpdated(clients, 7);
|
||||
@Test
|
||||
public void LWWSetTest(){
|
||||
crdtSetTest("crlww", LwwSet::new);
|
||||
}
|
||||
|
||||
private void testPNCounter(List<GossipManager> clients) {
|
||||
givenPNCounter(clients);
|
||||
assertThatPNCounterSettlesAt(clients, 0);
|
||||
int[] delta1 = { 2, 3 };
|
||||
givenPNCounterUpdate(clients, delta1);
|
||||
assertThatPNCounterSettlesAt(clients, 5);
|
||||
int[] delta2 = { -3, 5 };
|
||||
givenPNCounterUpdate(clients, delta2);
|
||||
assertThatPNCounterSettlesAt(clients, 7);
|
||||
int[] delta3 = { 1, 1 };
|
||||
givenPNCounterUpdate(clients, delta3);
|
||||
assertThatPNCounterSettlesAt(clients, 9);
|
||||
int[] delta4 = { 1, -7 };
|
||||
givenPNCounterUpdate(clients, delta4);
|
||||
assertThatPNCounterSettlesAt(clients, 3);
|
||||
@Test
|
||||
public void MaxChangeSetTest(){
|
||||
crdtSetTest("crmcs", MaxChangeSet::new);
|
||||
}
|
||||
|
||||
private void givenDifferentIncrement(final List<GossipManager> clients) {
|
||||
@Test
|
||||
public void GrowOnlyCounterTest(){
|
||||
Consumer<Long> assertCountUpdated = count -> {
|
||||
for (GossipManager client : clients){
|
||||
TUnit.assertThat(() -> client.findCrdt(gCounterKey))
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(client).increment(count)));
|
||||
}
|
||||
};
|
||||
//generate different increment
|
||||
Object payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L));
|
||||
clients.get(0).merge(generateSharedMsg(gCounterKey, payload));
|
||||
payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L));
|
||||
clients.get(1).merge(generateSharedMsg(gCounterKey, payload));
|
||||
}
|
||||
|
||||
private void givenIncreaseOther(final List<GossipManager> clients) {
|
||||
assertCountUpdated.accept((long) 3);
|
||||
|
||||
//update one
|
||||
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey);
|
||||
GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
|
||||
new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
|
||||
|
||||
new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
|
||||
clients.get(1).merge(generateSharedMsg(gCounterKey, gc2));
|
||||
|
||||
assertCountUpdated.accept((long) 7);
|
||||
}
|
||||
|
||||
private void assertMerged(final GossipManager client, String key, final Set<String> expected) {
|
||||
TUnit.assertThat(() -> client.findCrdt(key).value()).afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo(expected);
|
||||
}
|
||||
@Test
|
||||
public void PNCounterTest(){
|
||||
Consumer<List<Integer>> counterUpdate = list -> {
|
||||
int clientIndex = 0;
|
||||
for (int delta : list){
|
||||
PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey);
|
||||
c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long) delta)));
|
||||
clients.get(clientIndex).merge(generateSharedMsg(pnCounterKey, c));
|
||||
clientIndex = (clientIndex + 1) % clients.size();
|
||||
}
|
||||
};
|
||||
|
||||
private void givenDifferentDatumsInSet(final List<GossipManager> clients) {
|
||||
clients.get(0).merge(CrdtMessage("1"));
|
||||
clients.get(1).merge(CrdtMessage("2"));
|
||||
}
|
||||
// given PNCounter
|
||||
clients.get(0).merge(generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(0)))));
|
||||
clients.get(1).merge(generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(1)))));
|
||||
|
||||
private void assertThatCountIsUpdated(final List<GossipManager> clients, long finalCount) {
|
||||
TUnit.assertThat(() -> clients.get(0).findCrdt(gCounterKey))
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(new GrowOnlyCounter(
|
||||
new GrowOnlyCounter.Builder(clients.get(0)).increment(finalCount)));
|
||||
}
|
||||
assertMergedCrdt(pnCounterKey, (long) 0);
|
||||
|
||||
private void assertThatListIsMerged(final List<GossipManager> clients) {
|
||||
TUnit.assertThat(() -> clients.get(0).findCrdt("cr")).afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo(new GrowOnlySet<>(Arrays.asList("1", "2")));
|
||||
}
|
||||
|
||||
private void givenPNCounter(List<GossipManager> clients) {
|
||||
{
|
||||
SharedDataMessage d = new SharedDataMessage();
|
||||
d.setKey(pnCounterKey);
|
||||
d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(0))));
|
||||
d.setExpireAt(Long.MAX_VALUE);
|
||||
d.setTimestamp(System.currentTimeMillis());
|
||||
clients.get(0).merge(d);
|
||||
}
|
||||
{
|
||||
SharedDataMessage d = new SharedDataMessage();
|
||||
d.setKey(pnCounterKey);
|
||||
d.setPayload(new PNCounter(new PNCounter.Builder(clients.get(1))));
|
||||
d.setExpireAt(Long.MAX_VALUE);
|
||||
d.setTimestamp(System.currentTimeMillis());
|
||||
clients.get(1).merge(d);
|
||||
List<List<Integer>> updateLists = new ArrayList<>();
|
||||
updateLists.add(Arrays.asList(2, 3));
|
||||
updateLists.add(Arrays.asList(-3, 5));
|
||||
updateLists.add(Arrays.asList(1, 1));
|
||||
updateLists.add(Arrays.asList(1, -7));
|
||||
|
||||
Long[] expectedResults = {5L, 7L, 9L, 3L};
|
||||
|
||||
for (int i = 0; i < updateLists.size(); i++){
|
||||
counterUpdate.accept(updateLists.get(i));
|
||||
assertMergedCrdt(pnCounterKey, expectedResults[i]);
|
||||
}
|
||||
}
|
||||
|
||||
private void givenPNCounterUpdate(List<GossipManager> clients, int[] deltaArray) {
|
||||
int clientIndex = 0;
|
||||
for (int delta: deltaArray) {
|
||||
PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey);
|
||||
c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long)delta)));
|
||||
SharedDataMessage d = new SharedDataMessage();
|
||||
d.setKey(pnCounterKey);
|
||||
d.setPayload(c);
|
||||
d.setExpireAt(Long.MAX_VALUE);
|
||||
d.setTimestamp(System.currentTimeMillis());
|
||||
clients.get(clientIndex).merge(d);
|
||||
clientIndex = (clientIndex + 1) % clients.size();
|
||||
@Test
|
||||
public void GrowOnlySetTest(){
|
||||
clients.get(0).merge(generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList("1"))));
|
||||
clients.get(1).merge(generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList("2"))));
|
||||
|
||||
assertMergedCrdt("cr", new GrowOnlySet<>(Arrays.asList("1", "2")).value());
|
||||
}
|
||||
|
||||
private void assertMergedCrdt(String key, Object expected){
|
||||
for (GossipManager client : clients){
|
||||
TUnit.assertThat(() -> client.findCrdt(key).value())
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(expected);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertThatPNCounterSettlesAt(List<GossipManager> clients, long expectedValue) {
|
||||
for (GossipManager client: clients) {
|
||||
TUnit.assertThat(() -> {
|
||||
long value = 0;
|
||||
Object o = client.findCrdt(pnCounterKey);
|
||||
if (o != null) {
|
||||
PNCounter c = (PNCounter)o;
|
||||
value = c.value();
|
||||
}
|
||||
return value;
|
||||
}).afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo(expectedValue);
|
||||
}
|
||||
}
|
||||
|
||||
private SharedDataMessage CrdtMessage(String item) {
|
||||
return generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList(item)));
|
||||
}
|
||||
|
||||
private PerNodeDataMessage generatePerNodeMsg(String key, Object payload) {
|
||||
private PerNodeDataMessage generatePerNodeMsg(String key, Object payload){
|
||||
PerNodeDataMessage g = new PerNodeDataMessage();
|
||||
g.setExpireAt(Long.MAX_VALUE);
|
||||
g.setKey(key);
|
||||
@ -274,7 +229,7 @@ public class DataTest extends AbstractIntegrationBase {
|
||||
return g;
|
||||
}
|
||||
|
||||
private SharedDataMessage generateSharedMsg(String key, Object payload) {
|
||||
private SharedDataMessage generateSharedMsg(String key, Object payload){
|
||||
SharedDataMessage d = new SharedDataMessage();
|
||||
d.setKey(key);
|
||||
d.setPayload(payload);
|
||||
|
Reference in New Issue
Block a user