Custom Adapter
Implement the RedisAdapter interface for any Redis client.
If you need a different Redis client or want to wrap an existing one, implement the RedisAdapter interface:
/**
 * Contract a Redis client wrapper must satisfy to be used as an adapter.
 * Every method is asynchronous and operates on Redis streams addressed by
 * a channel name.
 */
type RedisAdapter = {
  /**
   * Append a message to a stream.
   *
   * `options.maxLen` trims the stream; `options.expireAfterSecs` sets a TTL
   * on the key. Resolves to a string (presumably the ID of the appended
   * entry — confirm against the implementation in use).
   */
  xadd(
    channel: string,
    payload: Record<string, unknown>,
    options?: {
      maxLen?: number;
      expireAfterSecs?: number;
    },
  ): Promise<string>;
  /**
   * Read a range of messages from a stream by ID. Used for history fetching.
   *
   * `args.start` / `args.end` bound the range and `args.count` limits how
   * many entries are returned.
   */
  xrange(
    channel: string,
    args?: {
      start?: string;
      end?: string;
      count?: number;
    },
  ): Promise<Array<{ id: string; payload: Record<string, unknown> }>>;
  /**
   * Block-read from one or more streams simultaneously. Used for the live
   * SSE subscription.
   *
   * `channels` and `cursors` are parallel arrays (presumably `cursors[i]` is
   * the last-seen ID for `channels[i]` — confirm with the caller). `signal`
   * cancels the read when the client disconnects.
   */
  xread(args: {
    channels: string[];
    cursors: string[];
    blockMs?: number;
    count?: number;
    signal?: AbortSignal;
  }): Promise<
    Array<{
      channel: string;
      messages: Array<{ id: string; payload: Record<string, unknown> }>;
    }>
  >;
  /**
   * Return the ID of the most recent entry in a stream, or `null` if the
   * stream is empty. Used to initialise new subscriber cursors.
   */
  getLatestCursor(channel: string): Promise<string | null>;
};

Method descriptions
| Method | Description |
|---|---|
| `xadd` | Append a message to a stream. `maxLen` trims the stream. `expireAfterSecs` sets a TTL on the key. |
| `xrange` | Read a range of messages from a stream by ID. Used for history fetching. |
| `xread` | Block-read from one or more streams simultaneously. Used for the live SSE subscription. `signal` cancels the read when the client disconnects. |
| `getLatestCursor` | Return the ID of the most recent entry in a stream, or `null` if the stream is empty. Used to initialise new subscriber cursors. |
Example: node-redis
import { createClient } from "redis"; // node-redis
import type { RedisAdapter } from "forrealtime";
/**
 * Builds a RedisAdapter on top of a node-redis client.
 *
 * Stream entries are flat string maps, so each payload value is stored as
 * JSON text on write and parsed back on read.
 *
 * @param client - A node-redis client created with `createClient()`. It is
 *   used as-is; this function never connects or disconnects it.
 * @returns An adapter whose methods delegate to `client`.
 */
function createNodeRedisAdapter(client: ReturnType<typeof createClient>): RedisAdapter {
  /** Serialises payload values to JSON text, one stream field per key. */
  const encodePayload = (payload: Record<string, unknown>): Record<string, string> => {
    const fields: Record<string, string> = {};
    for (const [key, value] of Object.entries(payload)) {
      // JSON.stringify yields undefined for undefined/function values; drop
      // those keys (as JSON itself would) rather than hand the client a
      // non-string field value.
      const text: string | undefined = JSON.stringify(value);
      if (text !== undefined) {
        fields[key] = text;
      }
    }
    return fields;
  };

  /** Parses the JSON text of every field in a stream entry. */
  const decodePayload = (message: Record<string, unknown>): Record<string, unknown> =>
    Object.fromEntries(
      Object.entries(message).map(([key, raw]) => {
        const text = String(raw);
        try {
          return [key, JSON.parse(text) as unknown];
        } catch {
          // The field was not written by `xadd` above (it is not JSON).
          // Surface the raw text instead of failing the whole read.
          return [key, text];
        }
      }),
    );

  return {
    /**
     * Appends `payload` to the stream with an auto-generated ID ("*").
     * Applies a MAXLEN trim when `options.maxLen` is set and a key TTL when
     * `options.expireAfterSecs` is a positive number.
     */
    async xadd(channel, payload, options) {
      const id = await client.xAdd(
        channel,
        "*",
        encodePayload(payload),
        options?.maxLen ? { TRIM: { strategy: "MAXLEN", threshold: options.maxLen } } : undefined,
      );
      const ttlSecs = options?.expireAfterSecs;
      // EXPIRE with a non-positive TTL would delete the key immediately, so
      // only positive values are forwarded. XADD and EXPIRE are issued as two
      // separate commands, i.e. not atomically.
      if (ttlSecs !== undefined && ttlSecs > 0) {
        await client.expire(channel, ttlSecs);
      }
      return id;
    },

    /**
     * Reads entries between `args.start` (default "-") and `args.end`
     * (default "+"), optionally capped at `args.count`.
     */
    async xrange(channel, args) {
      const entries = await client.xRange(
        channel,
        args?.start ?? "-",
        args?.end ?? "+",
        args?.count ? { COUNT: args.count } : undefined,
      );
      return entries.map((entry) => ({
        id: entry.id,
        payload: decodePayload(entry.message),
      }));
    },

    /**
     * Reads new entries from several streams in one XREAD call.
     *
     * `signal` is only checked before the command is issued; a read that is
     * already blocking ends when `blockMs` elapses. A blocking read also
     * occupies the client's connection — NOTE(review): consider passing a
     * dedicated/duplicated client for subscriptions.
     *
     * @throws Error when `channels` and `cursors` differ in length.
     */
    async xread({ channels, cursors, blockMs, count, signal }) {
      if (channels.length !== cursors.length) {
        throw new Error("xread: channels and cursors must have the same length");
      }
      if (signal?.aborted || channels.length === 0) {
        return [];
      }
      const streams = channels.map((key, index) => ({ key, id: cursors[index] ?? "$" }));
      const reply = await client.xRead(streams, {
        ...(count ? { COUNT: count } : {}),
        // BLOCK 0 is meaningful (block indefinitely), so only skip undefined.
        ...(blockMs !== undefined ? { BLOCK: blockMs } : {}),
      });
      // The client resolves to null when the block timeout passes with no data.
      return (reply ?? []).map((stream) => ({
        channel: stream.name,
        messages: stream.messages.map((entry) => ({
          id: entry.id,
          payload: decodePayload(entry.message),
        })),
      }));
    },

    /** Returns the ID of the newest entry in the stream, or null when it has none. */
    async getLatestCursor(channel) {
      const entries = await client.xRevRange(channel, "+", "-", { COUNT: 1 });
      return entries[0]?.id ?? null;
    },
  };
}