Skip to content

Commit

Permalink
Initial Streaming Implementation (#232)
Browse files Browse the repository at this point in the history
  • Loading branch information
ptpaterson authored Mar 21, 2024
1 parent f6ab840 commit a3f74b5
Show file tree
Hide file tree
Showing 22 changed files with 1,480 additions and 54 deletions.
85 changes: 84 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
<details>
<summary>Table of Contents</summary>

- [A JavaScript driver for Fauna.](#a-javascript-driver-for-fauna)
- [The Official Javascript Driver for Fauna.](#the-official-javascript-driver-for-fauna)
- [Quick-Start](#quick-start)
- [Supported Runtimes](#supported-runtimes)
- [Installation](#installation)
Expand All @@ -23,12 +23,16 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
- [Typescript Support](#typescript-support)
- [Query Options](#query-options)
- [Client Configuration](#client-configuration)
- [Retry](#retry)
- [Max Attempts](#max-attempts)
- [Max Backoff](#max-backoff)
- [Timeouts](#timeouts)
- [Query Timeout](#query-timeout)
- [Client Timeout](#client-timeout)
- [HTTP/2 Session Idle Timeout](#http2-session-idle-timeout)
- [Using environment variables](#using-environment-variables)
- [Query Statistics](#query-statistics)
- [Streaming](#streaming)
- [Contributing](#contributing)
- [Setting up this Repo](#setting-up-this-repo)
- [Running tests](#running-tests)
Expand Down Expand Up @@ -402,6 +406,85 @@ try {
*/
````

## Streaming

Obtain a stream token using a regular query with either the `toStream()` or `changesOn()` FQL methods on a Set.

```javascript
import { Client, fql } from "fauna"
const client = new Client({ secret: FAUNA_SECRET })

const response = await client.query(fql`
let set = MyCollection.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
`);
const { initialPage, streamToken } = response.data;

const stream = client.stream(streamToken)
```

The driver will take care of the initial request to convert to a stream if you provide a Query

```javascript
import { Client, fql } from "fauna"
const client = new Client({ secret: FAUNA_SECRET })

const stream = await client.stream(fql`MyCollection.all().changesOn(.field1, .field2)`)
```

There are two Two ways to initiate the stream:
1. Async Iterator
2. Callbacks

_Async Iterator example_
```javascript
try {
for await (const event of stream) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
// ...
break;
}
}
} catch (error) {
// An error will be handled here if Fauna returns a terminal, "error" event, or
// if Fauna returns a non-200 response when trying to connect, or
// if the max number of retries on network errors is reached.

// ... handle fatal error
}
```

_Callbacks example_
```javascript
stream.start(
function onEvent(event) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
// ...
break;
}
},
function onFatalError(error) {
// An error will be handled here if Fauna returns a terminal, "error" event, or
// if Fauna returns a non-200 response when trying to connect, or
// if the max number of retries on network errors is reached.

// ... handle fatal error
}
);
```

# Contributing

Any contributions are from the community are greatly appreciated!
Expand Down
71 changes: 71 additions & 0 deletions __tests__/functional/stream-client-configuration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import {
StreamClient,
StreamToken,
getDefaultHTTPClient,
StreamClientConfiguration,
} from "../../src";
import { getDefaultHTTPClientOptions } from "../client";

const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions());
const defaultConfig: StreamClientConfiguration = {
secret: "secret",
long_type: "number",
max_attempts: 3,
max_backoff: 20,
httpStreamClient: defaultHttpClient,
};
const dummyStreamToken = new StreamToken("dummy");

describe("StreamClientConfiguration", () => {
it("can be instantiated directly with a token", () => {
new StreamClient(dummyStreamToken, defaultConfig);
});

it("can be instantiated directly with a lambda", async () => {
new StreamClient(() => Promise.resolve(dummyStreamToken), defaultConfig);
});

it.each`
fieldName
${"long_type"}
${"httpStreamClient"}
${"max_backoff"}
${"max_attempts"}
${"secret"}
`(
"throws a TypeError if $fieldName provided is undefined",
async ({ fieldName }: { fieldName: keyof StreamClientConfiguration }) => {
expect.assertions(1);

const config = { ...defaultConfig };
delete config[fieldName];
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(TypeError);
}
}
);

it("throws a RangeError if 'max_backoff' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_backoff: 0 };
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});

it("throws a RangeError if 'max_attempts' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_attempts: 0 };
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});
});
4 changes: 2 additions & 2 deletions __tests__/integration/set.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ describe("SetIterator", () => {
beforeAll(async () => {
await client.query(fql`
if (Collection.byName("IterTestSmall") != null) {
IterTestSmall.definition.delete()
Collection.byName("IterTestSmall")!.delete()
}
if (Collection.byName("IterTestBig") != null) {
IterTestBig.definition.delete()
Collection.byName("IterTestBig")!.delete()
}
`);
await client.query(fql`
Expand Down
Loading

0 comments on commit a3f74b5

Please sign in to comment.