Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adjust shard size #908

Merged
merged 8 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions packages/upload-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@
"dist/src/**/*.d.ts.map"
],
"dependencies": {
"@ipld/car": "^5.0.3",
"@ipld/car": "^5.2.2",
"@ipld/dag-cbor": "^9.0.0",
"@ipld/dag-ucan": "^3.2.0",
"@ipld/unixfs": "^2.1.1",
"@ucanto/client": "^8.0.0",
Expand All @@ -74,11 +75,13 @@
"ipfs-utils": "^9.0.14",
"multiformats": "^11.0.2",
"p-queue": "^7.3.0",
"p-retry": "^5.1.2"
"p-retry": "^5.1.2",
"varint": "^6.0.0"
},
"devDependencies": {
"@types/assert": "^1.5.6",
"@types/mocha": "^10.0.1",
"@types/varint": "^6.0.1",
"@ucanto/principal": "^8.0.0",
"@ucanto/server": "^8.0.1",
"assert": "^2.0.0",
Expand Down
20 changes: 20 additions & 0 deletions packages/upload-client/src/car.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,29 @@
import { CarBlockIterator, CarWriter } from '@ipld/car'
import * as dagCBOR from '@ipld/dag-cbor'
import varint from 'varint'

/**
* @typedef {import('@ipld/unixfs').Block} Block
*/

/** Byte length of a CBOR encoded CAR header with zero roots. */
const NO_ROOTS_HEADER_LENGTH = 17

/** @param {import('./types').AnyLink} [root] */
export function headerEncodingLength(root) {
if (!root) return NO_ROOTS_HEADER_LENGTH
const headerLength = dagCBOR.encode({ version: 1, roots: [root] }).length
const varintLength = varint.encodingLength(headerLength)
return varintLength + headerLength
}

/** @param {Block} block */
export function blockEncodingLength(block) {
const payloadLength = block.cid.bytes.length + block.bytes.length
const varintLength = varint.encodingLength(payloadLength)
return varintLength + payloadLength
}

/**
* @param {Iterable<Block> | AsyncIterable<Block>} blocks
* @param {import('./types').AnyLink} [root]
Expand Down
75 changes: 54 additions & 21 deletions packages/upload-client/src/sharding.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import Queue from 'p-queue'
import { encode } from './car.js'
import { blockEncodingLength, encode, headerEncodingLength } from './car.js'
import { add } from './store.js'

const SHARD_SIZE = 1024 * 1024 * 100
// https://observablehq.com/@gozala/w3up-shard-size
const SHARD_SIZE = 133_169_152
const CONCURRENT_UPLOADS = 3

/**
Expand All @@ -18,37 +19,69 @@ export class ShardingStream extends TransformStream {
*/
constructor(options = {}) {
const shardSize = options.shardSize ?? SHARD_SIZE
const maxBlockLength = shardSize - headerEncodingLength()
/** @type {import('@ipld/unixfs').Block[]} */
let shard = []
let blocks = []
/** @type {import('@ipld/unixfs').Block[] | null} */
let readyShard = null
let size = 0
let readyBlocks = null
let currentLength = 0

super({
async transform(block, controller) {
if (readyShard != null) {
controller.enqueue(await encode(readyShard))
readyShard = null
if (readyBlocks != null) {
controller.enqueue(await encode(readyBlocks))
readyBlocks = null
}
if (shard.length && size + block.bytes.length > shardSize) {
readyShard = shard
shard = []
size = 0

const blockLength = blockEncodingLength(block)
if (blockLength > maxBlockLength) {
throw new Error(
`block will cause CAR to exceed shard size: ${block.cid}`
)
}

if (blocks.length && currentLength + blockLength > maxBlockLength) {
readyBlocks = blocks
blocks = []
currentLength = 0
}
shard.push(block)
size += block.bytes.length
blocks.push(block)
currentLength += blockLength
},

async flush(controller) {
if (readyShard != null) {
controller.enqueue(await encode(readyShard))
if (readyBlocks != null) {
controller.enqueue(await encode(readyBlocks))
}

const rootBlock = shard.at(-1)
if (rootBlock != null) {
controller.enqueue(
await encode(shard, options.rootCID ?? rootBlock.cid)
)
const rootBlock = blocks.at(-1)
if (rootBlock == null) return

const rootCID = options.rootCID ?? rootBlock.cid
const headerLength = headerEncodingLength(rootCID)

// if adding CAR root overflows the shard limit we move overflowing
// blocks into a another CAR.
if (headerLength + currentLength > shardSize) {
const overage = headerLength + currentLength - shardSize
const overflowBlocks = []
let overflowCurrentLength = 0
while (overflowCurrentLength < overage) {
const block = blocks[blocks.length - 1]
blocks.pop()
overflowBlocks.unshift(block)
overflowCurrentLength += blockEncodingLength(block)

// need at least 1 block in original shard
if (blocks.length < 1)
throw new Error(
`block will cause CAR to exceed shard size: ${block.cid}`
)
}
controller.enqueue(await encode(blocks))
controller.enqueue(await encode(overflowBlocks, rootCID))
} else {
controller.enqueue(await encode(blocks, rootCID))
}
},
})
Expand Down
36 changes: 25 additions & 11 deletions packages/upload-client/test/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import { randomBlock, randomBytes } from './helpers/random.js'
import { toCAR } from './helpers/car.js'
import { File } from './helpers/shims.js'
import { mockService } from './helpers/mocks.js'
import { encode } from '../src/car.js'
import {
blockEncodingLength,
encode,
headerEncodingLength,
} from '../src/car.js'

describe('uploadFile', () => {
it('uploads a file to the service', async () => {
Expand Down Expand Up @@ -173,12 +177,19 @@ describe('uploadFile', () => {
file,
{
connection,
shardSize: 1024 * 1024 * 2, // should end up with 2 CAR files
// chunk size = 1_048_576
// encoded block size = 1_048_615
// shard size = 2_097_152 (as configured below)
// total file size = 5_242_880 (as above)
// so, at least 2 shards, but 2 encoded blocks (_without_ CAR header) = 2_097_230
// ...which is > shard size of 2_097_152
// so we actually end up with a shard for each block - 5 CARs!
shardSize: 1024 * 1024 * 2,
onShardStored: (meta) => carCIDs.push(meta.cid),
}
)

assert.equal(carCIDs.length, 3)
assert.equal(carCIDs.length, 5)
})
})

Expand Down Expand Up @@ -344,7 +355,7 @@ describe('uploadDirectory', () => {
files,
{
connection,
shardSize: 400_000, // should end up with 2 CAR files
shardSize: 500_056, // should end up with 2 CAR files
onShardStored: (meta) => carCIDs.push(meta.cid),
}
)
Expand All @@ -358,15 +369,18 @@ describe('uploadCAR', () => {
const space = await Signer.generate()
const agent = await Signer.generate()
const blocks = [
await randomBlock(32),
await randomBlock(32),
await randomBlock(32),
await randomBlock(128),
await randomBlock(128),
await randomBlock(128),
]
const car = await encode(blocks, blocks.at(-1)?.cid)
// shard size 1 block less than total = 2 expected CAR shards
const shardSize = blocks
.slice(0, -1)
.reduce((size, block) => size + block.bytes.length, 0)
// Wanted: 2 shards
// 2 * CAR header (34) + 2 * blocks (256), 2 * block encoding prefix (78)
const shardSize =
headerEncodingLength() * 2 +
blocks
.slice(0, -1)
.reduce((size, block) => size + blockEncodingLength(block), 0)

/** @type {import('../src/types').CARLink[]} */
const carCIDs = []
Expand Down
96 changes: 95 additions & 1 deletion packages/upload-client/test/sharding.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CID } from 'multiformats'
import { createFileEncoderStream } from '../src/unixfs.js'
import { ShardingStream, ShardStoringStream } from '../src/sharding.js'
import { serviceSigner } from './fixtures.js'
import { randomBytes, randomCAR } from './helpers/random.js'
import { randomBlock, randomBytes, randomCAR } from './helpers/random.js'
import { mockService } from './helpers/mocks.js'

describe('ShardingStream', () => {
Expand Down Expand Up @@ -60,6 +60,100 @@ describe('ShardingStream', () => {
assert.equal(shards.length, 1)
assert.equal(shards[0].roots[0].toString(), rootCID.toString())
})

it('exceeds shard size when block bigger than shard size is encoded', async () => {
await assert.rejects(
() =>
new ReadableStream({
async pull(controller) {
const block = await randomBlock(128)
controller.enqueue(block)
controller.close()
},
})
.pipeThrough(new ShardingStream({ shardSize: 64 }))
.pipeTo(new WritableStream()),
/block will cause CAR to exceed shard size/
)
})

it('creates overflow shard when CAR header with root CID exceeds shard size', async () => {
const blocks = [
await randomBlock(128), // encoded block length = 166
await randomBlock(64), // encoded block length = 102
await randomBlock(32), // encoded block length = 70
]

/** @type {import('../src/types').CARFile[]} */
const shards = []
await new ReadableStream({
pull(controller) {
const block = blocks.shift()
if (!block) return controller.close()
controller.enqueue(block)
},
})
// shard with no roots = encoded block (166) + CAR header (17) = 183
// shard with no roots = encoded block (102) + CAR header (17) = 119
// shard with 1 root = encoded block (70) + CAR header (17) = 87
// shard with 1 root = encoded block (70) + CAR header (59) = 155
// i.e. shard size of 206 (119 + 87) should allow us 1 shard with 0 roots
// and then 1 shard with 2 blocks that, when encoded as a CAR with 1 root
// will actually exceed the shard size. It must then be refactored into
// 2 shards.
.pipeThrough(new ShardingStream({ shardSize: 206 }))
.pipeTo(
new WritableStream({
write: (s) => {
shards.push(s)
},
})
)

assert.equal(shards.length, 3)
})

it('exceeds shard size when block is encoded with root CID', async () => {
const blocks = [
await randomBlock(128), // encoded block length = 166
]

await assert.rejects(() => {
return (
new ReadableStream({
pull(controller) {
const block = blocks.shift()
if (!block) return controller.close()
controller.enqueue(block)
},
})
// shard with no roots = encoded block (166) + CAR header (17) = 183
// shard with 1 root = encoded block (166) + CAR header (59) = 225
// i.e. shard size of 183 should allow us 1 shard with no roots and then
// we'll fail to create a shard with 1 root.
.pipeThrough(new ShardingStream({ shardSize: 183 }))
.pipeTo(new WritableStream())
)
}, /block will cause CAR to exceed shard size/)
})

it('no blocks no shards', async () => {
let shards = 0
await new ReadableStream({
pull: (controller) => {
controller.close()
},
})
.pipeThrough(new ShardingStream({ shardSize: 206 }))
.pipeTo(
new WritableStream({
write: () => {
shards++
},
})
)
assert.equal(shards, 0)
})
})

describe('ShardStoringStream', () => {
Expand Down
Loading
Loading