Skip to content

soundxyz/redis-pubsub

Repository files navigation

redis-pubsub

Full type-safe Redis PubSub system with async iterators

Features

Install

pnpm add @soundxyz/redis-pubsub
npm install @soundxyz/redis-pubsub
yarn add @soundxyz/redis-pubsub

Peer dependencies

pnpm add zod ioredis
npm install zod ioredis
yarn add zod ioredis

Usage

Create a Redis PubSub instance:

import Redis from "ioredis";
import { z } from "zod";

import { RedisPubSub } from "@soundxyz/redis-pubsub";

const { createChannel } = RedisPubSub({
  publisher: new Redis({
    port: 6379,
  }),
  subscriber: new Redis({
    port: 6379,
  }),
});

Create a channel with any Zod schema and a unique "name" to be used as main trigger.

const schema = z.object({
  id: z.string(),
  name: z.string(),
});

const userChannel = createChannel({
  name: "User",
  schema,
});

const nonLazyUserChannel = createChannel({
  name: "User",
  schema,
  // By default the channels are lazily connected with redis
  isLazy: false,
});

Subscribe and publish to the channel

// Using async iterators / async generators to subscribe
(async () => {
  for await (const user of userChannel.subscribe()) {
    console.log("User", {
      id: user.id,
      name: user.name,
    });
  }
})();

// You can explicitly wait until the channel is sucessfully connected with Redis
await userChannel.isReady();

// Publish data into the channel
await userChannel.publish(
  {
    value: {
      id: "1",
      name: "John",
    },
  },
  // You can also publish more than a single value
  {
    value: {
      id: "2",
      name: "Peter",
    },
  }
);

Filter based on the data

(async () => {
  for await (const user of userChannel.subscribe({
    filter(value) {
      return value.id === "1";
    },
  })) {
    console.log("User 1", {
      id: user.id,
      name: user.name,
    });
  }
})();

// You can also use type predicates / type guards
(async () => {
  for await (const user of userChannel.subscribe({
    filter(value): value is { id: "1"; name: string } {
      return value.id === "1";
    },
  })) {
    // typeof user.id == "1"
    console.log("User 1", {
      id: user.id,
      name: user.name,
    });
  }
})();

Use custom identifiers

It will create a separate redis channel for every identifier, concatenating "name" and "identifier", for example, with "name"="User" and "identifier" = 1, the channel trigger name will be "User1"

(async () => {
  for await (const user of userChannel.subscribe({
    // number or string
    identifier: 1,
  })) {
    console.log("User with identifier=1", {
      id: user.id,
      name: user.name,
    });
  }
})();

await userChannel.isReady({
  // number or string
  identifier: 1,
});

await userChannel.publish({
  value: {
    id: "1",
    name: "John",
  },
  identifier: 1,
});

Separate input from output

You can levarage Zod Transforms to be able to separate input types from the output types, and receive any custom class or output on your subscriptions.

class CustomClass {
  constructor(public name: string) {}
}

const inputSchema = z.string();
const outputSchema = z.string().transform((input) => new CustomClass(input));

const channel = pubSub.createChannel({
  name: "separate-type",
  inputSchema,
  outputSchema,
});

const subscription = (async () => {
  for await (const data of channel.subscribe()) {
    return data;
  }
})();

await channel.isReady();

await channel.publish({
  value: "test",
});

const result = await subscription;

// true
console.log(result instanceof CustomClass);

// true
console.log(result.name === "test");

Use AbortController / AbortSignal

If isLazy is not disabled, the last subscription to a channel will be automatically unsubscribed from Redis.

const abortController = new AbortController();
const abortedSubscription = (() => {
  for await (const data of userChannel.subscribe({
    abortSignal: abortController.signal,
  })) {
    console.log({ data });
  }
})();

// ...

firstSubscribeAbortController.abort();

await abortedSubscription;

Unsubscribe specific identifiers

await userChannel.unsubscribe(
  {
    identifier: 1,
  },
  // You can specify more than a single identifer at once
  {
    identifier: 2,
  }
);

Unsubscribe an entire channel

await userChannel.unsubscribeAll();

Close the PubSub instance

const pubSub = RedisPubSub({
  publisher: new Redis({
    port: 6379,
  }),
  subscriber: new Redis({
    port: 6379,
  }),
});

// ...
await pubSub.close();