Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor object and key-value store watch methods #77

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# TODO

## BUGS

- ObjectStore should report the number of entries in it.
4 changes: 3 additions & 1 deletion kv/src/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -858,10 +858,12 @@ export class Bucket implements KV, KvRemove {
let isUpdate = content === KvWatchInclude.UpdatesOnly || count === 0;

qi._data = oc;
let i = 0;
const iter = await oc.consume({
callback: (m) => {
if (!isUpdate) {
isUpdate = qi.received >= count;
i++;
isUpdate = i >= count;
}
const e = this.jmToWatchEntry(m, isUpdate);
if (ignoreDeletes && e.operation === "DEL") {
Expand Down
31 changes: 31 additions & 0 deletions kv/tests/kv_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2161,3 +2161,34 @@ Deno.test("kv - sourced", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - watch isUpdate", async () => {
const { ns, nc } = await _setup(
connect,
jetstreamServerConf({}),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}

const js = jetstream(nc);
const kvm = await new Kvm(js);
const kv = await kvm.create("A");
await kv.put("a", "hello");
await kv.delete("a");

const iter = await kv.watch({ ignoreDeletes: true });
const done = (async () => {
for await (const e of iter) {
if (e.key === "b") {
assertEquals(e.isUpdate, true);
break;
}
}
})();
await kv.put("b", "hello");

await done;

await cleanup(ns, nc);
});
10 changes: 10 additions & 0 deletions migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,13 @@ const service = await svc.add({

// other manipulation as per service api...
```

### Watch

Object.watch() now returns an `ObjectWatchInfo` which is an `ObjectInfo` but adding the property
`isUpdate` this property is now true when the watch is notifying of a new entry. Note that previously
the iterator would yield `ObjectInfo | null`, the `null` signal has been removed. This means that
when doing a watch on an empty ObjectStore you won't get an update notification until an actual value
arrives.


76 changes: 41 additions & 35 deletions obj/src/objectstore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
* limitations under the License.
*/

import type {
MsgHdrs,
NatsConnection,
NatsError,
QueuedIterator,
} from "@nats-io/nats-core/internal";
import {
Base64UrlPaddedCodec,
DataBuffer,
Expand All @@ -26,23 +32,6 @@ import {
SHA256,
} from "@nats-io/nats-core/internal";

import type {
MsgHdrs,
NatsConnection,
NatsError,
QueuedIterator,
} from "@nats-io/nats-core/internal";

import {
DeliverPolicy,
DiscardPolicy,
JsHeaders,
ListerImpl,
PubHeaders,
StoreCompression,
toJetStreamClient,
} from "@nats-io/jetstream/internal";

import type {
ConsumerConfig,
JetStreamClient,
Expand All @@ -59,6 +48,15 @@ import type {
StreamInfoRequestOptions,
StreamListResponse,
} from "@nats-io/jetstream/internal";
import {
DeliverPolicy,
DiscardPolicy,
JsHeaders,
ListerImpl,
PubHeaders,
StoreCompression,
toJetStreamClient,
} from "@nats-io/jetstream/internal";

import type {
ObjectInfo,
Expand All @@ -69,6 +67,7 @@ import type {
ObjectStoreOptions,
ObjectStorePutOpts,
ObjectStoreStatus,
ObjectWatchInfo,
} from "./types.ts";

export const osPrefix = "OBJ_";
Expand Down Expand Up @@ -340,13 +339,12 @@ export class ObjectStoreImpl implements ObjectStore {
const iter = await this.watch({
ignoreDeletes: true,
includeHistory: true,
//@ts-ignore: hidden
historyOnly: true,
});

// historyOnly will stop the iterator
for await (const info of iter) {
// watch will give a null when it has initialized
// for us that is the hint we are done
if (info === null) {
break;
}
buf.push(info);
}
return Promise.resolve(buf);
Expand Down Expand Up @@ -803,19 +801,17 @@ export class ObjectStoreImpl implements ObjectStore {
ignoreDeletes?: boolean;
includeHistory?: boolean;
}
> = {}): Promise<QueuedIterator<ObjectInfo | null>> {
> = {}): Promise<QueuedIterator<ObjectWatchInfo>> {
opts.includeHistory = opts.includeHistory ?? false;
opts.ignoreDeletes = opts.ignoreDeletes ?? false;
let initialized = false;
const qi = new QueuedIteratorImpl<ObjectInfo | null>();
// @ts-ignore: not exposed
const historyOnly = opts.historyOnly ?? false;
const qi = new QueuedIteratorImpl<ObjectWatchInfo>();
const subj = this._metaSubjectAll();
try {
await this.jsm.streams.getMessage(this.stream, { last_by_subj: subj });
} catch (err) {
if ((err as NatsError).code === "404") {
qi.push(null);
initialized = true;
} else {
if ((err as NatsError).code !== "404") {
qi.stop(err as Error);
}
}
Expand All @@ -827,28 +823,38 @@ export class ObjectStoreImpl implements ObjectStore {
} else {
// FIXME: Go's implementation doesn't seem correct - if history is not desired
// the watch should only be giving notifications on new entries
initialized = true;
cc.deliver_policy = DeliverPolicy.New;
}

const oc = await this.js.consumers.getPushConsumer(this.stream, cc);
const info = await oc.info(true);
const count = info.num_pending;
let isUpdate = cc.deliver_policy === DeliverPolicy.New || count === 0;
qi._data = oc;

let i = 0;
const iter = await oc.consume({
callback: (jm: JsMsg) => {
const oi = jm.json<ObjectInfo>();
if (!isUpdate) {
i++;
isUpdate = i >= count;
}
const oi = jm.json<ObjectWatchInfo>();
oi.isUpdate = isUpdate;
if (oi.deleted && opts.ignoreDeletes === true) {
// do nothing
} else {
qi.push(oi);
}
if (jm.info?.pending === 0 && !initialized) {
initialized = true;
qi.push(null);
if (historyOnly && i === count) {
iter.stop();
}
},
});

if (historyOnly && count === 0) {
iter.stop();
}

iter.closed().then(() => {
qi.push(() => {
qi.stop();
Expand Down
6 changes: 5 additions & 1 deletion obj/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ export type ObjectStoreMeta = {
metadata?: Record<string, string>;
};

export interface ObjectWatchInfo extends ObjectInfo {
isUpdate: boolean;
}

export interface ObjectInfo extends ObjectStoreMeta {
/**
* The name of the bucket where the object is stored.
Expand Down Expand Up @@ -316,7 +320,7 @@ export interface ObjectStore {
includeHistory?: boolean;
}
>,
): Promise<QueuedIterator<ObjectInfo | null>>;
): Promise<QueuedIterator<ObjectWatchInfo>>;

/**
* Seals the object store preventing any further modifications.
Expand Down
42 changes: 42 additions & 0 deletions obj/tests/objectstore_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,48 @@ Deno.test("objectstore - list", async () => {
await cleanup(ns, nc);
});

Deno.test("objectstore - list no updates", async () => {
const { ns, nc } = await _setup(connect, jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const objm = new Objm(nc);
const os = await objm.create("test");

let infos = await os.list();
assertEquals(infos.length, 0);

await os.put({ name: "a" }, readableStreamFrom(new Uint8Array(0)));
infos = await os.list();
assertEquals(infos.length, 1);

await cleanup(ns, nc);
});

Deno.test("objectstore - watch isUpdate", async () => {
const { ns, nc } = await _setup(connect, jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const objm = new Objm(nc);
const os = await objm.create("test");
await os.put({ name: "a" }, readableStreamFrom(new Uint8Array(0)));

const watches = await os.watch();
await os.put({ name: "b" }, readableStreamFrom(new Uint8Array(0)));

for await (const e of watches) {
if (e.name === "b") {
assertEquals(e.isUpdate, true);
break;
} else {
assertEquals(e.isUpdate, false);
}
}

await cleanup(ns, nc);
});

Deno.test("objectstore - watch initially empty", async () => {
const { ns, nc } = await _setup(connect, jetstreamServerConf({}));
if (await notCompatible(ns, nc, "2.6.3")) {
Expand Down
Loading