GOSSIP-46 Refactor away GossipService cleaner better class names

This commit is contained in:
Edward Capriolo
2017-03-05 15:44:54 -05:00
parent d5fe9f96c8
commit 5199821980
47 changed files with 547 additions and 696 deletions

View File

@ -24,16 +24,16 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.LocalMember;
import org.apache.gossip.model.ActiveGossipOk;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.GossipMember;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.Member;
import org.apache.gossip.model.Response;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.gossip.udp.UdpActiveGossipMessage;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.gossip.udp.UdpPerNodeDataMessage;
import org.apache.gossip.udp.UdpSharedDataMessage;
import org.apache.log4j.Logger;
import static com.codahale.metrics.MetricRegistry.name;
@ -69,7 +69,7 @@ public abstract class AbstractActiveGossiper {
}
public final void sendShutdownMessage(LocalGossipMember me, LocalGossipMember target){
public final void sendShutdownMessage(LocalMember me, LocalMember target){
if (target == null){
return;
}
@ -79,13 +79,13 @@ public abstract class AbstractActiveGossiper {
gossipCore.sendOneWay(m, target.getUri());
}
public final void sendSharedData(LocalGossipMember me, LocalGossipMember member){
public final void sendSharedData(LocalMember me, LocalMember member){
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
for (Entry<String, SharedGossipDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
UdpSharedGossipDataMessage message = new UdpSharedGossipDataMessage();
for (Entry<String, SharedDataMessage> innerEntry : gossipCore.getSharedData().entrySet()){
UdpSharedDataMessage message = new UdpSharedDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
@ -98,14 +98,14 @@ public abstract class AbstractActiveGossiper {
sharedDataHistogram.update(System.currentTimeMillis() - startTime);
}
public final void sendPerNodeData(LocalGossipMember me, LocalGossipMember member){
public final void sendPerNodeData(LocalMember me, LocalMember member){
if (member == null){
return;
}
long startTime = System.currentTimeMillis();
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, GossipDataMessage> innerEntry : entry.getValue().entrySet()){
UdpGossipDataMessage message = new UdpGossipDataMessage();
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> entry : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, PerNodeDataMessage> innerEntry : entry.getValue().entrySet()){
UdpPerNodeDataMessage message = new UdpPerNodeDataMessage();
message.setUuid(UUID.randomUUID().toString());
message.setUriFrom(me.getId());
message.setExpireAt(innerEntry.getValue().getExpireAt());
@ -122,7 +122,7 @@ public abstract class AbstractActiveGossiper {
/**
* Performs the sending of the membership list, after we have incremented our own heartbeat.
*/
protected void sendMembershipList(LocalGossipMember me, LocalGossipMember member) {
protected void sendMembershipList(LocalMember me, LocalMember member) {
if (member == null){
return;
}
@ -132,7 +132,7 @@ public abstract class AbstractActiveGossiper {
message.setUriFrom(gossipManager.getMyself().getUri().toASCIIString());
message.setUuid(UUID.randomUUID().toString());
message.getMembers().add(convert(me));
for (LocalGossipMember other : gossipManager.getMembers().keySet()) {
for (LocalMember other : gossipManager.getMembers().keySet()) {
message.getMembers().add(convert(other));
}
Response r = gossipCore.send(message, member.getUri());
@ -144,8 +144,8 @@ public abstract class AbstractActiveGossiper {
sendMembershipHistorgram.update(System.currentTimeMillis() - startTime);
}
protected final GossipMember convert(LocalGossipMember member){
GossipMember gm = new GossipMember();
protected final Member convert(LocalMember member){
Member gm = new Member();
gm.setCluster(member.getClusterName());
gm.setHeartbeat(member.getHeartbeat());
gm.setUri(member.getUri().toASCIIString());
@ -160,8 +160,8 @@ public abstract class AbstractActiveGossiper {
* An immutable list
* @return The chosen LocalGossipMember to gossip with.
*/
protected LocalGossipMember selectPartner(List<LocalGossipMember> memberList) {
LocalGossipMember member = null;
protected LocalMember selectPartner(List<LocalMember> memberList) {
LocalMember member = null;
if (memberList.size() > 0) {
int randomNeighborIndex = random.nextInt(memberList.size());
member = memberList.get(randomNeighborIndex);

View File

@ -23,8 +23,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
/**
* We wish to periodically sweep user data and remove entries past their timestamp. This
@ -53,7 +53,7 @@ public class DataReaper {
}
void runSharedOnce(){
for (Entry<String, SharedGossipDataMessage> entry : gossipCore.getSharedData().entrySet()){
for (Entry<String, SharedDataMessage> entry : gossipCore.getSharedData().entrySet()){
if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
gossipCore.getSharedData().remove(entry.getKey(), entry.getValue());
}
@ -61,13 +61,13 @@ public class DataReaper {
}
void runPerNodeOnce(){
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> node : gossipCore.getPerNodeData().entrySet()){
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> node : gossipCore.getPerNodeData().entrySet()){
reapData(node.getValue());
}
}
void reapData(ConcurrentHashMap<String, GossipDataMessage> concurrentHashMap){
for (Entry<String, GossipDataMessage> entry : concurrentHashMap.entrySet()){
void reapData(ConcurrentHashMap<String, PerNodeDataMessage> concurrentHashMap){
for (Entry<String, PerNodeDataMessage> entry : concurrentHashMap.entrySet()){
if (entry.getValue().getExpireAt() < clock.currentTimeMillis()){
concurrentHashMap.remove(entry.getKey(), entry.getValue());
}

View File

@ -27,7 +27,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.LocalMember;
import com.codahale.metrics.MetricRegistry;
@ -130,14 +130,14 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
sendMembershipList(gossipManager.getMyself(), selectPartner(gossipManager.getDeadMembers()));
}
private List<LocalGossipMember> differentDataCenter(){
private List<LocalMember> differentDataCenter(){
String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
String rack = gossipManager.getMyself().getProperties().get(RACK);
if (myDc == null|| rack == null){
return Collections.emptyList();
}
List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
for (LocalGossipMember i : gossipManager.getLiveMembers()){
List<LocalMember> notMyDc = new ArrayList<LocalMember>(10);
for (LocalMember i : gossipManager.getLiveMembers()){
if (!myDc.equals(i.getProperties().get(DATACENTER))){
notMyDc.add(i);
}
@ -145,14 +145,14 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
return notMyDc;
}
private List<LocalGossipMember> sameDatacenterDifferentRack(){
private List<LocalMember> sameDatacenterDifferentRack(){
String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
String rack = gossipManager.getMyself().getProperties().get(RACK);
if (myDc == null|| rack == null){
return Collections.emptyList();
}
List<LocalGossipMember> notMyDc = new ArrayList<LocalGossipMember>(10);
for (LocalGossipMember i : gossipManager.getLiveMembers()){
List<LocalMember> notMyDc = new ArrayList<LocalMember>(10);
for (LocalMember i : gossipManager.getLiveMembers()){
if (myDc.equals(i.getProperties().get(DATACENTER)) && !rack.equals(i.getProperties().get(RACK))){
notMyDc.add(i);
}
@ -160,14 +160,14 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
return notMyDc;
}
private List<LocalGossipMember> sameRackNodes(){
private List<LocalMember> sameRackNodes(){
String myDc = gossipManager.getMyself().getProperties().get(DATACENTER);
String rack = gossipManager.getMyself().getProperties().get(RACK);
if (myDc == null|| rack == null){
return Collections.emptyList();
}
List<LocalGossipMember> sameDcAndRack = new ArrayList<LocalGossipMember>(10);
for (LocalGossipMember i : gossipManager.getLiveMembers()){
List<LocalMember> sameDcAndRack = new ArrayList<LocalMember>(10);
for (LocalMember i : gossipManager.getLiveMembers()){
if (myDc.equals(i.getProperties().get(DATACENTER))
&& rack.equals(i.getProperties().get(RACK))){
sameDcAndRack.add(i);
@ -177,7 +177,7 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
}
private void sendToSameRackMember() {
LocalGossipMember i = selectPartner(sameRackNodes());
LocalMember i = selectPartner(sameRackNodes());
sendMembershipList(gossipManager.getMyself(), i);
}
@ -235,7 +235,7 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
* sends an optimistic shutdown message to several clusters nodes
*/
protected void sendShutdownMessage(){
List<LocalGossipMember> l = gossipManager.getLiveMembers();
List<LocalMember> l = gossipManager.getLiveMembers();
int sendTo = l.size() < 3 ? 1 : l.size() / 3;
for (int i = 0; i < sendTo; i++) {
threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));

View File

@ -20,9 +20,9 @@ package org.apache.gossip.manager;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import org.apache.gossip.GossipMember;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.Member;
import org.apache.gossip.LocalMember;
import org.apache.gossip.RemoteMember;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.model.*;
@ -49,8 +49,8 @@ public class GossipCore implements GossipCoreConstants {
private final GossipManager gossipManager;
private ConcurrentHashMap<String, Base> requests;
private ThreadPoolExecutor service;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedGossipDataMessage> sharedData;
private final ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> perNodeData;
private final ConcurrentHashMap<String, SharedDataMessage> sharedData;
private final BlockingQueue<Runnable> workQueue;
private final PKCS8EncodedKeySpec privKeySpec;
private final PrivateKey privKey;
@ -113,14 +113,14 @@ public class GossipCore implements GossipCoreConstants {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public void addSharedData(SharedGossipDataMessage message) {
public void addSharedData(SharedDataMessage message) {
while (true){
SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
if (previous == null){
return;
}
if (message.getPayload() instanceof Crdt){
SharedGossipDataMessage merged = new SharedGossipDataMessage();
SharedDataMessage merged = new SharedDataMessage();
merged.setExpireAt(message.getExpireAt());
merged.setKey(message.getKey());
merged.setNodeId(message.getNodeId());
@ -144,12 +144,12 @@ public class GossipCore implements GossipCoreConstants {
}
}
public void addPerNodeData(GossipDataMessage message){
ConcurrentHashMap<String,GossipDataMessage> nodeMap = new ConcurrentHashMap<>();
public void addPerNodeData(PerNodeDataMessage message){
ConcurrentHashMap<String,PerNodeDataMessage> nodeMap = new ConcurrentHashMap<>();
nodeMap.put(message.getKey(), message);
nodeMap = perNodeData.putIfAbsent(message.getNodeId(), nodeMap);
if (nodeMap != null){
GossipDataMessage current = nodeMap.get(message.getKey());
PerNodeDataMessage current = nodeMap.get(message.getKey());
if (current == null){
nodeMap.putIfAbsent(message.getKey(), message);
} else {
@ -160,11 +160,11 @@ public class GossipCore implements GossipCoreConstants {
}
}
public ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> getPerNodeData(){
public ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> getPerNodeData(){
return perNodeData;
}
public ConcurrentHashMap<String, SharedGossipDataMessage> getSharedData() {
public ConcurrentHashMap<String, SharedDataMessage> getSharedData() {
return sharedData;
}
@ -314,12 +314,12 @@ public class GossipCore implements GossipCoreConstants {
* @param remoteList
*
*/
public void mergeLists(GossipManager gossipManager, RemoteGossipMember senderMember,
List<GossipMember> remoteList) {
public void mergeLists(GossipManager gossipManager, RemoteMember senderMember,
List<Member> remoteList) {
if (LOGGER.isDebugEnabled()){
debugState(senderMember, remoteList);
}
for (LocalGossipMember i : gossipManager.getDeadMembers()) {
for (LocalMember i : gossipManager.getDeadMembers()) {
if (i.getId().equals(senderMember.getId())) {
LOGGER.debug(gossipManager.getMyself() + " contacted by dead member " + senderMember.getUri());
i.recordHeartbeat(senderMember.getHeartbeat());
@ -327,11 +327,11 @@ public class GossipCore implements GossipCoreConstants {
//TODO consider forcing an UP here
}
}
for (GossipMember remoteMember : remoteList) {
for (Member remoteMember : remoteList) {
if (remoteMember.getId().equals(gossipManager.getMyself().getId())) {
continue;
}
LocalGossipMember aNewMember = new LocalGossipMember(remoteMember.getClusterName(),
LocalMember aNewMember = new LocalMember(remoteMember.getClusterName(),
remoteMember.getUri(),
remoteMember.getId(),
remoteMember.getHeartbeat(),
@ -342,7 +342,7 @@ public class GossipCore implements GossipCoreConstants {
aNewMember.recordHeartbeat(remoteMember.getHeartbeat());
Object result = gossipManager.getMembers().putIfAbsent(aNewMember, GossipState.UP);
if (result != null){
for (Entry<LocalGossipMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
for (Entry<LocalMember, GossipState> localMember : gossipManager.getMembers().entrySet()){
if (localMember.getKey().getId().equals(remoteMember.getId())){
localMember.getKey().recordHeartbeat(remoteMember.getHeartbeat());
localMember.getKey().setHeartbeat(remoteMember.getHeartbeat());
@ -356,8 +356,8 @@ public class GossipCore implements GossipCoreConstants {
}
}
private void debugState(RemoteGossipMember senderMember,
List<GossipMember> remoteList){
private void debugState(RemoteMember senderMember,
List<Member> remoteList){
LOGGER.warn(
"-----------------------\n" +
"Me " + gossipManager.getMyself() + "\n" +
@ -369,13 +369,13 @@ public class GossipCore implements GossipCoreConstants {
}
@SuppressWarnings("rawtypes")
public Crdt merge(SharedGossipDataMessage message) {
public Crdt merge(SharedDataMessage message) {
for (;;){
SharedGossipDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
SharedDataMessage previous = sharedData.putIfAbsent(message.getKey(), message);
if (previous == null){
return (Crdt) message.getPayload();
}
SharedGossipDataMessage copy = new SharedGossipDataMessage();
SharedDataMessage copy = new SharedDataMessage();
copy.setExpireAt(message.getExpireAt());
copy.setKey(message.getKey());
copy.setNodeId(message.getNodeId());

View File

@ -19,16 +19,16 @@ package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipMember;
import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.LocalMember;
import org.apache.gossip.crdt.Crdt;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.manager.handlers.MessageInvoker;
import org.apache.gossip.manager.impl.OnlyProcessReceivedPassiveGossipThread;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.gossip.model.ShutdownMessage;
import org.apache.log4j.Logger;
@ -44,13 +44,12 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public abstract class GossipManager {
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
private final LocalGossipMember me;
private final ConcurrentSkipListMap<LocalMember, GossipState> members;
private final LocalMember me;
private final GossipSettings settings;
private final AtomicBoolean gossipServiceRunning;
private final GossipListener listener;
@ -70,19 +69,19 @@ public abstract class GossipManager {
public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry,
List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
ObjectMapper objectMapper, MessageInvoker messageInvoker) {
this.settings = settings;
this.messageInvoker = messageInvoker;
clock = new SystemClock();
me = new LocalGossipMember(cluster, uri, id, clock.nanoTime(), properties,
me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
gossipCore = new GossipCore(this, registry);
dataReaper = new DataReaper(gossipCore, clock);
members = new ConcurrentSkipListMap<>();
for (GossipMember startupMember : gossipMembers) {
for (Member startupMember : gossipMembers) {
if (!startupMember.equals(me)) {
LocalGossipMember member = new LocalGossipMember(startupMember.getClusterName(),
LocalMember member = new LocalMember(startupMember.getClusterName(),
startupMember.getUri(), startupMember.getId(),
clock.nanoTime(), startupMember.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
@ -106,7 +105,7 @@ public abstract class GossipManager {
return messageInvoker;
}
public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
public ConcurrentSkipListMap<LocalMember, GossipState> getMembers() {
return members;
}
@ -117,7 +116,7 @@ public abstract class GossipManager {
/**
* @return a read only list of members found in the DOWN state.
*/
public List<LocalGossipMember> getDeadMembers() {
public List<LocalMember> getDeadMembers() {
return Collections.unmodifiableList(
members.entrySet()
.stream()
@ -129,7 +128,7 @@ public abstract class GossipManager {
*
* @return a read only list of members found in the UP state
*/
public List<LocalGossipMember> getLiveMembers() {
public List<LocalMember> getLiveMembers() {
return Collections.unmodifiableList(
members.entrySet()
.stream()
@ -137,7 +136,7 @@ public abstract class GossipManager {
.map(Entry::getKey).collect(Collectors.toList()));
}
public LocalGossipMember getMyself() {
public LocalMember getMyself() {
return me;
}
@ -164,7 +163,7 @@ public abstract class GossipManager {
scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
scheduledServiced.scheduleAtFixedRate(() -> {
try {
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
for (Entry<LocalMember, GossipState> entry : members.entrySet()) {
boolean userDown = processOptomisticShutdown(entry);
if (userDown)
continue;
@ -205,8 +204,8 @@ public abstract class GossipManager {
* @param l member to consider
* @return true if node forced down
*/
public boolean processOptomisticShutdown(Entry<LocalGossipMember, GossipState> l){
GossipDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
public boolean processOptomisticShutdown(Entry<LocalMember, GossipState> l){
PerNodeDataMessage m = findPerNodeGossipData(l.getKey().getId(), ShutdownMessage.PER_NODE_KEY);
if (m == null){
return false;
}
@ -224,8 +223,8 @@ public abstract class GossipManager {
}
private void readSavedRingState() {
for (LocalGossipMember l : ringState.readFromDisk()){
LocalGossipMember member = new LocalGossipMember(l.getClusterName(),
for (LocalMember l : ringState.readFromDisk()){
LocalMember member = new LocalMember(l.getClusterName(),
l.getUri(), l.getId(),
clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
settings.getMinimumSamples(), settings.getDistribution());
@ -234,12 +233,12 @@ public abstract class GossipManager {
}
private void readSavedDataState() {
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
for (Entry<String, GossipDataMessage> j : l.getValue().entrySet()){
for (Entry<String, ConcurrentHashMap<String, PerNodeDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
for (Entry<String, PerNodeDataMessage> j : l.getValue().entrySet()){
gossipCore.addPerNodeData(j.getValue());
}
}
for (Entry<String, SharedGossipDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
for (Entry<String, SharedDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
gossipCore.addSharedData(l.getValue());
}
}
@ -276,7 +275,7 @@ public abstract class GossipManager {
scheduledServiced.shutdownNow();
}
public void gossipPerNodeData(GossipDataMessage message){
public void gossipPerNodeData(PerNodeDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
Objects.nonNull(message.getPayload());
@ -284,7 +283,7 @@ public abstract class GossipManager {
gossipCore.addPerNodeData(message);
}
public void gossipSharedData(SharedGossipDataMessage message){
public void gossipSharedData(SharedDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
Objects.nonNull(message.getPayload());
@ -295,7 +294,7 @@ public abstract class GossipManager {
@SuppressWarnings("rawtypes")
public Crdt findCrdt(String key){
SharedGossipDataMessage l = gossipCore.getSharedData().get(key);
SharedDataMessage l = gossipCore.getSharedData().get(key);
if (l == null){
return null;
}
@ -307,7 +306,7 @@ public abstract class GossipManager {
}
@SuppressWarnings("rawtypes")
public Crdt merge(SharedGossipDataMessage message){
public Crdt merge(SharedDataMessage message){
Objects.nonNull(message.getKey());
Objects.nonNull(message.getTimestamp());
Objects.nonNull(message.getPayload());
@ -318,12 +317,12 @@ public abstract class GossipManager {
return gossipCore.merge(message);
}
public GossipDataMessage findPerNodeGossipData(String nodeId, String key){
ConcurrentHashMap<String, GossipDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
public PerNodeDataMessage findPerNodeGossipData(String nodeId, String key){
ConcurrentHashMap<String, PerNodeDataMessage> j = gossipCore.getPerNodeData().get(nodeId);
if (j == null){
return null;
} else {
GossipDataMessage l = j.get(key);
PerNodeDataMessage l = j.get(key);
if (l == null){
return null;
}
@ -334,8 +333,8 @@ public abstract class GossipManager {
}
}
public SharedGossipDataMessage findSharedGossipData(String key){
SharedGossipDataMessage l = gossipCore.getSharedData().get(key);
public SharedDataMessage findSharedGossipData(String key){
SharedDataMessage l = gossipCore.getSharedData().get(key);
if (l == null){
return null;
}

View File

@ -15,16 +15,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.manager.random;
package org.apache.gossip.manager;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.core.JsonGenerator.Feature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.gossip.GossipMember;
import org.apache.gossip.Member;
import org.apache.gossip.GossipSettings;
import org.apache.gossip.StartupSettings;
import org.apache.gossip.crdt.CrdtModule;
import org.apache.gossip.event.GossipListener;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.handlers.DefaultMessageInvoker;
import org.apache.gossip.manager.handlers.MessageInvoker;
@ -34,7 +34,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class RandomGossipManager extends GossipManager {
public class GossipManagerBuilder {
public static ManagerBuilder newBuilder() {
return new ManagerBuilder();
@ -45,7 +45,7 @@ public class RandomGossipManager extends GossipManager {
private URI uri;
private String id;
private GossipSettings settings;
private List<GossipMember> gossipMembers;
private List<Member> gossipMembers;
private GossipListener listener;
private MetricRegistry registry;
private Map<String,String> properties;
@ -70,17 +70,26 @@ public class RandomGossipManager extends GossipManager {
return this;
}
public ManagerBuilder withId(String id) {
public ManagerBuilder id(String id) {
this.id = id;
return this;
}
public ManagerBuilder settings(GossipSettings settings) {
public ManagerBuilder gossipSettings(GossipSettings settings) {
this.settings = settings;
return this;
}
public ManagerBuilder startupSettings(StartupSettings startupSettings) {
this.cluster = startupSettings.getCluster();
this.id = startupSettings.getId();
this.settings = startupSettings.getGossipSettings();
this.gossipMembers = startupSettings.getGossipMembers();
this.uri = startupSettings.getUri();
return this;
}
public ManagerBuilder gossipMembers(List<GossipMember> members) {
public ManagerBuilder gossipMembers(List<Member> members) {
this.gossipMembers = members;
return this;
}
@ -110,12 +119,14 @@ public class RandomGossipManager extends GossipManager {
return this;
}
public RandomGossipManager build() {
public GossipManager build() {
checkArgument(id != null, "You must specify an id");
checkArgument(cluster != null, "You must specify a cluster name");
checkArgument(settings != null, "You must specify gossip settings");
checkArgument(uri != null, "You must specify a uri");
checkArgument(registry != null, "You must specify a MetricRegistry");
if (registry == null){
registry = new MetricRegistry();
}
if (properties == null){
properties = new HashMap<String,String>();
}
@ -133,13 +144,9 @@ public class RandomGossipManager extends GossipManager {
}
if (messageInvoker == null) {
messageInvoker = new DefaultMessageInvoker();
}
return new RandomGossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker);
}
return new GossipManager(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker) {} ;
}
}
private RandomGossipManager(String cluster, URI uri, String id, Map<String,String> properties, GossipSettings settings,
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry, ObjectMapper objectMapper, MessageInvoker messageInvoker) {
super(cluster, uri, id, properties, settings, gossipMembers, listener, registry, objectMapper, messageInvoker);
}
}

View File

@ -26,7 +26,7 @@ import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.LocalMember;
import org.apache.log4j.Logger;
public class RingStatePersister implements Runnable {
@ -52,7 +52,7 @@ public class RingStatePersister implements Runnable {
if (!parent.getSettings().isPersistRingState()){
return;
}
NavigableSet<LocalGossipMember> i = parent.getMembers().keySet();
NavigableSet<LocalMember> i = parent.getMembers().keySet();
try (FileOutputStream fos = new FileOutputStream(computeTarget())){
parent.getObjectMapper().writeValue(fos, i);
} catch (IOException e) {
@ -61,7 +61,7 @@ public class RingStatePersister implements Runnable {
}
@SuppressWarnings("unchecked")
List<LocalGossipMember> readFromDisk(){
List<LocalMember> readFromDisk(){
if (!parent.getSettings().isPersistRingState()){
return Collections.emptyList();
}

View File

@ -25,7 +25,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gossip.LocalGossipMember;
import org.apache.gossip.LocalMember;
import com.codahale.metrics.MetricRegistry;
@ -88,12 +88,12 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper {
}
protected void sendToALiveMember(){
LocalGossipMember member = selectPartner(gossipManager.getLiveMembers());
LocalMember member = selectPartner(gossipManager.getLiveMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
protected void sendToDeadMember(){
LocalGossipMember member = selectPartner(gossipManager.getDeadMembers());
LocalMember member = selectPartner(gossipManager.getDeadMembers());
sendMembershipList(gossipManager.getMyself(), member);
}
@ -101,7 +101,7 @@ public class SimpleActiveGossipper extends AbstractActiveGossiper {
* sends an optimistic shutdown message to several clusters nodes
*/
protected void sendShutdownMessage(){
List<LocalGossipMember> l = gossipManager.getLiveMembers();
List<LocalMember> l = gossipManager.getLiveMembers();
int sendTo = l.size() < 3 ? 1 : l.size() / 2;
for (int i = 0; i < sendTo; i++) {
threadService.execute(() -> sendShutdownMessage(gossipManager.getMyself(), selectPartner(l)));

View File

@ -23,8 +23,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.SharedGossipDataMessage;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.log4j.Logger;
public class UserDataPersister implements Runnable {
@ -49,16 +49,16 @@ public class UserDataPersister implements Runnable {
}
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> readPerNodeFromDisk(){
ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>> readPerNodeFromDisk(){
if (!parent.getSettings().isPersistDataState()){
return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
}
try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}
return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
return new ConcurrentHashMap<String, ConcurrentHashMap<String, PerNodeDataMessage>>();
}
void writePerNodeToDisk(){
@ -84,16 +84,16 @@ public class UserDataPersister implements Runnable {
}
@SuppressWarnings("unchecked")
ConcurrentHashMap<String, SharedGossipDataMessage> readSharedDataFromDisk(){
ConcurrentHashMap<String, SharedDataMessage> readSharedDataFromDisk(){
if (!parent.getSettings().isPersistRingState()){
return new ConcurrentHashMap<String, SharedGossipDataMessage>();
return new ConcurrentHashMap<String, SharedDataMessage>();
}
try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
return parent.getObjectMapper().readValue(fos, ConcurrentHashMap.class);
} catch (IOException e) {
LOGGER.debug(e);
}
return new ConcurrentHashMap<String, SharedGossipDataMessage>();
return new ConcurrentHashMap<String, SharedDataMessage>();
}
/**

View File

@ -17,8 +17,8 @@
*/
package org.apache.gossip.manager.handlers;
import org.apache.gossip.GossipMember;
import org.apache.gossip.RemoteGossipMember;
import org.apache.gossip.Member;
import org.apache.gossip.RemoteMember;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
@ -34,8 +34,8 @@ import java.util.List;
public class ActiveGossipMessageHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
List<GossipMember> remoteGossipMembers = new ArrayList<>();
RemoteGossipMember senderMember = null;
List<Member> remoteGossipMembers = new ArrayList<>();
RemoteMember senderMember = null;
UdpActiveGossipMessage activeGossipMessage = (UdpActiveGossipMessage) base;
for (int i = 0; i < activeGossipMessage.getMembers().size(); i++) {
URI u;
@ -45,7 +45,7 @@ public class ActiveGossipMessageHandler implements MessageHandler {
GossipCore.LOGGER.debug("Gossip message with faulty URI", e);
continue;
}
RemoteGossipMember member = new RemoteGossipMember(
RemoteMember member = new RemoteMember(
activeGossipMessage.getMembers().get(i).getCluster(),
u,
activeGossipMessage.getMembers().get(i).getId(),

View File

@ -29,8 +29,8 @@ public class DefaultMessageInvoker implements MessageInvoker {
mic = new MessageInvokerCombiner();
mic.add(new SimpleMessageInvoker(Response.class, new ResponseHandler()));
mic.add(new SimpleMessageInvoker(ShutdownMessage.class, new ShutdownMessageHandler()));
mic.add(new SimpleMessageInvoker(GossipDataMessage.class, new GossipDataMessageHandler()));
mic.add(new SimpleMessageInvoker(SharedGossipDataMessage.class, new SharedGossipDataMessageHandler()));
mic.add(new SimpleMessageInvoker(PerNodeDataMessage.class, new PerNodeDataMessageHandler()));
mic.add(new SimpleMessageInvoker(SharedDataMessage.class, new SharedDataMessageHandler()));
mic.add(new SimpleMessageInvoker(ActiveGossipMessage.class, new ActiveGossipMessageHandler()));
}

View File

@ -20,12 +20,12 @@ package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.udp.UdpGossipDataMessage;
import org.apache.gossip.udp.UdpPerNodeDataMessage;
public class GossipDataMessageHandler implements MessageHandler {
public class PerNodeDataMessageHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpGossipDataMessage message = (UdpGossipDataMessage) base;
UdpPerNodeDataMessage message = (UdpPerNodeDataMessage) base;
gossipCore.addPerNodeData(message);
}
}

View File

@ -20,12 +20,12 @@ package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.udp.UdpSharedGossipDataMessage;
import org.apache.gossip.udp.UdpSharedDataMessage;
public class SharedGossipDataMessageHandler implements MessageHandler{
public class SharedDataMessageHandler implements MessageHandler{
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
UdpSharedGossipDataMessage message = (UdpSharedGossipDataMessage) base;
UdpSharedDataMessage message = (UdpSharedDataMessage) base;
gossipCore.addSharedData(message);
}
}

View File

@ -20,14 +20,14 @@ package org.apache.gossip.manager.handlers;
import org.apache.gossip.manager.GossipCore;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.Base;
import org.apache.gossip.model.GossipDataMessage;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.ShutdownMessage;
public class ShutdownMessageHandler implements MessageHandler {
@Override
public void invoke(GossipCore gossipCore, GossipManager gossipManager, Base base) {
ShutdownMessage s = (ShutdownMessage) base;
GossipDataMessage m = new GossipDataMessage();
PerNodeDataMessage m = new PerNodeDataMessage();
m.setKey(ShutdownMessage.PER_NODE_KEY);
m.setNodeId(s.getNodeId());
m.setPayload(base);