GOSSIP-15 avoid busy loop (ChiaHung Lin via egc)

This commit is contained in:
Edward Capriolo
2016-09-23 22:13:40 -04:00
parent 9a1af76df8
commit 375cee2a3b
4 changed files with 99 additions and 166 deletions

View File

@ -70,8 +70,8 @@ public class GossipService {
} }
public void start() { public void start() {
LOGGER.debug("Starting: " + gossipManager.getName() + " - " + get_gossipManager().getMyself().getUri()); LOGGER.debug("Starting: " + get_gossipManager().getMyself().getUri());
gossipManager.start(); gossipManager.init();
} }
public void shutdown() { public void shutdown() {

View File

@ -17,59 +17,103 @@
*/ */
package org.apache.gossip.manager; package org.apache.gossip.manager;
import java.io.IOException;
import java.net.DatagramSocket;
import java.util.List; import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.GossipService; import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember; import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
/** /**
* [The active thread: periodically send gossip request.] The class handles gossiping the membership * [The active thread: periodically send gossip request.] The class handles gossiping the membership
* list. This information is important to maintaining a common state among all the nodes, and is * list. This information is important to maintaining a common state among all the nodes, and is
* important for detecting failures. * important for detecting failures.
*/ */
abstract public class ActiveGossipThread implements Runnable { public class ActiveGossipThread {
public static final Logger LOGGER = Logger.getLogger(ActiveGossipThread.class);
private ScheduledExecutorService scheduledExecutorService ;
private ObjectMapper MAPPER = new ObjectMapper();
private final Random random;
protected final GossipManager gossipManager; protected final GossipManager gossipManager;
private final GossipCore gossipCore;
private final AtomicBoolean keepRunning; public ActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
public ActiveGossipThread(GossipManager gossipManager) {
this.gossipManager = gossipManager; this.gossipManager = gossipManager;
this.keepRunning = new AtomicBoolean(true); this.scheduledExecutorService = Executors.newScheduledThreadPool(1024);
this.gossipCore = gossipCore;
this.random = new Random();
} }
@Override public void init() {
public void run() { scheduledExecutorService.scheduleAtFixedRate(
while (keepRunning.get()) { () -> sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers()), 0,
try { gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval()); scheduledExecutorService.scheduleAtFixedRate(
() -> sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers()), 0,
// contact a live member. gossipManager.getSettings().getGossipInterval(), TimeUnit.MILLISECONDS);
sendMembershipList(gossipManager.getMyself(), gossipManager.getLiveMembers());
// contact a dead member.
sendMembershipList(gossipManager.getMyself(), gossipManager.getDeadMembers());
} catch (InterruptedException e) {
GossipService.LOGGER.error(e);
keepRunning.set(false);
}
}
shutdown();
} }
public void shutdown() { public void shutdown() {
keepRunning.set(false); this.scheduledExecutorService.shutdown();
try {
this.scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.warn("Did not complete shutdown", e);
}
} }
/** /**
* Performs the sending of the membership list, after we have incremented our own heartbeat. * Performs the sending of the membership list, after we have incremented our own heartbeat.
*/ */
abstract protected void sendMembershipList(LocalGossipMember me, protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
List<LocalGossipMember> memberList);
me.setHeartbeat(System.currentTimeMillis());
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
return;
} else {
GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalGossipMember other : memberList) {
message.getMembers().add(convert(other));
}
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
Response r = gossipCore.send(message, member.getUri());
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
LOGGER.warn("Message "+ message + " generated response "+ r);
}
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
} catch (IOException e1) {
GossipService.LOGGER.warn(e1);
}
}
/** /**
* Abstract method which should be implemented by a subclass. This method should return a member * Abstract method which should be implemented by a subclass. This method should return a member
* of the list to gossip with. * of the list to gossip with.
@ -78,5 +122,23 @@ abstract public class ActiveGossipThread implements Runnable {
* The list of members which are stored in the local list of members. * The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with. * @return The chosen LocalGossipMember to gossip with.
*/ */
abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList); protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
} else {
LOGGER.debug("I am alone in this world.");
}
return member;
}
private GossipMember convert(LocalGossipMember member){
GossipMember gm = new GossipMember();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
gm.setUri(member.getUri().toASCIIString());
gm.setId(member.getId());
return gm;
}
} }

View File

@ -40,9 +40,8 @@ import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener; import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState; import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread; import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.manager.random.RandomActiveGossipThread;
public abstract class GossipManager extends Thread implements NotificationListener { public abstract class GossipManager implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class); public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
@ -179,7 +178,7 @@ public abstract class GossipManager extends Thread implements NotificationListen
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread. * thread and start the receiver thread.
*/ */
public void run() { public void init() {
for (LocalGossipMember member : members.keySet()) { for (LocalGossipMember member : members.keySet()) {
if (member != me) { if (member != me) {
member.startTimeoutTimer(); member.startTimeoutTimer();
@ -187,17 +186,9 @@ public abstract class GossipManager extends Thread implements NotificationListen
} }
passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore); passiveGossipThread = new OnlyProcessReceivedPassiveGossipThread(this, gossipCore);
gossipThreadExecutor.execute(passiveGossipThread); gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = new RandomActiveGossipThread(this, this.gossipCore); activeGossipThread = new ActiveGossipThread(this, this.gossipCore);
gossipThreadExecutor.execute(activeGossipThread); activeGossipThread.init();
GossipService.LOGGER.debug("The GossipService is started."); GossipService.LOGGER.debug("The GossipService is started.");
while (gossipServiceRunning.get()) {
try {
// TODO
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
GossipService.LOGGER.warn("The GossipClient was interrupted.");
}
}
} }
/** /**

View File

@ -1,120 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.random;
import java.io.IOException;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.manager.ActiveGossipThread;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.Response;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
public class RandomActiveGossipThread extends ActiveGossipThread {
public static final Logger LOGGER = Logger.getLogger(RandomActiveGossipThread.class);
protected ObjectMapper MAPPER = new ObjectMapper();
/** The Random used for choosing a member to gossip with. */
private final Random random;
private final GossipCore gossipCore;
public RandomActiveGossipThread(GossipManager gossipManager, GossipCore gossipCore) {
super(gossipManager);
random = new Random();
this.gossipCore = gossipCore;
}
/**
* [The selectToSend() function.] Find a random peer from the local membership list. In the case
* where this client is the only member in the list, this method will return null.
*
* @return Member random member if list is greater than 1, null otherwise
*/
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
} else {
GossipService.LOGGER.debug("I am alone in this world.");
}
return member;
}
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
me.setHeartbeat(System.currentTimeMillis());
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
GossipService.LOGGER.debug("Send sendMembershipList() is called without action");
return;
} else {
GossipService.LOGGER.debug("Send sendMembershipList() is called to " + member.toString());
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
UdpActiveGossipMessage message = new UdpActiveGossipMessage();
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalGossipMember other : memberList) {
message.getMembers().add(convert(other));
}
byte[] json_bytes = MAPPER.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
Response r = gossipCore.send(message, member.getUri());
if (r instanceof ActiveGossipOk){
//maybe count metrics here
} else {
LOGGER.warn("Message "+ message + " generated response "+ r);
}
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
} catch (IOException e1) {
GossipService.LOGGER.warn(e1);
}
}
private GossipMember convert(LocalGossipMember member){
GossipMember gm = new GossipMember();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
gm.setUri(member.getUri().toASCIIString());
gm.setId(member.getId());
return gm;
}
}