GOSSIP-43 cleanup sockets, readme, and use lambdas
This commit is contained in:
@ -18,7 +18,6 @@
|
||||
package org.apache.gossip.manager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
import java.util.List;
|
||||
|
||||
import java.util.Map.Entry;
|
||||
@ -116,8 +115,7 @@ public class ActiveGossipThread {
|
||||
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
|
||||
return;
|
||||
}
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
|
||||
try {
|
||||
for (Entry<String, SharedGossipDataMessage> innerEntry : this.gossipCore.getSharedData().entrySet()){
|
||||
UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
|
||||
message.setUuid(UUID.randomUUID().toString());
|
||||
@ -127,7 +125,6 @@ public class ActiveGossipThread {
|
||||
message.setNodeId(innerEntry.getValue().getNodeId());
|
||||
message.setTimestamp(innerEntry.getValue().getTimestamp());
|
||||
message.setPayload(innerEntry.getValue().getPayload());
|
||||
message.setTimestamp(innerEntry.getValue().getTimestamp());
|
||||
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
|
||||
int packet_length = json_bytes.length;
|
||||
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
||||
@ -152,8 +149,7 @@ public class ActiveGossipThread {
|
||||
sendPerNodeDataHistogram.update(System.currentTimeMillis() - startTime);
|
||||
return;
|
||||
}
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
|
||||
try {
|
||||
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
|
||||
for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
|
||||
UdpGossipDataMessage message = new UdpGossipDataMessage();
|
||||
@ -164,7 +160,6 @@ public class ActiveGossipThread {
|
||||
message.setNodeId(innerEntry.getValue().getNodeId());
|
||||
message.setTimestamp(innerEntry.getValue().getTimestamp());
|
||||
message.setPayload(innerEntry.getValue().getPayload());
|
||||
message.setTimestamp(innerEntry.getValue().getTimestamp());
|
||||
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
|
||||
int packet_length = json_bytes.length;
|
||||
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
||||
@ -190,12 +185,12 @@ public class ActiveGossipThread {
|
||||
LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
|
||||
sendMembershipList(gossipManager.getMyself(), member);
|
||||
}
|
||||
|
||||
/**
|
||||
* Performs the sending of the membership list, after we have incremented our own heartbeat.
|
||||
*/
|
||||
protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
me.setHeartbeat(System.nanoTime());
|
||||
if (member == null) {
|
||||
LOGGER.debug("Send sendMembershipList() is called without action");
|
||||
@ -204,8 +199,7 @@ public class ActiveGossipThread {
|
||||
} else {
|
||||
LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
|
||||
}
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
|
||||
try {
|
||||
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
|
||||
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
|
||||
message.setUuid(UUID.randomUUID().toString());
|
||||
|
@ -67,7 +67,6 @@ public class GossipCore {
|
||||
private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
|
||||
private final BlockingQueue<Runnable> workQueue;
|
||||
|
||||
|
||||
public GossipCore(GossipManager manager){
|
||||
this.gossipManager = manager;
|
||||
requests = new ConcurrentHashMap<>();
|
||||
@ -175,6 +174,11 @@ public class GossipCore {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a blocking message. Throws exception when tranmission fails
|
||||
* @param message
|
||||
* @param uri
|
||||
*/
|
||||
private void sendInternal(Base message, URI uri){
|
||||
byte[] json_bytes;
|
||||
try {
|
||||
@ -186,6 +190,7 @@ public class GossipCore {
|
||||
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
||||
byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
|
||||
InetAddress dest = InetAddress.getByName(uri.getHost());
|
||||
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, uri.getPort());
|
||||
socket.send(datagramPacket);
|
||||
@ -245,9 +250,14 @@ public class GossipCore {
|
||||
requests.remove(t.getUuid() + "/" + t.getUriFrom());
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a message across the network while blocking. Catches and ignores IOException in transmission. Used
|
||||
* when the protocol for the message is not to wait for a response
|
||||
* @param message the message to send
|
||||
* @param u the uri to send it to
|
||||
*/
|
||||
public void sendOneWay(Base message, URI u){
|
||||
byte[] json_bytes;
|
||||
try {
|
||||
@ -259,13 +269,13 @@ public class GossipCore {
|
||||
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
|
||||
byte[] buf = UdpUtil.createBuffer(packet_length, json_bytes);
|
||||
try (DatagramSocket socket = new DatagramSocket()) {
|
||||
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval() * 2);
|
||||
InetAddress dest = InetAddress.getByName(u.getHost());
|
||||
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, u.getPort());
|
||||
socket.send(datagramPacket);
|
||||
} catch (IOException ex) { }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Merge lists from remote members and update heartbeats
|
||||
@ -280,36 +290,31 @@ public class GossipCore {
|
||||
if (LOGGER.isDebugEnabled()){
|
||||
debugState(senderMember, remoteList);
|
||||
}
|
||||
// if the person sending to us is in the dead list consider them up
|
||||
for (LocalGossipMember i : gossipManager.getDeadMembers()) {
|
||||
if (i.getId().equals(senderMember.getId())) {
|
||||
LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
|
||||
i.recordHeartbeat(senderMember.getHeartbeat());
|
||||
i.setHeartbeat(senderMember.getHeartbeat());
|
||||
//TODO set node to UP here
|
||||
|
||||
//TODO consider forcing an UP here
|
||||
}
|
||||
}
|
||||
for (GossipMember remoteMember : remoteList) {
|
||||
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
|
||||
continue;
|
||||
}
|
||||
LocalGossipMember m = new LocalGossipMember(remoteMember.getClusterName(),
|
||||
LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(),
|
||||
remoteMember.getUri(),
|
||||
remoteMember.getId(),
|
||||
remoteMember.getHeartbeat(),
|
||||
gossipManager.getSettings().getWindowSize(),
|
||||
gossipManager.getSettings().getMinimumSamples());
|
||||
m.recordHeartbeat(remoteMember.getHeartbeat());
|
||||
|
||||
Object result = gossipManager.getMembers().putIfAbsent(m, GossipState.UP);
|
||||
aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
|
||||
Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
|
||||
if (result != null){
|
||||
for (Entry<LocalGossipMember, GossipState> l : gossipManager.getMembers().entrySet()){
|
||||
if (l.getKey().getId().equals(remoteMember.getId())){
|
||||
//if (l.getKey().getHeartbeat() < remoteMember.getHeartbeat()){
|
||||
l.getKey().recordHeartbeat(remoteMember.getHeartbeat());
|
||||
l.getKey().setHeartbeat(remoteMember.getHeartbeat());
|
||||
//}
|
||||
for (Entry<LocalGossipMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
|
||||
if (localMember.getKey().getId().equals(remoteMember.getId())){
|
||||
localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
|
||||
localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
@ -86,7 +87,7 @@ public abstract class GossipManager {
|
||||
clock = new SystemClock();
|
||||
dataReaper = new DataReaper(gossipCore, clock);
|
||||
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(),
|
||||
+ settings.getWindowSize(), settings.getMinimumSamples());
|
||||
settings.getWindowSize(), settings.getMinimumSamples());
|
||||
members = new ConcurrentSkipListMap<>();
|
||||
for (GossipMember startupMember : gossipMembers) {
|
||||
if (!startupMember.equals(me)) {
|
||||
@ -112,19 +113,15 @@ public abstract class GossipManager {
|
||||
return settings;
|
||||
}
|
||||
|
||||
// TODO: Use some java 8 goodness for these functions.
|
||||
|
||||
/**
|
||||
* @return a read only list of members found in the DOWN state.
|
||||
*/
|
||||
public List<LocalGossipMember> getDeadMembers() {
|
||||
List<LocalGossipMember> down = new ArrayList<>();
|
||||
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
|
||||
if (GossipState.DOWN.equals(entry.getValue())) {
|
||||
down.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableList(down);
|
||||
return Collections.unmodifiableList(
|
||||
members.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> GossipState.DOWN.equals(entry.getValue()))
|
||||
.map(Entry::getKey).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -132,13 +129,11 @@ public abstract class GossipManager {
|
||||
* @return a read only list of members found in the UP state
|
||||
*/
|
||||
public List<LocalGossipMember> getLiveMembers() {
|
||||
List<LocalGossipMember> up = new ArrayList<>();
|
||||
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
|
||||
if (GossipState.UP.equals(entry.getValue())) {
|
||||
up.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableList(up);
|
||||
return Collections.unmodifiableList(
|
||||
members.entrySet()
|
||||
.stream()
|
||||
.filter(entry -> GossipState.UP.equals(entry.getValue()))
|
||||
.map(Entry::getKey).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public LocalGossipMember getMyself() {
|
||||
|
Reference in New Issue
Block a user