Skip to content

Commit

Permalink
Merge branch 'createpit' of github.com:bharath-techie/OpenSearch into…
Browse files Browse the repository at this point in the history
… deletepit
  • Loading branch information
bharath-techie committed May 11, 2022
2 parents 2acb465 + 648402e commit 480bdc2
Show file tree
Hide file tree
Showing 25 changed files with 589 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -92,6 +93,7 @@
import org.opensearch.index.reindex.ReindexRequest;
import org.opensearch.index.reindex.UpdateByQueryRequest;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.script.mustache.MultiSearchTemplateRequest;
import org.opensearch.script.mustache.SearchTemplateRequest;
Expand Down Expand Up @@ -433,9 +435,13 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
params.withRouting(searchRequest.routing());
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
if (searchRequest.pointInTimeBuilder() == null) {
params.withIndicesOptions(searchRequest.indicesOptions());
}
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
if (searchRequest.pointInTimeBuilder() == null) {
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}
if (searchRequest.getPreFilterShardSize() != null) {
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
}
Expand All @@ -458,6 +464,18 @@ static Request searchScroll(SearchScrollRequest searchScrollRequest) throws IOEx
return request;
}

static Request createPit(CreatePitRequest createPitRequest) throws IOException {
Params params = new Params();

params.putParam(RestCreatePitAction.ALLOW_PARTIAL_PIT_CREATION, "true");
params.putParam(RestCreatePitAction.KEEP_ALIVE, "1d");
params.withIndicesOptions(createPitRequest.indicesOptions());
Request request = new Request(HttpPost.METHOD_NAME, endpoint(createPitRequest.indices(), "_search/point_in_time"));
request.addParameters(params.asMap());
request.setEntity(createEntity(createPitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/scroll");
request.setEntity(createEntity(clearScrollRequest, REQUEST_BODY_CONTENT_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1256,6 +1258,46 @@ public final Cancellable scrollAsync(
);
}

/**
* Create PIT context using create PIT API
*
* @param createPitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final CreatePitResponse createPit(CreatePitRequest createPitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
createPitRequest,
RequestConverters::createPit,
options,
CreatePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Create PIT context using create PIT API
*
* @param createPitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable createPitAsync(
CreatePitRequest createPitRequest,
RequestOptions options,
ActionListener<CreatePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
createPitRequest,
RequestConverters::createPit,
options,
CreatePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.junit.Before;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* Tests point in time API with rest high level client
*/
public class PitIT extends OpenSearchRestHighLevelClientTestCase {

@Before
public void indexDocuments() throws IOException {
{
Request doc1 = new Request(HttpPut.METHOD_NAME, "/index/_doc/1");
doc1.setJsonEntity("{\"type\":\"type1\", \"id\":1, \"num\":10, \"num2\":50}");
client().performRequest(doc1);
Request doc2 = new Request(HttpPut.METHOD_NAME, "/index/_doc/2");
doc2.setJsonEntity("{\"type\":\"type1\", \"id\":2, \"num\":20, \"num2\":40}");
client().performRequest(doc2);
Request doc3 = new Request(HttpPut.METHOD_NAME, "/index/_doc/3");
doc3.setJsonEntity("{\"type\":\"type1\", \"id\":3, \"num\":50, \"num2\":35}");
client().performRequest(doc3);
Request doc4 = new Request(HttpPut.METHOD_NAME, "/index/_doc/4");
doc4.setJsonEntity("{\"type\":\"type2\", \"id\":4, \"num\":100, \"num2\":10}");
client().performRequest(doc4);
Request doc5 = new Request(HttpPut.METHOD_NAME, "/index/_doc/5");
doc5.setJsonEntity("{\"type\":\"type2\", \"id\":5, \"num\":100, \"num2\":10}");
client().performRequest(doc5);
}

client().performRequest(new Request(HttpPost.METHOD_NAME, "/_refresh"));
}

public void testCreatePit() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertEquals(1, pitResponse.getTotalShards());
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(0, pitResponse.getFailedShards());
assertEquals(0, pitResponse.getSkippedShards());
}
/**
* Todo: add deletion logic and test cluster settings
*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -131,6 +132,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -1303,6 +1305,27 @@ public void testClearScroll() throws IOException {
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testCreatePit() throws IOException {
String[] indices = randomIndicesNames(0, 5);
Map<String, String> expectedParams = new HashMap<>();
expectedParams.put("keep_alive", "1d");
expectedParams.put("allow_partial_pit_creation", "true");
CreatePitRequest createPitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, indices);
setRandomIndicesOptions(createPitRequest::indicesOptions, createPitRequest::indicesOptions, expectedParams);
Request request = RequestConverters.createPit(createPitRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices);
if (Strings.hasLength(index)) {
endpoint.add(index);
}
endpoint.add("_search/point_in_time");
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
assertToXContentBody(createPitRequest, request.getEntity());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testSearchTemplate() throws Exception {
// Create a random request.
String[] indices = randomIndicesNames(0, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -89,6 +91,7 @@
import org.opensearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.opensearch.search.aggregations.support.MultiValuesSourceFieldConfig;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.fetch.subphase.FetchSourceContext;
import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder;
Expand All @@ -105,6 +108,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertToXContentEquivalent;
Expand Down Expand Up @@ -762,6 +766,37 @@ public void testSearchScroll() throws Exception {
}
}

public void testSearchWithPit() throws Exception {
for (int i = 0; i < 100; i++) {
XContentBuilder builder = jsonBuilder().startObject().field("field", i).endObject();
Request doc = new Request(HttpPut.METHOD_NAME, "/test/_doc/" + Integer.toString(i));
doc.setJsonEntity(Strings.toString(builder));
client().performRequest(doc);
}
client().performRequest(new Request(HttpPost.METHOD_NAME, "/test/_refresh"));

CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "test");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(35)
.sort("field", SortOrder.ASC)
.pointInTimeBuilder(new PointInTimeBuilder(pitResponse.getId()));
SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder);
SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync);

try {
long counter = 0;
assertSearchHeader(searchResponse);
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(100L));
assertThat(searchResponse.getHits().getHits().length, equalTo(35));
for (SearchHit hit : searchResponse.getHits()) {
assertThat(((Number) hit.getSortValues()[0]).longValue(), equalTo(counter++));
}
} finally {
// TODO : Delete PIT
}
}

public void testMultiSearch() throws Exception {
MultiSearchRequest multiSearchRequest = new MultiSearchRequest();
SearchRequest searchRequest1 = new SearchRequest("index1");
Expand Down
43 changes: 43 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
{
"create_pit":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Creates point in time context."
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/{index}/_search/point_in_time",
"methods":[
"POST"
],
"parts":{
"index":{
"type":"list",
"description":"A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices"
}
}
}
]
},
"params":{
"allow_partial_pit_creation":{
"type":"boolean",
"description":"Allow if point in time can be created with partial failures"
},
"keep_alive":{
"type":"string",
"description":"Specify the keep alive for point in time"
},
"preference":{
"type":"string",
"description":"Specify the node or shard the operation should be performed on (default: random)"
},
"routing":{
"type":"list",
"description":"A comma-separated list of specific routing values"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,12 @@ private void setupIndex(int numDocs, int numberOfShards) throws IOException, Exe
client().admin()
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put("number_of_shards", numberOfShards).put("index.max_slices_per_scroll", 10000))
.setSettings(
Settings.builder()
.put("number_of_shards", numberOfShards)
.put("index.max_slices_per_scroll", 10000)
.put("index.max_slices_per_pit", 10000)
)
.setMapping(mapping)
);
ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
public class CreatePitAction extends ActionType<CreatePitResponse> {
public static final CreatePitAction INSTANCE = new CreatePitAction();
public static final String NAME = "indices:data/write/pit";
public static final String NAME = "indices:data/read/pit";

private CreatePitAction() {
super(NAME, CreatePitResponse::new);
Expand Down
Loading

0 comments on commit 480bdc2

Please sign in to comment.