renamed packages from 'google' to 'apache' and updated necessary imports

This commit is contained in:
Dorian Ellerbe
2016-05-29 00:50:32 -04:00
parent 4d2ea58bab
commit 3ca8e0f9c8
23 changed files with 78 additions and 80 deletions

View File

@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember;
/**
* [The active thread: periodically send gossip request.] The class handles gossiping the membership
* list. This information is important to maintaining a common state among all the nodes, and is
* important for detecting failures.
*/
abstract public class ActiveGossipThread implements Runnable {
protected final GossipManager gossipManager;
private final AtomicBoolean keepRunning;
public ActiveGossipThread(GossipManager gossipManager) {
this.gossipManager = gossipManager;
this.keepRunning = new AtomicBoolean(true);
}
@Override
public void run() {
while (keepRunning.get()) {
try {
TimeUnit.MILLISECONDS.sleep(gossipManager.getSettings().getGossipInterval());
sendMembershipList(gossipManager.getMyself(), gossipManager.getMemberList());
} catch (InterruptedException e) {
GossipService.LOGGER.error(e);
keepRunning.set(false);
}
}
shutdown();
}
public void shutdown() {
keepRunning.set(false);
}
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
abstract protected void sendMembershipList(LocalGossipMember me,
List<LocalGossipMember> memberList);
/**
* Abstract method which should be implemented by a subclass. This method should return a member
* of the list to gossip with.
*
* @param memberList
* The list of members which are stored in the local list of members.
* @return The chosen LocalGossipMember to gossip with.
*/
abstract protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList);
}

View File

