Skip to content

Commit

Permalink
Add BulkShardRequest Serialization Test (#17057)
Browse files Browse the repository at this point in the history
Signed-off-by: jchrys <[email protected]>
(cherry picked from commit 7977446)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Jan 22, 2025
1 parent e6edb1e commit 399b3f9
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ protected Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion)
return (Request) this;
}

long routedBasedOnClusterVersion() {
// visible for testing
public long routedBasedOnClusterVersion() {
return routedBasedOnClusterVersion;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,17 @@

package org.opensearch.action.bulk;

import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;

import static org.apache.lucene.tests.util.TestUtil.randomSimpleString;

public class BulkShardRequestTests extends OpenSearchTestCase {
Expand All @@ -55,4 +62,73 @@ public void testToString() {
assertEquals("BulkShardRequest [" + shardId + "] containing [" + count + "] requests blocking until refresh", r.toString());
assertEquals("requests[" + count + "], index[" + index + "][0], refresh[WAIT_UNTIL]", r.getDescription());
}

public void testBulkShardRequestSerialization() throws IOException {
final String index = randomSimpleString(random(), 10);
final int count = between(2, 100);
final ShardId shardId = new ShardId(index, "ignored", 0);
final RefreshPolicy refreshPolicy = randomFrom(RefreshPolicy.values());
final BulkShardRequest expected = new BulkShardRequest(shardId, refreshPolicy, generateBulkItemRequests(count));

final BytesStreamOutput out = new BytesStreamOutput();

expected.writeTo(out);

final BulkShardRequest actual = new BulkShardRequest(out.bytes().streamInput());

assertEquals(expected.getParentTask().getId(), actual.getParentTask().getId());
assertEquals(expected.getParentTask().getNodeId(), actual.getParentTask().getNodeId());

assertEquals(expected.shardId(), actual.shardId());
assertEquals(expected.waitForActiveShards(), actual.waitForActiveShards());
assertEquals(expected.timeout(), actual.timeout());
assertEquals(expected.index(), actual.index());
assertEquals(expected.routedBasedOnClusterVersion(), actual.routedBasedOnClusterVersion());

assertEquals(expected.getRefreshPolicy(), actual.getRefreshPolicy());

assertEquals(expected.items().length, actual.items().length);
for (int i = 0; i < count; ++i) {
final BulkItemRequest expectedItem = expected.items()[i];
final BulkItemRequest actualItem = actual.items()[i];
if (null == expectedItem) {
assertNull(actualItem);
continue;
}
assertEquals(expectedItem.id(), actualItem.id());
assertEquals(expectedItem.request().id(), actualItem.request().id());
assertEquals(expectedItem.request().index(), actualItem.request().index());
assertEquals(expectedItem.request().opType(), actualItem.request().opType());
}
}

private BulkItemRequest[] generateBulkItemRequests(final int count) {
final BulkItemRequest[] items = new BulkItemRequest[count];
final int nullIdx = randomIntBetween(0, count - 1);
for (int i = 0; i < count; i++) {
if (i == nullIdx) {
items[i] = null;
continue;
}
final DocWriteRequest<?> request;
switch (randomFrom(DocWriteRequest.OpType.values())) {
case INDEX:
request = new IndexRequest("index").id("id_" + i);
break;
case CREATE:
request = new IndexRequest("index").id("id_" + i).create(true);
break;
case UPDATE:
request = new UpdateRequest("index", "id_" + i);
break;
case DELETE:
request = new DeleteRequest("index", "id_" + i);
break;
default:
throw new AssertionError("unknown type");
}
items[i] = new BulkItemRequest(i, request);
}
return items;
}
}

0 comments on commit 399b3f9

Please sign in to comment.