Skip to content

Commit

Permalink
Fix stale index deletion in snapshots for hashed prefix path type
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Nov 11, 2024
1 parent 706710c commit 8d819de
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase {

private static final String REMOTE_REPO_NAME = "remote-store-repo-name";

public void testShardBlobDeletionForHashedPrefixPathType() throws Exception {
public void testStaleIndexDeletion() throws Exception {
String indexName1 = ".testindex1";
String repoName = "test-restore-snapshot-repo";
String snapshotName1 = "test-restore-snapshot1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2307,11 +2307,23 @@ private List<String> findMatchingShardPaths(String indexId, Map<String, BlobMeta
* @return An Optional containing the shard path with the highest generation number, or empty if the list is empty
*/
private Optional<String> findHighestGenerationShardPaths(List<String> matchingShardPaths) {
return matchingShardPaths.stream()
.map(s -> s.split("\\" + SnapshotShardPaths.DELIMITER))
.sorted((a, b) -> Integer.parseInt(b[2]) - Integer.parseInt(a[2]))
.map(parts -> String.join(SnapshotShardPaths.DELIMITER, parts))
.findFirst();
if (matchingShardPaths.isEmpty()) {
return Optional.empty();
}

int maxGen = Integer.MIN_VALUE;
String maxGenShardPath = null;

for (String shardPath : matchingShardPaths) {
String[] parts = shardPath.split("\\" + SnapshotShardPaths.DELIMITER);
int shardCount = Integer.parseInt(parts[parts.length - 3]);
if (shardCount > maxGen) {
maxGen = shardCount;
maxGenShardPath = shardPath;
}
}
assert maxGenShardPath != null : "Valid maxGenShardPath should be present";
return Optional.of(maxGenShardPath);
}

/**
Expand Down Expand Up @@ -2549,25 +2561,32 @@ public void finalizeSnapshot(
* on account of new indexes by same index name being snapshotted that exists already in the repository's snapshots.
*/
private void cleanupRedundantSnapshotShardPaths(Set<String> updatedShardPathsIndexIds) {
Set<String> updatedIndexIds = updatedShardPathsIndexIds.stream()
.map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]))
.collect(Collectors.toSet());
Set<String> indexIdShardPaths = getSnapshotShardPaths().keySet();
List<String> staleShardPaths = indexIdShardPaths.stream().filter(s -> updatedShardPathsIndexIds.contains(s) == false).filter(s -> {
String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]);
return updatedIndexIds.contains(indexId);
}).collect(Collectors.toList());
try {
deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths);
} catch (IOException e) {
logger.warn(
new ParameterizedMessage(
"Repository [{}] Exception during snapshot stale index deletion {}",
metadata.name(),
staleShardPaths
),
e
);
Set<String> updatedIndexIds = updatedShardPathsIndexIds.stream()
.map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]))
.collect(Collectors.toSet());
Set<String> indexIdShardPaths = getSnapshotShardPaths().keySet();
List<String> staleShardPaths = indexIdShardPaths.stream()
.filter(s -> updatedShardPathsIndexIds.contains(s) == false)
.filter(s -> {
String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]);
return updatedIndexIds.contains(indexId);
})
.collect(Collectors.toList());
try {
deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths);
} catch (IOException e) {
logger.warn(
new ParameterizedMessage(
"Repository [{}] Exception during snapshot stale index deletion {}",
metadata.name(),
staleShardPaths
),
e
);
}
} catch (Exception ex) {
logger.error("Exception during cleanup of redundant snapshot shard paths", ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,20 @@ public static SnapshotShardPaths fromXContent(XContentParser ignored) {
* Parses a shard path string and extracts relevant shard information.
*
* @param shardPath The shard path string to parse. Expected format is:
* [index_id]#[index_name]#[shard_count]#[path_type_code]#[path_hash_algorithm_code]
* [index_id].[index_name].[shard_count].[path_type_code].[path_hash_algorithm_code]
* @return A {@link ShardInfo} object containing the parsed index ID and shard count.
* @throws IllegalArgumentException if the shard path format is invalid or cannot be parsed.
*/
public static ShardInfo parseShardPath(String shardPath) {
String[] parts = shardPath.split("\\" + SnapshotShardPaths.DELIMITER);
if (parts.length != 5) {
int len = parts.length;
if (len < 5) {
throw new IllegalArgumentException("Invalid shard path format: " + shardPath);
}
try {
IndexId indexId = new IndexId(parts[1], getIndexId(parts[0]), Integer.parseInt(parts[3]));
int shardCount = Integer.parseInt(parts[2]);
// indexName is never used from within the ShardInfo returned
IndexId indexId = new IndexId("", getIndexId(parts[0]), Integer.parseInt(parts[len - 2]));
int shardCount = Integer.parseInt(parts[len - 3]);
return new ShardInfo(indexId, shardCount);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Invalid shard path format: " + shardPath, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2658,7 +2658,7 @@ public static PutRepositoryRequestBuilder putRepositoryRequestBuilder(
builder.setTimeout(timeout);
}
if (finalSettings == false) {
settings.put(BlobStoreRepository.SHARD_PATH_TYPE.getKey(), PathType.HASHED_PREFIX.name());
settings.put(BlobStoreRepository.SHARD_PATH_TYPE.getKey(), randomFrom(PathType.values()));
}
builder.setSettings(settings);
return builder;
Expand Down

0 comments on commit 8d819de

Please sign in to comment.