Skip to content

Commit

Permalink
Vamshi's changes for Term Agg using Stream
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Maurya <[email protected]>
  • Loading branch information
rishabhmaurya committed Jan 31, 2025
1 parent 60a3586 commit edabf74
Show file tree
Hide file tree
Showing 56 changed files with 2,030 additions and 219 deletions.
6 changes: 5 additions & 1 deletion .idea/runConfigurations/Debug_OpenSearch.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,9 @@ allprojects {
}

// See please https://bugs.openjdk.java.net/browse/JDK-8209058
if (BuildParams.runtimeJavaVersion > JavaVersion.VERSION_11) {
compile.options.compilerArgs << '-Werror'
}
// if (BuildParams.runtimeJavaVersion > JavaVersion.VERSION_11) {
// compile.options.compilerArgs << '-Werror'
// }
compile.options.compilerArgs << '-Xlint:auxiliaryclass'
compile.options.compilerArgs << '-Xlint:cast'
compile.options.compilerArgs << '-Xlint:classfile'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public class OpenSearchNode implements TestClusterConfiguration {
private boolean isWorkingDirConfigured = false;
private String httpPort = "0";
private String transportPort = "0";
private String streamPort = "0";
private Path confPathData;
private String keystorePassword = "";
private boolean preserveDataDir = false;
Expand Down Expand Up @@ -1175,6 +1176,8 @@ private void createConfiguration() {
baseConfig.put("node.portsfile", "true");
baseConfig.put("http.port", httpPort);
baseConfig.put("transport.port", transportPort);
baseConfig.put("node.attr.transport.stream.port", streamPort);

// Default the watermarks to absurdly low to prevent the tests from failing on nodes without enough disk space
baseConfig.put("cluster.routing.allocation.disk.watermark.low", "1b");
baseConfig.put("cluster.routing.allocation.disk.watermark.high", "1b");
Expand Down Expand Up @@ -1447,6 +1450,10 @@ void setTransportPort(String transportPort) {
this.transportPort = transportPort;
}

void setStreamPort(String streamPort) {
this.streamPort = streamPort;
}

void setDataPath(Path dataPath) {
this.confPathData = dataPath;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class RunTask extends DefaultTestClustersTask {
public static final String CUSTOM_SETTINGS_PREFIX = "tests.opensearch.";
private static final int DEFAULT_HTTP_PORT = 9200;
private static final int DEFAULT_TRANSPORT_PORT = 9300;
private static final int DEFAULT_STREAM_PORT = 9880;
private static final int DEFAULT_DEBUG_PORT = 5005;
public static final String LOCALHOST_ADDRESS_PREFIX = "127.0.0.1:";

Expand Down Expand Up @@ -140,6 +141,8 @@ public void beforeStart() {
int debugPort = DEFAULT_DEBUG_PORT;
int httpPort = DEFAULT_HTTP_PORT;
int transportPort = DEFAULT_TRANSPORT_PORT;
int streamPort = DEFAULT_STREAM_PORT;

Map<String, String> additionalSettings = System.getProperties()
.entrySet()
.stream()
Expand All @@ -164,15 +167,19 @@ public void beforeStart() {
firstNode.setHttpPort(String.valueOf(httpPort));
httpPort++;
firstNode.setTransportPort(String.valueOf(transportPort));
firstNode.setStreamPort(String.valueOf(streamPort));
transportPort++;
streamPort++;
firstNode.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
cluster.setPreserveDataDir(preserveData);
for (OpenSearchNode node : cluster.getNodes()) {
if (node != firstNode) {
node.setHttpPort(String.valueOf(httpPort));
httpPort++;
node.setTransportPort(String.valueOf(transportPort));
node.setStreamPort(String.valueOf(streamPort));
transportPort++;
streamPort++;
node.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);
}
additionalSettings.forEach(node::setting);
Expand Down
4 changes: 2 additions & 2 deletions distribution/archives/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* under the License.
*/

import org.opensearch.gradle.JavaPackageType
import org.opensearch.gradle.JavaPackageType

apply plugin: 'opensearch.internal-distribution-archive-setup'

Expand Down Expand Up @@ -190,7 +190,7 @@ distribution_archives {
}
}


linuxPpc64leTar {
archiveClassifier = 'linux-ppc64le'
content {
Expand Down
2 changes: 1 addition & 1 deletion distribution/src/config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ ${error.file}
# See please https://bugs.openjdk.org/browse/JDK-8341127 (openjdk/jdk#21283)
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.setAsTypeCache
23:-XX:CompileCommand=dontinline,java/lang/invoke/MethodHandle.asTypeUncached
24:-add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
21 changes: 18 additions & 3 deletions libs/core/licenses/log4j-api-NOTICE.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
Apache log4j
Copyright 2007 The Apache Software Foundation
Apache Log4j
Copyright 1999-2024 Apache Software Foundation

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
The Apache Software Foundation (http://www.apache.org/).

ResolverUtil.java
Copyright 2005-2006 Tim Fennell

Dumbster SMTP test server
Copyright 2004 Jason Paul Kitchen

TypeUtil.java
Copyright 2002-2012 Ramnivas Laddad, Juergen Hoeller, Chris Beams

picocli (http://picocli.info)
Copyright 2017 Remko Popma

TimeoutBlockingWaitStrategy.java and parts of Util.java
Copyright 2011 LMAX Ltd.
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.arrow.flight.api;

import org.opensearch.client.node.NodeClient;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.action.RestToXContentListener;

import java.util.List;

import static org.opensearch.rest.RestRequest.Method.GET;

/**
* It handles GET requests for retrieving Flight server information.
*/
public class FlightServerInfoAction extends BaseRestHandler {

/**
* Constructor for FlightServerInfoAction.
*/
public FlightServerInfoAction() {}

/**
* Returns the name of the action.
* @return The name of the action.
*/
@Override
public String getName() {
return "flight_server_info_action";
}

/**
* Returns the list of routes for the action.
* @return The list of routes for the action.
*/
@Override
public List<Route> routes() {
return List.of(new Route(GET, "/_flight/info"), new Route(GET, "/_flight/info/{nodeId}"));
}

/**
* Prepares the request for the action.
* @param request The REST request.
* @param client The node client.
* @return The rest channel consumer.
*/
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
String nodeId = request.param("nodeId");
if (nodeId != null) {
// Query specific node
NodesFlightInfoRequest nodesRequest = new NodesFlightInfoRequest(nodeId);
return channel -> client.execute(NodesFlightInfoAction.INSTANCE, nodesRequest, new RestToXContentListener<>(channel));
} else {
NodesFlightInfoRequest nodesRequest = new NodesFlightInfoRequest();
return channel -> client.execute(NodesFlightInfoAction.INSTANCE, nodesRequest, new RestToXContentListener<>(channel));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow.flight.api;

import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.transport.BoundTransportAddress;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* Represents the response for a node's flight information.
*/
public class NodeFlightInfo extends BaseNodeResponse implements ToXContentObject {
private final BoundTransportAddress boundAddress;

/**
* Constructor for NodeFlightInfo.
* @param in The stream input to read from.
* @throws IOException If an I/O error occurs.
*/
public NodeFlightInfo(StreamInput in) throws IOException {
super(in);
boundAddress = new BoundTransportAddress(in);
}

/**
* Constructor for NodeFlightInfo.
* @param node The discovery node.
* @param boundAddress The bound transport address.
*/
public NodeFlightInfo(DiscoveryNode node, BoundTransportAddress boundAddress) {
super(node);
this.boundAddress = boundAddress;
}

/**
* Writes the node flight information to the stream.
* @param out The stream output to write to.
* @throws IOException If an I/O error occurs.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
boundAddress.writeTo(out);
}

/**
* Returns the bound transport address.
* @return The bound transport address.
*/
public BoundTransportAddress getBoundAddress() {
return boundAddress;
}

/**
* Converts the node flight information to XContent.
* @param builder The XContent builder.
* @param params The parameters for the XContent conversion.
* @return The XContent builder.
* @throws IOException If an I/O error occurs.
*/
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.startObject("flight_server");

builder.startArray("bound_addresses");
for (TransportAddress address : boundAddress.boundAddresses()) {
builder.startObject();
builder.field("host", address.address().getHostString());
builder.field("port", address.address().getPort());
builder.endObject();
}
builder.endArray();

TransportAddress publishAddress = boundAddress.publishAddress();
builder.startObject("publish_address");
builder.field("host", publishAddress.address().getHostString());
builder.field("port", publishAddress.address().getPort());
builder.endObject();

builder.endObject();
builder.endObject();
return builder;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow.flight.api;

import org.opensearch.action.ActionType;

/**
* Action to retrieve flight info from nodes
*/
public class NodesFlightInfoAction extends ActionType<NodesFlightInfoResponse> {
/**
* Singleton instance of NodesFlightInfoAction.
*/
public static final NodesFlightInfoAction INSTANCE = new NodesFlightInfoAction();
/**
* Name of this action.
*/
public static final String NAME = "cluster:admin/flight/info";

NodesFlightInfoAction() {
super(NAME, NodesFlightInfoResponse::new);
}
}
Loading

0 comments on commit edabf74

Please sign in to comment.