Skip to content

Commit

Permalink
set system properties in flight module and read from settings
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhmaurya committed Oct 3, 2024
1 parent 480dbd8 commit c04cb3e
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,14 +165,6 @@ public void beforeStart() {
httpPort++;
firstNode.setTransportPort(String.valueOf(transportPort));
transportPort++;
firstNode.systemProperty("arrow.allocation.manager.type", "Netty");
// firstNode.systemProperty("arrow.memory.debug.allocator", "true");
firstNode.systemProperty("arrow.enable_null_check_for_get", "false");
firstNode.systemProperty("io.netty.tryReflectionSetAccessible", "true");
firstNode.systemProperty("arrow.enable_unsafe_memory_access", "true");
firstNode.systemProperty("io.netty.allocator.numDirectArenas", "2");
firstNode.systemProperty("io.netty.noUnsafe", "false");
firstNode.systemProperty("io.netty.tryUnsafe", "true");
firstNode.setting("discovery.seed_hosts", LOCALHOST_ADDRESS_PREFIX + DEFAULT_TRANSPORT_PORT);

cluster.setPreserveDataDir(preserveData);
Expand Down Expand Up @@ -206,8 +198,6 @@ public void beforeStart() {
node.keystorePassword(keystorePassword);
}
node.jvmArgs("--add-opens=java.base/java.nio=ALL-UNNAMED");
node.jvmArgs("--enable-native-access=ALL-UNNAMED");
node.jvmArgs("--add-opens=jdk.unsupported/sun.misc=ALL-UNNAMED");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.arrow.StreamManager;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;

import java.io.IOException;

Expand All @@ -29,8 +32,76 @@ public class FlightService extends AbstractLifecycleComponent {
private static BufferAllocator allocator;
private static StreamManager streamManager;
private static final Logger logger = LogManager.getLogger(FlightService.class);
private static String host;
private static int port;

FlightService() {
public static final Setting<String> FLIGHT_HOST = Setting.simpleString(
"opensearch.flight.host",
"localhost",
Property.NodeScope
);

public static final Setting<Integer> FLIGHT_PORT = Setting.intSetting(
"opensearch.flight.port",
8815,
1024,
65535,
Property.NodeScope
);

public static final Setting<String> ARROW_ALLOCATION_MANAGER_TYPE = Setting.simpleString(
"arrow.allocation.manager.type",
"Netty",
Property.NodeScope
);

public static final Setting<Boolean> ARROW_ENABLE_NULL_CHECK_FOR_GET = Setting.boolSetting(
"arrow.enable_null_check_for_get",
false,
Property.NodeScope
);

public static final Setting<Boolean> NETTY_TRY_REFLECTION_SET_ACCESSIBLE = Setting.boolSetting(
"io.netty.tryReflectionSetAccessible",
true,
Property.NodeScope
);

public static final Setting<Boolean> ARROW_ENABLE_UNSAFE_MEMORY_ACCESS = Setting.boolSetting(
"arrow.enable_unsafe_memory_access",
true,
Property.NodeScope
);

public static final Setting<Integer> NETTY_ALLOCATOR_NUM_DIRECT_ARENAS = Setting.intSetting(
"io.netty.allocator.numDirectArenas",
1,
1,
Property.NodeScope
);

public static final Setting<Boolean> NETTY_NO_UNSAFE = Setting.boolSetting(
"io.netty.noUnsafe",
false,
Setting.Property.NodeScope
);

public static final Setting<Boolean> NETTY_TRY_UNSAFE = Setting.boolSetting(
"io.netty.tryUnsafe",
true,
Property.NodeScope
);

FlightService(Settings settings) {
System.setProperty("arrow.allocation.manager.type", ARROW_ALLOCATION_MANAGER_TYPE.get(settings));
System.setProperty("arrow.enable_null_check_for_get", Boolean.toString(ARROW_ENABLE_NULL_CHECK_FOR_GET.get(settings)));
System.setProperty("io.netty.tryReflectionSetAccessible", Boolean.toString(NETTY_TRY_REFLECTION_SET_ACCESSIBLE.get(settings)));
System.setProperty("arrow.enable_unsafe_memory_access", Boolean.toString(ARROW_ENABLE_UNSAFE_MEMORY_ACCESS.get(settings)));
System.setProperty("io.netty.allocator.numDirectArenas", Integer.toString(NETTY_ALLOCATOR_NUM_DIRECT_ARENAS.get(settings)));
System.setProperty("io.netty.noUnsafe", Boolean.toString(NETTY_NO_UNSAFE.get(settings)));
System.setProperty("io.netty.tryUnsafe", Boolean.toString(NETTY_TRY_UNSAFE.get(settings)));
host = FLIGHT_HOST.get(settings);
port = FLIGHT_PORT.get(settings);
streamManager = new FlightStreamManager(client);
}

Expand All @@ -39,9 +110,6 @@ protected void doStart() {
try {
allocator = new RootAllocator(Integer.MAX_VALUE);
BaseFlightProducer producer = new BaseFlightProducer(streamManager, allocator);
// TODO: Load these settings from OpenSearch configuration
String host = "localhost";
int port = 8815;
final Location location = Location.forGrpcInsecure(host, port);
server = FlightServer.builder(allocator, location, producer).build();
client = FlightClient.builder(allocator, location).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
Expand All @@ -23,14 +25,21 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

import static org.opensearch.flight.FlightService.*;

public class FlightStreamPlugin extends Plugin implements StreamManagerPlugin {

private FlightService flightService;

public FlightStreamPlugin(Settings settings) {
this.flightService = new FlightService(settings);
}

@Override
public Collection<Object> createComponents(
Client client,
Expand All @@ -45,13 +54,25 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.flightService = new FlightService();
return List.of(flightService);
}

@Override
public StreamManager getStreamManager() {
return flightService.getStreamManager();
}

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(
ARROW_ALLOCATION_MANAGER_TYPE,
ARROW_ENABLE_NULL_CHECK_FOR_GET,
NETTY_TRY_REFLECTION_SET_ACCESSIBLE,
FlightService.ARROW_ENABLE_UNSAFE_MEMORY_ACCESS,
FlightService.NETTY_ALLOCATOR_NUM_DIRECT_ARENAS,
FlightService.NETTY_NO_UNSAFE,
FlightService.NETTY_TRY_UNSAFE
);
}
}

0 comments on commit c04cb3e

Please sign in to comment.