diff --git a/gossip-base/pom.xml b/gossip-base/pom.xml index b9739f6..34f346c 100644 --- a/gossip-base/pom.xml +++ b/gossip-base/pom.xml @@ -75,4 +75,4 @@ - \ No newline at end of file + diff --git a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java index 2ceb453..792af85 100644 --- a/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java +++ b/gossip-base/src/main/java/org/apache/gossip/GossipSettings.java @@ -44,7 +44,7 @@ public class GossipSettings { private String distribution = "normal"; private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper"; - + private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager"; private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager"; @@ -241,4 +241,5 @@ public class GossipSettings { public void setProtocolManagerClass(String protocolManagerClass) { this.protocolManagerClass = protocolManagerClass; } + } diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java index b1752cd..133a79f 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -50,7 +50,9 @@ public abstract class GossipManager { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); // this mapper is used for ring and user-data persistence only. NOT messages. - public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {{ + public static final ObjectMapper metdataObjectMapper = new ObjectMapper() { + private static final long serialVersionUID = 1L; + { enableDefaultTyping(); configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false); }}; diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java b/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java deleted file mode 100644 index 03a874c..0000000 --- a/gossip-base/src/main/java/org/apache/gossip/manager/PassiveGossipThread.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gossip.manager; - -import java.io.IOException; - -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.gossip.model.Base; -import org.apache.log4j.Logger; - - -/** - * This class handles the passive cycle, - * where this client has received an incoming message. - */ -public class PassiveGossipThread implements Runnable { - - public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class); - - - private final AtomicBoolean keepRunning; - private final GossipCore gossipCore; - private final GossipManager gossipManager; - - public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) { - this.gossipManager = gossipManager; - this.gossipCore = gossipCore; - if (gossipManager.getMyself().getClusterName() == null){ - throw new IllegalArgumentException("Cluster was null"); - } - - keepRunning = new AtomicBoolean(true); - } - - @Override - public void run() { - while (keepRunning.get()) { - try { - byte[] buf = gossipManager.getTransportManager().read(); - try { - Base message = gossipManager.getProtocolManager().read(buf); - gossipCore.receive(message); - gossipManager.getMemberStateRefresher().run(); - } catch (RuntimeException ex) {//TODO trap json exception - LOGGER.error("Unable to process message", ex); - } - } catch (IOException e) { - // InterruptedException are completely normal here because of the blocking lifecycle. - if (!(e.getCause() instanceof InterruptedException)) { - LOGGER.error(e); - } - keepRunning.set(false); - } - } - } - - public void requestStop() { - keepRunning.set(false); - } -} \ No newline at end of file diff --git a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java index 0af9f12..5334ad4 100644 --- a/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java +++ b/gossip-base/src/main/java/org/apache/gossip/manager/RingStatePersister.java @@ -22,14 +22,11 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.NavigableSet; -import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.gossip.LocalMember; -import org.apache.gossip.crdt.CrdtModule; import org.apache.log4j.Logger; public class RingStatePersister implements Runnable { diff --git a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java index 33db038..82b0dfb 100644 --- a/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java +++ b/gossip-base/src/main/java/org/apache/gossip/transport/AbstractTransportManager.java @@ -21,7 +21,6 @@ import com.codahale.metrics.MetricRegistry; import org.apache.gossip.manager.AbstractActiveGossiper; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; -import org.apache.gossip.manager.PassiveGossipThread; import org.apache.gossip.utils.ReflectionUtils; import org.apache.log4j.Logger; @@ -36,14 +35,14 @@ public abstract class AbstractTransportManager implements TransportManager { public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class); - private final PassiveGossipThread passiveGossipThread; private final ExecutorService gossipThreadExecutor; - private final AbstractActiveGossiper activeGossipThread; + protected final GossipManager gossipManager; + protected final GossipCore gossipCore; public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) { - - passiveGossipThread = new PassiveGossipThread(gossipManager, gossipCore); + this.gossipManager = gossipManager; + this.gossipCore = gossipCore; gossipThreadExecutor = Executors.newCachedThreadPool(); activeGossipThread = ReflectionUtils.constructWithReflection( gossipManager.getSettings().getActiveGossipClass(), @@ -58,7 +57,6 @@ public abstract class AbstractTransportManager implements TransportManager { // shut down threads etc. @Override public void shutdown() { - passiveGossipThread.requestStop(); gossipThreadExecutor.shutdown(); if (activeGossipThread != null) { activeGossipThread.shutdown(); @@ -77,11 +75,9 @@ public abstract class AbstractTransportManager implements TransportManager { @Override public void startActiveGossiper() { - activeGossipThread.init(); + activeGossipThread.init(); } @Override - public void startEndpoint() { - gossipThreadExecutor.execute(passiveGossipThread); - } + public abstract void startEndpoint(); } diff --git a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java index ec91d67..42b9353 100644 --- a/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java +++ b/gossip-base/src/test/java/org/apache/gossip/manager/handlers/MessageHandlerTest.java @@ -102,7 +102,7 @@ public class MessageHandlerTest { @Test(expected = NullPointerException.class) public void cantAddNullHandler2() { - MessageHandler handler = MessageHandlerFactory.concurrentHandler( + MessageHandlerFactory.concurrentHandler( new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()), null, new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()) diff --git a/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java index a783b75..206bc62 100644 --- a/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java +++ b/gossip-base/src/test/java/org/apache/gossip/transport/UnitTestTransportManager.java @@ -29,7 +29,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; /** Only use in unit tests! */ -public class UnitTestTransportManager extends AbstractTransportManager { +public class UnitTestTransportManager extends AbstractTransportManager { private static final Map allManagers = new ConcurrentHashMap<>(); @@ -71,6 +71,5 @@ public class UnitTestTransportManager extends AbstractTransportManager { @Override public void startEndpoint() { allManagers.put(localEndpoint, this); - super.startEndpoint(); } } diff --git a/gossip-itest/pom.xml b/gossip-itest/pom.xml new file mode 100644 index 0000000..6067732 --- /dev/null +++ b/gossip-itest/pom.xml @@ -0,0 +1,88 @@ + + + + 4.0.0 + + + org.apache.gossip + gossip-parent + 0.1.3-incubating-SNAPSHOT + ../pom.xml + + + Gossip itest + gossip-itest + 0.1.3-incubating-SNAPSHOT + + + + org.apache.gossip + gossip-base + ${project.version} + + + org.apache.gossip + gossip-base + ${project.version} + test-jar + test + + + org.apache.gossip + gossip-protocol-jackson + ${project.version} + + + org.apache.gossip + gossip-transport-udp + ${project.version} + + + + + + + maven-compiler-plugin + 3.1 + + ${java.version} + ${java.version} + + + + maven-surefire-plugin + 2.19.1 + + + ${project.build.directory} + + + + + org.junit.platform + junit-platform-surefire-provider + ${junit.platform.version} + + + + + + diff --git a/gossip-base/src/test/java/org/apache/gossip/DataTest.java b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java similarity index 97% rename from gossip-base/src/test/java/org/apache/gossip/DataTest.java rename to gossip-itest/src/test/java/org/apache/gossip/DataTest.java index bb33dc2..9fe9aa9 100644 --- a/gossip-base/src/test/java/org/apache/gossip/DataTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/DataTest.java @@ -36,7 +36,7 @@ import org.apache.gossip.model.SharedDataMessage; import org.junit.Test; import io.teknek.tunit.TUnit; - + public class DataTest extends AbstractIntegrationBase { private String orSetKey = "cror"; @@ -45,10 +45,8 @@ public class DataTest extends AbstractIntegrationBase { @Test public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); - settings.setPersistRingState(false); + settings.setPersistRingState(false); settings.setPersistDataState(false); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; List startupMembers = new ArrayList<>(); diff --git a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java similarity index 94% rename from gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java rename to gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java index 1b6a32a..7f550de 100644 --- a/gossip-base/src/test/java/org/apache/gossip/IdAndPropertyTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/IdAndPropertyTest.java @@ -43,8 +43,6 @@ public class IdAndPropertyTest extends AbstractIntegrationBase { public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException { GossipSettings settings = new GossipSettings(); settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); List startupMembers = new ArrayList<>(); Map x = new HashMap<>(); x.put("a", "b"); diff --git a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java similarity index 96% rename from gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java rename to gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 30c52bc..dd5bfe9 100644 --- a/gossip-base/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -50,8 +50,6 @@ public class ShutdownDeadtimeTest { public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal"); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); settings.setPersistRingState(false); settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); diff --git a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java b/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java similarity index 95% rename from gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java rename to gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java index f669a23..e288cb8 100644 --- a/gossip-base/src/test/java/org/apache/gossip/SignedMessageTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/SignedMessageTest.java @@ -45,8 +45,6 @@ public class SignedMessageTest extends AbstractIntegrationBase { settings.setPersistRingState(false); settings.setPersistDataState(false); settings.setSignMessages(true); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); return settings; } diff --git a/gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java b/gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java similarity index 100% rename from gossip-base/src/test/java/org/apache/gossip/StartupSettingsTest.java rename to gossip-itest/src/test/java/org/apache/gossip/StartupSettingsTest.java diff --git a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java similarity index 94% rename from gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java rename to gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index c6d7d46..8ae783e 100644 --- a/gossip-base/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/gossip-itest/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -50,8 +50,6 @@ public class TenNodeThreeSeedTest { GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential"); settings.setPersistRingState(false); settings.setPersistDataState(false); - settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager"); - settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager"); String cluster = UUID.randomUUID().toString(); int seedNodes = 3; List startupMembers = new ArrayList<>(); diff --git a/gossip-protocol-jackson/pom.xml b/gossip-protocol-jackson/pom.xml index 067a27e..128a26d 100644 --- a/gossip-protocol-jackson/pom.xml +++ b/gossip-protocol-jackson/pom.xml @@ -36,16 +36,16 @@ org.apache.gossip gossip-base - 0.1.3-incubating-SNAPSHOT + ${project.version} org.apache.gossip gossip-base - 0.1.3-incubating-SNAPSHOT + ${project.version} test-jar test - \ No newline at end of file + diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java index bd8a949..cbac460 100644 --- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/JacksonTest.java @@ -25,9 +25,7 @@ import org.apache.gossip.Member; import org.apache.gossip.crdt.OrSet; import org.apache.gossip.manager.GossipManager; import org.apache.gossip.manager.GossipManagerBuilder; -import org.apache.gossip.model.Base; import org.apache.gossip.protocol.ProtocolManager; -import org.apache.gossip.udp.Trackable; import org.junit.Assert; import org.junit.Test; @@ -36,11 +34,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.UUID; public class JacksonTest { diff --git a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java index 43032de..7ac211d 100644 --- a/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java +++ b/gossip-protocol-jackson/src/test/java/org/apache/gossip/protocol/json/TestMessage.java @@ -41,6 +41,7 @@ class TestMessage extends Base implements Trackable { private Object[] arrayOfThings; private Map mapOfThings = new HashMap<>(); + @SuppressWarnings("unused")//Used by ObjectMapper private TestMessage() { } diff --git a/gossip-transport-udp/pom.xml b/gossip-transport-udp/pom.xml index 2e79b1a..446aace 100644 --- a/gossip-transport-udp/pom.xml +++ b/gossip-transport-udp/pom.xml @@ -36,8 +36,8 @@ org.apache.gossip gossip-base - 0.1.3-incubating-SNAPSHOT + ${project.version} - \ No newline at end of file + diff --git a/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java index 3f509a6..d6aaa15 100644 --- a/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java +++ b/gossip-transport-udp/src/main/java/org/apache/gossip/transport/udp/UdpTransportManager.java @@ -19,6 +19,7 @@ package org.apache.gossip.transport.udp; import org.apache.gossip.manager.GossipCore; import org.apache.gossip.manager.GossipManager; +import org.apache.gossip.model.Base; import org.apache.gossip.transport.AbstractTransportManager; import org.apache.log4j.Logger; @@ -30,12 +31,13 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.SocketException; import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; /** * This class is constructed by reflection in GossipManager. * It manages transport (byte read/write) operations over UDP. */ -public class UdpTransportManager extends AbstractTransportManager { +public class UdpTransportManager extends AbstractTransportManager implements Runnable { public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class); @@ -44,12 +46,14 @@ public class UdpTransportManager extends AbstractTransportManager { private final int soTimeout; + private final Thread me; + + private final AtomicBoolean keepRunning = new AtomicBoolean(true); + /** required for reflection to work! */ public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) { super(gossipManager, gossipCore); - soTimeout = gossipManager.getSettings().getGossipInterval() * 2; - try { SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(), gossipManager.getMyself().getUri().getPort()); @@ -58,12 +62,38 @@ public class UdpTransportManager extends AbstractTransportManager { LOGGER.warn(ex); throw new RuntimeException(ex); } + me = new Thread(this); } + @Override + public void run() { + while (keepRunning.get()) { + try { + byte[] buf = read(); + try { + Base message = gossipManager.getProtocolManager().read(buf); + gossipCore.receive(message); + //TODO this is suspect + gossipManager.getMemberStateRefresher().run(); + } catch (RuntimeException ex) {//TODO trap json exception + LOGGER.error("Unable to process message", ex); + } + } catch (IOException e) { + // InterruptedException are completely normal here because of the blocking lifecycle. + if (!(e.getCause() instanceof InterruptedException)) { + LOGGER.error(e); + } + keepRunning.set(false); + } + } + } + @Override public void shutdown() { + keepRunning.set(false); server.close(); super.shutdown(); + me.interrupt(); } /** @@ -81,13 +111,13 @@ public class UdpTransportManager extends AbstractTransportManager { @Override public void send(URI endpoint, byte[] buf) throws IOException { - DatagramSocket socket = new DatagramSocket(); - socket.setSoTimeout(soTimeout); - InetAddress dest = InetAddress.getByName(endpoint.getHost()); - DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort()); - socket.send(payload); // todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket. - socket.close(); + try (DatagramSocket socket = new DatagramSocket()){ + socket.setSoTimeout(soTimeout); + InetAddress dest = InetAddress.getByName(endpoint.getHost()); + DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort()); + socket.send(payload); + } } private void debug(byte[] jsonBytes) { @@ -96,4 +126,10 @@ public class UdpTransportManager extends AbstractTransportManager { LOGGER.debug("Received message ( bytes): " + receivedMessage); } } + + @Override + public void startEndpoint() { + me.start(); + } + } diff --git a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java index 5258374..8a27d0a 100644 --- a/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java +++ b/gossip-transport-udp/src/test/java/org/apache/gossip/transport/udp/UdpTransportIntegrationTest.java @@ -17,14 +17,9 @@ */ package org.apache.gossip.transport.udp; -import org.apache.gossip.GossipSettings; -import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; - public class UdpTransportIntegrationTest { // It's currently impossible to create a UdpTransportManager without bringing up an entire stack. diff --git a/pom.xml b/pom.xml index 97aa409..1c48306 100644 --- a/pom.xml +++ b/pom.xml @@ -58,6 +58,7 @@ gossip-base gossip-transport-udp gossip-protocol-jackson + gossip-itest A peer to peer cluster discovery service