Small merge conflict

This commit is contained in:
edward
2017-09-03 14:40:45 -04:00
17 changed files with 470 additions and 75 deletions

View File

@ -68,6 +68,10 @@ public class GossipSettings {
private LockManagerSettings lockManagerSettings = LockManagerSettings
.getLockManagerDefaultSettings();
private boolean bulkTransfer = false;
private int bulkTransferSize = StartupSettings.DEFAULT_BULK_TRANSFER_SIZE;
/**
* Construct GossipSettings with default settings.
*/
@ -82,14 +86,15 @@ public class GossipSettings {
* @param cleanupInterval
* The cleanup interval in ms.
*/
public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize,
int minimumSamples, double convictThreshold, String distribution) {
public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, int minimumSamples,
double convictThreshold, String distribution, boolean bulkTransfer) {
this.gossipInterval = gossipInterval;
this.cleanupInterval = cleanupInterval;
this.windowSize = windowSize;
this.minimumSamples = minimumSamples;
this.convictThreshold = convictThreshold;
this.distribution = distribution;
this.bulkTransfer = bulkTransfer;
}
/**
@ -258,4 +263,20 @@ public class GossipSettings {
public void setLockManagerSettings(LockManagerSettings lockManagerSettings) {
this.lockManagerSettings = lockManagerSettings;
}
public boolean isBulkTransfer() {
return bulkTransfer;
}
public void setBulkTransfer(boolean bulkTransfer) {
this.bulkTransfer = bulkTransfer;
}
public int getBulkTransferSize() {
return bulkTransferSize;
}
public void setBulkTransferSize(int bulkTransferSize) {
this.bulkTransferSize = bulkTransferSize;
}
}

View File

@ -55,6 +55,10 @@ public class StartupSettings {
/** The list with gossip members to start with. */
private final List<Member> gossipMembers;
/** Default setting values */
private static final boolean DEFAULT_BULK_TRANSFER = false;
public static final int DEFAULT_BULK_TRANSFER_SIZE = 100;
/**
* Constructor.
*
@ -175,6 +179,7 @@ public class StartupSettings {
properties.put(i.getKey(), i.getValue().asText());
}
//TODO constants as defaults?
// TODO setting keys as constants?
int gossipInterval = jsonObject.get("gossip_interval").intValue();
int cleanupInterval = jsonObject.get("cleanup_interval").intValue();
int windowSize = jsonObject.get("window_size").intValue();
@ -182,6 +187,12 @@ public class StartupSettings {
double convictThreshold = jsonObject.get("convict_threshold").asDouble();
String cluster = jsonObject.get("cluster").textValue();
String distribution = jsonObject.get("distribution").textValue();
boolean bulkTransfer = jsonObject.has("bulk_transfer") ?
jsonObject.get("bulk_transfer").booleanValue() :
DEFAULT_BULK_TRANSFER;
int bulkTransferSize = jsonObject.has("bulk_transfer_size") ?
jsonObject.get("bulk_transfer_size").intValue() :
DEFAULT_BULK_TRANSFER_SIZE;
if (cluster == null){
throw new IllegalArgumentException("cluster was null. It is required");
}
@ -192,8 +203,9 @@ public class StartupSettings {
jsonObject.get("protocol_manager_class").textValue() :
null;
URI uri2 = new URI(uri);
GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples,
convictThreshold, distribution);
GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize,
minSamples, convictThreshold, distribution, bulkTransfer);
gossipSettings.setBulkTransferSize(bulkTransferSize);
if (transportClass != null) {
gossipSettings.setTransportManagerClass(transportClass);
}

View File

@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.PerNodeDataMessage;
@ -31,40 +32,40 @@ import org.apache.gossip.model.Member;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpPerNodeDataMessage;
import org.apache.gossip.udp.UdpSharedDataMessage;
import org.apache.gossip.udp.*;
import org.apache.log4j.Logger;
import static com.codahale.metrics.MetricRegistry.name;
/**
* The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner
* The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner
*/
public abstract class AbstractActiveGossiper {
protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class);
protected final GossipManager gossipManager;
protected final GossipCore gossipCore;
private final Histogram sharedDataHistogram;
private final Histogram sendPerNodeDataHistogram;
private final Histogram sendMembershipHistorgram;
private final Histogram sendMembershipHistogram;
private final Random random;
private final GossipSettings gossipSettings;
public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) {
this.gossipManager = gossipManager;
this.gossipCore = gossipCore;
sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time"));
sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time"));
sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time"));
sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time"));
random = new Random();
gossipSettings = gossipManager.getSettings();
}
public void init() {
}
public void shutdown() {
}
@ -78,12 +79,22 @@ public abstract class AbstractActiveGossiper {
m.setShutdownAtNanos(gossipManager.getClock().nanoTime());
gossipCore.sendOneWay(m, target.getUri());
}
public final void sendSharedData(LocalMember me, LocalMember member){
if (member == null){
public final void sendSharedData(LocalMember me, LocalMember member) {
if (member == null) {
return;
}
long startTime = System.currentTimeMillis();
if (gossipSettings.isBulkTransfer()) {
sendSharedDataInBulkInternal(me, member);
} else {
sendSharedDataInternal(me, member);
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
/** Send shared data one entry at a time. */
private void sendSharedDataInternal(LocalMember me, LocalMember member) {
for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
.shouldReplicate(me, member, innerEntry.getValue())) {
@ -92,22 +103,60 @@ public abstract class AbstractActiveGossiper {
UdpSharedDataMessage message = new UdpSharedDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
message.setReplicable(innerEntry.getValue().getReplicable());
copySharedDataMessage(innerEntry.getValue(), message);
gossipCore.sendOneWay(message, member.getUri());
}
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
/** Send shared data by batching together several entries. */
private void sendSharedDataInBulkInternal(LocalMember me, LocalMember member) {
UdpSharedDataBulkMessage udpMessage = new UdpSharedDataBulkMessage();
udpMessage.setUuid(UUID.randomUUID().toString());
udpMessage.setUriFrom(me.getId());
for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()) {
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
.shouldReplicate(me, member, innerEntry.getValue())) {
continue;
}
SharedDataMessage message = new SharedDataMessage();
copySharedDataMessage(innerEntry.getValue(), message);
udpMessage.addMessage(message);
if (udpMessage.getMessages().size() == gossipSettings.getBulkTransferSize()) {
gossipCore.sendOneWay(udpMessage, member.getUri());
udpMessage = new UdpSharedDataBulkMessage();
udpMessage.setUuid(UUID.randomUUID().toString());
udpMessage.setUriFrom(me.getId());
}
}
if (udpMessage.getMessages().size() > 0) {
gossipCore.sendOneWay(udpMessage, member.getUri());
}
}
private void copySharedDataMessage(SharedDataMessage original, SharedDataMessage copy) {
copy.setExpireAt(original.getExpireAt());
copy.setKey(original.getKey());
copy.setNodeId(original.getNodeId());
copy.setTimestamp(original.getTimestamp());
copy.setPayload(original.getPayload());
copy.setReplicable(original.getReplicable());
}
public final void sendPerNodeData(LocalMember me, LocalMember member){
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
if (gossipSettings.isBulkTransfer()) {
sendPerNodeDataInBulkInternal(me, member);
} else {
sendPerNodeDataInternal(me, member);
}
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
}
/** Send per node data one entry at a time. */
private void sendPerNodeDataInternal(LocalMember me, LocalMember member) {
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
@ -117,18 +166,49 @@ public abstract class AbstractActiveGossiper {
UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
message.setKey(innerEntry.getValue().getKey());
message.setNodeId(innerEntry.getValue().getNodeId());
message.setTimestamp(innerEntry.getValue().getTimestamp());
message.setPayload(innerEntry.getValue().getPayload());
message.setReplicable(innerEntry.getValue().getReplicable());
gossipCore.sendOneWay(message, member.getUri());
copyPerNodeDataMessage(innerEntry.getValue(), message);
gossipCore.sendOneWay(message, member.getUri());
}
}
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
}
/** Send per node data by batching together several entries. */
private void sendPerNodeDataInBulkInternal(LocalMember me, LocalMember member) {
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
UdpPerNodeDataBulkMessage udpMessage = new UdpPerNodeDataBulkMessage();
udpMessage.setUuid(UUID.randomUUID().toString());
udpMessage.setUriFrom(me.getId());
for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable()
.shouldReplicate(me, member, innerEntry.getValue())) {
continue;
}
PerNodeDataMessage message = new PerNodeDataMessage();
copyPerNodeDataMessage(innerEntry.getValue(), message);
udpMessage.addMessage(message);
if (udpMessage.getMessages().size() == gossipSettings.getBulkTransferSize()) {
gossipCore.sendOneWay(udpMessage, member.getUri());
udpMessage = new UdpPerNodeDataBulkMessage();
udpMessage.setUuid(UUID.randomUUID().toString());
udpMessage.setUriFrom(me.getId());
}
}
if (udpMessage.getMessages().size() > 0) {
gossipCore.sendOneWay(udpMessage, member.getUri());
}
}
}
private void copyPerNodeDataMessage(PerNodeDataMessage original, PerNodeDataMessage copy) {
copy.setExpireAt(original.getExpireAt());
copy.setKey(original.getKey());
copy.setNodeId(original.getNodeId());
copy.setTimestamp(original.getTimestamp());
copy.setPayload(original.getPayload());
copy.setReplicable(original.getReplicable());
}
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
@ -151,9 +231,9 @@ public abstract class AbstractActiveGossiper {
} else {
LOGGER.debug("Message " + message + " generated response " + r);
}
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
sendMembershipHistogram.update(System.currentTimeMillis() - startTime);
}
protected final Member convert(LocalMember member){
Member gm = new Member();
gm.setCluster(member.getClusterName());
@ -163,9 +243,9 @@ public abstract class AbstractActiveGossiper {
gm.setProperties(member.getProperties());
return gm;
}
/**
*
*
* @param memberList
* An immutable list
* @return The chosen LocalGossipMember to gossip with.

View File

@ -20,37 +20,36 @@ package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.model.*;
import java.util.Arrays;
public class MessageHandlerFactory {
public static MessageHandler defaultHandler() {
return concurrentHandler(
new TypedMessageHandler(Response.class, new ResponseHandler()),
new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()),
new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()),
new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()),
new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler())
new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler()),
new TypedMessageHandler(PerNodeDataBulkMessage.class, new PerNodeDataBulkMessageHandler()),
new TypedMessageHandler(SharedDataBulkMessage.class, new SharedDataBulkMessageHandler())
);
}
public static MessageHandler concurrentHandler(MessageHandler... handlers) {
if (handlers == null) throw new NullPointerException("handlers cannot be null");
if (handlers == null)
throw new NullPointerException("handlers cannot be null");
if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) {
throw new NullPointerException("found at least one null handler");
}
return new MessageHandler() {
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
@Override public boolean invoke(GossipCore gossipCore, GossipManager gossipManager,
Base base) {
// return true if at least one of the component handlers return true.
return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
return Arrays.asList(handlers).stream()
.filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0;
}
};
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.udp.UdpPerNodeDataBulkMessage;
public class PerNodeDataBulkMessageHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpPerNodeDataBulkMessage udpMessage = (UdpPerNodeDataBulkMessage) base;
for (PerNodeDataMessage dataMsg: udpMessage.getMessages())
gossipCore.addPerNodeData(dataMsg);
return true;
}
}

View File

@ -23,7 +23,7 @@ import org.apache.gossip.model.Base;
import org.apache.gossip.udp.UdpPerNodeDataMessage;
public class PerNodeDataMessageHandler implements MessageHandler {
/**
* @param gossipCore context.
* @param gossipManager context.

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.udp.UdpSharedDataBulkMessage;
public class SharedDataBulkMessageHandler implements MessageHandler{
/**
* @param gossipCore context.
* @param gossipManager context.
* @param base message reference.
* @return boolean indicating success.
*/
@Override
public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpSharedDataBulkMessage udpMessage = (UdpSharedDataBulkMessage) base;
for (SharedDataMessage dataMsg: udpMessage.getMessages())
gossipCore.addSharedData(dataMsg);
return true;
}
}

