forrealtime

Custom Adapter

Implement the RedisAdapter interface for any Redis client.

If you need a different Redis client or want to wrap an existing one, implement the RedisAdapter interface:

/**
 * Contract a Redis client wrapper implements so that any Redis client can be
 * plugged in. All four methods are asynchronous and operate on Redis streams
 * identified by a `channel` string.
 */
type RedisAdapter = {
  /**
   * Append a message to a stream.
   *
   * @param channel - Stream to append to.
   * @param payload - Message fields to store.
   * @param options - `maxLen` trims the stream; `expireAfterSecs` sets a TTL
   *   on the key.
   * @returns A string; presumably the ID assigned to the new entry (the
   *   node-redis example on this page returns the ID from `xAdd`).
   */
  xadd(
    channel: string,
    payload: Record<string, unknown>,
    options?: {
      maxLen?: number;
      expireAfterSecs?: number;
    },
  ): Promise<string>;

  /**
   * Read a range of messages from a stream by ID. Used for history fetching.
   *
   * @param channel - Stream to read from.
   * @param args - Optional `start` / `end` IDs bounding the range and a
   *   `count` limit. NOTE(review): whether the bounds are inclusive is not
   *   stated here — confirm against the library.
   * @returns The matching messages, each with its `id` and `payload`.
   */
  xrange(
    channel: string,
    args?: {
      start?: string;
      end?: string;
      count?: number;
    },
  ): Promise<Array<{ id: string; payload: Record<string, unknown> }>>;

  /**
   * Block-read from one or more streams simultaneously. Used for the live SSE
   * subscription.
   *
   * @param args - `channels` lists the streams to read; `signal` cancels the
   *   read when the client disconnects. `cursors` presumably holds one
   *   last-seen ID per entry in `channels`, and `blockMs` / `count` presumably
   *   bound the wait time and batch size — TODO confirm.
   * @returns One entry per channel that produced messages, with those
   *   messages (assumed; an empty array when nothing was read — verify).
   */
  xread(args: {
    channels: string[];
    cursors: string[];
    blockMs?: number;
    count?: number;
    signal?: AbortSignal;
  }): Promise<
    Array<{
      channel: string;
      messages: Array<{ id: string; payload: Record<string, unknown> }>;
    }>
  >;

  /**
   * Return the ID of the most recent entry in a stream, or `null` if the
   * stream is empty. Used to initialise new subscriber cursors.
   *
   * @param channel - Stream to inspect.
   */
  getLatestCursor(channel: string): Promise<string | null>;
};

Method descriptions

Method — Description
xadd — Append a message to a stream. maxLen trims the stream. expireAfterSecs sets a TTL on the key.
xrange — Read a range of messages from a stream by ID. Used for history fetching.
xread — Block-read from one or more streams simultaneously. Used for the live SSE subscription. signal cancels the read when the client disconnects.
getLatestCursor — Return the ID of the most recent entry in a stream, or null if empty. Used to initialise new subscriber cursors.

Example: node-redis

import { createClient } from "redis"; // node-redis
import type { RedisAdapter } from "forrealtime";

/**
 * Wraps a node-redis client in the `RedisAdapter` interface.
 *
 * Payload values are JSON-encoded on write and JSON-decoded on read, because
 * stream entries are stored as string fields.
 *
 * @param client - A node-redis client as returned by `createClient()`.
 * @returns An adapter backed by `client`. `xread` is left as a stub in this
 *   example and must be implemented before live subscriptions will work.
 */
function createNodeRedisAdapter(client: ReturnType<typeof createClient>): RedisAdapter {
  return {
    /** Appends `payload` to the stream, optionally trimming it and setting a TTL. */
    async xadd(channel, payload, options) {
      // Every field value is serialised so arbitrary JSON values round-trip.
      const fields = Object.fromEntries(
        Object.entries(payload).map(([k, v]) => [k, JSON.stringify(v)]),
      );

      const id = await client.xAdd(
        channel,
        "*", // let Redis assign the entry ID
        fields,
        options?.maxLen ? { TRIM: { strategy: "MAXLEN", threshold: options.maxLen } } : undefined,
      );

      // Honour expireAfterSecs: refresh the key's TTL after each append.
      // Non-positive values are skipped, since EXPIRE with 0 would delete the
      // stream that was just written. This is a separate command, so it is not
      // atomic with the XADD above.
      const ttlSecs = options?.expireAfterSecs;
      if (ttlSecs !== undefined && ttlSecs > 0) {
        await client.expire(channel, ttlSecs);
      }

      return id;
    },

    /** Reads entries between `start` and `end` (defaulting to the whole stream). */
    async xrange(channel, args) {
      const entries = await client.xRange(
        channel,
        args?.start ?? "-", // "-" = smallest possible ID
        args?.end ?? "+", // "+" = largest possible ID
        args?.count ? { COUNT: args.count } : undefined,
      );
      return entries.map((e) => ({
        id: e.id,
        // Reverse the JSON encoding applied in xadd.
        payload: Object.fromEntries(
          Object.entries(e.message).map(([k, v]) => [k, JSON.parse(v as string)]),
        ),
      }));
    },

    /**
     * Stub: always resolves to an empty list, so no live messages are ever
     * delivered until this is replaced with a real blocking read.
     */
    async xread({ channels, cursors, blockMs, count, signal }) {
      // implementation omitted for brevity
      return [];
    },

    /** Returns the newest entry ID in the stream, or `null` when it has none. */
    async getLatestCursor(channel) {
      // Reverse range limited to one entry yields the most recent entry.
      const entries = await client.xRevRange(channel, "+", "-", { COUNT: 1 });
      return entries[0]?.id ?? null;
    },
  };
}

On this page