Skip to content

Commit

Permalink
Add prefix support to hashed prefix & infix path types on remote store (
Browse files Browse the repository at this point in the history
opensearch-project#15557)

* Add prefix support to hashed prefix & infix path types on remote store

Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 authored Sep 3, 2024
1 parent 0512e9e commit 33be5a9
Show file tree
Hide file tree
Showing 37 changed files with 617 additions and 101 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))
- [Remote Publication] Add remote download stats ([#15291](https://github.com/opensearch-project/OpenSearch/pull/15291)))
- Add support for comma-separated list of index names to be used with Snapshot Status API ([#15409](https://github.com/opensearch-project/OpenSearch/pull/15409))
- Add prefix support to hashed prefix & infix path types on remote store ([#15557](https://github.com/opensearch-project/OpenSearch/pull/15557))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteSegmentStats;
import org.opensearch.index.translog.RemoteTranslogStats;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand Down Expand Up @@ -67,14 +68,16 @@ public void testLocalRecoveryRollingRestartAndNodeFailure() throws Exception {
assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0);
}

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
assertBusy(() -> {
String shardPath = getShardLevelBlobPath(
client(),
indexName,
new BlobPath(),
String.valueOf(shardRouting.getId()),
SEGMENTS,
DATA
DATA,
segmentsPathFixedPrefix
).buildAsString();
Path segmentDataRepoPath = segmentRepoPath.resolve(shardPath);
List<String> segmentsNFilesInRepo = Arrays.stream(FileSystemUtils.files(segmentDataRepoPath))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,13 +443,15 @@ public void testRestoreInSameRemoteStoreEnabledIndex() throws IOException {

void assertRemoteSegmentsAndTranslogUploaded(String idx) throws IOException {
Client client = client();
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA).buildAsString();
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, METADATA, translogPathFixedPrefix).buildAsString();
Path remoteTranslogMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA).buildAsString();
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", TRANSLOG, DATA, translogPathFixedPrefix).buildAsString();
Path remoteTranslogDataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA).buildAsString();
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, METADATA, segmentsPathFixedPrefix).buildAsString();
Path segmentMetadataPath = Path.of(remoteRepoPath + "/" + path);
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();
path = getShardLevelBlobPath(client, idx, new BlobPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix).buildAsString();
Path segmentDataPath = Path.of(remoteRepoPath + "/" + path);

try (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,16 @@ public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
IndexShard indexShard = getIndexShard(dataNode, INDEX_NAME);
Expand Down Expand Up @@ -236,7 +245,16 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, false, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
Expand All @@ -247,11 +265,19 @@ public void testStaleCommitDeletionWithMinSegmentFiles_3() throws Exception {
Settings.Builder settings = Settings.builder()
.put(RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING.getKey(), "3");
internalCluster().startNode(settings);

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
int actualFileCount = getFileCount(indexPath);
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
Expand All @@ -271,7 +297,16 @@ public void testStaleCommitDeletionWithMinSegmentFiles_Disabled() throws Excepti
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
int numberOfIterations = randomIntBetween(12, 18);
indexData(numberOfIterations, true, INDEX_NAME);
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, METADATA).buildAsString();
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
SEGMENTS,
METADATA,
segmentsPathFixedPrefix
).buildAsString();
Path indexPath = Path.of(segmentRepoPath + "/" + shardPath);
;
int actualFileCount = getFileCount(indexPath);
Expand Down Expand Up @@ -604,8 +639,10 @@ public void testFallbackToNodeToNodeSegmentCopy() throws Exception {
indexBulk(INDEX_NAME, 50);
flushAndRefresh(INDEX_NAME);

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
// 3. Delete data from remote segment store
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA).buildAsString();
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix)
.buildAsString();
Path segmentDataPath = Path.of(segmentRepoPath + "/" + shardPath);

try (Stream<Path> files = Files.list(segmentDataPath)) {
Expand Down Expand Up @@ -844,7 +881,16 @@ public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);

String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA).buildAsString();
String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(
client(),
INDEX_NAME,
BlobPath.cleanPath(),
"0",
TRANSLOG,
METADATA,
translogPathFixedPrefix
).buildAsString();
Path translogMetaDataPath = Path.of(translogRepoPath + "/" + shardPath);

try (Stream<Path> files = Files.list(translogMetaDataPath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;
Expand Down Expand Up @@ -50,7 +51,10 @@ public void testRemoteRefreshRetryOnFailure() throws Exception {

String indexName = response.getShards()[0].getShardRouting().index().getName();
String indexUuid = response.getShards()[0].getShardRouting().index().getUUID();
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA).buildAsString();

String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
String shardPath = getShardLevelBlobPath(client(), indexName, new BlobPath(), "0", SEGMENTS, DATA, segmentsPathFixedPrefix)
.buildAsString();
Path segmentDataRepoPath = location.resolve(shardPath);
String segmentDataLocalPath = String.format(Locale.ROOT, "%s/indices/%s/0/index", response.getShards()[0].getDataPath(), indexUuid);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,13 +324,15 @@ public void testRemoteStoreCleanupForDeletedIndex() throws Exception {

final RepositoriesService repositoriesService = internalCluster().getCurrentClusterManagerNodeInstance(RepositoriesService.class);
final BlobStoreRepository remoteStoreRepository = (BlobStoreRepository) repositoriesService.repository(REMOTE_REPO_NAME);
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
BlobPath shardLevelBlobPath = getShardLevelBlobPath(
client(),
remoteStoreEnabledIndexName,
remoteStoreRepository.basePath(),
"0",
SEGMENTS,
LOCK_FILES
LOCK_FILES,
segmentsPathFixedPrefix
);
BlobContainer blobContainer = remoteStoreRepository.blobStore().blobContainer(shardLevelBlobPath);
String[] lockFiles;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManagerFactory;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.RepositoryCleanupResult;
Expand Down Expand Up @@ -109,7 +110,8 @@ public TransportCleanupRepositoryAction(
SnapshotsService snapshotsService,
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteStoreSettings remoteStoreSettings
) {
super(
CleanupRepositoryAction.NAME,
Expand All @@ -122,7 +124,10 @@ public TransportCleanupRepositoryAction(
);
this.repositoriesService = repositoriesService;
this.snapshotsService = snapshotsService;
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(() -> repositoriesService);
this.remoteStoreLockManagerFactory = new RemoteStoreLockManagerFactory(
() -> repositoriesService,
remoteStoreSettings.getSegmentsPathFixedPrefix()
);
// We add a state applier that will remove any dangling repository cleanup actions on cluster-manager failover.
// This is safe to do since cleanups will increment the repository state id before executing any operations to prevent concurrent
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,8 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_SCHEDULER_INTERVAL,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_LOOKBACK_INTERVAL,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX,

SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;
import org.opensearch.index.remote.RemoteStorePathStrategy.ShardDataPathInput;
import org.opensearch.indices.RemoteStoreSettings;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class RemoteIndexPath implements ToXContentFragment {
private final Iterable<String> basePath;
private final PathType pathType;
private final PathHashAlgorithm pathHashAlgorithm;
private final RemoteStoreSettings remoteStoreSettings;

/**
* This keeps the map of paths that would be present in the content of the index path file. For eg - It is possible
Expand All @@ -82,7 +84,8 @@ public RemoteIndexPath(
Iterable<String> basePath,
PathType pathType,
PathHashAlgorithm pathHashAlgorithm,
Map<DataCategory, List<DataType>> pathCreationMap
Map<DataCategory, List<DataType>> pathCreationMap,
RemoteStoreSettings remoteStoreSettings
) {
if (Objects.isNull(pathCreationMap)
|| Objects.isNull(pathType)
Expand Down Expand Up @@ -119,6 +122,7 @@ public RemoteIndexPath(
this.pathType = pathType;
this.pathHashAlgorithm = pathHashAlgorithm;
this.pathCreationMap = pathCreationMap;
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand Down Expand Up @@ -148,6 +152,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
.shardId(Integer.toString(shardNo))
.dataCategory(dataCategory)
.dataType(type)
.fixedPrefix(
dataCategory == TRANSLOG
? remoteStoreSettings.getTranslogPathFixedPrefix()
: remoteStoreSettings.getSegmentsPathFixedPrefix()
)
.build();
builder.value(pathType.path(pathInput, pathHashAlgorithm).buildAsString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.gateway.remote.IndexMetadataUploadListener;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {
private final Settings settings;
private final boolean isRemoteDataAttributePresent;
private final boolean isTranslogSegmentRepoSame;
private final RemoteStoreSettings remoteStoreSettings;
private final Supplier<RepositoriesService> repositoriesService;
private volatile TimeValue metadataUploadTimeout;

Expand All @@ -89,7 +91,8 @@ public RemoteIndexPathUploader(
ThreadPool threadPool,
Settings settings,
Supplier<RepositoriesService> repositoriesService,
ClusterSettings clusterSettings
ClusterSettings clusterSettings,
RemoteStoreSettings remoteStoreSettings
) {
super(threadPool, ThreadPool.Names.GENERIC);
this.settings = Objects.requireNonNull(settings);
Expand All @@ -100,6 +103,7 @@ public RemoteIndexPathUploader(
Objects.requireNonNull(clusterSettings);
metadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setMetadataUploadTimeout);
this.remoteStoreSettings = remoteStoreSettings;
}

@Override
Expand Down Expand Up @@ -208,7 +212,8 @@ private void writePathToRemoteStore(
basePath,
pathType,
hashAlgorithm,
pathCreationMap
pathCreationMap,
remoteStoreSettings
);
String fileName = generateFileName(indexUUID, idxMD.getVersion(), remoteIndexPath.getVersion());
REMOTE_INDEX_PATH_FORMAT.writeAsyncWithUrgentPriority(remoteIndexPath, blobContainer, fileName, actionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.hash.FNV1a;
import org.opensearch.core.common.Strings;
import org.opensearch.index.remote.RemoteStorePathStrategy.PathInput;

import java.util.HashMap;
Expand Down Expand Up @@ -107,7 +108,11 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return BlobPath.cleanPath().add(hashAlgorithm.hash(pathInput)).add(pathInput.basePath()).add(pathInput.fixedSubPath());
String fixedPrefix = pathInput.fixedPrefix();
return BlobPath.cleanPath()
.add(Strings.isNullOrEmpty(fixedPrefix) ? hashAlgorithm.hash(pathInput) : fixedPrefix + hashAlgorithm.hash(pathInput))
.add(pathInput.basePath())
.add(pathInput.fixedSubPath());
}

@Override
Expand All @@ -119,7 +124,10 @@ boolean requiresHashAlgorithm() {
@Override
public BlobPath generatePath(PathInput pathInput, PathHashAlgorithm hashAlgorithm) {
assert Objects.nonNull(hashAlgorithm) : "hashAlgorithm is expected to be non-null";
return pathInput.basePath().add(hashAlgorithm.hash(pathInput)).add(pathInput.fixedSubPath());
String fixedPrefix = pathInput.fixedPrefix();
return pathInput.basePath()
.add(Strings.isNullOrEmpty(fixedPrefix) ? hashAlgorithm.hash(pathInput) : fixedPrefix + hashAlgorithm.hash(pathInput))
.add(pathInput.fixedSubPath());
}

@Override
Expand Down
Loading

0 comments on commit 33be5a9

Please sign in to comment.