Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to upload snapshot shard blobs with hashed prefix #15426

Merged
merged 14 commits into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add allowlist setting for ingest-geoip and ingest-useragent ([#15325](https://github.com/opensearch-project/OpenSearch/pull/15325))
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Add support to upload snapshot shard blobs with hashed prefix ([#15426](https://github.com/opensearch-project/OpenSearch/pull/15426))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@
.build();
}

IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID());
// This instance of IndexId is not related to Snapshot Restore. Hence, we are using the ctor without pathType.
IndexId indexId = new IndexId(indexName, updatedIndexMetadata.getIndexUUID(), IndexId.DEFAULT_SHARD_PATH_TYPE);

Check warning on line 231 in server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/recovery/RemoteStoreRestoreService.java#L231

Added line #L231 was not covered by tests

if (metadataFromRemoteStore == false) {
Map<ShardId, IndexShardRoutingTable> indexShardRoutingTableMap = currentState.routingTable()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public class RemoteIndexPathUploader extends IndexMetadataUploadListener {

public static final String DELIMITER = "#";
public static final ConfigBlobStoreFormat<RemoteIndexPath> REMOTE_INDEX_PATH_FORMAT = new ConfigBlobStoreFormat<>(
RemoteIndexPath.FILE_NAME_FORMAT
RemoteIndexPath.FILE_NAME_FORMAT,
RemoteIndexPath::fromXContent
);

private static final String TIMEOUT_EXCEPTION_MSG = "Timed out waiting while uploading remote index path file for indexes=%s";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public enum PathHashAlgorithm {
@Override
String hash(PathInput pathInput) {
StringBuilder input = new StringBuilder();
for (String path : pathInput.fixedSubPath().toArray()) {
for (String path : pathInput.hashPath().toArray()) {
input.append(path);
}
long hash = FNV1a.hash64(input.toString());
Expand All @@ -222,7 +222,7 @@ String hash(PathInput pathInput) {
@Override
String hash(PathInput pathInput) {
StringBuilder input = new StringBuilder();
for (String path : pathInput.fixedSubPath().toArray()) {
for (String path : pathInput.hashPath().toArray()) {
input.append(path);
}
long hash = FNV1a.hash64(input.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.index.remote.RemoteStoreEnums.DataType;
import org.opensearch.index.remote.RemoteStoreEnums.PathHashAlgorithm;
import org.opensearch.index.remote.RemoteStoreEnums.PathType;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.util.Objects;

Expand Down Expand Up @@ -100,6 +101,10 @@
return BlobPath.cleanPath().add(indexUUID);
}

BlobPath hashPath() {
return fixedSubPath();
}

/**
* Returns a new builder for {@link PathInput}.
*/
Expand Down Expand Up @@ -127,7 +132,7 @@
return self();
}

public Builder indexUUID(String indexUUID) {
public T indexUUID(String indexUUID) {
this.indexUUID = indexUUID;
return self();
}
Expand All @@ -142,6 +147,61 @@
}
}

/**
* A subclass of {@link PathInput} that represents the input required to generate a path
* for a shard in a snapshot. It includes the base path, index UUID, and shard ID.
*
* @opensearch.internal
*/
public static class SnapshotShardPathInput extends PathInput {
private final String shardId;

public SnapshotShardPathInput(SnapshotShardPathInput.Builder builder) {
super(builder);
this.shardId = Objects.requireNonNull(builder.shardId);
}

@Override
BlobPath fixedSubPath() {
return BlobPath.cleanPath().add(BlobStoreRepository.INDICES_DIR).add(super.fixedSubPath()).add(shardId);
}

@Override
BlobPath hashPath() {
return BlobPath.cleanPath().add(shardId).add(indexUUID());

Check warning on line 171 in server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java#L171

Added line #L171 was not covered by tests
}

/**
* Returns a new builder for {@link SnapshotShardPathInput}.
*/
public static SnapshotShardPathInput.Builder builder() {
return new SnapshotShardPathInput.Builder();

Check warning on line 178 in server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/remote/RemoteStorePathStrategy.java#L178

Added line #L178 was not covered by tests
}

/**
* Builder for {@link SnapshotShardPathInput}.
*
* @opensearch.internal
*/
public static class Builder extends PathInput.Builder<SnapshotShardPathInput.Builder> {
private String shardId;

public SnapshotShardPathInput.Builder shardId(String shardId) {
this.shardId = shardId;
return this;
}

@Override
protected SnapshotShardPathInput.Builder self() {
return this;
}

public SnapshotShardPathInput build() {
return new SnapshotShardPathInput(this);
}
}
}

/**
* Wrapper class for the data aware path input required to generate path for remote store uploads. This input is
* composed of the parent inputs, shard id, data category and data type.
Expand Down Expand Up @@ -204,16 +264,6 @@
private DataCategory dataCategory;
private DataType dataType;

public Builder basePath(BlobPath basePath) {
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
super.basePath = basePath;
return this;
}

public Builder indexUUID(String indexUUID) {
super.indexUUID = indexUUID;
return this;
}

public Builder shardId(String shardId) {
this.shardId = shardId;
return this;
Expand Down
38 changes: 32 additions & 6 deletions server/src/main/java/org/opensearch/repositories/IndexId.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.repositories;

import org.opensearch.Version;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand All @@ -40,6 +41,7 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.remote.RemoteStoreEnums;

import java.io.IOException;
import java.util.Objects;
Expand All @@ -51,23 +53,36 @@
*/
@PublicApi(since = "1.0.0")
public final class IndexId implements Writeable, ToXContentObject {
protected static final String NAME = "name";
protected static final String ID = "id";
static final String NAME = "name";
static final String ID = "id";
static final String SHARD_PATH_TYPE = "shard_path_type";
public static final int DEFAULT_SHARD_PATH_TYPE = RemoteStoreEnums.PathType.FIXED.getCode();

private final String name;
private final String id;
private final int shardPathType;
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
private final int hashCode;

// Used for testing only
public IndexId(final String name, final String id) {
this(name, id, DEFAULT_SHARD_PATH_TYPE);
}

public IndexId(String name, String id, int shardPathType) {
this.name = name;
this.id = id;
this.shardPathType = shardPathType;
this.hashCode = computeHashCode();

}

public IndexId(final StreamInput in) throws IOException {
this.name = in.readString();
this.id = in.readString();
if (in.getVersion().onOrAfter(Version.CURRENT)) {
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
this.shardPathType = in.readVInt();
} else {
this.shardPathType = DEFAULT_SHARD_PATH_TYPE;

Check warning on line 84 in server/src/main/java/org/opensearch/repositories/IndexId.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/IndexId.java#L84

Added line #L84 was not covered by tests
}
this.hashCode = computeHashCode();
}

Expand All @@ -93,9 +108,16 @@
return id;
}

/**
* The storage path type in remote store for the indexes having the underlying index ids.
*/
public int getShardPathType() {
return shardPathType;
}

@Override
public String toString() {
return "[" + name + "/" + id + "]";
return "[" + name + "/" + id + "/" + shardPathType + "]";
}

@Override
Expand All @@ -107,7 +129,7 @@
return false;
}
IndexId that = (IndexId) o;
return Objects.equals(name, that.name) && Objects.equals(id, that.id);
return Objects.equals(name, that.name) && Objects.equals(id, that.id) && Objects.equals(this.shardPathType, that.shardPathType);
}

@Override
Expand All @@ -116,20 +138,24 @@
}

private int computeHashCode() {
return Objects.hash(name, id);
return Objects.hash(name, id, shardPathType);
}

@Override
public void writeTo(final StreamOutput out) throws IOException {
out.writeString(name);
out.writeString(id);
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeVInt(shardPathType);
}
ashking94 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.field(NAME, name);
builder.field(ID, id);
builder.field(SHARD_PATH_TYPE, shardPathType);
builder.endObject();
return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,15 +517,15 @@ public List<IndexId> resolveIndices(final List<String> indices) {
* @param indicesToResolve names of indices to resolve
* @param inFlightIds name to index mapping for currently in-flight snapshots not yet in the repository data to fall back to
*/
public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String, IndexId> inFlightIds) {
public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String, IndexId> inFlightIds, int pathType) {
List<IndexId> snapshotIndices = new ArrayList<>();
for (String index : indicesToResolve) {
IndexId indexId = indices.get(index);
if (indexId == null) {
indexId = inFlightIds.get(index);
}
if (indexId == null) {
indexId = new IndexId(index, UUIDs.randomBase64UUID());
indexId = new IndexId(index, UUIDs.randomBase64UUID(), pathType);
}
snapshotIndices.add(indexId);
}
Expand All @@ -544,10 +544,16 @@ public List<IndexId> resolveNewIndices(List<String> indicesToResolve, Map<String
private static final String VERSION = "version";
private static final String MIN_VERSION = "min_version";

// Visible for testing only
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException {
return snapshotsToXContent(builder, repoMetaVersion, Version.CURRENT);
}

/**
* Writes the snapshots metadata and the related indices metadata to x-content.
*/
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException {
public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion, final Version minNodeVersion)
throws IOException {
builder.startObject();
// write the snapshots list
builder.startArray(SNAPSHOTS);
Expand Down Expand Up @@ -578,6 +584,9 @@ public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final
for (final IndexId indexId : getIndices().values()) {
builder.startObject(indexId.getName());
builder.field(INDEX_ID, indexId.getId());
if (minNodeVersion.onOrAfter(Version.CURRENT)) {
builder.field(IndexId.SHARD_PATH_TYPE, indexId.getShardPathType());
}
builder.startArray(SNAPSHOTS);
List<SnapshotId> snapshotIds = indexSnapshots.get(indexId);
assert snapshotIds != null;
Expand Down Expand Up @@ -765,14 +774,20 @@ private static void parseIndices(
final List<SnapshotId> snapshotIds = new ArrayList<>();
final List<String> gens = new ArrayList<>();

String id = null;
int pathType = IndexId.DEFAULT_SHARD_PATH_TYPE;
IndexId indexId = null;

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
final String indexMetaFieldName = parser.currentName();
final XContentParser.Token currentToken = parser.nextToken();
switch (indexMetaFieldName) {
case INDEX_ID:
indexId = new IndexId(indexName, parser.text());
id = parser.text();
break;
case IndexId.SHARD_PATH_TYPE:
pathType = parser.intValue();
break;
case SNAPSHOTS:
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, currentToken, parser);
Expand All @@ -795,7 +810,7 @@ private static void parseIndices(
// different versions create or delete snapshot in the same repository.
throw new OpenSearchParseException(
"Detected a corrupted repository, index "
+ indexId
+ new IndexId(indexName, id, pathType)
+ " references an unknown snapshot uuid ["
+ uuid
+ "]"
Expand All @@ -812,9 +827,10 @@ private static void parseIndices(
break;
}
}
assert indexId != null;
assert id != null;
indexId = new IndexId(indexName, id, pathType);
indexSnapshots.put(indexId, Collections.unmodifiableList(snapshotIds));
indexLookup.put(indexId.getId(), indexId);
indexLookup.put(id, indexId);
for (int i = 0; i < gens.size(); i++) {
String parsedGen = gens.get(i);
if (parsedGen != null) {
Expand Down
Loading
Loading