diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml index 3529bd1..b9739f6 100644 --- a/gossip-base/pom.xml +++ b/gossip-base/pom.xml @@ -1,4 +1,21 @@ + @@ -35,36 +52,6 @@ io.dropwizard.metrics metrics-core ${metrics.version} - - org.junit.jupiter - junit-jupiter-api - ${junit.jupiter.version} - test - - - org.junit.jupiter - junit-jupiter-engine - ${junit.jupiter.version} - test - - - org.junit.vintage - junit-vintage-engine - ${junit.vintage.version} - test - - - org.junit.platform - junit-platform-runner - ${junit.platform.version} - test - - - io.teknek - tunit - ${tunit.version} - test - log4j log4j diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java index e4a95d3..2ceb453 100644 --- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -45,8 +45,8 @@ public class GossipSettings { private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; - private String transportManagerClass = "org.apache.gossip.transport.UdpTransportManager"; - private String protocolManagerClass = "org.apache.gossip.protocol.JacksonProtocolManager"; + private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager"; + private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager"; private Map activeGossipProperties = new HashMap<>(); @@ -230,7 +230,15 @@ public class GossipSettings { return transportManagerClass; } + public void setTransportManagerClass(String transportManagerClass) { + this.transportManagerClass = transportManagerClass; + } + public String getProtocolManagerClass() { return protocolManagerClass; } + + public void setProtocolManagerClass(String protocolManagerClass) { + this.protocolManagerClass = protocolManagerClass; + } } diff --git a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java index 17eaaf2..dd30e88 100644 --- a/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/StartupSettings.java @@ -185,10 +185,22 @@ public class StartupSettings { if (cluster == null){ throw new IllegalArgumentException("cluster was null. It is required"); } + String transportClass = jsonObject.has("transport_manager_class") ? + jsonObject.get("transport_manager_class").textValue() : + null; + String protocolClass = jsonObject.has("protocol_manager_class") ? + jsonObject.get("protocol_manager_class").textValue() : + null; URI uri2 = new URI(uri); - StartupSettings settings = new StartupSettings(id, uri2, - new GossipSettings(gossipInterval, cleanupInterval, windowSize, - minSamples, convictThreshold, distribution), cluster); + GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples, + convictThreshold, distribution); + if (transportClass != null) { + gossipSettings.setTransportManagerClass(transportClass); + } + if (protocolClass != null) { + gossipSettings.setProtocolManagerClass(protocolClass); + } + StartupSettings settings = new StartupSettings(id, uri2, gossipSettings, cluster); String configMembersDetails = "Config-members ["; JsonNode membersJSON = jsonObject.get("members"); Iterator it = membersJSON.iterator(); diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java index 30e39d5..03a874c 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java @@ -61,7 +61,10 @@ public class PassiveGossipThread implements Runnable { LOGGER.error("Unable to process message", ex); } } catch (IOException e) { - LOGGER.error(e); + // InterruptedException are completely normal here because of the blocking lifecycle. + if (!(e.getCause() instanceof InterruptedException)) { + LOGGER.error(e); + } keepRunning.set(false); } } diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java index 497e605..33db038 100644 --- a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java @@ -66,7 +66,8 @@ public abstract class AbstractTransportManager implements TransportManager { try { boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS); if (!result) { - LOGGER.error("executor shutdown timed out"); + // common when blocking patterns are used to read data from a socket. + LOGGER.warn("executor shutdown timed out"); } } catch (InterruptedException e) { LOGGER.error(e); diff --git a/gossip-base/src/test/java/org/apache/gossip/DataTest.java b/gossip-base/src/test/java/org/apache/gossip/DataTest.java index f0c2186..bb33dc2 100644 --- a/gossip-base/src/test/java/org/apache/gossip/DataTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/DataTest.java @@ -47,6 +47,8 @@ public class DataTest extends AbstractIntegrationBase { GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; List startupMembers = new ArrayList<>(); diff --git a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java index 7f550de..1b6a32a 100644 --- a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java @@ -43,6 +43,8 @@ public class IdAndPropertyTest extends AbstractIntegrationBase { public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException { GossipSettings settings = new GossipSettings(); settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); List startupMembers = new ArrayList<>(); Map x = new HashMap<>(); x.put("a", "b"); diff --git a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 54005c3..30c52bc 100644 --- a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -44,10 +44,14 @@ public class ShutdownDeadtimeTest { private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class); + // Note: this test is floppy depending on the values in GossipSettings (smaller values seem to do harm), and the + // sleep that happens after startup. @Test public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal"); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); @@ -70,7 +74,7 @@ public class ShutdownDeadtimeTest { .build(); clients.add(gossipService); gossipService.init(); - + Thread.sleep(1000); } TUnit.assertThat(new Callable() { public Integer call() throws Exception { diff --git a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java b/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java index 5c3bb76..f669a23 100644 --- a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java @@ -21,7 +21,6 @@ import java.io.File; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; -import java.net.UnknownHostException; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.util.ArrayList; @@ -41,28 +40,13 @@ import io.teknek.tunit.TUnit; public class SignedMessageTest extends AbstractIntegrationBase { - @Test(expected = IllegalArgumentException.class) - public void ifSignMustHaveKeys() - throws URISyntaxException, UnknownHostException, InterruptedException { - String cluster = UUID.randomUUID().toString(); - GossipSettings settings = gossiperThatSigns(); - List startupMembers = new ArrayList<>(); - URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1)); - GossipManager gossipService = GossipManagerBuilder.newBuilder() - .cluster(cluster) - .uri(uri) - .id(1 + "") - .gossipMembers(startupMembers) - .gossipSettings(settings) - .build(); - gossipService.init(); - } - private GossipSettings gossiperThatSigns(){ GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); settings.setSignMessages(true); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); return settings; } diff --git a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java b/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java index d6c4a1e..ea93a90 100644 --- a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java @@ -47,11 +47,14 @@ public class StartupSettingsTest { settingsFile.deleteOnExit(); writeSettingsFile(settingsFile); URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000); + GossipSettings firstGossipSettings = new GossipSettings(); + firstGossipSettings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + firstGossipSettings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); GossipManager firstService = GossipManagerBuilder.newBuilder() .cluster(CLUSTER) .uri(uri) .id("1") - .gossipSettings(new GossipSettings()).build(); + .gossipSettings(firstGossipSettings).build(); firstService.init(); GossipManager manager = GossipManagerBuilder.newBuilder() .startupSettings(StartupSettings.fromJSONFile(settingsFile)).build(); @@ -72,6 +75,8 @@ public class StartupSettingsTest { " \"cleanup_interval\":10000,\n" + " \"convict_threshold\":2.6,\n" + " \"distribution\":\"exponential\",\n" + + " \"transport_manager_class\":\"org.apache.gossip.transport.UnitTestTransportManager\",\n" + + " \"protocol_manager_class\":\"org.apache.gossip.protocol.UnitTestProtocolManager\",\n" + " \"properties\":{},\n" + " \"members\":[\n" + " {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" + diff --git a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index 8ae783e..c6d7d46 100644 --- a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -50,6 +50,8 @@ public class TenNodeThreeSeedTest { GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential"); settings.setPersistRingState(false); settings.setPersistDataState(false); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); String cluster = UUID.randomUUID().toString(); int seedNodes = 3; List startupMembers = new ArrayList<>(); diff --git a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java index 4c6014a..70c0d51 100644 --- a/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/crdt/OrSetTest.java @@ -17,15 +17,10 @@ */ package org.apache.gossip.crdt; -import java.io.IOException; -import java.net.URISyntaxException; import java.util.Arrays; import java.util.SortedSet; import java.util.TreeSet; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.gossip.GossipSettings; -import org.apache.gossip.protocol.JacksonProtocolManager; import org.junit.Assert; import org.junit.Test; @@ -85,16 +80,6 @@ public class OrSetTest { Assert.assertEquals(i.value(), j.value()); } - @Test - public void serialTest() throws InterruptedException, URISyntaxException, IOException { - ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(new GossipSettings()); - OrSet i = new OrSet(new OrSet.Builder().add(1).remove(1)); - String s = objectMapper.writeValueAsString(i); - @SuppressWarnings("unchecked") - OrSet back = objectMapper.readValue(s, OrSet.class); - Assert.assertEquals(back, i); - } - @Test public void mergeTestSame() { OrSet i = new OrSet<>(19); diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java index e328c24..1a9d43b 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -40,6 +40,8 @@ public class DataReaperTest { GossipSettings settings = new GossipSettings(); settings.setPersistRingState(false); settings.setPersistDataState(false); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings) .id(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build(); gm.init(); @@ -88,6 +90,8 @@ public class DataReaperTest { String key = "key"; String value = "a"; GossipSettings settings = new GossipSettings(); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings) .id(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build(); gm.init(); diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java index dde4b74..e1e1127 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java @@ -35,6 +35,8 @@ public class UserDataPersistenceTest { private GossipManager sameService() throws URISyntaxException { GossipSettings settings = new GossipSettings(); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); return GossipManagerBuilder.newBuilder() .cluster("a") .uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1))) diff --git a/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java b/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java new file mode 100644 index 0000000..3d52c4a --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/protocol/UnitTestProtocolManager.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.protocol; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.manager.PassiveGossipConstants; +import org.apache.gossip.model.Base; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +// doesn't serialize anything besides longs. Uses a static lookup table to read and write objects. +public class UnitTestProtocolManager implements ProtocolManager { + + // so it can be shared across gossipers. this works as long as each object has a different memory address. + private static final Map lookup = new ConcurrentHashMap<>(); + private final Meter meter; + + public UnitTestProtocolManager(GossipSettings settings, String id, MetricRegistry registry) { + meter = settings.isSignMessages() ? + registry.meter(PassiveGossipConstants.SIGNED_MESSAGE) : + registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE); + } + + private static byte[] longToBytes(long val) { + byte[] b = new byte[8]; + b[7] = (byte) (val); + b[6] = (byte) (val >>> 8); + b[5] = (byte) (val >>> 16); + b[4] = (byte) (val >>> 24); + b[3] = (byte) (val >>> 32); + b[2] = (byte) (val >>> 40); + b[1] = (byte) (val >>> 48); + b[0] = (byte) (val >>> 56); + return b; + } + + static long bytesToLong(byte[] b) { + return ((b[7] & 0xFFL)) + + ((b[6] & 0xFFL) << 8) + + ((b[5] & 0xFFL) << 16) + + ((b[4] & 0xFFL) << 24) + + ((b[3] & 0xFFL) << 32) + + ((b[2] & 0xFFL) << 40) + + ((b[1] & 0xFFL) << 48) + + (((long) b[0]) << 56); + } + + @Override + public byte[] write(Base message) throws IOException { + long hashCode = System.identityHashCode(message); + byte[] serialized = longToBytes(hashCode); + lookup.put(hashCode, message); + meter.mark(); + return serialized; + } + + @Override + public Base read(byte[] buf) throws IOException { + long hashCode = bytesToLong(buf); + return lookup.remove(hashCode); + } +} diff --git a/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java new file mode 100644 index 0000000..a783b75 --- /dev/null +++ b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.transport; + +import org.apache.gossip.manager.GossipCore; +import org.apache.gossip.manager.GossipManager; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; + +/** Only use in unit tests! */ +public class UnitTestTransportManager extends AbstractTransportManager { + + private static final Map allManagers = new ConcurrentHashMap<>(); + + private final URI localEndpoint; + private BlockingQueue buffers = new ArrayBlockingQueue(1000); + + public UnitTestTransportManager(GossipManager gossipManager, GossipCore gossipCore) { + super(gossipManager, gossipCore); + localEndpoint = gossipManager.getMyself().getUri(); + } + + @Override + public void send(URI endpoint, byte[] buf) throws IOException { + if (allManagers.containsKey(endpoint)) { + try { + allManagers.get(endpoint).buffers.put(buf); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + @Override + public byte[] read() throws IOException { + try { + return buffers.take(); + } catch (InterruptedException ex) { + // probably not the right thing to do, but we'll see. + throw new IOException(ex); + } + } + + @Override + public void shutdown() { + allManagers.remove(localEndpoint); + super.shutdown(); + } + + @Override + public void startEndpoint() { + allManagers.put(localEndpoint, this); + super.startEndpoint(); + } +} diff --git a/gossip-protocol-jackson/pom.xml b/gossip-protocol-jackson/pom.xml new file mode 100644 index 0000000..067a27e --- /dev/null +++ b/gossip-protocol-jackson/pom.xml @@ -0,0 +1,51 @@ + + + + 4.0.0 + + + org.apache.gossip + gossip-parent + 0.1.3-incubating-SNAPSHOT + ../pom.xml + + + Gossip Jackson Protocol + gossip-protocol-jackson + 0.1.3-incubating-SNAPSHOT + + + + org.apache.gossip + gossip-base + 0.1.3-incubating-SNAPSHOT + + + org.apache.gossip + gossip-base + 0.1.3-incubating-SNAPSHOT + test-jar + test + + + + + \ No newline at end of file diff --git a/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java b/gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java similarity index 98% rename from gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java rename to gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java index 91ed7f9..499c5ee 100644 --- a/gossip-base/src/main/java/org/apache/gossip/protocol/JacksonProtocolManager.java +++ b/gossip-protocol-jackson/src/main/java/org/apache/gossip/protocol/json/JacksonProtocolManager.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gossip.protocol; +package org.apache.gossip.protocol.json; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; @@ -26,6 +26,7 @@ import org.apache.gossip.crdt.CrdtModule; import org.apache.gossip.manager.PassiveGossipConstants; import org.apache.gossip.model.Base; import org.apache.gossip.model.SignedPayload; +import org.apache.gossip.protocol.ProtocolManager; import java.io.File; import java.io.FileInputStream; diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java new file mode 100644 index 0000000..bd8a949 --- /dev/null +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.protocol.json; + +import com.codahale.metrics.MetricRegistry; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.Member; +import org.apache.gossip.crdt.OrSet; +import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.manager.GossipManagerBuilder; +import org.apache.gossip.model.Base; +import org.apache.gossip.protocol.ProtocolManager; +import org.apache.gossip.udp.Trackable; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; + +public class JacksonTest { + + private static GossipSettings simpleSettings(GossipSettings settings) { + settings.setPersistRingState(false); + settings.setPersistDataState(false); + settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); + settings.setProtocolManagerClass("org.apache.gossip.protocol.json.JacksonProtocolManager"); + return settings; + } + + private static GossipSettings withSigning(GossipSettings settings) { + settings.setSignMessages(true); + return settings; + } + + // formerly of SignedMessageTest. + @Test(expected = IllegalArgumentException.class) + public void ifSignMustHaveKeys() + throws URISyntaxException, UnknownHostException, InterruptedException { + String cluster = UUID.randomUUID().toString(); + GossipSettings settings = withSigning(simpleSettings(new GossipSettings())); + List startupMembers = new ArrayList<>(); + URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1)); + GossipManager gossipService = GossipManagerBuilder.newBuilder() + .cluster(cluster) + .uri(uri) + .id(1 + "") + .gossipMembers(startupMembers) + .gossipSettings(settings) + .build(); + gossipService.init(); + } + + @Test + public void jacksonSerialTest() throws InterruptedException, URISyntaxException, IOException { + ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(simpleSettings(new GossipSettings())); + + OrSet i = new OrSet(new OrSet.Builder().add(1).remove(1)); + String s = objectMapper.writeValueAsString(i); + @SuppressWarnings("unchecked") + OrSet back = objectMapper.readValue(s, OrSet.class); + Assert.assertEquals(back, i); + } + + @Test + public void testMessageEqualityAssumptions() { + long timeA = System.nanoTime(); + long timeB = System.nanoTime(); + Assert.assertNotEquals(timeA, timeB); + + TestMessage messageA0 = new TestMessage(Long.toHexString(timeA)); + TestMessage messageA1 = new TestMessage(Long.toHexString(timeA)); + TestMessage messageB = new TestMessage(Long.toHexString(timeB)); + + Assert.assertEquals(messageA0, messageA1); + Assert.assertFalse(messageA0 == messageA1); + Assert.assertNotEquals(messageA0, messageB); + Assert.assertNotEquals(messageA1, messageB); + } + + // ideally, we would test the serializability of every message type, but we just want to make sure this works in + // basic cases. + @Test + public void testMessageSerializationRoundTrip() throws Exception { + ProtocolManager mgr = new JacksonProtocolManager(simpleSettings(new GossipSettings()), "foo", new MetricRegistry()); + for (int i = 0; i < 100; i++) { + TestMessage a = new TestMessage(Long.toHexString(System.nanoTime())); + byte[] bytes = mgr.write(a); + TestMessage b = (TestMessage) mgr.read(bytes); + Assert.assertFalse(a == b); + Assert.assertEquals(a, b); + Assert.assertEquals(a.getMapOfThings(), b.getMapOfThings()); // concerned about that one, so explicit check. + } + } +} diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java new file mode 100644 index 0000000..43032de --- /dev/null +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.protocol.json; + +import org.apache.gossip.model.Base; +import org.apache.gossip.udp.Trackable; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/* + * Here is a test class for serialization. I've tried to include a lot of things in it including nested classes. + * Note that there are no Jackson annotations. + * getters and setters are the keys to making this work without the Jackson annotations. + */ +class TestMessage extends Base implements Trackable { + private String unique; + private String from; + private String uuid; + private String derivedField; + private Subclass otherThing; + private float floatValue; + private double doubleValue; + private Object[] arrayOfThings; + private Map mapOfThings = new HashMap<>(); + + private TestMessage() { + } + + TestMessage(String unique) { + this.unique = unique; + from = Integer.toHexString(unique.hashCode()); + uuid = Integer.toHexString(from.hashCode()); + derivedField = Integer.toHexString(uuid.hashCode()); + otherThing = new Subclass(Integer.toHexString(derivedField.hashCode())); + floatValue = (float) unique.hashCode() / (float) from.hashCode(); + doubleValue = (double) uuid.hashCode() / (double) derivedField.hashCode(); + arrayOfThings = new Object[]{ + this.unique, from, uuid, derivedField, otherThing, floatValue, doubleValue + }; + + String curThing = unique; + for (int i = 0; i < 100; i++) { + String key = Integer.toHexString(curThing.hashCode()); + String value = Integer.toHexString(key.hashCode()); + curThing = value; + mapOfThings.put(key, value); + } + } + + @Override + public String getUriFrom() { + return from; + } + + @Override + public void setUriFrom(String uriFrom) { + this.from = uriFrom; + } + + @Override + public String getUuid() { + return uuid; + } + + @Override + public void setUuid(String uuid) { + this.uuid = uuid; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof TestMessage)) return false; + TestMessage that = (TestMessage) o; + return Objects.equals(unique, that.unique) && + Objects.equals(from, that.from) && + Objects.equals(getUuid(), that.getUuid()) && + Objects.equals(derivedField, that.derivedField) && + Objects.equals(floatValue, that.floatValue) && + Objects.equals(doubleValue, that.doubleValue) && + Arrays.equals(arrayOfThings, that.arrayOfThings) && + Objects.equals(mapOfThings, that.mapOfThings); + } + + public String getUnique() { + return unique; + } + + public void setUnique(String unique) { + this.unique = unique; + } + + public String getFrom() { + return from; + } + + public void setFrom(String from) { + this.from = from; + } + + public String getDerivedField() { + return derivedField; + } + + public void setDerivedField(String derivedField) { + this.derivedField = derivedField; + } + + public Subclass getOtherThing() { + return otherThing; + } + + public void setOtherThing(Subclass otherThing) { + this.otherThing = otherThing; + } + + public float getFloatValue() { + return floatValue; + } + + public void setFloatValue(float floatValue) { + this.floatValue = floatValue; + } + + public double getDoubleValue() { + return doubleValue; + } + + public void setDoubleValue(double doubleValue) { + this.doubleValue = doubleValue; + } + + public Object[] getArrayOfThings() { + return arrayOfThings; + } + + public void setArrayOfThings(Object[] arrayOfThings) { + this.arrayOfThings = arrayOfThings; + } + + public Map getMapOfThings() { + return mapOfThings; + } + + public void setMapOfThings(Map mapOfThings) { + this.mapOfThings = mapOfThings; + } + + @Override + public int hashCode() { + return Objects.hash(unique, getUriFrom(), getUuid(), derivedField, floatValue, doubleValue, arrayOfThings, mapOfThings); + } + + static class Subclass { + private String thing; + + public Subclass() { + } + + public Subclass(String thing) { + this.thing = thing; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof Subclass)) return false; + Subclass subclass = (Subclass) o; + return Objects.equals(thing, subclass.thing); + } + + @Override + public int hashCode() { + return Objects.hash(thing); + } + + public String getThing() { + return thing; + } + } +} \ No newline at end of file diff --git a/gossip-transport-udp/pom.xml b/gossip-transport-udp/pom.xml new file mode 100644 index 0000000..2e79b1a --- /dev/null +++ b/gossip-transport-udp/pom.xml @@ -0,0 +1,43 @@ + + + + 4.0.0 + + + org.apache.gossip + gossip-parent + 0.1.3-incubating-SNAPSHOT + ../pom.xml + + + Gossip UDP Transport + gossip-transport-udp + 0.1.3-incubating-SNAPSHOT + + + + org.apache.gossip + gossip-base + 0.1.3-incubating-SNAPSHOT + + + + \ No newline at end of file diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java similarity index 97% rename from gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java rename to gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java index d80deec..3f509a6 100644 --- a/gossip-base/src/main/java/org/apache/gossip/transport/UdpTransportManager.java +++ b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java @@ -15,10 +15,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.gossip.transport; +package org.apache.gossip.transport.udp; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.transport.AbstractTransportManager; import org.apache.log4j.Logger; import java.io.IOException; diff --git a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java new file mode 100644 index 0000000..5258374 --- /dev/null +++ b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.transport.udp; + +import org.apache.gossip.GossipSettings; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class UdpTransportIntegrationTest { + + // It's currently impossible to create a UdpTransportManager without bringing up an entire stack. + // This is because AbstractTransportManager creates a PassiveGossipThread (requires GossipManager, + // GossipCore) and also requires those same things plus a MetricsRegistry to create the + // ActiveGossiper. + // TODO: test UDPTransportManger semantics (read and write) in isolation. + // I've written this test to indicate the direction I want things to go. + // Uncomment/Fix it once the coupling issues are worked out. + @Test @Ignore + public void testRoundTrip() { + /* + GossipSettings settings0 = new GossipSettings(); + GossipSettings settings1 = new GossipSettings(); + UdpTransportManager mgr0 = new UdpTransportManager(settings0); + UdpTransportManager mgr1 = new UdpTransportManager(settings1); + + mgr0.startEndpoint(); + mgr1.startEndpoint(); + mgr0.startActiveGossiper(); + mgr1.startActiveGossiper(); + + // wait a little while for convergence + // perhaps there is a Mockito Whitebox way to foce members + + byte[] data = new byte[] {0,1,2,3,4,5}; + Future someData = asyncWaitForData(mgr1); + mgr0.send(toURI(settings1), data); + + Assert.assertEquals(data, someData.get(1000, TimeUnit.MILLISECONDS)); + + mgr0.shutdown(); + mgr1.shutdown(); + */ + } + + +} diff --git a/pom.xml b/pom.xml index f9c7814..97aa409 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,8 @@ gossip-base + gossip-transport-udp + gossip-protocol-jackson A peer to peer cluster discovery service @@ -81,6 +83,39 @@ https://issues.apache.org/jira/browse/GOSSIP + + + org.junit.jupiter + junit-jupiter-api + ${junit.jupiter.version} + test + + + org.junit.jupiter + junit-jupiter-engine + ${junit.jupiter.version} + test + + + org.junit.vintage + junit-vintage-engine + ${junit.vintage.version} + test + + + org.junit.platform + junit-platform-runner + ${junit.platform.version} + test + + + io.teknek + tunit + ${tunit.version} + test + + +