From 08db823fee598d5672017ee0537a60a841bdf5a0 Mon Sep 17 00:00:00 2001 From: Martin Gaievski Date: Tue, 23 Jul 2024 00:54:10 +0000 Subject: [PATCH 1/3] Adding operation for enabling concurrent segment search Signed-off-by: Martin Gaievski --- osbenchmark/worker_coordinator/runner.py | 14 ++++++++++++++ osbenchmark/workload/workload.py | 3 +++ tests/worker_coordinator/runner_test.py | 20 ++++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index d280ddffd..3d9e1306b 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -2696,3 +2696,17 @@ async def __call__(self, opensearch, params): def __repr__(self, *args, **kwargs): return "deploy-ml-model" + +class EnableConcurrentSegmentSearch(Runner): + @time_func + async def __call__(self, opensearch, params): + enable_setting = params.get("enable", "false") + body = { + "transient": { + "search.concurrent_segment_search.enabled": enable_setting + } + } + await opensearch.cluster.put_settings(body=body) + + def __repr__(self, *args, **kwargs): + return "enable-concurrent-segment-search" \ No newline at end of file diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index 95545fb59..c58b1e817 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -635,6 +635,7 @@ class OperationType(Enum): DeleteMlModel = 1041 RegisterMlModel = 1042 DeployMlModel = 1043 + EnableConcurrentSegmentSearch = 1044 @property def admin_op(self): @@ -752,6 +753,8 @@ def from_hyphenated_string(cls, v): return OperationType.TrainKnnModel elif v == "delete-knn-model": return OperationType.DeleteKnnModel + elif v == "enable-concurrent-segment-search": + return OperationType.EnableConcurrentSegmentSearch else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 4f4b303b2..d0b83d461 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -6785,3 +6785,23 @@ async def test_param_id_mandatory(self, opensearch, on_client_request_start, on_ await r(opensearch, params) self.assertEqual(0, opensearch.transport.perform_request.call_count) + +class EnableConcurrentSegmentSearchTests(TestCase): + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_enable_concurrent_segment_search(self, opensearch, on_client_request_start, on_client_request_end): + opensearch.cluster.put_settings.return_value = as_future() + params = { + "enable": "true" + } + + r = runner.EnableConcurrentSegmentSearch() + await r(opensearch, params) + + opensearch.cluster.put_settings.assert_called_once_with(body={ + "transient": { + "search.concurrent_segment_search.enabled": "true" + } + }) \ No newline at end of file From d25b1458ffa64f742be1cfc197d5ed0a9d8ca3c1 Mon Sep 17 00:00:00 2001 From: Martin Gaievski Date: Tue, 23 Jul 2024 16:26:10 +0000 Subject: [PATCH 2/3] Fixed lint errors, made setting persistent Signed-off-by: Martin Gaievski --- osbenchmark/worker_coordinator/runner.py | 7 ++++--- tests/worker_coordinator/runner_test.py | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index 3d9e1306b..df3711ad8 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -107,6 +107,7 @@ def register_default_runners(): register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True) register_runner(workload.OperationType.TrainKnnModel, Retry(TrainKnnModel()), async_runner=True) register_runner(workload.OperationType.DeleteKnnModel, Retry(DeleteKnnModel()), async_runner=True) + register_runner(workload.OperationType.EnableConcurrentSegmentSearch, Retry(EnableConcurrentSegmentSearch()), async_runner=True) def runner_for(operation_type): try: @@ -2702,11 +2703,11 @@ class EnableConcurrentSegmentSearch(Runner): async def __call__(self, opensearch, params): enable_setting = params.get("enable", "false") body = { - "transient": { + "persistent": { "search.concurrent_segment_search.enabled": enable_setting - } } + } await opensearch.cluster.put_settings(body=body) def __repr__(self, *args, **kwargs): - return "enable-concurrent-segment-search" \ No newline at end of file + return "enable-concurrent-segment-search" diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index d0b83d461..10f5da722 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -6801,7 +6801,7 @@ async def test_enable_concurrent_segment_search(self, opensearch, on_client_requ await r(opensearch, params) opensearch.cluster.put_settings.assert_called_once_with(body={ - "transient": { + "persistent": { "search.concurrent_segment_search.enabled": "true" - } - }) \ No newline at end of file + } + }) From 41b0429e9aa74ea662b7bfd05f7f6457323f3c9f Mon Sep 17 00:00:00 2001 From: Martin Gaievski Date: Tue, 23 Jul 2024 22:35:31 +0000 Subject: [PATCH 3/3] Generalized to update settings and added max slice param Signed-off-by: Martin Gaievski --- osbenchmark/worker_coordinator/runner.py | 10 ++++-- osbenchmark/workload/workload.py | 6 ++-- tests/worker_coordinator/runner_test.py | 45 ++++++++++++++++++++++-- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/osbenchmark/worker_coordinator/runner.py b/osbenchmark/worker_coordinator/runner.py index df3711ad8..90330be54 100644 --- a/osbenchmark/worker_coordinator/runner.py +++ b/osbenchmark/worker_coordinator/runner.py @@ -107,7 +107,8 @@ def register_default_runners(): register_runner(workload.OperationType.DeployMlModel, Retry(DeployMlModel()), async_runner=True) register_runner(workload.OperationType.TrainKnnModel, Retry(TrainKnnModel()), async_runner=True) register_runner(workload.OperationType.DeleteKnnModel, Retry(DeleteKnnModel()), async_runner=True) - register_runner(workload.OperationType.EnableConcurrentSegmentSearch, Retry(EnableConcurrentSegmentSearch()), async_runner=True) + register_runner(workload.OperationType.UpdateConcurrentSegmentSearchSettings, + Retry(UpdateConcurrentSegmentSearchSettings()), async_runner=True) def runner_for(operation_type): try: @@ -2698,16 +2699,19 @@ async def __call__(self, opensearch, params): def __repr__(self, *args, **kwargs): return "deploy-ml-model" -class EnableConcurrentSegmentSearch(Runner): +class UpdateConcurrentSegmentSearchSettings(Runner): @time_func async def __call__(self, opensearch, params): enable_setting = params.get("enable", "false") + max_slice_count = params.get("max_slice_count", None) body = { "persistent": { "search.concurrent_segment_search.enabled": enable_setting } } + if max_slice_count is not None: + body["persistent"]["search.concurrent_segment_search.max_slice_count"] = max_slice_count await opensearch.cluster.put_settings(body=body) def __repr__(self, *args, **kwargs): - return "enable-concurrent-segment-search" + return "update-concurrent-segment-search-settings" diff --git a/osbenchmark/workload/workload.py b/osbenchmark/workload/workload.py index c58b1e817..848cb5b3b 100644 --- a/osbenchmark/workload/workload.py +++ b/osbenchmark/workload/workload.py @@ -635,7 +635,7 @@ class OperationType(Enum): DeleteMlModel = 1041 RegisterMlModel = 1042 DeployMlModel = 1043 - EnableConcurrentSegmentSearch = 1044 + UpdateConcurrentSegmentSearchSettings = 1044 @property def admin_op(self): @@ -753,8 +753,8 @@ def from_hyphenated_string(cls, v): return OperationType.TrainKnnModel elif v == "delete-knn-model": return OperationType.DeleteKnnModel - elif v == "enable-concurrent-segment-search": - return OperationType.EnableConcurrentSegmentSearch + elif v == "update-concurrent-segment-search-settings": + return OperationType.UpdateConcurrentSegmentSearchSettings else: raise KeyError(f"No enum value for [{v}]") diff --git a/tests/worker_coordinator/runner_test.py b/tests/worker_coordinator/runner_test.py index 10f5da722..a48bbb82c 100644 --- a/tests/worker_coordinator/runner_test.py +++ b/tests/worker_coordinator/runner_test.py @@ -6786,7 +6786,7 @@ async def test_param_id_mandatory(self, opensearch, on_client_request_start, on_ self.assertEqual(0, opensearch.transport.perform_request.call_count) -class EnableConcurrentSegmentSearchTests(TestCase): +class UpdateConcurrentSegmentSearchSettingsTests(TestCase): @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') @mock.patch("opensearchpy.OpenSearch") @@ -6797,7 +6797,7 @@ async def test_enable_concurrent_segment_search(self, opensearch, on_client_requ "enable": "true" } - r = runner.EnableConcurrentSegmentSearch() + r = runner.UpdateConcurrentSegmentSearchSettings() await r(opensearch, params) opensearch.cluster.put_settings.assert_called_once_with(body={ @@ -6805,3 +6805,44 @@ async def test_enable_concurrent_segment_search(self, opensearch, on_client_requ "search.concurrent_segment_search.enabled": "true" } }) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_max_slice_count(self, opensearch, on_client_request_start, on_client_request_end): + opensearch.cluster.put_settings.return_value = as_future() + params = { + "max_slice_count": 2 + } + + r = runner.UpdateConcurrentSegmentSearchSettings() + await r(opensearch, params) + + opensearch.cluster.put_settings.assert_called_once_with(body={ + "persistent": { + "search.concurrent_segment_search.enabled": "false", + "search.concurrent_segment_search.max_slice_count": 2 + } + }) + + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_end') + @mock.patch('osbenchmark.client.RequestContextHolder.on_client_request_start') + @mock.patch("opensearchpy.OpenSearch") + @run_async + async def test_concurrent_segment_search_settings(self, opensearch, on_client_request_start, on_client_request_end): + opensearch.cluster.put_settings.return_value = as_future() + params = { + "enable": "true", + "max_slice_count": 2 + } + + r = runner.UpdateConcurrentSegmentSearchSettings() + await r(opensearch, params) + + opensearch.cluster.put_settings.assert_called_once_with(body={ + "persistent": { + "search.concurrent_segment_search.enabled": "true", + "search.concurrent_segment_search.max_slice_count": 2 + } + })