From b382941a44e14d01523891edeeac0d4cac2aac76 Mon Sep 17 00:00:00 2001 From: Lucas Pedroza <40873230+pnwpedro@users.noreply.github.com> Date: Thu, 21 Mar 2024 16:32:17 +0100 Subject: [PATCH 01/36] Setup pipeline for beta deploy (#236) --- concourse/pipeline.yml | 142 +++++++++++++++++++++++-- concourse/scripts/publish.sh | 7 +- concourse/tasks/integration-tests.yml | 8 +- concourse/tasks/npm-publish.yml | 5 +- concourse/tasks/query-limits-tests.yml | 6 +- 5 files changed, 147 insertions(+), 21 deletions(-) diff --git a/concourse/pipeline.yml b/concourse/pipeline.yml index 84c79c08..644feff7 100644 --- a/concourse/pipeline.yml +++ b/concourse/pipeline.yml @@ -13,12 +13,22 @@ resources: source: url: ((slack-webhook)) - - name: fauna-js-repository + - name: main.git type: git icon: github source: uri: git@github.com:fauna/fauna-js.git branch: main + tag_filter: v* + private_key: ((github-ssh-key)) + + - name: beta.git + type: git + icon: github + source: + uri: git@github.com:fauna/fauna-js.git + branch: beta + tag_filter: v* private_key: ((github-ssh-key)) - name: testtools-repo @@ -39,26 +49,36 @@ resources: aws_region: us-east-2 groups: - - name: standard-release + - name: pipeline jobs: - set-self - test - release + - test-beta + - release-beta + - name: standard-release + jobs: + - test + - release + - name: beta-release + jobs: + - test-beta + - release-beta jobs: - name: set-self serial: true plan: - - get: fauna-js-repository + - get: beta.git + - get: main.git trigger: true - set_pipeline: self - file: fauna-js-repository/concourse/pipeline.yml + file: main.git/concourse/pipeline.yml - name: test serial: true plan: - - get: fauna-js-repository - trigger: true + - get: main.git passed: - set-self @@ -67,7 +87,7 @@ jobs: - load_var: git-commit reveal: true - file: fauna-js-repository/.git/ref + file: main.git/.git/ref - in_parallel: fail_fast: false @@ -102,7 +122,8 @@ jobs: - task: query-limits-tests privileged: true - file: fauna-js-repository/concourse/tasks/query-limits-tests.yml + file: main.git/concourse/tasks/query-limits-tests.yml + input_mapping: {repo.git: main.git, testtools-repo: testtools-repo} params: QUERY_LIMITS_DB: limited QUERY_LIMITS_COLL: limitCollection @@ -119,12 +140,109 @@ jobs: serial: true public: false plan: - - get: fauna-js-repository + - get: main.git passed: - test - task: integration-tests - file: fauna-js-repository/concourse/tasks/integration-tests.yml + file: main.git/concourse/tasks/integration-tests.yml + input_mapping: {repo.git: main.git} + privileged: true + on_success: + put: notify + params: + text: "fauna-js driver release passed integration tests" + on_failure: + put: notify + params: + text: fauna-js driver release failed integration tests + + - task: publish + file: main.git/concourse/tasks/npm-publish.yml + input_mapping: {repo.git: main.git} + params: + NPM_TOKEN: ((npm_token)) + on_success: + put: notify + params: + text_file: slack-message/publish + on_failure: + put: notify + params: + text_file: slack-message/publish + + - name: test-beta + serial: true + plan: + - get: beta.git + passed: + - set-self + + - get: testtools-repo + - get: testtools-image + + - load_var: git-commit + reveal: true + file: beta.git/.git/ref + + - in_parallel: + fail_fast: false + steps: + - task: aws-lambda-tests + image: testtools-image + file: testtools-repo/fauna-driver-platform-tests/concourse/tasks/js-aws-lambda-tests.yml + params: + GIT_COMMIT: ((.:git-commit)) + FAUNA_SECRET: ((drivers-platform-tests/fauna-secret)) + AWS_LAMBDA_ROLE_ARN: ((drivers-platform-tests/aws-lambda-role-arn)) + AWS_ACCESS_KEY_ID: ((drivers-platform-tests/aws-access-key-id)) + AWS_SECRET_ACCESS_KEY: ((drivers-platform-tests/aws-secret-key)) + + - task: cloudflare-tests + image: testtools-image + file: testtools-repo/fauna-driver-platform-tests/concourse/tasks/js-cloudflare-workers-tests.yml + params: + GIT_COMMIT: ((.:git-commit)) + CLOUDFLARE_API_TOKEN: ((drivers-platform-tests/cloudflare-api-token)) + FAUNA_SECRET: ((drivers-platform-tests/fauna-secret)) + VERCEL_TOKEN: ((drivers-platform-tests/vercel-token)) + + - task: netlify-tests + image: testtools-image + file: testtools-repo/fauna-driver-platform-tests/concourse/tasks/js-netlify-tests.yml + params: + GIT_COMMIT: ((.:git-commit)) + FAUNA_SECRET: ((drivers-platform-tests/fauna-secret)) + NETLIFY_ACCOUNT: ((drivers-platform-tests/netlify-account)) + NETLIFY_AUTH_TOKEN: ((drivers-platform-tests/netlify-auth-token)) + + - task: query-limits-tests + privileged: true + file: main.git/concourse/tasks/query-limits-tests.yml + input_mapping: {repo.git: beta.git, testtools-repo: testtools-repo} + params: + QUERY_LIMITS_DB: limited + QUERY_LIMITS_COLL: limitCollection + + # - task: vercel-tests + # image: testtools-image + # file: testtools-repo/fauna-driver-platform-tests/concourse/tasks/js-vercel-tests.yml + # params: + # GIT_COMMIT: ((.:git-commit)) + # FAUNA_SECRET: ((drivers-platform-tests/fauna-secret)) + # VERCEL_TOKEN: ((drivers-platform-tests/vercel-token)) + + - name: release-beta + serial: true + public: false + plan: + - get: beta.git + passed: + - test-beta + + - task: integration-tests + file: main.git/concourse/tasks/integration-tests.yml + input_mapping: {repo.git: beta.git} privileged: true on_success: put: notify @@ -136,9 +254,11 @@ jobs: text: fauna-js driver release failed integration tests - task: publish - file: fauna-js-repository/concourse/tasks/npm-publish.yml + file: main.git/concourse/tasks/npm-publish.yml + input_mapping: {repo.git: beta.git} params: NPM_TOKEN: ((npm_token)) + NPM_TAG: beta on_success: put: notify params: diff --git a/concourse/scripts/publish.sh b/concourse/scripts/publish.sh index 3ed59c56..c75ca0bd 100755 --- a/concourse/scripts/publish.sh +++ b/concourse/scripts/publish.sh @@ -15,7 +15,12 @@ then echo "Publishing a new version..." echo "//registry.npmjs.org/:_authToken=$NPM_TOKEN" > .npmrc - npm publish + if [[ -z "$NPM_TAG" ]]; then + npm publish --tag $NPM_TAG + else + npm publish + fi + rm .npmrc echo "fauna-js@$PACKAGE_VERSION published to npm " > ../slack-message/publish diff --git a/concourse/tasks/integration-tests.yml b/concourse/tasks/integration-tests.yml index 52d961e7..f7d58f6f 100644 --- a/concourse/tasks/integration-tests.yml +++ b/concourse/tasks/integration-tests.yml @@ -15,7 +15,7 @@ params: FAUNA_PORT: inputs: - - name: fauna-js-repository + - name: repo.git run: path: entrypoint.sh @@ -24,9 +24,9 @@ run: - -ceu - | # start containers - docker-compose -f fauna-js-repository/concourse/scripts/docker-compose-fauna.yml run node-lts - docker-compose -f fauna-js-repository/concourse/scripts/docker-compose-fauna.yml run node-current + docker-compose -f repo.git/concourse/scripts/docker-compose-fauna.yml run node-lts + docker-compose -f repo.git/concourse/scripts/docker-compose-fauna.yml run node-current # stop and remove containers - docker-compose -f fauna-js-repository/concourse/scripts/docker-compose-fauna.yml down + docker-compose -f repo.git/concourse/scripts/docker-compose-fauna.yml down # remove volumes docker volume rm $(docker volume ls -q) diff --git a/concourse/tasks/npm-publish.yml b/concourse/tasks/npm-publish.yml index 7049dc36..d6b7c80e 100644 --- a/concourse/tasks/npm-publish.yml +++ b/concourse/tasks/npm-publish.yml @@ -9,12 +9,13 @@ image_resource: params: NPM_TOKEN: + NPM_TAG: inputs: - - name: fauna-js-repository + - name: repo.git outputs: - name: slack-message run: - path: ./fauna-js-repository/concourse/scripts/publish.sh + path: ./repo.git/concourse/scripts/publish.sh diff --git a/concourse/tasks/query-limits-tests.yml b/concourse/tasks/query-limits-tests.yml index 1e426a9d..03691e97 100644 --- a/concourse/tasks/query-limits-tests.yml +++ b/concourse/tasks/query-limits-tests.yml @@ -14,7 +14,7 @@ params: QUERY_LIMITS_COLL: inputs: - - name: fauna-js-repository + - name: repo.git - name: testtools-repo run: @@ -26,7 +26,7 @@ run: # setup Fauna container docker-compose -f testtools-repo/fauna-driver-query-limits-tests/docker-compose.yml run setup # run tests - docker-compose -f fauna-js-repository/concourse/scripts/docker-compose-fauna-limits.yml run query-limits-tests + docker-compose -f repo.git/concourse/scripts/docker-compose-fauna-limits.yml run query-limits-tests # stop and remove containers - docker-compose -f fauna-js-repository/concourse/scripts/docker-compose-fauna-limits.yml down + docker-compose -f repo.git/concourse/scripts/docker-compose-fauna-limits.yml down docker-compose -f testtools-repo/fauna-driver-query-limits-tests/docker-compose.yml down From e8f042a9dfc34bcc7f777fa0b4dc38b7d37b31ac Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 8 Feb 2024 14:34:18 -0500 Subject: [PATCH 02/36] Initial implementation with fetch --- src/client-configuration.ts | 18 +++++ src/client.ts | 133 +++++++++++++++++++++++++++++++- src/http-client/fetch-client.ts | 93 +++++++++++++++++++++- src/http-client/http-client.ts | 63 +++++++++++++++ src/http-client/index.ts | 15 ++-- src/tagged-type.ts | 8 ++ src/values/index.ts | 1 + src/values/stream.ts | 26 +++++++ src/wire-protocol.ts | 25 +++++- 9 files changed, 365 insertions(+), 17 deletions(-) create mode 100644 src/values/stream.ts diff --git a/src/client-configuration.ts b/src/client-configuration.ts index 99bbe111..0a00b404 100644 --- a/src/client-configuration.ts +++ b/src/client-configuration.ts @@ -155,6 +155,24 @@ export interface Endpoints { [key: string]: URL; } +/** + * Configuration for a streaming client. This typically comes from the `Client` + * instance configuration. + */ +export type StreamClientConfiguration = { + /** + * A secret for your Fauna DB, used to authorize your queries. + * @see https://docs.fauna.com/fauna/current/security/keys + */ + secret: string; + + /** + * Controls what Javascript type to deserialize {@link https://fqlx-beta--fauna-docs.netlify.app/fqlx/beta/reference/language/types#long | Fauna longs} to. + * @see {@link ClientConfiguration.long_type} + */ + long_type: "number" | "bigint"; +}; + /** * A extensible set of endpoints for calling Fauna. * @remarks Most clients will will not need to extend this set. diff --git a/src/client.ts b/src/client.ts index 2f5f85d8..20ee6039 100644 --- a/src/client.ts +++ b/src/client.ts @@ -1,4 +1,8 @@ -import { ClientConfiguration, endpoints } from "./client-configuration"; +import { + ClientConfiguration, + StreamClientConfiguration, + endpoints, +} from "./client-configuration"; import { AuthenticationError, AuthorizationError, @@ -18,18 +22,23 @@ import { InvalidRequestError, } from "./errors"; import { + HTTPStreamClient, + StreamAdapter, getDefaultHTTPClient, + implementsStreamClient, isHTTPResponse, type HTTPClient, } from "./http-client"; import { Query } from "./query-builder"; import { TaggedTypeFormat } from "./tagged-type"; import { getDriverEnv } from "./util/environment"; -import { EmbeddedSet, Page, SetIterator } from "./values"; +import { EmbeddedSet, Page, SetIterator, StreamToken } from "./values"; import { isQueryFailure, isQuerySuccess, QueryInterpolation, + StreamEvent, + StreamEventType, type QueryFailure, type QueryOptions, type QuerySuccess, @@ -80,7 +89,7 @@ export class Client { /** The {@link ClientConfiguration} */ readonly #clientConfiguration: RequiredClientConfig; /** The underlying {@link HTTPClient} client. */ - readonly #httpClient: HTTPClient; + readonly #httpClient: HTTPClient & Partial; /** The last transaction timestamp this client has seen */ #lastTxnTs?: number; /** true if this client is closed false otherwise */ @@ -264,6 +273,28 @@ export class Client { return this.#queryWithRetries(queryInterpolation, options); } + /** + * Initialize a streaming request to Fauna + * @param query - A string-encoded streaming token, or a {@link Query} + */ + // TODO: implement options + // TODO: implement providing Query + stream(query: StreamToken): StreamClient { + if (this.#isClosed) { + throw new ClientClosedError( + "Your client is closed. No further requests can be issued." + ); + } + + const streamClient = this.#httpClient; + + if (implementsStreamClient(streamClient)) { + return new StreamClient(query, this.#clientConfiguration, streamClient); + } else { + throw new ClientError("Streaming is not supported by this client."); + } + } + async #queryWithRetries( queryInterpolation: string | QueryInterpolation, options?: QueryOptions, @@ -569,6 +600,102 @@ in an environmental variable named FAUNA_SECRET or pass it to the Client\ } } +export type StreamEventHandler = (event: StreamEvent) => void; + +export class StreamClient { + #callbacks: Record = { + start: [], + add: [], + remove: [], + update: [], + error: [], + }; + #query: StreamToken; + #clientConfiguration: Record; + #httpStreamClient: HTTPStreamClient; + #streamAdapter: StreamAdapter | null = null; + + // TODO: make clientConfiguration and httpStreamClient optional + constructor( + query: StreamToken, + clientConfiguration: StreamClientConfiguration, + httpStreamClient: HTTPStreamClient + ) { + this.#query = query; + this.#clientConfiguration = clientConfiguration; + this.#httpStreamClient = httpStreamClient; + } + + on(type: StreamEventType, callback: StreamEventHandler) { + this.#callbacks[type].push(callback); + return this; + } + + start() { + const headers = { + Authorization: `Bearer ${this.#clientConfiguration.secret}`, + }; + + const streamAdapter = this.#httpStreamClient.stream({ + data: this.#query.token, + headers, + method: "POST", + }); + + this.#streamAdapter = streamAdapter; + + const handleEvents = async () => { + for await (const event of streamAdapter.read) { + // stream events are always tagged + const deserializedEvent: StreamEvent = TaggedTypeFormat.decode(event, { + long_type: this.#clientConfiguration.long_type, + }); + this.#callbacks[deserializedEvent.type].forEach((callback) => + callback(deserializedEvent) + ); + } + }; + + handleEvents().catch((error) => { + throw new NetworkError("Error reading from stream", { cause: error }); + }); + } + + async *iter(): AsyncGenerator { + const headers = { + Authorization: `Bearer ${this.#clientConfiguration.secret}`, + }; + + const streamAdapter = this.#httpStreamClient.stream({ + data: this.#query.token, + headers, + method: "POST", + }); + + this.#streamAdapter = streamAdapter; + + for await (const event of streamAdapter.read) { + // stream events are always tagged + const deserializedEvent: StreamEvent = TaggedTypeFormat.decode(event, { + long_type: this.#clientConfiguration.long_type, + }); + + // TODO: handle callbacks when using the async generator? + // this.#callbacks[deserializedEvent.type].forEach((callback) => + // callback(deserializedEvent) + // ); + + yield deserializedEvent; + } + } + + close() { + if (this.#streamAdapter) { + this.#streamAdapter.close(); + } + } +} + // Private types and constants for internal logic. const QUERY_CHECK_FAILURE_CODES = [ diff --git a/src/http-client/fetch-client.ts b/src/http-client/fetch-client.ts index b4c1b721..7f1b3cda 100644 --- a/src/http-client/fetch-client.ts +++ b/src/http-client/fetch-client.ts @@ -7,17 +7,22 @@ import { HTTPClientOptions, HTTPRequest, HTTPResponse, + HTTPStreamRequest, + HTTPStreamClient, + StreamAdapter, } from "./http-client"; /** * An implementation for {@link HTTPClient} that uses the native fetch API */ -export class FetchClient implements HTTPClient { - #url: string; +export class FetchClient implements HTTPClient, HTTPStreamClient { + #queryURL: string; + #streamURL: string; #keepalive: boolean; constructor({ url, fetch_keepalive }: HTTPClientOptions) { - this.#url = new URL("/query/1", url).toString(); + this.#queryURL = new URL("/query/1", url).toString(); + this.#streamURL = new URL("/stream/1", url).toString(); this.#keepalive = fetch_keepalive; } @@ -38,7 +43,7 @@ export class FetchClient implements HTTPClient { })() : AbortSignal.timeout(client_timeout_ms); - const response = await fetch(this.#url, { + const response = await fetch(this.#queryURL, { method, headers: { ...requestHeaders, "Content-Type": "application/json" }, body: JSON.stringify(data), @@ -64,8 +69,88 @@ export class FetchClient implements HTTPClient { }; } + /** {@inheritDoc HTTPStreamClient.stream} */ + stream({ + data, + headers: requestHeaders, + method, + }: HTTPStreamRequest): StreamAdapter { + const request = new Request(this.#streamURL, { + method, + headers: { ...requestHeaders, "Content-Type": "application/json" }, + body: data, + keepalive: this.#keepalive, + }); + + const abortController = new AbortController(); + + const options = { + signal: abortController.signal, + }; + + async function* reader() { + const response = await fetch(request, options); + const body = response.body; + if (!body) { + throw new Error("Response body is undefined."); + } + const reader = body.getReader(); + + for await (const line of readLines(reader)) { + yield line; + } + } + + return { + read: reader(), + close: () => { + abortController.abort(); + }, + }; + } + /** {@inheritDoc HTTPClient.close} */ close() { // no actions at this time } } + +/** + * Get individual lines from the stream + * + * The stream may be broken into arbitrary chunks, but the events are delimited by a newline character. + * + * @param reader - The stream reader + */ +async function* readLines(reader: ReadableStreamDefaultReader) { + const textDecoder = new TextDecoder(); + let partOfLine = ""; + for await (const chunk of readChunks(reader)) { + const chunkText = textDecoder.decode(chunk); + const chunkLines = (partOfLine + chunkText).split("\n"); + + // Yield all complete lines + for (let i = 0; i < chunkLines.length - 1; i++) { + yield chunkLines[i].trim(); + } + + // Store the partial line + partOfLine = chunkLines[chunkLines.length - 1]; + } + + // Yield the remaining partial line if any + if (partOfLine.trim() !== "") { + yield partOfLine; + } +} + +async function* readChunks(reader: ReadableStreamDefaultReader) { + let done = false; + do { + const readResult = await reader.read(); + if (readResult.value !== undefined) { + yield readResult.value; + } + done = readResult.done; + } while (!done); +} diff --git a/src/http-client/http-client.ts b/src/http-client/http-client.ts index c36711f6..f8e1533f 100644 --- a/src/http-client/http-client.ts +++ b/src/http-client/http-client.ts @@ -1,5 +1,6 @@ // eslint-disable-next-line @typescript-eslint/no-unused-vars import type { Client } from "../client"; +// import type { Stream } from "../stream"; import { QueryRequest } from "../wire-protocol"; /** @@ -65,3 +66,65 @@ export interface HTTPClient { */ close(): void; } + +/** + * An object representing an http request. + * The {@link Client} provides this to the {@link HTTPStreamClient} implementation. + */ +export type HTTPStreamRequest = { + /** The encoded Fauna query to send */ + data: string; // | QueryRequest; + + /** Headers in object format */ + headers: Record; + + /** HTTP method to use */ + method: "POST"; +}; + +// export interface Stream { +// /** +// * Register an event handler to execute for each event with specified type. +// * @param type - The event type to listen to {@link StreamEventType} +// * @param callback - The event handler to call each time an event is emitted +// * @returns +// */ +// on: (type: StreamEventType, callback: StreamEventHandler) => Stream; + +// /** +// * Start the stream. +// */ +// // TODO: return `AsyncGenerator`? +// start: () => void; + +// /** +// * Close the stream. +// */ +// close: () => void; +// } + +export interface StreamAdapter { + read: AsyncGenerator; + close: () => void; +} + +/** + * An interface to provide implementation-specific, asyncronous http calls. + * This driver provides default implementations for common environments. Users + * can configure the {@link Client} to use custom implementations if desired. + */ +export interface HTTPStreamClient { + /** + * Makes an HTTP request and returns the response + * @param req - an {@link HTTPStreamRequest} + * @returns A Promise<{@link HTTPResponse}> + * @throws {@link NetworkError} on request timeout or other network issue. + */ + stream(req: HTTPStreamRequest): StreamAdapter; +} + +export const implementsStreamClient = ( + client: Partial +): client is HTTPStreamClient => { + return "stream" in client && typeof client.stream === "function"; +}; diff --git a/src/http-client/index.ts b/src/http-client/index.ts index 4093d3ca..1ed20eab 100644 --- a/src/http-client/index.ts +++ b/src/http-client/index.ts @@ -1,11 +1,10 @@ import { FetchClient } from "./fetch-client"; +import { HTTPClient, HTTPClientOptions, HTTPResponse } from "./http-client"; import { NodeHTTP2Client } from "./node-http2-client"; -import { - HTTPClient, - HTTPClientOptions, - HTTPRequest, - HTTPResponse, -} from "./http-client"; + +export * from "./fetch-client"; +export * from "./http-client"; +export * from "./node-http2-client"; export const getDefaultHTTPClient = (options: HTTPClientOptions): HTTPClient => nodeHttp2IsSupported() @@ -15,7 +14,7 @@ export const getDefaultHTTPClient = (options: HTTPClientOptions): HTTPClient => export const isHTTPResponse = (res: any): res is HTTPResponse => res instanceof Object && "body" in res && "headers" in res && "status" in res; -const nodeHttp2IsSupported = () => { +export const nodeHttp2IsSupported = () => { if ( typeof process !== "undefined" && process && @@ -30,5 +29,3 @@ const nodeHttp2IsSupported = () => { } return false; }; - -export { FetchClient, NodeHTTP2Client, HTTPClient, HTTPRequest, HTTPResponse }; diff --git a/src/tagged-type.ts b/src/tagged-type.ts index c7f29a87..50c919a1 100644 --- a/src/tagged-type.ts +++ b/src/tagged-type.ts @@ -10,6 +10,7 @@ import { Page, NullDocument, EmbeddedSet, + StreamToken, } from "./values"; import { QueryValueObject, QueryValue } from "./wire-protocol"; @@ -95,6 +96,8 @@ Returning as Number with loss of precision. Use long_type 'bigint' instead.`); return TimeStub.from(value["@time"]); } else if (value["@object"]) { return value["@object"]; + } else if (value["@stream"]) { + return new StreamToken(value["@stream"]); } return value; @@ -209,6 +212,9 @@ const encodeMap = { // "@set": { data: encodeMap["array"](value.data), after: value.after }, // }; }, + // TODO: encode as a tagged value if provided as a query arg? + // streamToken: (value: StreamToken): TaggedStreamToken => ({ "@stream": value.token }), + streamToken: (value: StreamToken): string => value.token, }; const encode = (input: QueryValue): QueryValue => { @@ -253,6 +259,8 @@ const encode = (input: QueryValue): QueryValue => { return encodeMap["set"](input); } else if (input instanceof EmbeddedSet) { return encodeMap["set"](input); + } else if (input instanceof StreamToken) { + return encodeMap["streamToken"](input); } else { return encodeMap["object"](input); } diff --git a/src/values/index.ts b/src/values/index.ts index d31b16c4..fd384678 100644 --- a/src/values/index.ts +++ b/src/values/index.ts @@ -1,3 +1,4 @@ export * from "./date-time"; export * from "./doc"; export * from "./set"; +export * from "./stream"; diff --git a/src/values/stream.ts b/src/values/stream.ts new file mode 100644 index 00000000..544dc68d --- /dev/null +++ b/src/values/stream.ts @@ -0,0 +1,26 @@ +/** + * A token used to initiate a Fauna stream at a particular snapshot in time. + * + * The example below shows how to request a stream token from Fauna and use it + * to establish an event steam. + * + * @example + * ```javascript + * const response = await client.query(fql` + * Messages.byRecipient(User.byId("1234")) + * `); + * const token = response.data; + * + * const stream = client.stream(token) + * .on("add", (event) => console.log("New message", event)) + * + * stream.start(); + * ``` + */ +export class StreamToken { + readonly token: string; + + constructor(token: string) { + this.token = token; + } +} diff --git a/src/wire-protocol.ts b/src/wire-protocol.ts index 8682b1f7..2d141c38 100644 --- a/src/wire-protocol.ts +++ b/src/wire-protocol.ts @@ -10,6 +10,7 @@ import { NamedDocumentReference, NullDocument, Page, + StreamToken, TimeStub, } from "./values"; @@ -316,4 +317,26 @@ export type QueryValue = | NamedDocumentReference | NullDocument | Page - | EmbeddedSet; + | EmbeddedSet + | StreamToken; + +export type StreamEventType = "start" | "add" | "remove" | "update" | "error"; +export type StreamEventStart = { + type: "start"; + ts: TimeStub; +}; +export type StreamEventData = { + type: "add" | "remove" | "update"; + ts: TimeStub; + // TODO: Different type for StreamStats? + stats: QueryStats; + data: QueryValue; +}; +export type StreamEventError = { + type: "error"; + code: string; + message: string; + // TODO: Different type for StreamStats? + stats: QueryStats; +}; +export type StreamEvent = StreamEventStart | StreamEventData | StreamEventError; From f648682499c387dfb5ef6c338cb1d14363b315c4 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 8 Feb 2024 16:12:27 -0500 Subject: [PATCH 03/36] remove old commented code --- src/http-client/http-client.ts | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/src/http-client/http-client.ts b/src/http-client/http-client.ts index f8e1533f..74d9281b 100644 --- a/src/http-client/http-client.ts +++ b/src/http-client/http-client.ts @@ -82,27 +82,6 @@ export type HTTPStreamRequest = { method: "POST"; }; -// export interface Stream { -// /** -// * Register an event handler to execute for each event with specified type. -// * @param type - The event type to listen to {@link StreamEventType} -// * @param callback - The event handler to call each time an event is emitted -// * @returns -// */ -// on: (type: StreamEventType, callback: StreamEventHandler) => Stream; - -// /** -// * Start the stream. -// */ -// // TODO: return `AsyncGenerator`? -// start: () => void; - -// /** -// * Close the stream. -// */ -// close: () => void; -// } - export interface StreamAdapter { read: AsyncGenerator; close: () => void; From d7b0b7217cc585458156e531c9d56fbe7a765e69 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 22 Feb 2024 10:14:52 -0500 Subject: [PATCH 04/36] Update stream request format --- src/client.ts | 4 ++-- src/http-client/fetch-client.ts | 2 +- src/http-client/http-client.ts | 5 +++-- src/wire-protocol.ts | 5 +++++ 4 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/client.ts b/src/client.ts index 20ee6039..1f753d00 100644 --- a/src/client.ts +++ b/src/client.ts @@ -637,7 +637,7 @@ export class StreamClient { }; const streamAdapter = this.#httpStreamClient.stream({ - data: this.#query.token, + data: { token: this.#query.token }, headers, method: "POST", }); @@ -667,7 +667,7 @@ export class StreamClient { }; const streamAdapter = this.#httpStreamClient.stream({ - data: this.#query.token, + data: { token: this.#query.token }, headers, method: "POST", }); diff --git a/src/http-client/fetch-client.ts b/src/http-client/fetch-client.ts index 7f1b3cda..cdabbb20 100644 --- a/src/http-client/fetch-client.ts +++ b/src/http-client/fetch-client.ts @@ -78,7 +78,7 @@ export class FetchClient implements HTTPClient, HTTPStreamClient { const request = new Request(this.#streamURL, { method, headers: { ...requestHeaders, "Content-Type": "application/json" }, - body: data, + body: JSON.stringify(data), keepalive: this.#keepalive, }); diff --git a/src/http-client/http-client.ts b/src/http-client/http-client.ts index 74d9281b..b93ce170 100644 --- a/src/http-client/http-client.ts +++ b/src/http-client/http-client.ts @@ -1,7 +1,7 @@ // eslint-disable-next-line @typescript-eslint/no-unused-vars import type { Client } from "../client"; // import type { Stream } from "../stream"; -import { QueryRequest } from "../wire-protocol"; +import { QueryRequest, StreamRequest } from "../wire-protocol"; /** * An object representing an http request. @@ -73,7 +73,8 @@ export interface HTTPClient { */ export type HTTPStreamRequest = { /** The encoded Fauna query to send */ - data: string; // | QueryRequest; + // TODO: Allow type to be a QueryRequest once implemented by the db + data: StreamRequest; /** Headers in object format */ headers: Record; diff --git a/src/wire-protocol.ts b/src/wire-protocol.ts index 2d141c38..f30eda10 100644 --- a/src/wire-protocol.ts +++ b/src/wire-protocol.ts @@ -320,6 +320,11 @@ export type QueryValue = | EmbeddedSet | StreamToken; +export type StreamRequest = { + token: string; + start_ts?: number; +}; + export type StreamEventType = "start" | "add" | "remove" | "update" | "error"; export type StreamEventStart = { type: "start"; From 08101c19d2cf0e9b260883307e97a09007c901c3 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 22 Feb 2024 10:22:52 -0500 Subject: [PATCH 05/36] Guard against event types with no registered callbacks --- src/client.ts | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/client.ts b/src/client.ts index 1f753d00..e98cf073 100644 --- a/src/client.ts +++ b/src/client.ts @@ -650,9 +650,12 @@ export class StreamClient { const deserializedEvent: StreamEvent = TaggedTypeFormat.decode(event, { long_type: this.#clientConfiguration.long_type, }); - this.#callbacks[deserializedEvent.type].forEach((callback) => - callback(deserializedEvent) - ); + const callbacks = this.#callbacks[deserializedEvent.type]; + if (callbacks) { + this.#callbacks[deserializedEvent.type].forEach((callback) => + callback(deserializedEvent) + ); + } } }; From bd1f3f8209aee2ede667aa48f3e071e79f206c8e Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 22 Feb 2024 11:10:57 -0500 Subject: [PATCH 06/36] Accept a Query as an argument for Client.stream --- src/client.ts | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/client.ts b/src/client.ts index e98cf073..9c1f141e 100644 --- a/src/client.ts +++ b/src/client.ts @@ -29,7 +29,7 @@ import { isHTTPResponse, type HTTPClient, } from "./http-client"; -import { Query } from "./query-builder"; +import { Query, fql } from "./query-builder"; import { TaggedTypeFormat } from "./tagged-type"; import { getDriverEnv } from "./util/environment"; import { EmbeddedSet, Page, SetIterator, StreamToken } from "./values"; @@ -278,18 +278,32 @@ export class Client { * @param query - A string-encoded streaming token, or a {@link Query} */ // TODO: implement options - // TODO: implement providing Query - stream(query: StreamToken): StreamClient { + async stream(query: Query | StreamToken): Promise { if (this.#isClosed) { throw new ClientClosedError( "Your client is closed. No further requests can be issued." ); } + let streamToken: StreamToken | null = null; + if (query instanceof Query) { + const toStreamQuery = fql`${query}.toStream()`; + + streamToken = (await this.query(toStreamQuery).then( + (res) => res.data + )) as StreamToken; + } else { + streamToken = query; + } + const streamClient = this.#httpClient; if (implementsStreamClient(streamClient)) { - return new StreamClient(query, this.#clientConfiguration, streamClient); + return new StreamClient( + streamToken, + this.#clientConfiguration, + streamClient + ); } else { throw new ClientError("Streaming is not supported by this client."); } From 6968226f0a55703c8ac863dd6df0abdf43796327 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 22 Feb 2024 11:17:12 -0500 Subject: [PATCH 07/36] Convert StreamClient to be an AsyncIterator itself --- src/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.ts b/src/client.ts index 9c1f141e..0964d3d6 100644 --- a/src/client.ts +++ b/src/client.ts @@ -678,7 +678,7 @@ export class StreamClient { }); } - async *iter(): AsyncGenerator { + async *[Symbol.asyncIterator](): AsyncGenerator { const headers = { Authorization: `Bearer ${this.#clientConfiguration.secret}`, }; From 775a6c7e80c34319fa0161ec863800d4ca341777 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 23 Feb 2024 09:46:25 -0500 Subject: [PATCH 08/36] Require user to return token when Query provided --- src/client.ts | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/src/client.ts b/src/client.ts index 0964d3d6..3d28838c 100644 --- a/src/client.ts +++ b/src/client.ts @@ -285,20 +285,25 @@ export class Client { ); } - let streamToken: StreamToken | null = null; - if (query instanceof Query) { - const toStreamQuery = fql`${query}.toStream()`; - - streamToken = (await this.query(toStreamQuery).then( - (res) => res.data - )) as StreamToken; - } else { - streamToken = query; - } - const streamClient = this.#httpClient; if (implementsStreamClient(streamClient)) { + let streamToken: StreamToken | null = null; + if (query instanceof Query) { + streamToken = await this.query(query).then( + (res) => res.data + ); + + if (!(streamToken instanceof StreamToken)) { + throw new ClientError( + `Error requesting a stream token. Expected a StreamToken as the query result, but received ${typeof streamToken}. Your query must return the result of '.toStream' or '.changesOn')\n` + + `Query result: ${JSON.stringify(streamToken, null)}` + ); + } + } else { + streamToken = query; + } + return new StreamClient( streamToken, this.#clientConfiguration, From ed3c88c1e27d416416874358a7276121596ad95d Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 23 Feb 2024 09:50:53 -0500 Subject: [PATCH 09/36] Remove commented import --- src/http-client/http-client.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/http-client/http-client.ts b/src/http-client/http-client.ts index b93ce170..28ef6427 100644 --- a/src/http-client/http-client.ts +++ b/src/http-client/http-client.ts @@ -1,6 +1,5 @@ // eslint-disable-next-line @typescript-eslint/no-unused-vars import type { Client } from "../client"; -// import type { Stream } from "../stream"; import { QueryRequest, StreamRequest } from "../wire-protocol"; /** From a3aaf93eb6e09ea830c98f5ba45d1d698d19c04e Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 23 Feb 2024 09:51:54 -0500 Subject: [PATCH 10/36] remove unused import --- src/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client.ts b/src/client.ts index 3d28838c..b8310bbe 100644 --- a/src/client.ts +++ b/src/client.ts @@ -29,7 +29,7 @@ import { isHTTPResponse, type HTTPClient, } from "./http-client"; -import { Query, fql } from "./query-builder"; +import { Query } from "./query-builder"; import { TaggedTypeFormat } from "./tagged-type"; import { getDriverEnv } from "./util/environment"; import { EmbeddedSet, Page, SetIterator, StreamToken } from "./values"; From 4a0a31996c68d42231293d58e90e8b3d83a94ea7 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 23 Feb 2024 11:02:17 -0500 Subject: [PATCH 11/36] Provide custom abort message on stream.close() --- src/http-client/fetch-client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/http-client/fetch-client.ts b/src/http-client/fetch-client.ts index cdabbb20..5ecb34e8 100644 --- a/src/http-client/fetch-client.ts +++ b/src/http-client/fetch-client.ts @@ -104,7 +104,7 @@ export class FetchClient implements HTTPClient, HTTPStreamClient { return { read: reader(), close: () => { - abortController.abort(); + abortController.abort("Stream closed by the client."); }, }; } From 4e4484dcd270b4ae4b435131789481bd509f37e9 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 23 Feb 2024 12:53:52 -0500 Subject: [PATCH 12/36] Refactor so Client.stream is sync again --- src/client.ts | 92 +++++++++++++++++++++++++++------------------------ 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/src/client.ts b/src/client.ts index b8310bbe..ffe5a7a7 100644 --- a/src/client.ts +++ b/src/client.ts @@ -278,7 +278,7 @@ export class Client { * @param query - A string-encoded streaming token, or a {@link Query} */ // TODO: implement options - async stream(query: Query | StreamToken): Promise { + stream(query: Query | StreamToken): StreamClient { if (this.#isClosed) { throw new ClientClosedError( "Your client is closed. No further requests can be issued." @@ -288,24 +288,23 @@ export class Client { const streamClient = this.#httpClient; if (implementsStreamClient(streamClient)) { - let streamToken: StreamToken | null = null; - if (query instanceof Query) { - streamToken = await this.query(query).then( - (res) => res.data - ); - - if (!(streamToken instanceof StreamToken)) { - throw new ClientError( - `Error requesting a stream token. Expected a StreamToken as the query result, but received ${typeof streamToken}. Your query must return the result of '.toStream' or '.changesOn')\n` + - `Query result: ${JSON.stringify(streamToken, null)}` - ); - } - } else { - streamToken = query; - } + const getStreamToken: () => Promise = + query instanceof Query + ? () => + this.query(query).then((res) => { + const maybeStreamToken = res.data; + if (!(maybeStreamToken instanceof StreamToken)) { + throw new ClientError( + `Error requesting a stream token. Expected a StreamToken as the query result, but received ${typeof maybeStreamToken}. Your query must return the result of '.toStream' or '.changesOn')\n` + + `Query result: ${JSON.stringify(maybeStreamToken, null)}` + ); + } + return maybeStreamToken; + }) + : () => Promise.resolve(query as StreamToken); return new StreamClient( - streamToken, + getStreamToken, this.#clientConfiguration, streamClient ); @@ -629,14 +628,14 @@ export class StreamClient { update: [], error: [], }; - #query: StreamToken; + #query: () => Promise; #clientConfiguration: Record; #httpStreamClient: HTTPStreamClient; #streamAdapter: StreamAdapter | null = null; // TODO: make clientConfiguration and httpStreamClient optional constructor( - query: StreamToken, + query: () => Promise, clientConfiguration: StreamClientConfiguration, httpStreamClient: HTTPStreamClient ) { @@ -651,45 +650,52 @@ export class StreamClient { } start() { - const headers = { - Authorization: `Bearer ${this.#clientConfiguration.secret}`, - }; + this.#query().then((streamToken) => { + const headers = { + Authorization: `Bearer ${this.#clientConfiguration.secret}`, + }; - const streamAdapter = this.#httpStreamClient.stream({ - data: { token: this.#query.token }, - headers, - method: "POST", - }); + const streamAdapter = this.#httpStreamClient.stream({ + data: { token: streamToken.token }, + headers, + method: "POST", + }); - this.#streamAdapter = streamAdapter; + this.#streamAdapter = streamAdapter; - const handleEvents = async () => { - for await (const event of streamAdapter.read) { - // stream events are always tagged - const deserializedEvent: StreamEvent = TaggedTypeFormat.decode(event, { - long_type: this.#clientConfiguration.long_type, - }); - const callbacks = this.#callbacks[deserializedEvent.type]; - if (callbacks) { - this.#callbacks[deserializedEvent.type].forEach((callback) => - callback(deserializedEvent) + const handleEvents = async () => { + for await (const event of streamAdapter.read) { + // stream events are always tagged + const deserializedEvent: StreamEvent = TaggedTypeFormat.decode( + event, + { + long_type: this.#clientConfiguration.long_type, + } ); + const callbacks = this.#callbacks[deserializedEvent.type]; + if (callbacks) { + this.#callbacks[deserializedEvent.type].forEach((callback) => + callback(deserializedEvent) + ); + } } - } - }; + }; - handleEvents().catch((error) => { - throw new NetworkError("Error reading from stream", { cause: error }); + handleEvents().catch((error) => { + throw new NetworkError("Error reading from stream", { cause: error }); + }); }); } async *[Symbol.asyncIterator](): AsyncGenerator { + const streamToken = await this.#query(); + const headers = { Authorization: `Bearer ${this.#clientConfiguration.secret}`, }; const streamAdapter = this.#httpStreamClient.stream({ - data: { token: this.#query.token }, + data: { token: streamToken.token }, headers, method: "POST", }); From c59ea03b8d27bac912c5ba3876f18dfbf8ea72e4 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Wed, 28 Feb 2024 16:35:27 -0500 Subject: [PATCH 13/36] Add a StreamError class --- src/errors.ts | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/errors.ts b/src/errors.ts index c6d72fef..06564c40 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -2,7 +2,9 @@ import type { ConstraintFailure, QueryFailure, QueryInfo, + QueryStats, QueryValue, + StreamEventError, } from "./wire-protocol"; /** @@ -312,3 +314,35 @@ export class ProtocolError extends FaunaError { this.httpStatus = error.httpStatus; } } + +/** + * An error representing a failure in a stream + */ +export class StreamError extends FaunaError { + /** + * Error code + */ + readonly code: string; + + /** + * Details about the query sent along with the response + */ + readonly stats: QueryStats; + + constructor(message: string, code: string, stats: QueryStats) { + super(message); + + // Maintains proper stack trace for where our error was thrown (only available on V8) + if (Error.captureStackTrace) { + Error.captureStackTrace(this, StreamError); + } + + this.name = "StreamError"; + this.code = code; + this.stats = stats; + } + + static fromStreamEventError(event: StreamEventError) { + return new StreamError(event.message, event.code, event.stats); + } +} From bcc307cb998176c534d84380f745aa7e93a566d7 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Wed, 28 Feb 2024 16:36:21 -0500 Subject: [PATCH 14/36] Refactor StreamClient and provide retries --- src/client.ts | 135 ++++++++++++++++++++++++++++--------------- src/wire-protocol.ts | 4 +- 2 files changed, 89 insertions(+), 50 deletions(-) diff --git a/src/client.ts b/src/client.ts index ffe5a7a7..2870dda5 100644 --- a/src/client.ts +++ b/src/client.ts @@ -20,6 +20,7 @@ import { ThrottlingError, ContendedTransactionError, InvalidRequestError, + StreamError, } from "./errors"; import { HTTPStreamClient, @@ -326,16 +327,17 @@ export class Client { const backoffMs = Math.min(Math.random() * 2 ** attempt, maxBackoff) * 1_000; - const wait = (ms: number) => new Promise((r) => setTimeout(r, ms)); attempt += 1; - return this.#query(queryInterpolation, options, attempt).catch((e) => { - if (e instanceof ThrottlingError && attempt < maxAttempts) { - return wait(backoffMs).then(() => - this.#queryWithRetries(queryInterpolation, options, attempt) - ); + + try { + return await this.#query(queryInterpolation, options, attempt); + } catch (error) { + if (error instanceof ThrottlingError && attempt < maxAttempts) { + await wait(backoffMs); + return this.#queryWithRetries(queryInterpolation, options, attempt); } - throw e; - }); + throw error; + } } #getError(e: any): ClientError | NetworkError | ProtocolError | ServiceError { @@ -621,6 +623,7 @@ in an environmental variable named FAUNA_SECRET or pass it to the Client\ export type StreamEventHandler = (event: StreamEvent) => void; export class StreamClient { + closed = false; #callbacks: Record = { start: [], add: [], @@ -631,7 +634,10 @@ export class StreamClient { #query: () => Promise; #clientConfiguration: Record; #httpStreamClient: HTTPStreamClient; - #streamAdapter: StreamAdapter | null = null; + #connectionAttempts = 0; + #streamAdapter?: StreamAdapter; + #streamToken?: StreamToken; + #last_ts?: number; // TODO: make clientConfiguration and httpStreamClient optional constructor( @@ -650,52 +656,78 @@ export class StreamClient { } start() { - this.#query().then((streamToken) => { - const headers = { - Authorization: `Bearer ${this.#clientConfiguration.secret}`, - }; + const run = async () => { + for await (const event of this) { + const callbacks = this.#callbacks[event.type]; + if (callbacks) { + this.#callbacks[event.type].forEach((callback) => callback(event)); + } + } + }; + run(); + } - const streamAdapter = this.#httpStreamClient.stream({ - data: { token: streamToken.token }, - headers, - method: "POST", - }); + async *[Symbol.asyncIterator](): AsyncGenerator { + if (this.closed) { + throw new ClientError("The stream has been closed and cannot be reused."); + } - this.#streamAdapter = streamAdapter; + if (!this.#streamToken) { + // TODO: Should we retry any errors when trying to get the token? + this.#streamToken = await this.#query(); + } - const handleEvents = async () => { - for await (const event of streamAdapter.read) { - // stream events are always tagged - const deserializedEvent: StreamEvent = TaggedTypeFormat.decode( - event, - { - long_type: this.#clientConfiguration.long_type, - } - ); - const callbacks = this.#callbacks[deserializedEvent.type]; - if (callbacks) { - this.#callbacks[deserializedEvent.type].forEach((callback) => - callback(deserializedEvent) - ); - } + // TODO: make these constants configurable + const STREAM_RECONNECT_MAX_ATTEMPTS = 60; + const STREAM_RECONNECT_MAX_BACKOFF = 10; + + while (!this.closed) { + const backoffMs = + Math.min( + Math.random() * 2 ** this.#connectionAttempts, + STREAM_RECONNECT_MAX_BACKOFF + ) * 1_000; + + this.#connectionAttempts = 1; + try { + for await (const event of this.#startStream(this.#last_ts)) { + yield event; + } + } catch (error) { + console.error("Error in stream", error); + if ( + error instanceof StreamError || + this.#connectionAttempts >= STREAM_RECONNECT_MAX_ATTEMPTS + ) { + this.close(); + throw error; } - }; - handleEvents().catch((error) => { - throw new NetworkError("Error reading from stream", { cause: error }); - }); - }); + console.log("retrying stream..."); + this.#connectionAttempts += 1; + await wait(backoffMs); + } + } } - async *[Symbol.asyncIterator](): AsyncGenerator { - const streamToken = await this.#query(); + close() { + if (this.#streamAdapter) { + this.#streamAdapter.close(); + this.#streamAdapter = undefined; + } + this.closed = true; + } + + async *#startStream(start_ts?: number): AsyncGenerator { + // Safety: This method must only be called after a stream token has been acquired + const streamToken = this.#streamToken as StreamToken; const headers = { Authorization: `Bearer ${this.#clientConfiguration.secret}`, }; const streamAdapter = this.#httpStreamClient.stream({ - data: { token: streamToken.token }, + data: { token: streamToken.token, start_ts }, headers, method: "POST", }); @@ -708,18 +740,21 @@ export class StreamClient { long_type: this.#clientConfiguration.long_type, }); + if (deserializedEvent.type === "error") { + // Errors sent from Fauna are assumed fatal + this.close(); + yield deserializedEvent; + throw StreamError.fromStreamEventError(deserializedEvent); + } + // TODO: handle callbacks when using the async generator? // this.#callbacks[deserializedEvent.type].forEach((callback) => // callback(deserializedEvent) // ); - yield deserializedEvent; - } - } + this.#last_ts = deserializedEvent.ts; - close() { - if (this.#streamAdapter) { - this.#streamAdapter.close(); + yield deserializedEvent; } } } @@ -733,3 +768,7 @@ const QUERY_CHECK_FAILURE_CODES = [ "invalid_syntax", "invalid_type", ]; + +function wait(ms: number) { + return new Promise((r) => setTimeout(r, ms)); +} diff --git a/src/wire-protocol.ts b/src/wire-protocol.ts index f30eda10..05de010c 100644 --- a/src/wire-protocol.ts +++ b/src/wire-protocol.ts @@ -328,11 +328,11 @@ export type StreamRequest = { export type StreamEventType = "start" | "add" | "remove" | "update" | "error"; export type StreamEventStart = { type: "start"; - ts: TimeStub; + ts: number; }; export type StreamEventData = { type: "add" | "remove" | "update"; - ts: TimeStub; + ts: number; // TODO: Different type for StreamStats? stats: QueryStats; data: QueryValue; From 0273292882affd9beb5cd2bd4665a2cd4a2e6790 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Tue, 5 Mar 2024 13:05:04 -0500 Subject: [PATCH 15/36] pipe network errors into events --- src/client.ts | 28 ++++++++++++++++++++-------- src/errors.ts | 4 ++-- src/wire-protocol.ts | 3 ++- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/client.ts b/src/client.ts index 2870dda5..3a997da7 100644 --- a/src/client.ts +++ b/src/client.ts @@ -655,12 +655,18 @@ export class StreamClient { return this; } - start() { + start(onFatalError: (error: Error) => void) { const run = async () => { - for await (const event of this) { - const callbacks = this.#callbacks[event.type]; - if (callbacks) { - this.#callbacks[event.type].forEach((callback) => callback(event)); + try { + for await (const event of this) { + const callbacks = this.#callbacks[event.type]; + if (callbacks) { + this.#callbacks[event.type].forEach((callback) => callback(event)); + } + } + } catch (error) { + if (error instanceof Error) { + onFatalError(error); } } }; @@ -693,17 +699,23 @@ export class StreamClient { for await (const event of this.#startStream(this.#last_ts)) { yield event; } - } catch (error) { - console.error("Error in stream", error); + } catch (error: any) { if ( error instanceof StreamError || this.#connectionAttempts >= STREAM_RECONNECT_MAX_ATTEMPTS ) { + // A terminal error from Fauna this.close(); throw error; } - console.log("retrying stream..."); + yield { + type: "error", + code: "network error", + message: error.message, + cause: error, + }; + this.#connectionAttempts += 1; await wait(backoffMs); } diff --git a/src/errors.ts b/src/errors.ts index 06564c40..9fa4be95 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -327,9 +327,9 @@ export class StreamError extends FaunaError { /** * Details about the query sent along with the response */ - readonly stats: QueryStats; + readonly stats?: QueryStats; - constructor(message: string, code: string, stats: QueryStats) { + constructor(message: string, code: string, stats?: QueryStats) { super(message); // Maintains proper stack trace for where our error was thrown (only available on V8) diff --git a/src/wire-protocol.ts b/src/wire-protocol.ts index 05de010c..7bc20143 100644 --- a/src/wire-protocol.ts +++ b/src/wire-protocol.ts @@ -342,6 +342,7 @@ export type StreamEventError = { code: string; message: string; // TODO: Different type for StreamStats? - stats: QueryStats; + stats?: QueryStats; + cause?: Error; }; export type StreamEvent = StreamEventStart | StreamEventData | StreamEventError; From 98d240aed3b9cddaaef2458b1e9958f7575d488d Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Tue, 5 Mar 2024 19:55:03 -0500 Subject: [PATCH 16/36] validate onFatalError callback --- src/client.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/client.ts b/src/client.ts index 3a997da7..8b643337 100644 --- a/src/client.ts +++ b/src/client.ts @@ -655,7 +655,12 @@ export class StreamClient { return this; } - start(onFatalError: (error: Error) => void) { + start(onFatalError?: (error: Error) => void) { + if (onFatalError && typeof onFatalError !== "function") { + throw new TypeError( + `Expected a function as the 'onFatalError' argument, but received ${typeof onFatalError}. Please provide a valid function.` + ); + } const run = async () => { try { for await (const event of this) { @@ -665,8 +670,8 @@ export class StreamClient { } } } catch (error) { - if (error instanceof Error) { - onFatalError(error); + if (onFatalError) { + onFatalError(error as Error); } } }; From 7b6e3e72d75c3a2b5f2231c155861588ba2981ea Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Tue, 5 Mar 2024 19:57:02 -0500 Subject: [PATCH 17/36] do not emit type=error events --- src/client.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/client.ts b/src/client.ts index 8b643337..ea8d30a2 100644 --- a/src/client.ts +++ b/src/client.ts @@ -760,15 +760,9 @@ export class StreamClient { if (deserializedEvent.type === "error") { // Errors sent from Fauna are assumed fatal this.close(); - yield deserializedEvent; throw StreamError.fromStreamEventError(deserializedEvent); } - // TODO: handle callbacks when using the async generator? - // this.#callbacks[deserializedEvent.type].forEach((callback) => - // callback(deserializedEvent) - // ); - this.#last_ts = deserializedEvent.ts; yield deserializedEvent; From 398c1f42f1801041e5736c25d00dab377009dba7 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 8 Mar 2024 15:41:26 -0500 Subject: [PATCH 18/36] Provide exactly one handler for events and another for fatal errors (#234) --- src/client.ts | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/src/client.ts b/src/client.ts index ea8d30a2..14260ffc 100644 --- a/src/client.ts +++ b/src/client.ts @@ -624,13 +624,6 @@ export type StreamEventHandler = (event: StreamEvent) => void; export class StreamClient { closed = false; - #callbacks: Record = { - start: [], - add: [], - remove: [], - update: [], - error: [], - }; #query: () => Promise; #clientConfiguration: Record; #httpStreamClient: HTTPStreamClient; @@ -650,12 +643,15 @@ export class StreamClient { this.#httpStreamClient = httpStreamClient; } - on(type: StreamEventType, callback: StreamEventHandler) { - this.#callbacks[type].push(callback); - return this; - } - - start(onFatalError?: (error: Error) => void) { + start( + onEvent: StreamEventHandler, + onFatalError: (error: Error) => void + ): StreamClient { + if (typeof onEvent !== "function") { + throw new TypeError( + `Expected a function as the 'onEvent' argument, but received ${typeof onEvent}. Please provide a valid function.` + ); + } if (onFatalError && typeof onFatalError !== "function") { throw new TypeError( `Expected a function as the 'onFatalError' argument, but received ${typeof onFatalError}. Please provide a valid function.` @@ -664,10 +660,7 @@ export class StreamClient { const run = async () => { try { for await (const event of this) { - const callbacks = this.#callbacks[event.type]; - if (callbacks) { - this.#callbacks[event.type].forEach((callback) => callback(event)); - } + onEvent(event); } } catch (error) { if (onFatalError) { @@ -676,6 +669,7 @@ export class StreamClient { } }; run(); + return this; } async *[Symbol.asyncIterator](): AsyncGenerator { From 90c64f1da19b41b482f206e2fa77509800a9d0c4 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 8 Mar 2024 16:00:22 -0500 Subject: [PATCH 19/36] Never yeild start or error events --- src/client.ts | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/client.ts b/src/client.ts index 14260ffc..e2f48b62 100644 --- a/src/client.ts +++ b/src/client.ts @@ -39,7 +39,6 @@ import { isQuerySuccess, QueryInterpolation, StreamEvent, - StreamEventType, type QueryFailure, type QueryOptions, type QuerySuccess, @@ -708,13 +707,6 @@ export class StreamClient { throw error; } - yield { - type: "error", - code: "network error", - message: error.message, - cause: error, - }; - this.#connectionAttempts += 1; await wait(backoffMs); } @@ -754,12 +746,15 @@ export class StreamClient { if (deserializedEvent.type === "error") { // Errors sent from Fauna are assumed fatal this.close(); + // TODO: replace with appropriate class from existing error heirarchy throw StreamError.fromStreamEventError(deserializedEvent); } this.#last_ts = deserializedEvent.ts; - yield deserializedEvent; + if (deserializedEvent.type !== "start") { + yield deserializedEvent; + } } } } From cb98659f249eb45b7abab1c3639265310bf90e9a Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Fri, 8 Mar 2024 17:05:00 -0500 Subject: [PATCH 20/36] client.query already implements appropriate retries --- src/client.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/client.ts b/src/client.ts index e2f48b62..5723f87e 100644 --- a/src/client.ts +++ b/src/client.ts @@ -677,7 +677,6 @@ export class StreamClient { } if (!this.#streamToken) { - // TODO: Should we retry any errors when trying to get the token? this.#streamToken = await this.#query(); } From 44edf28bb82aff34fd0ec3ac1f1597c16b784d94 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Tue, 12 Mar 2024 14:06:16 -0400 Subject: [PATCH 21/36] Implement core update for error events to implement QueryFailure --- src/client.ts | 15 +++++++-------- src/errors.ts | 32 -------------------------------- src/wire-protocol.ts | 9 +-------- 3 files changed, 8 insertions(+), 48 deletions(-) diff --git a/src/client.ts b/src/client.ts index 5723f87e..ddee4fe2 100644 --- a/src/client.ts +++ b/src/client.ts @@ -20,7 +20,7 @@ import { ThrottlingError, ContendedTransactionError, InvalidRequestError, - StreamError, + FaunaError, } from "./errors"; import { HTTPStreamClient, @@ -39,6 +39,7 @@ import { isQuerySuccess, QueryInterpolation, StreamEvent, + StreamEventData, type QueryFailure, type QueryOptions, type QuerySuccess, @@ -619,8 +620,6 @@ in an environmental variable named FAUNA_SECRET or pass it to the Client\ } } -export type StreamEventHandler = (event: StreamEvent) => void; - export class StreamClient { closed = false; #query: () => Promise; @@ -643,7 +642,7 @@ export class StreamClient { } start( - onEvent: StreamEventHandler, + onEvent: (event: StreamEventData) => void, onFatalError: (error: Error) => void ): StreamClient { if (typeof onEvent !== "function") { @@ -671,7 +670,7 @@ export class StreamClient { return this; } - async *[Symbol.asyncIterator](): AsyncGenerator { + async *[Symbol.asyncIterator](): AsyncGenerator { if (this.closed) { throw new ClientError("The stream has been closed and cannot be reused."); } @@ -698,7 +697,7 @@ export class StreamClient { } } catch (error: any) { if ( - error instanceof StreamError || + error instanceof FaunaError || this.#connectionAttempts >= STREAM_RECONNECT_MAX_ATTEMPTS ) { // A terminal error from Fauna @@ -720,7 +719,7 @@ export class StreamClient { this.closed = true; } - async *#startStream(start_ts?: number): AsyncGenerator { + async *#startStream(start_ts?: number): AsyncGenerator { // Safety: This method must only be called after a stream token has been acquired const streamToken = this.#streamToken as StreamToken; @@ -746,7 +745,7 @@ export class StreamClient { // Errors sent from Fauna are assumed fatal this.close(); // TODO: replace with appropriate class from existing error heirarchy - throw StreamError.fromStreamEventError(deserializedEvent); + throw new ServiceError(deserializedEvent, 400); } this.#last_ts = deserializedEvent.ts; diff --git a/src/errors.ts b/src/errors.ts index 9fa4be95..0a382e93 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -314,35 +314,3 @@ export class ProtocolError extends FaunaError { this.httpStatus = error.httpStatus; } } - -/** - * An error representing a failure in a stream - */ -export class StreamError extends FaunaError { - /** - * Error code - */ - readonly code: string; - - /** - * Details about the query sent along with the response - */ - readonly stats?: QueryStats; - - constructor(message: string, code: string, stats?: QueryStats) { - super(message); - - // Maintains proper stack trace for where our error was thrown (only available on V8) - if (Error.captureStackTrace) { - Error.captureStackTrace(this, StreamError); - } - - this.name = "StreamError"; - this.code = code; - this.stats = stats; - } - - static fromStreamEventError(event: StreamEventError) { - return new StreamError(event.message, event.code, event.stats); - } -} diff --git a/src/wire-protocol.ts b/src/wire-protocol.ts index 7bc20143..fbe5c66f 100644 --- a/src/wire-protocol.ts +++ b/src/wire-protocol.ts @@ -337,12 +337,5 @@ export type StreamEventData = { stats: QueryStats; data: QueryValue; }; -export type StreamEventError = { - type: "error"; - code: string; - message: string; - // TODO: Different type for StreamStats? - stats?: QueryStats; - cause?: Error; -}; +export type StreamEventError = { type: "error" } & QueryFailure; export type StreamEvent = StreamEventStart | StreamEventData | StreamEventError; From e2352db870b59901fd890bad62554ce8e0d7562c Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Tue, 12 Mar 2024 14:28:10 -0400 Subject: [PATCH 22/36] catch and throw non 200s --- src/http-client/fetch-client.ts | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/src/http-client/fetch-client.ts b/src/http-client/fetch-client.ts index 5ecb34e8..20db8e09 100644 --- a/src/http-client/fetch-client.ts +++ b/src/http-client/fetch-client.ts @@ -1,7 +1,7 @@ /** following reference needed to include types for experimental fetch API in Node */ /// -import { NetworkError } from "../errors"; +import { NetworkError, ServiceError } from "../errors"; import { HTTPClient, HTTPClientOptions, @@ -89,7 +89,20 @@ export class FetchClient implements HTTPClient, HTTPStreamClient { }; async function* reader() { - const response = await fetch(request, options); + const response = await fetch(request, options).catch((error) => { + throw new NetworkError( + "The network connection encountered a problem.", + { + cause: error, + } + ); + }); + const status = response.status; + if (!(status >= 200 && status < 400)) { + const body = await response.json(); + throw new ServiceError(body, status); + } + const body = response.body; if (!body) { throw new Error("Response body is undefined."); From 6373f67793d89b48e8b8968eca26380c651a9f23 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Tue, 12 Mar 2024 14:45:16 -0400 Subject: [PATCH 23/36] Add README example --- README.md | 85 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 64cbe320..4650274f 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
Table of Contents -- [A JavaScript driver for Fauna.](#a-javascript-driver-for-fauna) +- [The Official Javascript Driver for Fauna.](#the-official-javascript-driver-for-fauna) - [Quick-Start](#quick-start) - [Supported Runtimes](#supported-runtimes) - [Installation](#installation) @@ -23,12 +23,16 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio - [Typescript Support](#typescript-support) - [Query Options](#query-options) - [Client Configuration](#client-configuration) + - [Retry](#retry) + - [Max Attempts](#max-attempts) + - [Max Backoff](#max-backoff) - [Timeouts](#timeouts) - [Query Timeout](#query-timeout) - [Client Timeout](#client-timeout) - [HTTP/2 Session Idle Timeout](#http2-session-idle-timeout) - [Using environment variables](#using-environment-variables) - [Query Statistics](#query-statistics) + - [Streaming](#streaming) - [Contributing](#contributing) - [Setting up this Repo](#setting-up-this-repo) - [Running tests](#running-tests) @@ -402,6 +406,85 @@ try { */ ```` +## Streaming + +Obtain a stream token using a regular query with either the `toStream()` or `changesOn()` FQL methods on a Set. + +```javascript +import { Client, fql } from "fauna" +const client = new Client({ secret: FAUNA_SECRET }) + +const response = await client.query(fql` + let set = MyCollection.all() + + { + initialPage: set.pageSize(10), + streamToken: set.toStream() + } +`); +const { initialPage, streamToken } = response.data; + +const stream = client.stream(streamToken) +``` + +The driver will take care of the initial request to convert to a stream if you provide a Query + +```javascript +import { Client, fql } from "fauna" +const client = new Client({ secret: FAUNA_SECRET }) + +const stream = await client.stream(fql`MyCollection.all().changesOn(.field1, .field2)`) +``` + +There are two Two ways to initiate the stream: +1. Async Iterator +2. Callbacks + +_Async Iterator example_ +```javascript +try { + for await (const event of stream) { + switch (event.type) { + case "update": + case "add": + case "remove": + console.log("Stream event:", event); + // ... + break; + } + } +} catch (error) { + // An error will be handled here if Fauna returns a terminal, "error" event, or + // if Fauna returns a non-200 response when trying to connect, or + // if the max number of retries on network errors is reached. + + // ... handle fatal error +} +``` + +_Callbacks example_ +```javascript +stream.start( + function onEvent(event) { + switch (event.type) { + case "update": + case "add": + case "remove": + console.log("Stream event:", event); + // ... + break; + } + }, + function onFatalError(error) { + // An error will be handled here if Fauna returns a terminal, "error" event, or + // if Fauna returns a non-200 response when trying to connect, or + // if the max number of retries on network errors is reached. + + // ... handle fatal error + } +); +``` + # Contributing Any contributions are from the community are greatly appreciated! From 23bd3f664f6977dde9018372cfd1f647e98a0243 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 14 Mar 2024 14:31:30 -0400 Subject: [PATCH 24/36] update event format --- src/client.ts | 8 ++++++-- src/wire-protocol.ts | 10 +++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/client.ts b/src/client.ts index ddee4fe2..d2268d6e 100644 --- a/src/client.ts +++ b/src/client.ts @@ -719,6 +719,10 @@ export class StreamClient { this.closed = true; } + get last_ts(): number | undefined { + return this.#last_ts; + } + async *#startStream(start_ts?: number): AsyncGenerator { // Safety: This method must only be called after a stream token has been acquired const streamToken = this.#streamToken as StreamToken; @@ -748,9 +752,9 @@ export class StreamClient { throw new ServiceError(deserializedEvent, 400); } - this.#last_ts = deserializedEvent.ts; + this.#last_ts = deserializedEvent.txn_ts; - if (deserializedEvent.type !== "start") { + if (deserializedEvent.type !== "status") { yield deserializedEvent; } } diff --git a/src/wire-protocol.ts b/src/wire-protocol.ts index fbe5c66f..fd06b04f 100644 --- a/src/wire-protocol.ts +++ b/src/wire-protocol.ts @@ -325,15 +325,15 @@ export type StreamRequest = { start_ts?: number; }; -export type StreamEventType = "start" | "add" | "remove" | "update" | "error"; +export type StreamEventType = "status" | "add" | "remove" | "update" | "error"; export type StreamEventStart = { - type: "start"; - ts: number; + type: "status"; + txn_ts: number; + stats: QueryStats; }; export type StreamEventData = { type: "add" | "remove" | "update"; - ts: number; - // TODO: Different type for StreamStats? + txn_ts: number; stats: QueryStats; data: QueryValue; }; From 303b9818a4ea5027c62f737d68ffcee99111a273 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 14 Mar 2024 14:55:44 -0400 Subject: [PATCH 25/36] Add JS docs --- src/client.ts | 101 +++++++++++++++++++++++++++++---- src/http-client/http-client.ts | 3 + 2 files changed, 93 insertions(+), 11 deletions(-) diff --git a/src/client.ts b/src/client.ts index d2268d6e..e7a86c67 100644 --- a/src/client.ts +++ b/src/client.ts @@ -277,6 +277,57 @@ export class Client { /** * Initialize a streaming request to Fauna * @param query - A string-encoded streaming token, or a {@link Query} + * @returns A {@link StreamClient} that which can be used to listen to a stream + * of events + * + * @example + * ```javascript + * const stream = client.stream(fql`MyCollection.all().toStream()`) + * + * try { + * for await (const event of stream) { + * switch (event.type) { + * case "update": + * case "add": + * case "remove": + * console.log("Stream update:", event); + * // ... + * break; + * } + * } + * } catch (error) { + * // An error will be handled here if Fauna returns a terminal, "error" event, or + * // if Fauna returns a non-200 response when trying to connect, or + * // if the max number of retries on network errors is reached. + * + * // ... handle fatal error + * }; + * ``` + * + * @example + * ```javascript + * const stream = client.stream(fql`MyCollection.all().toStream()`) + * + * stream.start( + * function onEvent(event) { + * switch (event.type) { + * case "update": + * case "add": + * case "remove": + * console.log("Stream update:", event); + * // ... + * break; + * } + * }, + * function onError(error) { + * // An error will be handled here if Fauna returns a terminal, "error" event, or + * // if Fauna returns a non-200 response when trying to connect, or + * // if the max number of retries on network errors is reached. + * + * // ... handle fatal error + * } + * ); + * ``` */ // TODO: implement options stream(query: Query | StreamToken): StreamClient { @@ -620,17 +671,39 @@ in an environmental variable named FAUNA_SECRET or pass it to the Client\ } } +/** + * A class to listen to Fauna streams. + */ export class StreamClient { + /** Whether or not this stream has been closed */ closed = false; - #query: () => Promise; + /** The stream client options */ #clientConfiguration: Record; - #httpStreamClient: HTTPStreamClient; + /** A tracker for the number of connection attempts */ #connectionAttempts = 0; + /** The underlying {@link HTTPStreamClient} that will execute the actual HTTP calls */ + #httpStreamClient: HTTPStreamClient; + /** A lambda that returns a promise for a {@link StreamToken} */ + #query: () => Promise; + /** The last `txn_ts` value received from events */ + #last_ts?: number; + /** A common interface to operate a stream from any HTTPStreamClient */ #streamAdapter?: StreamAdapter; + /** A saved copy of the StreamToken once received */ #streamToken?: StreamToken; - #last_ts?: number; - // TODO: make clientConfiguration and httpStreamClient optional + /** + * + * @param query - A lambda that returns a promise for a {@link StreamToken} + * @param clientConfiguration - The {@link ClientConfiguration} to apply + * @param httpStreamClient - The underlying {@link HTTPStreamClient} that will + * execute the actual HTTP calls + * @example + * ```typescript + * const streamClient = client.stream(streamToken); + * ``` + */ + // TODO: implement stream-specific options constructor( query: () => Promise, clientConfiguration: StreamClientConfiguration, @@ -641,18 +714,25 @@ export class StreamClient { this.#httpStreamClient = httpStreamClient; } + /** + * A synchronous method to start listening to the stream and handle events + * using callbacks. + * @param onEvent - A callback function to handle each event + * @param onError - An Optional callback function to handle errors. If none is + * provided, error will not be handled, and the stream will simply end. + */ start( onEvent: (event: StreamEventData) => void, - onFatalError: (error: Error) => void - ): StreamClient { + onError: (error: Error) => void + ) { if (typeof onEvent !== "function") { throw new TypeError( `Expected a function as the 'onEvent' argument, but received ${typeof onEvent}. Please provide a valid function.` ); } - if (onFatalError && typeof onFatalError !== "function") { + if (onError && typeof onError !== "function") { throw new TypeError( - `Expected a function as the 'onFatalError' argument, but received ${typeof onFatalError}. Please provide a valid function.` + `Expected a function as the 'onError' argument, but received ${typeof onError}. Please provide a valid function.` ); } const run = async () => { @@ -661,13 +741,12 @@ export class StreamClient { onEvent(event); } } catch (error) { - if (onFatalError) { - onFatalError(error as Error); + if (onError) { + onError(error as Error); } } }; run(); - return this; } async *[Symbol.asyncIterator](): AsyncGenerator { diff --git a/src/http-client/http-client.ts b/src/http-client/http-client.ts index 28ef6427..d567f3a1 100644 --- a/src/http-client/http-client.ts +++ b/src/http-client/http-client.ts @@ -82,6 +82,9 @@ export type HTTPStreamRequest = { method: "POST"; }; +/** + * A common interface for a StreamClient to operate a stream from any HTTPStreamClient + */ export interface StreamAdapter { read: AsyncGenerator; close: () => void; From 2cd737f263a2433701b94a282b5e38a5338d345d Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 14 Mar 2024 15:01:32 -0400 Subject: [PATCH 26/36] unused imports --- src/errors.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/errors.ts b/src/errors.ts index 0a382e93..c6d72fef 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -2,9 +2,7 @@ import type { ConstraintFailure, QueryFailure, QueryInfo, - QueryStats, QueryValue, - StreamEventError, } from "./wire-protocol"; /** From 11491f90f1d007e58bc33b25027c9d942c49c31c Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Mon, 18 Mar 2024 16:37:35 -0400 Subject: [PATCH 27/36] Initial implementation for streams with http2 module (#235) --- src/http-client/node-http2-client.ts | 119 ++++++++++++++++++++++++++- 1 file changed, 116 insertions(+), 3 deletions(-) diff --git a/src/http-client/node-http2-client.ts b/src/http-client/node-http2-client.ts index ebd61bb6..193c2263 100644 --- a/src/http-client/node-http2-client.ts +++ b/src/http-client/node-http2-client.ts @@ -9,8 +9,11 @@ import { HTTPClientOptions, HTTPRequest, HTTPResponse, + HTTPStreamClient, + HTTPStreamRequest, + StreamAdapter, } from "./http-client"; -import { NetworkError } from "../errors"; +import { ServiceError, NetworkError } from "../errors"; // alias http2 types type ClientHttp2Session = any; @@ -22,7 +25,7 @@ type OutgoingHttpHeaders = any; /** * An implementation for {@link HTTPClient} that uses the node http package */ -export class NodeHTTP2Client implements HTTPClient { +export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { static #clients: Map = new Map(); #http2_session_idle_ms: number; @@ -107,6 +110,11 @@ export class NodeHTTP2Client implements HTTPClient { }); } + /** {@inheritDoc HTTPStreamClient.stream} */ + stream(req: HTTPStreamRequest): StreamAdapter { + return this.#doStream(req); + } + /** {@inheritDoc HTTPClient.close} */ close() { // defend against redundant close calls @@ -170,7 +178,7 @@ export class NodeHTTP2Client implements HTTPClient { // append response data to the data string every time we receive new // data chunks in the response - req.on("data", (chunk: any) => { + req.on("data", (chunk: string) => { responseData += chunk; }); @@ -213,4 +221,109 @@ export class NodeHTTP2Client implements HTTPClient { } }); } + + /** {@inheritDoc HTTPStreamClient.stream} */ + #doStream({ + data: requestData, + headers: requestHeaders, + method, + }: HTTPStreamRequest): StreamAdapter { + let resolveChunk: (chunk: string) => void; + let rejectChunk: (reason: any) => void; + + const setChunkPromise = () => + new Promise((res, rej) => { + resolveChunk = res; + rejectChunk = rej; + }); + + let chunkPromise = setChunkPromise(); + + let req: ClientHttp2Stream; + const onResponse = ( + http2ResponseHeaders: IncomingHttpHeaders & IncomingHttpStatusHeader + ) => { + const status = Number( + http2ResponseHeaders[http2.constants.HTTP2_HEADER_STATUS] + ); + if (!(status >= 200 && status < 400)) { + // Get the error body and then throw an error + let responseData = ""; + + // append response data to the data string every time we receive new + // data chunks in the response + req.on("data", (chunk: string) => { + responseData += chunk; + }); + + // Once the response is finished, resolve the promise + // TODO: The Client contains the information for how to parse an error + // into the appropriate class, so lift this logic out of the HTTPClient. + req.on("end", () => { + rejectChunk(new ServiceError(JSON.parse(responseData), status)); + }); + } else { + let partOfLine = ""; + + // append response data to the data string every time we receive new + // data chunks in the response + req.on("data", (chunk: string) => { + const chunkLines = (partOfLine + chunk).split("\n"); + + // Yield all complete lines + for (let i = 0; i < chunkLines.length - 1; i++) { + resolveChunk(chunkLines[i].trim()); + chunkPromise = setChunkPromise(); + } + + // Store the partial line + partOfLine = chunkLines[chunkLines.length - 1]; + }); + + // Once the response is finished, resolve the promise + req.on("end", () => { + resolveChunk(partOfLine); + }); + } + }; + + // eslint-disable-next-line @typescript-eslint/no-this-alias + const self = this; + + async function* reader(): AsyncGenerator { + const httpRequestHeaders: OutgoingHttpHeaders = { + ...requestHeaders, + [http2.constants.HTTP2_HEADER_PATH]: "/stream/1", + [http2.constants.HTTP2_HEADER_METHOD]: method, + }; + + const session = self.#connect(); + req = session + .request(httpRequestHeaders) + .setEncoding("utf8") + .on("error", (error: any) => { + rejectChunk(error); + }) + .on("response", onResponse); + + const body = JSON.stringify(requestData); + + req.write(body, "utf8"); + + req.end(); + + while (true) { + yield await chunkPromise; + } + } + + return { + read: reader(), + close: () => { + if (req) { + req.close(); + } + }, + }; + } } From f18023c2e2ea38c7bde06c893968d01da2d745d9 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Tue, 19 Mar 2024 09:28:35 -0400 Subject: [PATCH 28/36] expose retry and status_events options --- src/client-configuration.ts | 18 ++++++++++++ src/client.ts | 52 ++++++++++++++++++++-------------- src/http-client/http-client.ts | 6 ---- src/http-client/index.ts | 13 ++++++++- src/index.ts | 9 ++++-- src/wire-protocol.ts | 7 +++-- 6 files changed, 73 insertions(+), 32 deletions(-) diff --git a/src/client-configuration.ts b/src/client-configuration.ts index 0a00b404..ebbdd382 100644 --- a/src/client-configuration.ts +++ b/src/client-configuration.ts @@ -171,6 +171,24 @@ export type StreamClientConfiguration = { * @see {@link ClientConfiguration.long_type} */ long_type: "number" | "bigint"; + + /** + * Max attempts for retryable exceptions. + */ + max_attempts: number; + + /** + * Max backoff between retries. + */ + max_backoff: number; + + /** + * Indicates if stream should include "status" events, periodic events that + * update the client with the latest valid timestamp (in the event of a + * dropped connection) as well as metrics about about the cost of maintaining + * the stream other than the cost of the received events. + */ + status_events?: boolean; }; /** diff --git a/src/client.ts b/src/client.ts index e7a86c67..67bb76a7 100644 --- a/src/client.ts +++ b/src/client.ts @@ -26,7 +26,7 @@ import { HTTPStreamClient, StreamAdapter, getDefaultHTTPClient, - implementsStreamClient, + isStreamClient, isHTTPResponse, type HTTPClient, } from "./http-client"; @@ -40,6 +40,7 @@ import { QueryInterpolation, StreamEvent, StreamEventData, + StreamEventStatus, type QueryFailure, type QueryOptions, type QuerySuccess, @@ -330,7 +331,10 @@ export class Client { * ``` */ // TODO: implement options - stream(query: Query | StreamToken): StreamClient { + stream( + query: Query | StreamToken, + options?: Partial + ): StreamClient { if (this.#isClosed) { throw new ClientClosedError( "Your client is closed. No further requests can be issued." @@ -339,7 +343,12 @@ export class Client { const streamClient = this.#httpClient; - if (implementsStreamClient(streamClient)) { + if (isStreamClient(streamClient)) { + const streamClientConfig: StreamClientConfiguration = { + ...this.#clientConfiguration, + ...options, + }; + const getStreamToken: () => Promise = query instanceof Query ? () => @@ -355,11 +364,7 @@ export class Client { }) : () => Promise.resolve(query as StreamToken); - return new StreamClient( - getStreamToken, - this.#clientConfiguration, - streamClient - ); + return new StreamClient(getStreamToken, streamClientConfig, streamClient); } else { throw new ClientError("Streaming is not supported by this client."); } @@ -678,7 +683,7 @@ export class StreamClient { /** Whether or not this stream has been closed */ closed = false; /** The stream client options */ - #clientConfiguration: Record; + #clientConfiguration: StreamClientConfiguration; /** A tracker for the number of connection attempts */ #connectionAttempts = 0; /** The underlying {@link HTTPStreamClient} that will execute the actual HTTP calls */ @@ -722,7 +727,7 @@ export class StreamClient { * provided, error will not be handled, and the stream will simply end. */ start( - onEvent: (event: StreamEventData) => void, + onEvent: (event: StreamEventData | StreamEventStatus) => void, onError: (error: Error) => void ) { if (typeof onEvent !== "function") { @@ -749,7 +754,9 @@ export class StreamClient { run(); } - async *[Symbol.asyncIterator](): AsyncGenerator { + async *[Symbol.asyncIterator](): AsyncGenerator< + StreamEventData | StreamEventStatus + > { if (this.closed) { throw new ClientError("The stream has been closed and cannot be reused."); } @@ -758,18 +765,14 @@ export class StreamClient { this.#streamToken = await this.#query(); } - // TODO: make these constants configurable - const STREAM_RECONNECT_MAX_ATTEMPTS = 60; - const STREAM_RECONNECT_MAX_BACKOFF = 10; - + this.#connectionAttempts = 1; while (!this.closed) { const backoffMs = Math.min( Math.random() * 2 ** this.#connectionAttempts, - STREAM_RECONNECT_MAX_BACKOFF + this.#clientConfiguration.max_backoff ) * 1_000; - this.#connectionAttempts = 1; try { for await (const event of this.#startStream(this.#last_ts)) { yield event; @@ -777,7 +780,7 @@ export class StreamClient { } catch (error: any) { if ( error instanceof FaunaError || - this.#connectionAttempts >= STREAM_RECONNECT_MAX_ATTEMPTS + this.#connectionAttempts >= this.#clientConfiguration.max_attempts ) { // A terminal error from Fauna this.close(); @@ -802,7 +805,9 @@ export class StreamClient { return this.#last_ts; } - async *#startStream(start_ts?: number): AsyncGenerator { + async *#startStream( + start_ts?: number + ): AsyncGenerator { // Safety: This method must only be called after a stream token has been acquired const streamToken = this.#streamToken as StreamToken; @@ -833,9 +838,14 @@ export class StreamClient { this.#last_ts = deserializedEvent.txn_ts; - if (deserializedEvent.type !== "status") { - yield deserializedEvent; + if ( + !this.#clientConfiguration.status_events && + deserializedEvent.type === "status" + ) { + continue; } + + yield deserializedEvent; } } } diff --git a/src/http-client/http-client.ts b/src/http-client/http-client.ts index d567f3a1..ca800184 100644 --- a/src/http-client/http-client.ts +++ b/src/http-client/http-client.ts @@ -104,9 +104,3 @@ export interface HTTPStreamClient { */ stream(req: HTTPStreamRequest): StreamAdapter; } - -export const implementsStreamClient = ( - client: Partial -): client is HTTPStreamClient => { - return "stream" in client && typeof client.stream === "function"; -}; diff --git a/src/http-client/index.ts b/src/http-client/index.ts index 1ed20eab..861e20ed 100644 --- a/src/http-client/index.ts +++ b/src/http-client/index.ts @@ -1,5 +1,10 @@ import { FetchClient } from "./fetch-client"; -import { HTTPClient, HTTPClientOptions, HTTPResponse } from "./http-client"; +import { + HTTPClient, + HTTPClientOptions, + HTTPResponse, + HTTPStreamClient, +} from "./http-client"; import { NodeHTTP2Client } from "./node-http2-client"; export * from "./fetch-client"; @@ -14,6 +19,12 @@ export const getDefaultHTTPClient = (options: HTTPClientOptions): HTTPClient => export const isHTTPResponse = (res: any): res is HTTPResponse => res instanceof Object && "body" in res && "headers" in res && "status" in res; +export const isStreamClient = ( + client: Partial +): client is HTTPStreamClient => { + return "stream" in client && typeof client.stream === "function"; +}; + export const nodeHttp2IsSupported = () => { if ( typeof process !== "undefined" && diff --git a/src/index.ts b/src/index.ts index 927c7858..808a0e4e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,8 +1,9 @@ -export { Client } from "./client"; +export { Client, StreamClient } from "./client"; export { endpoints, type ClientConfiguration, type Endpoints, + type StreamClientConfiguration, } from "./client-configuration"; export { AbortError, @@ -42,7 +43,6 @@ export { DateStub, Document, DocumentReference, - type DocumentT, EmbeddedSet, Module, NamedDocument, @@ -50,14 +50,19 @@ export { NullDocument, Page, SetIterator, + StreamToken, TimeStub, + type DocumentT, } from "./values"; export { FetchClient, getDefaultHTTPClient, isHTTPResponse, + isStreamClient, NodeHTTP2Client, type HTTPClient, type HTTPRequest, type HTTPResponse, + type HTTPStreamClient, + type StreamAdapter, } from "./http-client"; diff --git a/src/wire-protocol.ts b/src/wire-protocol.ts index fd06b04f..421c5691 100644 --- a/src/wire-protocol.ts +++ b/src/wire-protocol.ts @@ -326,7 +326,7 @@ export type StreamRequest = { }; export type StreamEventType = "status" | "add" | "remove" | "update" | "error"; -export type StreamEventStart = { +export type StreamEventStatus = { type: "status"; txn_ts: number; stats: QueryStats; @@ -338,4 +338,7 @@ export type StreamEventData = { data: QueryValue; }; export type StreamEventError = { type: "error" } & QueryFailure; -export type StreamEvent = StreamEventStart | StreamEventData | StreamEventError; +export type StreamEvent = + | StreamEventStatus + | StreamEventData + | StreamEventError; From 6e9e638412bf118cd64a68c19ce98a1a3407a3e9 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Tue, 19 Mar 2024 09:54:12 -0400 Subject: [PATCH 29/36] Add httpStreamClient to options, rather than separate arg --- src/client-configuration.ts | 12 +++++++++--- src/client.ts | 11 ++++------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/src/client-configuration.ts b/src/client-configuration.ts index ebbdd382..adaf0862 100644 --- a/src/client-configuration.ts +++ b/src/client-configuration.ts @@ -1,3 +1,4 @@ +import { HTTPStreamClient } from "./http-client"; import type { ValueFormat } from "./wire-protocol"; /** @@ -161,10 +162,9 @@ export interface Endpoints { */ export type StreamClientConfiguration = { /** - * A secret for your Fauna DB, used to authorize your queries. - * @see https://docs.fauna.com/fauna/current/security/keys + * The underlying {@link HTTPStreamClient} that will execute the actual HTTP calls */ - secret: string; + httpStreamClient: HTTPStreamClient; /** * Controls what Javascript type to deserialize {@link https://fqlx-beta--fauna-docs.netlify.app/fqlx/beta/reference/language/types#long | Fauna longs} to. @@ -182,6 +182,12 @@ export type StreamClientConfiguration = { */ max_backoff: number; + /** + * A secret for your Fauna DB, used to authorize your queries. + * @see https://docs.fauna.com/fauna/current/security/keys + */ + secret: string; + /** * Indicates if stream should include "status" events, periodic events that * update the client with the latest valid timestamp (in the event of a diff --git a/src/client.ts b/src/client.ts index 67bb76a7..e1ba5028 100644 --- a/src/client.ts +++ b/src/client.ts @@ -346,6 +346,7 @@ export class Client { if (isStreamClient(streamClient)) { const streamClientConfig: StreamClientConfiguration = { ...this.#clientConfiguration, + httpStreamClient: streamClient, ...options, }; @@ -364,7 +365,7 @@ export class Client { }) : () => Promise.resolve(query as StreamToken); - return new StreamClient(getStreamToken, streamClientConfig, streamClient); + return new StreamClient(getStreamToken, streamClientConfig); } else { throw new ClientError("Streaming is not supported by this client."); } @@ -686,8 +687,6 @@ export class StreamClient { #clientConfiguration: StreamClientConfiguration; /** A tracker for the number of connection attempts */ #connectionAttempts = 0; - /** The underlying {@link HTTPStreamClient} that will execute the actual HTTP calls */ - #httpStreamClient: HTTPStreamClient; /** A lambda that returns a promise for a {@link StreamToken} */ #query: () => Promise; /** The last `txn_ts` value received from events */ @@ -711,12 +710,10 @@ export class StreamClient { // TODO: implement stream-specific options constructor( query: () => Promise, - clientConfiguration: StreamClientConfiguration, - httpStreamClient: HTTPStreamClient + clientConfiguration: StreamClientConfiguration ) { this.#query = query; this.#clientConfiguration = clientConfiguration; - this.#httpStreamClient = httpStreamClient; } /** @@ -815,7 +812,7 @@ export class StreamClient { Authorization: `Bearer ${this.#clientConfiguration.secret}`, }; - const streamAdapter = this.#httpStreamClient.stream({ + const streamAdapter = this.#clientConfiguration.httpStreamClient.stream({ data: { token: streamToken.token, start_ts }, headers, method: "POST", From 9cfc9723f0b94ee3f553027592c2b26737e1d11b Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Tue, 19 Mar 2024 10:25:09 -0400 Subject: [PATCH 30/36] refactor StreamClient constructor so we can pass in a token directly --- src/client.ts | 41 ++++++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/src/client.ts b/src/client.ts index e1ba5028..70557d73 100644 --- a/src/client.ts +++ b/src/client.ts @@ -332,7 +332,7 @@ export class Client { */ // TODO: implement options stream( - query: Query | StreamToken, + tokenOrQuery: StreamToken | Query, options?: Partial ): StreamClient { if (this.#isClosed) { @@ -350,22 +350,12 @@ export class Client { ...options, }; - const getStreamToken: () => Promise = - query instanceof Query - ? () => - this.query(query).then((res) => { - const maybeStreamToken = res.data; - if (!(maybeStreamToken instanceof StreamToken)) { - throw new ClientError( - `Error requesting a stream token. Expected a StreamToken as the query result, but received ${typeof maybeStreamToken}. Your query must return the result of '.toStream' or '.changesOn')\n` + - `Query result: ${JSON.stringify(maybeStreamToken, null)}` - ); - } - return maybeStreamToken; - }) - : () => Promise.resolve(query as StreamToken); + const tokenOrGetToken = + tokenOrQuery instanceof Query + ? () => this.query(tokenOrQuery).then((res) => res.data) + : tokenOrQuery; - return new StreamClient(getStreamToken, streamClientConfig); + return new StreamClient(tokenOrGetToken, streamClientConfig); } else { throw new ClientError("Streaming is not supported by this client."); } @@ -709,10 +699,15 @@ export class StreamClient { */ // TODO: implement stream-specific options constructor( - query: () => Promise, + token: StreamToken | (() => Promise), clientConfiguration: StreamClientConfiguration ) { - this.#query = query; + if (token instanceof StreamToken) { + this.#query = () => Promise.resolve(token); + } else { + this.#query = token; + } + this.#clientConfiguration = clientConfiguration; } @@ -759,7 +754,15 @@ export class StreamClient { } if (!this.#streamToken) { - this.#streamToken = await this.#query(); + this.#streamToken = await this.#query().then((maybeStreamToken) => { + if (!(maybeStreamToken instanceof StreamToken)) { + throw new ClientError( + `Error requesting a stream token. Expected a StreamToken as the query result, but received ${typeof maybeStreamToken}. Your query must return the result of '.toStream' or '.changesOn')\n` + + `Query result: ${JSON.stringify(maybeStreamToken, null)}` + ); + } + return maybeStreamToken; + }); } this.#connectionAttempts = 1; From cd5ad15e485978c1ad91de76053d6a72353969a5 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Wed, 20 Mar 2024 15:29:44 -0400 Subject: [PATCH 31/36] Add some integration tests --- __tests__/integration/set.test.ts | 4 +-- package.json | 2 +- src/client.ts | 60 ++++++++++++++++++++++++++----- src/http-client/index.ts | 4 ++- src/wire-protocol.ts | 8 ++--- 5 files changed, 62 insertions(+), 16 deletions(-) diff --git a/__tests__/integration/set.test.ts b/__tests__/integration/set.test.ts index a68a9bbb..d6510281 100644 --- a/__tests__/integration/set.test.ts +++ b/__tests__/integration/set.test.ts @@ -36,10 +36,10 @@ describe("SetIterator", () => { beforeAll(async () => { await client.query(fql` if (Collection.byName("IterTestSmall") != null) { - IterTestSmall.definition.delete() + Collection.byName("IterTestSmall")!.delete() } if (Collection.byName("IterTestBig") != null) { - IterTestBig.definition.delete() + Collection.byName("IterTestBig")!.delete() } `); await client.query(fql` diff --git a/package.json b/package.json index 6d5086f5..2321e047 100644 --- a/package.json +++ b/package.json @@ -42,7 +42,7 @@ "build:node": "esbuild src/index.ts --bundle --sourcemap --platform=node --outfile=dist/node/index.js", "build:types": "tsc -emitDeclarationOnly --declaration true", "lint": "eslint -f unix \"src/**/*.{ts,tsx}\"", - "fauna-local": "docker start faunadb-local || docker run --rm -d --name faunadb-local -p 8443:8443 -p 8084:8084 fauna/faunadb", + "fauna-local": "docker start faunadb-local || docker run --rm -d --name faunadb-local -p 8443:8443 -p 8084:8084 --mount type=bind,source=\"$(pwd)\"/docker/feature-flags.json,target=/etc/feature-flag-periodic.d/feature-flags.json fauna/faunadb", "fauna-local-alt-port": "docker start faunadb-local-alt-port || docker run --rm -d --name faunadb-local-alt-port -p 7443:8443 -p 7084:8084 fauna/faunadb", "prepare": "husky install", "test": "yarn fauna-local; yarn fauna-local-alt-port; ./prepare-test-env.sh; jest", diff --git a/src/client.ts b/src/client.ts index 70557d73..b60a140e 100644 --- a/src/client.ts +++ b/src/client.ts @@ -331,10 +331,10 @@ export class Client { * ``` */ // TODO: implement options - stream( + stream( tokenOrQuery: StreamToken | Query, options?: Partial - ): StreamClient { + ): StreamClient { if (this.#isClosed) { throw new ClientClosedError( "Your client is closed. No further requests can be issued." @@ -642,6 +642,8 @@ in an environmental variable named FAUNA_SECRET or pass it to the Client\ "query_timeout_ms", "fetch_keepalive", "http2_max_streams", + "max_backoff", + "max_attempts", ]; required_options.forEach((option) => { if (config[option] === undefined) { @@ -664,13 +666,21 @@ in an environmental variable named FAUNA_SECRET or pass it to the Client\ if (config.query_timeout_ms <= 0) { throw new RangeError(`'query_timeout_ms' must be greater than zero.`); } + + if (config.max_backoff <= 0) { + throw new RangeError(`'max_backoff' must be greater than zero.`); + } + + if (config.max_attempts <= 0) { + throw new RangeError(`'max_attempts' must be greater than zero.`); + } } } /** * A class to listen to Fauna streams. */ -export class StreamClient { +export class StreamClient { /** Whether or not this stream has been closed */ closed = false; /** The stream client options */ @@ -709,6 +719,8 @@ export class StreamClient { } this.#clientConfiguration = clientConfiguration; + + this.#validateConfiguration(); } /** @@ -719,8 +731,8 @@ export class StreamClient { * provided, error will not be handled, and the stream will simply end. */ start( - onEvent: (event: StreamEventData | StreamEventStatus) => void, - onError: (error: Error) => void + onEvent: (event: StreamEventData | StreamEventStatus) => void, + onError?: (error: Error) => void ) { if (typeof onEvent !== "function") { throw new TypeError( @@ -747,7 +759,7 @@ export class StreamClient { } async *[Symbol.asyncIterator](): AsyncGenerator< - StreamEventData | StreamEventStatus + StreamEventData | StreamEventStatus > { if (this.closed) { throw new ClientError("The stream has been closed and cannot be reused."); @@ -807,7 +819,7 @@ export class StreamClient { async *#startStream( start_ts?: number - ): AsyncGenerator { + ): AsyncGenerator | StreamEventStatus> { // Safety: This method must only be called after a stream token has been acquired const streamToken = this.#streamToken as StreamToken; @@ -825,7 +837,7 @@ export class StreamClient { for await (const event of streamAdapter.read) { // stream events are always tagged - const deserializedEvent: StreamEvent = TaggedTypeFormat.decode(event, { + const deserializedEvent: StreamEvent = TaggedTypeFormat.decode(event, { long_type: this.#clientConfiguration.long_type, }); @@ -838,6 +850,11 @@ export class StreamClient { this.#last_ts = deserializedEvent.txn_ts; + // TODO: remove this once all environments have updated the events to use "status" instead of "start" + if ((deserializedEvent.type as any) === "start") { + deserializedEvent.type = "status"; + } + if ( !this.#clientConfiguration.status_events && deserializedEvent.type === "status" @@ -848,6 +865,33 @@ export class StreamClient { yield deserializedEvent; } } + + #validateConfiguration() { + const config = this.#clientConfiguration; + + const required_options: (keyof StreamClientConfiguration)[] = [ + "long_type", + "httpStreamClient", + "max_backoff", + "max_attempts", + "secret", + ]; + required_options.forEach((option) => { + if (config[option] === undefined) { + throw new TypeError( + `ClientConfiguration option '${option}' must be defined.` + ); + } + }); + + if (config.max_backoff <= 0) { + throw new RangeError(`'max_backoff' must be greater than zero.`); + } + + if (config.max_attempts <= 0) { + throw new RangeError(`'max_attempts' must be greater than zero.`); + } + } } // Private types and constants for internal logic. diff --git a/src/http-client/index.ts b/src/http-client/index.ts index 861e20ed..e17f80d2 100644 --- a/src/http-client/index.ts +++ b/src/http-client/index.ts @@ -11,7 +11,9 @@ export * from "./fetch-client"; export * from "./http-client"; export * from "./node-http2-client"; -export const getDefaultHTTPClient = (options: HTTPClientOptions): HTTPClient => +export const getDefaultHTTPClient = ( + options: HTTPClientOptions +): HTTPClient & HTTPStreamClient => nodeHttp2IsSupported() ? NodeHTTP2Client.getClient(options) : new FetchClient(options); diff --git a/src/wire-protocol.ts b/src/wire-protocol.ts index 421c5691..0c8b82ab 100644 --- a/src/wire-protocol.ts +++ b/src/wire-protocol.ts @@ -331,14 +331,14 @@ export type StreamEventStatus = { txn_ts: number; stats: QueryStats; }; -export type StreamEventData = { +export type StreamEventData = { type: "add" | "remove" | "update"; txn_ts: number; stats: QueryStats; - data: QueryValue; + data: T; }; export type StreamEventError = { type: "error" } & QueryFailure; -export type StreamEvent = +export type StreamEvent = | StreamEventStatus - | StreamEventData + | StreamEventData | StreamEventError; From 765b06f3ecbe4ea0ae0fd26b3f990461803e7144 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Wed, 20 Mar 2024 15:30:37 -0400 Subject: [PATCH 32/36] commit the new files --- .../stream-client-configuration.test.ts | 71 ++++ __tests__/integration/stream.test.ts | 381 ++++++++++++++++++ docker/feature-flags.json | 27 ++ 3 files changed, 479 insertions(+) create mode 100644 __tests__/functional/stream-client-configuration.test.ts create mode 100644 __tests__/integration/stream.test.ts create mode 100644 docker/feature-flags.json diff --git a/__tests__/functional/stream-client-configuration.test.ts b/__tests__/functional/stream-client-configuration.test.ts new file mode 100644 index 00000000..f9e1851e --- /dev/null +++ b/__tests__/functional/stream-client-configuration.test.ts @@ -0,0 +1,71 @@ +import { + StreamClient, + StreamToken, + getDefaultHTTPClient, + StreamClientConfiguration, +} from "../../src"; +import { getDefaultHTTPClientOptions } from "../client"; + +const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions()); +const defaultConfig: StreamClientConfiguration = { + secret: "secret", + long_type: "number", + max_attempts: 3, + max_backoff: 20, + httpStreamClient: defaultHttpClient, +}; +const dummyStreamToken = new StreamToken("dummy"); + +describe("StreamClientConfiguration", () => { + it("can be instantiated directly with a token", () => { + new StreamClient(dummyStreamToken, defaultConfig); + }); + + it("can be instantiated directly with a lambda", async () => { + new StreamClient(() => Promise.resolve(dummyStreamToken), defaultConfig); + }); + + it.each` + fieldName + ${"long_type"} + ${"httpStreamClient"} + ${"max_backoff"} + ${"max_attempts"} + ${"secret"} + `( + "throws a TypeError if $fieldName provided is undefined", + async ({ fieldName }: { fieldName: keyof StreamClientConfiguration }) => { + expect.assertions(1); + + const config = { ...defaultConfig }; + delete config[fieldName]; + try { + new StreamClient(dummyStreamToken, config); + } catch (e: any) { + expect(e).toBeInstanceOf(TypeError); + } + } + ); + + it("throws a RangeError if 'max_backoff' is less than or equal to zero", async () => { + expect.assertions(1); + + const config = { ...defaultConfig, max_backoff: 0 }; + try { + new StreamClient(dummyStreamToken, config); + } catch (e: any) { + expect(e).toBeInstanceOf(RangeError); + } + }); + + it("throws a RangeError if 'max_attempts' is less than or equal to zero", async () => { + expect.assertions(1); + + const config = { ...defaultConfig, max_attempts: 0 }; + try { + new StreamClient(dummyStreamToken, config); + } catch (e: any) { + expect(e).toBeInstanceOf(RangeError); + } + }); +}); diff --git a/__tests__/integration/stream.test.ts b/__tests__/integration/stream.test.ts new file mode 100644 index 00000000..15367407 --- /dev/null +++ b/__tests__/integration/stream.test.ts @@ -0,0 +1,381 @@ +import { + fql, + getDefaultHTTPClient, + StreamClient, + StreamClientConfiguration, + StreamToken, + Client, + DocumentT, + ServiceError, + TimeStub, + DateStub, + Document, +} from "../../src"; +import { + getClient, + getDefaultHTTPClientOptions, + getDefaultSecretAndEndpoint, +} from "../client"; + +const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions()); +const { secret } = getDefaultSecretAndEndpoint(); +const dummyStreamToken = new StreamToken("dummy"); + +let client: Client; +const STREAM_DB_NAME = "StreamTestDB"; +const STREAM_SECRET = `${secret}:${STREAM_DB_NAME}:admin`; +const defaultStreamConfig: StreamClientConfiguration = { + secret: STREAM_SECRET, + long_type: "number", + max_attempts: 3, + max_backoff: 20, + httpStreamClient: defaultHttpClient, +}; + +type StreamTest = { value: number }; + +beforeAll(async () => { + const rootClient = getClient(); + + // create a child database to use for Streams, since streaming FF will not + // work on the root database + await rootClient.query(fql` + if (Database.byName(${STREAM_DB_NAME}) == null) { + Database.create({ name: ${STREAM_DB_NAME} }) + } + `); + + // scope the client to the child db + client = getClient({ secret: STREAM_SECRET }); + + await client.query(fql` + if (Collection.byName("StreamTest") != null) { + Collection.byName("StreamTest")!.delete() + } + `); + await client.query(fql` + Collection.create({ name: "StreamTest" }) + `); +}); + +afterAll(() => { + if (client) { + client.close(); + } +}); + +describe("Client", () => { + it("can initiate a stream from a Client", async () => { + expect.assertions(1); + + let stream: StreamClient | null = null; + try { + const response = await client.query( + fql`StreamTest.all().toStream()` + ); + const token = response.data; + + stream = client.stream(token, { status_events: true }); + + for await (const event of stream) { + expect(event.type).toEqual("status"); + break; + } + } finally { + stream?.close(); + } + }); + + it("can initiate a stream from a Client, providing a query", async () => { + expect.assertions(1); + + let stream: StreamClient | null = null; + try { + stream = client.stream(fql`StreamTest.all().toStream()`, { + status_events: true, + }); + + for await (const event of stream) { + expect(event.type).toEqual("status"); + break; + } + } finally { + stream?.close(); + } + }); +}); + +describe("StreamClient", () => { + it("can initiate a stream", async () => { + expect.assertions(1); + + let stream: StreamClient | null = null; + try { + const response = await client.query( + fql`StreamTest.all().toStream()` + ); + const token = response.data; + + stream = new StreamClient(token, { + ...defaultStreamConfig, + status_events: true, + }); + + for await (const event of stream) { + expect(event.type).toEqual("status"); + break; + } + } finally { + stream?.close(); + } + }); + + it("can initiate a stream with a lambda", async () => { + expect.assertions(1); + + let stream: StreamClient | null = null; + try { + const getToken = async () => { + const response = await client.query( + fql`StreamTest.all().toStream()` + ); + return response.data; + }; + + stream = new StreamClient(getToken, { + ...defaultStreamConfig, + status_events: true, + }); + + for await (const event of stream) { + expect(event.type).toEqual("status"); + break; + } + } finally { + stream?.close(); + } + }); + + it("can get events with async iterator", async () => { + expect.assertions(2); + + let stream: StreamClient> | null = null; + try { + const response = await client.query( + fql`StreamTest.all().toStream()` + ); + const token = response.data; + + stream = new StreamClient(token, defaultStreamConfig); + + // create some events that will be played back + await client.query(fql`StreamTest.create({ value: 0 })`); + await client.query(fql`StreamTest.create({ value: 1 })`); + + let count = 0; + for await (const event of stream) { + if (event.type == "add") { + if (count === 0) { + expect(event.data.value).toEqual(0); + } else { + expect(event.data.value).toEqual(1); + break; + } + } + count++; + } + } finally { + stream?.close(); + } + }); + + it("can get events with callbacks", async () => { + expect.assertions(2); + + const response = await client.query( + fql`StreamTest.all().toStream()` + ); + const token = response.data; + + const stream = new StreamClient>( + token, + defaultStreamConfig + ); + + // create some events that will be played back + await client.query(fql`StreamTest.create({ value: 0 })`); + await client.query(fql`StreamTest.create({ value: 1 })`); + + let resolve: () => void; + const promise = new Promise((res) => { + resolve = () => res(null); + }); + + let count = 0; + stream.start(function onEvent(event) { + if (event.type == "add") { + if (count === 0) { + expect(event.data.value).toEqual(0); + } else { + expect(event.data.value).toEqual(1); + stream.close(); + resolve(); + } + } + count++; + }); + + await promise; + }); + + it("catches non 200 responses when establishing a stream", async () => { + expect.assertions(1); + + try { + // create a stream with a bad token + const stream = new StreamClient( + new StreamToken("2"), + defaultStreamConfig + ); + + for await (const _ of stream) { + /* do nothing */ + } + } catch (e) { + // TODO: be more specific about the error and split into multiple tests + expect(e).toBeInstanceOf(ServiceError); + } + }); + + it("handles non 200 responses via callback when establishing a stream", async () => { + expect.assertions(1); + + // create a stream with a bad token + const stream = new StreamClient(new StreamToken("2"), defaultStreamConfig); + + let resolve: () => void; + const promise = new Promise((res) => { + resolve = () => res(null); + }); + + stream.start( + function onEvent(_) {}, + function onError(e) { + // TODO: be more specific about the error and split into multiple tests + expect(e).toBeInstanceOf(ServiceError); + resolve(); + } + ); + + await promise; + }); + + it("catches a ServiceError if an error event is received", async () => { + expect.assertions(1); + + let stream: StreamClient> | null = null; + try { + const response = await client.query( + fql`StreamTest.all().map((doc) => abort("oops")).toStream()` + ); + const token = response.data; + + stream = new StreamClient(token, defaultStreamConfig); + + // create some events that will be played back + await client.query(fql`StreamTest.create({ value: 0 })`); + + for await (const _ of stream) { + /* do nothing */ + } + } catch (e) { + // TODO: be more specific about the error and split into multiple tests + expect(e).toBeInstanceOf(ServiceError); + } finally { + stream?.close(); + } + }); + + it("handles a ServiceError via callback if an error event is received", async () => { + expect.assertions(1); + + const response = await client.query( + fql`StreamTest.all().map((doc) => abort("oops")).toStream()` + ); + const token = response.data; + + const stream = new StreamClient>( + token, + defaultStreamConfig + ); + + // create some events that will be played back + await client.query(fql`StreamTest.create({ value: 0 })`); + + let resolve: () => void; + const promise = new Promise((res) => { + resolve = () => res(null); + }); + + stream.start( + function onEvent(_) {}, + function onError(e) { + // TODO: be more specific about the error and split into multiple tests + expect(e).toBeInstanceOf(ServiceError); + resolve(); + } + ); + + await promise; + }); + + it("decodes values from streams correctly", async () => { + expect.assertions(5); + + let stream: StreamClient | null = null; + let stream2: StreamClient | null = null; + try { + const response = await client.query( + fql`StreamTest.all().map((doc) => { + time: Time.now(), + date: Date.today(), + doc: doc, + bigInt: 922337036854775808, + }).toStream()` + ); + const token = response.data; + + stream = new StreamClient(token, defaultStreamConfig); + + // create some events that will be played back + await client.query(fql`StreamTest.create({ value: 0 })`); + + for await (const event of stream) { + if (event.type == "add") { + const data = event.data; + expect(data.time).toBeInstanceOf(TimeStub); + expect(data.date).toBeInstanceOf(DateStub); + expect(data.doc).toBeInstanceOf(Document); + expect(typeof data.bigInt).toBe("number"); + } + break; + } + + stream2 = new StreamClient(token, { + ...defaultStreamConfig, + long_type: "bigint", + }); + + for await (const event of stream2) { + if (event.type == "add") { + const data = event.data; + expect(typeof data.bigInt).toBe("bigint"); + } + break; + } + } finally { + stream?.close(); + stream2?.close(); + } + }); +}); diff --git a/docker/feature-flags.json b/docker/feature-flags.json new file mode 100644 index 00000000..2fc48a0a --- /dev/null +++ b/docker/feature-flags.json @@ -0,0 +1,27 @@ +{ + "version": 1, + "properties": [ + { + "property_name": "cluster_name", + "property_value": "fauna", + "flags": { + "fql2_schema": true, + "fqlx_typecheck_default": true, + "persisted_fields": true, + "changes_by_collection_index": true, + "fql2_streams": true + } + }, + { + "property_name": "account_id", + "property_value": 0, + "flags": { + "fql2_schema": true, + "fqlx_typecheck_default": true, + "persisted_fields": true, + "changes_by_collection_index": true, + "fql2_streams": true + } + } + ] +} From a2d49f3b19f6fa2ebd02ec5baac4337ce607d42e Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Wed, 20 Mar 2024 15:41:29 -0400 Subject: [PATCH 33/36] apply FF to github actions --- .github/workflows/pr_validate.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/pr_validate.yml b/.github/workflows/pr_validate.yml index a85e9a82..e7016a8b 100644 --- a/.github/workflows/pr_validate.yml +++ b/.github/workflows/pr_validate.yml @@ -11,6 +11,8 @@ jobs: image: fauna/faunadb:latest ports: - 8443:8443 + volumes: + - ${{ github.workspace }}/docker/feature-flags.json:/etc/feature-flag-periodic.d/feature-flags.js alt_core: image: fauna/faunadb:latest ports: From c0f4a80efbc85789c717ddced11ecb26ee13a070 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Wed, 20 Mar 2024 15:51:40 -0400 Subject: [PATCH 34/36] try relative path --- .github/workflows/pr_validate.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr_validate.yml b/.github/workflows/pr_validate.yml index e7016a8b..2f8c21fc 100644 --- a/.github/workflows/pr_validate.yml +++ b/.github/workflows/pr_validate.yml @@ -12,7 +12,7 @@ jobs: ports: - 8443:8443 volumes: - - ${{ github.workspace }}/docker/feature-flags.json:/etc/feature-flag-periodic.d/feature-flags.js + - ./docker/feature-flags.json:/etc/feature-flag-periodic.d/feature-flags.js alt_core: image: fauna/faunadb:latest ports: From b7bb861351ea2d77af36d865eb3990bdb1ba3885 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Wed, 20 Mar 2024 16:48:16 -0400 Subject: [PATCH 35/36] fix: yield all events when multiple received asa single chunk --- __tests__/integration/stream.test.ts | 41 ++++++++++++++++++++++++++++ src/http-client/node-http2-client.ts | 17 ++++++------ 2 files changed, 50 insertions(+), 8 deletions(-) diff --git a/__tests__/integration/stream.test.ts b/__tests__/integration/stream.test.ts index 15367407..6c504137 100644 --- a/__tests__/integration/stream.test.ts +++ b/__tests__/integration/stream.test.ts @@ -378,4 +378,45 @@ describe("StreamClient", () => { stream2?.close(); } }); + + it("yields all events when Fauna sends them as a single chunk", async () => { + // This test has a chance of creating a false positive. Since we are not in + // control of the actual stream, we will use a query that has been shown to + // behave how we expect MOST of the time. + // This test will not create a false negative. + expect.assertions(6); + + let stream: StreamClient> | null = null; + try { + // clear the collection + await client.query(fql`StreamTest.all().forEach(.delete())`); + + const response = await client.query( + fql`StreamTest.all().toStream()` + ); + const token = response.data; + + stream = new StreamClient(token, defaultStreamConfig); + + // create some events that will be played back + await client.query(fql`StreamTest.create({ value: 0 })`); + await client.query(fql`StreamTest.create({ value: 1 })`); + await client.query(fql`StreamTest.create({ value: 2 })`); + // This has a very high probability of creating multiple events sent as a single chunk + await client.query(fql`StreamTest.all().forEach(.delete())`); + + let count = 0; + for await (const event of stream) { + if (event.type != "status") { + expect(event.data).toBeDefined(); + } + count++; + if (count === 6) { + break; + } + } + } finally { + stream?.close(); + } + }); }); diff --git a/src/http-client/node-http2-client.ts b/src/http-client/node-http2-client.ts index 193c2263..b9e38872 100644 --- a/src/http-client/node-http2-client.ts +++ b/src/http-client/node-http2-client.ts @@ -228,11 +228,11 @@ export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { headers: requestHeaders, method, }: HTTPStreamRequest): StreamAdapter { - let resolveChunk: (chunk: string) => void; + let resolveChunk: (chunk: string[]) => void; let rejectChunk: (reason: any) => void; const setChunkPromise = () => - new Promise((res, rej) => { + new Promise((res, rej) => { resolveChunk = res; rejectChunk = rej; }); @@ -271,10 +271,8 @@ export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { const chunkLines = (partOfLine + chunk).split("\n"); // Yield all complete lines - for (let i = 0; i < chunkLines.length - 1; i++) { - resolveChunk(chunkLines[i].trim()); - chunkPromise = setChunkPromise(); - } + resolveChunk(chunkLines.map((s) => s.trim()).slice(0, -1)); + chunkPromise = setChunkPromise(); // Store the partial line partOfLine = chunkLines[chunkLines.length - 1]; @@ -282,7 +280,7 @@ export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { // Once the response is finished, resolve the promise req.on("end", () => { - resolveChunk(partOfLine); + resolveChunk([partOfLine]); }); } }; @@ -313,7 +311,10 @@ export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { req.end(); while (true) { - yield await chunkPromise; + const chunks = await chunkPromise; + for (const chunk of chunks) { + yield chunk; + } } } From 3888409858efdbbf9c34e8f2eefd2634198cfde7 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Thu, 21 Mar 2024 11:17:13 -0400 Subject: [PATCH 36/36] undo github action changes --- .github/workflows/pr_validate.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/pr_validate.yml b/.github/workflows/pr_validate.yml index 2f8c21fc..a85e9a82 100644 --- a/.github/workflows/pr_validate.yml +++ b/.github/workflows/pr_validate.yml @@ -11,8 +11,6 @@ jobs: image: fauna/faunadb:latest ports: - 8443:8443 - volumes: - - ./docker/feature-flags.json:/etc/feature-flag-periodic.d/feature-flags.js alt_core: image: fauna/faunadb:latest ports: