Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement FlightSqlProducer CommandGetCatalogs #53

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions services/arrow-flight/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@
<groupId>org.apache.arrow</groupId>
<artifactId>flight-grpc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>flight-sql</artifactId>
<version>6.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand Down

Large diffs are not rendered by default.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package com.dremio.service.flight.impl;

import static org.apache.arrow.flight.sql.impl.FlightSql.*;

import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
Expand All @@ -24,6 +26,7 @@

import com.dremio.exec.proto.UserProtos;
import com.dremio.service.flight.TicketContent.PreparedStatementTicket;
import com.dremio.service.flight.protector.CancellableUserResponseHandler;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;

Expand All @@ -35,10 +38,10 @@ public class FlightPreparedStatement {

private final FlightDescriptor flightDescriptor;
private final String query;
private final CreatePreparedStatementResponseHandler responseHandler;
private final CancellableUserResponseHandler<UserProtos.CreatePreparedStatementArrowResp> responseHandler;

public FlightPreparedStatement(FlightDescriptor flightDescriptor, String query,
CreatePreparedStatementResponseHandler responseHandler) {
CancellableUserResponseHandler<UserProtos.CreatePreparedStatementArrowResp> responseHandler) {
this.flightDescriptor = flightDescriptor;
this.query = query;
this.responseHandler = responseHandler;
Expand All @@ -56,7 +59,7 @@ public FlightInfo getFlightInfo(Location location) {

final PreparedStatementTicket preparedStatementTicketContent = PreparedStatementTicket.newBuilder()
.setQuery(query)
.setHandle(createPreparedStatementResp.getPreparedStatement().getServerHandle())
.setHandle(getServerHandle())
.build();

final Ticket ticket = new Ticket(preparedStatementTicketContent.toByteArray());
Expand All @@ -75,6 +78,22 @@ public Schema getSchema() {
return buildSchema(resp.getPreparedStatement().getArrowSchema());
}

public ActionCreatePreparedStatementResult createAction() {
final UserProtos.CreatePreparedStatementArrowResp createPreparedStatementResp = responseHandler.get();
final Schema schema = buildSchema(createPreparedStatementResp.getPreparedStatement().getArrowSchema());

return ActionCreatePreparedStatementResult.newBuilder()
.setDatasetSchema(ByteString.copyFrom(schema.toByteArray()))
.setParameterSchema(ByteString.EMPTY)
.setPreparedStatementHandle(getServerHandle().toByteString())
.build();
}

public UserProtos.PreparedStatementHandle getServerHandle() {
UserProtos.CreatePreparedStatementArrowResp createPreparedStatementResp = responseHandler.get();
return createPreparedStatementResp.getPreparedStatement().getServerHandle();
}

private static Schema buildSchema(ByteString arrowSchema) {
return Schema.deserialize(arrowSchema.asReadOnlyByteBuffer());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dremio.service.flight.impl;

import java.nio.charset.StandardCharsets;
Expand All @@ -23,7 +24,11 @@
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightProducer;
import org.apache.arrow.flight.sql.FlightSqlProducer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.util.Text;

import com.dremio.common.utils.protos.ExternalIdHelper;
import com.dremio.exec.proto.UserBitShared;
Expand All @@ -35,9 +40,9 @@
import com.dremio.options.OptionManager;
import com.dremio.sabot.rpc.user.UserSession;
import com.dremio.service.flight.DremioFlightServiceOptions;
import com.dremio.service.flight.TicketContent;
import com.dremio.service.flight.impl.RunQueryResponseHandler.BackpressureHandlingResponseHandler;
import com.dremio.service.flight.impl.RunQueryResponseHandler.BasicResponseHandler;
import com.dremio.service.flight.protector.CancellableUserResponseHandler;
import com.google.common.annotations.VisibleForTesting;

/**
Expand Down Expand Up @@ -67,7 +72,8 @@ public FlightWorkManager(Provider<UserWorker> workerProvider,
* @return A FlightPreparedStatement which consumes the result of the job.
*/
public FlightPreparedStatement createPreparedStatement(FlightDescriptor flightDescriptor,
Supplier<Boolean> isRequestCancelled, UserSession userSession) {
Supplier<Boolean> isRequestCancelled,
UserSession userSession) {
final String query = getQuery(flightDescriptor);

final UserProtos.CreatePreparedStatementArrowReq createPreparedStatementReq =
Expand All @@ -79,18 +85,20 @@ public FlightPreparedStatement createPreparedStatement(FlightDescriptor flightDe
final UserRequest userRequest =
new UserRequest(UserProtos.RpcType.CREATE_PREPARED_STATEMENT_ARROW, createPreparedStatementReq);

final CreatePreparedStatementResponseHandler createPreparedStatementResponseHandler =
new CreatePreparedStatementResponseHandler(prepareExternalId, userSession, workerProvider, isRequestCancelled);
final CancellableUserResponseHandler<UserProtos.CreatePreparedStatementArrowResp>
createPreparedStatementResponseHandler =
new CancellableUserResponseHandler<>(prepareExternalId, userSession,
workerProvider, isRequestCancelled, UserProtos.CreatePreparedStatementArrowResp.class);

workerProvider.get().submitWork(prepareExternalId, userSession, createPreparedStatementResponseHandler,
userRequest, TerminationListenerRegistry.NOOP);

return new FlightPreparedStatement(flightDescriptor, query, createPreparedStatementResponseHandler);
}

public void runPreparedStatement(TicketContent.PreparedStatementTicket ticket, FlightProducer.ServerStreamListener listener,
BufferAllocator allocator, UserSession userSession) {

public void runPreparedStatement(UserProtos.PreparedStatementHandle preparedStatementHandle,
FlightProducer.ServerStreamListener listener, BufferAllocator allocator,
UserSession userSession) {
final UserBitShared.ExternalId runExternalId = ExternalIdHelper.generateExternalId();
final UserRequest userRequest =
new UserRequest(UserProtos.RpcType.RUN_QUERY,
Expand All @@ -100,13 +108,54 @@ public void runPreparedStatement(TicketContent.PreparedStatementTicket ticket, F
.setWorkloadType(UserBitShared.WorkloadType.FLIGHT)
.setWorkloadClass(UserBitShared.WorkloadClass.GENERAL))
.setSource(UserProtos.SubmissionSource.FLIGHT)
.setPreparedStatementHandle(ticket.getHandle())
.setPreparedStatementHandle(preparedStatementHandle)
.build());

final UserResponseHandler responseHandler = runQueryResponseHandlerFactory.getHandler(runExternalId, userSession,
workerProvider, optionManagerProvider, listener, allocator);

workerProvider.get().submitWork(runExternalId, userSession, responseHandler, userRequest, TerminationListenerRegistry.NOOP);
workerProvider.get()
.submitWork(runExternalId, userSession, responseHandler, userRequest, TerminationListenerRegistry.NOOP);
}

/**
* Submits a GET_CATALOGS job to a worker and sends the response to given ServerStreamListener.
*
* @param listener ServerStreamListener listening to the job result.
* @param allocator BufferAllocator used to allocate the response VectorSchemaRoot.
* @param userSession The session for the user which made the request.
*/
public void getCatalogs(FlightProducer.ServerStreamListener listener, BufferAllocator allocator,
Supplier<Boolean> isRequestCancelled, UserSession userSession) {
final UserBitShared.ExternalId runExternalId = ExternalIdHelper.generateExternalId();
final UserRequest userRequest =
new UserRequest(UserProtos.RpcType.GET_CATALOGS, UserProtos.GetCatalogsReq.newBuilder().build());

final CancellableUserResponseHandler<UserProtos.GetCatalogsResp> responseHandler =
new CancellableUserResponseHandler<>(runExternalId, userSession, workerProvider, isRequestCancelled,
UserProtos.GetCatalogsResp.class);

workerProvider.get()
.submitWork(runExternalId, userSession, responseHandler, userRequest, TerminationListenerRegistry.NOOP);

UserProtos.GetCatalogsResp response = responseHandler.get();
try (VectorSchemaRoot vectorSchemaRoot = VectorSchemaRoot.create(FlightSqlProducer.Schemas.GET_CATALOGS_SCHEMA,
allocator)) {
listener.start(vectorSchemaRoot);

vectorSchemaRoot.allocateNew();
VarCharVector catalogNameVector = (VarCharVector) vectorSchemaRoot.getVector("catalog_name");

int i = 0;
for (UserProtos.CatalogMetadata catalogMetadata : response.getCatalogsList()) {
catalogNameVector.setSafe(i, new Text(catalogMetadata.getCatalogName()));
i++;
}

vectorSchemaRoot.setRowCount(response.getCatalogsCount());
listener.putNext();
listener.completed();
}
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@
import org.apache.arrow.flight.FlightRuntimeException;

import com.dremio.common.exceptions.UserException;
import com.dremio.common.utils.protos.QueryWritableBatch;
import com.dremio.exec.proto.GeneralRPCProtos;
import com.dremio.exec.proto.UserBitShared;
import com.dremio.exec.rpc.RpcOutcomeListener;
import com.dremio.exec.work.protector.UserResponseHandler;
import com.dremio.exec.work.protector.UserResult;
import com.dremio.exec.work.protector.UserWorker;
import com.dremio.sabot.rpc.user.UserSession;
import com.dremio.service.flight.error.mapping.DremioFlightErrorMapper;
Expand All @@ -39,21 +43,62 @@
*
* @param <T> The response type.
*/
public abstract class CancellableUserResponseHandler<T> implements UserResponseHandler {
public class CancellableUserResponseHandler<T> implements UserResponseHandler {
private final CompletableFuture<T> future = new CompletableFuture<>();
private final Supplier<Boolean> isRequestCancelled;
private final UserBitShared.ExternalId externalId;
private final UserSession userSession;
private final Provider<UserWorker> workerProvider;
private final Class<T> responseType;

public CancellableUserResponseHandler(UserBitShared.ExternalId externalId,
UserSession userSession,
Provider<UserWorker> workerProvider,
Supplier<Boolean> isRequestCancelled) {
Supplier<Boolean> isRequestCancelled,
Class<T> responseType) {
this.externalId = externalId;
this.userSession = userSession;
this.workerProvider = workerProvider;
this.isRequestCancelled = isRequestCancelled;
this.responseType = responseType;
}

@Override
public final void sendData(RpcOutcomeListener<GeneralRPCProtos.Ack> outcomeListener, QueryWritableBatch result) {
throw new UnsupportedOperationException("A response sender based implementation should send no data to end users.");
}

@Override
public void completed(UserResult result) {
switch (result.getState()) {
case COMPLETED:
getCompletableFuture().complete(result.unwrap(responseType));
break;
case FAILED:
getCompletableFuture().completeExceptionally(
DremioFlightErrorMapper.toFlightRuntimeException(result.getException()));
break;
case CANCELED:
final Exception canceledException = result.getException();
getCompletableFuture().completeExceptionally(
CallStatus.CANCELLED
.withCause(canceledException)
.withDescription(canceledException.getMessage())
.toRuntimeException());
break;

case STARTING:
case RUNNING:
case NO_LONGER_USED_1:
case ENQUEUED:
default:
getCompletableFuture().completeExceptionally(
CallStatus.INTERNAL
.withCause(new IllegalStateException())
.withDescription("Internal Error: Invalid planning state.")
.toRuntimeException());
break;
}
}

public T get() {
Expand Down Expand Up @@ -104,4 +149,5 @@ public void cancelJob() {
protected CompletableFuture<T> getCompletableFuture() {
return future;
}

}
Loading