GOSSIP-75 Vote based locking

This commit is contained in:
Mirage Abeysekara
2017-07-14 22:19:16 +05:30
parent ac83038932
commit 5ed3ed85cf
13 changed files with 1128 additions and 3 deletions

View File

@ -17,6 +17,8 @@
*/
package org.apache.gossip;
import org.apache.gossip.lock.LockManagerSettings;
import java.util.HashMap;
import java.util.Map;
@ -61,7 +63,10 @@ public class GossipSettings {
private String pathToKeyStore = "./keys";
private boolean signMessages = false;
// Settings related to lock manager
private LockManagerSettings lockManagerSettings = LockManagerSettings
.getLockManagerDefaultSettings();
/**
* Construct GossipSettings with default settings.
@ -242,4 +247,15 @@ public class GossipSettings {
this.protocolManagerClass = protocolManagerClass;
}
public LockManagerSettings getLockManagerSettings() {
return lockManagerSettings;
}
/**
* Set the lock settings use by the lock manager
* @param lockManagerSettings lock settings. This object cannot be null.
*/
public void setLockManagerSettings(LockManagerSettings lockManagerSettings) {
this.lockManagerSettings = lockManagerSettings;
}
}

View File

@ -24,6 +24,9 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.gossip.LocalMember;
import org.apache.gossip.lock.vote.MajorityVote;
import org.apache.gossip.lock.vote.Vote;
import org.apache.gossip.lock.vote.VoteCandidate;
import org.apache.gossip.replication.BlackListReplicable;
import org.apache.gossip.replication.Replicable;
import org.apache.gossip.replication.WhiteListReplicable;
@ -107,6 +110,31 @@ abstract class BlackListReplicableMixin {
@JsonProperty("blackListMembers") abstract List<LocalMember> getBlackListMembers();
}
abstract class VoteCandidateMixin {
@JsonCreator
VoteCandidateMixin(
@JsonProperty("candidateNodeId") String candidateNodeId,
@JsonProperty("votingKey") String votingKey,
@JsonProperty("votes") Map<String, Vote> votes
) { }
}
abstract class VoteMixin {
@JsonCreator
VoteMixin(
@JsonProperty("votingNode") String votingNode,
@JsonProperty("voteValue") Boolean voteValue,
@JsonProperty("voteExchange") Boolean voteExchange,
@JsonProperty("liveMembers") List<String> liveMembers,
@JsonProperty("deadMembers") List<String> deadMembers
) { }
}
abstract class MajorityVoteMixin<E>{
@JsonCreator
MajorityVoteMixin(@JsonProperty("voteCandidates") Map<String, VoteCandidate> voteCandidateMap){ }
}
//If anyone wants to take a stab at this. please have at it
//https://github.com/FasterXML/jackson-datatype-guava/blob/master/src/main/java/com/fasterxml/jackson/datatype/guava/ser/MultimapSerializer.java
public class CrdtModule extends SimpleModule {
@ -130,6 +158,9 @@ public class CrdtModule extends SimpleModule {
context.setMixInAnnotations(Replicable.class, ReplicableMixin.class);
context.setMixInAnnotations(WhiteListReplicable.class, WhiteListReplicableMixin.class);
context.setMixInAnnotations(BlackListReplicable.class, BlackListReplicableMixin.class);
context.setMixInAnnotations(MajorityVote.class, MajorityVoteMixin.class);
context.setMixInAnnotations(VoteCandidate.class, VoteCandidateMixin.class);
context.setMixInAnnotations(Vote.class, VoteMixin.class);
}
}

View File

