GOSSIP-36 Persist ring state
This commit is contained in:
@ -180,7 +180,8 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper {
|
||||
}
|
||||
|
||||
private void sendToSameRackMember() {
|
||||
sendMembershipList(gossipManager.getMyself(), selectPartner(sameRackNodes()));
|
||||
LocalGossipMember i = selectPartner(sameRackNodes());
|
||||
sendMembershipList(gossipManager.getMyself(), i);
|
||||
}
|
||||
|
||||
private void sendToSameRackMemberPerNode() {
|
||||
|
@ -54,35 +54,24 @@ public abstract class GossipManager {
|
||||
public static final Logger LOGGER = Logger.getLogger(GossipManager.class);
|
||||
|
||||
private final ConcurrentSkipListMap<LocalGossipMember, GossipState> members;
|
||||
|
||||
private final LocalGossipMember me;
|
||||
|
||||
private final GossipSettings settings;
|
||||
|
||||
private final AtomicBoolean gossipServiceRunning;
|
||||
|
||||
private final GossipListener listener;
|
||||
|
||||
private AbstractActiveGossiper activeGossipThread;
|
||||
|
||||
private PassiveGossipThread passiveGossipThread;
|
||||
|
||||
private ExecutorService gossipThreadExecutor;
|
||||
|
||||
private final GossipCore gossipCore;
|
||||
|
||||
private final DataReaper dataReaper;
|
||||
|
||||
private final Clock clock;
|
||||
|
||||
private final ScheduledExecutorService scheduledServiced;
|
||||
|
||||
private MetricRegistry registry;
|
||||
|
||||
private final MetricRegistry registry;
|
||||
private final RingStatePersister ringState;
|
||||
private final UserDataPersister userDataState;
|
||||
|
||||
public GossipManager(String cluster,
|
||||
URI uri, String id, Map<String,String> properties, GossipSettings settings,
|
||||
List<GossipMember> gossipMembers, GossipListener listener, MetricRegistry registry) {
|
||||
|
||||
this.settings = settings;
|
||||
gossipCore = new GossipCore(this, registry);
|
||||
clock = new SystemClock();
|
||||
@ -105,6 +94,10 @@ public abstract class GossipManager {
|
||||
this.listener = listener;
|
||||
this.scheduledServiced = Executors.newScheduledThreadPool(1);
|
||||
this.registry = registry;
|
||||
this.ringState = new RingStatePersister(this);
|
||||
this.userDataState = new UserDataPersister(this, this.gossipCore);
|
||||
readSavedRingState();
|
||||
readSavedDataState();
|
||||
}
|
||||
|
||||
public ConcurrentSkipListMap<LocalGossipMember, GossipState> getMembers() {
|
||||
@ -150,6 +143,7 @@ public abstract class GossipManager {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the client. Specifically, start the various cycles for this protocol. Start the gossip
|
||||
* thread and start the receiver thread.
|
||||
@ -160,13 +154,14 @@ public abstract class GossipManager {
|
||||
activeGossipThread = constructActiveGossiper();
|
||||
activeGossipThread.init();
|
||||
dataReaper.init();
|
||||
scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS);
|
||||
scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS);
|
||||
scheduledServiced.scheduleAtFixedRate(() -> {
|
||||
try {
|
||||
for (Entry<LocalGossipMember, GossipState> entry : members.entrySet()) {
|
||||
Double result = null;
|
||||
try {
|
||||
result = entry.getKey().detect(clock.nanoTime());
|
||||
//System.out.println(entry.getKey() +" "+ result);
|
||||
if (result != null) {
|
||||
if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) {
|
||||
members.put(entry.getKey(), GossipState.DOWN);
|
||||
@ -195,6 +190,27 @@ public abstract class GossipManager {
|
||||
LOGGER.debug("The GossipManager is started.");
|
||||
}
|
||||
|
||||
private void readSavedRingState() {
|
||||
for (LocalGossipMember l : ringState.readFromDisk()){
|
||||
LocalGossipMember member = new LocalGossipMember(l.getClusterName(),
|
||||
l.getUri(), l.getId(),
|
||||
clock.nanoTime(), l.getProperties(), settings.getWindowSize(),
|
||||
settings.getMinimumSamples(), settings.getDistribution());
|
||||
members.putIfAbsent(member, GossipState.DOWN);
|
||||
}
|
||||
}
|
||||
|
||||
private void readSavedDataState() {
|
||||
for (Entry<String, ConcurrentHashMap<String, GossipDataMessage>> l : userDataState.readPerNodeFromDisk().entrySet()){
|
||||
for (Entry<String, GossipDataMessage> j : l.getValue().entrySet()){
|
||||
gossipCore.addPerNodeData(j.getValue());
|
||||
}
|
||||
}
|
||||
for (Entry<String, SharedGossipDataMessage> l: userDataState.readSharedDataFromDisk().entrySet()){
|
||||
gossipCore.addSharedData(l.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the gossip service.
|
||||
*/
|
||||
@ -217,6 +233,14 @@ public abstract class GossipManager {
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error(e);
|
||||
}
|
||||
gossipThreadExecutor.shutdownNow();
|
||||
scheduledServiced.shutdown();
|
||||
try {
|
||||
scheduledServiced.awaitTermination(1, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.error(e);
|
||||
}
|
||||
scheduledServiced.shutdownNow();
|
||||
}
|
||||
|
||||
public void gossipPerNodeData(GossipDataMessage message){
|
||||
@ -266,6 +290,13 @@ public abstract class GossipManager {
|
||||
public DataReaper getDataReaper() {
|
||||
return dataReaper;
|
||||
}
|
||||
|
||||
public RingStatePersister getRingState() {
|
||||
return ringState;
|
||||
}
|
||||
|
||||
public UserDataPersister getUserDataState() {
|
||||
return userDataState;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.gossip.manager;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.NavigableSet;
|
||||
|
||||
import org.apache.gossip.LocalGossipMember;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
public class RingStatePersister implements Runnable {
|
||||
|
||||
private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class);
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
private static final TypeReference<ArrayList<LocalGossipMember>> REF
|
||||
= new TypeReference<ArrayList<LocalGossipMember>>() { };
|
||||
private GossipManager parent;
|
||||
|
||||
public RingStatePersister(GossipManager parent){
|
||||
this.parent = parent;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
writeToDisk();
|
||||
}
|
||||
|
||||
File computeTarget(){
|
||||
return new File(parent.getSettings().getPathToRingState(), "ringstate." + parent.getMyself().getClusterName() + "."
|
||||
+ parent.getMyself().getId() + ".json");
|
||||
}
|
||||
|
||||
void writeToDisk(){
|
||||
if (!parent.getSettings().isPersistRingState()){
|
||||
return;
|
||||
}
|
||||
NavigableSet<LocalGossipMember> i = parent.getMembers().keySet();
|
||||
try (FileOutputStream fos = new FileOutputStream(computeTarget())){
|
||||
MAPPER.writeValue(fos, i);
|
||||
} catch (IOException e) {
|
||||
LOGGER.debug(e);
|
||||
}
|
||||
}
|
||||
|
||||
List<LocalGossipMember> readFromDisk(){
|
||||
if (!parent.getSettings().isPersistRingState()){
|
||||
return Collections.emptyList();
|
||||
}
|
||||
try (FileInputStream fos = new FileInputStream(computeTarget())){
|
||||
return MAPPER.readValue(fos, REF);
|
||||
} catch (IOException e) {
|
||||
LOGGER.debug(e);
|
||||
}
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
}
|
111
src/main/java/org/apache/gossip/manager/UserDataPersister.java
Normal file
111
src/main/java/org/apache/gossip/manager/UserDataPersister.java
Normal file
@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.gossip.manager;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.gossip.model.GossipDataMessage;
|
||||
import org.apache.gossip.model.SharedGossipDataMessage;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
public class UserDataPersister implements Runnable {
|
||||
|
||||
private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class);
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
private final GossipManager parent;
|
||||
private final GossipCore gossipCore;
|
||||
|
||||
UserDataPersister(GossipManager parent, GossipCore gossipCore){
|
||||
this.parent = parent;
|
||||
this.gossipCore = gossipCore;
|
||||
MAPPER.enableDefaultTyping();
|
||||
}
|
||||
|
||||
File computeSharedTarget(){
|
||||
return new File(parent.getSettings().getPathToDataState(), "shareddata."
|
||||
+ parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
|
||||
}
|
||||
|
||||
File computePerNodeTarget() {
|
||||
return new File(parent.getSettings().getPathToDataState(), "pernodedata."
|
||||
+ parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>> readPerNodeFromDisk(){
|
||||
if (!parent.getSettings().isPersistDataState()){
|
||||
return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
|
||||
}
|
||||
try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){
|
||||
return MAPPER.readValue(fos, ConcurrentHashMap.class);
|
||||
} catch (IOException e) {
|
||||
LOGGER.debug(e);
|
||||
}
|
||||
return new ConcurrentHashMap<String, ConcurrentHashMap<String, GossipDataMessage>>();
|
||||
}
|
||||
|
||||
void writePerNodeToDisk(){
|
||||
if (!parent.getSettings().isPersistDataState()){
|
||||
return;
|
||||
}
|
||||
try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){
|
||||
MAPPER.writeValue(fos, gossipCore.getPerNodeData());
|
||||
} catch (IOException e) {
|
||||
LOGGER.warn(e);
|
||||
}
|
||||
}
|
||||
|
||||
void writeSharedToDisk(){
|
||||
if (!parent.getSettings().isPersistDataState()){
|
||||
return;
|
||||
}
|
||||
try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){
|
||||
MAPPER.writeValue(fos, gossipCore.getSharedData());
|
||||
} catch (IOException e) {
|
||||
LOGGER.warn(e);
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
ConcurrentHashMap<String, SharedGossipDataMessage> readSharedDataFromDisk(){
|
||||
if (!parent.getSettings().isPersistRingState()){
|
||||
return new ConcurrentHashMap<String, SharedGossipDataMessage>();
|
||||
}
|
||||
try (FileInputStream fos = new FileInputStream(computeSharedTarget())){
|
||||
return MAPPER.readValue(fos, ConcurrentHashMap.class);
|
||||
} catch (IOException e) {
|
||||
LOGGER.debug(e);
|
||||
}
|
||||
return new ConcurrentHashMap<String, SharedGossipDataMessage>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes all pernode and shared data to disk
|
||||
*/
|
||||
@Override
|
||||
public void run() {
|
||||
writePerNodeToDisk();
|
||||
writeSharedToDisk();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user