package com.google.code.gossip.manager; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.management.Notification; import javax.management.NotificationListener; import com.google.code.gossip.GossipMember; import com.google.code.gossip.GossipService; import com.google.code.gossip.GossipSettings; import com.google.code.gossip.LocalGossipMember; public abstract class GossipManager extends Thread implements NotificationListener { /** The maximal number of bytes the packet with the GOSSIP may be. (Default is 100 kb) */ public static final int MAX_PACKET_SIZE = 102400; /** The list of members which are in the gossip group (not including myself). */ private ArrayList _memberList; /** The list of members which are known to be dead. */ private ArrayList _deadList; /** The member I am representing. */ private LocalGossipMember _me; /** The settings for gossiping. */ private GossipSettings _settings; /** A boolean whether the gossip service should keep running. */ private AtomicBoolean _gossipServiceRunning; /** A ExecutorService used for executing the active and passive gossip threads. */ private ExecutorService _gossipThreadExecutor; private Class _passiveGossipThreadClass; private PassiveGossipThread passiveGossipThread; private Class _activeGossipThreadClass; private ActiveGossipThread activeGossipThread; public GossipManager(Class passiveGossipThreadClass, Class activeGossipThreadClass, String address, int port, String id, GossipSettings settings, ArrayList gossipMembers) { _passiveGossipThreadClass = passiveGossipThreadClass; _activeGossipThreadClass = activeGossipThreadClass; _settings = settings; _me = new LocalGossipMember(address, port, id, 0, this, settings.getCleanupInterval()); _memberList = new ArrayList(); _deadList = new ArrayList(); for (GossipMember startupMember : gossipMembers) { if (!startupMember.equals(_me)) { LocalGossipMember member = new LocalGossipMember(startupMember.getHost(), startupMember.getPort(), startupMember.getId(), 0, this, settings.getCleanupInterval()); _memberList.add(member); GossipService.LOGGER.debug(member); } } _gossipServiceRunning = new AtomicBoolean(true); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { GossipService.LOGGER.info("Service has been shutdown..."); } })); } /** * All timers associated with a member will trigger this method when it goes * off. The timer will go off if we have not heard from this member in * _settings.T_CLEANUP time. */ @Override public void handleNotification(Notification notification, Object handback) { LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData(); GossipService.LOGGER.info("Dead member detected: " + deadMember); synchronized (this._memberList) { this._memberList.remove(deadMember); } synchronized (this._deadList) { this._deadList.add(deadMember); } } public GossipSettings getSettings() { return _settings; } /** * Get a clone of the memberlist. * @return */ public ArrayList getMemberList() { return _memberList; } public LocalGossipMember getMyself() { return _me; } public ArrayList getDeadList() { return _deadList; } /** * Starts the client. Specifically, start the various cycles for this protocol. * Start the gossip thread and start the receiver thread. * @throws InterruptedException */ public void run() { for (LocalGossipMember member : _memberList) { if (member != _me) { member.startTimeoutTimer(); } } _gossipThreadExecutor = Executors.newCachedThreadPool(); try { passiveGossipThread = _passiveGossipThreadClass.getConstructor(GossipManager.class).newInstance(this); _gossipThreadExecutor.execute(passiveGossipThread); activeGossipThread = _activeGossipThreadClass.getConstructor(GossipManager.class).newInstance(this); _gossipThreadExecutor.execute(activeGossipThread); } catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException | NoSuchMethodException | SecurityException e1) { throw new RuntimeException(e1); } GossipService.LOGGER.info("The GossipService is started."); while(_gossipServiceRunning.get()) { try { //TODO TimeUnit.MILLISECONDS.sleep(1); } catch (InterruptedException e) { GossipService.LOGGER.info("The GossipClient was interrupted."); } } } /** * Shutdown the gossip service. */ public void shutdown() { _gossipThreadExecutor.shutdown(); passiveGossipThread.shutdown(); activeGossipThread.shutdown(); try { boolean result = _gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); System.err.println("Terminate retuned " + result); } catch (InterruptedException e) { e.printStackTrace(); } _gossipServiceRunning.set(false); } }