Skip to content

Commit

Permalink
feat: Make IndexAPI call the database (#2201)
Browse files Browse the repository at this point in the history
* feat: Make IndexAPI call the database

* feat: use Repository::streamState to get StreamState by StreamID

* chore: simpler preparation of queryIndex result

* Update packages/core/src/indexing/local-index-api.ts

Co-authored-by: Spencer T Brody <[email protected]>

* chore: After-rebase fixes

Co-authored-by: Spencer T Brody <[email protected]>
  • Loading branch information
ukstv and stbrody authored Jun 6, 2022
1 parent a60ae1e commit 0c8aad4
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 7 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/ceramic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ export class Ceramic implements CeramicApi {
this._gateway = params.gateway
this._networkOptions = params.networkOptions
this._loadOptsOverride = params.loadOptsOverride
this._index = new LocalIndexApi(modules.indexing)

this.context = {
api: this,
Expand Down Expand Up @@ -234,6 +233,7 @@ export class Ceramic implements CeramicApi {
anchorService: modules.anchorService,
conflictResolution: conflictResolution,
})
this._index = new LocalIndexApi(modules.indexing, this.repository, this._logger)
}

get index(): IndexApi {
Expand Down
66 changes: 66 additions & 0 deletions packages/core/src/indexing/__tests__/index-api.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { jest } from '@jest/globals'
import type { DatabaseIndexApi } from '../database-index-api.js'
import type { Repository } from '../../state-management/repository.js'
import type { DiagnosticsLogger, Page } from '@ceramicnetwork/common'
import { IndexApi } from '../index-api.js'
import { randomString } from '@stablelib/random'

const randomInt = (max: number) => Math.floor(Math.random() * max)

describe('with database backend', () => {
test('return page from the database', async () => {
const query = { model: 'foo', first: randomInt(100) }
const backendPage: Page<string> = {
entries: Array.from({ length: query.first }).map(() => randomString(3)),
pageInfo: {
hasPreviousPage: true,
hasNextPage: true,
startCursor: 'startCursor',
endCursor: 'endCursor',
},
}
const pageFn = jest.fn(async () => backendPage)
const streamStateFn = jest.fn(async (streamId: any) => {
return {
type: 1,
content: streamId,
}
})
const fauxBackend = { page: pageFn } as unknown as DatabaseIndexApi
const fauxRepository = { streamState: streamStateFn } as unknown as Repository
const fauxLogger = {} as DiagnosticsLogger
const indexApi = new IndexApi(fauxBackend, fauxRepository, fauxLogger)
const response = await indexApi.queryIndex(query)
// Call databaseIndexApi::page function
expect(pageFn).toBeCalledTimes(1)
expect(pageFn).toBeCalledWith(query)
// We pass pageInfo through
expect(response.pageInfo).toEqual(backendPage.pageInfo)
// Transform from StreamId to StreamState via Repository::streamState
expect(streamStateFn).toBeCalledTimes(query.first)
backendPage.entries.forEach((fauxStreamId) => {
expect(streamStateFn).toBeCalledWith(fauxStreamId)
})
expect(response.entries.map((e) => e.content)).toEqual(backendPage.entries)
})
})

describe('without database backend', () => {
test('return an empty response', async () => {
const fauxRepository = {} as unknown as Repository
const warnFn = jest.fn()
const fauxLogger = { warn: warnFn } as unknown as DiagnosticsLogger
const indexApi = new IndexApi(undefined, fauxRepository, fauxLogger)
const response = await indexApi.queryIndex({ model: 'foo', first: 5 })
// Return an empty response
expect(response).toEqual({
entries: [],
pageInfo: {
hasNextPage: false,
hasPreviousPage: false,
},
})
// Log a warning
expect(warnFn).toBeCalledTimes(1)
})
})
2 changes: 1 addition & 1 deletion packages/core/src/indexing/database-index-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export interface DatabaseIndexApi {
indexStream(args: IndexStreamArgs): Promise<void>

/**
* Query the index
* Query the index.
*/
page(query: BaseQuery & Pagination): Promise<Page<StreamID>>

Expand Down
50 changes: 45 additions & 5 deletions packages/core/src/indexing/local-index-api.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,52 @@
import type { BaseQuery, IndexApi, Page, Pagination, StreamState } from '@ceramicnetwork/common'
import type {
BaseQuery,
IndexApi,
Page,
Pagination,
StreamState,
DiagnosticsLogger,
} from '@ceramicnetwork/common'
import { SyncOptions } from '@ceramicnetwork/common'
import type { DatabaseIndexApi } from './database-index-api.js'
import { NotImplementedError } from './not-implemented-error.js'
import type { Repository } from '../state-management/repository.js'

/**
* API to query an index.
*/
export class LocalIndexApi implements IndexApi {
constructor(private readonly databaseIndexApi: DatabaseIndexApi | undefined) {}
constructor(
private readonly databaseIndexApi: DatabaseIndexApi | undefined,
private readonly repository: Repository,
private readonly logger: DiagnosticsLogger
) {}

queryIndex(query: BaseQuery & Pagination): Promise<Page<StreamState>> {
throw new NotImplementedError('IndexApi::queryIndex')
/**
* Query the index. Ask an indexing database for a list of StreamIDs,
* and convert them to corresponding StreamState instances via `Repository::streamState`.
*
* We assume that a state store always contains StreamState for an indexed stream.
*/
async queryIndex(query: BaseQuery & Pagination): Promise<Page<StreamState>> {
if (this.databaseIndexApi) {
const page = await this.databaseIndexApi.page(query)
const streamStates = await Promise.all(
// For database queries we bypass the stream cache and repository loading queue
page.entries.map((streamId) => this.repository.streamState(streamId))
)
return {
entries: streamStates,
pageInfo: page.pageInfo,
}
} else {
this.logger.warn(`Indexing is not configured. Unable to serve query ${JSON.stringify(query)}`)
return {
entries: [],
pageInfo: {
hasNextPage: false,
hasPreviousPage: false,
},
}
}
}

async init(): Promise<void> {
Expand Down

0 comments on commit 0c8aad4

Please sign in to comment.