GOSSIP-41 Transfer gossip data in bulk

This commit is contained in:
pxsalehi
2017-08-28 15:59:15 +02:00
parent ac83038932
commit 7c457eb3f4
17 changed files with 469 additions and 75 deletions

View File

@ -62,6 +62,10 @@ public class GossipSettings {
private boolean signMessages = false;
private boolean bulkTransfer = false;
private int bulkTransferSize = StartupSettings.DEFAULT_BULK_TRANSFER_SIZE;
/**
* Construct GossipSettings with default settings.
@ -77,14 +81,15 @@ public class GossipSettings {
* @param cleanupInterval
* The cleanup interval in ms.
*/
public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize,
int minimumSamples, double convictThreshold, String distribution) {
public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, int minimumSamples,
double convictThreshold, String distribution, boolean bulkTransfer) {
this.gossipInterval = gossipInterval;
this.cleanupInterval = cleanupInterval;
this.windowSize = windowSize;
this.minimumSamples = minimumSamples;
this.convictThreshold = convictThreshold;
this.distribution = distribution;
this.bulkTransfer = bulkTransfer;
}
/**
@ -242,4 +247,19 @@ public class GossipSettings {
this.protocolManagerClass = protocolManagerClass;
}
public boolean isBulkTransfer() {
return bulkTransfer;
}
public void setBulkTransfer(boolean bulkTransfer) {
this.bulkTransfer = bulkTransfer;
}
public int getBulkTransferSize() {
return bulkTransferSize;
}
public void setBulkTransferSize(int bulkTransferSize) {
this.bulkTransferSize = bulkTransferSize;
}
}

View File

@ -55,6 +55,10 @@ public class StartupSettings {
/** The list with gossip members to start with. */
private final List<Member> gossipMembers;
/** Default setting values */
private static final boolean DEFAULT_BULK_TRANSFER = false;
public static final int DEFAULT_BULK_TRANSFER_SIZE = 100;
/**
* Constructor.
*
@ -175,6 +179,7 @@ public class StartupSettings {
properties.put(i.getKey(), i.getValue().asText());
}
//TODO constants as defaults?
// TODO setting keys as constants?
int gossipInterval = jsonObject.get("gossip_interval").intValue();
int cleanupInterval = jsonObject.get("cleanup_interval").intValue();
int windowSize = jsonObject.get("window_size").intValue();
@ -182,6 +187,12 @@ public class StartupSettings {
double convictThreshold = jsonObject.get("convict_threshold").asDouble();
String cluster = jsonObject.get("cluster").textValue();
String distribution = jsonObject.get("distribution").textValue();
boolean bulkTransfer = jsonObject.has("bulk_transfer") ?
jsonObject.get("bulk_transfer").booleanValue() :
DEFAULT_BULK_TRANSFER;
int bulkTransferSize = jsonObject.has("bulk_transfer_size") ?
jsonObject.get("bulk_transfer_size").intValue() :
DEFAULT_BULK_TRANSFER_SIZE;
if (cluster == null){
throw new IllegalArgumentException("cluster was null. It is required");
}
@ -192,8 +203,9 @@ public class StartupSettings {
jsonObject.get("protocol_manager_class").textValue() :
null;
URI uri2 = new URI(uri);
GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples,
convictThreshold, distribution);
GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize,
minSamples, convictThreshold, distribution, bulkTransfer);
gossipSettings.setBulkTransferSize(bulkTransferSize);
if (transportClass != null) {
gossipSettings.setTransportManagerClass(transportClass);
}

View File

@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.PerNodeDataMessage;
@ -31,15 +32,13 @@ import org.apache.gossip.model.Member;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpPerNodeDataMessage;
import org.apache.gossip.udp.UdpSharedDataMessage;
import org.apache.gossip.udp.*;
import org.apache.log4j.Logger;
import static com.codahale.metrics.MetricRegistry.name;
/**
* The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
* The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner
*/
public abstract class AbstractActiveGossiper {
@ -49,16 +48,18 @@ public abstract class AbstractActiveGossiper {
protected final GossipCore gossipCore;
private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram;
private final Histogram sendMembershipHistorgram;
private final Histogram sendMembershipHistogram;
private final Random random;
private final GossipSettings gossipSettings;
public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
this.gossipManager = gossipManager;
this.gossipCore = gossipCore;
sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time"));
random = new Random();
gossipSettings = gossipManager.getSettings();
}
public void init() {
@ -84,6 +85,16 @@ public abstract class AbstractActiveGossiper {
return;
}
long startTime = System.currentTimeMillis();
if (gossipSettings.isBulkTransfer()) {
sendSharedDataInBulkInternal(me, member);
} else {
sendSharedDataInternal(me, member);
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
/** Send shared data one entry at a time. */
private void sendSharedDataInternal(LocalMember me, LocalMember member) {
for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
.shouldReplicate(me, member, innerEntry.getValue())) {
@ -92,15 +103,43 @@ public abstract class AbstractActiveGossiper {
UdpSharedDataMessage message = new UdpSharedDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
message.setReplicable(innerEntry.getValue().getReplicable());
copySharedDataMessage(innerEntry.getValue(), message);
gossipCore.sendOneWay(message, member.getUri());
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
/** Send shared data by batching together several entries. */
private void sendSharedDataInBulkInternal(LocalMember me, LocalMember member) {
UdpSharedDataBulkMessage udpMessage = new UdpSharedDataBulkMessage();
udpMessage.setUuid(UUID.randomUUID().toString());
udpMessage.setUriFrom(me.getId());
for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()) {
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
.shouldReplicate(me, member, innerEntry.getValue())) {
continue;
}
SharedDataMessage message = new SharedDataMessage();
copySharedDataMessage(innerEntry.getValue(), message);
udpMessage.addMessage(message);
if (udpMessage.getMessages().size() == gossipSettings.getBulkTransferSize()) {
gossipCore.sendOneWay(udpMessage, member.getUri());
udpMessage = new UdpSharedDataBulkMessage();
udpMessage.setUuid(UUID.randomUUID().toString());
udpMessage.setUriFrom(me.getId());
}
}
if (udpMessage.getMessages().size() > 0) {
gossipCore.sendOneWay(udpMessage, member.getUri());
}
}
private void copySharedDataMessage(SharedDataMessage original, SharedDataMessage copy) {
copy.setExpireAt(original.getExpireAt());
copy.setKey(original.getKey());
copy.setNodeId(original.getNodeId());
copy.setTimestamp(original.getTimestamp());
copy.setPayload(original.getPayload());
copy.setReplicable(original.getReplicable());
}
public final void sendPerNodeData(LocalMember me, LocalMember member){
@ -108,6 +147,16 @@ public abstract class AbstractActiveGossiper {
return;
}
long startTime = System.currentTimeMillis();
if (gossipSettings.isBulkTransfer()) {
sendPerNodeDataInBulkInternal(me, member);
} else {
sendPerNodeDataInternal(me, member);
}
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
}
/** Send per node data one entry at a time. */
private void sendPerNodeDataInternal(LocalMember me, LocalMember member) {
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
@ -117,16 +166,47 @@ public abstract class AbstractActiveGossiper {
UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
message.setReplicable(innerEntry.getValue().getReplicable());
copyPerNodeDataMessage(innerEntry.getValue(), message);
gossipCore.sendOneWay(message, member.getUri());
}
}
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
}
/** Send per node data by batching together several entries. */
private void sendPerNodeDataInBulkInternal(LocalMember me, LocalMember member) {
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
UdpPerNodeDataBulkMessage udpMessage = new UdpPerNodeDataBulkMessage();
udpMessage.setUuid(UUID.randomUUID().toString());
udpMessage.setUriFrom(me.getId());
for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
.shouldReplicate(me, member, innerEntry.getValue())) {
continue;
}
PerNodeDataMessage message = new PerNodeDataMessage();
copyPerNodeDataMessage(innerEntry.getValue(), message);
udpMessage.addMessage(message);
if (udpMessage.getMessages().size() == gossipSettings.getBulkTransferSize()) {
gossipCore.sendOneWay(udpMessage, member.getUri());
udpMessage = new UdpPerNodeDataBulkMessage();
udpMessage.setUuid(UUID.randomUUID().toString());
udpMessage.setUriFrom(me.getId());
}
}
if (udpMessage.getMessages().size() > 0) {
gossipCore.sendOneWay(udpMessage, member.getUri());
}
}
}
private void copyPerNodeDataMessage(PerNodeDataMessage original, PerNodeDataMessage copy) {
copy.setExpireAt(original.getExpireAt());
copy.setKey(original.getKey());
copy.setNodeId(original.getNodeId());
copy.setTimestamp(original.getTimestamp());
copy.setPayload(original.getPayload());
copy.setReplicable(original.getReplicable());
}
/**
@ -151,7 +231,7 @@ public abstract class AbstractActiveGossiper {
} else {
LOGGER.debug("Message " + message + " generated response " + r);
}
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
}
protected final Member convert(LocalMember member){

View File

@ -20,12 +20,7 @@ package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.model.*;
import java.util.Arrays;
@ -37,20 +32,24 @@ public class MessageHandlerFactory {
new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()),
new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()),
new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()),
new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler())
new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler()),
new TypedMessageHandler(PerNodeDataBulkMessage.class, new PerNodeDataBulkMessageHandler()),
new TypedMessageHandler(SharedDataBulkMessage.class, new SharedDataBulkMessageHandler())
);
}
public static MessageHandler concurrentHandler(MessageHandler... handlers) {
if (handlers == null) throw new NullPointerException("handlers cannot be null");
if (handlers == null)
throw new NullPointerException("handlers cannot be null");
if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) {
throw new NullPointerException("found at least one null handler");
}
return new MessageHandler() {
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
@Override public boolean invoke(GossipCore gossipCore, GossipManager gossipManager,
Base base) {
// return true if at least one of the component handlers return true.
return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
return Arrays.asList(handlers).stream()
.filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
}
};
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.udp.UdpPerNodeDataBulkMessage;
public class PerNodeDataBulkMessageHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpPerNodeDataBulkMessage udpMessage = (UdpPerNodeDataBulkMessage) base;
for (PerNodeDataMessage dataMsg: udpMessage.getMessages())
gossipCore.addPerNodeData(dataMsg);
return true;
}
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.udp.UdpSharedDataBulkMessage;
public class SharedDataBulkMessageHandler implements MessageHandler{
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpSharedDataBulkMessage udpMessage = (UdpSharedDataBulkMessage) base;
for (SharedDataMessage dataMsg: udpMessage.getMessages())
gossipCore.addSharedData(dataMsg);
return true;
}
}

View File

@ -19,9 +19,9 @@ package org.apache.gossip.model;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
import org.apache.gossip.udp.UdpPerNodeDataMessage;
import org.apache.gossip.udp.UdpPerNodeDataBulkMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
import org.apache.gossip.udp.UdpSharedDataMessage;
import org.apache.gossip.udp.UdpSharedDataBulkMessage;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonSubTypes;
@ -40,9 +40,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
@Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
@Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
@Type(value = PerNodeDataMessage.class, name = "PerNodeDataMessage"),
@Type(value = UdpPerNodeDataMessage.class, name = "UdpPerNodeDataMessage"),
@Type(value = UdpPerNodeDataBulkMessage.class, name = "UdpPerNodeDataMessage"),
@Type(value = SharedDataMessage.class, name = "SharedDataMessage"),
@Type(value = UdpSharedDataMessage.class, name = "UdpSharedDataMessage")
@Type(value = UdpSharedDataBulkMessage.class, name = "UdpSharedDataMessage")
})
public class Base {

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.model;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class PerNodeDataBulkMessage extends Base {
private List<PerNodeDataMessage> messages = new ArrayList<>();
public void addMessage(PerNodeDataMessage msg) {
messages.add(msg);
}
public List<PerNodeDataMessage> getMessages() {
return messages;
}
@Override public String toString() {
return "GossipDataBulkMessage[" + messages.stream().map(Object::toString)
.collect(Collectors.joining(",")) + "]";
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.model;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class SharedDataBulkMessage extends Base {
private List<SharedDataMessage> messages = new ArrayList<>();
public void addMessage(SharedDataMessage msg) {
messages.add(msg);
}
public List<SharedDataMessage> getMessages() {
return messages;
}
@Override public String toString() {
return "SharedGossipDataBulkMessage[" + messages.stream().map(Object::toString)
.collect(Collectors.joining(",")) + "]";
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.udp;
import org.apache.gossip.model.PerNodeDataBulkMessage;
public class UdpPerNodeDataBulkMessage extends PerNodeDataBulkMessage implements Trackable {
private String uriFrom;
private String uuid;
public String getUriFrom() {
return uriFrom;
}
public void setUriFrom(String uriFrom) {
this.uriFrom = uriFrom;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
@Override
public String toString() {
return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid
+ ", messages=[" + super.toString() + "] ]";
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.udp;
import org.apache.gossip.model.SharedDataBulkMessage;
public class UdpSharedDataBulkMessage extends SharedDataBulkMessage implements Trackable {
private String uriFrom;
private String uuid;
public String getUriFrom() {
return uriFrom;
}
public void setUriFrom(String uriFrom) {
this.uriFrom = uriFrom;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
@Override
public String toString() {
return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getNodeId()="
+ ", messages=[" + super.toString() + "] ]";
}
}

View File

@ -23,19 +23,17 @@ import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.PerNodeDataMessage;
import org.junit.Assert;
import org.junit.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@RunWith(JUnitPlatform.class)
@RunWith(Parameterized.class)
public class PerNodeDataEventTest extends AbstractIntegrationBase {
private String receivedKey = "";
@ -43,7 +41,20 @@ public class PerNodeDataEventTest extends AbstractIntegrationBase {
private Object receivingNodeDataNewValue = "";
private Object receivingNodeDataOldValue = "";
private Semaphore lock = new Semaphore(0);
private int base;
private boolean bulkTransfer;
public PerNodeDataEventTest(int base, boolean bulkTransfer) {
this.base = base;
this.bulkTransfer = bulkTransfer;
}
@Parameterized.Parameters(name = "{index} bulkTransfer={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{50000, false}, {55000, true}
});
}
@Test
public void perNodeDataEventTest()
@ -51,17 +62,18 @@ public class PerNodeDataEventTest extends AbstractIntegrationBase {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setBulkTransfer(bulkTransfer);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 2;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);

View File

@ -24,19 +24,17 @@ import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.SharedDataMessage;
import org.junit.Assert;
import org.junit.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@RunWith(JUnitPlatform.class)
@RunWith(Parameterized.class)
public class SharedDataEventTest extends AbstractIntegrationBase {
private String receivedKey = "";
@ -44,6 +42,20 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
private Object receivingNodeDataOldValue = "";
private String gCounterKey = "gCounter";
private Semaphore lock = new Semaphore(0);
private int base;
private boolean bulkTransfer;
public SharedDataEventTest(int base, boolean bulkTransfer) {
this.base = base;
this.bulkTransfer = bulkTransfer;
}
@Parameterized.Parameters(name = "{index} bulkTransfer={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{50000, false}, {55000, true}
});
}
@Test
public void sharedDataEventTest()
@ -51,17 +63,18 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setBulkTransfer(bulkTransfer);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 2;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
@ -126,13 +139,13 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 3;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);

View File

@ -49,7 +49,7 @@ public class ShutdownDeadtimeTest {
@Test
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal", false);
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();

View File

@ -47,7 +47,7 @@ public class TenNodeThreeSeedTest {
}
public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential", false);
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();