@ -0,0 +1,217 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.Notification;
import javax.management.NotificationListener;
import org.apache.log4j.Logger;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipService;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
public abstract class GossipManager extends Thread implements NotificationListener {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
public static final int MAX_PACKET_SIZE = 102400;
private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
private final LocalGossipMember me;
private final GossipSettings settings;
private final AtomicBoolean gossipServiceRunning;
private final Class<? extends PassiveGossipThread> passiveGossipThreadClass;
private final Class<? extends ActiveGossipThread> activeGossipThreadClass;
private final GossipListener listener;
private ActiveGossipThread activeGossipThread;
private PassiveGossipThread passiveGossipThread;
private ExecutorService gossipThreadExecutor;
public GossipManager(Class<? extends PassiveGossipThread> passiveGossipThreadClass,
Class<? extends ActiveGossipThread> activeGossipThreadClass, String cluster,
String address, int port, String id, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener) {
this.passiveGossipThreadClass = passiveGossipThreadClass;
this.activeGossipThreadClass = activeGossipThreadClass;
this.settings = settings;
me = new LocalGossipMember(cluster, address, port, id, System.currentTimeMillis(), this,
settings.getCleanupInterval());
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
startupMember.getHost(), startupMember.getPort(), startupMember.getId(),
System.currentTimeMillis(), this, settings.getCleanupInterval());
members.put(member, GossipState.UP);
GossipService.LOGGER.debug(member);
}
}
gossipThreadExecutor = Executors.newCachedThreadPool();
gossipServiceRunning = new AtomicBoolean(true);
this.listener = listener;
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
GossipService.LOGGER.debug("Service has been shutdown...");
}
}));
}
/**
* All timers associated with a member will trigger this method when it goes off. The timer will
* go off if we have not heard from this member in <code> _settings.T_CLEANUP </code> time.
*/
@Override
public void handleNotification(Notification notification, Object handback) {
LocalGossipMember deadMember = (LocalGossipMember) notification.getUserData();
GossipService.LOGGER.debug("Dead member detected: " + deadMember);
members.put(deadMember, GossipState.DOWN);
if (listener != null) {
listener.gossipEvent(deadMember, GossipState.DOWN);
}
}
public void revivieMember(LocalGossipMember m) {
for (Entry<LocalGossipMember, GossipState> it : this.members.entrySet()) {
if (it.getKey().getId().equals(m.getId())) {
it.getKey().disableTimer();
}
}
members.remove(m);
members.put(m, GossipState.UP);
if (listener != null) {
listener.gossipEvent(m, GossipState.UP);
}
}
public void createOrRevivieMember(LocalGossipMember m) {
members.put(m, GossipState.UP);
if (listener != null) {
listener.gossipEvent(m, GossipState.UP);
}
}
public GossipSettings getSettings() {
return settings;
}
/**
*
* @return a read only list of members found in the UP state
*/
public List<LocalGossipMember> getMemberList() {
List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
if (GossipState.UP.equals(entry.getValue())) {
up.add(entry.getKey());
}
}
return Collections.unmodifiableList(up);
}
public LocalGossipMember getMyself() {
return me;
}
public List<LocalGossipMember> getDeadList() {
List<LocalGossipMember> up = new ArrayList<>();
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
if (GossipState.DOWN.equals(entry.getValue())) {
up.add(entry.getKey());
}
}
return Collections.unmodifiableList(up);
}
/**
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
* thread and start the receiver thread.
*/
public void run() {
for (LocalGossipMember member : members.keySet()) {
if (member != me) {
member.startTimeoutTimer();
}
}
try {
passiveGossipThread = passiveGossipThreadClass.getConstructor(GossipManager.class)
.newInstance(this);
gossipThreadExecutor.execute(passiveGossipThread);
activeGossipThread = activeGossipThreadClass.getConstructor(GossipManager.class)
.newInstance(this);
gossipThreadExecutor.execute(activeGossipThread);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException
| InvocationTargetException | NoSuchMethodException | SecurityException e1) {
throw new RuntimeException(e1);
}
GossipService.LOGGER.debug("The GossipService is started.");
while (gossipServiceRunning.get()) {
try {
// TODO
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
GossipService.LOGGER.warn("The GossipClient was interrupted.");
}
}
}
/**
* Shutdown the gossip service.
*/
public void shutdown() {
gossipServiceRunning.set(false);
gossipThreadExecutor.shutdown();
if (passiveGossipThread != null) {
passiveGossipThread.shutdown();
}
if (activeGossipThread != null) {
activeGossipThread.shutdown();
}
try {
boolean result = gossipThreadExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
if (!result) {
LOGGER.error("executor shutdown timed out");
}
} catch (InterruptedException e) {
LOGGER.error(e);
}
}
}

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipService;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.gossip.RemoteGossipMember;
/**
* [The passive thread: reply to incoming gossip request.] This class handles the passive cycle,
* where this client has received an incoming message. For now, this message is always the
* membership list, but if you choose to gossip additional information, you will need some logic to
* determine the incoming message.
*/
abstract public class PassiveGossipThread implements Runnable {
public static final Logger LOGGER = Logger.getLogger(PassiveGossipThread.class);
/** The socket used for the passive thread of the gossip service. */
private final DatagramSocket server;
private final GossipManager gossipManager;
private final AtomicBoolean keepRunning;
private final String cluster;
private final ObjectMapper MAPPER = new ObjectMapper();
public PassiveGossipThread(GossipManager gossipManager) {
this.gossipManager = gossipManager;
try {
SocketAddress socketAddress = new InetSocketAddress(gossipManager.getMyself().getHost(),
gossipManager.getMyself().getPort());
server = new DatagramSocket(socketAddress);
GossipService.LOGGER.debug("Gossip service successfully initialized on port "
+ gossipManager.getMyself().getPort());
GossipService.LOGGER.debug("I am " + gossipManager.getMyself());
cluster = gossipManager.getMyself().getClusterName();
if (cluster == null){
throw new IllegalArgumentException("cluster was null");
}
} catch (SocketException ex) {
GossipService.LOGGER.warn(ex);
throw new RuntimeException(ex);
}
keepRunning = new AtomicBoolean(true);
}
@Override
public void run() {
while (keepRunning.get()) {
try {
byte[] buf = new byte[server.getReceiveBufferSize()];
DatagramPacket p = new DatagramPacket(buf, buf.length);
server.receive(p);
int packet_length = 0;
for (int i = 0; i < 4; i++) {
int shift = (4 - 1 - i) * 8;
packet_length += (buf[i] & 0x000000FF) << shift;
}
if (packet_length <= GossipManager.MAX_PACKET_SIZE) {
byte[] json_bytes = new byte[packet_length];
for (int i = 0; i < packet_length; i++) {
json_bytes[i] = buf[i + 4];
}
if (GossipService.LOGGER.isDebugEnabled()){
String receivedMessage = new String(json_bytes);
GossipService.LOGGER.debug("Received message (" + packet_length + " bytes): "
+ receivedMessage);
}
try {
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;
ActiveGossipMessage activeGossipMessage = MAPPER.readValue(json_bytes,
ActiveGossipMessage.class);
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
RemoteGossipMember member = new RemoteGossipMember(
activeGossipMessage.getMembers().get(i).getCluster(),
activeGossipMessage.getMembers().get(i).getHost(),
activeGossipMessage.getMembers().get(i).getPort(),
activeGossipMessage.getMembers().get(i).getId(),
activeGossipMessage.getMembers().get(i).getHeartbeat());
if (!(member.getClusterName().equals(cluster))){
GossipService.LOGGER.warn("Note a member of this cluster " + i);
continue;
}
// This is the first member found, so this should be the member who is communicating
// with me.
if (i == 0) {
senderMember = member;
}
remoteGossipMembers.add(member);
}
mergeLists(gossipManager, senderMember, remoteGossipMembers);
} catch (RuntimeException ex) {
GossipService.LOGGER.error("Unable to process message", ex);
}
} else {
GossipService.LOGGER
.error("The received message is not of the expected size, it has been dropped.");
}
} catch (IOException e) {
GossipService.LOGGER.error(e);
System.out.println(e);
keepRunning.set(false);
}
}
shutdown();
}
public void shutdown() {
try {
server.close();
} catch (RuntimeException ex) {
}
}
/**
* Abstract method for merging the local and remote list.
*
* @param gossipManager
* The GossipManager for retrieving the local members and dead members list.
* @param senderMember
* The member who is sending this list, this could be used to send a response if the
* remote list contains out-dated information.
* @param remoteList
* The list of members known at the remote side.
*/
abstract protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList);
}
/*
* random comments // Check whether the package is smaller than the maximal packet length. // A
* package larger than this would not be possible to be send from a GossipService, // since this is
* check before sending the message. // This could normally only occur when the list of members is
* very big, // or when the packet is malformed, and the first 4 bytes is not the right in anymore.
* // For this reason we regards the message.
*/

