Fixed build
This commit is contained in:
@ -52,7 +52,8 @@ public class DataTest {
|
||||
private final String pnCounterKey = "crdtpn";
|
||||
|
||||
@BeforeClass
|
||||
public static void initializeMembers() throws InterruptedException, UnknownHostException, URISyntaxException{
|
||||
public static void initializeMembers()
|
||||
throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||
final int clusterMembers = 2;
|
||||
|
||||
GossipSettings settings = new GossipSettings();
|
||||
@ -60,107 +61,122 @@ public class DataTest {
|
||||
settings.setPersistDataState(false);
|
||||
String cluster = UUID.randomUUID().toString();
|
||||
List<Member> startupMembers = new ArrayList<>();
|
||||
for (int i = 0; i < clusterMembers; ++i){
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
int id = i + 1;
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + id));
|
||||
startupMembers.add(new RemoteMember(cluster, uri, id + ""));
|
||||
}
|
||||
|
||||
for (Member member : startupMembers){
|
||||
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(member.getUri())
|
||||
.id(member.getId()).gossipMembers(startupMembers).gossipSettings(settings).build();
|
||||
for (Member member : startupMembers) {
|
||||
GossipManager gossipService =
|
||||
GossipManagerBuilder.newBuilder()
|
||||
.cluster(cluster)
|
||||
.uri(member.getUri())
|
||||
.id(member.getId())
|
||||
.gossipMembers(startupMembers)
|
||||
.gossipSettings(settings)
|
||||
.build();
|
||||
clients.add(gossipService);
|
||||
gossipService.init();
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void shutdownMembers(){
|
||||
for (final GossipManager client : clients){
|
||||
public static void shutdownMembers() {
|
||||
for (final GossipManager client : clients) {
|
||||
client.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void simpleDataTest(){
|
||||
TUnit.assertThat(() -> {
|
||||
int total = 0;
|
||||
for (GossipManager client : clients){
|
||||
total += client.getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(2);
|
||||
public void simpleDataTest() {
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (GossipManager client : clients) {
|
||||
total += client.getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo(2);
|
||||
|
||||
clients.get(0).gossipPerNodeData(generatePerNodeMsg("a", "b"));
|
||||
clients.get(0).gossipSharedData(generateSharedMsg("a", "c"));
|
||||
|
||||
TUnit.assertThat(() -> {
|
||||
PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a");
|
||||
if (x == null)
|
||||
return "";
|
||||
else
|
||||
return x.getPayload();
|
||||
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b");
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
PerNodeDataMessage x = clients.get(1).findPerNodeGossipData(1 + "", "a");
|
||||
if (x == null) return "";
|
||||
else return x.getPayload();
|
||||
})
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo("b");
|
||||
|
||||
TUnit.assertThat(() -> {
|
||||
SharedDataMessage x = clients.get(1).findSharedGossipData("a");
|
||||
if (x == null)
|
||||
return "";
|
||||
else
|
||||
return x.getPayload();
|
||||
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("c");
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
SharedDataMessage x = clients.get(1).findSharedGossipData("a");
|
||||
if (x == null) return "";
|
||||
else return x.getPayload();
|
||||
})
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo("c");
|
||||
}
|
||||
|
||||
Set<String> setFromList(String... elements){
|
||||
Set<String> setFromList(String... elements) {
|
||||
return new HashSet<>(Arrays.asList(elements));
|
||||
}
|
||||
|
||||
void crdtSetTest(String key, Function<Set<String>, CrdtAddRemoveSet<String, Set<String>, ?>> construct){
|
||||
//populate
|
||||
void crdtSetTest(
|
||||
String key, Function<Set<String>, CrdtAddRemoveSet<String, Set<String>, ?>> construct) {
|
||||
// populate
|
||||
clients.get(0).merge(generateSharedMsg(key, construct.apply(setFromList("1", "2"))));
|
||||
clients.get(1).merge(generateSharedMsg(key, construct.apply(setFromList("3", "4"))));
|
||||
|
||||
assertMergedCrdt(key, construct.apply(setFromList("1", "2", "3", "4")).value());
|
||||
|
||||
//drop element
|
||||
// drop element
|
||||
@SuppressWarnings("unchecked")
|
||||
CrdtAddRemoveSet<String, ?, ?> set = (CrdtAddRemoveSet<String, ?, ?>) clients.get(0).findCrdt(key);
|
||||
CrdtAddRemoveSet<String, ?, ?> set =
|
||||
(CrdtAddRemoveSet<String, ?, ?>) clients.get(0).findCrdt(key);
|
||||
clients.get(0).merge(generateSharedMsg(key, set.remove("3")));
|
||||
|
||||
//assert deletion
|
||||
// assert deletion
|
||||
assertMergedCrdt(key, construct.apply(setFromList("1", "2", "4")).value());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void OrSetTest(){
|
||||
public void OrSetTest() {
|
||||
crdtSetTest("cror", OrSet::new);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void LWWSetTest(){
|
||||
public void LWWSetTest() {
|
||||
crdtSetTest("crlww", LwwSet::new);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void MaxChangeSetTest(){
|
||||
public void MaxChangeSetTest() {
|
||||
crdtSetTest("crmcs", MaxChangeSet::new);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void TwoPhaseSetTest(){
|
||||
public void TwoPhaseSetTest() {
|
||||
crdtSetTest("crtps", TwoPhaseSet::new);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void GrowOnlyCounterTest(){
|
||||
Consumer<Long> assertCountUpdated = count -> {
|
||||
for (GossipManager client : clients){
|
||||
TUnit.assertThat(() -> client.findCrdt(gCounterKey))
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(client).increment(count)));
|
||||
}
|
||||
};
|
||||
//generate different increment
|
||||
public void GrowOnlyCounterTest() {
|
||||
Consumer<Long> assertCountUpdated =
|
||||
count -> {
|
||||
for (GossipManager client : clients) {
|
||||
TUnit.assertThat(() -> client.findCrdt(gCounterKey))
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo(
|
||||
new GrowOnlyCounter(new GrowOnlyCounter.Builder(client).increment(count)));
|
||||
}
|
||||
};
|
||||
// generate different increment
|
||||
Object payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L));
|
||||
clients.get(0).merge(generateSharedMsg(gCounterKey, payload));
|
||||
payload = new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(1)).increment(2L));
|
||||
@ -168,30 +184,39 @@ public class DataTest {
|
||||
|
||||
assertCountUpdated.accept((long) 3);
|
||||
|
||||
//update one
|
||||
// update one
|
||||
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(1).findCrdt(gCounterKey);
|
||||
GrowOnlyCounter gc2 = new GrowOnlyCounter(gc,
|
||||
new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
|
||||
GrowOnlyCounter gc2 =
|
||||
new GrowOnlyCounter(gc, new GrowOnlyCounter.Builder(clients.get(1)).increment(4L));
|
||||
clients.get(1).merge(generateSharedMsg(gCounterKey, gc2));
|
||||
|
||||
assertCountUpdated.accept((long) 7);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void PNCounterTest(){
|
||||
Consumer<List<Integer>> counterUpdate = list -> {
|
||||
int clientIndex = 0;
|
||||
for (int delta : list){
|
||||
PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey);
|
||||
c = new PNCounter(c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long) delta)));
|
||||
clients.get(clientIndex).merge(generateSharedMsg(pnCounterKey, c));
|
||||
clientIndex = (clientIndex + 1) % clients.size();
|
||||
}
|
||||
};
|
||||
public void PNCounterTest() {
|
||||
Consumer<List<Integer>> counterUpdate =
|
||||
list -> {
|
||||
int clientIndex = 0;
|
||||
for (int delta : list) {
|
||||
PNCounter c = (PNCounter) clients.get(clientIndex).findCrdt(pnCounterKey);
|
||||
c =
|
||||
new PNCounter(
|
||||
c, new PNCounter.Builder(clients.get(clientIndex)).increment(((long) delta)));
|
||||
clients.get(clientIndex).merge(generateSharedMsg(pnCounterKey, c));
|
||||
clientIndex = (clientIndex + 1) % clients.size();
|
||||
}
|
||||
};
|
||||
|
||||
// given PNCounter
|
||||
clients.get(0).merge(generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(0)))));
|
||||
clients.get(1).merge(generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(1)))));
|
||||
clients
|
||||
.get(0)
|
||||
.merge(
|
||||
generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(0)))));
|
||||
clients
|
||||
.get(1)
|
||||
.merge(
|
||||
generateSharedMsg(pnCounterKey, new PNCounter(new PNCounter.Builder(clients.get(1)))));
|
||||
|
||||
assertMergedCrdt(pnCounterKey, (long) 0);
|
||||
|
||||
@ -203,28 +228,29 @@ public class DataTest {
|
||||
|
||||
Long[] expectedResults = {5L, 7L, 9L, 3L};
|
||||
|
||||
for (int i = 0; i < updateLists.size(); i++){
|
||||
for (int i = 0; i < updateLists.size(); i++) {
|
||||
counterUpdate.accept(updateLists.get(i));
|
||||
assertMergedCrdt(pnCounterKey, expectedResults[i]);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void GrowOnlySetTest(){
|
||||
public void GrowOnlySetTest() {
|
||||
clients.get(0).merge(generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList("1"))));
|
||||
clients.get(1).merge(generateSharedMsg("cr", new GrowOnlySet<>(Arrays.asList("2"))));
|
||||
|
||||
assertMergedCrdt("cr", new GrowOnlySet<>(Arrays.asList("1", "2")).value());
|
||||
}
|
||||
|
||||
private void assertMergedCrdt(String key, Object expected){
|
||||
for (GossipManager client : clients){
|
||||
private void assertMergedCrdt(String key, Object expected) {
|
||||
for (GossipManager client : clients) {
|
||||
TUnit.assertThat(() -> client.findCrdt(key).value())
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(expected);
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo(expected);
|
||||
}
|
||||
}
|
||||
|
||||
private PerNodeDataMessage generatePerNodeMsg(String key, Object payload){
|
||||
private PerNodeDataMessage generatePerNodeMsg(String key, Object payload) {
|
||||
PerNodeDataMessage g = new PerNodeDataMessage();
|
||||
g.setExpireAt(Long.MAX_VALUE);
|
||||
g.setKey(key);
|
||||
@ -233,7 +259,7 @@ public class DataTest {
|
||||
return g;
|
||||
}
|
||||
|
||||
private SharedDataMessage generateSharedMsg(String key, Object payload){
|
||||
private SharedDataMessage generateSharedMsg(String key, Object payload) {
|
||||
SharedDataMessage d = new SharedDataMessage();
|
||||
d.setKey(key);
|
||||
d.setPayload(payload);
|
||||
@ -241,4 +267,4 @@ public class DataTest {
|
||||
d.setTimestamp(System.currentTimeMillis());
|
||||
return d;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.gossip;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
@ -26,14 +27,11 @@ import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
|
||||
public class IdAndPropertyTest extends AbstractIntegrationBase {
|
||||
|
||||
@Test
|
||||
|
@ -18,6 +18,12 @@
|
||||
package org.apache.gossip;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
import org.apache.gossip.model.PerNodeDataMessage;
|
||||
@ -26,13 +32,6 @@ import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class PerNodeDataEventTest extends AbstractIntegrationBase {
|
||||
|
||||
|
@ -18,6 +18,11 @@
|
||||
package org.apache.gossip;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
@ -25,12 +30,6 @@ import org.apache.gossip.model.PerNodeDataMessage;
|
||||
import org.apache.gossip.replication.*;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class PerNodeDataReplicationControlTest extends AbstractIntegrationBase {
|
||||
|
||||
@Test
|
||||
|
@ -35,7 +35,7 @@ import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class SharedDataEventTest extends AbstractIntegrationBase {
|
||||
|
||||
|
||||
private String receivedKey = "";
|
||||
private Object receivingNodeDataNewValue = "";
|
||||
private Object receivingNodeDataOldValue = "";
|
||||
@ -51,14 +51,12 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
|
||||
|
||||
@Parameterized.Parameters(name = "{index} bulkTransfer={1}")
|
||||
public static Collection<Object[]> data() {
|
||||
return Arrays.asList(new Object[][]{
|
||||
{50000, false}, {55000, true}
|
||||
});
|
||||
return Arrays.asList(new Object[][] {{50000, false}, {55000, true}});
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void sharedDataEventTest()
|
||||
throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||
throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||
GossipSettings settings = new GossipSettings();
|
||||
settings.setPersistRingState(false);
|
||||
settings.setPersistDataState(false);
|
||||
@ -74,41 +72,52 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
|
||||
final int clusterMembers = 2;
|
||||
for (int i = 1; i < clusterMembers + 1; ++i) {
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
|
||||
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
|
||||
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
|
||||
GossipManager gossipService =
|
||||
GossipManagerBuilder.newBuilder()
|
||||
.cluster(cluster)
|
||||
.uri(uri)
|
||||
.id(i + "")
|
||||
.gossipMembers(startupMembers)
|
||||
.gossipSettings(settings)
|
||||
.build();
|
||||
clients.add(gossipService);
|
||||
gossipService.init();
|
||||
register(gossipService);
|
||||
}
|
||||
|
||||
|
||||
// check whether the members are discovered
|
||||
TUnit.assertThat(() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
|
||||
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
.afterWaitingAtMost(20, TimeUnit.SECONDS)
|
||||
.isEqualTo(2);
|
||||
|
||||
// Adding new data to Node 1
|
||||
clients.get(0).gossipSharedData(sharedNodeData("category", "distributed"));
|
||||
|
||||
|
||||
// Node 2 is interested in data changes for the key "organization" and "category"
|
||||
clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
|
||||
if (!key.equals("organization") && !key.equals("category"))
|
||||
return;
|
||||
receivedKey = key;
|
||||
receivingNodeDataOldValue = oldValue;
|
||||
receivingNodeDataNewValue = newValue;
|
||||
lock.release();
|
||||
});
|
||||
|
||||
clients
|
||||
.get(1)
|
||||
.registerSharedDataSubscriber(
|
||||
(key, oldValue, newValue) -> {
|
||||
if (!key.equals("organization") && !key.equals("category")) return;
|
||||
receivedKey = key;
|
||||
receivingNodeDataOldValue = oldValue;
|
||||
receivingNodeDataNewValue = newValue;
|
||||
lock.release();
|
||||
});
|
||||
|
||||
// Node 2 first time gets shared data
|
||||
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||
Assert.assertEquals("category", receivedKey);
|
||||
Assert.assertEquals(null, receivingNodeDataOldValue);
|
||||
Assert.assertEquals("distributed", receivingNodeDataNewValue);
|
||||
|
||||
|
||||
// Node 1 adds new per node data
|
||||
clients.get(0).gossipSharedData(sharedNodeData("organization", "apache"));
|
||||
// Node 2 adds new shared data
|
||||
@ -116,21 +125,20 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
|
||||
Assert.assertEquals("organization", receivedKey);
|
||||
Assert.assertEquals(null, receivingNodeDataOldValue);
|
||||
Assert.assertEquals("apache", receivingNodeDataNewValue);
|
||||
|
||||
|
||||
// Node 1 updates its value
|
||||
clients.get(0).gossipSharedData(sharedNodeData("organization", "apache-gossip"));
|
||||
|
||||
|
||||
// Node 2 updates existing value
|
||||
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||
Assert.assertEquals("organization", receivedKey);
|
||||
Assert.assertEquals("apache", receivingNodeDataOldValue);
|
||||
Assert.assertEquals("apache-gossip", receivingNodeDataNewValue);
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void CrdtDataChangeEventTest()
|
||||
throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||
throws InterruptedException, UnknownHostException, URISyntaxException {
|
||||
GossipSettings settings = new GossipSettings();
|
||||
settings.setPersistRingState(false);
|
||||
settings.setPersistDataState(false);
|
||||
@ -145,29 +153,41 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
|
||||
final int clusterMembers = 3;
|
||||
for (int i = 1; i < clusterMembers + 1; ++i) {
|
||||
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
|
||||
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
|
||||
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
|
||||
GossipManager gossipService =
|
||||
GossipManagerBuilder.newBuilder()
|
||||
.cluster(cluster)
|
||||
.uri(uri)
|
||||
.id(i + "")
|
||||
.gossipMembers(startupMembers)
|
||||
.gossipSettings(settings)
|
||||
.build();
|
||||
clients.add(gossipService);
|
||||
gossipService.init();
|
||||
register(gossipService);
|
||||
}
|
||||
|
||||
|
||||
// check whether the members are discovered
|
||||
TUnit.assertThat(() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
|
||||
|
||||
clients.get(1).registerSharedDataSubscriber((key, oldValue, newValue) -> {
|
||||
receivedKey = key;
|
||||
receivingNodeDataOldValue = oldValue;
|
||||
receivingNodeDataNewValue = newValue;
|
||||
lock.release();
|
||||
});
|
||||
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
.afterWaitingAtMost(20, TimeUnit.SECONDS)
|
||||
.isEqualTo(2);
|
||||
|
||||
clients
|
||||
.get(1)
|
||||
.registerSharedDataSubscriber(
|
||||
(key, oldValue, newValue) -> {
|
||||
receivedKey = key;
|
||||
receivingNodeDataOldValue = oldValue;
|
||||
receivingNodeDataNewValue = newValue;
|
||||
lock.release();
|
||||
});
|
||||
|
||||
// Add initial gCounter to Node 1
|
||||
SharedDataMessage d = new SharedDataMessage();
|
||||
d.setKey(gCounterKey);
|
||||
@ -175,37 +195,39 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
|
||||
d.setExpireAt(Long.MAX_VALUE);
|
||||
d.setTimestamp(System.currentTimeMillis());
|
||||
clients.get(0).merge(d);
|
||||
|
||||
|
||||
// Check if initial Crdt received
|
||||
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||
Assert.assertEquals("gCounter", receivedKey);
|
||||
Assert.assertEquals(null, receivingNodeDataOldValue);
|
||||
Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
|
||||
Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
|
||||
|
||||
|
||||
// check whether Node 3 received the gCounter
|
||||
TUnit.assertThat(() -> {
|
||||
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(2).findCrdt(gCounterKey);
|
||||
if (gc == null) {
|
||||
return "";
|
||||
} else {
|
||||
return gc;
|
||||
}
|
||||
}).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo(
|
||||
new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)));
|
||||
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(2).findCrdt(gCounterKey);
|
||||
if (gc == null) {
|
||||
return "";
|
||||
} else {
|
||||
return gc;
|
||||
}
|
||||
})
|
||||
.afterWaitingAtMost(10, TimeUnit.SECONDS)
|
||||
.isEqualTo(new GrowOnlyCounter(new GrowOnlyCounter.Builder(clients.get(0)).increment(1L)));
|
||||
|
||||
// Node 3 Updates the gCounter by 4
|
||||
GrowOnlyCounter gc = (GrowOnlyCounter) clients.get(2).findCrdt(gCounterKey);
|
||||
GrowOnlyCounter gcNew = new GrowOnlyCounter(gc,
|
||||
new GrowOnlyCounter.Builder(clients.get(2)).increment(4L));
|
||||
|
||||
GrowOnlyCounter gcNew =
|
||||
new GrowOnlyCounter(gc, new GrowOnlyCounter.Builder(clients.get(2)).increment(4L));
|
||||
|
||||
d = new SharedDataMessage();
|
||||
d.setKey(gCounterKey);
|
||||
d.setPayload(gcNew);
|
||||
d.setExpireAt(Long.MAX_VALUE);
|
||||
d.setTimestamp(System.currentTimeMillis());
|
||||
clients.get(2).merge(d);
|
||||
|
||||
|
||||
// Check if Node 3's Crdt update is received in Node 2 event handler
|
||||
lock.tryAcquire(10, TimeUnit.SECONDS);
|
||||
Assert.assertEquals("gCounter", receivedKey);
|
||||
@ -213,9 +235,8 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
|
||||
Assert.assertEquals(1, ((GrowOnlyCounter) receivingNodeDataOldValue).value().longValue());
|
||||
Assert.assertTrue(receivingNodeDataNewValue instanceof GrowOnlyCounter);
|
||||
Assert.assertEquals(5, ((GrowOnlyCounter) receivingNodeDataNewValue).value().longValue());
|
||||
|
||||
}
|
||||
|
||||
|
||||
private SharedDataMessage sharedNodeData(String key, String value) {
|
||||
SharedDataMessage g = new SharedDataMessage();
|
||||
g.setExpireAt(Long.MAX_VALUE);
|
||||
@ -224,5 +245,4 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
|
||||
g.setTimestamp(System.currentTimeMillis());
|
||||
return g;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,13 +17,6 @@
|
||||
*/
|
||||
package org.apache.gossip;
|
||||
|
||||
import org.apache.gossip.lock.exceptions.VoteFailedException;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
import org.apache.gossip.model.SharedDataMessage;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
@ -31,6 +24,12 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import org.apache.gossip.lock.exceptions.VoteFailedException;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
import org.apache.gossip.model.SharedDataMessage;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class SharedDataLockTest extends AbstractIntegrationBase {
|
||||
|
||||
|
@ -44,8 +44,7 @@ import java.util.concurrent.TimeUnit;
|
||||
public class SharedDataReplicationControlTest extends AbstractIntegrationBase {
|
||||
|
||||
@Test
|
||||
public void sharedDataReplicationTest()
|
||||
throws URISyntaxException {
|
||||
public void sharedDataReplicationTest() throws URISyntaxException {
|
||||
generateStandardNodes(3);
|
||||
|
||||
// check whether the members are discovered
|
||||
|
@ -38,8 +38,7 @@ public class ShutdownDeadtimeTest {
|
||||
// harm), and the
|
||||
// sleep that happens after startup.
|
||||
@Test
|
||||
public void DeadNodesDoNotComeAliveAgain()
|
||||
throws InterruptedException, URISyntaxException {
|
||||
public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, URISyntaxException {
|
||||
GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal", false);
|
||||
settings.setPersistRingState(false);
|
||||
settings.setPersistDataState(false);
|
||||
@ -68,13 +67,13 @@ public class ShutdownDeadtimeTest {
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
.afterWaitingAtMost(40, TimeUnit.SECONDS)
|
||||
.isEqualTo(20);
|
||||
|
||||
@ -86,25 +85,25 @@ public class ShutdownDeadtimeTest {
|
||||
final String shutdownId = clients.get(randomClientId).getMyself().getId();
|
||||
clients.get(randomClientId).shutdown();
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
.afterWaitingAtMost(40, TimeUnit.SECONDS)
|
||||
.isEqualTo(16);
|
||||
clients.remove(randomClientId);
|
||||
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers - 1; ++i) {
|
||||
total += clients.get(i).getDeadMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers - 1; ++i) {
|
||||
total += clients.get(i).getDeadMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
.afterWaitingAtMost(50, TimeUnit.SECONDS)
|
||||
.isEqualTo(4);
|
||||
|
||||
@ -123,13 +122,13 @@ public class ShutdownDeadtimeTest {
|
||||
|
||||
// verify that the client is alive again for every node
|
||||
TUnit.assertThat(
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
() -> {
|
||||
int total = 0;
|
||||
for (int i = 0; i < clusterMembers; ++i) {
|
||||
total += clients.get(i).getLiveMembers().size();
|
||||
}
|
||||
return total;
|
||||
})
|
||||
.afterWaitingAtMost(60, TimeUnit.SECONDS)
|
||||
.isEqualTo(20);
|
||||
|
||||
|
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.gossip;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
@ -28,7 +29,6 @@ import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
import org.apache.gossip.manager.PassiveGossipConstants;
|
||||
@ -36,8 +36,6 @@ import org.apache.gossip.secure.KeyTool;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
|
||||
public class SignedMessageTest extends AbstractIntegrationBase {
|
||||
|
||||
private GossipSettings gossiperThatSigns() {
|
||||
|
@ -17,18 +17,16 @@
|
||||
*/
|
||||
package org.apache.gossip;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.UUID;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.gossip.manager.GossipManager;
|
||||
import org.apache.gossip.manager.GossipManagerBuilder;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
/** Tests support of using {@code StartupSettings} and thereby reading setup config from file. */
|
||||
@Slf4j
|
||||
|
@ -18,7 +18,6 @@
|
||||
package org.apache.gossip;
|
||||
|
||||
import io.teknek.tunit.TUnit;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.net.UnknownHostException;
|
||||
|
Reference in New Issue
Block a user