Skip to content

Commit

Permalink
Add migration test for .async-search system index
Browse files Browse the repository at this point in the history
This change adds migration testing for the ".async-search" system index
to the full cluster upgrade tests that perform updates from versions N-2
to N via N-1.
The test creates a system index by using async_search on a cluster with
version N-2, then calls the "_migrate" API in version N-1 and finally
checks that on the upgraded cluster in N we are still able to retrieve
async search results from previous versions and can still write to the
system index. This is necessary to ensure we don't end up with a
write-only async search system index when migrating to version 9.
  • Loading branch information
cbuescher committed Feb 3, 2025
1 parent 85f5222 commit bfb8fe7
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 4 deletions.
2 changes: 0 additions & 2 deletions muted-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ tests:
- class: org.elasticsearch.upgrades.VectorSearchIT
method: testBBQVectorSearch {upgradedNodes=0}
issue: https://github.com/elastic/elasticsearch/issues/121253
- class: org.elasticsearch.lucene.FullClusterRestartLuceneIndexCompatibilityIT
issue: https://github.com/elastic/elasticsearch/issues/121257
- class: org.elasticsearch.upgrades.VectorSearchIT
method: testBBQVectorSearch {upgradedNodes=1}
issue: https://github.com/elastic/elasticsearch/issues/121271
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import org.apache.http.entity.ContentType;
import org.apache.http.entity.InputStreamEntity;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
Expand All @@ -27,6 +29,7 @@
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
import org.elasticsearch.test.cluster.util.Version;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.test.rest.ObjectPath;
import org.elasticsearch.xcontent.XContentType;
import org.hamcrest.Matcher;
import org.junit.After;
Expand Down Expand Up @@ -161,8 +164,21 @@ protected static boolean isFullyUpgradedTo(Version version) throws Exception {
}

protected static Version indexVersion(String indexName) throws Exception {
var response = assertOK(client().performRequest(new Request("GET", "/" + indexName + "/_settings")));
int id = Integer.parseInt(createFromResponse(response).evaluate(indexName + ".settings.index.version.created"));
return indexVersion(indexName, false);
}

protected static Version indexVersion(String indexName, boolean ignoreWarnings) throws Exception {
Request request = new Request("GET", "/" + indexName + "/_settings");
request.addParameter("flat_settings", "true");
if (ignoreWarnings) {
RequestOptions.Builder options = request.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE);
request.setOptions(options);
}
var response = assertOK(client().performRequest(request));
ObjectPath fromResponse = createFromResponse(response);
Map<String, Object> settings = fromResponse.evaluateExact(indexName, "settings");
int id = Integer.parseInt((String) settings.get("index.version.created"));
return new Version((byte) ((id / 1000000) % 100), (byte) ((id / 10000) % 100), (byte) ((id / 100) % 100));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,22 @@

package org.elasticsearch.lucene;

import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.WarningsHandler;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.test.cluster.util.Version;
import org.elasticsearch.test.rest.ObjectPath;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK;
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_WRITE_BLOCK;
Expand All @@ -37,6 +47,9 @@ public FullClusterRestartLuceneIndexCompatibilityIT(Version version) {
super(version);
}

// we need a place to store async_search ids across cluster restarts
private static Map<String, String> async_search_ids = new HashMap<>(3);

/**
* Creates an index on N-2, upgrades to N-1 and marks as read-only, then upgrades to N.
*/
Expand Down Expand Up @@ -374,4 +387,118 @@ public void testRestoreIndexOverClosedIndex() throws Exception {
deleteIndex(index);
}
}

/**
* 1. creates an index on N-2 and performs async_search on it that is kept in system index
* 2. After update to N-1 (latest) perform a system index migration step, also write block the index
* 3. on N, check that async search results are still retrievable and we can write to the system index
*/
public void testAsyncSearchIndexMigration() throws Exception {
final String index = suffix("index");
final String asyncSearchIndex = ".async-search";
final int numDocs = 2431;

final Request asyncSearchRequest = new Request("POST", "/" + index + "/_async_search?size=100&keep_on_completion=true");

if (isFullyUpgradedTo(VERSION_MINUS_2)) {
createIndex(
client(),
index,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, randomInt(2))
.put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)
.build()
);
indexDocs(index, numDocs);
ensureGreen(index);

