feat: adjust shard size #908

Merged: 8 commits, Sep 7, 2023. The diff below shows changes from 4 commits.
7 changes: 5 additions & 2 deletions packages/upload-client/package.json
@@ -64,7 +64,8 @@
     "dist/src/**/*.d.ts.map"
   ],
   "dependencies": {
-    "@ipld/car": "^5.0.3",
+    "@ipld/car": "^5.2.2",
+    "@ipld/dag-cbor": "^9.0.0",
     "@ipld/dag-ucan": "^3.2.0",
     "@ipld/unixfs": "^2.1.1",
     "@ucanto/client": "^8.0.0",
@@ -74,11 +75,13 @@
     "ipfs-utils": "^9.0.14",
     "multiformats": "^11.0.2",
     "p-queue": "^7.3.0",
-    "p-retry": "^5.1.2"
+    "p-retry": "^5.1.2",
+    "varint": "^6.0.0"
   },
   "devDependencies": {
     "@types/assert": "^1.5.6",
     "@types/mocha": "^10.0.1",
+    "@types/varint": "^6.0.1",
     "@ucanto/principal": "^8.0.0",
     "@ucanto/server": "^8.0.1",
     "assert": "^2.0.0",
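The two new runtime dependencies support the size accounting introduced below: @ipld/dag-cbor encodes the CAR header so its byte length can be measured directly, and varint measures the LEB128 length prefixes used throughout the CAR format. As a quick illustration of varint.encodingLength (a sketch, not part of the diff):

import varint from 'varint'

// Length prefixes grow one byte per 7 bits of payload length.
console.log(varint.encodingLength(127)) // 1
console.log(varint.encodingLength(128)) // 2
console.log(varint.encodingLength(16_384)) // 3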
22 changes: 22 additions & 0 deletions packages/upload-client/src/car.js
@@ -1,9 +1,31 @@
 import { CarBlockIterator, CarWriter } from '@ipld/car'
+import * as dagCBOR from '@ipld/dag-cbor'
+import varint from 'varint'
 
 /**
  * @typedef {import('@ipld/unixfs').Block} Block
  */
 
+/** Byte length of a CBOR encoded CAR header with zero roots. */
+const NO_ROOTS_HEADER_LENGTH = 17
+
+/** @param {import('./types').AnyLink} [root] */
+export function headerEncodingLength(root) {
+  if (!root) return NO_ROOTS_HEADER_LENGTH
+  const headerLength = dagCBOR.encode({ version: 1, roots: [root] }).length
+  const varintLength = varint.encodingLength(headerLength)
+  return varintLength + headerLength
+}
+
+/** @param {Block} block */
+export function blockEncodingLength(block) {
+  const varintLength = varint.encodingLength(
+    block.cid.bytes.length + block.bytes.length
+  )
+  const cidLength = block.cid.bytes.length
+  return varintLength + cidLength + block.bytes.length
+}
 
 /**
  * @param {Iterable<Block> | AsyncIterable<Block>} blocks
  * @param {import('./types').AnyLink} [root]
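Together these helpers account for the full CARv1 layout: a varint-prefixed CBOR header followed by one varint-prefixed CID-plus-data section per block. A hypothetical composition (illustrative only, not part of this PR) that predicts the exact byte length of an encoded CAR:

import { blockEncodingLength, headerEncodingLength } from './car.js'

/**
 * Exact encoded size of a CAR containing `blocks`. Illustrative sketch.
 * @param {import('@ipld/unixfs').Block[]} blocks
 * @param {import('./types').AnyLink} [root]
 */
function carEncodingLength(blocks, root) {
  return blocks.reduce(
    (size, block) => size + blockEncodingLength(block),
    headerEncodingLength(root)
  )
}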
49 changes: 39 additions & 10 deletions packages/upload-client/src/sharding.js
@@ -1,8 +1,9 @@
 import Queue from 'p-queue'
-import { encode } from './car.js'
+import { blockEncodingLength, encode, headerEncodingLength } from './car.js'
 import { add } from './store.js'
 
-const SHARD_SIZE = 1024 * 1024 * 100
+// https://observablehq.com/@gozala/w3up-shard-size
+const SHARD_SIZE = 133_169_152
 const CONCURRENT_UPLOADS = 3
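Note that the new default shard size is exactly 127 MiB; the rationale for the figure is in the Observable notebook linked above the constant.

// Sanity check (pure arithmetic, not part of the diff):
console.log(133_169_152 === 127 * 1024 * 1024) // true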

@@ -22,21 +23,30 @@ export class ShardingStream extends TransformStream {
     let shard = []
     /** @type {import('@ipld/unixfs').Block[] | null} */
     let readyShard = null
-    let size = 0
+    let shardBlockLength = 0
 
     super({
       async transform(block, controller) {
         if (readyShard != null) {
           controller.enqueue(await encode(readyShard))
           readyShard = null
         }
-        if (shard.length && size + block.bytes.length > shardSize) {
+
+        const blockLength = blockEncodingLength(block)
+        if (headerEncodingLength() + blockLength > shardSize) {
+          throw new Error(`block exceeds shard size: ${block.cid}`)
+        }
+
+        if (
+          shard.length &&
+          headerEncodingLength() + shardBlockLength + blockLength > shardSize
+        ) {
           readyShard = shard
           shard = []
-          size = 0
+          shardBlockLength = 0
         }
         shard.push(block)
-        size += block.bytes.length
+        shardBlockLength += blockLength
       },
 
       async flush(controller) {
@@ -45,10 +55,29 @@
         }
 
         const rootBlock = shard.at(-1)
-        if (rootBlock != null) {
-          controller.enqueue(
-            await encode(shard, options.rootCID ?? rootBlock.cid)
-          )
+        if (rootBlock == null) return
+
+        const rootCID = options.rootCID ?? rootBlock.cid
+        const headerLength = headerEncodingLength(rootCID)
+        // does the shard with the CAR header that _includes_ a root CID
+        // exceed the shard size?
+        if (headerLength + shardBlockLength > shardSize) {
+          const overage = headerLength + shardBlockLength - shardSize
+          const lastShard = []
+          let lastShardBlockLength = 0
+          while (lastShardBlockLength < overage) {
+            // need at least 1 block in original shard
+            if (shard.length < 2)
+              throw new Error(`block exceeds shard size: ${shard.at(-1)?.cid}`)
+            const block = shard[shard.length - 1]
+            shard.pop()
+            lastShard.unshift(block)
+            lastShardBlockLength += blockEncodingLength(block)
+          }
+          controller.enqueue(await encode(shard))
+          controller.enqueue(await encode(lastShard, rootCID))
+        } else {
+          controller.enqueue(await encode(shard, rootCID))
         }
       },
     })
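To see how transform() and flush() cooperate, here is a walkthrough (illustrative, using the encoded block lengths 166/102/70 and the header lengths 17/59 cited in the sharding tests further down) for shardSize = 206:

// transform(b1): 17 + 166 = 183 ≤ 206       → shard = [b1]
// transform(b2): 17 + 166 + 102 = 285 > 206 → queue CAR([b1]); shard = [b2]
// transform(b3): 17 + 102 + 70 = 189 ≤ 206  → shard = [b2, b3]
// flush():       59 + 102 + 70 = 231 > 206  → overage = 25
//                pop b3 (70 ≥ 25) into lastShard
//                emit CAR([b2]), then CAR([b3], root) → 3 shards in total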
8 changes: 6 additions & 2 deletions packages/upload-client/src/unixfs.js
@@ -34,8 +34,12 @@ export function createFileEncoderStream(blob) {
   const unixfsWriter = UnixFS.createWriter({ writable, settings })
   const fileBuilder = new UnixFSFileBuilder('', blob)
   void (async () => {
-    await fileBuilder.finalize(unixfsWriter)
-    await unixfsWriter.close()
+    try {
+      await fileBuilder.finalize(unixfsWriter)
+      await unixfsWriter.close()
+    } catch (err) {
+      console.error(err)
+    }
   })()
   return readable
 }

Review thread on `await unixfsWriter.close()`:

Member (author): Calling close on a writable stream that has errored causes this to throw an uncaught exception.

Contributor: ☝️ This should probably be a code comment. Also, I don't think just logging the error is much better: browsers log such errors anyway, and in Node you can decide what to do. If we just log the error, things will appear to work but then break, won't they?

We should probably propagate the error into the returned readable somehow, although I have to say I'm not sure how we can do that.

Any idea in what circumstances the writable stream would error? It's not even exposed outside, so if it errors, perhaps it's a problem in the UnixFS writer that can be fixed.

Member (author): I have removed and refactored the test so we don't encounter it. What was causing it was tests that triggered the ShardingStream to error because a block was too big, which caused the CAR to exceed the shard size.
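A minimal sketch (not from this PR) of the failure mode discussed above, using the standard Web Streams API: close() on a writer whose stream is already errored returns a rejected promise, and inside the fire-and-forget void (async () => { ... })() wrapper that rejection would go uncaught without the added try/catch.

async function demo() {
  const { writable } = new TransformStream()
  const writer = writable.getWriter()
  await writer.abort(new Error('boom')) // put the stream into an errored state
  try {
    await writer.close() // rejects: the stream can no longer be closed cleanly
  } catch (err) {
    console.error(err) // handled, mirroring the catch added in this diff
  }
}
demo()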
36 changes: 25 additions & 11 deletions packages/upload-client/test/index.test.js
@@ -12,7 +12,11 @@ import { randomBlock, randomBytes } from './helpers/random.js'
 import { toCAR } from './helpers/car.js'
 import { File } from './helpers/shims.js'
 import { mockService } from './helpers/mocks.js'
-import { encode } from '../src/car.js'
+import {
+  blockEncodingLength,
+  encode,
+  headerEncodingLength,
+} from '../src/car.js'
 
 describe('uploadFile', () => {
   it('uploads a file to the service', async () => {
@@ -173,12 +177,19 @@
         file,
         {
           connection,
-          shardSize: 1024 * 1024 * 2, // should end up with 2 CAR files
+          // chunk size = 1_048_576
+          // encoded block size = 1_048_615
+          // shard size = 2_097_152 (as configured below)
+          // total file size = 5_242_880 (as above)
+          // so, at least 2 shards, but 2 encoded blocks (_without_ CAR header) = 2_097_230
+          // ...which is > shard size of 2_097_152
+          // so we actually end up with a shard for each block - 5 CARs!
+          shardSize: 1024 * 1024 * 2,
           onShardStored: (meta) => carCIDs.push(meta.cid),
         }
       )
 
-      assert.equal(carCIDs.length, 3)
+      assert.equal(carCIDs.length, 5)
     })
   })

@@ -344,7 +355,7 @@ describe('uploadDirectory', () => {
         files,
         {
           connection,
-          shardSize: 400_000, // should end up with 2 CAR files
+          shardSize: 500_056, // should end up with 2 CAR files
           onShardStored: (meta) => carCIDs.push(meta.cid),
         }
       )
@@ -358,15 +369,18 @@ describe('uploadCAR', () => {
     const space = await Signer.generate()
     const agent = await Signer.generate()
     const blocks = [
-      await randomBlock(32),
-      await randomBlock(32),
-      await randomBlock(32),
+      await randomBlock(128),
+      await randomBlock(128),
+      await randomBlock(128),
     ]
     const car = await encode(blocks, blocks.at(-1)?.cid)
-    // shard size 1 block less than total = 2 expected CAR shards
-    const shardSize = blocks
-      .slice(0, -1)
-      .reduce((size, block) => size + block.bytes.length, 0)
+    // Wanted: 2 shards
+    // 2 * CAR header (34) + 2 * blocks (256) + 2 * block encoding prefix (76) = 366
+    const shardSize =
+      headerEncodingLength() * 2 +
+      blocks
+        .slice(0, -1)
+        .reduce((size, block) => size + blockEncodingLength(block), 0)
 
     /** @type {import('../src/types').CARLink[]} */
     const carCIDs = []
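For intuition, here is how that shard size plays out (illustrative; it assumes the encoded block length of 166 that the sharding tests below cite for a 128-byte block, and the 17/59-byte headers from the source):

// shardSize = 2 * 17 (headers) + 2 * 166 (first two encoded blocks) = 366
// transform(b1): 17 + 166 = 183 ≤ 366 → shard = [b1]
// transform(b2): 17 + 332 = 349 ≤ 366 → shard = [b1, b2]
// transform(b3): 17 + 498 = 515 > 366 → queue CAR([b1, b2]); shard = [b3]
// flush():       59 + 166 = 225 ≤ 366 → emit CAR([b3], root) → 2 shards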
91 changes: 90 additions & 1 deletion packages/upload-client/test/sharding.test.js
@@ -9,7 +9,7 @@ import { CID } from 'multiformats'
 import { createFileEncoderStream } from '../src/unixfs.js'
 import { ShardingStream, ShardStoringStream } from '../src/sharding.js'
 import { serviceSigner } from './fixtures.js'
-import { randomBytes, randomCAR } from './helpers/random.js'
+import { randomBlock, randomBytes, randomCAR } from './helpers/random.js'
 import { mockService } from './helpers/mocks.js'
 
 describe('ShardingStream', () => {
@@ -60,6 +60,95 @@
     assert.equal(shards.length, 1)
     assert.equal(shards[0].roots[0].toString(), rootCID.toString())
   })
+
+  it('fails to shard block that exceeds shard size when encoded', async () => {
+    const file = new Blob([await randomBytes(128)])
+    await assert.rejects(
+      () =>
+        createFileEncoderStream(file)
+          .pipeThrough(new ShardingStream({ shardSize: 64 }))
+          .pipeTo(new WritableStream()),
+      /block exceeds shard size/
+    )
+  })
+
+  it('reduces final shard to accommodate CAR header with root CID', async () => {
+    const blocks = [
+      await randomBlock(128), // encoded block length = 166
+      await randomBlock(64), // encoded block length = 102
+      await randomBlock(32), // encoded block length = 70
+    ]
+
+    /** @type {import('../src/types').CARFile[]} */
+    const shards = []
+    await new ReadableStream({
+      pull(controller) {
+        const block = blocks.shift()
+        if (!block) return controller.close()
+        controller.enqueue(block)
+      },
+    })
+      // shard with no roots = encoded block (166) + CAR header (17) = 183
+      // shard with no roots = encoded block (102) + CAR header (17) = 119
+      // shard with no roots = encoded block (70) + CAR header (17) = 87
+      // shard with 1 root = encoded block (70) + CAR header (59) = 129
+      // i.e. shard size of 206 (119 + 87) should allow us 1 shard with 0 roots
+      // and then 1 shard with 2 blocks that, when encoded as a CAR with 1 root,
+      // will actually exceed the shard size. It must then be split into
+      // 2 shards.
+      .pipeThrough(new ShardingStream({ shardSize: 206 }))
+      .pipeTo(
+        new WritableStream({
+          write: (s) => {
+            shards.push(s)
+          },
+        })
+      )
+
+    assert.equal(shards.length, 3)
+  })
+
+  it('fails to shard block that exceeds shard size when encoded with root CID', async () => {
+    const blocks = [
+      await randomBlock(128), // encoded block length = 166
+    ]
+
+    await assert.rejects(() => {
+      return (
+        new ReadableStream({
+          pull(controller) {
+            const block = blocks.shift()
+            if (!block) return controller.close()
+            controller.enqueue(block)
+          },
+        })
+          // shard with no roots = encoded block (166) + CAR header (17) = 183
+          // shard with 1 root = encoded block (166) + CAR header (59) = 225
+          // i.e. shard size of 183 should allow us 1 shard with no roots and
+          // then we'll fail to create a shard with 1 root.
+          .pipeThrough(new ShardingStream({ shardSize: 183 }))
+          .pipeTo(new WritableStream())
+      )
+    }, /block exceeds shard size/)
+  })
+
+  it('no blocks no shards', async () => {
+    let shards = 0
+    await new ReadableStream({
+      pull: (controller) => {
+        controller.close()
+      },
+    })
+      .pipeThrough(new ShardingStream({ shardSize: 206 }))
+      .pipeTo(
+        new WritableStream({
+          write: () => {
+            shards++
+          },
+        })
+      )
+    assert.equal(shards, 0)
+  })
 })
 
 describe('ShardStoringStream', () => {