Skip to content

Commit

Permalink
More wildcard fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <[email protected]>
  • Loading branch information
harshavamsi committed Jan 22, 2025
1 parent 364edbe commit 131c8ad
Show file tree
Hide file tree
Showing 53 changed files with 659 additions and 555 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ public InternalAggregation.ReduceContext forFinalReduction() {
PipelineAggregator.PipelineTree.EMPTY
);
}
});
}
);

@State(Scope.Benchmark)
public static class TermsList extends AbstractList<InternalAggregations> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,3 @@ public interface StreamIterator extends Closeable {
*/
VectorSchemaRoot getRoot();
}

Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,13 @@ public CompletableFuture<ArrowReader> collect(BufferAllocator allocator) {
public CompletableFuture<RecordBatchStream> getStream(BufferAllocator allocator) {
CompletableFuture<RecordBatchStream> result = new CompletableFuture<>();
long runtimePointer = ctx.getRuntime();
DataFusion.executeStream(
runtimePointer,
ptr,
(String errString, long streamId) -> {
if (errString != null && errString.isEmpty() == false) {
result.completeExceptionally(new RuntimeException(errString));
} else {
result.complete(new RecordBatchStream(ctx, streamId, allocator));
}
});
DataFusion.executeStream(runtimePointer, ptr, (String errString, long streamId) -> {
if (errString != null && errString.isEmpty() == false) {
result.completeExceptionally(new RuntimeException(errString));
} else {
result.complete(new RecordBatchStream(ctx, streamId, allocator));
}
});
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ public void onCancel() {
void close() throws Exception {
if (recordBatchStream != null) {
recordBatchStream.close();
};
}
;
df.close();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.opensearch.datafusion;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ public RecordBatchStream(SessionContext ctx, long streamId, BufferAllocator allo

private static native void destroy(long pointer);


@Override
public void close() throws Exception {
destroy(ptr);
Expand All @@ -48,28 +47,24 @@ public void close() throws Exception {

public CompletableFuture<Boolean> loadNextBatch() {
ensureInitialized();
long runtimePointer = context.getRuntime();
long runtimePointer = context.getRuntime();
CompletableFuture<Boolean> result = new CompletableFuture<>();
next(
runtimePointer,
ptr,
(String errString, long arrowArrayAddress) -> {
if (errString != null && errString.isEmpty() == false) {
result.completeExceptionally(new RuntimeException(errString));
} else if (arrowArrayAddress == 0) {
// Reached end of stream
result.complete(false);
} else {
try {
ArrowArray arrowArray = ArrowArray.wrap(arrowArrayAddress);
Data.importIntoVectorSchemaRoot(
allocator, arrowArray, vectorSchemaRoot, dictionaryProvider);
result.complete(true);
} catch (Exception e) {
result.completeExceptionally(e);
}
next(runtimePointer, ptr, (String errString, long arrowArrayAddress) -> {
if (errString != null && errString.isEmpty() == false) {
result.completeExceptionally(new RuntimeException(errString));
} else if (arrowArrayAddress == 0) {
// Reached end of stream
result.complete(false);
} else {
try {
ArrowArray arrowArray = ArrowArray.wrap(arrowArrayAddress);
Data.importIntoVectorSchemaRoot(allocator, arrowArray, vectorSchemaRoot, dictionaryProvider);
result.complete(true);
} catch (Exception e) {
result.completeExceptionally(e);
}
});
}
});
return result;
}

Expand All @@ -92,22 +87,20 @@ private void ensureInitialized() {
private Schema getSchema() {
// Native method is not async, but use a future to store the result for convenience
CompletableFuture<Schema> result = new CompletableFuture<>();
getSchema(
ptr,
(errString, arrowSchemaAddress) -> {
if (errString != null && errString.isEmpty() == false) {
result.completeExceptionally(new RuntimeException(errString));
} else {
try {
ArrowSchema arrowSchema = ArrowSchema.wrap(arrowSchemaAddress);
Schema schema = Data.importSchema(allocator, arrowSchema, dictionaryProvider);
result.complete(schema);
// The FFI schema will be released from rust when it is dropped
} catch (Exception e) {
result.completeExceptionally(e);
}
getSchema(ptr, (errString, arrowSchemaAddress) -> {
if (errString != null && errString.isEmpty() == false) {
result.completeExceptionally(new RuntimeException(errString));
} else {
try {
ArrowSchema arrowSchema = ArrowSchema.wrap(arrowSchemaAddress);
Schema schema = Data.importSchema(allocator, arrowSchema, dictionaryProvider);
result.complete(schema);
// The FFI schema will be released from rust when it is dropped
} catch (Exception e) {
result.completeExceptionally(e);
}
});
}
});
return result.join();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.message.IpcOption;
import org.opensearch.arrow.StreamProducer;
import org.opensearch.arrow.StreamManager;
import org.opensearch.arrow.StreamProducer;
import org.opensearch.arrow.StreamTicket;
import org.opensearch.test.OpenSearchTestCase;

Expand Down Expand Up @@ -92,8 +92,7 @@ public void start(VectorSchemaRoot root) {
}

@Override
public void start(VectorSchemaRoot root, DictionaryProvider dictionaries, IpcOption option) {
}
public void start(VectorSchemaRoot root, DictionaryProvider dictionaries, IpcOption option) {}

@Override
public void putNext(ArrowBuf metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

package org.opensearch.flight;

import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;

public class FlightServiceTests extends OpenSearchTestCase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class FlightStreamIteratorTests extends OpenSearchTestCase {
public class FlightStreamIteratorTests extends OpenSearchTestCase {

private FlightStream mockFlightStream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.util.Collections;

import static org.mockito.Mockito.*;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class FlightStreamManagerTests extends OpenSearchTestCase {

Expand All @@ -36,7 +38,7 @@ public void setUp() throws Exception {
FlightService flightService = mock(FlightService.class);
when(flightService.getFlightClient(NODE_ID)).thenReturn(flightClient);
BufferAllocator allocator = mock(BufferAllocator.class);
flightStreamManager = new FlightStreamManager(()->allocator, flightService);
flightStreamManager = new FlightStreamManager(() -> allocator, flightService);
}

public void testGetStreamIterator() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
package org.opensearch.flight;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;

import java.util.Collection;

Expand All @@ -29,19 +29,41 @@ public void setUp() throws Exception {
}

public void testCreateComponents() {
Collection<Object> components = flightStreamPlugin.createComponents(null, mock(ClusterService.class), null,null, null,null, null, null, null, null, null);
Collection<Object> components = flightStreamPlugin.createComponents(
null,
mock(ClusterService.class),
null,
null,
null,
null,
null,
null,
null,
null,
null
);
assertNotNull(components);
assertTrue(components.stream().anyMatch(component -> component instanceof FlightService));
}

public void testGetStreamManager() {
}
public void testGetStreamManager() {}

public void testGetSettings() {
}
public void testGetSettings() {}

public void testCreateComponentsWithNullArguments() {
Collection<Object> components = flightStreamPlugin.createComponents(null, mock(ClusterService.class), null,null, null,null, null, null, null, null, null);
Collection<Object> components = flightStreamPlugin.createComponents(
null,
mock(ClusterService.class),
null,
null,
null,
null,
null,
null,
null,
null,
null
);
assertNotNull(components);
assertFalse(components.isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
import org.opensearch.arrow.StreamProducer;
import org.opensearch.test.OpenSearchTestCase;

import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class ProxyStreamProducerTests extends OpenSearchTestCase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,13 @@
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.StreamedJoinAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportStreamedJoinAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
import org.opensearch.action.search.TransportStreamedJoinAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,10 @@ private void innerRun() throws Exception {
queryAndFetchOptimization ? queryResults : fetchResults.getAtomicArray()
);
if (queryAndFetchOptimization) {
assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty ["
+ phaseResults.isEmpty()
+ "], single result: "
+ phaseResults.get(0).fetchResult();
// assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty ["
// + phaseResults.isEmpty()
// + "], single result: "
// + phaseResults.get(0).fetchResult();
// query AND fetch optimization
finishPhase.run();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,10 @@
import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.opensearch.action.ValidateActions.addValidationError;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.lucene.search.TotalHits;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -42,7 +41,7 @@ public OSTicket getTicket() {

public JoinResponse(OSTicket ticket) {
this.ticket = ticket;
this.hits = new SearchHits(new SearchHit[]{}, new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0f);
this.hits = new SearchHits(new SearchHit[] {}, new TotalHits(0, TotalHits.Relation.EQUAL_TO), 0f);
}

public JoinResponse(SearchHits hits) {
Expand All @@ -58,14 +57,12 @@ public JoinResponse(StreamInput in) throws IOException {

@Override
public void writeTo(StreamOutput out) throws IOException {
ticket.writeTo(out);
hits.writeTo(out);
ticket.writeTo(out);
hits.writeTo(out);
}

@Override
public String toString() {
return "JoinResponse{" +
"ticket=" + ticket +
'}';
return "JoinResponse{" + "ticket=" + ticket + '}';
}
}
Loading

0 comments on commit 131c8ad

Please sign in to comment.