Skip to content

Commit

Permalink
BarrageTable: Fix testCoalescingLargeUpdates out-of-memory (deephaven…
Browse files Browse the repository at this point in the history
  • Loading branch information
nbauernfeind authored Jan 17, 2024
1 parent 4f5cc3b commit 2120b16
Showing 1 changed file with 29 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.deephaven.extensions.barrage.table.BarrageTable;
import io.deephaven.extensions.barrage.util.BarrageProtoUtil;
import io.deephaven.extensions.barrage.util.BarrageStreamReader;
import io.deephaven.extensions.barrage.util.BarrageUtil;
import io.deephaven.server.arrow.ArrowModule;
import io.deephaven.server.session.SessionService;
import io.deephaven.server.util.Scheduler;
Expand Down Expand Up @@ -163,23 +164,28 @@ private class RemoteClient {
// comparing the producer table to the consumer table to validate contents are correct.
RemoteClient(final RowSet viewport, final BitSet subscribedColumns,
final BarrageMessageProducer<BarrageStreamGeneratorImpl.View> barrageMessageProducer,
final String name) {
final Table sourceTable, final String name) {
// assume a forward viewport when not specified
this(viewport, subscribedColumns, barrageMessageProducer, name, false, false);
this(viewport, subscribedColumns, barrageMessageProducer, sourceTable, name, false, false);
}

RemoteClient(final RowSet viewport, final BitSet subscribedColumns,
final BarrageMessageProducer<BarrageStreamGeneratorImpl.View> barrageMessageProducer,
final Table sourceTable,
final String name, final boolean reverseViewport, final boolean deferSubscription) {
this.viewport = viewport;
this.reverseViewport = reverseViewport;
this.subscribedColumns = subscribedColumns;
this.name = name;
this.barrageMessageProducer = barrageMessageProducer;

final Map<String, Object> attributes = new HashMap<>(sourceTable.getAttributes());
if (sourceTable.isFlat()) {
attributes.put(BarrageUtil.TABLE_ATTRIBUTE_IS_FLAT, true);
}
this.barrageTable = BarrageTable.make(updateSourceCombiner,
ExecutionContext.getContext().getUpdateGraph(),
null, barrageMessageProducer.getTableDefinition(), new HashMap<>(), null);
null, barrageMessageProducer.getTableDefinition(), attributes, null);
this.barrageTable.addSourceToRegistrar();

final BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder()
Expand Down Expand Up @@ -386,8 +392,8 @@ public RemoteClient newClient(final RowSet viewport, final BitSet subscribedColu

public RemoteClient newClient(final RowSet viewport, final BitSet subscribedColumns,
final boolean reverseViewport, final String name) {
clients.add(new RemoteClient(viewport, subscribedColumns, barrageMessageProducer, name, reverseViewport,
false));
clients.add(new RemoteClient(viewport, subscribedColumns, barrageMessageProducer,
originalTable, name, reverseViewport, false));
return clients.get(clients.size() - 1);
}

Expand Down Expand Up @@ -794,7 +800,8 @@ public void testColumnSubChange() {
columns.set(0, nugget.originalTable.numColumns() / 2);
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 2L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}
}

Expand Down Expand Up @@ -840,7 +847,8 @@ void createNuggetsForTableMaker(final Supplier<Table> makeTable) {
columns.set(0, 4);
nugget.clients.add(
new RemoteClient(RowSetFactory.fromRange(0, size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}

void maybeChangeSub(final int step, final int rt, final int pt) {
Expand Down Expand Up @@ -887,7 +895,8 @@ void createNuggetsForTableMaker(final Supplier<Table> makeTable) {
columns.set(0, 4);
nugget.clients.add(
new RemoteClient(RowSetFactory.fromRange(0, size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}

void maybeChangeSub(final int step, final int rt, final int pt) {
Expand Down Expand Up @@ -931,7 +940,8 @@ public void testOverlappedColumnSubsChange() {
columns.set(0, 3);
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 2L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}
}

Expand Down Expand Up @@ -999,7 +1009,8 @@ public void onGetSnapshot() {
final boolean deferSubscription = true;
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 2L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer", false, deferSubscription));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer", false, deferSubscription));

}
}.runTest();
Expand Down Expand Up @@ -1029,7 +1040,8 @@ public void createNuggets() {
columns.set(0, 4);
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 3L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable,
"sub-changer"));
}
}

Expand Down Expand Up @@ -1068,7 +1080,7 @@ public void testSimultaneousSubscriptionChanges() {
columns.set(0, 4);
nugget.clients.add(new RemoteClient(
RowSetFactory.fromRange(size / 5, 2L * size / 5),
columns, nugget.barrageMessageProducer, "sub-changer"));
columns, nugget.barrageMessageProducer, nugget.originalTable, "sub-changer"));
}
}

Expand Down Expand Up @@ -1200,13 +1212,14 @@ public void testCoalescingLargeUpdates() {
allColumns.set(0);

final QueryTable sourceTable = TstUtils.testRefreshingTable(i().toTracking());
final Table queryTable = sourceTable.updateView("data = (short) k");
sourceTable.setFlat();
final QueryTable queryTable = (QueryTable) sourceTable.updateView("data = (short) k");

final RemoteNugget remoteNugget = new RemoteNugget(() -> queryTable);

// Create a few interesting clients around the mapping boundary.
final int mb = SNAPSHOT_CHUNK_SIZE;
final int sz = 2 * mb;
final long sz = 2L * mb;
// noinspection unused
final RemoteClient[] remoteClients = new RemoteClient[] {
remoteNugget.newClient(null, allColumns, "full"),
Expand All @@ -1224,13 +1237,10 @@ public void testCoalescingLargeUpdates() {

// Add all of our new rows spread over multiple deltas.
final int numDeltas = 4;
final long blockSize = sz / numDeltas;
for (int ii = 0; ii < numDeltas; ++ii) {
final RowSetBuilderSequential newRowsBuilder = RowSetFactory.builderSequential();
for (int jj = ii; jj < sz; jj += numDeltas) {
newRowsBuilder.appendKey(jj);
}
final RowSet newRows = RowSetFactory.fromRange(ii * blockSize, (ii + 1) * blockSize - 1);
updateGraph.runWithinUnitTestCycle(() -> {
final RowSet newRows = newRowsBuilder.build();
TstUtils.addToTable(sourceTable, newRows);
sourceTable.notifyListeners(new TableUpdateImpl(
newRows,
Expand Down

0 comments on commit 2120b16

Please sign in to comment.