From 8d819de1f126015739367c6fb5161e9b33dd7163 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Mon, 11 Nov 2024 22:43:07 +0530 Subject: [PATCH] Fix stale index deletion in snapshots for hashed prefix path type Signed-off-by: Ashish Singh --- .../snapshots/DeleteSnapshotIT.java | 2 +- .../blobstore/BlobStoreRepository.java | 65 ++++++++++++------- .../snapshots/SnapshotShardPaths.java | 10 +-- .../test/OpenSearchIntegTestCase.java | 2 +- 4 files changed, 50 insertions(+), 29 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java index 25f7ff08535ad..85d81761ea4a0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotIT.java @@ -45,7 +45,7 @@ public class DeleteSnapshotIT extends AbstractSnapshotIntegTestCase { private static final String REMOTE_REPO_NAME = "remote-store-repo-name"; - public void testShardBlobDeletionForHashedPrefixPathType() throws Exception { + public void testStaleIndexDeletion() throws Exception { String indexName1 = ".testindex1"; String repoName = "test-restore-snapshot-repo"; String snapshotName1 = "test-restore-snapshot1"; diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 243d0021fac2e..21802e26072d1 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -2307,11 +2307,23 @@ private List findMatchingShardPaths(String indexId, Map findHighestGenerationShardPaths(List matchingShardPaths) { - return matchingShardPaths.stream() - .map(s -> s.split("\\" + SnapshotShardPaths.DELIMITER)) - .sorted((a, b) -> Integer.parseInt(b[2]) - Integer.parseInt(a[2])) - .map(parts -> String.join(SnapshotShardPaths.DELIMITER, parts)) - .findFirst(); + if (matchingShardPaths.isEmpty()) { + return Optional.empty(); + } + + int maxGen = Integer.MIN_VALUE; + String maxGenShardPath = null; + + for (String shardPath : matchingShardPaths) { + String[] parts = shardPath.split("\\" + SnapshotShardPaths.DELIMITER); + int shardCount = Integer.parseInt(parts[parts.length - 3]); + if (shardCount > maxGen) { + maxGen = shardCount; + maxGenShardPath = shardPath; + } + } + assert maxGenShardPath != null : "Valid maxGenShardPath should be present"; + return Optional.of(maxGenShardPath); } /** @@ -2549,25 +2561,32 @@ public void finalizeSnapshot( * on account of new indexes by same index name being snapshotted that exists already in the repository's snapshots. */ private void cleanupRedundantSnapshotShardPaths(Set updatedShardPathsIndexIds) { - Set updatedIndexIds = updatedShardPathsIndexIds.stream() - .map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0])) - .collect(Collectors.toSet()); - Set indexIdShardPaths = getSnapshotShardPaths().keySet(); - List staleShardPaths = indexIdShardPaths.stream().filter(s -> updatedShardPathsIndexIds.contains(s) == false).filter(s -> { - String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]); - return updatedIndexIds.contains(indexId); - }).collect(Collectors.toList()); try { - deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths); - } catch (IOException e) { - logger.warn( - new ParameterizedMessage( - "Repository [{}] Exception during snapshot stale index deletion {}", - metadata.name(), - staleShardPaths - ), - e - ); + Set updatedIndexIds = updatedShardPathsIndexIds.stream() + .map(s -> getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0])) + .collect(Collectors.toSet()); + Set indexIdShardPaths = getSnapshotShardPaths().keySet(); + List staleShardPaths = indexIdShardPaths.stream() + .filter(s -> updatedShardPathsIndexIds.contains(s) == false) + .filter(s -> { + String indexId = getIndexId(s.split("\\" + SnapshotShardPaths.DELIMITER)[0]); + return updatedIndexIds.contains(indexId); + }) + .collect(Collectors.toList()); + try { + deleteFromContainer(snapshotShardPathBlobContainer(), staleShardPaths); + } catch (IOException e) { + logger.warn( + new ParameterizedMessage( + "Repository [{}] Exception during snapshot stale index deletion {}", + metadata.name(), + staleShardPaths + ), + e + ); + } + } catch (Exception ex) { + logger.error("Exception during cleanup of redundant snapshot shard paths", ex); } } diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java b/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java index 878c2baba4ce9..5a7fc943c7023 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotShardPaths.java @@ -92,18 +92,20 @@ public static SnapshotShardPaths fromXContent(XContentParser ignored) { * Parses a shard path string and extracts relevant shard information. * * @param shardPath The shard path string to parse. Expected format is: - * [index_id]#[index_name]#[shard_count]#[path_type_code]#[path_hash_algorithm_code] + * [index_id].[index_name].[shard_count].[path_type_code].[path_hash_algorithm_code] * @return A {@link ShardInfo} object containing the parsed index ID and shard count. * @throws IllegalArgumentException if the shard path format is invalid or cannot be parsed. */ public static ShardInfo parseShardPath(String shardPath) { String[] parts = shardPath.split("\\" + SnapshotShardPaths.DELIMITER); - if (parts.length != 5) { + int len = parts.length; + if (len < 5) { throw new IllegalArgumentException("Invalid shard path format: " + shardPath); } try { - IndexId indexId = new IndexId(parts[1], getIndexId(parts[0]), Integer.parseInt(parts[3])); - int shardCount = Integer.parseInt(parts[2]); + // indexName is never used from within the ShardInfo returned + IndexId indexId = new IndexId("", getIndexId(parts[0]), Integer.parseInt(parts[len - 2])); + int shardCount = Integer.parseInt(parts[len - 3]); return new ShardInfo(indexId, shardCount); } catch (NumberFormatException e) { throw new IllegalArgumentException("Invalid shard path format: " + shardPath, e); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index bcfd24cbe618e..e27ff311c06f6 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -2658,7 +2658,7 @@ public static PutRepositoryRequestBuilder putRepositoryRequestBuilder( builder.setTimeout(timeout); } if (finalSettings == false) { - settings.put(BlobStoreRepository.SHARD_PATH_TYPE.getKey(), PathType.HASHED_PREFIX.name()); + settings.put(BlobStoreRepository.SHARD_PATH_TYPE.getKey(), randomFrom(PathType.values())); } builder.setSettings(settings); return builder;