GOSSIP-81 Move Jackson and UDP to their own modules

Part of what makes this work is the test implementation of TransportManager.

This PR is pretty straightforward. A few gotchas though:
* A message signing test was moved into `JacksonTests` because that is
  where the signing actually happens.
* A CRDT serializing test was moved there as well. It's the best place
  for now.
* No UDP tests at all. I plan to fix that in a bit. Reasoning is that it is
  difficult to test any TransportManager implementation without bring up
  a full stack. I plan to address this in the future (GOSSIP-83).
* Simple round trip Jackson serialization tests.
This commit is contained in:
Gary Dusbabek
2017-04-19 13:46:04 -05:00
parent 851cd93e67
commit e3010c8542
24 changed files with 748 additions and 74 deletions

View File

@ -1,4 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@ -35,36 +52,6 @@
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>${metrics.version}</version></dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<version>${junit.vintage.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<version>${junit.platform.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.teknek</groupId>
<artifactId>tunit</artifactId>
<version>${tunit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>

View File

@ -45,8 +45,8 @@ public class GossipSettings {
private String activeGossipClass = "org.apache.gossip.manager.SimpleActiveGossipper";
private String transportManagerClass = "org.apache.gossip.transport.UdpTransportManager";
private String protocolManagerClass = "org.apache.gossip.protocol.JacksonProtocolManager";
private String transportManagerClass = "org.apache.gossip.transport.udp.UdpTransportManager";
private String protocolManagerClass = "org.apache.gossip.protocol.json.JacksonProtocolManager";
private Map<String,String> activeGossipProperties = new HashMap<>();
@ -230,7 +230,15 @@ public class GossipSettings {
return transportManagerClass;
}
public void setTransportManagerClass(String transportManagerClass) {
this.transportManagerClass = transportManagerClass;
}
public String getProtocolManagerClass() {
return protocolManagerClass;
}
public void setProtocolManagerClass(String protocolManagerClass) {
this.protocolManagerClass = protocolManagerClass;
}
}

View File

@ -185,10 +185,22 @@ public class StartupSettings {
if (cluster == null){
throw new IllegalArgumentException("cluster was null. It is required");
}
String transportClass = jsonObject.has("transport_manager_class") ?
jsonObject.get("transport_manager_class").textValue() :
null;
String protocolClass = jsonObject.has("protocol_manager_class") ?
jsonObject.get("protocol_manager_class").textValue() :
null;
URI uri2 = new URI(uri);
StartupSettings settings = new StartupSettings(id, uri2,
new GossipSettings(gossipInterval, cleanupInterval, windowSize,
minSamples, convictThreshold, distribution), cluster);
GossipSettings gossipSettings = new GossipSettings(gossipInterval, cleanupInterval, windowSize, minSamples,
convictThreshold, distribution);
if (transportClass != null) {
gossipSettings.setTransportManagerClass(transportClass);
}
if (protocolClass != null) {
gossipSettings.setProtocolManagerClass(protocolClass);
}
StartupSettings settings = new StartupSettings(id, uri2, gossipSettings, cluster);
String configMembersDetails = "Config-members [";
JsonNode membersJSON = jsonObject.get("members");
Iterator<JsonNode> it = membersJSON.iterator();

View File

@ -61,7 +61,10 @@ public class PassiveGossipThread implements Runnable {
LOGGER.error("Unable to process message", ex);
}
} catch (IOException e) {
LOGGER.error(e);
// InterruptedException are completely normal here because of the blocking lifecycle.
if (!(e.getCause() instanceof InterruptedException)) {
LOGGER.error(e);
}
keepRunning.set(false);
}
}

View File

@ -1,131 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.protocol;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.crdt.CrdtModule;
import org.apache.gossip.manager.PassiveGossipConstants;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.SignedPayload;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.SignatureException;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;
// this class is constructed by reflection in GossipManager.
public class JacksonProtocolManager implements ProtocolManager {
private final ObjectMapper objectMapper;
private final PrivateKey privKey;
private final Meter signed;
private final Meter unsigned;
/** required for reflection to work! */
public JacksonProtocolManager(GossipSettings settings, String id, MetricRegistry registry) {
// set up object mapper.
objectMapper = buildObjectMapper(settings);
// set up message signing.
if (settings.isSignMessages()){
File privateKey = new File(settings.getPathToKeyStore(), id);
File publicKey = new File(settings.getPathToKeyStore(), id + ".pub");
if (!privateKey.exists()){
throw new IllegalArgumentException("private key not found " + privateKey);
}
if (!publicKey.exists()){
throw new IllegalArgumentException("public key not found " + publicKey);
}
try (FileInputStream keyfis = new FileInputStream(privateKey)) {
byte[] encKey = new byte[keyfis.available()];
keyfis.read(encKey);
keyfis.close();
PKCS8EncodedKeySpec privKeySpec = new PKCS8EncodedKeySpec(encKey);
KeyFactory keyFactory = KeyFactory.getInstance("DSA");
privKey = keyFactory.generatePrivate(privKeySpec);
} catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
throw new RuntimeException("failed hard", e);
}
} else {
privKey = null;
}
signed = registry.meter(PassiveGossipConstants.SIGNED_MESSAGE);
unsigned = registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
}
@Override
public byte[] write(Base message) throws IOException {
byte[] json_bytes;
if (privKey == null){
json_bytes = objectMapper.writeValueAsBytes(message);
} else {
SignedPayload p = new SignedPayload();
p.setData(objectMapper.writeValueAsString(message).getBytes());
p.setSignature(sign(p.getData(), privKey));
json_bytes = objectMapper.writeValueAsBytes(p);
}
return json_bytes;
}
@Override
public Base read(byte[] buf) throws IOException {
Base activeGossipMessage = objectMapper.readValue(buf, Base.class);
if (activeGossipMessage instanceof SignedPayload){
SignedPayload s = (SignedPayload) activeGossipMessage;
signed.mark();
return objectMapper.readValue(s.getData(), Base.class);
} else {
unsigned.mark();
return activeGossipMessage;
}
}
public static ObjectMapper buildObjectMapper(GossipSettings settings) {
ObjectMapper om = new ObjectMapper();
om.enableDefaultTyping();
// todo: should be specified in the configuration.
om.registerModule(new CrdtModule());
om.configure(JsonGenerator.Feature.WRITE_NUMBERS_AS_STRINGS, false);
return om;
}
private static byte[] sign(byte [] bytes, PrivateKey pk){
Signature dsa;
try {
dsa = Signature.getInstance("SHA1withDSA", "SUN");
dsa.initSign(pk);
dsa.update(bytes);
return dsa.sign();
} catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -66,7 +66,8 @@ public abstract class AbstractTransportManager implements TransportManager {
try {
boolean result = gossipThreadExecutor.awaitTermination(10, TimeUnit.MILLISECONDS);
if (!result) {
LOGGER.error("executor shutdown timed out");
// common when blocking patterns are used to read data from a socket.
LOGGER.warn("executor shutdown timed out");
}
} catch (InterruptedException e) {
LOGGER.error(e);

View File

@ -1,98 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.transport;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URI;
/**
* This class is constructed by reflection in GossipManager.
* It manages transport (byte read/write) operations over UDP.
*/
public class UdpTransportManager extends AbstractTransportManager {
public static final Logger LOGGER = Logger.getLogger(UdpTransportManager.class);
/** The socket used for the passive thread of the gossip service. */
private final DatagramSocket server;
private final int soTimeout;
/** required for reflection to work! */
public UdpTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager, gossipCore);
soTimeout = gossipManager.getSettings().getGossipInterval() * 2;
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
server = new DatagramSocket(socketAddress);
} catch (SocketException ex) {
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
}
@Override
public void shutdown() {
server.close();
super.shutdown();
}
/**
* blocking read a message.
* @return buffer of message contents.
* @throws IOException
*/
public byte[] read() throws IOException {
byte[] buf = new byte[server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length);
server.receive(p);
debug(p.getData());
return p.getData();
}
@Override
public void send(URI endpoint, byte[] buf) throws IOException {
DatagramSocket socket = new DatagramSocket();
socket.setSoTimeout(soTimeout);
InetAddress dest = InetAddress.getByName(endpoint.getHost());
DatagramPacket payload = new DatagramPacket(buf, buf.length, dest, endpoint.getPort());
socket.send(payload);
// todo: investigate UDP socket reuse. It would save a little setup/teardown time wrt to the local socket.
socket.close();
}
private void debug(byte[] jsonBytes) {
if (LOGGER.isDebugEnabled()){
String receivedMessage = new String(jsonBytes);
LOGGER.debug("Received message ( bytes): " + receivedMessage);
}
}
}

View File

@ -47,6 +47,8 @@ public class DataTest extends AbstractIntegrationBase {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();

View File

@ -43,6 +43,8 @@ public class IdAndPropertyTest extends AbstractIntegrationBase {
public void testDatacenterRackGossiper() throws URISyntaxException, UnknownHostException, InterruptedException {
GossipSettings settings = new GossipSettings();
settings.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName());
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
List<Member> startupMembers = new ArrayList<>();
Map<String, String> x = new HashMap<>();
x.put("a", "b");

View File

@ -44,10 +44,14 @@ public class ShutdownDeadtimeTest {
private static final Logger log = Logger.getLogger(ShutdownDeadtimeTest.class);
// Note: this test is floppy depending on the values in GossipSettings (smaller values seem to do harm), and the
// sleep that happens after startup.
@Test
public void DeadNodesDoNotComeAliveAgain()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings(100, 10000, 1000, 1, 10.0, "normal");
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
@ -70,7 +74,7 @@ public class ShutdownDeadtimeTest {
.build();
clients.add(gossipService);
gossipService.init();
Thread.sleep(1000);
}
TUnit.assertThat(new Callable<Integer>() {
public Integer call() throws Exception {

View File

@ -21,7 +21,6 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.util.ArrayList;
@ -41,28 +40,13 @@ import io.teknek.tunit.TUnit;
public class SignedMessageTest extends AbstractIntegrationBase {
@Test(expected = IllegalArgumentException.class)
public void ifSignMustHaveKeys()
throws URISyntaxException, UnknownHostException, InterruptedException {
String cluster = UUID.randomUUID().toString();
GossipSettings settings = gossiperThatSigns();
List<Member> startupMembers = new ArrayList<>();
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1));
GossipManager gossipService = GossipManagerBuilder.newBuilder()
.cluster(cluster)
.uri(uri)
.id(1 + "")
.gossipMembers(startupMembers)
.gossipSettings(settings)
.build();
gossipService.init();
}
private GossipSettings gossiperThatSigns(){
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setSignMessages(true);
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
return settings;
}

View File

@ -47,11 +47,14 @@ public class StartupSettingsTest {
settingsFile.deleteOnExit();
writeSettingsFile(settingsFile);
URI uri = new URI("udp://" + "127.0.0.1" + ":" + 50000);
GossipSettings firstGossipSettings = new GossipSettings();
firstGossipSettings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
firstGossipSettings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
GossipManager firstService = GossipManagerBuilder.newBuilder()
.cluster(CLUSTER)
.uri(uri)
.id("1")
.gossipSettings(new GossipSettings()).build();
.gossipSettings(firstGossipSettings).build();
firstService.init();
GossipManager manager = GossipManagerBuilder.newBuilder()
.startupSettings(StartupSettings.fromJSONFile(settingsFile)).build();
@ -72,6 +75,8 @@ public class StartupSettingsTest {
" \"cleanup_interval\":10000,\n" +
" \"convict_threshold\":2.6,\n" +
" \"distribution\":\"exponential\",\n" +
" \"transport_manager_class\":\"org.apache.gossip.transport.UnitTestTransportManager\",\n" +
" \"protocol_manager_class\":\"org.apache.gossip.protocol.UnitTestProtocolManager\",\n" +
" \"properties\":{},\n" +
" \"members\":[\n" +
" {\"cluster\": \"" + CLUSTER + "\",\"uri\":\"udp://127.0.0.1:5000\"}\n" +

View File

@ -50,6 +50,8 @@ public class TenNodeThreeSeedTest {
GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 1.6, "exponential");
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
String cluster = UUID.randomUUID().toString();
int seedNodes = 3;
List<Member> startupMembers = new ArrayList<>();

View File

@ -17,15 +17,10 @@
*/
package org.apache.gossip.crdt;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.protocol.JacksonProtocolManager;
import org.junit.Assert;
import org.junit.Test;
@ -85,16 +80,6 @@ public class OrSetTest {
Assert.assertEquals(i.value(), j.value());
}
@Test
public void serialTest() throws InterruptedException, URISyntaxException, IOException {
ObjectMapper objectMapper = JacksonProtocolManager.buildObjectMapper(new GossipSettings());
OrSet<Integer> i = new OrSet<Integer>(new OrSet.Builder<Integer>().add(1).remove(1));
String s = objectMapper.writeValueAsString(i);
@SuppressWarnings("unchecked")
OrSet<Integer> back = objectMapper.readValue(s, OrSet.class);
Assert.assertEquals(back, i);
}
@Test
public void mergeTestSame() {
OrSet<Integer> i = new OrSet<>(19);

View File

@ -40,6 +40,8 @@ public class DataReaperTest {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings)
.id(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build();
gm.init();
@ -88,6 +90,8 @@ public class DataReaperTest {
String key = "key";
String value = "a";
GossipSettings settings = new GossipSettings();
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
GossipManager gm = GossipManagerBuilder.newBuilder().cluster("abc").gossipSettings(settings)
.id(myId).uri(URI.create("udp://localhost:7000")).registry(registry).build();
gm.init();

View File

@ -35,6 +35,8 @@ public class UserDataPersistenceTest {
private GossipManager sameService() throws URISyntaxException {
GossipSettings settings = new GossipSettings();
settings.setTransportManagerClass("org.apache.gossip.transport.UnitTestTransportManager");
settings.setProtocolManagerClass("org.apache.gossip.protocol.UnitTestProtocolManager");
return GossipManagerBuilder.newBuilder()
.cluster("a")
.uri(new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)))

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.protocol;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.manager.PassiveGossipConstants;
import org.apache.gossip.model.Base;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// doesn't serialize anything besides longs. Uses a static lookup table to read and write objects.
public class UnitTestProtocolManager implements ProtocolManager {
// so it can be shared across gossipers. this works as long as each object has a different memory address.
private static final Map<Long, Base> lookup = new ConcurrentHashMap<>();
private final Meter meter;
public UnitTestProtocolManager(GossipSettings settings, String id, MetricRegistry registry) {
meter = settings.isSignMessages() ?
registry.meter(PassiveGossipConstants.SIGNED_MESSAGE) :
registry.meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
}
private static byte[] longToBytes(long val) {
byte[] b = new byte[8];
b[7] = (byte) (val);
b[6] = (byte) (val >>> 8);
b[5] = (byte) (val >>> 16);
b[4] = (byte) (val >>> 24);
b[3] = (byte) (val >>> 32);
b[2] = (byte) (val >>> 40);
b[1] = (byte) (val >>> 48);
b[0] = (byte) (val >>> 56);
return b;
}
static long bytesToLong(byte[] b) {
return ((b[7] & 0xFFL)) +
((b[6] & 0xFFL) << 8) +
((b[5] & 0xFFL) << 16) +
((b[4] & 0xFFL) << 24) +
((b[3] & 0xFFL) << 32) +
((b[2] & 0xFFL) << 40) +
((b[1] & 0xFFL) << 48) +
(((long) b[0]) << 56);
}
@Override
public byte[] write(Base message) throws IOException {
long hashCode = System.identityHashCode(message);
byte[] serialized = longToBytes(hashCode);
lookup.put(hashCode, message);
meter.mark();
return serialized;
}
@Override
public Base read(byte[] buf) throws IOException {
long hashCode = bytesToLong(buf);
return lookup.remove(hashCode);
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.transport;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
/** Only use in unit tests! */
public class UnitTestTransportManager extends AbstractTransportManager {
private static final Map<URI, UnitTestTransportManager> allManagers = new ConcurrentHashMap<>();
private final URI localEndpoint;
private BlockingQueue<byte[]> buffers = new ArrayBlockingQueue<byte[]>(1000);
public UnitTestTransportManager(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager, gossipCore);
localEndpoint = gossipManager.getMyself().getUri();
}
@Override
public void send(URI endpoint, byte[] buf) throws IOException {
if (allManagers.containsKey(endpoint)) {
try {
allManagers.get(endpoint).buffers.put(buf);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
@Override
public byte[] read() throws IOException {
try {
return buffers.take();
} catch (InterruptedException ex) {
// probably not the right thing to do, but we'll see.
throw new IOException(ex);
}
}
@Override
public void shutdown() {
allManagers.remove(localEndpoint);
super.shutdown();
}
@Override
public void startEndpoint() {
allManagers.put(localEndpoint, this);
super.startEndpoint();
}
}