assertThat(indexVersion(index), equalTo(VERSION_MINUS_2));
String asyncId = searchAsyncAndStoreId(asyncSearchRequest, "n-2_id");
ensureGreen(asyncSearchIndex);

checkRetrieveAsyncSearch(asyncId, numDocs);
assertBusy(() -> assertDocCountNoWarnings(client(), asyncSearchIndex, 1));
assertThat(indexVersion(asyncSearchIndex, true), equalTo(VERSION_MINUS_2));
return;
}

if (isFullyUpgradedTo(VERSION_MINUS_1)) {
// check .async-search index is readable
assertThat(indexVersion(asyncSearchIndex, true), equalTo(VERSION_MINUS_2));

// migrate system indices
Request migrateRequest = new Request("POST", "/_migration/system_features");
assertThat(
ObjectPath.createFromResponse(client().performRequest(migrateRequest)).evaluate("features.0.feature_name"),
equalTo("async_search")
);
assertBusy(() -> {
Request checkMigrateProgress = new Request("GET", "/_migration/system_features");
Response resp = null;
try {
assertFalse(
ObjectPath.createFromResponse(client().performRequest(checkMigrateProgress))
.evaluate("migration_status")
.equals("IN_PROGRESS")
);
} catch (IOException e) {
throw new RuntimeException(e);
}
});

// check search results from n-2 search are still readable
checkRetrieveAsyncSearch(async_search_ids.get("n-2_id"), numDocs);

// perform new async search and check its readable
String asyncId = searchAsyncAndStoreId(asyncSearchRequest, "n-1_id");
checkRetrieveAsyncSearch(asyncId, numDocs);
assertBusy(() -> assertDocCountNoWarnings(client(), asyncSearchIndex, 2));

// in order to move to current version we need write block for n-2 index
addIndexBlock(index, IndexMetadata.APIBlock.WRITE);
}

if (isFullyUpgradedTo(VERSION_CURRENT)) {
assertThat(indexVersion(index, true), equalTo(VERSION_MINUS_2));
checkRetrieveAsyncSearch(async_search_ids.get("n-2_id"), numDocs);
checkRetrieveAsyncSearch(async_search_ids.get("n-1_id"), numDocs);

// check system index is still writeable
String asyncId = searchAsyncAndStoreId(asyncSearchRequest, "n_id");
checkRetrieveAsyncSearch(asyncId, numDocs);
assertBusy(() -> assertDocCountNoWarnings(client(), asyncSearchIndex, 3));
}

}

private static String searchAsyncAndStoreId(Request asyncSearchRequest, String asyncIdName) throws IOException {
ObjectPath resp = ObjectPath.createFromResponse(client().performRequest(asyncSearchRequest));
String asyncId = resp.evaluate("id");
assertNotNull(asyncId);
async_search_ids.put(asyncIdName, asyncId);
return asyncId;
}

private static void checkRetrieveAsyncSearch(String asyncId, int numDocs) throws IOException {
var asyncGet = new Request("GET", "/_async_search/" + asyncId);
ObjectPath resp = ObjectPath.createFromResponse(client().performRequest(asyncGet));
assertEquals(Integer.valueOf(numDocs), resp.evaluate("response.hits.total.value"));
}

/**
* Assert that the index in question has the given number of documents present
*/
public static void assertDocCountNoWarnings(RestClient client, String indexName, long docCount) throws IOException {
Request countReq = new Request("GET", "/" + indexName + "/_count");
RequestOptions.Builder options = countReq.getOptions().toBuilder();
options.setWarningsHandler(WarningsHandler.PERMISSIVE);
countReq.setOptions(options);
ObjectPath resp = ObjectPath.createFromResponse(client.performRequest(countReq));
assertEquals(
"expected " + docCount + " documents but it was a different number",
docCount,
Long.parseLong(resp.evaluate("count").toString())
);
}
}

0 comments on commit bfb8fe7

Please sign in to comment.