@ -0,0 +1,318 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.gossip.Member;
import org.apache.gossip.lock.exceptions.VoteFailedException;
import org.apache.gossip.lock.vote.MajorityVote;
import org.apache.gossip.lock.vote.Vote;
import org.apache.gossip.lock.vote.VoteCandidate;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.model.SharedDataMessage;
import org.apache.log4j.Logger;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class LockManager {
public static final Logger LOGGER = Logger.getLogger(LockManager.class);
private final GossipManager gossipManager;
private final LockManagerSettings lockSettings;
private final ScheduledExecutorService voteService;
private final AtomicInteger numberOfNodes;
private final Set<String> lockKeys;
// For MetricRegistry
public static final String LOCK_KEY_SET_SIZE = "gossip.lock.key_set_size";
public static final String LOCK_TIME = "gossip.lock.time";
private final Timer lockTimeMetric;
public LockManager(GossipManager gossipManager, final LockManagerSettings lockManagerSettings,
MetricRegistry metrics) {
this.gossipManager = gossipManager;
this.lockSettings = lockManagerSettings;
this.numberOfNodes = new AtomicInteger(lockSettings.getNumberOfNodes());
this.lockKeys = new CopyOnWriteArraySet<>();
metrics.register(LOCK_KEY_SET_SIZE, (Gauge<Integer>) lockKeys::size);
lockTimeMetric = metrics.timer(LOCK_TIME);
// Register listener for lock keys
gossipManager.registerSharedDataSubscriber((key, oldValue, newValue) -> {
if (key.contains("lock/")) {
lockKeys.add(key);
}
});
voteService = Executors.newScheduledThreadPool(2);
voteService.scheduleAtFixedRate(this::updateVotes, 0, lockSettings.getVoteUpdateInterval(),
TimeUnit.MILLISECONDS);
}
public void acquireSharedDataLock(String key) throws VoteFailedException {
final Timer.Context context = lockTimeMetric.time();
gossipManager.merge(generateLockMessage(key));
int deadlockDetectCount = 0;
while (true) {
SharedDataMessage message = gossipManager.findSharedGossipData(generateLockKey(key));
if (message == null || !(message.getPayload() instanceof MajorityVote)) {
continue;
}
MajorityVote majorityVoteResult = (MajorityVote) message.getPayload();
final Map<String, VoteCandidate> voteCandidatesMap = majorityVoteResult.value();
final Map<String, Boolean> voteResultMap = new HashMap<>();
// Store the vote result for each vote candidate nodes
voteCandidatesMap.forEach((candidateId, voteCandidate) -> voteResultMap
.put(candidateId, isVoteSuccess(voteCandidate)));
long passedCandidates = voteResultMap.values().stream().filter(aBoolean -> aBoolean).count();
String myNodeId = gossipManager.getMyself().getId();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("NodeId=" + myNodeId + ", VoteMap=" + voteResultMap + ", WinnerCount="
+ passedCandidates);
}
// Check for possible dead lock when no candidates were won
if (passedCandidates == 0) {
if (isDeadLock(voteCandidatesMap)) {
deadlockDetectCount++;
// Testing for deadlock is not always correct, therefore test for continues deadlocks
if (deadlockDetectCount >= lockSettings.getDeadlockDetectionThreshold()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deadlock detected from node " + myNodeId + ". VoteCandidatesMap="
+ voteCandidatesMap);
}
preventDeadLock(voteCandidatesMap);
}
} else {
deadlockDetectCount = 0;
}
} else if (passedCandidates == 1 && voteResultMap.containsKey(myNodeId)) {
context.stop();
if (voteResultMap.get(myNodeId)) {
// There is one winner and that is my node, therefore break the while loop and continue
break;
} else {
throw new VoteFailedException("Node " + myNodeId + " failed to lock on key: " + key);
}
} else if (passedCandidates > 1) {
// Multiple winners are not possible
context.stop();
throw new IllegalStateException("Multiple nodes get voted.");
}
try {
Thread.sleep(lockSettings.getResultCalculationDelay());
} catch (InterruptedException e) {
throw new VoteFailedException("Node " + myNodeId + " failed to lock on key: " + key, e);
}
}
}
// Generate Crdt lock message for voting
private SharedDataMessage generateLockMessage(String key) {
VoteCandidate voteCandidate = new VoteCandidate(gossipManager.getMyself().getId(), key,
new ConcurrentHashMap<>());
voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), true, false,
gossipManager.getLiveMembers().stream().map(Member::getId).collect(Collectors.toList()),
gossipManager.getDeadMembers().stream().map(Member::getId)
.collect(Collectors.toList())));
Map<String, VoteCandidate> voteCandidateMap = new ConcurrentHashMap<>();
voteCandidateMap.put(voteCandidate.getCandidateNodeId(), voteCandidate);
MajorityVote majorityVote = new MajorityVote(voteCandidateMap);
SharedDataMessage lockMessage = new SharedDataMessage();
lockMessage.setKey(generateLockKey(key));
lockMessage.setPayload(majorityVote);
lockMessage.setExpireAt(Long.MAX_VALUE);
lockMessage.setTimestamp(System.currentTimeMillis());
return lockMessage;
}
// This method will run periodically to vote the other nodes
private void updateVotes() {
for (String lockKey : lockKeys) {
SharedDataMessage message = gossipManager.findSharedGossipData(lockKey);
if (message == null || !(message.getPayload() instanceof MajorityVote)) {
continue;
}
MajorityVote majorityVote = (MajorityVote) message.getPayload();
Map<String, VoteCandidate> voteCandidateMap = majorityVote.value();
String myNodeId = gossipManager.getMyself().getId();
// No need to vote if my node is already voted to every node for the key
if (isVotedToAll(myNodeId, voteCandidateMap)) {
continue;
}
String myVoteCandidate = getVotedCandidateNodeId(myNodeId, voteCandidateMap);
if (myVoteCandidate == null) {
myVoteCandidate = lockSettings.getVoteSelector().getVoteCandidateId(voteCandidateMap.keySet());
}
for (VoteCandidate voteCandidate : voteCandidateMap.values()) {
if (voteCandidate.getCandidateNodeId().equals(myNodeId)) {
continue;
}
// Vote for selected candidate
boolean voteResult = voteCandidate.getCandidateNodeId().equals(myVoteCandidate);
voteCandidate.addVote(new Vote(gossipManager.getMyself().getId(), voteResult, false,
gossipManager.getLiveMembers().stream().map(Member::getId)
.collect(Collectors.toList()),
gossipManager.getDeadMembers().stream().map(Member::getId)
.collect(Collectors.toList())));
}
}
}
// Return true if every node has a vote from given node id.
private boolean isVotedToAll(String nodeId, final Map<String, VoteCandidate> voteCandidates) {
int voteCount = 0;
for (VoteCandidate voteCandidate : voteCandidates.values()) {
if (voteCandidate.getVotes().containsKey(nodeId)) {
voteCount++;
}
}
return voteCount == voteCandidates.size();
}
// Returns true if there is a deadlock for given vote candidates
private boolean isDeadLock(final Map<String, VoteCandidate> voteCandidates) {
boolean result = true;
int numberOfLiveNodes;
if (numberOfNodes.get() > 0) {
numberOfLiveNodes = numberOfNodes.get();
} else {
// numberOfNodes is not set by the user, therefore calculate it.
Set<String> liveNodes = voteCandidates.values().stream()
.map(voteCandidate -> voteCandidate.getVotes().values()).flatMap(Collection::stream)
.map(Vote::getLiveMembers).flatMap(List::stream).collect(Collectors.toSet());
numberOfLiveNodes = liveNodes.size();
}
for (VoteCandidate voteCandidate : voteCandidates.values()) {
result = result && voteCandidate.getVotes().size() == numberOfLiveNodes;
}
return result;
}
// Prevent the deadlock by giving up the votes
private void preventDeadLock(Map<String, VoteCandidate> voteCandidates) {
String myNodeId = gossipManager.getMyself().getId();
VoteCandidate myResults = voteCandidates.get(myNodeId);
if (myResults == null) {
return;
}
// Set of nodes that is going to receive this nodes votes
List<String> donateCandidateIds = voteCandidates.keySet().stream()
.filter(s -> s.compareTo(myNodeId) < 0).collect(Collectors.toList());
if (donateCandidateIds.size() == 0) {
return;
}
// Select a random node to donate
Random randomizer = new Random();
String selectedCandidateId = donateCandidateIds
.get(randomizer.nextInt(donateCandidateIds.size()));
VoteCandidate selectedCandidate = voteCandidates.get(selectedCandidateId);
Set<Vote> myVotes = new HashSet<>(myResults.getVotes().values());
Set<Vote> selectedCandidateVotes = new HashSet<>(selectedCandidate.getVotes().values());
// Exchange the votes
for (Vote myVote : myVotes) {
for (Vote candidateVote : selectedCandidateVotes) {
if (myVote.getVoteValue() && myVote.getVotingNode().equals(candidateVote.getVotingNode())) {
myVote.setVoteExchange(true);
candidateVote.setVoteExchange(true);
selectedCandidate.getVotes().put(myVote.getVotingNode(), myVote);
myResults.getVotes().put(candidateVote.getVotingNode(), candidateVote);
}
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Node " + myNodeId + " give up votes to node " + selectedCandidateId);
}
}
private String getVotedCandidateNodeId(String nodeId,
final Map<String, VoteCandidate> voteCandidates) {
for (VoteCandidate voteCandidate : voteCandidates.values()) {
Vote vote = voteCandidate.getVotes().get(nodeId);
if (vote != null && vote.getVoteValue()) {
return voteCandidate.getCandidateNodeId();
}
}
return null;
}
// Return true if the given candidate has passed the vote
private boolean isVoteSuccess(VoteCandidate voteCandidate) {
Set<String> liveNodes = new HashSet<>();
int voteCount = 0;
for (Vote vote : voteCandidate.getVotes().values()) {
liveNodes.addAll(vote.getLiveMembers());
if (vote.getVoteValue()) {
voteCount++;
}
}
int numberOfLiveNodes;
if (numberOfNodes.get() > 0) {
numberOfLiveNodes = numberOfNodes.get();
} else {
numberOfLiveNodes = liveNodes.size();
}
return numberOfLiveNodes > 0 && voteCount >= (numberOfLiveNodes / 2 + 1);
}
private String generateLockKey(String key){
return "lock/" + key;
}
public void shutdown(){
voteService.shutdown();
}
/**
* Get the voted node id from this node for a given key
* @param key key of the data object
* @return Voted node id
*/
public String getVotedCandidateNodeId(String key) {
SharedDataMessage message = gossipManager.findSharedGossipData(generateLockKey(key));
if (message == null || !(message.getPayload() instanceof MajorityVote)) {
return null;
}
MajorityVote majorityVote = (MajorityVote) message.getPayload();
return getVotedCandidateNodeId(gossipManager.getMyself().getId(), majorityVote.value());
}
/**
* Set the number of live nodes. If this value is negative, live nodes will be calculated
* @param numberOfNodes live node count or negative to calculate.
*/
public void setNumberOfNodes(int numberOfNodes) {
this.numberOfNodes.set(numberOfNodes);
}
}

