From bc6f7f5fde86234b4c4def3f40debcc2267197f4 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 6 Sep 2023 22:55:43 +0100 Subject: [PATCH 1/6] feat: adjust shard size As discussed in #764 and calculated in https://observablehq.com/@gozala/w3up-shard-size, this PR adjusts the upload-client to shard on a CAR size of <= 133,169,152 bytes. It also fixes a bug where a CAR shard could be created with size greater than the shard size. supersedes #764 --- packages/upload-client/package.json | 7 +- packages/upload-client/src/car.js | 20 ++++++ packages/upload-client/src/sharding.js | 45 ++++++++++--- packages/upload-client/src/unixfs.js | 8 ++- packages/upload-client/test/index.test.js | 28 +++++--- packages/upload-client/test/sharding.test.js | 69 +++++++++++++++++++- packages/upload-client/test/store.test.js | 6 +- pnpm-lock.yaml | 39 +++++++++-- 8 files changed, 188 insertions(+), 34 deletions(-) diff --git a/packages/upload-client/package.json b/packages/upload-client/package.json index e6000752b..4f1f43d0e 100644 --- a/packages/upload-client/package.json +++ b/packages/upload-client/package.json @@ -64,7 +64,8 @@ "dist/src/**/*.d.ts.map" ], "dependencies": { - "@ipld/car": "^5.0.3", + "@ipld/car": "^5.2.2", + "@ipld/dag-cbor": "^9.0.0", "@ipld/dag-ucan": "^3.2.0", "@ipld/unixfs": "^2.1.1", "@ucanto/client": "^8.0.0", @@ -74,11 +75,13 @@ "ipfs-utils": "^9.0.14", "multiformats": "^11.0.2", "p-queue": "^7.3.0", - "p-retry": "^5.1.2" + "p-retry": "^5.1.2", + "varint": "^6.0.0" }, "devDependencies": { "@types/assert": "^1.5.6", "@types/mocha": "^10.0.1", + "@types/varint": "^6.0.1", "@ucanto/principal": "^8.0.0", "@ucanto/server": "^8.0.1", "assert": "^2.0.0", diff --git a/packages/upload-client/src/car.js b/packages/upload-client/src/car.js index 2749da4ee..4a491ce35 100644 --- a/packages/upload-client/src/car.js +++ b/packages/upload-client/src/car.js @@ -1,9 +1,29 @@ import { CarBlockIterator, CarWriter } from '@ipld/car' +import * as dagCBOR from '@ipld/dag-cbor' 
+import varint from 'varint' /** * @typedef {import('@ipld/unixfs').Block} Block */ +/** Byte length of a CBOR encoded CAR header with zero roots. */ +const NO_ROOTS_HEADER_LENGTH = 17 + +/** @param {import('./types').AnyLink} [root] */ +export function headerEncodingLength(root) { + if (!root) return NO_ROOTS_HEADER_LENGTH + const headerLength = dagCBOR.encode({ version: 1, roots: [root] }).length + const varintLength = varint.encodingLength(headerLength) + return varintLength + headerLength +} + +/** @param {Block} block */ +export function blockEncodingLength(block) { + const varintLength = varint.encodingLength(block.cid.bytes.length + block.bytes.length) + const cidLength = block.cid.bytes.length + return varintLength + cidLength + block.bytes.length +} + /** * @param {Iterable | AsyncIterable} blocks * @param {import('./types').AnyLink} [root] diff --git a/packages/upload-client/src/sharding.js b/packages/upload-client/src/sharding.js index 282f13d18..4a9b2e1e8 100644 --- a/packages/upload-client/src/sharding.js +++ b/packages/upload-client/src/sharding.js @@ -1,8 +1,9 @@ import Queue from 'p-queue' -import { encode } from './car.js' +import { blockEncodingLength, encode, headerEncodingLength } from './car.js' import { add } from './store.js' -const SHARD_SIZE = 1024 * 1024 * 100 +// https://observablehq.com/@gozala/w3up-shard-size +const SHARD_SIZE = 133_169_152 const CONCURRENT_UPLOADS = 3 /** @@ -22,7 +23,7 @@ export class ShardingStream extends TransformStream { let shard = [] /** @type {import('@ipld/unixfs').Block[] | null} */ let readyShard = null - let size = 0 + let shardBlockLength = 0 super({ async transform(block, controller) { @@ -30,13 +31,19 @@ export class ShardingStream extends TransformStream { controller.enqueue(await encode(readyShard)) readyShard = null } - if (shard.length && size + block.bytes.length > shardSize) { + + const blockLength = blockEncodingLength(block) + if (headerEncodingLength() + blockLength > shardSize) { + throw new 
Error(`block exceeds shard size: ${block.cid}`) + } + + if (shard.length && headerEncodingLength() + shardBlockLength + blockLength > shardSize) { readyShard = shard shard = [] - size = 0 + shardBlockLength = 0 } shard.push(block) - size += block.bytes.length + shardBlockLength += blockLength }, async flush(controller) { @@ -45,10 +52,28 @@ export class ShardingStream extends TransformStream { } const rootBlock = shard.at(-1) - if (rootBlock != null) { - controller.enqueue( - await encode(shard, options.rootCID ?? rootBlock.cid) - ) + if (rootBlock == null) return + + const rootCID = options.rootCID ?? rootBlock.cid + const headerLength = headerEncodingLength(rootCID) + // does the shard with the CAR header that _includes_ a root CID + // exceed the shard size? + if (headerLength + shardBlockLength > shardSize) { + const overage = headerLength + shardBlockLength - shardSize + const lastShard = [] + let lastShardBlockLength = 0 + while (lastShardBlockLength < overage) { + // need at least 1 block in original shard + if (shard.length < 2) throw new Error(`block exceeds shard size: ${shard.at(-1)?.cid}`) + const block = shard[shard.length - 1] + shard.pop() + lastShard.unshift(block) + lastShardBlockLength += blockEncodingLength(block) + } + controller.enqueue(await encode(shard)) + controller.enqueue(await encode(lastShard, rootCID)) + } else { + controller.enqueue(await encode(shard, rootCID)) } }, }) diff --git a/packages/upload-client/src/unixfs.js b/packages/upload-client/src/unixfs.js index 942e2ea95..91aff7c8e 100644 --- a/packages/upload-client/src/unixfs.js +++ b/packages/upload-client/src/unixfs.js @@ -34,8 +34,12 @@ export function createFileEncoderStream(blob) { const unixfsWriter = UnixFS.createWriter({ writable, settings }) const fileBuilder = new UnixFSFileBuilder('', blob) void (async () => { - await fileBuilder.finalize(unixfsWriter) - await unixfsWriter.close() + try { + await fileBuilder.finalize(unixfsWriter) + await unixfsWriter.close() + } catch 
(err) { + console.error(err) + } })() return readable } diff --git a/packages/upload-client/test/index.test.js b/packages/upload-client/test/index.test.js index b569e34e9..eadeee7d4 100644 --- a/packages/upload-client/test/index.test.js +++ b/packages/upload-client/test/index.test.js @@ -12,7 +12,7 @@ import { randomBlock, randomBytes } from './helpers/random.js' import { toCAR } from './helpers/car.js' import { File } from './helpers/shims.js' import { mockService } from './helpers/mocks.js' -import { encode } from '../src/car.js' +import { blockEncodingLength, encode, headerEncodingLength } from '../src/car.js' describe('uploadFile', () => { it('uploads a file to the service', async () => { @@ -173,12 +173,19 @@ describe('uploadFile', () => { file, { connection, - shardSize: 1024 * 1024 * 2, // should end up with 2 CAR files + // chunk size = 1_048_576 + // encoded block size = 1_048_615 + // shard size = 2_097_152 (as configured below) + // total file size = 5_242_880 (as above) + // so, at least 2 shards, but 2 encoded blocks (_without_ CAR header) = 2_097_230 + // ...which is > shard size of 2_097_152 + // so we actually end up with a shard for each block - 5 CARs! 
+ shardSize: 1024 * 1024 * 2, onShardStored: (meta) => carCIDs.push(meta.cid), } ) - assert.equal(carCIDs.length, 3) + assert.equal(carCIDs.length, 5) }) }) @@ -344,7 +351,7 @@ describe('uploadDirectory', () => { files, { connection, - shardSize: 400_000, // should end up with 2 CAR files + shardSize: 500_056, // should end up with 2 CAR files onShardStored: (meta) => carCIDs.push(meta.cid), } ) @@ -358,15 +365,16 @@ describe('uploadCAR', () => { const space = await Signer.generate() const agent = await Signer.generate() const blocks = [ - await randomBlock(32), - await randomBlock(32), - await randomBlock(32), + await randomBlock(128), + await randomBlock(128), + await randomBlock(128), ] const car = await encode(blocks, blocks.at(-1)?.cid) - // shard size 1 block less than total = 2 expected CAR shards - const shardSize = blocks + // Wanted: 2 shards + // 2 * CAR header (34) + 2 * blocks (256), 2 * block encoding prefix (78) + const shardSize = (headerEncodingLength() * 2) + blocks .slice(0, -1) - .reduce((size, block) => size + block.bytes.length, 0) + .reduce((size, block) => size + blockEncodingLength(block), 0) /** @type {import('../src/types').CARLink[]} */ const carCIDs = [] diff --git a/packages/upload-client/test/sharding.test.js b/packages/upload-client/test/sharding.test.js index 0a3540e78..3906ccad7 100644 --- a/packages/upload-client/test/sharding.test.js +++ b/packages/upload-client/test/sharding.test.js @@ -9,7 +9,7 @@ import { CID } from 'multiformats' import { createFileEncoderStream } from '../src/unixfs.js' import { ShardingStream, ShardStoringStream } from '../src/sharding.js' import { serviceSigner } from './fixtures.js' -import { randomBytes, randomCAR } from './helpers/random.js' +import { randomBlock, randomBytes, randomCAR } from './helpers/random.js' import { mockService } from './helpers/mocks.js' describe('ShardingStream', () => { @@ -60,6 +60,73 @@ describe('ShardingStream', () => { assert.equal(shards.length, 1) 
assert.equal(shards[0].roots[0].toString(), rootCID.toString()) }) + + it('fails to shard block that exceeds shard size when encoded', async () => { + const file = new Blob([await randomBytes(128)]) + await assert.rejects(() => createFileEncoderStream(file) + .pipeThrough(new ShardingStream({ shardSize: 64 })) + .pipeTo(new WritableStream()), /block exceeds shard size/) + }) + + it('reduces final shard to accomodate CAR header with root CID', async () => { + const blocks = [ + await randomBlock(128), // encoded block length = 166 + await randomBlock(64), // encoded block length = 102 + await randomBlock(32) // encoded block length = 70 + ] + + /** @type {import('../src/types').CARFile[]} */ + const shards = [] + await new ReadableStream({ + pull (controller) { + const block = blocks.shift() + if (!block) return controller.close() + controller.enqueue(block) + } + }) + // shard with no roots = encoded block (166) + CAR header (17) = 183 + // shard with no roots = encoded block (102) + CAR header (17) = 119 + // shard with 1 root = encoded block (70) + CAR header (17) = 87 + // shard with 1 root = encoded block (70) + CAR header (59) = 155 + // i.e. shard size of 206 (119 + 87) should allow us 1 shard with 0 roots + // and then 1 shard with 2 blocks that, when encoded as a CAR with 1 root + // will actually exceed the shard size. It must then be refactored into + // 2 shards. 
+ .pipeThrough(new ShardingStream({ shardSize: 206 })) + .pipeTo(new WritableStream({ write: s => { shards.push(s) } })) + + assert.equal(shards.length, 3) + }) + + it('fails to shard block that exceeds shard size when encoded with root CID', async () => { + const blocks = [ + await randomBlock(128) // encoded block length = 166 + ] + + await assert.rejects(() => { + return new ReadableStream({ + pull (controller) { + const block = blocks.shift() + if (!block) return controller.close() + controller.enqueue(block) + } + }) + // shard with no roots = encoded block (166) + CAR header (17) = 183 + // shard with 1 root = encoded block (166) + CAR header (59) = 225 + // i.e. shard size of 183 should allow us 1 shard with no roots and then + // we'll fail to create a shard with 1 root. + .pipeThrough(new ShardingStream({ shardSize: 183 })) + .pipeTo(new WritableStream()) + }, /block exceeds shard size/) + }) + + it('no blocks no shards', async () => { + let shards = 0 + await new ReadableStream({ pull: controller => { controller.close() } }) + .pipeThrough(new ShardingStream({ shardSize: 206 })) + .pipeTo(new WritableStream({ write: () => { shards++ } })) + assert.equal(shards, 0) + }) }) describe('ShardStoringStream', () => { diff --git a/packages/upload-client/test/store.test.js b/packages/upload-client/test/store.test.js index 11b348f34..65ac1b9e3 100644 --- a/packages/upload-client/test/store.test.js +++ b/packages/upload-client/test/store.test.js @@ -59,7 +59,7 @@ describe('Store.add', () => { channel: server, }) - let progressStatusCalls = 0 + let loaded = 0 const carCID = await Store.add( { issuer: agent, with: space.did(), proofs, audience: serviceSigner }, car, @@ -67,14 +67,14 @@ describe('Store.add', () => { connection, onUploadProgress: (status) => { assert(typeof status.loaded === 'number' && status.loaded > 0) - progressStatusCalls++ + loaded = status.loaded }, } ) assert(service.store.add.called) assert.equal(service.store.add.callCount, 1) - 
assert.equal(progressStatusCalls, 1) + assert.equal(loaded, 225) assert(carCID) assert.equal(carCID.toString(), car.cid.toString()) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index e428d8a9b..d015b6c85 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -420,8 +420,11 @@ importers: packages/upload-client: dependencies: '@ipld/car': - specifier: ^5.0.3 - version: 5.1.1 + specifier: ^5.2.2 + version: 5.2.2 + '@ipld/dag-cbor': + specifier: ^9.0.0 + version: 9.0.4 '@ipld/dag-ucan': specifier: ^3.2.0 version: 3.3.2 @@ -452,6 +455,9 @@ importers: p-retry: specifier: ^5.1.2 version: 5.1.2 + varint: + specifier: ^6.0.0 + version: 6.0.0 devDependencies: '@types/assert': specifier: ^1.5.6 @@ -459,6 +465,9 @@ importers: '@types/mocha': specifier: ^10.0.1 version: 10.0.1 + '@types/varint': + specifier: ^6.0.1 + version: 6.0.1 '@ucanto/principal': specifier: ^8.0.0 version: 8.0.0 @@ -2317,6 +2326,16 @@ packages: multiformats: 11.0.2 varint: 6.0.0 + /@ipld/car@5.2.2: + resolution: {integrity: sha512-8IapvzPNB1Z2VwtA7n6olB3quhrLMbFxk4JaENIT4OlQ6YQNz1peY00qb2iJTC/kCDir7yb3TuNHkbdDzSKiXA==} + engines: {node: '>=16.0.0', npm: '>=7.0.0'} + dependencies: + '@ipld/dag-cbor': 9.0.4 + cborg: 2.0.5 + multiformats: 12.1.1 + varint: 6.0.0 + dev: false + /@ipld/dag-cbor@9.0.0: resolution: {integrity: sha512-zdsiSiYDEOIDW7mmWOYWC9gukjXO+F8wqxz/LfN7iSwTfIyipC8+UQrCbPupFMRb/33XQTZk8yl3My8vUQBRoA==} engines: {node: '>=16.0.0', npm: '>=7.0.0'} @@ -2329,8 +2348,8 @@ packages: resolution: {integrity: sha512-HBNVngk/47pKNLTAelN6ORWgKkjJtQj96Xb+jIBtRShJGCsXgghj1TzTynTTIp1dZxwPe5rVIL6yjZmvdyP2Wg==} engines: {node: '>=16.0.0', npm: '>=7.0.0'} dependencies: - cborg: 2.0.3 - multiformats: 12.0.1 + cborg: 2.0.5 + multiformats: 12.1.1 /@ipld/dag-json@10.1.3: resolution: {integrity: sha512-c0PJE+cDnsYfyuWMdTTwkzMvxjthEwWD+0u6ApQfXrs+GTcxeYh+Hefd0xE9L4fb2Trr1DGA1a/iia4PAS3Mpg==} @@ -3031,7 +3050,7 @@ packages: /@types/through@0.0.30: resolution: {integrity: 
sha512-FvnCJljyxhPM3gkRgWmxmDZyAQSiBQQWLI0A0VFL0K7W1oRUrPJSqNO0NvTnLkBcotdlp3lKvaT0JrnyRDkzOg==} dependencies: - '@types/node': 18.11.18 + '@types/node': 18.17.4 dev: true /@types/unist@2.0.7: @@ -3041,7 +3060,7 @@ packages: /@types/varint@6.0.1: resolution: {integrity: sha512-fQdOiZpDMBvaEdl12P1x7xlTPRAtd7qUUtVaWgkCy8DC//wCv19nqFFtrnR3y/ac6VFY0UUvYuQqfKzZTSE26w==} dependencies: - '@types/node': 18.11.18 + '@types/node': 18.17.4 dev: true /@types/ws@8.5.4: @@ -4138,6 +4157,10 @@ packages: resolution: {integrity: sha512-f1IbyqgRLQK4ruNM+V3WikfYfXQg/f/zC1oneOw1P7F/Dn2OJX6MaXIdei3JMpz361IjY7OENBKcE53nkJFVCQ==} hasBin: true + /cborg@2.0.5: + resolution: {integrity: sha512-xVW1rSIw1ZXbkwl2XhJ7o/jAv0vnVoQv/QlfQxV8a7V5PlA4UU/AcIiXqmpyybwNWy/GPQU1m/aBVNIWr7/T0w==} + hasBin: true + /ccount@1.1.0: resolution: {integrity: sha512-vlNK021QdI7PNeiUh/lKkC/mNHHfV0m/Ad5JoI0TYtlBnJAslM/JIkm/tGC88bkLIwO6OQ5uV6ztS6kVAtCDlg==} dev: true @@ -8411,6 +8434,10 @@ packages: resolution: {integrity: sha512-s01wijBJoDUqESWSzePY0lvTw7J3PVO9x2Cc6ASI5AMZM2Gnhh7BC17+nlFhHKU7dDzaCaRfb+NiqNzOsgPUoQ==} engines: {node: '>=16.0.0', npm: '>=7.0.0'} + /multiformats@12.1.1: + resolution: {integrity: sha512-GBSToTmri2vJYs8wqcZQ8kB21dCaeTOzHTIAlr8J06C1eL6UbzqURXFZ5Fl0EYm9GAFz1IlYY8SxGOs9G9NJRg==} + engines: {node: '>=16.0.0', npm: '>=7.0.0'} + /multimatch@5.0.0: resolution: {integrity: sha512-ypMKuglUrZUD99Tk2bUQ+xNQj43lPEfAeX2o9cTteAmShXy2VHDJpuwu1o0xqoKCt9jLVAvwyFKdLTPXKAfJyA==} engines: {node: '>=10'} From b76919a27fea65bd8e3486474d4ea57116661173 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Wed, 6 Sep 2023 23:09:39 +0100 Subject: [PATCH 2/6] chore: appease linter --- packages/upload-client/src/car.js | 4 +- packages/upload-client/src/sharding.js | 8 +- packages/upload-client/test/index.test.js | 14 +++- packages/upload-client/test/sharding.test.js | 78 +++++++++++++------- 4 files changed, 69 insertions(+), 35 deletions(-) diff --git a/packages/upload-client/src/car.js 
b/packages/upload-client/src/car.js index 4a491ce35..09a8e6ca5 100644 --- a/packages/upload-client/src/car.js +++ b/packages/upload-client/src/car.js @@ -19,7 +19,9 @@ export function headerEncodingLength(root) { /** @param {Block} block */ export function blockEncodingLength(block) { - const varintLength = varint.encodingLength(block.cid.bytes.length + block.bytes.length) + const varintLength = varint.encodingLength( + block.cid.bytes.length + block.bytes.length + ) const cidLength = block.cid.bytes.length return varintLength + cidLength + block.bytes.length } diff --git a/packages/upload-client/src/sharding.js b/packages/upload-client/src/sharding.js index 4a9b2e1e8..2c0286bec 100644 --- a/packages/upload-client/src/sharding.js +++ b/packages/upload-client/src/sharding.js @@ -37,7 +37,10 @@ export class ShardingStream extends TransformStream { throw new Error(`block exceeds shard size: ${block.cid}`) } - if (shard.length && headerEncodingLength() + shardBlockLength + blockLength > shardSize) { + if ( + shard.length && + headerEncodingLength() + shardBlockLength + blockLength > shardSize + ) { readyShard = shard shard = [] shardBlockLength = 0 @@ -64,7 +67,8 @@ export class ShardingStream extends TransformStream { let lastShardBlockLength = 0 while (lastShardBlockLength < overage) { // need at least 1 block in original shard - if (shard.length < 2) throw new Error(`block exceeds shard size: ${shard.at(-1)?.cid}`) + if (shard.length < 2) + throw new Error(`block exceeds shard size: ${shard.at(-1)?.cid}`) const block = shard[shard.length - 1] shard.pop() lastShard.unshift(block) diff --git a/packages/upload-client/test/index.test.js b/packages/upload-client/test/index.test.js index eadeee7d4..0f200e20b 100644 --- a/packages/upload-client/test/index.test.js +++ b/packages/upload-client/test/index.test.js @@ -12,7 +12,11 @@ import { randomBlock, randomBytes } from './helpers/random.js' import { toCAR } from './helpers/car.js' import { File } from './helpers/shims.js' 
import { mockService } from './helpers/mocks.js' -import { blockEncodingLength, encode, headerEncodingLength } from '../src/car.js' +import { + blockEncodingLength, + encode, + headerEncodingLength, +} from '../src/car.js' describe('uploadFile', () => { it('uploads a file to the service', async () => { @@ -372,9 +376,11 @@ describe('uploadCAR', () => { const car = await encode(blocks, blocks.at(-1)?.cid) // Wanted: 2 shards // 2 * CAR header (34) + 2 * blocks (256), 2 * block encoding prefix (78) - const shardSize = (headerEncodingLength() * 2) + blocks - .slice(0, -1) - .reduce((size, block) => size + blockEncodingLength(block), 0) + const shardSize = + headerEncodingLength() * 2 + + blocks + .slice(0, -1) + .reduce((size, block) => size + blockEncodingLength(block), 0) /** @type {import('../src/types').CARLink[]} */ const carCIDs = [] diff --git a/packages/upload-client/test/sharding.test.js b/packages/upload-client/test/sharding.test.js index 3906ccad7..7aaad4bea 100644 --- a/packages/upload-client/test/sharding.test.js +++ b/packages/upload-client/test/sharding.test.js @@ -63,27 +63,31 @@ describe('ShardingStream', () => { it('fails to shard block that exceeds shard size when encoded', async () => { const file = new Blob([await randomBytes(128)]) - await assert.rejects(() => createFileEncoderStream(file) - .pipeThrough(new ShardingStream({ shardSize: 64 })) - .pipeTo(new WritableStream()), /block exceeds shard size/) + await assert.rejects( + () => + createFileEncoderStream(file) + .pipeThrough(new ShardingStream({ shardSize: 64 })) + .pipeTo(new WritableStream()), + /block exceeds shard size/ + ) }) it('reduces final shard to accomodate CAR header with root CID', async () => { const blocks = [ await randomBlock(128), // encoded block length = 166 - await randomBlock(64), // encoded block length = 102 - await randomBlock(32) // encoded block length = 70 + await randomBlock(64), // encoded block length = 102 + await randomBlock(32), // encoded block length = 70 
] /** @type {import('../src/types').CARFile[]} */ const shards = [] await new ReadableStream({ - pull (controller) { - const block = blocks.shift() - if (!block) return controller.close() - controller.enqueue(block) - } - }) + pull(controller) { + const block = blocks.shift() + if (!block) return controller.close() + controller.enqueue(block) + }, + }) // shard with no roots = encoded block (166) + CAR header (17) = 183 // shard with no roots = encoded block (102) + CAR header (17) = 119 // shard with 1 root = encoded block (70) + CAR header (17) = 87 @@ -93,38 +97,56 @@ describe('ShardingStream', () => { // will actually exceed the shard size. It must then be refactored into // 2 shards. .pipeThrough(new ShardingStream({ shardSize: 206 })) - .pipeTo(new WritableStream({ write: s => { shards.push(s) } })) + .pipeTo( + new WritableStream({ + write: (s) => { + shards.push(s) + }, + }) + ) assert.equal(shards.length, 3) }) it('fails to shard block that exceeds shard size when encoded with root CID', async () => { const blocks = [ - await randomBlock(128) // encoded block length = 166 + await randomBlock(128), // encoded block length = 166 ] await assert.rejects(() => { - return new ReadableStream({ - pull (controller) { - const block = blocks.shift() - if (!block) return controller.close() - controller.enqueue(block) - } - }) - // shard with no roots = encoded block (166) + CAR header (17) = 183 - // shard with 1 root = encoded block (166) + CAR header (59) = 225 - // i.e. shard size of 183 should allow us 1 shard with no roots and then - // we'll fail to create a shard with 1 root. 
- .pipeThrough(new ShardingStream({ shardSize: 183 })) - .pipeTo(new WritableStream()) + return ( + new ReadableStream({ + pull(controller) { + const block = blocks.shift() + if (!block) return controller.close() + controller.enqueue(block) + }, + }) + // shard with no roots = encoded block (166) + CAR header (17) = 183 + // shard with 1 root = encoded block (166) + CAR header (59) = 225 + // i.e. shard size of 183 should allow us 1 shard with no roots and then + // we'll fail to create a shard with 1 root. + .pipeThrough(new ShardingStream({ shardSize: 183 })) + .pipeTo(new WritableStream()) + ) }, /block exceeds shard size/) }) it('no blocks no shards', async () => { let shards = 0 - await new ReadableStream({ pull: controller => { controller.close() } }) + await new ReadableStream({ + pull: (controller) => { + controller.close() + }, + }) .pipeThrough(new ShardingStream({ shardSize: 206 })) - .pipeTo(new WritableStream({ write: () => { shards++ } })) + .pipeTo( + new WritableStream({ + write: () => { + shards++ + }, + }) + ) assert.equal(shards, 0) }) }) From 786d8737bbd2ed19a9f54026e7dd829f9ffcf54b Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 7 Sep 2023 08:21:00 +0100 Subject: [PATCH 3/6] Update packages/upload-client/src/sharding.js Co-authored-by: Irakli Gozalishvili --- packages/upload-client/src/sharding.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/upload-client/src/sharding.js b/packages/upload-client/src/sharding.js index 2c0286bec..8661e0b81 100644 --- a/packages/upload-client/src/sharding.js +++ b/packages/upload-client/src/sharding.js @@ -59,8 +59,8 @@ export class ShardingStream extends TransformStream { const rootCID = options.rootCID ?? rootBlock.cid const headerLength = headerEncodingLength(rootCID) - // does the shard with the CAR header that _includes_ a root CID - // exceed the shard size? + // If adding CAR root overflows the shard limit we move overflowing blocks + // into a another CAR. 
if (headerLength + shardBlockLength > shardSize) { const overage = headerLength + shardBlockLength - shardSize const lastShard = [] From 33011b6193db2934ad55cc64c43e567b18f88820 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 7 Sep 2023 08:21:07 +0100 Subject: [PATCH 4/6] Update packages/upload-client/src/car.js Co-authored-by: Irakli Gozalishvili --- packages/upload-client/src/car.js | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/packages/upload-client/src/car.js b/packages/upload-client/src/car.js index 44e7f6d52..c53c9338c 100644 --- a/packages/upload-client/src/car.js +++ b/packages/upload-client/src/car.js @@ -19,11 +19,10 @@ export function headerEncodingLength(root) { /** @param {Block} block */ export function blockEncodingLength(block) { - const varintLength = varint.encodingLength( - block.cid.bytes.length + block.bytes.length - ) - const cidLength = block.cid.bytes.length - return varintLength + cidLength + block.bytes.length + const payloadLength = block.cid.bytes.length + block.bytes.length + const varintLength = varint.encodingLength(payloadLength) + + return varintLength + payloadLength } /** From d1bc2e7b4f2daf0709505dd6f77865115ea8e5b2 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 7 Sep 2023 10:06:25 +0100 Subject: [PATCH 5/6] refactor: address feedback --- packages/upload-client/src/car.js | 1 - packages/upload-client/src/sharding.js | 72 ++++++++++---------- packages/upload-client/src/unixfs.js | 8 +-- packages/upload-client/test/sharding.test.js | 24 ++++--- 4 files changed, 52 insertions(+), 53 deletions(-) diff --git a/packages/upload-client/src/car.js b/packages/upload-client/src/car.js index c53c9338c..33406caf9 100644 --- a/packages/upload-client/src/car.js +++ b/packages/upload-client/src/car.js @@ -21,7 +21,6 @@ export function headerEncodingLength(root) { export function blockEncodingLength(block) { const payloadLength = block.cid.bytes.length + block.bytes.length const varintLength = 
varint.encodingLength(payloadLength) - return varintLength + payloadLength } diff --git a/packages/upload-client/src/sharding.js b/packages/upload-client/src/sharding.js index 8661e0b81..eef77290c 100644 --- a/packages/upload-client/src/sharding.js +++ b/packages/upload-client/src/sharding.js @@ -19,65 +19,65 @@ export class ShardingStream extends TransformStream { */ constructor(options = {}) { const shardSize = options.shardSize ?? SHARD_SIZE + const maxBlockLength = shardSize - headerEncodingLength() /** @type {import('@ipld/unixfs').Block[]} */ - let shard = [] + let blocks = [] /** @type {import('@ipld/unixfs').Block[] | null} */ - let readyShard = null - let shardBlockLength = 0 + let readyBlocks = null + let currentLength = 0 super({ async transform(block, controller) { - if (readyShard != null) { - controller.enqueue(await encode(readyShard)) - readyShard = null + if (readyBlocks != null) { + controller.enqueue(await encode(readyBlocks)) + readyBlocks = null } const blockLength = blockEncodingLength(block) - if (headerEncodingLength() + blockLength > shardSize) { - throw new Error(`block exceeds shard size: ${block.cid}`) + if (blockLength > maxBlockLength) { + throw new Error(`block will cause CAR to exceed shard size: ${block.cid}`) } - if ( - shard.length && - headerEncodingLength() + shardBlockLength + blockLength > shardSize - ) { - readyShard = shard - shard = [] - shardBlockLength = 0 + if (blocks.length && currentLength + blockLength > maxBlockLength) { + readyBlocks = blocks + blocks = [] + currentLength = 0 } - shard.push(block) - shardBlockLength += blockLength + blocks.push(block) + currentLength += blockLength }, async flush(controller) { - if (readyShard != null) { - controller.enqueue(await encode(readyShard)) + if (readyBlocks != null) { + controller.enqueue(await encode(readyBlocks)) } - const rootBlock = shard.at(-1) + const rootBlock = blocks.at(-1) if (rootBlock == null) return const rootCID = options.rootCID ?? 
rootBlock.cid const headerLength = headerEncodingLength(rootCID) - // If adding CAR root overflows the shard limit we move overflowing blocks - // into a another CAR. - if (headerLength + shardBlockLength > shardSize) { - const overage = headerLength + shardBlockLength - shardSize - const lastShard = [] - let lastShardBlockLength = 0 - while (lastShardBlockLength < overage) { + + // if adding CAR root overflows the shard limit we move overflowing + // blocks into another CAR. + if (headerLength + currentLength > shardSize) { + const overage = headerLength + currentLength - shardSize + const overflowBlocks = [] + let overflowCurrentLength = 0 + while (overflowCurrentLength < overage) { + const block = blocks[blocks.length - 1] + blocks.pop() + overflowBlocks.unshift(block) + overflowCurrentLength += blockEncodingLength(block) + // need at least 1 block in original shard - if (shard.length < 2) - throw new Error(`block exceeds shard size: ${shard.at(-1)?.cid}`) - const block = shard[shard.length - 1] - shard.pop() - lastShard.unshift(block) - lastShardBlockLength += blockEncodingLength(block) + if (blocks.length < 1) + throw new Error(`block will cause CAR to exceed shard size: ${block.cid}`) } - controller.enqueue(await encode(shard)) - controller.enqueue(await encode(lastShard, rootCID)) + controller.enqueue(await encode(blocks)) + controller.enqueue(await encode(overflowBlocks, rootCID)) } else { - controller.enqueue(await encode(shard, rootCID)) + controller.enqueue(await encode(blocks, rootCID)) } }, }) diff --git a/packages/upload-client/src/unixfs.js b/packages/upload-client/src/unixfs.js index 91aff7c8e..942e2ea95 100644 --- a/packages/upload-client/src/unixfs.js +++ b/packages/upload-client/src/unixfs.js @@ -34,12 +34,8 @@ export function createFileEncoderStream(blob) { const unixfsWriter = UnixFS.createWriter({ writable, settings }) const fileBuilder = new UnixFSFileBuilder('', blob) void (async () => { - try { - await fileBuilder.finalize(unixfsWriter) - 
await unixfsWriter.close() - } catch (err) { - console.error(err) - } + await fileBuilder.finalize(unixfsWriter) + await unixfsWriter.close() })() return readable } diff --git a/packages/upload-client/test/sharding.test.js b/packages/upload-client/test/sharding.test.js index 7aaad4bea..da11a4af5 100644 --- a/packages/upload-client/test/sharding.test.js +++ b/packages/upload-client/test/sharding.test.js @@ -61,18 +61,22 @@ describe('ShardingStream', () => { assert.equal(shards[0].roots[0].toString(), rootCID.toString()) }) - it('fails to shard block that exceeds shard size when encoded', async () => { - const file = new Blob([await randomBytes(128)]) + it('exceeds shard size when block bigger than shard size is encoded', async () => { await assert.rejects( - () => - createFileEncoderStream(file) - .pipeThrough(new ShardingStream({ shardSize: 64 })) - .pipeTo(new WritableStream()), - /block exceeds shard size/ + () => new ReadableStream({ + async pull(controller) { + const block = await randomBlock(128) + controller.enqueue(block) + controller.close() + }, + }) + .pipeThrough(new ShardingStream({ shardSize: 64 })) + .pipeTo(new WritableStream()), + /block will cause CAR to exceed shard size/ ) }) - it('reduces final shard to accomodate CAR header with root CID', async () => { + it('creates overflow shard when CAR header with root CID exceeds shard size', async () => { const blocks = [ await randomBlock(128), // encoded block length = 166 await randomBlock(64), // encoded block length = 102 @@ -108,7 +112,7 @@ describe('ShardingStream', () => { assert.equal(shards.length, 3) }) - it('fails to shard block that exceeds shard size when encoded with root CID', async () => { + it('exceeds shard size when block is encoded with root CID', async () => { const blocks = [ await randomBlock(128), // encoded block length = 166 ] @@ -129,7 +133,7 @@ describe('ShardingStream', () => { .pipeThrough(new ShardingStream({ shardSize: 183 })) .pipeTo(new WritableStream()) ) - }, /block 
exceeds shard size/) + }, /block will cause CAR to exceed shard size/) }) it('no blocks no shards', async () => { From fa642081584b9facbaf96cf908ae739bdc08bd60 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Thu, 7 Sep 2023 10:11:07 +0100 Subject: [PATCH 6/6] chore: appease linter --- packages/upload-client/src/sharding.js | 8 ++++++-- packages/upload-client/test/sharding.test.js | 9 +++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/packages/upload-client/src/sharding.js b/packages/upload-client/src/sharding.js index eef77290c..992ce1f8f 100644 --- a/packages/upload-client/src/sharding.js +++ b/packages/upload-client/src/sharding.js @@ -35,7 +35,9 @@ export class ShardingStream extends TransformStream { const blockLength = blockEncodingLength(block) if (blockLength > maxBlockLength) { - throw new Error(`block will cause CAR to exceed shard size: ${block.cid}`) + throw new Error( + `block will cause CAR to exceed shard size: ${block.cid}` + ) } if (blocks.length && currentLength + blockLength > maxBlockLength) { @@ -72,7 +74,9 @@ export class ShardingStream extends TransformStream { // need at least 1 block in original shard if (blocks.length < 1) - throw new Error(`block will cause CAR to exceed shard size: ${block.cid}`) + throw new Error( + `block will cause CAR to exceed shard size: ${block.cid}` + ) } controller.enqueue(await encode(blocks)) controller.enqueue(await encode(overflowBlocks, rootCID)) diff --git a/packages/upload-client/test/sharding.test.js b/packages/upload-client/test/sharding.test.js index da11a4af5..2177be4e9 100644 --- a/packages/upload-client/test/sharding.test.js +++ b/packages/upload-client/test/sharding.test.js @@ -63,15 +63,16 @@ describe('ShardingStream', () => { it('exceeds shard size when block bigger than shard size is encoded', async () => { await assert.rejects( - () => new ReadableStream({ + () => + new ReadableStream({ async pull(controller) { - const block = await randomBlock(128) + const block = await 
randomBlock(128) controller.enqueue(block) controller.close() }, }) - .pipeThrough(new ShardingStream({ shardSize: 64 })) - .pipeTo(new WritableStream()), + .pipeThrough(new ShardingStream({ shardSize: 64 })) + .pipeTo(new WritableStream()), /block will cause CAR to exceed shard size/ ) })