diff --git a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java index e6c30edb3bb..1a3d2f9bb45 100644 --- a/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java +++ b/server/src/test/java/io/deephaven/server/barrage/BarrageMessageRoundTripTest.java @@ -32,6 +32,7 @@ import io.deephaven.extensions.barrage.table.BarrageTable; import io.deephaven.extensions.barrage.util.BarrageProtoUtil; import io.deephaven.extensions.barrage.util.BarrageStreamReader; +import io.deephaven.extensions.barrage.util.BarrageUtil; import io.deephaven.server.arrow.ArrowModule; import io.deephaven.server.session.SessionService; import io.deephaven.server.util.Scheduler; @@ -163,13 +164,14 @@ private class RemoteClient { // comparing the producer table to the consumer table to validate contents are correct. RemoteClient(final RowSet viewport, final BitSet subscribedColumns, final BarrageMessageProducer barrageMessageProducer, - final String name) { + final Table sourceTable, final String name) { // assume a forward viewport when not specified - this(viewport, subscribedColumns, barrageMessageProducer, name, false, false); + this(viewport, subscribedColumns, barrageMessageProducer, sourceTable, name, false, false); } RemoteClient(final RowSet viewport, final BitSet subscribedColumns, final BarrageMessageProducer barrageMessageProducer, + final Table sourceTable, final String name, final boolean reverseViewport, final boolean deferSubscription) { this.viewport = viewport; this.reverseViewport = reverseViewport; @@ -177,9 +179,13 @@ private class RemoteClient { this.name = name; this.barrageMessageProducer = barrageMessageProducer; + final Map attributes = new HashMap<>(sourceTable.getAttributes()); + if (sourceTable.isFlat()) { + attributes.put(BarrageUtil.TABLE_ATTRIBUTE_IS_FLAT, true); + } this.barrageTable = BarrageTable.make(updateSourceCombiner, ExecutionContext.getContext().getUpdateGraph(), - null, barrageMessageProducer.getTableDefinition(), new HashMap<>(), null); + null, barrageMessageProducer.getTableDefinition(), attributes, null); this.barrageTable.addSourceToRegistrar(); final BarrageSubscriptionOptions options = BarrageSubscriptionOptions.builder() @@ -386,8 +392,8 @@ public RemoteClient newClient(final RowSet viewport, final BitSet subscribedColu public RemoteClient newClient(final RowSet viewport, final BitSet subscribedColumns, final boolean reverseViewport, final String name) { - clients.add(new RemoteClient(viewport, subscribedColumns, barrageMessageProducer, name, reverseViewport, - false)); + clients.add(new RemoteClient(viewport, subscribedColumns, barrageMessageProducer, + originalTable, name, reverseViewport, false)); return clients.get(clients.size() - 1); } @@ -794,7 +800,8 @@ public void testColumnSubChange() { columns.set(0, nugget.originalTable.numColumns() / 2); nugget.clients.add(new RemoteClient( RowSetFactory.fromRange(size / 5, 2L * size / 5), - columns, nugget.barrageMessageProducer, "sub-changer")); + columns, nugget.barrageMessageProducer, nugget.originalTable, + "sub-changer")); } } @@ -840,7 +847,8 @@ void createNuggetsForTableMaker(final Supplier makeTable) { columns.set(0, 4); nugget.clients.add( new RemoteClient(RowSetFactory.fromRange(0, size / 5), - columns, nugget.barrageMessageProducer, "sub-changer")); + columns, nugget.barrageMessageProducer, nugget.originalTable, + "sub-changer")); } void maybeChangeSub(final int step, final int rt, final int pt) { @@ -887,7 +895,8 @@ void createNuggetsForTableMaker(final Supplier
makeTable) { columns.set(0, 4); nugget.clients.add( new RemoteClient(RowSetFactory.fromRange(0, size / 5), - columns, nugget.barrageMessageProducer, "sub-changer")); + columns, nugget.barrageMessageProducer, nugget.originalTable, + "sub-changer")); } void maybeChangeSub(final int step, final int rt, final int pt) { @@ -931,7 +940,8 @@ public void testOverlappedColumnSubsChange() { columns.set(0, 3); nugget.clients.add(new RemoteClient( RowSetFactory.fromRange(size / 5, 2L * size / 5), - columns, nugget.barrageMessageProducer, "sub-changer")); + columns, nugget.barrageMessageProducer, nugget.originalTable, + "sub-changer")); } } @@ -999,7 +1009,8 @@ public void onGetSnapshot() { final boolean deferSubscription = true; nugget.clients.add(new RemoteClient( RowSetFactory.fromRange(size / 5, 2L * size / 5), - columns, nugget.barrageMessageProducer, "sub-changer", false, deferSubscription)); + columns, nugget.barrageMessageProducer, nugget.originalTable, + "sub-changer", false, deferSubscription)); } }.runTest(); @@ -1029,7 +1040,8 @@ public void createNuggets() { columns.set(0, 4); nugget.clients.add(new RemoteClient( RowSetFactory.fromRange(size / 5, 3L * size / 5), - columns, nugget.barrageMessageProducer, "sub-changer")); + columns, nugget.barrageMessageProducer, nugget.originalTable, + "sub-changer")); } } @@ -1068,7 +1080,7 @@ public void testSimultaneousSubscriptionChanges() { columns.set(0, 4); nugget.clients.add(new RemoteClient( RowSetFactory.fromRange(size / 5, 2L * size / 5), - columns, nugget.barrageMessageProducer, "sub-changer")); + columns, nugget.barrageMessageProducer, nugget.originalTable, "sub-changer")); } } @@ -1200,13 +1212,14 @@ public void testCoalescingLargeUpdates() { allColumns.set(0); final QueryTable sourceTable = TstUtils.testRefreshingTable(i().toTracking()); - final Table queryTable = sourceTable.updateView("data = (short) k"); + sourceTable.setFlat(); + final QueryTable queryTable = (QueryTable) sourceTable.updateView("data = (short) k"); final RemoteNugget remoteNugget = new RemoteNugget(() -> queryTable); // Create a few interesting clients around the mapping boundary. final int mb = SNAPSHOT_CHUNK_SIZE; - final int sz = 2 * mb; + final long sz = 2L * mb; // noinspection unused final RemoteClient[] remoteClients = new RemoteClient[] { remoteNugget.newClient(null, allColumns, "full"), @@ -1224,13 +1237,10 @@ public void testCoalescingLargeUpdates() { // Add all of our new rows spread over multiple deltas. final int numDeltas = 4; + final long blockSize = sz / numDeltas; for (int ii = 0; ii < numDeltas; ++ii) { - final RowSetBuilderSequential newRowsBuilder = RowSetFactory.builderSequential(); - for (int jj = ii; jj < sz; jj += numDeltas) { - newRowsBuilder.appendKey(jj); - } + final RowSet newRows = RowSetFactory.fromRange(ii * blockSize, (ii + 1) * blockSize - 1); updateGraph.runWithinUnitTestCycle(() -> { - final RowSet newRows = newRowsBuilder.build(); TstUtils.addToTable(sourceTable, newRows); sourceTable.notifyListeners(new TableUpdateImpl( newRows,