View File

@ -0,0 +1,128 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.impl;
import java.util.List;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.PassiveGossipThread;
public class OnlyProcessReceivedPassiveGossipThread extends PassiveGossipThread {
public OnlyProcessReceivedPassiveGossipThread(GossipManager gossipManager) {
super(gossipManager);
}
/**
* Merge remote list (received from peer), and our local member list. Simply, we must update the
* heartbeats that the remote list has with our list. Also, some additional logic is needed to
* make sure we have not timed out a member and then immediately received a list with that member.
*
* @param gossipManager
* @param senderMember
* @param remoteList
*/
protected void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList) {
// if the person sending to us is in the dead list consider them up
for (LocalGossipMember i : gossipManager.getDeadList()) {
if (i.getId().equals(senderMember.getId())) {
System.out.println(gossipManager.getMyself() + " caught a live one!");
LocalGossipMember newLocalMember = new LocalGossipMember(senderMember.getClusterName(),
senderMember.getHost(), senderMember.getPort(), senderMember.getId(),
senderMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
}
}
for (GossipMember remoteMember : remoteList) {
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
continue;
}
if (gossipManager.getMemberList().contains(remoteMember)) {
LocalGossipMember localMember = gossipManager.getMemberList().get(
gossipManager.getMemberList().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localMember.getHeartbeat()) {
localMember.setHeartbeat(remoteMember.getHeartbeat());
localMember.resetTimeoutTimer();
}
} else if (!gossipManager.getMemberList().contains(remoteMember)
&& !gossipManager.getDeadList().contains(remoteMember)) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.createOrRevivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
} else {
if (gossipManager.getDeadList().contains(remoteMember)) {
LocalGossipMember localDeadMember = gossipManager.getDeadList().get(
gossipManager.getDeadList().indexOf(remoteMember));
if (remoteMember.getHeartbeat() > localDeadMember.getHeartbeat()) {
LocalGossipMember newLocalMember = new LocalGossipMember(remoteMember.getClusterName(),
remoteMember.getHost(), remoteMember.getPort(), remoteMember.getId(),
remoteMember.getHeartbeat(), gossipManager, gossipManager.getSettings()
.getCleanupInterval());
gossipManager.revivieMember(newLocalMember);
newLocalMember.startTimeoutTimer();
GossipService.LOGGER.debug("Removed remote member " + remoteMember.getAddress()
+ " from dead list and added to local member list.");
} else {
GossipService.LOGGER.debug("me " + gossipManager.getMyself());
GossipService.LOGGER.debug("sender " + senderMember);
GossipService.LOGGER.debug("remote " + remoteList);
GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
}
} else {
GossipService.LOGGER.debug("me " + gossipManager.getMyself());
GossipService.LOGGER.debug("sender " + senderMember);
GossipService.LOGGER.debug("remote " + remoteList);
GossipService.LOGGER.debug("live " + gossipManager.getMemberList());
GossipService.LOGGER.debug("dead " + gossipManager.getDeadList());
// throw new IllegalArgumentException("wtf");
}
}
}
}
}
/**
* old comment section: // If a member is restarted the heartbeat will restart from 1, so we should
* check // that here. // So a member can become from the dead when it is either larger than a
* previous // heartbeat (due to network failure) // or when the heartbeat is 1 (after a restart of
* the service). // TODO: What if the first message of a gossip service is sent to a dead node? The
* // second member will receive a heartbeat of two. // TODO: The above does happen. Maybe a special
* message for a revived member? // TODO: Or maybe when a member is declared dead for more than //
* _settings.getCleanupInterval() ms, reset the heartbeat to 0. // It will then accept a revived
* member. // The above is now handle by checking whether the heartbeat differs //
* _settings.getCleanupInterval(), it must be restarted.
*/
/*
* // The remote member is back from the dead. // Remove it from the dead list. //
* gossipManager.getDeadList().remove(localDeadMember); // Add it as a new member and add it to the
* member list.
*/

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.impl;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.manager.ActiveGossipThread;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.ActiveGossipMessage;
import org.apache.gossip.model.GossipMember;
import org.codehaus.jackson.map.ObjectMapper;
abstract public class SendMembersActiveGossipThread extends ActiveGossipThread {
protected ObjectMapper om = new ObjectMapper();
public SendMembersActiveGossipThread(GossipManager gossipManager) {
super(gossipManager);
}
private GossipMember convert(LocalGossipMember member){
GossipMember gm = new GossipMember();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
gm.setHost(member.getHost());
gm.setId(member.getId());
gm.setPort(member.getPort());
return gm;
}
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
protected void sendMembershipList(LocalGossipMember me, List<LocalGossipMember> memberList) {
GossipService.LOGGER.debug("Send sendMembershipList() is called.");
me.setHeartbeat(System.currentTimeMillis());
LocalGossipMember member = selectPartner(memberList);
if (member == null) {
return;
}
try (DatagramSocket socket = new DatagramSocket()) {
socket.setSoTimeout(gossipManager.getSettings().getGossipInterval());
InetAddress dest = InetAddress.getByName(member.getHost());
ActiveGossipMessage message = new ActiveGossipMessage();
message.getMembers().add(convert(me));
for (LocalGossipMember other : memberList) {
message.getMembers().add(convert(other));
}
byte[] json_bytes = om.writeValueAsString(message).getBytes();
int packet_length = json_bytes.length;
if (packet_length < GossipManager.MAX_PACKET_SIZE) {
byte[] buf = createBuffer(packet_length, json_bytes);
DatagramPacket datagramPacket = new DatagramPacket(buf, buf.length, dest, member.getPort());
socket.send(datagramPacket);
} else {
GossipService.LOGGER.error("The length of the to be send message is too large ("
+ packet_length + " > " + GossipManager.MAX_PACKET_SIZE + ").");
}
} catch (IOException e1) {
GossipService.LOGGER.warn(e1);
}
}
private byte[] createBuffer(int packetLength, byte[] jsonBytes) {
byte[] lengthBytes = new byte[4];
lengthBytes[0] = (byte) (packetLength >> 24);
lengthBytes[1] = (byte) ((packetLength << 8) >> 24);
lengthBytes[2] = (byte) ((packetLength << 16) >> 24);
lengthBytes[3] = (byte) ((packetLength << 24) >> 24);
ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonBytes.length);
byteBuffer.put(lengthBytes);
byteBuffer.put(jsonBytes);
byte[] buf = byteBuffer.array();
return buf;
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.random;
import java.util.List;
import java.util.Random;
import org.apache.gossip.GossipService;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.impl.SendMembersActiveGossipThread;
public class RandomActiveGossipThread extends SendMembersActiveGossipThread {
/** The Random used for choosing a member to gossip with. */
private final Random random;
public RandomActiveGossipThread(GossipManager gossipManager) {
super(gossipManager);
random = new Random();
}
/**
* [The selectToSend() function.] Find a random peer from the local membership list. In the case
* where this client is the only member in the list, this method will return null.
*
* @return Member random member if list is greater than 1, null otherwise
*/
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);
} else {
GossipService.LOGGER.debug("I am alone in this world.");
}
return member;
}
}

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.random;
import org.apache.gossip.GossipMember;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import java.util.List;
public class RandomGossipManager extends GossipManager {
public RandomGossipManager(String cluster, String address, int port, String id,
GossipSettings settings, List<GossipMember> gossipMembers, GossipListener listener) {
super(OnlyProcessReceivedPassiveGossipThread.class, RandomActiveGossipThread.class, cluster,
address, port, id, settings, gossipMembers, listener);
}
}