Skip to content

Commit

Permalink
Address flakiness of testRemoteCleanupDeleteStale
Browse files Browse the repository at this point in the history
- Make RemoteReadResult have Object rather than ToXContent
- Fix getManifestCodecVersion in RemoteClusterMetadataManifest

Signed-off-by: Shivansh Arora <[email protected]>
  • Loading branch information
shiv0408 committed Jun 13, 2024
1 parent afeddc2 commit 1755bf5
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ public void testRemoteCleanupDeleteStale() throws Exception {
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");
RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateCleanupManager.class
);

// set cleanup interval to 100 ms to make the test faster
ClusterUpdateSettingsResponse response = client().admin()
Expand All @@ -117,6 +120,7 @@ public void testRemoteCleanupDeleteStale() throws Exception {
.get();

assertTrue(response.isAcknowledged());
assertBusy(() -> assertEquals(100, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis()));

assertBusy(() -> {
int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
Expand All @@ -128,7 +132,7 @@ public void testRemoteCleanupDeleteStale() throws Exception {
"Current number of manifest files: " + manifestFiles,
manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
);
}, 500, TimeUnit.MILLISECONDS);
});

// disable the clean up to avoid race condition during shutdown
response = client().admin()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.opensearch.common.remote.RemoteWritableEntityStore;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteClusterStateCustoms;
Expand Down Expand Up @@ -121,7 +120,7 @@ public CheckedRunnable<IOException> getAsyncMetadataReadAction(
LatchedActionListener<RemoteReadResult> listener
) {
final ActionListener actionListener = ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)),
response -> listener.onResponse(new RemoteReadResult(response, CLUSTER_STATE_ATTRIBUTE, component)),
listener::onFailure
);
return () -> getStore(blobEntity).readAsync(blobEntity, actionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
ClusterState clusterState,
ClusterMetadataManifest previousManifest
) throws IOException {
logger.info("WRITING INCREMENTAL STATE");
logger.trace("WRITING INCREMENTAL STATE");

final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
Expand Down Expand Up @@ -766,7 +766,7 @@ private UploadedMetadataResults writeMetadataInParallel(
throw new IllegalStateException("Unknown metadata component name " + name);
}
});
logger.info("response {}", response.uploadedIndicesRoutingMetadata.toString());
logger.trace("response {}", response.uploadedIndicesRoutingMetadata.toString());
return response;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.model.RemoteClusterStateBlobStore;
import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata;
import org.opensearch.gateway.remote.model.RemoteCustomMetadata;
Expand Down Expand Up @@ -194,7 +193,7 @@ CheckedRunnable<IOException> getAsyncMetadataReadAction(
LatchedActionListener<RemoteReadResult> listener
) {
ActionListener actionListener = ActionListener.wrap(
response -> listener.onResponse(new RemoteReadResult((ToXContent) response, readEntity.getType(), componentName)),
response -> listener.onResponse(new RemoteReadResult(response, readEntity.getType(), componentName)),
listener::onFailure
);
return () -> getStore(readEntity).readAsync(readEntity, actionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,16 +131,17 @@ public ClusterMetadataManifest deserialize(final InputStream inputStream) throws
return blobStoreFormat.deserialize(blobName, getNamedXContentRegistry(), Streams.readFully(inputStream));
}

private int getManifestCodecVersion() {
// package private for testing
int getManifestCodecVersion() {
assert blobName != null;
String[] splitName = blobName.split(DELIMITER);
String[] splitName = getBlobFileName().split(DELIMITER);
if (splitName.length == SPLITTED_MANIFEST_FILE_LENGTH) {
return Integer.parseInt(splitName[splitName.length - 1]); // Last value would be codec version.
} else if (splitName.length < SPLITTED_MANIFEST_FILE_LENGTH) { // Where codec is not part of file name, i.e. default codec version 0
// is used.
return ClusterMetadataManifest.CODEC_V0;
} else {
throw new IllegalArgumentException("Manifest file name is corrupted");
throw new IllegalArgumentException("Manifest file name is corrupted : " + blobName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,22 @@

package org.opensearch.gateway.remote.model;

import org.opensearch.core.xcontent.ToXContent;

/**
* Container class for entity read from remote store
*/
public class RemoteReadResult {

ToXContent obj;
Object obj;
String component;
String componentName;

public RemoteReadResult(ToXContent obj, String component, String componentName) {
public RemoteReadResult(Object obj, String component, String componentName) {
this.obj = obj;
this.component = component;
this.componentName = componentName;
}

public ToXContent getObj() {
public Object getObj() {
return obj;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V0;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.CODEC_V2;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.hamcrest.Matchers.greaterThan;
Expand Down Expand Up @@ -191,6 +193,16 @@ public void testGenerateBlobFileName() {
assertThat(nameTokens[3], is("C"));
assertThat(RemoteStoreUtils.invertLong(nameTokens[4]), lessThanOrEqualTo(System.currentTimeMillis()));
assertThat(nameTokens[5], is(String.valueOf(MANIFEST_CURRENT_CODEC_VERSION)));

String blobName = "/usr/local/random/path/to/manifest/manifest__1__2__3__4__2";
RemoteClusterMetadataManifest remoteObjectForDownload = new RemoteClusterMetadataManifest(
blobName,
clusterUUID,
compressor,
namedXContentRegistry
);
assertEquals("manifest__1__2__3__4__2", remoteObjectForDownload.generateBlobFileName());
assertEquals(remoteObjectForDownload.getManifestCodecVersion(), 2);
}

public void testGetUploadedMetadata() throws IOException {
Expand Down Expand Up @@ -236,6 +248,28 @@ public void testSerDe() throws IOException {
assertThrows(IllegalArgumentException.class, () -> invalidRemoteObject.deserialize(new ByteArrayInputStream(new byte[0])));
}

public void testGetManifestCodecVersion() {
String manifestFileWithDelimiterInPath =
"123456789012_test-cluster/cluster-state/dsgYj10__Nkso7/manifest/manifest__9223372036854775806__9223372036854775804__C__9223370319103329556__2";
RemoteClusterMetadataManifest remoteManifestForDownload = new RemoteClusterMetadataManifest(
manifestFileWithDelimiterInPath,
clusterUUID,
compressor,
namedXContentRegistry
);
assertEquals(CODEC_V2, remoteManifestForDownload.getManifestCodecVersion());

String v0ManifestFileWithDelimiterInPath =
"123456789012_test-cluster/cluster-state/dsgYj10__Nkso7/manifest/manifest__9223372036854775806__9223372036854775804__C__9223370319103329556";
RemoteClusterMetadataManifest remoteManifestV0ForDownload = new RemoteClusterMetadataManifest(
v0ManifestFileWithDelimiterInPath,
clusterUUID,
compressor,
namedXContentRegistry
);
assertEquals(CODEC_V0, remoteManifestV0ForDownload.getManifestCodecVersion());
}

private ClusterMetadataManifest getClusterMetadataManifest() {
return ClusterMetadataManifest.builder()
.opensearchVersion(Version.CURRENT)
Expand Down

0 comments on commit 1755bf5

Please sign in to comment.