diff --git a/server/src/main/java/org/opensearch/action/support/replication/ReplicationRequest.java b/server/src/main/java/org/opensearch/action/support/replication/ReplicationRequest.java index 92e50f7a476f3..e7451e4f3e328 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/ReplicationRequest.java +++ b/server/src/main/java/org/opensearch/action/support/replication/ReplicationRequest.java @@ -201,7 +201,8 @@ protected Request routedBasedOnClusterVersion(long routedBasedOnClusterVersion) return (Request) this; } - long routedBasedOnClusterVersion() { + // visible for testing + public long routedBasedOnClusterVersion() { return routedBasedOnClusterVersion; } diff --git a/server/src/test/java/org/opensearch/action/bulk/BulkShardRequestTests.java b/server/src/test/java/org/opensearch/action/bulk/BulkShardRequestTests.java index 05c946e73f94d..38dc2bf414ff2 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BulkShardRequestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BulkShardRequestTests.java @@ -32,10 +32,17 @@ package org.opensearch.action.bulk; +import org.opensearch.action.DocWriteRequest; +import org.opensearch.action.delete.DeleteRequest; +import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.WriteRequest.RefreshPolicy; +import org.opensearch.action.update.UpdateRequest; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.index.shard.ShardId; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; + import static org.apache.lucene.tests.util.TestUtil.randomSimpleString; public class BulkShardRequestTests extends OpenSearchTestCase { @@ -55,4 +62,73 @@ public void testToString() { assertEquals("BulkShardRequest [" + shardId + "] containing [" + count + "] requests blocking until refresh", r.toString()); assertEquals("requests[" + count + "], index[" + index + "][0], refresh[WAIT_UNTIL]", r.getDescription()); } + + public void testBulkShardRequestSerialization() throws IOException { + final String index = randomSimpleString(random(), 10); + final int count = between(2, 100); + final ShardId shardId = new ShardId(index, "ignored", 0); + final RefreshPolicy refreshPolicy = randomFrom(RefreshPolicy.values()); + final BulkShardRequest expected = new BulkShardRequest(shardId, refreshPolicy, generateBulkItemRequests(count)); + + final BytesStreamOutput out = new BytesStreamOutput(); + + expected.writeTo(out); + + final BulkShardRequest actual = new BulkShardRequest(out.bytes().streamInput()); + + assertEquals(expected.getParentTask().getId(), actual.getParentTask().getId()); + assertEquals(expected.getParentTask().getNodeId(), actual.getParentTask().getNodeId()); + + assertEquals(expected.shardId(), actual.shardId()); + assertEquals(expected.waitForActiveShards(), actual.waitForActiveShards()); + assertEquals(expected.timeout(), actual.timeout()); + assertEquals(expected.index(), actual.index()); + assertEquals(expected.routedBasedOnClusterVersion(), actual.routedBasedOnClusterVersion()); + + assertEquals(expected.getRefreshPolicy(), actual.getRefreshPolicy()); + + assertEquals(expected.items().length, actual.items().length); + for (int i = 0; i < count; ++i) { + final BulkItemRequest expectedItem = expected.items()[i]; + final BulkItemRequest actualItem = actual.items()[i]; + if (null == expectedItem) { + assertNull(actualItem); + continue; + } + assertEquals(expectedItem.id(), actualItem.id()); + assertEquals(expectedItem.request().id(), actualItem.request().id()); + assertEquals(expectedItem.request().index(), actualItem.request().index()); + assertEquals(expectedItem.request().opType(), actualItem.request().opType()); + } + } + + private BulkItemRequest[] generateBulkItemRequests(final int count) { + final BulkItemRequest[] items = new BulkItemRequest[count]; + final int nullIdx = randomIntBetween(0, count - 1); + for (int i = 0; i < count; i++) { + if (i == nullIdx) { + items[i] = null; + continue; + } + final DocWriteRequest request; + switch (randomFrom(DocWriteRequest.OpType.values())) { + case INDEX: + request = new IndexRequest("index").id("id_" + i); + break; + case CREATE: + request = new IndexRequest("index").id("id_" + i).create(true); + break; + case UPDATE: + request = new UpdateRequest("index", "id_" + i); + break; + case DELETE: + request = new DeleteRequest("index", "id_" + i); + break; + default: + throw new AssertionError("unknown type"); + } + items[i] = new BulkItemRequest(i, request); + } + return items; + } }