diff --git a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java index 56f225ea3c7..fd1ae43a142 100644 --- a/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java +++ b/server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java @@ -120,6 +120,7 @@ public static void DoGetCustom( .queryPerformanceRecorder(queryPerformanceRecorder) .require(tableExport) .onError(observer) + .onSuccess(observer::onCompleted) .submit(() -> { metrics.queueNanos = System.nanoTime() - queueStartTm; Object export = tableExport.get(); @@ -146,8 +147,6 @@ public static void DoGetCustom( // shared code between `DoGet` and `BarrageSnapshotRequest` BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, null, null, false, DEFAULT_SNAPSHOT_DESER_OPTIONS, listener, metrics); - - listener.onCompleted(); }); } } @@ -544,6 +543,27 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { .queryPerformanceRecorder(queryPerformanceRecorder) .require(tableExport) .onError(listener) + .onSuccess(() -> { + final HalfClosedState newState = halfClosedState.updateAndGet(current -> { + switch (current) { + case DONT_CLOSE: + // record that we have finished sending + return HalfClosedState.FINISHED_SENDING; + case CLIENT_HALF_CLOSED: + // since streaming has now finished, and client already half-closed, + // time to half close from server + return HalfClosedState.CLOSED; + case FINISHED_SENDING: + case CLOSED: + throw new IllegalStateException("Can't finish streaming twice"); + default: + throw new IllegalStateException("Unknown state " + current); + } + }); + if (newState == HalfClosedState.CLOSED) { + listener.onCompleted(); + } + }) .submit(() -> { metrics.queueNanos = System.nanoTime() - queueStartTm; Object export = tableExport.get(); @@ -586,25 +606,6 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, columns, viewport, reverseViewport, snapshotOptAdapter.adapt(snapshotRequest), listener, metrics); - HalfClosedState newState = halfClosedState.updateAndGet(current -> { - switch (current) { - case DONT_CLOSE: - // record that we have finished sending - return HalfClosedState.FINISHED_SENDING; - case CLIENT_HALF_CLOSED: - // since streaming has now finished, and client already half-closed, - // time to half close from server - return HalfClosedState.CLOSED; - case FINISHED_SENDING: - case CLOSED: - throw new IllegalStateException("Can't finish streaming twice"); - default: - throw new IllegalStateException("Unknown state " + current); - } - }); - if (newState == HalfClosedState.CLOSED) { - listener.onCompleted(); - } }); } } @@ -614,7 +615,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) { public void close() { // no work to do for DoGetRequest close // possibly safely complete if finished sending data - HalfClosedState newState = halfClosedState.updateAndGet(current -> { + final HalfClosedState newState = halfClosedState.updateAndGet(current -> { switch (current) { case DONT_CLOSE: // record that we have half closed diff --git a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java index ac3bedf066d..8796748c906 100644 --- a/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/arrow/FlightServiceGrpcImpl.java @@ -226,10 +226,8 @@ public void getFlightInfo( .queryPerformanceRecorder(queryPerformanceRecorder) .require(export) .onError(responseObserver) - .submit(() -> { - responseObserver.onNext(export.get()); - responseObserver.onCompleted(); - }); + .onSuccess(responseObserver::onCompleted) + .submit(() -> responseObserver.onNext(export.get())); return; } @@ -273,12 +271,10 @@ public void getSchema( .queryPerformanceRecorder(queryPerformanceRecorder) .require(export) .onError(responseObserver) - .submit(() -> { - responseObserver.onNext(Flight.SchemaResult.newBuilder() - .setSchema(export.get().getSchema()) - .build()); - responseObserver.onCompleted(); - }); + .onSuccess(responseObserver::onCompleted) + .submit(() -> responseObserver.onNext(Flight.SchemaResult.newBuilder() + .setSchema(export.get().getSchema()) + .build())); return; } diff --git a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java index 8ab6115ccb7..81a8ca4a23b 100644 --- a/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/console/ConsoleServiceGrpcImpl.java @@ -183,16 +183,17 @@ public void executeCommand( final SessionState.ExportObject exportedConsole = ticketRouter.resolve(session, consoleId, "consoleId"); - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .requiresSerialQueue() .require(exportedConsole) .onError(responseObserver) + .onSuccess((final ExecuteCommandResponse response) -> safelyComplete(responseObserver, response)) .submit(() -> { - ScriptSession scriptSession = exportedConsole.get(); - ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode()); - ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder(); - FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder(); + final ScriptSession scriptSession = exportedConsole.get(); + final ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode()); + final ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder(); + final FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder(); changes.created.entrySet() .forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry))); changes.updated.entrySet() @@ -203,7 +204,7 @@ public void executeCommand( diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error)); log.error().append("Error running script: ").append(changes.error).endl(); } - safelyComplete(responseObserver, diff.setChanges(fieldChanges).build()); + return diff.setChanges(fieldChanges).build(); }); } } @@ -276,7 +277,8 @@ public void bindTableToVariable( ExportBuilder exportBuilder = session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .requiresSerialQueue() - .onError(responseObserver); + .onError(responseObserver) + .onSuccess(responseObserver::onCompleted); if (request.hasConsoleId()) { exportedConsole = ticketRouter.resolve(session, request.getConsoleId(), "consoleId"); @@ -293,7 +295,6 @@ public void bindTableToVariable( Table table = exportedTable.get(); queryScope.putParam(request.getVariableName(), table); responseObserver.onNext(BindTableToVariableResponse.getDefaultInstance()); - responseObserver.onCompleted(); }); } } diff --git a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java index 08e6e9eeba6..be5481e5790 100644 --- a/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/hierarchicaltable/HierarchicalTableServiceGrpcImpl.java @@ -85,10 +85,12 @@ public void rollup( final SessionState.ExportObject sourceTableExport = ticketRouter.resolve(session, request.getSourceTableId(), "sourceTableId"); - session.newExport(request.getResultRollupTableId(), "resultRollupTableId") + session.newExport(request.getResultRollupTableId(), "resultRollupTableId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(sourceTableExport) .onError(responseObserver) + .onSuccess((final RollupTable ignoredResult) -> safelyComplete(responseObserver, + RollupResponse.getDefaultInstance())) .submit(() -> { final Table sourceTable = sourceTableExport.get(); @@ -109,7 +111,6 @@ public void rollup( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to rollup hierarchical table"); } - safelyComplete(responseObserver, RollupResponse.getDefaultInstance()); return transformedResult; }); } @@ -141,10 +142,12 @@ public void tree( final SessionState.ExportObject
sourceTableExport = ticketRouter.resolve(session, request.getSourceTableId(), "sourceTableId"); - session.newExport(request.getResultTreeTableId(), "resultTreeTableId") + session.newExport(request.getResultTreeTableId(), "resultTreeTableId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(sourceTableExport) .onError(responseObserver) + .onSuccess((final TreeTable ignoredResult) -> safelyComplete(responseObserver, + TreeResponse.getDefaultInstance())) .submit(() -> { final Table sourceTable = sourceTableExport.get(); @@ -169,7 +172,6 @@ public void tree( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to tree hierarchical table"); } - safelyComplete(responseObserver, TreeResponse.getDefaultInstance()); return transformedResult; }); } @@ -202,10 +204,12 @@ public void apply( final SessionState.ExportObject> inputHierarchicalTableExport = ticketRouter.resolve(session, request.getInputHierarchicalTableId(), "inputHierarchicalTableId"); - session.newExport(request.getResultHierarchicalTableId(), "resultHierarchicalTableId") + session.>newExport(request.getResultHierarchicalTableId(), "resultHierarchicalTableId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(inputHierarchicalTableExport) .onError(responseObserver) + .onSuccess((final HierarchicalTable ignoredResult) -> safelyComplete(responseObserver, + HierarchicalTableApplyResponse.getDefaultInstance())) .submit(() -> { final HierarchicalTable inputHierarchicalTable = inputHierarchicalTableExport.get(); @@ -274,7 +278,6 @@ public void apply( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to apply to hierarchical table"); } - safelyComplete(responseObserver, HierarchicalTableApplyResponse.getDefaultInstance()); return transformedResult; }); } @@ -395,6 +398,8 @@ public void view( resultExportBuilder .queryPerformanceRecorder(queryPerformanceRecorder) .onError(responseObserver) + .onSuccess((final HierarchicalTableView ignoredResult) -> safelyComplete(responseObserver, + HierarchicalTableViewResponse.getDefaultInstance())) .submit(() -> { final Table keyTable = keyTableExport == null ? null : keyTableExport.get(); final Object target = targetExport.get(); @@ -439,7 +444,6 @@ public void view( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to view hierarchical table"); } - safelyComplete(responseObserver, HierarchicalTableViewResponse.getDefaultInstance()); return transformedResult; }); } @@ -483,10 +487,12 @@ public void exportSource( final SessionState.ExportObject> hierarchicalTableExport = ticketRouter.resolve(session, request.getHierarchicalTableId(), "hierarchicalTableId"); - session.newExport(request.getResultTableId(), "resultTableId") + session.
newExport(request.getResultTableId(), "resultTableId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(hierarchicalTableExport) .onError(responseObserver) + .onSuccess((final Table transformedResult) -> safelyComplete(responseObserver, + ExportUtil.buildTableCreationResponse(request.getResultTableId(), transformedResult))) .submit(() -> { final HierarchicalTable hierarchicalTable = hierarchicalTableExport.get(); @@ -499,9 +505,6 @@ public void exportSource( Code.FAILED_PRECONDITION, "Not authorized to export source from hierarchical table"); } - final ExportedTableCreationResponse response = - ExportUtil.buildTableCreationResponse(request.getResultTableId(), transformedResult); - safelyComplete(responseObserver, response); return transformedResult; }); } diff --git a/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java index 60aa6283e17..b00fcd3a286 100644 --- a/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/object/ObjectServiceGrpcImpl.java @@ -66,6 +66,7 @@ public ObjectServiceGrpcImpl(SessionService sessionService, TicketRouter ticketR private enum EnqueuedState { WAITING, RUNNING, CLOSED } + private final class SendMessageObserver implements StreamObserver { private final SessionState session; private final StreamObserver responseObserver; @@ -268,10 +269,12 @@ public void fetchObject( final SessionState.ExportObject object = ticketRouter.resolve(session, request.getSourceId().getTicket(), "sourceId"); - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .require(object) .onError(responseObserver) + .onSuccess( + (final FetchObjectResponse response) -> GrpcUtil.safelyComplete(responseObserver, response)) .submit(() -> { final Object o = object.get(); ObjectType objectTypeInstance = getObjectTypeInstance(type, o); @@ -312,9 +315,7 @@ public void onCompleted() { throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT, "Plugin didn't close response, use MessageStream instead for this object"); } - GrpcUtil.safelyComplete(responseObserver, message); - - return null; + return message; }); } } diff --git a/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java index 7c45821873e..25ba261cfae 100644 --- a/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/partitionedtable/PartitionedTableServiceGrpcImpl.java @@ -67,16 +67,17 @@ public void partitionBy( final SessionState.ExportObject
targetTable = ticketRouter.resolve(session, request.getTableId(), "tableId"); - session.newExport(request.getResultId(), "resultId") + session.newExport(request.getResultId(), "resultId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(targetTable) .onError(responseObserver) + .onSuccess((final PartitionedTable ignoredResult) -> safelyComplete(responseObserver, + PartitionByResponse.getDefaultInstance())) .submit(() -> { authWiring.checkPermissionPartitionBy(session.getAuthContext(), request, Collections.singletonList(targetTable.get())); PartitionedTable partitionedTable = targetTable.get().partitionBy(request.getDropKeys(), request.getKeyColumnNamesList().toArray(String[]::new)); - safelyComplete(responseObserver, PartitionByResponse.getDefaultInstance()); return partitionedTable; }); } @@ -97,10 +98,12 @@ public void merge( final SessionState.ExportObject partitionedTable = ticketRouter.resolve(session, request.getPartitionedTable(), "partitionedTable"); - session.newExport(request.getResultId(), "resultId") + session.
newExport(request.getResultId(), "resultId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(partitionedTable) .onError(responseObserver) + .onSuccess((final Table merged) -> safelyComplete(responseObserver, + buildTableCreationResponse(request.getResultId(), merged))) .submit(() -> { final Table table = partitionedTable.get().table(); authWiring.checkPermissionMerge(session.getAuthContext(), request, @@ -116,9 +119,6 @@ public void merge( throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to merge table."); } - final ExportedTableCreationResponse response = - buildTableCreationResponse(request.getResultId(), merged); - safelyComplete(responseObserver, response); return merged; }); } @@ -142,10 +142,12 @@ public void getTable( final SessionState.ExportObject
keys = ticketRouter.resolve(session, request.getKeyTableTicket(), "keyTable"); - session.newExport(request.getResultId(), "resultId") + session.
newExport(request.getResultId(), "resultId") .queryPerformanceRecorder(queryPerformanceRecorder) .require(partitionedTable, keys) .onError(responseObserver) + .onSuccess((final Table table) -> safelyComplete(responseObserver, + buildTableCreationResponse(request.getResultId(), table))) .submit(() -> { Table table; Table keyTable = keys.get(); @@ -189,13 +191,10 @@ public void getTable( }); } table = authorizationTransformation.transform(table); - final ExportedTableCreationResponse response = - buildTableCreationResponse(request.getResultId(), table); if (table == null) { throw Exceptions.statusRuntimeException( Code.FAILED_PRECONDITION, "Not authorized to get table."); } - safelyComplete(responseObserver, response); return table; }); } diff --git a/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java index f9ccce190d2..ed04b4f154a 100644 --- a/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/session/SessionServiceGrpcImpl.java @@ -184,11 +184,9 @@ public void exportFromTicket( .queryPerformanceRecorder(queryPerformanceRecorder) .require(source) .onError(responseObserver) - .submit(() -> { - final Object o = source.get(); - GrpcUtil.safelyComplete(responseObserver, ExportResponse.getDefaultInstance()); - return o; - }); + .onSuccess((final Object ignoredResult) -> GrpcUtil.safelyComplete(responseObserver, + ExportResponse.getDefaultInstance())) + .submit(source::get); } } diff --git a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java index 3fdcc66811b..52487e54c72 100644 --- a/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java +++ b/server/src/main/java/io/deephaven/server/table/ops/TableServiceGrpcImpl.java @@ -455,10 +455,11 @@ public void seekRow( final SessionState.ExportObject
exportedTable = ticketRouter.resolve(session, sourceId, "sourceId"); - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .require(exportedTable) .onError(responseObserver) + .onSuccess((final SeekRowResponse response) -> safelyComplete(responseObserver, response)) .submit(() -> { final Table table = exportedTable.get(); authWiring.checkPermissionSeekRow(session.getAuthContext(), request, @@ -473,8 +474,7 @@ public void seekRow( request.getInsensitive(), request.getContains(), request.getIsBackward()).seek(table); - SeekRowResponse.Builder rowResponse = SeekRowResponse.newBuilder(); - safelyComplete(responseObserver, rowResponse.setResultRow(result).build()); + return SeekRowResponse.newBuilder().setResultRow(result).build(); }); } } @@ -525,18 +525,17 @@ public void batch( if (numRemaining > 0) { return; } - + final StatusRuntimeException failure = firstFailure.get(); try (final SafeCloseable ignored2 = queryPerformanceRecorder.resumeQuery()) { - final StatusRuntimeException failure = firstFailure.get(); - if (failure != null) { - safelyError(responseObserver, failure); - } else { - safelyComplete(responseObserver); - } if (queryPerformanceRecorder.endQuery()) { EngineMetrics.getInstance().logQueryProcessingResults(queryPerformanceRecorder, failure); } } + if (failure != null) { + safelyError(responseObserver, failure); + } else { + safelyComplete(responseObserver); + } }; for (int i = 0; i < exportBuilders.size(); ++i) { @@ -611,23 +610,20 @@ public void getExportedTableCreationResponse( try (final SafeCloseable ignored = queryPerformanceRecorder.startQuery()) { final SessionState.ExportObject export = ticketRouter.resolve(session, request, "request"); - session.nonExport() + session.nonExport() .queryPerformanceRecorder(queryPerformanceRecorder) .require(export) .onError(responseObserver) + .onSuccess((final ExportedTableCreationResponse response) -> safelyComplete(responseObserver, + response)) .submit(() -> { final Object obj = export.get(); if (!(obj instanceof Table)) { - responseObserver.onError( - Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, - "Ticket is not a table")); - return; + throw Exceptions.statusRuntimeException(Code.FAILED_PRECONDITION, "Ticket is not a table"); } authWiring.checkPermissionGetExportedTableCreationResponse( session.getAuthContext(), request, Collections.singletonList((Table) obj)); - final ExportedTableCreationResponse response = - ExportUtil.buildTableCreationResponse(request, (Table) obj); - safelyComplete(responseObserver, response); + return ExportUtil.buildTableCreationResponse(request, (Table) obj); }); } } @@ -665,17 +661,15 @@ private void oneShotOperationWrapper( .map(ref -> resolveOneShotReference(session, ref)) .collect(Collectors.toList()); - session.newExport(resultId, "resultId") + session.
newExport(resultId, "resultId") .require(dependencies) - .onError(responseObserver) .queryPerformanceRecorder(queryPerformanceRecorder) + .onError(responseObserver) + .onSuccess((final Table result) -> safelyComplete(responseObserver, + ExportUtil.buildTableCreationResponse(resultId, result))) .submit(() -> { operation.checkPermission(request, dependencies); - final Table result = operation.create(request, dependencies); - final ExportedTableCreationResponse response = - ExportUtil.buildTableCreationResponse(resultId, result); - safelyComplete(responseObserver, response); - return result; + return operation.create(request, dependencies); }); } }