View File

@ -19,9 +19,9 @@ package org.apache.gossip.model;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
import org.apache.gossip.udp.UdpPerNodeDataMessage;
import org.apache.gossip.udp.UdpPerNodeDataBulkMessage;
import org.apache.gossip.udp.UdpNotAMemberFault;
import org.apache.gossip.udp.UdpSharedDataMessage;
import org.apache.gossip.udp.UdpSharedDataBulkMessage;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonSubTypes;
@ -40,9 +40,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
@Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"),
@Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"),
@Type(value = PerNodeDataMessage.class, name = "PerNodeDataMessage"),
@Type(value = UdpPerNodeDataMessage.class, name = "UdpPerNodeDataMessage"),
@Type(value = UdpPerNodeDataBulkMessage.class, name = "UdpPerNodeDataMessage"),
@Type(value = SharedDataMessage.class, name = "SharedDataMessage"),
@Type(value = UdpSharedDataMessage.class, name = "UdpSharedDataMessage")
@Type(value = UdpSharedDataBulkMessage.class, name = "UdpSharedDataMessage")
})
public class Base {

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.model;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class PerNodeDataBulkMessage extends Base {
private List<PerNodeDataMessage> messages = new ArrayList<>();
public void addMessage(PerNodeDataMessage msg) {
messages.add(msg);
}
public List<PerNodeDataMessage> getMessages() {
return messages;
}
@Override public String toString() {
return "GossipDataBulkMessage[" + messages.stream().map(Object::toString)
.collect(Collectors.joining(",")) + "]";
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.model;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
public class SharedDataBulkMessage extends Base {
private List<SharedDataMessage> messages = new ArrayList<>();
public void addMessage(SharedDataMessage msg) {
messages.add(msg);
}
public List<SharedDataMessage> getMessages() {
return messages;
}
@Override public String toString() {
return "SharedGossipDataBulkMessage[" + messages.stream().map(Object::toString)
.collect(Collectors.joining(",")) + "]";
}
}

View File

@ -28,7 +28,7 @@ public class SharedDataMessage extends Base {
private Long timestamp;
private Long expireAt;
private Replicable<SharedDataMessage> replicable;
public String getNodeId() {
return nodeId;
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.udp;
import org.apache.gossip.model.PerNodeDataBulkMessage;
public class UdpPerNodeDataBulkMessage extends PerNodeDataBulkMessage implements Trackable {
private String uriFrom;
private String uuid;
public String getUriFrom() {
return uriFrom;
}
public void setUriFrom(String uriFrom) {
this.uriFrom = uriFrom;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
@Override
public String toString() {
return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid
+ ", messages=[" + super.toString() + "] ]";
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.udp;
import org.apache.gossip.model.SharedDataBulkMessage;
public class UdpSharedDataBulkMessage extends SharedDataBulkMessage implements Trackable {
private String uriFrom;
private String uuid;
public String getUriFrom() {
return uriFrom;
}
public void setUriFrom(String uriFrom) {
this.uriFrom = uriFrom;
}
public String getUuid() {
return uuid;
}
public void setUuid(String uuid) {
this.uuid = uuid;
}
@Override
public String toString() {
return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getNodeId()="
+ ", messages=[" + super.toString() + "] ]";
}
}

View File

@ -23,19 +23,17 @@ import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.PerNodeDataMessage;
import org.junit.Assert;
import org.junit.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@RunWith(JUnitPlatform.class)
@RunWith(Parameterized.class)
public class PerNodeDataEventTest extends AbstractIntegrationBase {
private String receivedKey = "";
@ -43,25 +41,39 @@ public class PerNodeDataEventTest extends AbstractIntegrationBase {
private Object receivingNodeDataNewValue = "";
private Object receivingNodeDataOldValue = "";
private Semaphore lock = new Semaphore(0);
private int base;
private boolean bulkTransfer;
public PerNodeDataEventTest(int base, boolean bulkTransfer) {
this.base = base;
this.bulkTransfer = bulkTransfer;
}
@Parameterized.Parameters(name = "{index} bulkTransfer={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{50000, false}, {55000, true}
});
}
@Test
public void perNodeDataEventTest()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setBulkTransfer(bulkTransfer);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 2;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);

View File

@ -24,19 +24,17 @@ import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.SharedDataMessage;
import org.junit.Assert;
import org.junit.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@RunWith(JUnitPlatform.class)
@RunWith(Parameterized.class)
public class SharedDataEventTest extends AbstractIntegrationBase {
private String receivedKey = "";
@ -44,6 +42,20 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
private Object receivingNodeDataOldValue = "";
private String gCounterKey = "gCounter";
private Semaphore lock = new Semaphore(0);
private int base;
private boolean bulkTransfer;
public SharedDataEventTest(int base, boolean bulkTransfer) {
this.base = base;
this.bulkTransfer = bulkTransfer;
}
@Parameterized.Parameters(name = "{index} bulkTransfer={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][]{
{50000, false}, {55000, true}
});
}
@Test
public void sharedDataEventTest()
@ -51,17 +63,18 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setBulkTransfer(bulkTransfer);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 2;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
@ -126,13 +139,13 @@ public class SharedDataEventTest extends AbstractIntegrationBase {
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 3;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);

View File

@ -49,7 +49,7 @@ public class ShutdownDeadtimeTest {
@Test
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal", false);
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();

View File

@ -47,7 +47,7 @@ public class TenNodeThreeSeedTest {
}
public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential", false);
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();