Skip to content

Commit

Permalink
Snapshot temporary directory support configurable (#933)
Browse files Browse the repository at this point in the history
  • Loading branch information
qiujiayu committed Mar 15, 2023
1 parent 813fddf commit ba66daa
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ public interface JRaftServiceFactory {

/**
* Creates a raft snapshot storage
* @param uri The snapshot storage uri from {@link NodeOptions#getSnapshotUri()}
* @param raftOptions the raft options.
* @param nodeOptions the node options.
* @return storage to store state machine snapshot.
*/
SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions);
SnapshotStorage createSnapshotStorage(final NodeOptions nodeOptions);

/**
* Creates a raft meta storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package com.alipay.sofa.jraft.core;

import com.alipay.sofa.jraft.option.NodeOptions;
import org.apache.commons.lang.StringUtils;

import com.alipay.sofa.jraft.JRaftServiceFactory;
Expand Down Expand Up @@ -51,9 +52,11 @@ public LogStorage createLogStorage(final String uri, final RaftOptions raftOptio
}

@Override
public SnapshotStorage createSnapshotStorage(final String uri, final RaftOptions raftOptions) {
public SnapshotStorage createSnapshotStorage(final NodeOptions nodeOptions) {
String uri = nodeOptions.getSnapshotUri();
String tempUri = nodeOptions.getSnapshotTempUri();
Requires.requireTrue(!StringUtils.isBlank(uri), "Blank snapshot storage uri.");
return new LocalSnapshotStorage(uri, raftOptions);
return new LocalSnapshotStorage(uri, tempUri, nodeOptions.getRaftOptions());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,6 @@ private boolean initSnapshotStorage() {
}
this.snapshotExecutor = new SnapshotExecutorImpl();
final SnapshotExecutorOptions opts = new SnapshotExecutorOptions();
opts.setUri(this.options.getSnapshotUri());
opts.setFsmCaller(this.fsmCaller);
opts.setNode(this);
opts.setLogManager(this.logManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public class NodeOptions extends RpcOptions implements Copiable<NodeOptions> {
// Describe a specific SnapshotStorage in format ${type}://${parameters}
private String snapshotUri;

private String snapshotTempUri;

// If enable, we will filter duplicate files before copy remote snapshot,
// to avoid useless transmission. Two files in local and remote are duplicate,
// only if they has the same filename and the same checksum (stored in file meta).
Expand Down Expand Up @@ -366,6 +368,14 @@ public void setSnapshotUri(final String snapshotUri) {
this.snapshotUri = snapshotUri;
}

public String getSnapshotTempUri() {
return snapshotTempUri;
}

public void setSnapshotTempUri(String snapshotTempUri) {
this.snapshotTempUri = snapshotTempUri;
}

public boolean isFilterBeforeCopyRemote() {
return this.filterBeforeCopyRemote;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
*/
public class SnapshotExecutorOptions {

// URI of SnapshotStorage
private String uri;
private FSMCaller fsmCaller;
private NodeImpl node;
private LogManager logManager;
Expand All @@ -49,14 +47,6 @@ public void setSnapshotThrottle(SnapshotThrottle snapshotThrottle) {
this.snapshotThrottle = snapshotThrottle;
}

public String getUri() {
return this.uri;
}

public void setUri(String uri) {
this.uri = uri;
}

public FSMCaller getFsmCaller() {
return this.fsmCaller;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
package com.alipay.sofa.jraft.storage.snapshot;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -224,16 +226,18 @@ public SnapshotReader start() {

@Override
public boolean init(final SnapshotExecutorOptions opts) {
if (StringUtils.isBlank(opts.getUri())) {
this.node = opts.getNode();
Objects.requireNonNull(this.node, "Node is null.");
NodeOptions nodeOptions = this.node.getOptions();
String snapshotUri = nodeOptions.getSnapshotUri();
if (StringUtils.isBlank(snapshotUri)) {
LOG.error("Snapshot uri is empty.");
return false;
}
this.logManager = opts.getLogManager();
this.fsmCaller = opts.getFsmCaller();
this.node = opts.getNode();
this.term = opts.getInitTerm();
this.snapshotStorage = this.node.getServiceFactory().createSnapshotStorage(opts.getUri(),
this.node.getRaftOptions());
this.snapshotStorage = this.node.getServiceFactory().createSnapshotStorage(nodeOptions);
if (opts.isFilterBeforeCopyRemote()) {
this.snapshotStorage.setFilterBeforeCopyRemote();
}
Expand All @@ -257,7 +261,7 @@ public boolean init(final SnapshotExecutorOptions opts) {
}
this.loadingSnapshotMeta = reader.load();
if (this.loadingSnapshotMeta == null) {
LOG.error("Fail to load meta from {}.", opts.getUri());
LOG.error("Fail to load meta from {}.", snapshotUri);
Utils.closeQuietly(reader);
return false;
}
Expand All @@ -276,7 +280,7 @@ public boolean init(final SnapshotExecutorOptions opts) {
Utils.closeQuietly(reader);
}
if (!done.status.isOk()) {
LOG.error("Fail to load snapshot from {}, FirstSnapshotLoadDone status is {}.", opts.getUri(), done.status);
LOG.error("Fail to load snapshot from {}, FirstSnapshotLoadDone status is {}.", snapshotUri, done.status);
return false;
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand All @@ -28,6 +29,7 @@
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,7 +50,7 @@
* Snapshot storage based on local file storage.
*
* @author boyan (boyan@alibaba-inc.com)
*
* <p>
* 2018-Mar-13 2:11:30 PM
*/
public class LocalSnapshotStorage implements SnapshotStorage {
Expand All @@ -58,6 +60,7 @@ public class LocalSnapshotStorage implements SnapshotStorage {
private static final String TEMP_PATH = "temp";
private final ConcurrentMap<Long, AtomicInteger> refMap = new ConcurrentHashMap<>();
private final String path;
private final String tempPath;
private Endpoint addr;
private boolean filterBeforeCopyRemote;
private long lastSnapshotIndex;
Expand All @@ -78,14 +81,33 @@ public void setServerAddr(Endpoint addr) {
this.addr = addr;
}

public LocalSnapshotStorage(String path, RaftOptions raftOptions) {
public LocalSnapshotStorage(String path, String tempPath, RaftOptions raftOptions) {
super();
this.path = path;
if (StringUtils.isEmpty(tempPath)) {
this.tempPath = buildTempPath(this.path);
} else {
File pathFile = new File(path);
File tempPathFile = new File(tempPath);

String pathAbsolutePath = pathFile.getAbsolutePath();
String tempPathAbsolutePath = tempPathFile.getAbsolutePath();
if (pathAbsolutePath.equals(tempPathAbsolutePath) || pathAbsolutePath.startsWith(tempPathAbsolutePath)) {
this.tempPath = buildTempPath(this.path);
} else {
this.tempPath = tempPath;
}
LOG.info("The snapshot temp path is {}", this.tempPath);
}
this.lastSnapshotIndex = 0;
this.raftOptions = raftOptions;
this.lock = new ReentrantLock();
}

private String buildTempPath(String path) {
return Paths.get(path, TEMP_PATH).toString();
}

public long getLastSnapshotIndex() {
this.lock.lock();
try {
Expand All @@ -108,13 +130,12 @@ public boolean init(final Void v) {

// delete temp snapshot
if (!this.filterBeforeCopyRemote) {
final String tempSnapshotPath = this.path + File.separator + TEMP_PATH;
final File tempFile = new File(tempSnapshotPath);
final File tempFile = new File(this.tempPath);
if (tempFile.exists()) {
try {
FileUtils.forceDelete(tempFile);
} catch (final IOException e) {
LOG.error("Fail to delete temp snapshot path {}.", tempSnapshotPath, e);
LOG.error("Fail to delete temp snapshot path {}.", this.tempPath, e);
return false;
}
}
Expand Down Expand Up @@ -223,7 +244,6 @@ void close(final LocalSnapshotWriter writer, final boolean keepDataOnError) thro
break;
}
// rename temp to new
final String tempPath = this.path + File.separator + TEMP_PATH;
final String newPath = getSnapshotPath(newIndex);

if (!destroySnapshot(newPath)) {
Expand All @@ -232,11 +252,11 @@ void close(final LocalSnapshotWriter writer, final boolean keepDataOnError) thro
ioe = new IOException("Fail to delete new snapshot path: " + newPath);
break;
}
LOG.info("Renaming {} to {}.", tempPath, newPath);
if (!Utils.atomicMoveFile(new File(tempPath), new File(newPath), true)) {
LOG.error("Renamed temp snapshot failed, from path {} to path {}.", tempPath, newPath);
LOG.info("Renaming {} to {}.", this.tempPath, newPath);
if (!Utils.atomicMoveFile(new File(this.tempPath), new File(newPath), true)) {
LOG.error("Renamed temp snapshot failed, from path {} to path {}.", this.tempPath, newPath);
ret = RaftError.EIO.getNumber();
ioe = new IOException("Fail to rename temp snapshot from: " + tempPath + " to: " + newPath);
ioe = new IOException("Fail to rename temp snapshot from: " + this.tempPath + " to: " + newPath);
break;
}
ref(newIndex);
Expand Down Expand Up @@ -283,15 +303,14 @@ public SnapshotWriter create(final boolean fromEmpty) {
LocalSnapshotWriter writer = null;
// noinspection ConstantConditions
do {
final String snapshotPath = this.path + File.separator + TEMP_PATH;
// delete temp
// TODO: Notify watcher before deleting
if (new File(snapshotPath).exists() && fromEmpty) {
if (!destroySnapshot(snapshotPath)) {
if (new File(this.tempPath).exists() && fromEmpty) {
if (!destroySnapshot(this.tempPath)) {
break;
}
}
writer = new LocalSnapshotWriter(snapshotPath, this, this.raftOptions);
writer = new LocalSnapshotWriter(this.tempPath, this, this.raftOptions);
if (!writer.init(null)) {
LOG.error("Fail to init snapshot writer.");
writer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,22 @@ public void setup() throws Exception {
this.uri = "remote://" + this.hostPort + "/" + this.readerId;
this.copyOpts = new CopyOptions();

NodeOptions nodeOptions = new NodeOptions();
nodeOptions.setSnapshotUri(this.path);
Mockito.when(this.node.getGroupId()).thenReturn(GROUP_ID);
Mockito.when(this.node.getRaftOptions()).thenReturn(new RaftOptions());
Mockito.when(this.node.getOptions()).thenReturn(new NodeOptions());
Mockito.when(this.node.getRaftOptions()).thenReturn(this.raftOptions);
Mockito.when(this.node.getOptions()).thenReturn(nodeOptions);
Mockito.when(this.node.getRpcService()).thenReturn(this.raftClientService);
Mockito.when(this.node.getTimerManager()).thenReturn(this.timerManager);
Mockito.when(this.node.getServiceFactory()).thenReturn(DefaultJRaftServiceFactory.newInstance());

this.executor = new SnapshotExecutorImpl();
final SnapshotExecutorOptions opts = new SnapshotExecutorOptions();
opts.setFsmCaller(this.fSMCaller);
opts.setInitTerm(0);
opts.setNode(this.node);
opts.setLogManager(this.logManager);
opts.setUri(this.path);

this.addr = new Endpoint("localhost", 8081);
opts.setAddr(this.addr);
assertTrue(this.executor.init(opts));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void setup() throws Exception {
.setLastIncludedTerm(1).build());
this.table.saveToFile(snapshotPath + File.separator + Snapshot.JRAFT_SNAPSHOT_META_FILE);

this.snapshotStorage = new LocalSnapshotStorage(path, new RaftOptions());
this.snapshotStorage = new LocalSnapshotStorage(path, null, new RaftOptions());
assertTrue(this.snapshotStorage.init(null));
}

Expand Down

0 comments on commit ba66daa

Please sign in to comment.