Skip to content

Commit

Permalink
Adjust all SessionState.ExportBuilder uses that set a QueryPerformanc…
Browse files Browse the repository at this point in the history
…eRecorder to use onSuccess for response delivery if possible. Exclusions: DoExchange subscriptions and async input table operations. Exceptions: batch already used onSuccess.
  • Loading branch information
rcaudy committed Nov 4, 2024
1 parent 13e1303 commit d4abe74
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 95 deletions.
45 changes: 23 additions & 22 deletions server/src/main/java/io/deephaven/server/arrow/ArrowFlightUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public static void DoGetCustom(
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(tableExport)
.onError(observer)
.onSuccess(observer::onCompleted)
.submit(() -> {
metrics.queueNanos = System.nanoTime() - queueStartTm;
Object export = tableExport.get();
Expand All @@ -146,8 +147,6 @@ public static void DoGetCustom(
// shared code between `DoGet` and `BarrageSnapshotRequest`
BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, null, null, false,
DEFAULT_SNAPSHOT_DESER_OPTIONS, listener, metrics);

listener.onCompleted();
});
}
}
Expand Down Expand Up @@ -544,6 +543,27 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(tableExport)
.onError(listener)
.onSuccess(() -> {
final HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have finished sending
return HalfClosedState.FINISHED_SENDING;
case CLIENT_HALF_CLOSED:
// since streaming has now finished, and client already half-closed,
// time to half close from server
return HalfClosedState.CLOSED;
case FINISHED_SENDING:
case CLOSED:
throw new IllegalStateException("Can't finish streaming twice");
default:
throw new IllegalStateException("Unknown state " + current);
}
});
if (newState == HalfClosedState.CLOSED) {
listener.onCompleted();
}
})
.submit(() -> {
metrics.queueNanos = System.nanoTime() - queueStartTm;
Object export = tableExport.get();
Expand Down Expand Up @@ -586,25 +606,6 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
BarrageUtil.createAndSendSnapshot(streamGeneratorFactory, table, columns, viewport,
reverseViewport, snapshotOptAdapter.adapt(snapshotRequest), listener,
metrics);
HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have finished sending
return HalfClosedState.FINISHED_SENDING;
case CLIENT_HALF_CLOSED:
// since streaming has now finished, and client already half-closed,
// time to half close from server
return HalfClosedState.CLOSED;
case FINISHED_SENDING:
case CLOSED:
throw new IllegalStateException("Can't finish streaming twice");
default:
throw new IllegalStateException("Unknown state " + current);
}
});
if (newState == HalfClosedState.CLOSED) {
listener.onCompleted();
}
});
}
}
Expand All @@ -614,7 +615,7 @@ public void handleMessage(@NotNull final BarrageProtoUtil.MessageInfo message) {
public void close() {
// no work to do for DoGetRequest close
// possibly safely complete if finished sending data
HalfClosedState newState = halfClosedState.updateAndGet(current -> {
final HalfClosedState newState = halfClosedState.updateAndGet(current -> {
switch (current) {
case DONT_CLOSE:
// record that we have half closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,8 @@ public void getFlightInfo(
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(export.get());
responseObserver.onCompleted();
});
.onSuccess(responseObserver::onCompleted)
.submit(() -> responseObserver.onNext(export.get()));
return;
}

Expand Down Expand Up @@ -273,12 +271,10 @@ public void getSchema(
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(export)
.onError(responseObserver)
.submit(() -> {
responseObserver.onNext(Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build());
responseObserver.onCompleted();
});
.onSuccess(responseObserver::onCompleted)
.submit(() -> responseObserver.onNext(Flight.SchemaResult.newBuilder()
.setSchema(export.get().getSchema())
.build()));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,17 @@ public void executeCommand(
final SessionState.ExportObject<ScriptSession> exportedConsole =
ticketRouter.resolve(session, consoleId, "consoleId");

session.nonExport()
session.<ExecuteCommandResponse>nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.requiresSerialQueue()
.require(exportedConsole)
.onError(responseObserver)
.onSuccess((final ExecuteCommandResponse response) -> safelyComplete(responseObserver, response))
.submit(() -> {
ScriptSession scriptSession = exportedConsole.get();
ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
final ScriptSession scriptSession = exportedConsole.get();
final ScriptSession.Changes changes = scriptSession.evaluateScript(request.getCode());
final ExecuteCommandResponse.Builder diff = ExecuteCommandResponse.newBuilder();
final FieldsChangeUpdate.Builder fieldChanges = FieldsChangeUpdate.newBuilder();
changes.created.entrySet()
.forEach(entry -> fieldChanges.addCreated(makeVariableDefinition(entry)));
changes.updated.entrySet()
Expand All @@ -203,7 +204,7 @@ public void executeCommand(
diff.setErrorMessage(Throwables.getStackTraceAsString(changes.error));
log.error().append("Error running script: ").append(changes.error).endl();
}
safelyComplete(responseObserver, diff.setChanges(fieldChanges).build());
return diff.setChanges(fieldChanges).build();
});
}
}
Expand Down Expand Up @@ -276,7 +277,8 @@ public void bindTableToVariable(
ExportBuilder<?> exportBuilder = session.nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.requiresSerialQueue()
.onError(responseObserver);
.onError(responseObserver)
.onSuccess(responseObserver::onCompleted);

if (request.hasConsoleId()) {
exportedConsole = ticketRouter.resolve(session, request.getConsoleId(), "consoleId");
Expand All @@ -293,7 +295,6 @@ public void bindTableToVariable(
Table table = exportedTable.get();
queryScope.putParam(request.getVariableName(), table);
responseObserver.onNext(BindTableToVariableResponse.getDefaultInstance());
responseObserver.onCompleted();
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ public void rollup(
final SessionState.ExportObject<Table> sourceTableExport =
ticketRouter.resolve(session, request.getSourceTableId(), "sourceTableId");

session.newExport(request.getResultRollupTableId(), "resultRollupTableId")
session.<RollupTable>newExport(request.getResultRollupTableId(), "resultRollupTableId")
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(sourceTableExport)
.onError(responseObserver)
.onSuccess((final RollupTable ignoredResult) -> safelyComplete(responseObserver,
RollupResponse.getDefaultInstance()))
.submit(() -> {
final Table sourceTable = sourceTableExport.get();

Expand All @@ -109,7 +111,6 @@ public void rollup(
throw Exceptions.statusRuntimeException(
Code.FAILED_PRECONDITION, "Not authorized to rollup hierarchical table");
}
safelyComplete(responseObserver, RollupResponse.getDefaultInstance());
return transformedResult;
});
}
Expand Down Expand Up @@ -141,10 +142,12 @@ public void tree(
final SessionState.ExportObject<Table> sourceTableExport =
ticketRouter.resolve(session, request.getSourceTableId(), "sourceTableId");

session.newExport(request.getResultTreeTableId(), "resultTreeTableId")
session.<TreeTable>newExport(request.getResultTreeTableId(), "resultTreeTableId")
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(sourceTableExport)
.onError(responseObserver)
.onSuccess((final TreeTable ignoredResult) -> safelyComplete(responseObserver,
TreeResponse.getDefaultInstance()))
.submit(() -> {
final Table sourceTable = sourceTableExport.get();

Expand All @@ -169,7 +172,6 @@ public void tree(
throw Exceptions.statusRuntimeException(
Code.FAILED_PRECONDITION, "Not authorized to tree hierarchical table");
}
safelyComplete(responseObserver, TreeResponse.getDefaultInstance());
return transformedResult;
});
}
Expand Down Expand Up @@ -202,10 +204,12 @@ public void apply(
final SessionState.ExportObject<HierarchicalTable<?>> inputHierarchicalTableExport =
ticketRouter.resolve(session, request.getInputHierarchicalTableId(), "inputHierarchicalTableId");

session.newExport(request.getResultHierarchicalTableId(), "resultHierarchicalTableId")
session.<HierarchicalTable<?>>newExport(request.getResultHierarchicalTableId(), "resultHierarchicalTableId")
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(inputHierarchicalTableExport)
.onError(responseObserver)
.onSuccess((final HierarchicalTable<?> ignoredResult) -> safelyComplete(responseObserver,
HierarchicalTableApplyResponse.getDefaultInstance()))
.submit(() -> {
final HierarchicalTable<?> inputHierarchicalTable = inputHierarchicalTableExport.get();

Expand Down Expand Up @@ -274,7 +278,6 @@ public void apply(
throw Exceptions.statusRuntimeException(
Code.FAILED_PRECONDITION, "Not authorized to apply to hierarchical table");
}
safelyComplete(responseObserver, HierarchicalTableApplyResponse.getDefaultInstance());
return transformedResult;
});
}
Expand Down Expand Up @@ -395,6 +398,8 @@ public void view(
resultExportBuilder
.queryPerformanceRecorder(queryPerformanceRecorder)
.onError(responseObserver)
.onSuccess((final HierarchicalTableView ignoredResult) -> safelyComplete(responseObserver,
HierarchicalTableViewResponse.getDefaultInstance()))
.submit(() -> {
final Table keyTable = keyTableExport == null ? null : keyTableExport.get();
final Object target = targetExport.get();
Expand Down Expand Up @@ -439,7 +444,6 @@ public void view(
throw Exceptions.statusRuntimeException(
Code.FAILED_PRECONDITION, "Not authorized to view hierarchical table");
}
safelyComplete(responseObserver, HierarchicalTableViewResponse.getDefaultInstance());
return transformedResult;
});
}
Expand Down Expand Up @@ -483,10 +487,12 @@ public void exportSource(
final SessionState.ExportObject<HierarchicalTable<?>> hierarchicalTableExport =
ticketRouter.resolve(session, request.getHierarchicalTableId(), "hierarchicalTableId");

session.newExport(request.getResultTableId(), "resultTableId")
session.<Table>newExport(request.getResultTableId(), "resultTableId")
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(hierarchicalTableExport)
.onError(responseObserver)
.onSuccess((final Table transformedResult) -> safelyComplete(responseObserver,
ExportUtil.buildTableCreationResponse(request.getResultTableId(), transformedResult)))
.submit(() -> {
final HierarchicalTable<?> hierarchicalTable = hierarchicalTableExport.get();

Expand All @@ -499,9 +505,6 @@ public void exportSource(
Code.FAILED_PRECONDITION,
"Not authorized to export source from hierarchical table");
}
final ExportedTableCreationResponse response =
ExportUtil.buildTableCreationResponse(request.getResultTableId(), transformedResult);
safelyComplete(responseObserver, response);
return transformedResult;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public ObjectServiceGrpcImpl(SessionService sessionService, TicketRouter ticketR
private enum EnqueuedState {
WAITING, RUNNING, CLOSED
}

private final class SendMessageObserver implements StreamObserver<StreamRequest> {
private final SessionState session;
private final StreamObserver<StreamResponse> responseObserver;
Expand Down Expand Up @@ -268,10 +269,12 @@ public void fetchObject(
final SessionState.ExportObject<Object> object =
ticketRouter.resolve(session, request.getSourceId().getTicket(), "sourceId");

session.nonExport()
session.<FetchObjectResponse>nonExport()
.queryPerformanceRecorder(queryPerformanceRecorder)
.require(object)
.onError(responseObserver)
.onSuccess(
(final FetchObjectResponse response) -> GrpcUtil.safelyComplete(responseObserver, response))
.submit(() -> {
final Object o = object.get();
ObjectType objectTypeInstance = getObjectTypeInstance(type, o);
Expand Down Expand Up @@ -312,9 +315,7 @@ public void onCompleted() {
throw Exceptions.statusRuntimeException(Code.INVALID_ARGUMENT,
"Plugin didn't close response, use MessageStream instead for this object");
}
GrpcUtil.safelyComplete(responseObserver, message);

return null;
return message;
});
}
}
Expand Down
Loading

0 comments on commit d4abe74

Please sign in to comment.