postgresSync

Stream PostgreSQL table changes to realtime clients via LISTEN/NOTIFY.

postgresSync watches one or more PostgreSQL tables and emits a realtime event whenever a row is inserted, updated, or deleted. Clients subscribe to those events just like any other realtime event.

Installation

No extra packages are needed: the plugin uses Bun.sql, which is built into Bun.

import { postgresSync } from "forrealtime/plugins/postgres-sync";

Basic usage

import { Realtime } from "forrealtime";
import { createBunAdapter } from "forrealtime/adapters/bun";
import { postgresSync } from "forrealtime/plugins/postgres-sync";

const realtime = new Realtime({
  redis: createBunAdapter(Bun.redis),
  plugins: [
    postgresSync({
      sql: Bun.sql,
      tables: ["users", "posts"],
    }),
  ],
});

// Start listening after the server is ready
await realtime.pg.start();

Call realtime.pg.stop() to unsubscribe and tear down the LISTEN connections.

How it works

  1. On start(), the plugin installs an AFTER INSERT OR UPDATE OR DELETE trigger on each table using CREATE OR REPLACE (requires PostgreSQL 14+).
  2. The trigger calls pg_notify with a JSON payload containing the table name, operation, and the affected row.
  3. The plugin subscribes via Bun.sql.listen() and emits a realtime event for each notification it receives.
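The first two steps can be pictured with a small sketch that builds the kind of SQL such a trigger needs. This is not the plugin's actual source: the helper buildTriggerSql, the function and trigger names, and the NOTIFY channel name are all assumptions made for illustration. Only the overall shape (an AFTER row trigger that calls pg_notify with the table, operation, and row) comes from the description above.

```typescript
// Hypothetical helper: the function name, trigger name, and NOTIFY channel
// below are illustrative assumptions, not forrealtime's actual identifiers.
function buildTriggerSql(schema: string, table: string): string {
  // Double-quote identifiers so mixed-case or reserved names stay valid.
  const ident = (name: string): string => `"${name.replace(/"/g, '""')}"`;
  const fn = `${ident(schema)}.${ident(`${table}_notify`)}`;
  const target = `${ident(schema)}.${ident(table)}`;
  // Escape single quotes because the channel is embedded as a SQL string literal.
  const channel = `${schema}_${table}`.replace(/'/g, "''");

  return [
    `CREATE OR REPLACE FUNCTION ${fn}() RETURNS trigger AS $fn$`,
    "BEGIN",
    `  PERFORM pg_notify('${channel}', json_build_object(`,
    "    'table', TG_TABLE_NAME,",
    "    'op', TG_OP,",
    // NEW is null for DELETE, so the old row is sent instead.
    "    'row', CASE WHEN TG_OP = 'DELETE' THEN row_to_json(OLD) ELSE row_to_json(NEW) END",
    "  )::text);",
    "  RETURN NULL; -- the result of an AFTER row trigger is ignored",
    "END;",
    "$fn$ LANGUAGE plpgsql;",
    "",
    // CREATE OR REPLACE TRIGGER is the statement that needs PostgreSQL 14+.
    `CREATE OR REPLACE TRIGGER ${ident(`${table}_notify`)}`,
    `AFTER INSERT OR UPDATE OR DELETE ON ${target}`,
    `FOR EACH ROW EXECUTE FUNCTION ${fn}();`,
  ].join("\n");
}

const usersTriggerSql = buildTriggerSql("public", "users");
```

Because the trigger is created with CREATE OR REPLACE, running the generated statements again on a later start() would overwrite the existing trigger rather than fail.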

Events are emitted on a channel named after the table (unless realtimeChannel overrides it), with the event name {table}.insert, {table}.update, or {table}.delete:

Table  Operation  Channel  Event
users  INSERT     users    users.insert
users  UPDATE     users    users.update
users  DELETE     users    users.delete
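The naming rule above can be written as a small function. The helper routeFor and its types are hypothetical, shown only to make the mapping concrete. It covers the default case where no realtimeChannel override is set.

```typescript
type Operation = "INSERT" | "UPDATE" | "DELETE";

interface RoutedEvent {
  channel: string;
  event: string;
}

// Hypothetical helper: derives the default channel and event name for a change.
function routeFor(table: string, op: Operation): RoutedEvent {
  return {
    channel: table, // default channel is the table name
    event: `${table}.${op.toLowerCase()}`, // e.g. "users.insert"
  };
}

const routed = routeFor("users", "UPDATE");
```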

Subscribing on the client

const { status } = useRealtime({
  channels: ["users"],
  events: ["users.insert", "users.update", "users.delete"],
  onData(payload) {
    console.log(payload.event, payload.data); // e.g. "users.insert", { id: 1, name: "..." }
  },
});
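A common use of onData is keeping a local copy of the table in sync. The sketch below assumes the payload has the { event, data } shape shown in the example above and that every row has a numeric id column. The names ChangePayload and applyChange are hypothetical.

```typescript
// Assumed payload shape, based on the onData example above.
interface ChangePayload {
  event: string;
  data: { id: number; [column: string]: unknown };
}

type UserCache = Map<number, ChangePayload["data"]>;

// Hypothetical reducer: applies one change event to an in-memory cache.
function applyChange(cache: UserCache, payload: ChangePayload): void {
  switch (payload.event) {
    case "users.insert":
    case "users.update":
      // Inserts and updates carry the new row, so both are an upsert here.
      cache.set(payload.data.id, payload.data);
      break;
    case "users.delete":
      // Deletes carry the old row; only its id is needed to evict it.
      cache.delete(payload.data.id);
      break;
    // Events for other tables are ignored.
  }
}

const userCache: UserCache = new Map();
applyChange(userCache, { event: "users.insert", data: { id: 1, name: "Ada" } });
```

Passing applyChange as the body of onData would keep the cache current for as long as the subscription is connected. Changes made while the client is disconnected would still need a fresh fetch.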

Options

Option           Type                         Default     Description
sql              typeof Bun.sql               (none)      Bun SQL connection
tables           Array<string | TableConfig>  (none)      Tables to watch
pgSchema         string                       "public"    PostgreSQL schema
realtimeChannel  string                       table name  Default realtime channel for all tables

Per-table config

Each entry in tables can be a plain string or a TableConfig object for per-table control:

postgresSync({
  sql: Bun.sql,
  tables: [
    "posts", // plain string — no filter

    {
      name: "users",
      // Only emit events for active users
      filter: (row, op) => row.active === true,
    },

    {
      name: "messages",
      // Skip deletes
      filter: (row, op) => op !== "DELETE",
      // Emit on a different realtime channel
      realtimeChannel: "feed",
    },
  ],
});

TableConfig

Property         Type                  Description
name             string                Table name
filter           (row, op) => boolean  Drop the event when this returns false. row is NEW for INSERT/UPDATE, OLD for DELETE. op is "INSERT", "UPDATE", or "DELETE".
realtimeChannel  string                Override the realtime channel for this table
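To show how these properties could interact, here is a minimal sketch of turning one notification into an event. It is not the plugin's implementation. The toEvent helper, the Notification shape, and the channel precedence (per-table realtimeChannel, then the plugin-level realtimeChannel, then the table name) are assumptions inferred from the option descriptions.

```typescript
type Operation = "INSERT" | "UPDATE" | "DELETE";
type Row = Record<string, unknown>;

interface TableConfig {
  name: string;
  filter?: (row: Row, op: Operation) => boolean;
  realtimeChannel?: string;
}

// Assumed shape of the decoded trigger payload.
interface Notification {
  op: Operation;
  row: Row;
}

interface OutgoingEvent {
  channel: string;
  event: string;
  data: Row;
}

// Hypothetical dispatcher: returns null when the filter drops the event.
function toEvent(
  entry: string | TableConfig,
  note: Notification,
  defaultChannel?: string,
): OutgoingEvent | null {
  // A plain string is shorthand for a config with no filter and no override.
  const config: TableConfig = typeof entry === "string" ? { name: entry } : entry;
  if (config.filter && !config.filter(note.row, note.op)) return null;
  return {
    channel: config.realtimeChannel ?? defaultChannel ?? config.name,
    event: `${config.name}.${note.op.toLowerCase()}`,
    data: note.row,
  };
}

const feedEvent = toEvent(
  { name: "messages", realtimeChannel: "feed" },
  { op: "INSERT", row: { id: 7 } },
);
```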

Filter examples

// Only sync rows where a column matches
filter: (row) => row.status === "published"

// Skip DELETE events (with soft deletes, the UPDATE that flags the row is still emitted)
filter: (row, op) => op !== "DELETE"

// Only emit inserts
filter: (row, op) => op === "INSERT"

// Combine conditions
filter: (row, op) => op !== "DELETE" && row.tenant_id === "acme"
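When the same conditions recur across several tables, small named predicates can be composed instead of repeating inline expressions. The sketch below is one way to do that. The Filter type mirrors the (row, op) => boolean signature documented above, while allOf, notDeleted, and forTenant are hypothetical helpers and not part of forrealtime.

```typescript
type Operation = "INSERT" | "UPDATE" | "DELETE";
type Row = Record<string, unknown>;
type Filter = (row: Row, op: Operation) => boolean;

// Hypothetical combinator: the event passes only if every filter returns true.
const allOf = (...filters: Filter[]): Filter =>
  (row, op) => filters.every((f) => f(row, op));

// Reusable building blocks.
const notDeleted: Filter = (_row, op) => op !== "DELETE";
const forTenant = (tenantId: string): Filter =>
  (row) => row.tenant_id === tenantId;

// Equivalent to: (row, op) => op !== "DELETE" && row.tenant_id === "acme"
const acmeLiveRows = allOf(notDeleted, forTenant("acme"));
```

The composed function can be passed directly as the filter property of a TableConfig entry.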

pg API

Method      Description
pg.start()  Install triggers and start listening. Call it once; calling it again while already started only logs a warning.
pg.stop()   Unlisten from all tables.
