Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filecoin-api): paginated queries #1521

Merged
merged 5 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions packages/filecoin-api/src/aggregator/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@ import {
ServiceConfig,
} from '../types.js'

export type PieceStore = UpdatableStore<PieceRecordKey, PieceRecord>
export type PieceStore = Store<PieceRecordKey, PieceRecord> &
UpdatableStore<PieceRecordKey, PieceRecord>
export type PieceQueue = Queue<PieceMessage>
export type BufferQueue = Queue<BufferMessage>
export type BufferStore = Store<Link, BufferRecord>
export type AggregateStore = Store<AggregateRecordKey, AggregateRecord>
export type PieceAcceptQueue = Queue<PieceAcceptMessage>
export type InclusionStore = QueryableStore<
InclusionRecordKey,
InclusionRecord,
InclusionRecordQueryByGroup
>
export type InclusionStore = Store<InclusionRecordKey, InclusionRecord> &
QueryableStore<InclusionRecordQueryByGroup, InclusionRecord>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I flattened the store types to give us more flexibility and simplicity. They can be composed as shown and we now have the ability to express in types a read only store for querying data (for example).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 here for composable types :)

export type AggregateOfferQueue = Queue<AggregateOfferMessage>

export interface ServiceContext {
Expand Down
4 changes: 2 additions & 2 deletions packages/filecoin-api/src/aggregator/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ export const pieceAccept = async ({ capability }, context) => {
error: new StoreOperationFailed(getInclusionRes.error?.message),
}
}
if (!getInclusionRes.ok.length) {
if (!getInclusionRes.ok.results.length) {
return {
error: new UnexpectedState(
`no inclusion proof found for pair {${piece}, ${group}}`
Expand All @@ -91,7 +91,7 @@ export const pieceAccept = async ({ capability }, context) => {
}

// Get buffered pieces
const [{ aggregate, inclusion }] = getInclusionRes.ok
const [{ aggregate, inclusion }] = getInclusionRes.ok.results
const getAggregateRes = await context.aggregateStore.get({ aggregate })
if (getAggregateRes.error) {
return {
Expand Down
9 changes: 3 additions & 6 deletions packages/filecoin-api/src/deal-tracker/api.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import type { Signer } from '@ucanto/interface'
import { PieceLink } from '@web3-storage/data-segment'
import { QueryableStore } from '../types.js'
import { Store, QueryableStore } from '../types.js'

export type DealStore = QueryableStore<
DealRecordKey,
DealRecord,
DealRecordQueryByPiece
>
export type DealStore = Store<DealRecordKey, DealRecord> &
QueryableStore<DealRecordQueryByPiece, DealRecord>

export interface ServiceContext {
/**
Expand Down
17 changes: 12 additions & 5 deletions packages/filecoin-api/src/deal-tracker/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,23 @@ import { StoreOperationFailed } from '../errors.js'
export const dealInfo = async ({ capability }, context) => {
const { piece } = capability.nb

const storeGet = await context.dealStore.query({ piece })
if (storeGet.error) {
return {
error: new StoreOperationFailed(storeGet.error.message),
const records = []
/** @type {string|undefined} */
let cursor
while (true) {
const storeQuery = await context.dealStore.query({ piece }, { cursor })
if (storeQuery.error) {
return { error: new StoreOperationFailed(storeQuery.error.message) }
}

records.push(...storeQuery.ok.results)
cursor = storeQuery.ok.cursor
if (!cursor) break
}

return {
ok: {
deals: storeGet.ok.reduce((acc, curr) => {
deals: records.reduce((acc, curr) => {
acc[`${curr.dealId}`] = {
provider: curr.provider,
}
Expand Down
14 changes: 7 additions & 7 deletions packages/filecoin-api/src/dealer/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ import {
DealTrackerService,
} from '@web3-storage/filecoin-client/types'
import {
Store,
UpdatableStore,
UpdatableAndQueryableStore,
QueryableStore,
ServiceConfig,
} from '../types.js'

export type OfferStore<OfferDoc> = UpdatableStore<string, OfferDoc>
export type AggregateStore = UpdatableAndQueryableStore<
AggregateRecordKey,
AggregateRecord,
Pick<AggregateRecord, 'status'>
>
export type OfferStore<OfferDoc> = Store<string, OfferDoc> &
UpdatableStore<string, OfferDoc>
export type AggregateStore = Store<AggregateRecordKey, AggregateRecord> &
UpdatableStore<AggregateRecordKey, AggregateRecord> &
QueryableStore<Pick<AggregateRecord, 'status'>, AggregateRecord>

export interface ServiceContext<OfferDoc = OfferDocument> {
id: Signer
Expand Down
74 changes: 45 additions & 29 deletions packages/filecoin-api/src/dealer/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import { StoreOperationFailed } from '../errors.js'
* @typedef {import('./api.js').AggregateRecordKey} AggregateRecordKey
*/

/** Max items per page of query. */
const MAX_PAGE_SIZE = 20

/**
* On aggregate insert event, update offer key with date to be retrievable by broker.
*
Expand Down Expand Up @@ -55,45 +58,58 @@ export const handleAggregateUpdatedStatus = async (context, record) => {
* @param {import('./api.js').CronContext} context
*/
export const handleCronTick = async (context) => {
// Get offered deals pending approval/rejection
const offeredDeals = await context.aggregateStore.query({
status: 'offered',
})
if (offeredDeals.error) {
return {
error: offeredDeals.error,
let totalDealsCount = 0
let updatedDealsCount = 0
/** @type {string|undefined} */
let cursor
while (true) {
// Get offered deals pending approval/rejection
const offeredDeals = await context.aggregateStore.query(
{
status: 'offered',
},
{ cursor, size: MAX_PAGE_SIZE }
)
if (offeredDeals.error) {
return {
error: offeredDeals.error,
}
}
}

// Update approved deals from the ones resolved
const updatedResponses = await Promise.all(
offeredDeals.ok.map((deal) =>
updateApprovedDeals({
deal,
aggregateStore: context.aggregateStore,
dealTrackerServiceConnection: context.dealTrackerService.connection,
dealTrackerInvocationConfig:
context.dealTrackerService.invocationConfig,
})
totalDealsCount += offeredDeals.ok.results.length

// Update approved deals from the ones resolved
const updatedResponses = await Promise.all(
offeredDeals.ok.results.map((deal) =>
updateApprovedDeals({
deal,
aggregateStore: context.aggregateStore,
dealTrackerServiceConnection: context.dealTrackerService.connection,
dealTrackerInvocationConfig:
context.dealTrackerService.invocationConfig,
})
)
)
)

// Fail if one or more update operations did not succeed.
// The successful ones are still valid, but we should keep track of errors for monitoring/alerting.
const updateErrorResponse = updatedResponses.find((r) => r.error)
if (updateErrorResponse) {
return {
error: updateErrorResponse.error,
// Fail if one or more update operations did not succeed.
// The successful ones are still valid, but we should keep track of errors for monitoring/alerting.
const updateErrorResponse = updatedResponses.find((r) => r.error)
if (updateErrorResponse) {
return {
error: updateErrorResponse.error,
}
}

updatedDealsCount += updatedResponses.filter((r) => r.ok?.updated).length
cursor = offeredDeals.ok.cursor
if (!cursor) break
}

// Return successful update operation
// Include in response the ones that were Updated, and the ones still pending response.
const updatedDealsCount = updatedResponses.filter((r) => r.ok?.updated).length
return {
ok: {
updatedCount: updatedDealsCount,
pendingCount: updatedResponses.length - updatedDealsCount,
pendingCount: totalDealsCount - updatedDealsCount,
},
}
}
Expand All @@ -103,7 +119,7 @@ export const handleCronTick = async (context) => {
*
* @param {object} context
* @param {AggregateRecord} context.deal
* @param {import('../types.js').UpdatableAndQueryableStore<AggregateRecordKey, AggregateRecord, Pick<AggregateRecord, 'status'>>} context.aggregateStore
* @param {import('../types.js').UpdatableStore<AggregateRecordKey, AggregateRecord>} context.aggregateStore
* @param {import('@ucanto/interface').ConnectionView<any>} context.dealTrackerServiceConnection
* @param {import('@web3-storage/filecoin-client/types').InvocationConfig} context.dealTrackerInvocationConfig
*/
Expand Down
11 changes: 5 additions & 6 deletions packages/filecoin-api/src/storefront/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ import {
} from '@web3-storage/filecoin-client/types'
import {
Store,
UpdatableAndQueryableStore,
UpdatableStore,
QueryableStore,
Queue,
ServiceConfig,
StoreGetError,
} from '../types.js'

export type PieceStore = UpdatableAndQueryableStore<
PieceRecordKey,
PieceRecord,
Pick<PieceRecord, 'status'>
>
export type PieceStore = Store<PieceRecordKey, PieceRecord> &
UpdatableStore<PieceRecordKey, PieceRecord> &
QueryableStore<Pick<PieceRecord, 'status'>, PieceRecord>
export type FilecoinSubmitQueue = Queue<FilecoinSubmitMessage>
export type PieceOfferQueue = Queue<PieceOfferMessage>
export type TaskStore = Store<UnknownLink, Invocation>
Expand Down
81 changes: 46 additions & 35 deletions packages/filecoin-api/src/storefront/events.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import pMap from 'p-map'
import { Storefront, Aggregator } from '@web3-storage/filecoin-client'
import * as AggregatorCaps from '@web3-storage/capabilities/filecoin/aggregator'
import { Assert } from '@web3-storage/content-claims/capability'
Expand All @@ -17,9 +16,12 @@ import {
/**
* @typedef {import('./api.js').PieceRecord} PieceRecord
* @typedef {import('./api.js').PieceRecordKey} PieceRecordKey
* @typedef {import('../types.js').UpdatableAndQueryableStore<PieceRecordKey, PieceRecord, Pick<PieceRecord, 'status'>>} PieceStore
* @typedef {import('./api.js').PieceStore} PieceStore
*/

/** Max items per page of query. */
const MAX_PAGE_SIZE = 20

/**
* On filecoin submit queue messages, validate piece for given content and store it in store.
*
Expand Down Expand Up @@ -185,49 +187,58 @@ export const handlePieceStatusUpdate = async (context, record) => {
* @param {import('./api.js').CronContext} context
*/
export const handleCronTick = async (context) => {
const submittedPieces = await context.pieceStore.query({
status: 'submitted',
})
if (submittedPieces.error) {
return {
error: submittedPieces.error,
}
}
// Update approved pieces from the ones resolved
const updatedResponses = await pMap(
submittedPieces.ok,
(pieceRecord) =>
updatePiecesWithDeal({
id: context.id,
aggregatorId: context.aggregatorId,
pieceRecord,
pieceStore: context.pieceStore,
taskStore: context.taskStore,
receiptStore: context.receiptStore,
}),
{
concurrency: 20,
let totalPiecesCount = 0
let updatedPiecesCount = 0
/** @type {string|undefined} */
let cursor
while (true) {
const submittedPieces = await context.pieceStore.query(
{
status: 'submitted',
},
{ cursor, size: MAX_PAGE_SIZE }
)
if (submittedPieces.error) {
return {
error: submittedPieces.error,
}
}
)
totalPiecesCount += submittedPieces.ok.results.length

// Fail if one or more update operations did not succeed.
// The successful ones are still valid, but we should keep track of errors for monitoring/alerting.
const updateErrorResponse = updatedResponses.find((r) => r.error)
if (updateErrorResponse) {
return {
error: updateErrorResponse.error,
// Update approved pieces from the ones resolved
const updatedResponses = await Promise.all(
submittedPieces.ok.results.map((pieceRecord) =>
updatePiecesWithDeal({
id: context.id,
aggregatorId: context.aggregatorId,
pieceRecord,
pieceStore: context.pieceStore,
taskStore: context.taskStore,
receiptStore: context.receiptStore,
})
)
)

// Fail if one or more update operations did not succeed.
// The successful ones are still valid, but we should keep track of errors for monitoring/alerting.
const updateErrorResponse = updatedResponses.find((r) => r.error)
if (updateErrorResponse) {
return {
error: updateErrorResponse.error,
}
}

updatedPiecesCount += updatedResponses.filter((r) => r.ok?.updated).length
cursor = submittedPieces.ok.cursor
if (!cursor) break
}

// Return successful update operation
// Include in response the ones that were Updated, and the ones still pending response.
const updatedPiecesCount = updatedResponses.filter(
(r) => r.ok?.updated
).length
return {
ok: {
updatedCount: updatedPiecesCount,
pendingCount: updatedResponses.length - updatedPiecesCount,
pendingCount: totalPiecesCount - updatedPiecesCount,
},
}
}
Expand Down
Loading
Loading