View File

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock;
import org.apache.gossip.lock.vote.RandomVoteSelector;
import org.apache.gossip.lock.vote.VoteSelector;
/**
* Stores the lock manager related settings.
*/
public class LockManagerSettings {
// Time between vote updates in ms. Default is 1 second.
private final int voteUpdateInterval;
// Vote selection algorithm. Default is random voting
private final VoteSelector voteSelector;
// Number of nodes available for voting. Default is -1 (Auto calculate)
private final int numberOfNodes;
// Number of times to test for deadlock before preventing. Default is 3
private final int deadlockDetectionThreshold;
// Wait time between vote result calculation. Default is 1000
private final int resultCalculationDelay;
/**
* Construct LockManagerSettings with default settings.
*/
public static LockManagerSettings getLockManagerDefaultSettings() {
return new LockManagerSettings(1000, new RandomVoteSelector(), -1, 3, 1000);
}
/**
* Construct a custom LockManagerSettings
*
* @param voteUpdateInterval Time between vote updates in milliseconds.
* @param voteSelector Vote selection algorithm. Cannot be null
* @param numberOfNodes Number of nodes available for voting. Set to negative value for auto calculate
* @param deadlockDetectionThreshold Number of times to test for deadlock before preventing
* @param resultCalculationDelay Wait time between vote result calculation
*/
public LockManagerSettings(int voteUpdateInterval, VoteSelector voteSelector, int numberOfNodes,
int deadlockDetectionThreshold, int resultCalculationDelay) {
this.voteUpdateInterval = voteUpdateInterval;
this.voteSelector = voteSelector;
this.numberOfNodes = numberOfNodes;
this.deadlockDetectionThreshold = deadlockDetectionThreshold;
this.resultCalculationDelay = resultCalculationDelay;
}
public int getVoteUpdateInterval() {
return voteUpdateInterval;
}
public VoteSelector getVoteSelector() {
return voteSelector;
}
public int getNumberOfNodes() {
return numberOfNodes;
}
public int getDeadlockDetectionThreshold() {
return deadlockDetectionThreshold;
}
public int getResultCalculationDelay() {
return resultCalculationDelay;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock.exceptions;
/**
* This exception is thrown when the lock based voting is failed.
*/
public class VoteFailedException extends Exception {
/**
* Constructs a new VoteFailedException with the specified detail message.
*
* @param message the detail message.
*/
public VoteFailedException(String message) {
super(message);
}
/**
* Constructs a new VoteFailedException with the specified detail message and
* cause.
*
* @param message the detail message
* @param cause the cause for this exception
*/
public VoteFailedException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock.vote;
import org.apache.gossip.crdt.Crdt;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* CRDT which used for distribute a votes for a given key.
*/
public class MajorityVote implements Crdt<Map<String, VoteCandidate>, MajorityVote> {
private final Map<String, VoteCandidate> voteCandidates = new ConcurrentHashMap<>();
public MajorityVote(Map<String, VoteCandidate> voteCandidateMap) {
voteCandidates.putAll(voteCandidateMap);
}
@Override
public MajorityVote merge(MajorityVote other) {
Map<String, VoteCandidate> mergedCandidates = new ConcurrentHashMap<>();
Set<String> firstKeySet = this.voteCandidates.keySet();
Set<String> secondKeySet = other.voteCandidates.keySet();
Set<String> sameCandidatesSet = getIntersection(firstKeySet, secondKeySet);
Set<String> differentCandidatesSet = getIntersectionCompliment(firstKeySet, secondKeySet);
// Merge different vote candidates by combining votes
for (String differentCandidateId : differentCandidatesSet) {
if (this.voteCandidates.containsKey(differentCandidateId)) {
mergedCandidates.put(differentCandidateId, this.voteCandidates.get(differentCandidateId));
} else if (other.voteCandidates.containsKey(differentCandidateId)) {
mergedCandidates.put(differentCandidateId, other.voteCandidates.get(differentCandidateId));
}
}
// Merge votes for the same candidate
for (String sameCandidateId : sameCandidatesSet) {
if (this.voteCandidates.containsKey(sameCandidateId) && other.voteCandidates
.containsKey(sameCandidateId)) {
mergedCandidates.put(sameCandidateId,
mergeCandidate(this.voteCandidates.get(sameCandidateId),
other.voteCandidates.get(sameCandidateId)));
}
}
return new MajorityVote(mergedCandidates);
}
// Merge different votes for same candidate
private VoteCandidate mergeCandidate(VoteCandidate firstCandidate,
VoteCandidate secondCandidate) {
VoteCandidate mergeResult = new VoteCandidate(firstCandidate.getCandidateNodeId(),
firstCandidate.getVotingKey(), new ConcurrentHashMap<>());
Set<String> firstKeySet = firstCandidate.getVotes().keySet();
Set<String> secondKeySet = secondCandidate.getVotes().keySet();
Set<String> sameVoteNodeSet = getIntersection(firstKeySet, secondKeySet);
Set<String> differentVoteNodeSet = getIntersectionCompliment(firstKeySet, secondKeySet);
// Merge different voters by combining their votes
for (String differentCandidateId : differentVoteNodeSet) {
if (firstCandidate.getVotes().containsKey(differentCandidateId)) {
mergeResult.getVotes()
.put(differentCandidateId, firstCandidate.getVotes().get(differentCandidateId));
} else if (secondCandidate.getVotes().containsKey(differentCandidateId)) {
mergeResult.getVotes()
.put(differentCandidateId, secondCandidate.getVotes().get(differentCandidateId));
}
}
// Merge vote for same voter
for (String sameVoteNodeId : sameVoteNodeSet) {
if (firstCandidate.getVotes().containsKey(sameVoteNodeId) && secondCandidate.getVotes()
.containsKey(sameVoteNodeId)) {
mergeResult.getVotes().put(sameVoteNodeId,
mergeVote(firstCandidate.getVotes().get(sameVoteNodeId),
secondCandidate.getVotes().get(sameVoteNodeId)));
}
}
return mergeResult;
}
// Merge two votes from same voter
private Vote mergeVote(Vote firstVote, Vote secondVote) {
if (firstVote.getVoteValue().booleanValue() != secondVote.getVoteValue().booleanValue()) {
if (firstVote.getVoteExchange()) {
return firstVote;
} else if (secondVote.getVoteExchange()) {
return secondVote;
} else {
return secondVote;
}
} else {
return secondVote;
}
}
private Set<String> getIntersection(Set<String> first, Set<String> second) {
Set<String> intersection = new HashSet<>(first);
intersection.retainAll(second);
return intersection;
}
private Set<String> getIntersectionCompliment(Set<String> first, Set<String> second) {
Set<String> union = new HashSet<>();
union.addAll(first);
union.addAll(second);
Set<String> intersectionCompliment = new HashSet<>(union);
intersectionCompliment.removeAll(getIntersection(first, second));
return intersectionCompliment;
}
@Override
public Map<String, VoteCandidate> value() {
Map<String, VoteCandidate> copy = new ConcurrentHashMap<>();
copy.putAll(voteCandidates);
return Collections.unmodifiableMap(copy);
}
@Override
public int hashCode() {
return voteCandidates.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (obj == this)
return true;
if (!(obj instanceof MajorityVote))
return false;
MajorityVote other = (MajorityVote) obj;
return Objects.equals(voteCandidates, other.voteCandidates);
}
@Override
public String toString() {
return voteCandidates.toString();
}
@Override
public MajorityVote optimize() {
return new MajorityVote(voteCandidates);
}
public Map<String, VoteCandidate> getVoteCandidates() {
return new ConcurrentHashMap<>(voteCandidates);
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock.vote;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
/**
* VoteSelector implementation which randomly select a voting node.
*/
public class RandomVoteSelector implements VoteSelector {
@Override
public String getVoteCandidateId(Set<String> voteCandidateIds) {
List<String> voteCandidatesIds = new ArrayList<>(voteCandidateIds);
return voteCandidatesIds.get(new Random().nextInt(voteCandidatesIds.size()));
}
}

View File

@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock.vote;
import java.util.List;
/**
* Store a voter details.
*/
public class Vote {
private final String votingNode;
private final Boolean voteValue; // TODO: 7/16/17 weight?
private Boolean voteExchange;
private final List<String> liveMembers;
private final List<String> deadMembers;
public Vote(String votingNode, Boolean voteValue, Boolean voteExchange, List<String> liveMembers,
List<String> deadMembers) {
this.votingNode = votingNode;
this.voteValue = voteValue;
this.voteExchange = voteExchange;
this.liveMembers = liveMembers;
this.deadMembers = deadMembers;
}
public String getVotingNode() {
return votingNode;
}
public Boolean getVoteValue() {
return voteValue;
}
public Boolean getVoteExchange() {
return voteExchange;
}
public void setVoteExchange(Boolean voteExchange) {
this.voteExchange = voteExchange;
}
public List<String> getLiveMembers() {
return liveMembers;
}
public List<String> getDeadMembers() {
return deadMembers;
}
@Override
public String toString() {
return "votingNode=" + votingNode + ", voteValue=" + voteValue + ", liveMembers=" + liveMembers
+ ", deadMembers= " + deadMembers;
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock.vote;
import java.util.Map;
import java.util.Objects;
/**
* Stores the vote candidate details and its votes.
*/
public class VoteCandidate {
private final String candidateNodeId;
private final String votingKey;
private final Map<String, Vote> votes;
public VoteCandidate(String candidateNodeId, String votingKey, Map<String, Vote> votes) {
this.candidateNodeId = candidateNodeId;
this.votingKey = votingKey;
this.votes = votes;
}
public String getCandidateNodeId() {
return candidateNodeId;
}
public String getVotingKey() {
return votingKey;
}
public Map<String, Vote> getVotes() {
return votes;
}
public void addVote(Vote vote) {
votes.put(vote.getVotingNode(), vote);
}
@Override
public int hashCode() {
return Objects.hash(candidateNodeId, votingKey);
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof VoteCandidate))
return false;
if (obj == this)
return true;
VoteCandidate other = (VoteCandidate) obj;
return this.candidateNodeId.equals(other.candidateNodeId) && this.votingKey
.equals(other.votingKey);
}
@Override
public String toString() {
return "candidateNodeId=" + candidateNodeId + ", votingKey=" + votingKey + ", votes= " + votes;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock.vote;
import java.util.Set;
/**
* This interface defines vote selection algorithm for the vote based locking.
*/
public interface VoteSelector {
/**
* This method get call by the lock manager of a node to decide which candidate need to be choose for voting.
*
* @param voteCandidateIds node id set for the vote candidates
* @return selected node id to vote from the given vote candidate set.
*/
String getVoteCandidateId(Set<String> voteCandidateIds);
}

View File

@ -28,6 +28,8 @@ import org.apache.gossip.event.GossipListener;
import org.apache.gossip.event.GossipState;
import org.apache.gossip.event.data.UpdateNodeDataEventHandler;
import org.apache.gossip.event.data.UpdateSharedDataEventHandler;
import org.apache.gossip.lock.LockManager;
import org.apache.gossip.lock.exceptions.VoteFailedException;
import org.apache.gossip.manager.handlers.MessageHandler;
import org.apache.gossip.model.PerNodeDataMessage;
import org.apache.gossip.model.SharedDataMessage;
@ -43,7 +45,11 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@ -77,7 +83,8 @@ public abstract class GossipManager {
private final GossipMemberStateRefresher memberStateRefresher;
private final MessageHandler messageHandler;
private final LockManager lockManager;
public GossipManager(String cluster,
URI uri, String id, Map<String, String> properties, GossipSettings settings,
List<Member> gossipMembers, GossipListener listener, MetricRegistry registry,
@ -89,6 +96,7 @@ public abstract class GossipManager {
me = new LocalMember(cluster, uri, id, clock.nanoTime(), properties,
settings.getWindowSize(), settings.getMinimumSamples(), settings.getDistribution());
gossipCore = new GossipCore(this, registry);
this.lockManager = new LockManager(this, settings.getLockManagerSettings(), registry);
dataReaper = new DataReaper(gossipCore, clock);
members = new ConcurrentSkipListMap<>();
for (Member startupMember : gossipMembers) {
@ -221,6 +229,7 @@ public abstract class GossipManager {
*/
public void shutdown() {
gossipServiceRunning.set(false);
lockManager.shutdown();
gossipCore.shutdown();
transportManager.shutdown();
dataReaper.close();
@ -371,4 +380,21 @@ public abstract class GossipManager {
public void registerGossipListener(GossipListener listener) {
memberStateRefresher.register(listener);
}
/**
* Get the lock manager specified with this GossipManager.
* @return lock manager object.
*/
public LockManager getLockManager() {
return lockManager;
}
/**
* Try to acquire a lock on given shared data key.
* @param key key of tha share data object.
* @throws VoteFailedException if the locking is failed.
*/
public void acquireSharedDataLock(String key) throws VoteFailedException{
lockManager.acquireSharedDataLock(key);
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip.lock.vote;
import org.junit.Assert;
import org.junit.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@RunWith(JUnitPlatform.class)
public class MajorityVoteTest {
@Test
public void differentCandidateMergeTest() {
Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>();
VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true));
voteCandidateMap1.put("1", candidateA);
MajorityVote first = new MajorityVote(voteCandidateMap1);
Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>();
VoteCandidate candidateB = new VoteCandidate("3", "key1", generateVotes(3, 4, true, false));
voteCandidateMap2.put("3", candidateB);
MajorityVote second = new MajorityVote(voteCandidateMap2);
MajorityVote result = first.merge(second);
Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue());
Assert.assertTrue(!result.value().get("3").getVotes().get("4").getVoteValue());
}
@Test
public void sameCandidateMergeTest() {
Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>();
VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true));
voteCandidateMap1.put("1", candidateA);
MajorityVote first = new MajorityVote(voteCandidateMap1);
Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>();
VoteCandidate candidateB = new VoteCandidate("1", "key1", generateVotes(3, 4, true, false));
voteCandidateMap2.put("1", candidateB);
MajorityVote second = new MajorityVote(voteCandidateMap2);
MajorityVote result = first.merge(second);
Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue());
Assert.assertTrue(!result.value().get("1").getVotes().get("4").getVoteValue());
}
@Test
public void sameVoteMergeTest() {
Map<String, VoteCandidate> voteCandidateMap1 = new HashMap<>();
VoteCandidate candidateA = new VoteCandidate("1", "key1", generateVotes(1, 2, true, true));
voteCandidateMap1.put("1", candidateA);
MajorityVote first = new MajorityVote(voteCandidateMap1);
Map<String, VoteCandidate> voteCandidateMap2 = new HashMap<>();
VoteCandidate candidateB = new VoteCandidate("1", "key1",
generateVotes(2, 4, true, false, true));
voteCandidateMap2.put("1", candidateB);
MajorityVote second = new MajorityVote(voteCandidateMap2);
MajorityVote result = first.merge(second);
Assert.assertTrue(result.value().get("1").getVotes().get("2").getVoteValue());
}
public Map<String, Vote> generateVotes(int startingNodeId, int endNodeId, boolean... votes) {
Map<String, Vote> voteMap = new HashMap<>();
if ((endNodeId - startingNodeId + 1) > votes.length) {
return voteMap;
}
for (int i = startingNodeId; i <= endNodeId; i++) {
String nodeId = i + "";
voteMap.put(nodeId, new Vote(nodeId, votes[i - startingNodeId], false, new ArrayList<>(),
new ArrayList<>()));
}
return voteMap;
}
}

View File

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gossip;
import org.apache.gossip.lock.exceptions.VoteFailedException;
import org.apache.gossip.manager.GossipManager;
import org.apache.gossip.manager.GossipManagerBuilder;
import org.apache.gossip.model.SharedDataMessage;
import org.junit.Assert;
import org.junit.Test;
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
@RunWith(JUnitPlatform.class)
public class SharedDataLockTest extends AbstractIntegrationBase {
@Test
public void sharedDataLockRandomVoteTest()
throws InterruptedException, UnknownHostException, URISyntaxException {
GossipSettings settings = new GossipSettings();
settings.setPersistRingState(false);
settings.setPersistDataState(false);
String cluster = UUID.randomUUID().toString();
int seedNodes = 1;
List<Member> startupMembers = new ArrayList<>();
for (int i = 1; i < seedNodes + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
startupMembers.add(new RemoteMember(cluster, uri, i + ""));
}
final List<GossipManager> clients = new ArrayList<>();
final int clusterMembers = 10;
for (int i = 1; i < clusterMembers + 1; ++i) {
URI uri = new URI("udp://" + "127.0.0.1" + ":" + (50000 + i));
GossipManager gossipService = GossipManagerBuilder.newBuilder().cluster(cluster).uri(uri)
.id(i + "").gossipMembers(startupMembers).gossipSettings(settings).build();
clients.add(gossipService);
gossipService.getLockManager().setNumberOfNodes(clusterMembers);
gossipService.init();
register(gossipService);
}
// Adding new data to Node 1
clients.get(0).gossipSharedData(sharedNodeData("category", "distributed"));
final AtomicInteger lockSuccessCount = new AtomicInteger(0);
final AtomicInteger lockFailedCount = new AtomicInteger(0);
// Node 1 try to lock on key category
Thread Node1LockingThread = new Thread(() -> {
try {
clients.get(0).acquireSharedDataLock("category");
lockSuccessCount.incrementAndGet();
} catch (VoteFailedException ignore) {
lockFailedCount.incrementAndGet();
}
});
// Node 3 try to lock on key category
Thread Node3LockingThread = new Thread(() -> {
try {
clients.get(2).acquireSharedDataLock("category");
lockSuccessCount.incrementAndGet();
} catch (VoteFailedException ignore) {
lockFailedCount.incrementAndGet();
}
});
// Node 6 try to lock on key category
Thread Node5LockingThread = new Thread(() -> {
try {
clients.get(5).acquireSharedDataLock("category");
lockSuccessCount.incrementAndGet();
} catch (VoteFailedException ignore) {
lockFailedCount.incrementAndGet();
}
});
Node1LockingThread.start();
Node3LockingThread.start();
Node5LockingThread.start();
Node1LockingThread.join();
Node3LockingThread.join();
Node5LockingThread.join();
// Only one node should acquire the lock
Assert.assertEquals(1, lockSuccessCount.get());
// Other nodes should fail
Assert.assertEquals(2, lockFailedCount.get());
}
private SharedDataMessage sharedNodeData(String key, String value) {
SharedDataMessage g = new SharedDataMessage();
g.setExpireAt(Long.MAX_VALUE);
g.setKey(key);
g.setPayload(value);
g.setTimestamp(System.currentTimeMillis());
return g;
}
}