GOSSIP-47 sign data

This commit is contained in:
Edward Capriolo
2017-02-11 00:03:56 -05:00
parent 21a263b079
commit 7106cc400a
9 changed files with 346 additions and 10 deletions

View File

@ -56,6 +56,11 @@ public class GossipSettings {
private boolean persistDataState = true;
private String pathToKeyStore = "./keys";
private boolean signMessages = false;
/**
* Construct GossipSettings with default settings.
*/
@ -203,4 +208,20 @@ public class GossipSettings {
this.persistDataState = persistDataState;
}
public String getPathToKeyStore() {
return pathToKeyStore;
}
public void setPathToKeyStore(String pathToKeyStore) {
this.pathToKeyStore = pathToKeyStore;
}
public boolean isSignMessages() {
return signMessages;
}
public void setSignMessages(boolean signMessages) {
this.signMessages = signMessages;
}
}

View File

@ -17,12 +17,23 @@
*/
package org.apache.gossip.manager;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.PrivateKey;
import java.security.Signature;
import java.security.SignatureException;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;
import java.util.ArrayList;
import java.util.List;
import java.util.Map.Entry;
@ -45,6 +56,7 @@ import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.model.SignedPayload;
import org.apache.gossip.udp.Trackable;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpActiveGossipOk;
@ -66,6 +78,8 @@ public class GossipCore implements GossipCoreConstants {
private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
private final PKCS8EncodedKeySpec privKeySpec;
private final PrivateKey privKey;
private final Meter messageSerdeException;
private final Meter tranmissionException;
private final Meter tranmissionSuccess;
@ -86,6 +100,41 @@ public class GossipCore implements GossipCoreConstants {
messageSerdeException = metrics.meter(MESSAGE_SERDE_EXCEPTION);
tranmissionException = metrics.meter(MESSAGE_TRANSMISSION_EXCEPTION);
tranmissionSuccess = metrics.meter(MESSAGE_TRANSMISSION_SUCCESS);
if (manager.getSettings().isSignMessages()){
File privateKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId());
File publicKey = new File(manager.getSettings().getPathToKeyStore(), manager.getMyself().getId() + ".pub");
if (!privateKey.exists()){
throw new IllegalArgumentException("private key not found " + privateKey);
}
if (!publicKey.exists()){
throw new IllegalArgumentException("public key not found " + publicKey);
}
try (FileInputStream keyfis = new FileInputStream(privateKey)) {
byte[] encKey = new byte[keyfis.available()];
keyfis.read(encKey);
keyfis.close();
privKeySpec = new PKCS8EncodedKeySpec(encKey);
KeyFactory keyFactory = KeyFactory.getInstance("DSA");
privKey = keyFactory.generatePrivate(privKeySpec);
} catch (NoSuchAlgorithmException | InvalidKeySpecException | IOException e) {
throw new RuntimeException("failed hard", e);
}
} else {
privKeySpec = null;
privKey = null;
}
}
private byte [] sign(byte [] bytes){
Signature dsa;
try {
dsa = Signature.getInstance("SHA1withDSA", "SUN");
dsa.initSign(privKey);
dsa.update(bytes);
return dsa.sign();
} catch (NoSuchAlgorithmException | NoSuchProviderException | InvalidKeyException | SignatureException e) {
throw new RuntimeException(e);
}
}
public void addSharedData(SharedGossipDataMessage message){
@ -207,7 +256,14 @@ public class GossipCore implements GossipCoreConstants {
private void sendInternal(Base message, URI uri){
byte[] json_bytes;
try {
json_bytes = gossipManager.getObjectMapper().writeValueAsString(message).getBytes();
if (privKey == null){
json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
} else {
SignedPayload p = new SignedPayload();
p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
p.setSignature(sign(p.getData()));
json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
}
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);
@ -285,7 +341,14 @@ public class GossipCore implements GossipCoreConstants {
public void sendOneWay(Base message, URI u){
byte[] json_bytes;
try {
json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
if (privKey == null){
json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(message);
} else {
SignedPayload p = new SignedPayload();
p.setData(gossipManager.getObjectMapper().writeValueAsString(message).getBytes());
p.setSignature(sign(p.getData()));
json_bytes = gossipManager.getObjectMapper().writeValueAsBytes(p);
}
} catch (IOException e) {
messageSerdeException.mark();
throw new RuntimeException(e);

View File

@ -76,11 +76,11 @@ public abstract class GossipManager {
URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper) {
this.settings = settings;
gossipCore = new GossipCore(this, registry);
clock = new SystemClock();
dataReaper = new DataReaper(gossipCore, clock);
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
gossipCore = new GossipCore(this, registry);
dataReaper = new DataReaper(gossipCore, clock);
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
@ -338,4 +338,8 @@ public abstract class GossipManager {
return objectMapper;
}
public MetricRegistry getRegistry() {
return registry;
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
public interface PassiveGossipConstants {
String SIGNED_MESSAGE = "gossip.passive.signed_message";
String UNSIGNED_MESSAGE = "gossip.passive.unsigned_message";
}

View File

@ -26,8 +26,11 @@ import java.net.SocketException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.SignedPayload;
import org.apache.log4j.Logger;
import com.codahale.metrics.Meter;
/**
* This class handles the passive cycle,
* where this client has received an incoming message.
@ -41,6 +44,8 @@ abstract public class PassiveGossipThread implements Runnable {
private final AtomicBoolean keepRunning;
private final GossipCore gossipCore;
private final GossipManager gossipManager;
private final Meter signed;
private final Meter unsigned;
public PassiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
this.gossipManager = gossipManager;
@ -52,14 +57,13 @@ abstract public class PassiveGossipThread implements Runnable {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getUri().getHost(),
gossipManager.getMyself().getUri().getPort());
server = new DatagramSocket(socketAddress);
LOGGER.debug("Gossip service successfully initialized on port "
+ gossipManager.getMyself().getUri().getPort());
LOGGER.debug("I am " + gossipManager.getMyself());
} catch (SocketException ex) {
LOGGER.warn(ex);
throw new RuntimeException(ex);
}
keepRunning = new AtomicBoolean(true);
signed = gossipManager.getRegistry().meter(PassiveGossipConstants.SIGNED_MESSAGE);
unsigned = gossipManager.getRegistry().meter(PassiveGossipConstants.UNSIGNED_MESSAGE);
}
@Override
@ -72,7 +76,15 @@ abstract public class PassiveGossipThread implements Runnable {
debug(p.getData());
try {
Base activeGossipMessage = gossipManager.getObjectMapper().readValue(p.getData(), Base.class);
gossipCore.receive(activeGossipMessage);
if (activeGossipMessage instanceof SignedPayload){
SignedPayload s = (SignedPayload) activeGossipMessage;
Base nested = gossipManager.getObjectMapper().readValue(s.getData(), Base.class);
gossipCore.receive(nested);
signed.mark();
} else {
gossipCore.receive(activeGossipMessage);
unsigned.mark();
}
} catch (RuntimeException ex) {//TODO trap json exception
LOGGER.error("Unable to process message", ex);
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.model;
public class SignedPayload extends Base{
private byte [] data;
private byte [] signature;
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public byte[] getSignature() {
return signature;
}
public void setSignature(byte[] signature) {
this.signature = signature;
}
}

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.secure;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.SecureRandom;
public class KeyTool {
public static void generatePubandPrivateKeyFiles(String path, String id)
throws NoSuchAlgorithmException, NoSuchProviderException, IOException{
SecureRandom r = new SecureRandom();
KeyPairGenerator keyGen = KeyPairGenerator.getInstance("DSA", "SUN");
keyGen.initialize(1024, r);
KeyPair pair = keyGen.generateKeyPair();
PrivateKey priv = pair.getPrivate();
PublicKey pub = pair.getPublic();
{
FileOutputStream sigfos = new FileOutputStream(new File(path, id));
sigfos.write(priv.getEncoded());
sigfos.close();
}
{
FileOutputStream sigfos = new FileOutputStream(new File(path, id + ".pub"));
sigfos.write(pub.getEncoded());
sigfos.close();
}
}
public static void main (String [] args) throws
NoSuchAlgorithmException, NoSuchProviderException, IOException{
generatePubandPrivateKeyFiles(args[0], args[1]);
}
}

View File

@ -79,7 +79,6 @@ public class DataTest {
}
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo("b");
TUnit.assertThat(new Callable<Object>() {
public Object call() throws Exception {
SharedGossipDataMessage x = clients.get(1).findSharedData("a");

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.manager.PassiveGossipConstants;
import org.apache.gossip.secure.KeyTool;
import org.junit.Assert;
import org.junit.Test;
import com.codahale.metrics.MetricRegistry;
import io.teknek.tunit.TUnit;
public class SignedMessageTest {
@Test(expected=IllegalArgumentException.class)
public void ifSignMustHaveKeys()
throws URISyntaxException, UnknownHostException, InterruptedException {
String cluster = UUID.randomUUID().toString();
GossipSettings settings = gossiperThatSigns();
List<GossipMember> startupMembers = new ArrayList<>();
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + 1));
GossipService gossipService = new GossipService(cluster, uri, 1 + "",
new HashMap<String, String>(), startupMembers, settings, (a, b) -> { },
new MetricRegistry());
gossipService.start();
}
private GossipSettings gossiperThatSigns(){
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
settings.setSignMessages(true);
return settings;
}
@Test
public void dataTest() throws InterruptedException, URISyntaxException, NoSuchAlgorithmException, NoSuchProviderException, IOException{
String keys = "./keys";
GossipSettings settings = gossiperThatSigns();
setup(keys);
String cluster = UUID.randomUUID().toString();
List<GossipMember> startupMembers = new ArrayList<>();
for (int i = 1; i < 2; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
startupMembers.add(new RemoteGossipMember(cluster, uri, i + ""));
}
final List<GossipService> clients = new ArrayList<>();
for (int i = 1; i < 3; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (30000 + i));
GossipService gossipService = new GossipService(cluster, uri, i + "",
new HashMap<String,String>(), startupMembers, settings,
(a,b) -> {}, new MetricRegistry());
clients.add(gossipService);
gossipService.start();
}
assertTwoAlive(clients);
assertOnlySignedMessages(clients);
cleanup(keys, clients);
}
private void assertTwoAlive(List<GossipService> clients){
TUnit.assertThat(() -> {
int total = 0;
for (int i = 0; i < clients.size(); ++i) {
total += clients.get(i).getGossipManager().getLiveMembers().size();
}
return total;
}).afterWaitingAtMost(20, TimeUnit.SECONDS).isEqualTo(2);
}
private void assertOnlySignedMessages(List<GossipService> clients){
Assert.assertEquals(0, clients.get(0).getGossipManager().getRegistry()
.meter(PassiveGossipConstants.UNSIGNED_MESSAGE).getCount());
Assert.assertTrue(clients.get(0).getGossipManager().getRegistry()
.meter(PassiveGossipConstants.SIGNED_MESSAGE).getCount() > 0);
}
private void cleanup(String keys, List<GossipService> clients){
new File(keys, "1").delete();
new File(keys, "2").delete();
new File(keys).delete();
for (int i = 0; i < clients.size(); ++i) {
clients.get(i).shutdown();
}
}
private void setup(String keys) throws NoSuchAlgorithmException, NoSuchProviderException, IOException {
new File(keys).mkdir();
KeyTool.generatePubandPrivateKeyFiles(keys, "1");
KeyTool.generatePubandPrivateKeyFiles(keys, "2");
}
}