diff --git a/src/main/java/org/apache/gossip/GossipMember.java b/src/main/java/org/apache/gossip/GossipMember.java index f2834bd..703ac55 100644 --- a/src/main/java/org/apache/gossip/GossipMember.java +++ b/src/main/java/org/apache/gossip/GossipMember.java @@ -28,11 +28,11 @@ import java.util.Map; public abstract class GossipMember implements Comparable { - protected final URI uri; + protected URI uri; protected volatile long heartbeat; - protected final String clusterName; + protected String clusterName; /** * The purpose of the id field is to be able for nodes to identify themselves beyond their @@ -64,6 +64,7 @@ public abstract class GossipMember implements Comparable { this.properties = properties; } + protected GossipMember(){} /** * Get the name of the cluster the member belongs to. * @@ -78,7 +79,7 @@ public abstract class GossipMember implements Comparable { * @return The member address in the form IP/host:port Similar to the toString in * {@link InetSocketAddress} */ - public String getAddress() { + public String computeAddress() { return uri.getHost() + ":" + uri.getPort(); } @@ -118,7 +119,7 @@ public abstract class GossipMember implements Comparable { } public String toString() { - return "Member [address=" + getAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]"; + return "Member [address=" + computeAddress() + ", id=" + id + ", heartbeat=" + heartbeat + "]"; } /** @@ -128,7 +129,7 @@ public abstract class GossipMember implements Comparable { public int hashCode() { final int prime = 31; int result = 1; - String address = getAddress(); + String address = computeAddress(); result = prime * result + ((address == null) ? 0 : address.hashCode()) + (clusterName == null ? 0 : clusterName.hashCode()); return result; @@ -155,11 +156,11 @@ public abstract class GossipMember implements Comparable { return false; } // The object is the same of they both have the same address (hostname and port). - return getAddress().equals(((LocalGossipMember) obj).getAddress()) + return computeAddress().equals(((LocalGossipMember) obj).computeAddress()) && getClusterName().equals(((LocalGossipMember) obj).getClusterName()); } public int compareTo(GossipMember other) { - return this.getAddress().compareTo(other.getAddress()); + return this.computeAddress().compareTo(other.computeAddress()); } } diff --git a/src/main/java/org/apache/gossip/GossipSettings.java b/src/main/java/org/apache/gossip/GossipSettings.java index 1fed914..60a443f 100644 --- a/src/main/java/org/apache/gossip/GossipSettings.java +++ b/src/main/java/org/apache/gossip/GossipSettings.java @@ -40,7 +40,7 @@ public class GossipSettings { /** the threshold for the detector */ //private double convictThreshold = 2.606201185901408; - private double convictThreshold = 4.5; + private double convictThreshold = 2.606201185901408; private String distribution = "exponential"; @@ -48,6 +48,14 @@ public class GossipSettings { private Map activeGossipProperties = new HashMap<>(); + private String pathToRingState = "./"; + + private boolean persistRingState = true; + + private String pathToDataState = "./"; + + private boolean persistDataState = true; + /** * Construct GossipSettings with default settings. */ @@ -162,5 +170,37 @@ public class GossipSettings { public void setActiveGossipProperties(Map activeGossipProperties) { this.activeGossipProperties = activeGossipProperties; } + + public String getPathToRingState() { + return pathToRingState; + } + + public void setPathToRingState(String pathToRingState) { + this.pathToRingState = pathToRingState; + } + + public boolean isPersistRingState() { + return persistRingState; + } + + public void setPersistRingState(boolean persistRingState) { + this.persistRingState = persistRingState; + } + + public String getPathToDataState() { + return pathToDataState; + } + + public void setPathToDataState(String pathToDataState) { + this.pathToDataState = pathToDataState; + } + + public boolean isPersistDataState() { + return persistDataState; + } + + public void setPersistDataState(boolean persistDataState) { + this.persistDataState = persistDataState; + } } diff --git a/src/main/java/org/apache/gossip/LocalGossipMember.java b/src/main/java/org/apache/gossip/LocalGossipMember.java index 557ffcb..05874f5 100644 --- a/src/main/java/org/apache/gossip/LocalGossipMember.java +++ b/src/main/java/org/apache/gossip/LocalGossipMember.java @@ -29,7 +29,7 @@ import org.apache.gossip.accrual.FailureDetector; */ public class LocalGossipMember extends GossipMember { /** The failure detector for this member */ - private transient final FailureDetector detector; + private transient FailureDetector detector; /** * @@ -46,6 +46,10 @@ public class LocalGossipMember extends GossipMember { detector = new FailureDetector(this, minSamples, windowSize, distribution); } + protected LocalGossipMember(){ + + } + public void recordHeartbeat(long now){ detector.recordHeartbeat(now); } diff --git a/src/main/java/org/apache/gossip/StartupSettings.java b/src/main/java/org/apache/gossip/StartupSettings.java index 0117be7..ab5f764 100644 --- a/src/main/java/org/apache/gossip/StartupSettings.java +++ b/src/main/java/org/apache/gossip/StartupSettings.java @@ -198,7 +198,7 @@ public class StartupSettings { RemoteGossipMember member = new RemoteGossipMember(child.get("cluster").asText(), uri3, "", 0, new HashMap()); settings.addGossipMember(member); - configMembersDetails += member.getAddress(); + configMembersDetails += member.computeAddress(); configMembersDetails += ", "; } log.info(configMembersDetails + "]"); diff --git a/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java new file mode 100644 index 0000000..dfeabd7 --- /dev/null +++ b/src/main/java/org/apache/gossip/examples/StandAloneDatacenterAndRack.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gossip.examples; + +import java.net.URI; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteGossipMember; +import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; + +import com.codahale.metrics.MetricRegistry; + +public class StandAloneDatacenterAndRack { + + public static void main (String [] args) throws UnknownHostException, InterruptedException { + GossipSettings s = new GossipSettings(); + s.setWindowSize(10); + s.setConvictThreshold(1.0); + s.setGossipInterval(1000); + s.setActiveGossipClass(DatacenterRackAwareActiveGossiper.class.getName()); + Map gossipProps = new HashMap<>(); + gossipProps.put("sameRackGossipIntervalMs", "2000"); + gossipProps.put("differentDatacenterGossipIntervalMs", "10000"); + s.setActiveGossipProperties(gossipProps); + + + Map props = new HashMap<>(); + props.put(DatacenterRackAwareActiveGossiper.DATACENTER, args[4]); + props.put(DatacenterRackAwareActiveGossiper.RACK, args[5]); + GossipService gossipService = new GossipService("mycluster", URI.create(args[0]), args[1], + props, Arrays.asList(new RemoteGossipMember("mycluster", URI.create(args[2]), args[3])), + s, (a, b) -> { }, new MetricRegistry()); + gossipService.start(); + while (true){ + System.out.println("Live: " + gossipService.getGossipManager().getLiveMembers()); + System.out.println("Dead: " + gossipService.getGossipManager().getDeadMembers()); + Thread.sleep(2000); + } + } +} diff --git a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java index 40b9c28..4f5dfdc 100644 --- a/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java +++ b/src/main/java/org/apache/gossip/manager/DatacenterRackAwareActiveGossiper.java @@ -180,7 +180,8 @@ public class DatacenterRackAwareActiveGossiper extends AbstractActiveGossiper { } private void sendToSameRackMember() { - sendMembershipList(gossipManager.getMyself(), selectPartner(sameRackNodes())); + LocalGossipMember i = selectPartner(sameRackNodes()); + sendMembershipList(gossipManager.getMyself(), i); } private void sendToSameRackMemberPerNode() { diff --git a/src/main/java/org/apache/gossip/manager/GossipManager.java b/src/main/java/org/apache/gossip/manager/GossipManager.java index 840efb9..04afc28 100644 --- a/src/main/java/org/apache/gossip/manager/GossipManager.java +++ b/src/main/java/org/apache/gossip/manager/GossipManager.java @@ -54,35 +54,24 @@ public abstract class GossipManager { public static final Logger LOGGER = Logger.getLogger(GossipManager.class); private final ConcurrentSkipListMap members; - private final LocalGossipMember me; - private final GossipSettings settings; - private final AtomicBoolean gossipServiceRunning; - private final GossipListener listener; - private AbstractActiveGossiper activeGossipThread; - private PassiveGossipThread passiveGossipThread; - private ExecutorService gossipThreadExecutor; - private final GossipCore gossipCore; - private final DataReaper dataReaper; - private final Clock clock; - private final ScheduledExecutorService scheduledServiced; - - private MetricRegistry registry; - + private final MetricRegistry registry; + private final RingStatePersister ringState; + private final UserDataPersister userDataState; + public GossipManager(String cluster, URI uri, String id, Map properties, GossipSettings settings, List gossipMembers, GossipListener listener, MetricRegistry registry) { - this.settings = settings; gossipCore = new GossipCore(this, registry); clock = new SystemClock(); @@ -105,6 +94,10 @@ public abstract class GossipManager { this.listener = listener; this.scheduledServiced = Executors.newScheduledThreadPool(1); this.registry = registry; + this.ringState = new RingStatePersister(this); + this.userDataState = new UserDataPersister(this, this.gossipCore); + readSavedRingState(); + readSavedDataState(); } public ConcurrentSkipListMap getMembers() { @@ -150,6 +143,7 @@ public abstract class GossipManager { throw new RuntimeException(e); } } + /** * Starts the client. Specifically, start the various cycles for this protocol. Start the gossip * thread and start the receiver thread. @@ -160,13 +154,14 @@ public abstract class GossipManager { activeGossipThread = constructActiveGossiper(); activeGossipThread.init(); dataReaper.init(); + scheduledServiced.scheduleAtFixedRate(ringState, 60, 60, TimeUnit.SECONDS); + scheduledServiced.scheduleAtFixedRate(userDataState, 60, 60, TimeUnit.SECONDS); scheduledServiced.scheduleAtFixedRate(() -> { try { for (Entry entry : members.entrySet()) { Double result = null; try { result = entry.getKey().detect(clock.nanoTime()); - //System.out.println(entry.getKey() +" "+ result); if (result != null) { if (result > settings.getConvictThreshold() && entry.getValue() == GossipState.UP) { members.put(entry.getKey(), GossipState.DOWN); @@ -195,6 +190,27 @@ public abstract class GossipManager { LOGGER.debug("The GossipManager is started."); } + private void readSavedRingState() { + for (LocalGossipMember l : ringState.readFromDisk()){ + LocalGossipMember member = new LocalGossipMember(l.getClusterName(), + l.getUri(), l.getId(), + clock.nanoTime(), l.getProperties(), settings.getWindowSize(), + settings.getMinimumSamples(), settings.getDistribution()); + members.putIfAbsent(member, GossipState.DOWN); + } + } + + private void readSavedDataState() { + for (Entry> l : userDataState.readPerNodeFromDisk().entrySet()){ + for (Entry j : l.getValue().entrySet()){ + gossipCore.addPerNodeData(j.getValue()); + } + } + for (Entry l: userDataState.readSharedDataFromDisk().entrySet()){ + gossipCore.addSharedData(l.getValue()); + } + } + /** * Shutdown the gossip service. */ @@ -217,6 +233,14 @@ public abstract class GossipManager { } catch (InterruptedException e) { LOGGER.error(e); } + gossipThreadExecutor.shutdownNow(); + scheduledServiced.shutdown(); + try { + scheduledServiced.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOGGER.error(e); + } + scheduledServiced.shutdownNow(); } public void gossipPerNodeData(GossipDataMessage message){ @@ -266,6 +290,13 @@ public abstract class GossipManager { public DataReaper getDataReaper() { return dataReaper; } + + public RingStatePersister getRingState() { + return ringState; + } + public UserDataPersister getUserDataState() { + return userDataState; + } } diff --git a/src/main/java/org/apache/gossip/manager/RingStatePersister.java b/src/main/java/org/apache/gossip/manager/RingStatePersister.java new file mode 100644 index 0000000..24b464a --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/RingStatePersister.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; + +import org.apache.gossip.LocalGossipMember; +import org.apache.log4j.Logger; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; + +public class RingStatePersister implements Runnable { + + private static final Logger LOGGER = Logger.getLogger(RingStatePersister.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final TypeReference> REF + = new TypeReference>() { }; + private GossipManager parent; + + public RingStatePersister(GossipManager parent){ + this.parent = parent; + } + + @Override + public void run() { + writeToDisk(); + } + + File computeTarget(){ + return new File(parent.getSettings().getPathToRingState(), "ringstate." + parent.getMyself().getClusterName() + "." + + parent.getMyself().getId() + ".json"); + } + + void writeToDisk(){ + if (!parent.getSettings().isPersistRingState()){ + return; + } + NavigableSet i = parent.getMembers().keySet(); + try (FileOutputStream fos = new FileOutputStream(computeTarget())){ + MAPPER.writeValue(fos, i); + } catch (IOException e) { + LOGGER.debug(e); + } + } + + List readFromDisk(){ + if (!parent.getSettings().isPersistRingState()){ + return Collections.emptyList(); + } + try (FileInputStream fos = new FileInputStream(computeTarget())){ + return MAPPER.readValue(fos, REF); + } catch (IOException e) { + LOGGER.debug(e); + } + return Collections.emptyList(); + } + +} diff --git a/src/main/java/org/apache/gossip/manager/UserDataPersister.java b/src/main/java/org/apache/gossip/manager/UserDataPersister.java new file mode 100644 index 0000000..c67677a --- /dev/null +++ b/src/main/java/org/apache/gossip/manager/UserDataPersister.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; +import org.apache.log4j.Logger; + +import com.fasterxml.jackson.databind.ObjectMapper; + +public class UserDataPersister implements Runnable { + + private static final Logger LOGGER = Logger.getLogger(UserDataPersister.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private final GossipManager parent; + private final GossipCore gossipCore; + + UserDataPersister(GossipManager parent, GossipCore gossipCore){ + this.parent = parent; + this.gossipCore = gossipCore; + MAPPER.enableDefaultTyping(); + } + + File computeSharedTarget(){ + return new File(parent.getSettings().getPathToDataState(), "shareddata." + + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json"); + } + + File computePerNodeTarget() { + return new File(parent.getSettings().getPathToDataState(), "pernodedata." + + parent.getMyself().getClusterName() + "." + parent.getMyself().getId() + ".json"); + } + + @SuppressWarnings("unchecked") + ConcurrentHashMap> readPerNodeFromDisk(){ + if (!parent.getSettings().isPersistDataState()){ + return new ConcurrentHashMap>(); + } + try (FileInputStream fos = new FileInputStream(computePerNodeTarget())){ + return MAPPER.readValue(fos, ConcurrentHashMap.class); + } catch (IOException e) { + LOGGER.debug(e); + } + return new ConcurrentHashMap>(); + } + + void writePerNodeToDisk(){ + if (!parent.getSettings().isPersistDataState()){ + return; + } + try (FileOutputStream fos = new FileOutputStream(computePerNodeTarget())){ + MAPPER.writeValue(fos, gossipCore.getPerNodeData()); + } catch (IOException e) { + LOGGER.warn(e); + } + } + + void writeSharedToDisk(){ + if (!parent.getSettings().isPersistDataState()){ + return; + } + try (FileOutputStream fos = new FileOutputStream(computeSharedTarget())){ + MAPPER.writeValue(fos, gossipCore.getSharedData()); + } catch (IOException e) { + LOGGER.warn(e); + } + } + + @SuppressWarnings("unchecked") + ConcurrentHashMap readSharedDataFromDisk(){ + if (!parent.getSettings().isPersistRingState()){ + return new ConcurrentHashMap(); + } + try (FileInputStream fos = new FileInputStream(computeSharedTarget())){ + return MAPPER.readValue(fos, ConcurrentHashMap.class); + } catch (IOException e) { + LOGGER.debug(e); + } + return new ConcurrentHashMap(); + } + + /** + * Writes all pernode and shared data to disk + */ + @Override + public void run() { + writePerNodeToDisk(); + writeSharedToDisk(); + } +} diff --git a/src/test/java/org/apache/gossip/DataTest.java b/src/test/java/org/apache/gossip/DataTest.java index 83879f9..98c7ee0 100644 --- a/src/test/java/org/apache/gossip/DataTest.java +++ b/src/test/java/org/apache/gossip/DataTest.java @@ -39,6 +39,8 @@ public class DataTest { @Test public void dataTest() throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); int seedNodes = 1; List startupMembers = new ArrayList<>(); diff --git a/src/test/java/org/apache/gossip/IdAndPropertyTest.java b/src/test/java/org/apache/gossip/IdAndPropertyTest.java index 2a98f01..1eb0aee 100644 --- a/src/test/java/org/apache/gossip/IdAndPropertyTest.java +++ b/src/test/java/org/apache/gossip/IdAndPropertyTest.java @@ -25,7 +25,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeUnit; import org.apache.gossip.manager.DatacenterRackAwareActiveGossiper; import org.junit.jupiter.api.Test; @@ -75,8 +75,7 @@ public class IdAndPropertyTest { value = gossipService2.getGossipManager().getLiveMembers().get(0).getProperties().get("a"); } catch (RuntimeException e){ } return value; - }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b"); - + }).afterWaitingAtMost(10, TimeUnit.SECONDS).isEqualTo("b"); gossipService1.shutdown(); gossipService2.shutdown(); diff --git a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java index 9d02556..2386084 100644 --- a/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java +++ b/src/test/java/org/apache/gossip/ShutdownDeadtimeTest.java @@ -47,7 +47,9 @@ public class ShutdownDeadtimeTest { @Test public void DeadNodesDoNotComeAliveAgain() throws InterruptedException, UnknownHostException, URISyntaxException { - GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 5.0, "exponential"); + GossipSettings settings = new GossipSettings(1000, 10000, 1000, 1, 2.0, "exponential"); + settings.setPersistRingState(false); + settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); int seedNodes = 3; List startupMembers = new ArrayList<>(); diff --git a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java index bc4004d..aa797f5 100644 --- a/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java +++ b/src/test/java/org/apache/gossip/TenNodeThreeSeedTest.java @@ -48,6 +48,8 @@ public class TenNodeThreeSeedTest { public void abc(int base) throws InterruptedException, UnknownHostException, URISyntaxException{ GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); String cluster = UUID.randomUUID().toString(); int seedNodes = 3; List startupMembers = new ArrayList<>(); diff --git a/src/test/java/org/apache/gossip/manager/DataReaperTest.java b/src/test/java/org/apache/gossip/manager/DataReaperTest.java index b4ac45d..a9c861c 100644 --- a/src/test/java/org/apache/gossip/manager/DataReaperTest.java +++ b/src/test/java/org/apache/gossip/manager/DataReaperTest.java @@ -39,6 +39,8 @@ public class DataReaperTest { @Test public void testReaperOneShot() { GossipSettings settings = new GossipSettings(); + settings.setPersistRingState(false); + settings.setPersistDataState(false); GossipManager gm = RandomGossipManager.newBuilder().cluster("abc").settings(settings) .withId(myId).uri(URI.create("udp://localhost:6000")).registry(registry).build(); gm.init(); diff --git a/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java new file mode 100644 index 0000000..6e41bdc --- /dev/null +++ b/src/test/java/org/apache/gossip/manager/RingPersistenceTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.HashMap; +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.RemoteGossipMember; +import org.junit.Assert; +import org.junit.Test; + +import com.codahale.metrics.MetricRegistry; + +public class RingPersistenceTest { + + @Test + public void givenThatRingIsPersisted() throws UnknownHostException, InterruptedException, URISyntaxException { + GossipSettings settings = new GossipSettings(); + File f = aGossiperPersists(settings); + Assert.assertTrue(f.exists()); + aNewInstanceGetsRingInfo(settings); + f.delete(); + } + + private File aGossiperPersists(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException { + GossipService gossipService = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap(), + Arrays.asList( + new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 0)), "0"), + new RemoteGossipMember("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 2)), "2") + ), + settings, (a, b) -> { }, new MetricRegistry()); + gossipService.getGossipManager().getRingState().writeToDisk(); + return gossipService.getGossipManager().getRingState().computeTarget(); + } + + private void aNewInstanceGetsRingInfo(GossipSettings settings) throws UnknownHostException, InterruptedException, URISyntaxException{ + GossipService gossipService2 = new GossipService("a", new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), "1", new HashMap(), + Arrays.asList(), + settings, (a, b) -> { }, new MetricRegistry()); + Assert.assertEquals(2, gossipService2.getGossipManager().getMembers().size()); + } + +} diff --git a/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java new file mode 100644 index 0000000..e0cbcf4 --- /dev/null +++ b/src/test/java/org/apache/gossip/manager/UserDataPersistenceTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gossip.manager; + +import java.io.File; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.gossip.GossipService; +import org.apache.gossip.GossipSettings; +import org.apache.gossip.model.GossipDataMessage; +import org.apache.gossip.model.SharedGossipDataMessage; +import org.junit.Assert; +import org.junit.Test; + +import com.codahale.metrics.MetricRegistry; + +public class UserDataPersistenceTest { + + @Test + public void givenThatRingIsPersisted() throws UnknownHostException, InterruptedException, URISyntaxException { + String nodeId = "1"; + GossipSettings settings = new GossipSettings(); + { //Create a gossip service and force it to persist its user data + GossipService gossipService = new GossipService("a", + new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap(), + Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry()); + gossipService.start(); + gossipService.gossipPerNodeData(getToothpick()); + gossipService.gossipSharedData(getAnotherToothpick()); + gossipService.getGossipManager().getUserDataState().writePerNodeToDisk(); + gossipService.getGossipManager().getUserDataState().writeSharedToDisk(); + { //read the raw data and confirm + ConcurrentHashMap> l = gossipService.getGossipManager().getUserDataState().readPerNodeFromDisk(); + Assert.assertEquals("red", ((AToothpick) l.get(nodeId).get("a").getPayload()).getColor()); + } + { + ConcurrentHashMap l = + gossipService.getGossipManager().getUserDataState().readSharedDataFromDisk(); + Assert.assertEquals("blue", ((AToothpick) l.get("a").getPayload()).getColor()); + } + gossipService.shutdown(); + } + { //recreate the service and see that the data is read back in + GossipService gossipService = new GossipService("a", + new URI("udp://" + "127.0.0.1" + ":" + (29000 + 1)), nodeId, new HashMap(), + Arrays.asList(), settings, (a, b) -> { }, new MetricRegistry()); + gossipService.start(); + Assert.assertEquals("red", ((AToothpick) gossipService.findPerNodeData(nodeId, "a").getPayload()).getColor()); + Assert.assertEquals("blue", ((AToothpick) gossipService.findSharedData("a").getPayload()).getColor()); + File f = gossipService.getGossipManager().getUserDataState().computeSharedTarget(); + File g = gossipService.getGossipManager().getUserDataState().computePerNodeTarget(); + gossipService.shutdown(); + f.delete(); + g.delete(); + } + } + + public GossipDataMessage getToothpick(){ + AToothpick a = new AToothpick(); + a.setColor("red"); + GossipDataMessage d = new GossipDataMessage(); + d.setExpireAt(Long.MAX_VALUE); + d.setKey("a"); + d.setPayload(a); + d.setTimestamp(System.currentTimeMillis()); + return d; + } + + public SharedGossipDataMessage getAnotherToothpick(){ + AToothpick a = new AToothpick(); + a.setColor("blue"); + SharedGossipDataMessage d = new SharedGossipDataMessage(); + d.setExpireAt(Long.MAX_VALUE); + d.setKey("a"); + d.setPayload(a); + d.setTimestamp(System.currentTimeMillis()); + return d; + } + + public static class AToothpick { + private String color; + public AToothpick(){ + + } + public String getColor() { + return color; + } + public void setColor(String color) { + this.color = color; + } + + } +}