Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial Streaming Implementation #232

Merged
merged 36 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
b382941
Setup pipeline for beta deploy (#236)
pnwpedro Mar 21, 2024
e8f042a
Initial implementation with fetch
ptpaterson Feb 8, 2024
f648682
remove old commented code
ptpaterson Feb 8, 2024
d7b0b72
Update stream request format
ptpaterson Feb 22, 2024
08101c1
Guard against event types with no registered callbacks
ptpaterson Feb 22, 2024
bd1f3f8
Accept a Query as an argument for Client.stream
ptpaterson Feb 22, 2024
6968226
Convert StreamClient to be an AsyncIterator itself
ptpaterson Feb 22, 2024
775a6c7
Require user to return token when Query provided
ptpaterson Feb 23, 2024
ed3c88c
Remove commented import
ptpaterson Feb 23, 2024
a3aaf93
remove unused import
ptpaterson Feb 23, 2024
4a0a319
Provide custom abort message on stream.close()
ptpaterson Feb 23, 2024
4e4484d
Refactor so Client.stream is sync again
ptpaterson Feb 23, 2024
c59ea03
Add a StreamError class
ptpaterson Feb 28, 2024
bcc307c
Refactor StreamClient and provide retries
ptpaterson Feb 28, 2024
0273292
pipe network errors into events
ptpaterson Mar 5, 2024
98d240a
validate onFatalError callback
ptpaterson Mar 6, 2024
7b6e3e7
do not emit type=error events
ptpaterson Mar 6, 2024
398c1f4
Provide exactly one handler for events and another for fatal errors (…
ptpaterson Mar 8, 2024
90c64f1
Never yeild start or error events
ptpaterson Mar 8, 2024
cb98659
client.query already implements appropriate retries
ptpaterson Mar 8, 2024
44edf28
Implement core update for error events to implement QueryFailure
ptpaterson Mar 12, 2024
e2352db
catch and throw non 200s
ptpaterson Mar 12, 2024
6373f67
Add README example
ptpaterson Mar 12, 2024
23bd3f6
update event format
ptpaterson Mar 14, 2024
303b981
Add JS docs
ptpaterson Mar 14, 2024
2cd737f
unused imports
ptpaterson Mar 14, 2024
11491f9
Initial implementation for streams with http2 module (#235)
ptpaterson Mar 18, 2024
f18023c
expose retry and status_events options
ptpaterson Mar 19, 2024
6e9e638
Add httpStreamClient to options, rather than separate arg
ptpaterson Mar 19, 2024
9cfc972
refactor StreamClient constructor so we can pass in a token directly
ptpaterson Mar 19, 2024
cd5ad15
Add some integration tests
ptpaterson Mar 20, 2024
765b06f
commit the new files
ptpaterson Mar 20, 2024
a2d49f3
apply FF to github actions
ptpaterson Mar 20, 2024
c0f4a80
try relative path
ptpaterson Mar 20, 2024
b7bb861
fix: yield all events when multiple received asa single chunk
ptpaterson Mar 20, 2024
3888409
undo github action changes
ptpaterson Mar 21, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 84 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
<details>
<summary>Table of Contents</summary>

- [A JavaScript driver for Fauna.](#a-javascript-driver-for-fauna)
- [The Official Javascript Driver for Fauna.](#the-official-javascript-driver-for-fauna)
- [Quick-Start](#quick-start)
- [Supported Runtimes](#supported-runtimes)
- [Installation](#installation)
Expand All @@ -23,12 +23,16 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
- [Typescript Support](#typescript-support)
- [Query Options](#query-options)
- [Client Configuration](#client-configuration)
- [Retry](#retry)
- [Max Attempts](#max-attempts)
- [Max Backoff](#max-backoff)
- [Timeouts](#timeouts)
- [Query Timeout](#query-timeout)
- [Client Timeout](#client-timeout)
- [HTTP/2 Session Idle Timeout](#http2-session-idle-timeout)
- [Using environment variables](#using-environment-variables)
- [Query Statistics](#query-statistics)
- [Streaming](#streaming)
- [Contributing](#contributing)
- [Setting up this Repo](#setting-up-this-repo)
- [Running tests](#running-tests)
Expand Down Expand Up @@ -402,6 +406,85 @@ try {
*/
````

## Streaming

Obtain a stream token using a regular query with either the `toStream()` or `changesOn()` FQL methods on a Set.

```javascript
import { Client, fql } from "fauna"
const client = new Client({ secret: FAUNA_SECRET })

const response = await client.query(fql`
let set = MyCollection.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
`);
const { initialPage, streamToken } = response.data;

const stream = client.stream(streamToken)
```

The driver will take care of the initial request to convert to a stream if you provide a Query

```javascript
import { Client, fql } from "fauna"
const client = new Client({ secret: FAUNA_SECRET })

const stream = await client.stream(fql`MyCollection.all().changesOn(.field1, .field2)`)
```

There are two Two ways to initiate the stream:
1. Async Iterator
2. Callbacks

_Async Iterator example_
```javascript
try {
for await (const event of stream) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
// ...
break;
}
}
} catch (error) {
// An error will be handled here if Fauna returns a terminal, "error" event, or
// if Fauna returns a non-200 response when trying to connect, or
// if the max number of retries on network errors is reached.

// ... handle fatal error
}
```

_Callbacks example_
```javascript
stream.start(
function onEvent(event) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
// ...
break;
}
},
function onFatalError(error) {
// An error will be handled here if Fauna returns a terminal, "error" event, or
// if Fauna returns a non-200 response when trying to connect, or
// if the max number of retries on network errors is reached.

// ... handle fatal error
}
);
```

# Contributing

Any contributions are from the community are greatly appreciated!
Expand Down
71 changes: 71 additions & 0 deletions __tests__/functional/stream-client-configuration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import {
StreamClient,
StreamToken,
getDefaultHTTPClient,
StreamClientConfiguration,
} from "../../src";
import { getDefaultHTTPClientOptions } from "../client";

const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions());
const defaultConfig: StreamClientConfiguration = {
secret: "secret",
long_type: "number",
max_attempts: 3,
max_backoff: 20,
httpStreamClient: defaultHttpClient,
};
const dummyStreamToken = new StreamToken("dummy");

describe("StreamClientConfiguration", () => {
it("can be instantiated directly with a token", () => {
new StreamClient(dummyStreamToken, defaultConfig);
});

it("can be instantiated directly with a lambda", async () => {
new StreamClient(() => Promise.resolve(dummyStreamToken), defaultConfig);
});

it.each`
fieldName
${"long_type"}
${"httpStreamClient"}
${"max_backoff"}
${"max_attempts"}
${"secret"}
`(
"throws a TypeError if $fieldName provided is undefined",
async ({ fieldName }: { fieldName: keyof StreamClientConfiguration }) => {
expect.assertions(1);

const config = { ...defaultConfig };
delete config[fieldName];
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(TypeError);
}
}
);

it("throws a RangeError if 'max_backoff' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_backoff: 0 };
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});

it("throws a RangeError if 'max_attempts' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_attempts: 0 };
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});
});
4 changes: 2 additions & 2 deletions __tests__/integration/set.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ describe("SetIterator", () => {
beforeAll(async () => {
await client.query(fql`
if (Collection.byName("IterTestSmall") != null) {
IterTestSmall.definition.delete()
Collection.byName("IterTestSmall")!.delete()
}
if (Collection.byName("IterTestBig") != null) {
IterTestBig.definition.delete()
Collection.byName("IterTestBig")!.delete()
}
`);
await client.query(fql`
Expand Down
Loading
Loading