Skip to content

Commit

Permalink
Download functionality of global metadata from remote store (opensear…
Browse files Browse the repository at this point in the history
…ch-project#10535) (opensearch-project#10733)

* Download funcationality of global metadata from remote store

(cherry picked from commit 3a36c22)

Signed-off-by: Dhwanil Patel <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 5e79291 commit 4a0b3c3
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Admission control] Add enhancements to FS stats to include read/write time, queue size and IO time ([#10696](https://github.com/opensearch-project/OpenSearch/pull/10696))
- [Remote cluster state] Change file names for remote cluster state ([#10557](https://github.com/opensearch-project/OpenSearch/pull/10557))
- [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ public void testFullClusterRestoreStaleDelete() throws Exception {

assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata(
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestMetadata(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
);
).getIndices();
assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,18 +662,18 @@ private BlobPath getManifestFolderPath(String clusterName, String clusterUUID) {
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @param clusterMetadataManifest manifest file of cluster
* @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map
*/
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException {
start();
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
throw new IllegalStateException("Latest index metadata is not present for the provided clusterUUID");
}
assert Objects.equals(clusterUUID, clusterMetadataManifest.get().getClusterUUID())
private Map<String, IndexMetadata> getIndexMetadataMap(
String clusterName,
String clusterUUID,
ClusterMetadataManifest clusterMetadataManifest
) {
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
: "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch.";
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.get().getIndices()) {
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) {
IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata);
remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata);
}
Expand Down Expand Up @@ -704,6 +704,52 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
}
}

/**
* Fetch latest metadata from remote cluster state including global metadata and index metadata
*
* @param clusterUUID uuid of cluster state to refer to in remote
* @param clusterName name of the cluster
* @return {@link IndexMetadata}
*/
public Metadata getLatestMetadata(String clusterName, String clusterUUID) throws IOException {
start();
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Latest cluster metadata manifest is not present for the provided clusterUUID: %s", clusterUUID)
);
}
// Fetch Global Metadata
Metadata globalMetadata = getGlobalMetadata(clusterName, clusterUUID, clusterMetadataManifest.get());

// Fetch Index Metadata
Map<String, IndexMetadata> indices = getIndexMetadataMap(clusterName, clusterUUID, clusterMetadataManifest.get());

return Metadata.builder(globalMetadata).indices(indices).build();
}

private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) {
String globalMetadataFileName = clusterMetadataManifest.getGlobalMetadataFileName();
try {
// Fetch Global metadata
if (globalMetadataFileName != null) {
String[] splitPath = globalMetadataFileName.split("/");
return GLOBAL_METADATA_FORMAT.read(
globalMetadataContainer(clusterName, clusterUUID),
splitPath[splitPath.length - 1],
blobStoreRepository.getNamedXContentRegistry()
);
} else {
return Metadata.EMPTY_METADATA;
}
} catch (IOException e) {
throw new IllegalStateException(
String.format(Locale.ROOT, "Error while downloading Global Metadata - %s", globalMetadataFileName),
e
);
}
}

/**
* Fetch latest ClusterMetadataManifest from remote state store
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ public RemoteRestoreResult restore(
|| restoreClusterUUID.isBlank()) == false;
if (metadataFromRemoteStore) {
try {
remoteClusterStateService.getLatestIndexMetadata(currentState.getClusterName().value(), restoreClusterUUID)
remoteClusterStateService.getLatestMetadata(currentState.getClusterName().value(), restoreClusterUUID)
.getIndices()
.values()
.forEach(indexMetadata -> {
indexMetadataMap.put(indexMetadata.getIndex().getName(), new Tuple<>(true, indexMetadata));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.IndexGraveyard;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
Expand All @@ -28,6 +29,7 @@
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.ParseField;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
Expand Down Expand Up @@ -634,7 +636,8 @@ public void testReadLatestMetadataManifestSuccessButNoIndexMetadata() throws IOE

remoteClusterStateService.start();
assertEquals(
remoteClusterStateService.getLatestIndexMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
.getIndices()
.size(),
0
);
Expand Down Expand Up @@ -662,10 +665,8 @@ public void testReadLatestMetadataManifestSuccessButIndexMetadataFetchIOExceptio
remoteClusterStateService.start();
Exception e = assertThrows(
IllegalStateException.class,
() -> remoteClusterStateService.getLatestIndexMetadata(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
)
() -> remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
.getIndices()
);
assertEquals(e.getMessage(), "Error while downloading IndexMetadata - " + uploadedIndexMetadata.getUploadedFilename());
}
Expand Down Expand Up @@ -704,6 +705,70 @@ public void testReadLatestMetadataManifestSuccess() throws IOException {
assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID()));
}

public void testReadGlobalMetadata() throws IOException {
when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(new NamedXContentRegistry(
List.of(new NamedXContentRegistry.Entry(Metadata.Custom.class, new ParseField(IndexGraveyard.TYPE), IndexGraveyard::fromXContent))));
final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();

final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
.indices(List.of())
.clusterTerm(1L)
.stateVersion(1L)
.stateUUID("state-uuid")
.clusterUUID("cluster-uuid")
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
.globalMetadataFileName("global-metadata-file")
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.build();

Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build();
mockBlobContainerForGlobalMetadata(mockBlobStoreObjects(), expectedManifest, expactedMetadata);

Metadata metadata = remoteClusterStateService.getLatestMetadata(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);

assertTrue(Metadata.isGlobalStateEquals(metadata, expactedMetadata));
}

public void testReadGlobalMetadataIOException() throws IOException {
final ClusterState clusterState = generateClusterStateWithGlobalMetadata().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();
String globalIndexMetadataName = "global-metadata-file";
final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder()
.indices(List.of())
.clusterTerm(1L)
.stateVersion(1L)
.stateUUID("state-uuid")
.clusterUUID("cluster-uuid")
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
.globalMetadataFileName(globalIndexMetadataName)
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.build();

Metadata expactedMetadata = Metadata.builder().persistentSettings(Settings.builder().put("readonly", true).build()).build();

BlobContainer blobContainer = mockBlobStoreObjects();
mockBlobContainerForGlobalMetadata(blobContainer, expectedManifest, expactedMetadata);

when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(globalIndexMetadataName))).thenThrow(
FileNotFoundException.class
);

remoteClusterStateService.start();
Exception e = assertThrows(
IllegalStateException.class,
() -> remoteClusterStateService.getLatestMetadata(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID())
);
assertEquals(e.getMessage(), "Error while downloading Global Metadata - " + globalIndexMetadataName);
}

public void testReadLatestIndexMetadataSuccess() throws IOException {
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
remoteClusterStateService.start();
Expand All @@ -730,15 +795,16 @@ public void testReadLatestIndexMetadataSuccess() throws IOException {
.nodeId("nodeA")
.opensearchVersion(VersionUtils.randomOpenSearchVersion(random()))
.previousClusterUUID("prev-cluster-uuid")
.globalMetadataFileName("global-metadata-file")
.codecVersion(ClusterMetadataManifest.CODEC_V0)
.build();

mockBlobContainer(mockBlobStoreObjects(), expectedManifest, Map.of(index.getUUID(), indexMetadata));

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestIndexMetadata(
Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestMetadata(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);
).getIndices();

assertEquals(indexMetadataMap.size(), 1);
assertEquals(indexMetadataMap.get(index.getUUID()).getIndex().getName(), index.getName());
Expand Down Expand Up @@ -1114,6 +1180,41 @@ private void mockBlobContainer(
});
}

private void mockBlobContainerForGlobalMetadata(
BlobContainer blobContainer,
ClusterMetadataManifest clusterMetadataManifest,
Metadata metadata
) throws IOException {
String mockManifestFileName = "manifest__1__2__C__456__1";
BlobMetadata blobMetadata = new PlainBlobMetadata(mockManifestFileName, 1);
when(
blobContainer.listBlobsByPrefixInSortedOrder(
"manifest" + RemoteClusterStateService.DELIMITER,
1,
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
)
).thenReturn(Arrays.asList(blobMetadata));

BytesReference bytes = RemoteClusterStateService.CLUSTER_METADATA_MANIFEST_FORMAT.serialize(
clusterMetadataManifest,
mockManifestFileName,
blobStoreRepository.getCompressor(),
FORMAT_PARAMS
);
when(blobContainer.readBlob(mockManifestFileName)).thenReturn(new ByteArrayInputStream(bytes.streamInput().readAllBytes()));

BytesReference bytesGlobalMetadata = RemoteClusterStateService.GLOBAL_METADATA_FORMAT.serialize(
metadata,
"global-metadata-file",
blobStoreRepository.getCompressor(),
FORMAT_PARAMS
);
String[] splitPath = clusterMetadataManifest.getGlobalMetadataFileName().split("/");
when(blobContainer.readBlob(RemoteClusterStateService.GLOBAL_METADATA_FORMAT.blobName(splitPath[splitPath.length - 1]))).thenReturn(
new ByteArrayInputStream(bytesGlobalMetadata.streamInput().readAllBytes())
);
}

private static ClusterState.Builder generateClusterStateWithGlobalMetadata() {
final Settings clusterSettings = Settings.builder().put("cluster.blocks.read_only", true).build();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
Expand Down

0 comments on commit 4a0b3c3

Please sign in to comment.