diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java index 4ea0ab6..2fe8a0c 100644 --- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -68,6 +68,10 @@ public class GossipSettings { private LockManagerSettings lockManagerSettings = LockManagerSettings .getLockManagerDefaultSettings(); + private boolean bulkTransfer = false; + + private int bulkTransferSize = StartupSettings.DEFAULT_BULK_TRANSFER_SIZE; + /** * Construct GossipSettings with default settings. */ @@ -82,14 +86,15 @@ public class GossipSettings { * @param cleanupInterval * The cleanup interval in ms. */ - public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, - int minimumSamples, double convictThreshold, String distribution) { + public GossipSettings(int gossipInterval, int cleanupInterval, int windowSize, int minimumSamples, + double convictThreshold, String distribution, boolean bulkTransfer) { this.gossipInterval = gossipInterval; this.cleanupInterval = cleanupInterval; this.windowSize = windowSize; this.minimumSamples = minimumSamples; this.convictThreshold = convictThreshold; this.distribution = distribution; + this.bulkTransfer = bulkTransfer; } /** @@ -258,4 +263,20 @@ public class GossipSettings { public void setLockManagerSettings(LockManagerSettings lockManagerSettings) { this.lockManagerSettings = lockManagerSettings; } + + public boolean isBulkTransfer() { + return bulkTransfer; + } + + public void setBulkTransfer(boolean bulkTransfer) { + this.bulkTransfer = bulkTransfer; + } + + public int getBulkTransferSize() { + return bulkTransferSize; + } + + public void setBulkTransferSize(int bulkTransferSize) { + this.bulkTransferSize = bulkTransferSize; + } } diff --git a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java index dd30e88..23608f2 100644 --- a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java @@ -55,6 +55,10 @@ public class StartupSettings { /** The list with gossip members to start with. */ private final List gossipMembers; + /** Default setting values */ + private static final boolean DEFAULT_BULK_TRANSFER = false; + public static final int DEFAULT_BULK_TRANSFER_SIZE = 100; + /** * Constructor. * @@ -175,6 +179,7 @@ public class StartupSettings { properties.put(i.getKey(), i.getValue().asText()); } //TODO constants as defaults? + // TODO setting keys as constants? int gossipInterval = jsonObject.get("gossip_interval").intValue(); int cleanupInterval = jsonObject.get("cleanup_interval").intValue(); int windowSize = jsonObject.get("window_size").intValue(); @@ -182,6 +187,12 @@ public class StartupSettings { double convictThreshold = jsonObject.get("convict_threshold").asDouble(); String cluster = jsonObject.get("cluster").textValue(); String distribution = jsonObject.get("distribution").textValue(); + boolean bulkTransfer = jsonObject.has("bulk_transfer") ? + jsonObject.get("bulk_transfer").booleanValue() : + DEFAULT_BULK_TRANSFER; + int bulkTransferSize = jsonObject.has("bulk_transfer_size") ? + jsonObject.get("bulk_transfer_size").intValue() : + DEFAULT_BULK_TRANSFER_SIZE; if (cluster == null){ throw new IllegalArgumentException("cluster was null. It is required"); } @@ -192,8 +203,9 @@ public class StartupSettings { jsonObject.get("protocol_manager_class").textValue() : null; URI uri2 = new URI(uri); - GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples, - convictThreshold, distribution); + GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, + minSamples, convictThreshold, distribution, bulkTransfer); + gossipSettings.setBulkTransferSize(bulkTransferSize); if (transportClass != null) { gossipSettings.setTransportManagerClass(transportClass); } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java index adf2530..4bd44f2 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/AbstractActiveGossiper.java @@ -24,6 +24,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.GossipSettings; import org.apache.gossip.LocalMember; import org.apache.gossip.model.ActiveGossipOk; import org.apache.gossip.model.PerNodeDataMessage; @@ -31,40 +32,40 @@ import org.apache.gossip.model.Member; import org.apache.gossip.model.Response; import org.apache.gossip.model.SharedDataMessage; import org.apache.gossip.model.ShutdownMessage; -import org.apache.gossip.udp.UdpActiveGossipMessage; -import org.apache.gossip.udp.UdpPerNodeDataMessage; -import org.apache.gossip.udp.UdpSharedDataMessage; +import org.apache.gossip.udp.*; import org.apache.log4j.Logger; import static com.codahale.metrics.MetricRegistry.name; /** - * The ActiveGossipThread is sends information. Pick a random partner and send the membership list to that partner + * The ActiveGossipThread sends information. Pick a random partner and send the membership list to that partner */ public abstract class AbstractActiveGossiper { protected static final Logger LOGGER = Logger.getLogger(AbstractActiveGossiper.class); - + protected final GossipManager gossipManager; protected final GossipCore gossipCore; private final Histogram sharedDataHistogram; private final Histogram sendPerNodeDataHistogram; - private final Histogram sendMembershipHistorgram; + private final Histogram sendMembershipHistogram; private final Random random; + private final GossipSettings gossipSettings; public AbstractActiveGossiper(GossipManager gossipManager, GossipCore gossipCore, MetricRegistry registry) { this.gossipManager = gossipManager; this.gossipCore = gossipCore; sharedDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sharedDataHistogram-time")); sendPerNodeDataHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendPerNodeDataHistogram-time")); - sendMembershipHistorgram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistorgram-time")); + sendMembershipHistogram = registry.histogram(name(AbstractActiveGossiper.class, "sendMembershipHistogram-time")); random = new Random(); + gossipSettings = gossipManager.getSettings(); } public void init() { } - + public void shutdown() { } @@ -78,12 +79,22 @@ public abstract class AbstractActiveGossiper { m.setShutdownAtNanos(gossipManager.getClock().nanoTime()); gossipCore.sendOneWay(m, target.getUri()); } - - public final void sendSharedData(LocalMember me, LocalMember member){ - if (member == null){ + + public final void sendSharedData(LocalMember me, LocalMember member) { + if (member == null) { return; } long startTime = System.currentTimeMillis(); + if (gossipSettings.isBulkTransfer()) { + sendSharedDataInBulkInternal(me, member); + } else { + sendSharedDataInternal(me, member); + } + sharedDataHistogram.update(System.currentTimeMillis() - startTime); + } + + /** Send shared data one entry at a time. */ + private void sendSharedDataInternal(LocalMember me, LocalMember member) { for (Entry innerEntry : gossipCore.getSharedData().entrySet()){ if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable() .shouldReplicate(me, member, innerEntry.getValue())) { @@ -92,22 +103,60 @@ public abstract class AbstractActiveGossiper { UdpSharedDataMessage message = new UdpSharedDataMessage(); message.setUuid(UUID.randomUUID().toString()); message.setUriFrom(me.getId()); - message.setExpireAt(innerEntry.getValue().getExpireAt()); - message.setKey(innerEntry.getValue().getKey()); - message.setNodeId(innerEntry.getValue().getNodeId()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); - message.setPayload(innerEntry.getValue().getPayload()); - message.setReplicable(innerEntry.getValue().getReplicable()); + copySharedDataMessage(innerEntry.getValue(), message); gossipCore.sendOneWay(message, member.getUri()); } - sharedDataHistogram.update(System.currentTimeMillis() - startTime); } - + + /** Send shared data by batching together several entries. */ + private void sendSharedDataInBulkInternal(LocalMember me, LocalMember member) { + UdpSharedDataBulkMessage udpMessage = new UdpSharedDataBulkMessage(); + udpMessage.setUuid(UUID.randomUUID().toString()); + udpMessage.setUriFrom(me.getId()); + for (Entry innerEntry : gossipCore.getSharedData().entrySet()) { + if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable() + .shouldReplicate(me, member, innerEntry.getValue())) { + continue; + } + SharedDataMessage message = new SharedDataMessage(); + copySharedDataMessage(innerEntry.getValue(), message); + udpMessage.addMessage(message); + if (udpMessage.getMessages().size() == gossipSettings.getBulkTransferSize()) { + gossipCore.sendOneWay(udpMessage, member.getUri()); + udpMessage = new UdpSharedDataBulkMessage(); + udpMessage.setUuid(UUID.randomUUID().toString()); + udpMessage.setUriFrom(me.getId()); + } + } + if (udpMessage.getMessages().size() > 0) { + gossipCore.sendOneWay(udpMessage, member.getUri()); + } + } + + private void copySharedDataMessage(SharedDataMessage original, SharedDataMessage copy) { + copy.setExpireAt(original.getExpireAt()); + copy.setKey(original.getKey()); + copy.setNodeId(original.getNodeId()); + copy.setTimestamp(original.getTimestamp()); + copy.setPayload(original.getPayload()); + copy.setReplicable(original.getReplicable()); + } + public final void sendPerNodeData(LocalMember me, LocalMember member){ if (member == null){ return; } long startTime = System.currentTimeMillis(); + if (gossipSettings.isBulkTransfer()) { + sendPerNodeDataInBulkInternal(me, member); + } else { + sendPerNodeDataInternal(me, member); + } + sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); + } + + /** Send per node data one entry at a time. */ + private void sendPerNodeDataInternal(LocalMember me, LocalMember member) { for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ for (Entry innerEntry : entry.getValue().entrySet()){ if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable() @@ -117,18 +166,49 @@ public abstract class AbstractActiveGossiper { UdpPerNodeDataMessage message = new UdpPerNodeDataMessage(); message.setUuid(UUID.randomUUID().toString()); message.setUriFrom(me.getId()); - message.setExpireAt(innerEntry.getValue().getExpireAt()); - message.setKey(innerEntry.getValue().getKey()); - message.setNodeId(innerEntry.getValue().getNodeId()); - message.setTimestamp(innerEntry.getValue().getTimestamp()); - message.setPayload(innerEntry.getValue().getPayload()); - message.setReplicable(innerEntry.getValue().getReplicable()); - gossipCore.sendOneWay(message, member.getUri()); + copyPerNodeDataMessage(innerEntry.getValue(), message); + gossipCore.sendOneWay(message, member.getUri()); } } - sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime); + } - + + /** Send per node data by batching together several entries. */ + private void sendPerNodeDataInBulkInternal(LocalMember me, LocalMember member) { + for (Entry> entry : gossipCore.getPerNodeData().entrySet()){ + UdpPerNodeDataBulkMessage udpMessage = new UdpPerNodeDataBulkMessage(); + udpMessage.setUuid(UUID.randomUUID().toString()); + udpMessage.setUriFrom(me.getId()); + for (Entry innerEntry : entry.getValue().entrySet()){ + if (innerEntry.getValue().getReplicable() != null && !innerEntry.getValue().getReplicable() + .shouldReplicate(me, member, innerEntry.getValue())) { + continue; + } + PerNodeDataMessage message = new PerNodeDataMessage(); + copyPerNodeDataMessage(innerEntry.getValue(), message); + udpMessage.addMessage(message); + if (udpMessage.getMessages().size() == gossipSettings.getBulkTransferSize()) { + gossipCore.sendOneWay(udpMessage, member.getUri()); + udpMessage = new UdpPerNodeDataBulkMessage(); + udpMessage.setUuid(UUID.randomUUID().toString()); + udpMessage.setUriFrom(me.getId()); + } + } + if (udpMessage.getMessages().size() > 0) { + gossipCore.sendOneWay(udpMessage, member.getUri()); + } + } + } + + private void copyPerNodeDataMessage(PerNodeDataMessage original, PerNodeDataMessage copy) { + copy.setExpireAt(original.getExpireAt()); + copy.setKey(original.getKey()); + copy.setNodeId(original.getNodeId()); + copy.setTimestamp(original.getTimestamp()); + copy.setPayload(original.getPayload()); + copy.setReplicable(original.getReplicable()); + } + /** * Performs the sending of the membership list, after we have incremented our own heartbeat. */ @@ -151,9 +231,9 @@ public abstract class AbstractActiveGossiper { } else { LOGGER.debug("Message " + message + " generated response " + r); } - sendMembershipHistorgram.update(System.currentTimeMillis() - startTime); + sendMembershipHistogram.update(System.currentTimeMillis() - startTime); } - + protected final Member convert(LocalMember member){ Member gm = new Member(); gm.setCluster(member.getClusterName()); @@ -163,9 +243,9 @@ public abstract class AbstractActiveGossiper { gm.setProperties(member.getProperties()); return gm; } - + /** - * + * * @param memberList * An immutable list * @return The chosen LocalGossipMember to gossip with. diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java index fff9430..abce76d 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/MessageHandlerFactory.java @@ -20,37 +20,36 @@ package org.apache.gossip.manager.handlers; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.model.ActiveGossipMessage; -import org.apache.gossip.model.Base; -import org.apache.gossip.model.PerNodeDataMessage; -import org.apache.gossip.model.Response; -import org.apache.gossip.model.SharedDataMessage; -import org.apache.gossip.model.ShutdownMessage; +import org.apache.gossip.model.*; import java.util.Arrays; public class MessageHandlerFactory { - + public static MessageHandler defaultHandler() { return concurrentHandler( new TypedMessageHandler(Response.class, new ResponseHandler()), new TypedMessageHandler(ShutdownMessage.class, new ShutdownMessageHandler()), new TypedMessageHandler(PerNodeDataMessage.class, new PerNodeDataMessageHandler()), new TypedMessageHandler(SharedDataMessage.class, new SharedDataMessageHandler()), - new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler()) + new TypedMessageHandler(ActiveGossipMessage.class, new ActiveGossipMessageHandler()), + new TypedMessageHandler(PerNodeDataBulkMessage.class, new PerNodeDataBulkMessageHandler()), + new TypedMessageHandler(SharedDataBulkMessage.class, new SharedDataBulkMessageHandler()) ); } - + public static MessageHandler concurrentHandler(MessageHandler... handlers) { - if (handlers == null) throw new NullPointerException("handlers cannot be null"); + if (handlers == null) + throw new NullPointerException("handlers cannot be null"); if (Arrays.asList(handlers).stream().filter(i -> i != null).count() != handlers.length) { throw new NullPointerException("found at least one null handler"); } return new MessageHandler() { - @Override - public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + @Override public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, + Base base) { // return true if at least one of the component handlers return true. - return Arrays.asList(handlers).stream().filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0; + return Arrays.asList(handlers).stream() + .filter((mi) -> mi.invoke(gossipCore, gossipManager, base)).count() > 0; } }; } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataBulkMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataBulkMessageHandler.java new file mode 100644 index 0000000..37024e9 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataBulkMessageHandler.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.PerNodeDataMessage; +import org.apache.gossip.udp.UdpPerNodeDataBulkMessage; + +public class PerNodeDataBulkMessageHandler implements MessageHandler { + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. + */ + @Override + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + UdpPerNodeDataBulkMessage udpMessage = (UdpPerNodeDataBulkMessage) base; + for (PerNodeDataMessage dataMsg: udpMessage.getMessages()) + gossipCore.addPerNodeData(dataMsg); + return true; + } +} diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java index 0ad0d91..4ac47b9 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/PerNodeDataMessageHandler.java @@ -23,7 +23,7 @@ import org.apache.gossip.model.Base; import org.apache.gossip.udp.UdpPerNodeDataMessage; public class PerNodeDataMessageHandler implements MessageHandler { - + /** * @param gossipCore context. * @param gossipManager context. diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataBulkMessageHandler.java b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataBulkMessageHandler.java new file mode 100644 index 0000000..a062f95 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/manager/handlers/SharedDataBulkMessageHandler.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager.handlers; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; +import org.apache.gossip.model.SharedDataMessage; +import org.apache.gossip.udp.UdpSharedDataBulkMessage; + +public class SharedDataBulkMessageHandler implements MessageHandler{ + + /** + * @param gossipCore context. + * @param gossipManager context. + * @param base message reference. + * @return boolean indicating success. + */ + @Override + public boolean invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) { + UdpSharedDataBulkMessage udpMessage = (UdpSharedDataBulkMessage) base; + for (SharedDataMessage dataMsg: udpMessage.getMessages()) + gossipCore.addSharedData(dataMsg); + return true; + } +} diff --git a/gossip-base/src/main/java/org/apache/gossip/model/Base.java b/gossip-base/src/main/java/org/apache/gossip/model/Base.java index 1b66310..e9183b0 100644 --- a/gossip-base/src/main/java/org/apache/gossip/model/Base.java +++ b/gossip-base/src/main/java/org/apache/gossip/model/Base.java @@ -19,9 +19,9 @@ package org.apache.gossip.model; import org.apache.gossip.udp.UdpActiveGossipMessage; import org.apache.gossip.udp.UdpActiveGossipOk; -import org.apache.gossip.udp.UdpPerNodeDataMessage; +import org.apache.gossip.udp.UdpPerNodeDataBulkMessage; import org.apache.gossip.udp.UdpNotAMemberFault; -import org.apache.gossip.udp.UdpSharedDataMessage; +import org.apache.gossip.udp.UdpSharedDataBulkMessage; import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.fasterxml.jackson.annotation.JsonSubTypes; @@ -40,9 +40,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes.Type; @Type(value = UdpActiveGossipMessage.class, name = "UdpActiveGossipMessage"), @Type(value = UdpNotAMemberFault.class, name = "UdpNotAMemberFault"), @Type(value = PerNodeDataMessage.class, name = "PerNodeDataMessage"), - @Type(value = UdpPerNodeDataMessage.class, name = "UdpPerNodeDataMessage"), + @Type(value = UdpPerNodeDataBulkMessage.class, name = "UdpPerNodeDataMessage"), @Type(value = SharedDataMessage.class, name = "SharedDataMessage"), - @Type(value = UdpSharedDataMessage.class, name = "UdpSharedDataMessage") + @Type(value = UdpSharedDataBulkMessage.class, name = "UdpSharedDataMessage") }) public class Base { diff --git a/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataBulkMessage.java new file mode 100644 index 0000000..bb138a5 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/model/PerNodeDataBulkMessage.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.model; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class PerNodeDataBulkMessage extends Base { + private List messages = new ArrayList<>(); + + public void addMessage(PerNodeDataMessage msg) { + messages.add(msg); + } + + public List getMessages() { + return messages; + } + + @Override public String toString() { + return "GossipDataBulkMessage[" + messages.stream().map(Object::toString) + .collect(Collectors.joining(",")) + "]"; + } +} diff --git a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataBulkMessage.java new file mode 100644 index 0000000..7b67430 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataBulkMessage.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.model; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class SharedDataBulkMessage extends Base { + private List messages = new ArrayList<>(); + + public void addMessage(SharedDataMessage msg) { + messages.add(msg); + } + + public List getMessages() { + return messages; + } + + @Override public String toString() { + return "SharedGossipDataBulkMessage[" + messages.stream().map(Object::toString) + .collect(Collectors.joining(",")) + "]"; + } +} diff --git a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java index 4b1a1ea..e148189 100644 --- a/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java +++ b/gossip-base/src/main/java/org/apache/gossip/model/SharedDataMessage.java @@ -28,7 +28,7 @@ public class SharedDataMessage extends Base { private Long timestamp; private Long expireAt; private Replicable replicable; - + public String getNodeId() { return nodeId; } diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataBulkMessage.java new file mode 100644 index 0000000..99eb1e5 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpPerNodeDataBulkMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.udp; + +import org.apache.gossip.model.PerNodeDataBulkMessage; + +public class UdpPerNodeDataBulkMessage extends PerNodeDataBulkMessage implements Trackable { + + private String uriFrom; + private String uuid; + + public String getUriFrom() { + return uriFrom; + } + + public void setUriFrom(String uriFrom) { + this.uriFrom = uriFrom; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override + public String toString() { + return "UdpGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + + ", messages=[" + super.toString() + "] ]"; + } + +} diff --git a/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataBulkMessage.java b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataBulkMessage.java new file mode 100644 index 0000000..8dc8be1 --- /dev/null +++ b/gossip-base/src/main/java/org/apache/gossip/udp/UdpSharedDataBulkMessage.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.udp; + +import org.apache.gossip.model.SharedDataBulkMessage; + +public class UdpSharedDataBulkMessage extends SharedDataBulkMessage implements Trackable { + + private String uriFrom; + private String uuid; + + public String getUriFrom() { + return uriFrom; + } + + public void setUriFrom(String uriFrom) { + this.uriFrom = uriFrom; + } + + public String getUuid() { + return uuid; + } + + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override + public String toString() { + return "UdpSharedGossipDataMessage [uriFrom=" + uriFrom + ", uuid=" + uuid + ", getNodeId()=" + + ", messages=[" + super.toString() + "] ]"; + } + +} diff --git a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java index 59136d1..cc269f9 100644 --- a/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/PerNodeDataEventTest.java @@ -23,19 +23,17 @@ import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.model.PerNodeDataMessage; import org.junit.Assert; import org.junit.Test; -import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -@RunWith(JUnitPlatform.class) +@RunWith(Parameterized.class) public class PerNodeDataEventTest extends AbstractIntegrationBase { private String receivedKey = ""; @@ -43,25 +41,39 @@ public class PerNodeDataEventTest extends AbstractIntegrationBase { private Object receivingNodeDataNewValue = ""; private Object receivingNodeDataOldValue = ""; private Semaphore lock = new Semaphore(0); - - + private int base; + private boolean bulkTransfer; + + public PerNodeDataEventTest(int base, boolean bulkTransfer) { + this.base = base; + this.bulkTransfer = bulkTransfer; + } + + @Parameterized.Parameters(name = "{index} bulkTransfer={1}") + public static Collection data() { + return Arrays.asList(new Object[][]{ + {50000, false}, {55000, true} + }); + } + @Test public void perNodeDataEventTest() throws InterruptedException, UnknownHostException, URISyntaxException { GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); + settings.setBulkTransfer(bulkTransfer); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); startupMembers.add(new RemoteMember(cluster, uri, i + "")); } final List clients = new ArrayList<>(); final int clusterMembers = 2; for (int i = 1; i < clusterMembers + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); clients.add(gossipService); diff --git a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java index 8dbfcb3..547cd45 100644 --- a/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/SharedDataEventTest.java @@ -24,19 +24,17 @@ import org.apache.gossip.manager.GossipManagerBuilder; import org.apache.gossip.model.SharedDataMessage; import org.junit.Assert; import org.junit.Test; -import org.junit.platform.runner.JUnitPlatform; import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; +import java.util.*; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -@RunWith(JUnitPlatform.class) +@RunWith(Parameterized.class) public class SharedDataEventTest extends AbstractIntegrationBase { private String receivedKey = ""; @@ -44,6 +42,20 @@ public class SharedDataEventTest extends AbstractIntegrationBase { private Object receivingNodeDataOldValue = ""; private String gCounterKey = "gCounter"; private Semaphore lock = new Semaphore(0); + private int base; + private boolean bulkTransfer; + + public SharedDataEventTest(int base, boolean bulkTransfer) { + this.base = base; + this.bulkTransfer = bulkTransfer; + } + + @Parameterized.Parameters(name = "{index} bulkTransfer={1}") + public static Collection data() { + return Arrays.asList(new Object[][]{ + {50000, false}, {55000, true} + }); + } @Test public void sharedDataEventTest() @@ -51,17 +63,18 @@ public class SharedDataEventTest extends AbstractIntegrationBase { GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); + settings.setBulkTransfer(bulkTransfer); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); startupMembers.add(new RemoteMember(cluster, uri, i + "")); } final List clients = new ArrayList<>(); final int clusterMembers = 2; for (int i = 1; i < clusterMembers + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); clients.add(gossipService); @@ -126,13 +139,13 @@ public class SharedDataEventTest extends AbstractIntegrationBase { int seedNodes = 1; List startupMembers = new ArrayList<>(); for (int i = 1; i < seedNodes + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); startupMembers.add(new RemoteMember(cluster, uri, i + "")); } final List clients = new ArrayList<>(); final int clusterMembers = 3; for (int i = 1; i < clusterMembers + 1; ++i) { - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i)); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (base + i)); GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri) .id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build(); clients.add(gossipService); diff --git a/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index dd5bfe9..e5c3639 100644 --- a/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -49,7 +49,7 @@ public class ShutdownDeadtimeTest { @Test public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal"); + GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal", false); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); diff --git a/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index 8ae783e..eb9abcd 100644 --- a/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -47,7 +47,7 @@ public class TenNodeThreeSeedTest { } public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential"); + GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential", false); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString();