From 6ceaf615e220f821ddc3e36bb41b82c80e595fe7 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Fri, 13 May 2022 11:50:29 +0530 Subject: [PATCH] addressing comments Signed-off-by: Bharathwaj G --- .../opensearch/client/RequestConverters.java | 2 +- .../action/search/CreatePitController.java | 23 +++++----- .../opensearch/action/search/SearchUtils.java | 42 +++++++++++++++++++ .../org/opensearch/search/SearchService.java | 18 +++----- .../search/CreatePitControllerTests.java | 3 +- 5 files changed, 60 insertions(+), 28 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/search/SearchUtils.java diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java index 47bb6630c08db..277759c921fbf 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java @@ -439,7 +439,7 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) { params.withIndicesOptions(searchRequest.indicesOptions()); } params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT)); - if (searchRequest.pointInTimeBuilder() == null) { + if (searchRequest.pointInTimeBuilder() != null) { params.putParam("ccs_minimize_roundtrips", "false"); } else { params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index 5b20bbf2696db..4ed2e5b3de0a3 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -103,7 +103,7 @@ public void executeCreatePit(StepListener createPitListener, Act * Phase 2 of create PIT where we update pit id in pit contexts */ createPitListener.whenComplete( - searchResponse -> { executeUpdatePitId(request, searchResponse, updatePitIdListener); }, + searchResponse -> { executeUpdatePitId(request, searchRequest, searchResponse, updatePitIdListener); }, updatePitIdListener::onFailure ); } @@ -148,6 +148,7 @@ public void executeOnShardTarget( */ void executeUpdatePitId( CreatePitRequest request, + SearchRequest searchRequest, SearchResponse searchResponse, ActionListener updatePitIdListener ) { @@ -161,7 +162,13 @@ void executeUpdatePitId( * store the create time ( same create time for all PIT contexts across shards ) to be used * for list PIT api */ - final long creationTime = System.currentTimeMillis(); + final long relativeStartNanos = System.nanoTime(); + final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider( + searchRequest.getOrCreateAbsoluteStartMillis(), + relativeStartNanos, + System::nanoTime + ); + final long creationTime = timeProvider.getAbsoluteStartMillis(); CreatePitResponse createPITResponse = new CreatePitResponse( searchResponse.pointInTimeId(), creationTime, @@ -212,27 +219,17 @@ void executeUpdatePitId( } } }, updatePitIdListener::onFailure); - } private StepListener> getConnectionLookupListener(SearchContextId contextId) { ClusterState state = clusterService.state(); - final Set clusters = contextId.shards() .values() .stream() .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) .map(SearchContextIdForNode::getClusterAlias) .collect(Collectors.toSet()); - - final StepListener> lookupListener = new StepListener<>(); - - if (clusters.isEmpty()) { - lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId)); - } else { - searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); - } - return lookupListener; + return SearchUtils.getConnectionLookupListener(searchTransportService, state, clusters); } private ActionListener getGroupedListener( diff --git a/server/src/main/java/org/opensearch/action/search/SearchUtils.java b/server/src/main/java/org/opensearch/action/search/SearchUtils.java new file mode 100644 index 0000000000000..0fdb1331d8ad6 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/SearchUtils.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.StepListener; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; + +import java.util.Set; +import java.util.function.BiFunction; + +/** + * Helper class for common search functions + */ +public class SearchUtils { + + public SearchUtils() {} + + /** + * Get connection lookup listener for list of clusters passed + */ + public static StepListener> getConnectionLookupListener( + SearchTransportService searchTransportService, + ClusterState state, + Set clusters + ) { + final StepListener> lookupListener = new StepListener<>(); + + if (clusters.isEmpty()) { + lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId)); + } else { + searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); + } + return lookupListener; + } +} diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 4af98aad74541..a5b2e998d4361 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -872,10 +872,7 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL Releasable decreasePitContexts = null; Engine.SearcherSupplier searcherSupplier = null; ReaderContext readerContext = null; - boolean success = false; try { - // use this when reader context is freed - decreasePitContexts = openPitContexts::decrementAndGet; if (openPitContexts.incrementAndGet() > maxOpenPitContext) { throw new OpenSearchRejectedExecutionException( "Trying to create too many Point In Time contexts. Must be less than or equal to: [" @@ -894,6 +891,9 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL searchOperationListener.onNewReaderContext(readerContext); searchOperationListener.onNewPitContext(finalReaderContext); + + // use this when reader context is freed + decreasePitContexts = openPitContexts::decrementAndGet; readerContext.addOnClose(decreasePitContexts); decreasePitContexts = null; @@ -905,15 +905,9 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL putReaderContext(readerContext); readerContext = null; listener.onResponse(finalReaderContext.id()); - success = true; } catch (Exception exc) { + Releasables.closeWhileHandlingException(searcherSupplier, readerContext, decreasePitContexts); listener.onFailure(exc); - } finally { - if (success) { - Releasables.close(readerContext, searcherSupplier, decreasePitContexts); - } else { - Releasables.closeWhileHandlingException(searcherSupplier, readerContext, decreasePitContexts); - } } }); } @@ -1043,14 +1037,14 @@ public void updatePitIdAndKeepAlive(UpdatePitContextRequest request, ActionListe Releasable updatePit = null; try { updatePit = readerContext.updatePitIdAndKeepAlive(request.getKeepAlive(), request.getPitId(), request.getCreationTime()); - updatePit.close(); listener.onResponse(new UpdatePitContextResponse(request.getPitId(), request.getCreationTime(), request.getKeepAlive())); } catch (Exception e) { freeReaderContext(readerContext.id()); + listener.onFailure(e); + } finally { if (updatePit != null) { updatePit.close(); } - listener.onFailure(e); } } diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index 5411d6c72686d..a1521cca03640 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -126,7 +126,7 @@ public void onFailure(Exception e) { clusterServiceMock = mock(ClusterService.class); ClusterState state = mock(ClusterState.class); - final Settings keepAliveSettings = Settings.builder().put(CreatePitController.PIT_CREATE_PHASE_KEEP_ALIVE.getKey(), 30000).build(); + final Settings keepAliveSettings = Settings.builder().put(CreatePitController.PIT_INIT_KEEP_ALIVE.getKey(), 30000).build(); when(clusterServiceMock.getSettings()).thenReturn(keepAliveSettings); when(state.getMetadata()).thenReturn(Metadata.EMPTY_METADATA); @@ -177,7 +177,6 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - CreatePitController controller = new CreatePitController( request, searchTransportService,