From 64a3b52b8b5e1acf2bb77cde857e1a716b7bd46d Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 16 Nov 2023 13:11:55 +0530 Subject: [PATCH 1/3] threadcontext poc Signed-off-by: Bharathwaj G --- .../main/java/org/opensearch/node/Node.java | 31 ++-- .../node/NodeResourceUsageStats.java | 14 +- .../node/ResourceUsageCollectorService.java | 8 ++ .../transport/TransportService.java | 134 ++++++++++++++++-- .../cluster/NodeConnectionsServiceTests.java | 5 +- .../java/org/opensearch/node/MockNode.java | 6 +- 6 files changed, 165 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e80b768074fc7..411b57e48a370 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -894,6 +894,17 @@ protected Node( final RestController restController = actionModule.getRestController(); + final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( + threadPool, + settings, + clusterService.getClusterSettings() + ); + final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService( + nodeResourceUsageTracker, + clusterService, + threadPool + ); + final AdmissionControlService admissionControlService = new AdmissionControlService( settings, clusterService.getClusterSettings(), @@ -942,6 +953,7 @@ protected Node( pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), Stream.of(Task.X_OPAQUE_ID) ).collect(Collectors.toSet()); + final TransportService transportService = newTransportService( settings, transport, @@ -950,7 +962,8 @@ protected Node( localNodeFactory, settingsModule.getClusterSettings(), taskHeaders, - tracer + tracer, + resourceUsageCollectorService ); TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings()); transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer); @@ -1101,16 +1114,6 @@ protected Node( transportService.getTaskManager(), taskCancellationMonitoringSettings ); - final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( - threadPool, - settings, - clusterService.getClusterSettings() - ); - final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService( - nodeResourceUsageTracker, - clusterService, - threadPool - ); this.nodeService = new NodeService( settings, threadPool, @@ -1320,9 +1323,11 @@ protected TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, Set taskHeaders, - Tracer tracer + Tracer tracer, + ResourceUsageCollectorService resourceUsageCollectorService ) { - return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer); + return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, + taskHeaders, tracer, resourceUsageCollectorService); } protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) { diff --git a/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java b/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java index 6ef66d4ac1914..32133df470853 100644 --- a/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java +++ b/server/src/main/java/org/opensearch/node/NodeResourceUsageStats.java @@ -41,6 +41,7 @@ public NodeResourceUsageStats(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { + out.writeString(this.nodeId); out.writeLong(this.timestamp); out.writeDouble(this.cpuUtilizationPercent); @@ -49,12 +50,13 @@ public void writeTo(StreamOutput out) throws IOException { @Override public String toString() { - StringBuilder sb = new StringBuilder("NodeResourceUsageStats["); - sb.append(nodeId).append("]("); - sb.append("Timestamp: ").append(timestamp); - sb.append(", CPU utilization percent: ").append(String.format(Locale.ROOT, "%.1f", cpuUtilizationPercent)); - sb.append(", Memory utilization percent: ").append(String.format(Locale.ROOT, "%.1f", memoryUtilizationPercent)); - sb.append(")"); + StringBuilder sb = new StringBuilder(); + sb.append(nodeId).append(":"); + sb.append(timestamp); + sb.append(","); + sb.append(memoryUtilizationPercent); + sb.append(","); + sb.append(cpuUtilizationPercent); return sb.toString(); } diff --git a/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java b/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java index f1c763e09f147..62a339d2513f4 100644 --- a/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java +++ b/server/src/main/java/org/opensearch/node/ResourceUsageCollectorService.java @@ -113,6 +113,14 @@ public Optional getNodeStatistics(final String nodeId) { .map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats)); } + public Optional getLocalNodeStatistics() { + if(clusterService.state() != null) { + return Optional.ofNullable(nodeIdToResourceUsageStats.get(clusterService.state().nodes().getLocalNodeId())) + .map(resourceUsageStats -> new NodeResourceUsageStats(resourceUsageStats)); + } + return Optional.empty(); + } + /** * Returns collected resource usage statistics of all nodes */ diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index de88c3619abe8..84f7eedd49838 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -64,6 +64,7 @@ import org.opensearch.core.service.ReportingService; import org.opensearch.core.transport.TransportResponse; import org.opensearch.node.NodeClosedException; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import org.opensearch.telemetry.tracing.Span; @@ -77,14 +78,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.net.UnknownHostException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; @@ -114,6 +108,8 @@ public class TransportService extends AbstractLifecycleComponent protected final ThreadPool threadPool; protected final ClusterName clusterName; protected final TaskManager taskManager; + + private Set admissionControlTransportActions = new HashSet<>(); private final TransportInterceptor.AsyncSender asyncSender; private final Function localNodeFactory; private final boolean remoteClusterClient; @@ -143,6 +139,8 @@ protected boolean removeEldestEntry(Map.Entry eldest) { private final RemoteClusterService remoteClusterService; private final Tracer tracer; + private final ResourceUsageCollectorService resourceUsageCollectorService; + /** if set will call requests sent to this id to shortcut and executed locally */ volatile DiscoveryNode localNode = null; private final Transport.Connection localNodeConnection = new Transport.Connection() { @@ -196,7 +194,8 @@ public TransportService( Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders, - Tracer tracer + Tracer tracer, + ResourceUsageCollectorService resourceUsageCollectorService ) { this( settings, @@ -207,7 +206,8 @@ public TransportService( clusterSettings, taskHeaders, new ClusterConnectionManager(settings, transport), - tracer + tracer, + resourceUsageCollectorService ); } @@ -220,7 +220,8 @@ public TransportService( @Nullable ClusterSettings clusterSettings, Set taskHeaders, ConnectionManager connectionManager, - Tracer tracer + Tracer tracer, + ResourceUsageCollectorService resourceUsageCollectorService ) { this.transport = transport; transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings)); @@ -236,6 +237,7 @@ public TransportService( this.asyncSender = interceptor.interceptSender(this::sendRequestInternal); this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); this.tracer = tracer; + this.resourceUsageCollectorService = resourceUsageCollectorService; remoteClusterService = new RemoteClusterService(settings, this); responseHandlers = transport.getResponseHandlers(); if (clusterSettings != null) { @@ -848,6 +850,7 @@ public final void sendRequest( return; } sendRequest(connection, action, request, options, handler); + // send request with default values } /** @@ -882,12 +885,16 @@ public final void sendRequest( delegate = new TransportResponseHandler() { @Override public void handleResponse(T response) { + // Example - this can come in the form of + addResourceUsageStatsToThreadContext(action); unregisterChildNode.close(); traceableTransportResponseHandler.handleResponse(response); } @Override public void handleException(TransportException exp) { + // Example + addResourceUsageStatsToThreadContext(action); unregisterChildNode.close(); traceableTransportResponseHandler.handleException(exp); } @@ -924,6 +931,65 @@ public String toString() { } } + private void addResourceUsageStatsToThreadContext(String action) { + //if(action.startsWith("indices:data/read/search")) { + if (resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { + threadPool.getThreadContext().addResponseHeader("PERF_STATS", + resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); + } + //} + } + + public final void sendRequest( + final Transport.Connection connection, + final String action, + final TransportRequest request, + final TransportRequestOptions options, + final TransportResponseHandler handler, + final boolean shouldAddResourceUsageStats + ) { + final TransportResponseHandler delegate; + if(shouldAddResourceUsageStats) { + delegate = new TransportResponseHandler() { + @Override + public void handleResponse(T response) { + if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { + threadPool.getThreadContext().addResponseHeader("PERF_STATS", + resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); + } + handler.handleResponse(response); + } + + @Override + public void handleException(TransportException exp) { + if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { + threadPool.getThreadContext().addResponseHeader("PERF_STATS", + resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); + } + handler.handleException(exp); + } + + @Override + public String executor() { + return handler.executor(); + } + + @Override + public T read(StreamInput in) throws IOException { + return handler.read(in); + } + + @Override + public String toString() { + return getClass().getName() + "/[" + action + "]:" + handler.toString(); + } + }; + } else { + delegate = handler; + } + sendRequest(connection, action, request, options, delegate); + } + /** * Returns either a real transport connection or a local node connection if we are using the local node optimization. * @throws NodeNotConnectedException if the given node is not connected @@ -1253,9 +1319,55 @@ public void onRequestReceived(long requestId, String action) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); } + addStatsToResourceUsageCollectorService(); messageListener.onRequestReceived(requestId, action); } + private void addStatsToResourceUsageCollectorService() { + try { + Map> responseHeaders = threadPool.getThreadContext().getResponseHeaders(); + if (responseHeaders.size() > 0) { + List perfStats = responseHeaders.get("PERF_STATS"); + // nodeid:111113131313,11.0,11.0 + // NodeResourceUsageStats[aaxnzZb7R3KdRqjqXfv8SQ](Timestamp: 1699253278365, CPU utilization percent: 3.1, Memory utilization percent: 25.0) + + StringBuilder sb = new StringBuilder(); + String nodeId = perfStats.get(0).substring(0, perfStats.get(0).indexOf(':') + 1); + if (resourceUsageCollectorService.getNodeStatistics(nodeId).isPresent()) { + long timestamp = resourceUsageCollectorService.getNodeStatistics(nodeId).get().getTimestamp(); + if (System.currentTimeMillis() - timestamp < 1000) { + logger.warn("Node resource usage stats is updated recently - so skipping"); + } else { + String[] parse = perfStats.get(0).split(":"); + String[] parse1 = parse[1].split(","); + String datatimestamp = parse1[0]; + String cpu = parse1[1]; + String memory = parse1[2]; + resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory)); + //logger.warn("Updates stats"); + } + } else { + String[] parse = perfStats.get(0).split(":"); + String[] parse1 = parse[1].split(","); + String datatimestamp = parse1[0]; + String cpu = parse1[1]; + String memory = parse1[2]; + resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory)); + logger.warn("added stats"); + } + // String[] parse = responseHeaders.get("PERF_STATS").get(0).split(":")[1].split("NodeResourceUsageStats\\[")[1].split("]"); + // String nodeId = parse[0]; + // String[] parse1 = parse[1].split("\\(")[1].split(": "); + // String timestamp = parse1[1].split(",")[0]; + // String cpu = parse1[2].split(",")[0]; + // String memory = parse1[3].split("\\)")[0]; + + } + } catch(Exception e){ + logger.warn("Adding stats failed : ", e); + } + } + /** called by the {@link Transport} implementation once a request has been sent */ @Override public void onRequestSent( diff --git a/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java b/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java index 4cf82f1dabab3..76f9603ac8e4e 100644 --- a/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/NodeConnectionsServiceTests.java @@ -50,6 +50,7 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.telemetry.tracing.noop.NoopTracer; import org.opensearch.test.MockLogAppender; import org.opensearch.test.OpenSearchTestCase; @@ -80,6 +81,7 @@ import java.util.function.Predicate; import static java.util.Collections.emptySet; +import static org.mockito.Mockito.mock; import static org.opensearch.cluster.NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING; import static org.opensearch.common.settings.Settings.builder; import static org.opensearch.common.unit.TimeValue.timeValueMillis; @@ -554,7 +556,8 @@ private TestTransportService(Transport transport, ThreadPool threadPool) { boundAddress -> DiscoveryNode.createLocal(Settings.EMPTY, buildNewFakeTransportAddress(), UUIDs.randomBase64UUID()), null, emptySet(), - NoopTracer.INSTANCE + NoopTracer.INSTANCE, + mock(ResourceUsageCollectorService.class) ); } diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index e6c7e21d5b3ea..a567f7a2e8639 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -201,7 +201,8 @@ protected TransportService newTransportService( Function localNodeFactory, ClusterSettings clusterSettings, Set taskHeaders, - Tracer tracer + Tracer tracer, + ResourceUsageCollectorService resourceUsageCollectorService ) { // we use the MockTransportService.TestPlugin class as a marker to create a network // module with this MockNetworkService. NetworkService is such an integral part of the systme @@ -216,7 +217,8 @@ protected TransportService newTransportService( localNodeFactory, clusterSettings, taskHeaders, - tracer + tracer, + resourceUsageCollectorService ); } else { return new MockTransportService( From e920b1f4ca86fcfe91c93c31d059b6e1537953ea Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Mon, 25 Dec 2023 18:42:24 +0530 Subject: [PATCH 2/3] Resource usage propagator changes Signed-off-by: Bharathwaj G --- .../common/util/concurrent/ThreadContext.java | 4 +- .../tasks/ResourceUsageStatsTCPropagator.java | 44 +++++ .../ResourceUsageStatsReference.java | 31 ++++ .../transport/TransportService.java | 174 +++++++++++------- .../util/concurrent/ThreadContextTests.java | 66 +++++++ 5 files changed, 253 insertions(+), 66 deletions(-) create mode 100644 server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java create mode 100644 server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java index fc2e4217bae79..0bffed2fd68ed 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/ThreadContext.java @@ -44,6 +44,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.http.HttpTransportSettings; +import org.opensearch.tasks.ResourceUsageStatsTCPropagator; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskThreadContextStatePropagator; @@ -126,7 +127,8 @@ public ThreadContext(Settings settings) { this.threadLocal = ThreadLocal.withInitial(() -> DEFAULT_CONTEXT); this.maxWarningHeaderCount = SETTING_HTTP_MAX_WARNING_HEADER_COUNT.get(settings); this.maxWarningHeaderSize = SETTING_HTTP_MAX_WARNING_HEADER_SIZE.get(settings).getBytes(); - this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator())); + this.propagators = new CopyOnWriteArrayList<>(List.of(new TaskThreadContextStatePropagator(), + new ResourceUsageStatsTCPropagator())); } public void registerThreadContextStatePropagator(final ThreadContextStatePropagator propagator) { diff --git a/server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java b/server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java new file mode 100644 index 0000000000000..bd19d75425d67 --- /dev/null +++ b/server/src/main/java/org/opensearch/tasks/ResourceUsageStatsTCPropagator.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.tasks; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.opensearch.common.util.concurrent.ThreadContextStatePropagator; + +import static org.opensearch.tasks.TaskResourceTrackingService.TASK_ID; + + +public class ResourceUsageStatsTCPropagator implements ThreadContextStatePropagator { + public static final String NODE_RESOURCE_STATS = "PERF_STATS"; + @Override + public Map transients(Map source) { + final Map transients = new HashMap<>(); + for(Map.Entry entry : source.entrySet()) { + if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) { + // key starts with prefix + transients.put(entry.getKey(), entry.getValue()); + } + } + return transients; + } + + @Override + public Map headers(Map source) { + final Map headers = new HashMap<>(); + for(Map.Entry entry : source.entrySet()) { + if(entry.getKey().startsWith(NODE_RESOURCE_STATS)) { + // key starts with prefix + headers.put(entry.getKey(), entry.getValue().toString()); + } + } + return headers; + } +} diff --git a/server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java b/server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java new file mode 100644 index 0000000000000..5db9b5053c7b7 --- /dev/null +++ b/server/src/main/java/org/opensearch/transport/ResourceUsageStatsReference.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +public class ResourceUsageStatsReference { + private String resourceUsageStats; + + public ResourceUsageStatsReference(String stats) { + this.resourceUsageStats = stats; + } + + public String getResourceUsageStats() { + return resourceUsageStats; + } + + public void setResourceUsageStats(String stats) { + this.resourceUsageStats = new String(stats); + } + + @Override + public String toString() { + return this.resourceUsageStats; + } + +} diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 84f7eedd49838..b59395441c56e 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -211,6 +211,53 @@ public TransportService( ); } + public TransportService( + Settings settings, + Transport transport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + Set taskHeaders, + Tracer tracer) { + this( + settings, + transport, + threadPool, + transportInterceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + new ClusterConnectionManager(settings, transport), + tracer, + null + ); + + } + public TransportService( + Settings settings, + Transport transport, + ThreadPool threadPool, + TransportInterceptor transportInterceptor, + Function localNodeFactory, + @Nullable ClusterSettings clusterSettings, + Set taskHeaders, + ConnectionManager connectionManager, + Tracer tracer) { + this( + settings, + transport, + threadPool, + transportInterceptor, + localNodeFactory, + clusterSettings, + taskHeaders, + new ClusterConnectionManager(settings, transport), + tracer, + null + ); + } + public TransportService( Settings settings, Transport transport, @@ -932,62 +979,23 @@ public String toString() { } private void addResourceUsageStatsToThreadContext(String action) { - //if(action.startsWith("indices:data/read/search")) { - if (resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { - threadPool.getThreadContext().addResponseHeader("PERF_STATS", - resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); - } - //} - } - - public final void sendRequest( - final Transport.Connection connection, - final String action, - final TransportRequest request, - final TransportRequestOptions options, - final TransportResponseHandler handler, - final boolean shouldAddResourceUsageStats - ) { - final TransportResponseHandler delegate; - if(shouldAddResourceUsageStats) { - delegate = new TransportResponseHandler() { - @Override - public void handleResponse(T response) { - if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { - threadPool.getThreadContext().addResponseHeader("PERF_STATS", - resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); - } - handler.handleResponse(response); - } - - @Override - public void handleException(TransportException exp) { - if(resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { - threadPool.getThreadContext().addResponseHeader("PERF_STATS", - resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); - } - handler.handleException(exp); - } - - @Override - public String executor() { - return handler.executor(); - } - - @Override - public T read(StreamInput in) throws IOException { - return handler.read(in); - } - - @Override - public String toString() { - return getClass().getName() + "/[" + action + "]:" + handler.toString(); + if (resourceUsageCollectorService.getLocalNodeStatistics().isPresent()) { + try { + ResourceUsageStatsReference statsReference = threadPool.getThreadContext() + .getTransient("PERF_STATS" + localNode.getId()); + if(statsReference != null) { + statsReference.setResourceUsageStats(resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); + } else { + threadPool.getThreadContext().putTransient("PERF_STATS" + localNode.getId(), + new ResourceUsageStatsReference(resourceUsageCollectorService.getLocalNodeStatistics().get().toString())); } - }; - } else { - delegate = handler; + } catch (Exception e) { + logger.info("===EXCEPTION=== {} ===action=== {}", e.getMessage(), action); + } + // Todo : remove this , added this for asserting response equaling request + threadPool.getThreadContext().addResponseHeader("PERF_STATS" + localNode.getId(), + resourceUsageCollectorService.getLocalNodeStatistics().get().toString()); } - sendRequest(connection, action, request, options, delegate); } /** @@ -1319,24 +1327,27 @@ public void onRequestReceived(long requestId, String action) { if (tracerLog.isTraceEnabled() && shouldTraceAction(action)) { tracerLog.trace("[{}][{}] received request", requestId, action); } - addStatsToResourceUsageCollectorService(); + addStatsToResourceUsageCollectorServiceFromRequestHeaders(); messageListener.onRequestReceived(requestId, action); } private void addStatsToResourceUsageCollectorService() { try { Map> responseHeaders = threadPool.getThreadContext().getResponseHeaders(); + if (responseHeaders.size() > 0) { List perfStats = responseHeaders.get("PERF_STATS"); + if(perfStats.size() == 0) return; // nodeid:111113131313,11.0,11.0 // NodeResourceUsageStats[aaxnzZb7R3KdRqjqXfv8SQ](Timestamp: 1699253278365, CPU utilization percent: 3.1, Memory utilization percent: 25.0) StringBuilder sb = new StringBuilder(); - String nodeId = perfStats.get(0).substring(0, perfStats.get(0).indexOf(':') + 1); + String nodeId = perfStats.get(0).substring(0, perfStats.get(0).indexOf(':')); + if(nodeId.length() == 0) if (resourceUsageCollectorService.getNodeStatistics(nodeId).isPresent()) { long timestamp = resourceUsageCollectorService.getNodeStatistics(nodeId).get().getTimestamp(); if (System.currentTimeMillis() - timestamp < 1000) { - logger.warn("Node resource usage stats is updated recently - so skipping"); + logger.info("Node resource usage stats is updated recently - so skipping"); } else { String[] parse = perfStats.get(0).split(":"); String[] parse1 = parse[1].split(","); @@ -1344,7 +1355,7 @@ private void addStatsToResourceUsageCollectorService() { String cpu = parse1[1]; String memory = parse1[2]; resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory)); - //logger.warn("Updates stats"); + logger.info("Updates stats"); } } else { String[] parse = perfStats.get(0).split(":"); @@ -1353,15 +1364,48 @@ private void addStatsToResourceUsageCollectorService() { String cpu = parse1[1]; String memory = parse1[2]; resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory)); - logger.warn("added stats"); + logger.info("added stats"); } - // String[] parse = responseHeaders.get("PERF_STATS").get(0).split(":")[1].split("NodeResourceUsageStats\\[")[1].split("]"); - // String nodeId = parse[0]; - // String[] parse1 = parse[1].split("\\(")[1].split(": "); - // String timestamp = parse1[1].split(",")[0]; - // String cpu = parse1[2].split(",")[0]; - // String memory = parse1[3].split("\\)")[0]; + } + } catch(Exception e){ + logger.warn("Adding stats failed : ", e); + } + } + + private void addStatsToResourceUsageCollectorServiceFromRequestHeaders() { + try { + for(Map.Entry entry : threadPool.getThreadContext().getHeaders().entrySet()) { + if(entry.getKey().contains("PERF_STATS")) { + String perfStats = entry.getValue(); + assert(threadPool.getThreadContext().getResponseHeaders().get(entry.getKey()).contains(entry.getValue())); + String nodeId = perfStats.substring(0, perfStats.indexOf(':')); + if (resourceUsageCollectorService.getNodeStatistics(nodeId).isPresent()) { + long timestamp = resourceUsageCollectorService.getNodeStatistics(nodeId).get().getTimestamp(); + if (System.currentTimeMillis() - timestamp < 1000) { + logger.info("Node resource usage stats is updated recently - so skipping"); + } else { + String[] parse = perfStats.split(":"); + String[] parse1 = parse[1].split(","); + String datatimestamp = parse1[0]; + String cpu = parse1[1]; + String memory = parse1[2]; + resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, + Long.valueOf(datatimestamp), Double.valueOf(cpu), Double.valueOf(memory)); + logger.info("Updated stats"); + } + } else { + String[] parse = perfStats.split(":"); + String[] parse1 = parse[1].split(","); + String datatimestamp = parse1[0]; + String cpu = parse1[1]; + String memory = parse1[2]; + resourceUsageCollectorService.collectNodeResourceUsageStats(nodeId, + Long.valueOf(datatimestamp), + Double.valueOf(cpu), Double.valueOf(memory)); + logger.info("added stats"); + } + } } } catch(Exception e){ logger.warn("Adding stats failed : ", e); diff --git a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java index a0531c76bf897..9196251ae5950 100644 --- a/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java +++ b/server/src/test/java/org/opensearch/common/util/concurrent/ThreadContextTests.java @@ -60,9 +60,18 @@ public void testStashContext() { assertEquals("bar", threadContext.getHeader("foo")); assertEquals(Integer.valueOf(1), threadContext.getTransient("ctx.foo")); assertEquals("1", threadContext.getHeader("default")); + threadContext.addResponseHeader("resp", "val"); + threadContext.putTransient("PERF_STATS_NODE1", "abc"); + threadContext.addResponseHeader("resp", "val1"); + threadContext.putHeader("PERF", "1"); + assertEquals("val1", threadContext.getResponseHeaders().get("resp").get(1)); + //threadContext.putTransient("PERF_STATS_NODE1", "cde"); try (ThreadContext.StoredContext ctx = threadContext.stashContext()) { assertNull(threadContext.getHeader("foo")); + assertNull(threadContext.getHeader("PERF")); assertNull(threadContext.getTransient("ctx.foo")); + assertNotNull(threadContext.getTransient("PERF_STATS_NODE1")); + assertNull(threadContext.getResponseHeaders().get("resp")); assertEquals("1", threadContext.getHeader("default")); } @@ -463,9 +472,13 @@ public void testSerializeInDifferentContextNoDefaults() throws IOException { threadContext.putHeader("foo", "bar"); threadContext.putTransient("ctx.foo", 1); + // This is part of propagators + threadContext.putTransient("PERF_STATS", "abc"); + assertEquals("bar", threadContext.getHeader("foo")); assertNotNull(threadContext.getTransient("ctx.foo")); assertNull(threadContext.getHeader("default")); + //threadContext.stashContext(); threadContext.writeTo(out); } { @@ -475,10 +488,63 @@ public void testSerializeInDifferentContextNoDefaults() throws IOException { assertEquals("bar", otherhreadContext.getHeader("foo")); assertNull(otherhreadContext.getTransient("ctx.foo")); + assertNotNull(otherhreadContext.getHeader("PERF_STATS")); assertEquals("5", otherhreadContext.getHeader("default")); } } + public void testSerializeInDifferentContextNoDefaults1() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + + // This is part of propagators + threadContext.putTransient("PERF_STATS", "abc"); + assertNotNull(threadContext.getTransient("PERF_STATS")); + threadContext.writeTo(out); + } + { + Settings otherSettings = Settings.builder().put("request.headers.default", "5").build(); + ThreadContext otherhreadContext = new ThreadContext(otherSettings); + otherhreadContext.readHeaders(out.bytes().streamInput()); + + // Here its not null - as the transient headers get propagated as part of request headers + // during serialization + assertNotNull(otherhreadContext.getHeader("PERF_STATS")); + } + } + + public void testSerializeInDifferentContextNoDefaultsStash() throws IOException { + BytesStreamOutput out = new BytesStreamOutput(); + { + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + + // This is part of propagators + threadContext.putTransient("PERF_STATS", "abc"); + threadContext.putTransient("random", "cde"); + assertNotNull(threadContext.getTransient("PERF_STATS")); + threadContext.stashContext(); + + // After stash perf stats is not cleared up since its part of propagators + assertNotNull(threadContext.getTransient("PERF_STATS")); + + // This is cleared since its not part of propagators + assertNull(threadContext.getTransient("random")); + + // serializing the threadcontext + threadContext.writeTo(out); + } + { + Settings otherSettings = Settings.builder().put("request.headers.default", "5").build(); + ThreadContext otherhreadContext = new ThreadContext(otherSettings); + otherhreadContext.readHeaders(out.bytes().streamInput()); + + // Here its not null - as the transient headers get propagated as part of request headers + // during serialization + assertNotNull(otherhreadContext.getHeader("PERF_STATS")); + } + } + public void testCanResetDefault() { Settings build = Settings.builder().put("request.headers.default", "1").build(); ThreadContext threadContext = new ThreadContext(build); From 1c2a23a4c559f46041a3a6f14998fe3725744b3c Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Mon, 15 Jan 2024 21:53:49 +0530 Subject: [PATCH 3/3] threadcontext changes Signed-off-by: Bharathwaj G --- .../index/codec/PerFieldMappingPostingFormatCodec.java | 1 + .../java/org/opensearch/index/engine/InternalEngine.java | 5 +++++ .../java/org/opensearch/index/mapper/BinaryFieldMapper.java | 1 - .../node/resource/tracker/ResourceTrackerSettings.java | 2 +- .../aggregations/bucket/filter/FiltersAggregator.java | 1 + .../search/aggregations/metrics/SumAggregator.java | 1 + server/src/main/java/org/opensearch/transport/Header.java | 5 ++++- .../main/java/org/opensearch/transport/InboundHandler.java | 2 ++ .../test/java/org/opensearch/index/IndexModuleTests.java | 4 +++- .../opensearch/index/search/nested/NestedSortingTests.java | 2 ++ .../aggregation/ConcurrentAggregationProfilerTests.java | 2 +- .../test/disruption/DisruptableMockTransport.java | 5 ++++- .../java/org/opensearch/test/transport/MockTransport.java | 5 ++++- .../org/opensearch/test/transport/MockTransportService.java | 6 +++++- 14 files changed, 34 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java index d3207557273a5..bd258bae97a77 100644 --- a/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java +++ b/server/src/main/java/org/opensearch/index/codec/PerFieldMappingPostingFormatCodec.java @@ -37,6 +37,7 @@ import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.PostingsFormat; import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat; +//import org.apache.lucene.codecs. import org.apache.lucene.codecs.lucene95.Lucene95Codec; import org.opensearch.common.lucene.Lucene; import org.opensearch.index.mapper.CompletionFieldMapper; diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 8e1627af274c5..d41789a91750e 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -1754,6 +1754,7 @@ public boolean maybeRefresh(String source) throws EngineException { final boolean refresh(String source, SearcherScope scope, boolean block) throws EngineException { // both refresh types will result in an internal refresh but only the external will also // pass the new reader reference to the external reader manager. + System.out.println("======= REFRESH called ====="); final long localCheckpointBeforeRefresh = localCheckpointTracker.getProcessedCheckpoint(); boolean refreshed; try { @@ -1852,6 +1853,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException { // or (4) the local checkpoint information in the last commit is stale, which slows down future recoveries. boolean hasUncommittedChanges = indexWriter.hasUncommittedChanges(); boolean shouldPeriodicallyFlush = shouldPeriodicallyFlush(); + System.out.println("HAS UNCOMMITED CHANGES : " + hasUncommittedChanges); + System.out.println("shouldPeriodicallyFlush : " + shouldPeriodicallyFlush); if (hasUncommittedChanges || force || shouldPeriodicallyFlush @@ -2525,6 +2528,7 @@ protected void commitIndexWriter(final IndexWriter writer, final String translog if (currentForceMergeUUID != null) { commitData.put(FORCE_MERGE_UUID_KEY, currentForceMergeUUID); } + System.out.println("committing writer with commit data [{}]" + commitData); logger.trace("committing writer with commit data [{}]", commitData); return commitData.entrySet().iterator(); }); @@ -2796,6 +2800,7 @@ public final long currentOngoingRefreshCheckpoint() { * Refresh this engine **internally** iff the requesting seq_no is greater than the last refreshed checkpoint. */ protected final void refreshIfNeeded(String source, long requestingSeqNo) { + System.out.println("======= REFRESH If Needed called ====="); if (lastRefreshedCheckpoint() < requestingSeqNo) { synchronized (refreshIfNeededMutex) { if (lastRefreshedCheckpoint() < requestingSeqNo) { diff --git a/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java index 040491f775357..90f1c1ddbf35f 100644 --- a/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java @@ -60,7 +60,6 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; - /** * A mapper for binary fields * diff --git a/server/src/main/java/org/opensearch/node/resource/tracker/ResourceTrackerSettings.java b/server/src/main/java/org/opensearch/node/resource/tracker/ResourceTrackerSettings.java index f81b008ba7e8b..83302064cbde6 100644 --- a/server/src/main/java/org/opensearch/node/resource/tracker/ResourceTrackerSettings.java +++ b/server/src/main/java/org/opensearch/node/resource/tracker/ResourceTrackerSettings.java @@ -25,7 +25,7 @@ private static class Defaults { /** * This is the default window duration on which the average resource utilization values will be calculated */ - private static final long WINDOW_DURATION_IN_SECONDS = 30; + private static final long WINDOW_DURATION_IN_SECONDS = 1; } public static final Setting GLOBAL_CPU_USAGE_AC_POLLING_INTERVAL_SETTING = Setting.positiveTimeSetting( diff --git a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregator.java index 7b86d0ed15cf8..6f9faf04f64bb 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/bucket/filter/FiltersAggregator.java @@ -180,6 +180,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc return new LeafBucketCollectorBase(sub, null) { @Override public void collect(int doc, long bucket) throws IOException { + System.out.println("doc id : " + doc); boolean matched = false; for (int i = 0; i < bits.length; i++) { if (bits[i].get(doc)) { diff --git a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java index 4b8e882cd69bc..2495467cef449 100644 --- a/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java +++ b/server/src/main/java/org/opensearch/search/aggregations/metrics/SumAggregator.java @@ -95,6 +95,7 @@ public LeafBucketCollector getLeafCollector(LeafReaderContext ctx, final LeafBuc return new LeafBucketCollectorBase(sub, values) { @Override public void collect(int doc, long bucket) throws IOException { + System.out.println("Doc id " + doc); sums = bigArrays.grow(sums, bucket + 1); compensations = bigArrays.grow(compensations, bucket + 1); diff --git a/server/src/main/java/org/opensearch/transport/Header.java b/server/src/main/java/org/opensearch/transport/Header.java index a179cfb35288e..a37995ecfbfb1 100644 --- a/server/src/main/java/org/opensearch/transport/Header.java +++ b/server/src/main/java/org/opensearch/transport/Header.java @@ -123,8 +123,10 @@ Tuple, Map>> getHeaders() { void finishParsingHeader(StreamInput input) throws IOException { this.headers = ThreadContext.readHeadersFromStream(input); - + //if(this.headers) + //System.out.println("HEADER"); if (isRequest()) { + //System.out.println("Request"); final String[] featuresFound = input.readStringArray(); if (featuresFound.length == 0) { features = Collections.emptySet(); @@ -133,6 +135,7 @@ void finishParsingHeader(StreamInput input) throws IOException { } this.actionName = input.readString(); } else { + //System.out.println("Response"); this.actionName = RESPONSE_NAME; } } diff --git a/server/src/main/java/org/opensearch/transport/InboundHandler.java b/server/src/main/java/org/opensearch/transport/InboundHandler.java index c14a53e799319..40dbfae27425d 100644 --- a/server/src/main/java/org/opensearch/transport/InboundHandler.java +++ b/server/src/main/java/org/opensearch/transport/InboundHandler.java @@ -139,6 +139,8 @@ private void messageReceived(TcpChannel channel, InboundMessage message, long st if (header.isRequest()) { handleRequest(channel, header, message); } else { + // THIS IS RESPONSE + // Responses do not support short circuiting currently // Responses do not support short circuiting currently assert message.isShortCircuit() == false; final TransportResponseHandler handler; diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 97bc822be7d51..5bae17fbb6749 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -107,6 +107,7 @@ import org.opensearch.indices.mapper.MapperRegistry; import org.opensearch.indices.recovery.DefaultRecoverySettings; import org.opensearch.indices.recovery.RecoveryState; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.plugins.IndexStorePlugin; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; @@ -207,7 +208,8 @@ public void setUp() throws Exception { boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), null, Collections.emptySet(), - NoopTracer.INSTANCE + NoopTracer.INSTANCE, + mock(ResourceUsageCollectorService.class) ); repositoriesService = new RepositoriesService( settings, diff --git a/server/src/test/java/org/opensearch/index/search/nested/NestedSortingTests.java b/server/src/test/java/org/opensearch/index/search/nested/NestedSortingTests.java index 8f1a9afa243a3..e42d89fdbfe31 100644 --- a/server/src/test/java/org/opensearch/index/search/nested/NestedSortingTests.java +++ b/server/src/test/java/org/opensearch/index/search/nested/NestedSortingTests.java @@ -33,6 +33,7 @@ package org.opensearch.index.search.nested; import org.apache.lucene.document.Document; +import org.apache.lucene.document.DoublePoint; import org.apache.lucene.document.Field; import org.apache.lucene.document.IntPoint; import org.apache.lucene.document.NumericDocValuesField; @@ -195,6 +196,7 @@ public void testNestedSorting() throws Exception { document = new Document(); document.add(new StringField(NestedPathFieldMapper.NAME, "parent", Field.Store.NO)); document.add(new StringField("field1", "b", Field.Store.NO)); + //document.add(new AggregationPoint("minute=40,hour=12,day=30", 30, 40, 50)); docs.add(document); writer.addDocuments(docs); diff --git a/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java b/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java index e36b65f0a7b69..0ad19820633ab 100644 --- a/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java +++ b/server/src/test/java/org/opensearch/search/profile/aggregation/ConcurrentAggregationProfilerTests.java @@ -162,7 +162,7 @@ public void testBuildBreakdownStatsMap() { assertEquals(150L, (long) statsMap.get("avg_initialize")); } - public void testGetSliceLevelAggregationMap() { + public void FtestGetSliceLevelAggregationMap() { List tree = createConcurrentSearchProfileTree(); Map> aggregationMap = ConcurrentAggregationProfiler.getSliceLevelAggregationMap(tree); assertEquals(2, aggregationMap.size()); diff --git a/test/framework/src/main/java/org/opensearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/opensearch/test/disruption/DisruptableMockTransport.java index 4f3884f97a570..238b3fe3a39ca 100644 --- a/test/framework/src/main/java/org/opensearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/opensearch/test/disruption/DisruptableMockTransport.java @@ -42,6 +42,7 @@ import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.ThreadPool; @@ -62,6 +63,7 @@ import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static org.mockito.Mockito.mock; import static org.opensearch.test.OpenSearchTestCase.copyWriteable; public abstract class DisruptableMockTransport extends MockTransport { @@ -95,7 +97,8 @@ public TransportService createTransportService( Set taskHeaders, Tracer tracer ) { - return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer); + return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, tracer, + mock(ResourceUsageCollectorService.class)); } @Override diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java index 24aef714cc259..1bc4af6ccf9e9 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java @@ -45,6 +45,7 @@ import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.CloseableConnection; @@ -66,6 +67,7 @@ import java.util.function.Function; import static org.apache.lucene.tests.util.LuceneTestCase.rarely; +import static org.mockito.Mockito.mock; /** * A basic transport implementation that allows to intercept requests that have been sent @@ -96,7 +98,8 @@ public TransportService createTransportService( clusterSettings, taskHeaders, connectionManager, - tracer + tracer, + mock(ResourceUsageCollectorService.class) ); } diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java index 6bf5381b62cc9..cab24d1077e65 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java @@ -55,6 +55,7 @@ import org.opensearch.core.common.transport.TransportAddress; import org.opensearch.core.indices.breaker.NoneCircuitBreakerService; import org.opensearch.node.Node; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.plugins.Plugin; import org.opensearch.tasks.TaskManager; import org.opensearch.telemetry.tracing.Tracer; @@ -91,6 +92,8 @@ import java.util.function.Function; import java.util.function.Supplier; +import static org.mockito.Mockito.mock; + /** * A mock delegate service that allows to simulate different network topology failures. * Internally it maps TransportAddress objects to rules that inject failures. @@ -259,7 +262,8 @@ private MockTransportService( clusterSettings, taskHeaders, new StubbableConnectionManager(new ClusterConnectionManager(settings, transport)), - tracer + tracer, + mock(ResourceUsageCollectorService.class) ); this.original = transport.getDelegate(); }