GOSSIP-85 Factor out PassiveGossipThread

This commit is contained in:
Edward Capriolo
2017-04-29 19:45:16 -04:00
parent e3010c8542
commit c62ebaf9b6
22 changed files with 156 additions and 132 deletions

View File

@ -75,4 +75,4 @@
</dependency>
</dependencies>
</project>
</project>

View File

@ -44,7 +44,7 @@ public class GossipSettings {
private String distribution = "normal";
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager";
private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager";
@ -241,4 +241,5 @@ public class GossipSettings {
public void setProtocolManagerClass(String protocolManagerClass) {
this.protocolManagerClass = protocolManagerClass;
}
}

View File

@ -50,7 +50,9 @@ public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
// this mapper is used for ring and user-data persistence only. NOT messages.
public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {{
public static final ObjectMapper metdataObjectMapper = new ObjectMapper() {
private static final long serialVersionUID = 1L;
{
enableDefaultTyping();
configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
}};

View File

@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.model.Base;
import org.apache.log4j.Logger;
/**
* This class handles the passive cycle,
* where this client has received an incoming message.
*/
public class PassiveGossipThread implements Runnable {
public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
private final AtomicBoolean keepRunning;
private final GossipCore gossipCore;
private final GossipManager gossipManager;
public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
this.gossipCore = gossipCore;
if (gossipManager.getMyself().getClusterName() == null){
throw new IllegalArgumentException("Cluster was null");
}
keepRunning = new AtomicBoolean(true);
}
@Override
public void run() {
while (keepRunning.get()) {
try {
byte[] buf = gossipManager.getTransportManager().read();
try {
Base message = gossipManager.getProtocolManager().read(buf);
gossipCore.receive(message);
gossipManager.getMemberStateRefresher().run();
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}
} catch (IOException e) {
// InterruptedException are completely normal here because of the blocking lifecycle.
if (!(e.getCause() instanceof InterruptedException)) {
LOGGER.error(e);
}
keepRunning.set(false);
}
}
}
public void requestStop() {
keepRunning.set(false);
}
}

View File

@ -22,14 +22,11 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.LocalMember;
import org.apache.gossip.crdt.CrdtModule;
import org.apache.log4j.Logger;
public class RingStatePersister implements Runnable {

View File

@ -21,7 +21,6 @@ import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.manager.AbstractActiveGossiper;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
import org.apache.gossip.utils.ReflectionUtils;
import org.apache.log4j.Logger;
@ -36,14 +35,14 @@ public abstract class AbstractTransportManager implements TransportManager {
public static final Logger LOGGER = Logger.getLogger(AbstractTransportManager.class);
private final PassiveGossipThread passiveGossipThread;
private final ExecutorService gossipThreadExecutor;
private final AbstractActiveGossiper activeGossipThread;
protected final GossipManager gossipManager;
protected final GossipCore gossipCore;
public AbstractTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
passiveGossipThread = new PassiveGossipThread(gossipManager, gossipCore);
this.gossipManager = gossipManager;
this.gossipCore = gossipCore;
gossipThreadExecutor = Executors.newCachedThreadPool();
activeGossipThread = ReflectionUtils.constructWithReflection(
gossipManager.getSettings().getActiveGossipClass(),
@ -58,7 +57,6 @@ public abstract class AbstractTransportManager implements TransportManager {
// shut down threads etc.
@Override
public void shutdown() {
passiveGossipThread.requestStop();
gossipThreadExecutor.shutdown();
if (activeGossipThread != null) {
activeGossipThread.shutdown();
@ -77,11 +75,9 @@ public abstract class AbstractTransportManager implements TransportManager {
@Override
public void startActiveGossiper() {
activeGossipThread.init();
activeGossipThread.init();
}
@Override
public void startEndpoint() {
gossipThreadExecutor.execute(passiveGossipThread);
}
public abstract void startEndpoint();
}

View File

@ -102,7 +102,7 @@ public class MessageHandlerTest {
@Test(expected = NullPointerException.class)
public void cantAddNullHandler2() {
MessageHandler handler = MessageHandlerFactory.concurrentHandler(
MessageHandlerFactory.concurrentHandler(
new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler()),
null,
new TypedMessageHandler(FakeMessage.class, new FakeMessageHandler())

View File

@ -29,7 +29,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
/** Only use in unit tests! */
public class UnitTestTransportManager extends AbstractTransportManager {
public class UnitTestTransportManager extends AbstractTransportManager {
private static final Map<URI, UnitTestTransportManager> allManagers = new ConcurrentHashMap<>();
@ -71,6 +71,5 @@ public class UnitTestTransportManager extends AbstractTransportManager {
@Override
public void startEndpoint() {
allManagers.put(localEndpoint, this);
super.startEndpoint();
}
}

88
gossip-itest/pom.xml Normal file
View File

@ -0,0 +1,88 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-parent</artifactId>
<version>0.1.3-incubating-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<name>Gossip itest</name>
<artifactId>gossip-itest</artifactId>
<version>0.1.3-incubating-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-base</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-base</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-protocol-jackson</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-transport-udp</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19.1</version>
<configuration>
<systemPropertyVariables>
<java.io.tmpdir>${project.build.directory}</java.io.tmpdir>
</systemPropertyVariables>
</configuration>
<dependencies>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-surefire-provider</artifactId>
<version>${junit.platform.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>

View File

@ -36,7 +36,7 @@ import org.apache.gossip.model.SharedDataMessage;
import org.junit.Test;
import io.teknek.tunit.TUnit;
public class DataTest extends AbstractIntegrationBase {
private String orSetKey = "cror";
@ -45,10 +45,8 @@ public class DataTest extends AbstractIntegrationBase {
@Test
public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();

View File

@ -43,8 +43,6 @@ public class IdAndPropertyTest extends AbstractIntegrationBase {
public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException {
GossipSettings settings = new GossipSettings();
settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
List<Member> startupMembers = new ArrayList<>();
Map<String, String> x = new HashMap<>();
x.put("a", "b");

View File

@ -50,8 +50,6 @@ public class ShutdownDeadtimeTest {
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();

View File

@ -45,8 +45,6 @@ public class SignedMessageTest extends AbstractIntegrationBase {
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setSignMessages(true);
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
return settings;
}

View File

@ -50,8 +50,6 @@ public class TenNodeThreeSeedTest {
GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
String cluster = UUID.randomUUID().toString();
int seedNodes = 3;
List<Member> startupMembers = new ArrayList<>();

View File

@ -36,16 +36,16 @@
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-base</artifactId>
<version>0.1.3-incubating-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-base</artifactId>
<version>0.1.3-incubating-SNAPSHOT</version>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>

View File

@ -25,9 +25,7 @@ import org.apache.gossip.Member;
import org.apache.gossip.crdt.OrSet;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.Base;
import org.apache.gossip.protocol.ProtocolManager;
import org.apache.gossip.udp.Trackable;
import org.junit.Assert;
import org.junit.Test;
@ -36,11 +34,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
public class JacksonTest {

View File

@ -41,6 +41,7 @@ class TestMessage extends Base implements Trackable {
private Object[] arrayOfThings;
private Map<String, String> mapOfThings = new HashMap<>();
@SuppressWarnings("unused")//Used by ObjectMapper
private TestMessage() {
}

View File

@ -36,8 +36,8 @@
<dependency>
<groupId>org.apache.gossip</groupId>
<artifactId>gossip-base</artifactId>
<version>0.1.3-incubating-SNAPSHOT</version>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>

View File

@ -19,6 +19,7 @@ package org.apache.gossip.transport.udp;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.transport.AbstractTransportManager;
import org.apache.log4j.Logger;
@ -30,12 +31,13 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* This class is constructed by reflection in GossipManager.
* It manages transport (byte read/write) operations over UDP.
*/
public class UdpTransportManager extends AbstractTransportManager {
public class UdpTransportManager extends AbstractTransportManager implements Runnable {
public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class);
@ -44,12 +46,14 @@ public class UdpTransportManager extends AbstractTransportManager {
private final int soTimeout;
private final Thread me;
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
/** required for reflection to work! */
public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager, gossipCore);
soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
@ -58,12 +62,38 @@ public class UdpTransportManager extends AbstractTransportManager {
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
me = new Thread(this);
}
@Override
public void run() {
while (keepRunning.get()) {
try {
byte[] buf = read();
try {
Base message = gossipManager.getProtocolManager().read(buf);
gossipCore.receive(message);
//TODO this is suspect
gossipManager.getMemberStateRefresher().run();
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}
} catch (IOException e) {
// InterruptedException are completely normal here because of the blocking lifecycle.
if (!(e.getCause() instanceof InterruptedException)) {
LOGGER.error(e);
}
keepRunning.set(false);
}
}
}
@Override
public void shutdown() {
keepRunning.set(false);
server.close();
super.shutdown();
me.interrupt();
}
/**
@ -81,13 +111,13 @@ public class UdpTransportManager extends AbstractTransportManager {
@Override
public void send(URI endpoint, byte[] buf) throws IOException {
DatagramSocket socket = new DatagramSocket();
socket.setSoTimeout(soTimeout);
InetAddress dest = InetAddress.getByName(endpoint.getHost());
DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
socket.send(payload);
// todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
socket.close();
try (DatagramSocket socket = new DatagramSocket()){
socket.setSoTimeout(soTimeout);
InetAddress dest = InetAddress.getByName(endpoint.getHost());
DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
socket.send(payload);
}
}
private void debug(byte[] jsonBytes) {
@ -96,4 +126,10 @@ public class UdpTransportManager extends AbstractTransportManager {
LOGGER.debug("Received message ( bytes): " + receivedMessage);
}
}
@Override
public void startEndpoint() {
me.start();
}
}

View File

@ -17,14 +17,9 @@
*/
package org.apache.gossip.transport.udp;
import org.apache.gossip.GossipSettings;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class UdpTransportIntegrationTest {
// It's currently impossible to create a UdpTransportManager without bringing up an entire stack.

View File

@ -58,6 +58,7 @@
<module>gossip-base</module>
<module>gossip-transport-udp</module>
<module>gossip-protocol-jackson</module>
<module>gossip-itest</module>
</modules>
<description>A peer to peer cluster discovery service</description>