postgresSync
Stream PostgreSQL table changes to realtime clients via LISTEN/NOTIFY.
postgresSync watches one or more PostgreSQL tables and emits a realtime event whenever a row is inserted, updated, or deleted. Clients subscribe to those events just like any other realtime event.
Installation
No extra packages are needed: the plugin uses Bun.sql, which is built into Bun.
import { postgresSync } from "forrealtime/plugins/postgres-sync";
Basic usage
import { Realtime } from "forrealtime";
import { createBunAdapter } from "forrealtime/adapters/bun";
import { postgresSync } from "forrealtime/plugins/postgres-sync";
const realtime = new Realtime({
  redis: createBunAdapter(Bun.redis),
  plugins: [
    postgresSync({
      sql: Bun.sql,
      tables: ["users", "posts"],
    }),
  ],
});
// Start listening after the server is ready
await realtime.pg.start();
Call realtime.pg.stop() to unsubscribe and tear down the LISTEN connections.
How it works
- On start(), the plugin installs an AFTER INSERT OR UPDATE OR DELETE trigger on each table using CREATE OR REPLACE (requires PostgreSQL 14+).
- The trigger calls pg_notify with a JSON payload containing the table name, operation, and the affected row.
- The plugin subscribes via Bun.sql.listen() and, for each notification, emits a realtime event.
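The first two steps can be sketched as a function that generates the SQL for one table. This is only an illustration of the technique, not the plugin's actual SQL: buildTriggerSql, quoteIdent, and quoteLiteral are hypothetical helpers, and the function, trigger, and NOTIFY channel names are assumptions, as are the payload keys table, op, and row.

```typescript
// Quote an identifier for PostgreSQL by doubling embedded double quotes.
function quoteIdent(name: string): string {
  return `"${name.replace(/"/g, '""')}"`;
}

// Quote a string literal by doubling embedded single quotes.
function quoteLiteral(value: string): string {
  return `'${value.replace(/'/g, "''")}'`;
}

// Hypothetical helper: builds SQL for one table's notify function and trigger.
// The "<table>_notify" function name, "<table>_notify_trigger" trigger name, and
// "<table>_changes" channel name are assumptions made for this sketch.
function buildTriggerSql(schema: string, table: string): string {
  const qualifiedTable = `${quoteIdent(schema)}.${quoteIdent(table)}`;
  const fn = `${quoteIdent(schema)}.${quoteIdent(`${table}_notify`)}`;
  const trigger = quoteIdent(`${table}_notify_trigger`);
  const channel = quoteLiteral(`${table}_changes`);

  return `
CREATE OR REPLACE FUNCTION ${fn}() RETURNS trigger AS $fn$
DECLARE
  affected json;
BEGIN
  -- OLD is the only row available on DELETE; NEW is used otherwise.
  IF TG_OP = 'DELETE' THEN
    affected := row_to_json(OLD);
  ELSE
    affected := row_to_json(NEW);
  END IF;
  -- NOTIFY payloads must stay under 8000 bytes in a default PostgreSQL build.
  PERFORM pg_notify(
    ${channel},
    json_build_object('table', TG_TABLE_NAME, 'op', TG_OP, 'row', affected)::text
  );
  RETURN NULL; -- the return value of an AFTER row trigger is ignored
END;
$fn$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER ${trigger}
AFTER INSERT OR UPDATE OR DELETE ON ${qualifiedTable}
FOR EACH ROW EXECUTE FUNCTION ${fn}();
`.trim();
}
```

Calling buildTriggerSql("public", "users") returns the statements as a single string. Because the whole row is serialized into the payload, tables with very wide rows may exceed the NOTIFY payload limit under this approach.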
Events are emitted on a channel named after the table, with the event name {table}.insert, {table}.update, or {table}.delete:
| Table | Operation | Channel | Event |
|---|---|---|---|
| users | INSERT | users | users.insert |
| users | UPDATE | users | users.update |
| users | DELETE | users | users.delete |
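The naming rule in the table above can be expressed as a small function. targetFor is a hypothetical helper written for illustration, not part of the library. It also assumes that overriding the channel leaves the event name based on the table name, which the text here does not state explicitly.

```typescript
type Operation = "INSERT" | "UPDATE" | "DELETE";

interface RealtimeTarget {
  channel: string;
  event: string;
}

// Hypothetical helper mirroring the naming rule in the table above:
// channel = table name (unless overridden), event = "{table}.{operation in lowercase}".
function targetFor(table: string, op: Operation, channelOverride?: string): RealtimeTarget {
  return {
    channel: channelOverride ?? table,
    event: `${table}.${op.toLowerCase()}`,
  };
}
```

For example, targetFor("users", "INSERT") yields the channel users and the event users.insert, matching the first row of the table.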
Subscribing on the client
const { status } = useRealtime({
  channels: ["users"],
  events: ["users.insert", "users.update", "users.delete"],
  onData(payload) {
    console.log(payload.event, payload.data); // e.g. "users.insert", { id: 1, name: "..." }
  },
});
Options
| Option | Type | Default | Description |
|---|---|---|---|
| sql | typeof Bun.sql | none | Bun SQL connection |
| tables | Array<string \| TableConfig> | none | Tables to watch |
| pgSchema | string | "public" | PostgreSQL schema |
| realtimeChannel | string | table name | Default realtime channel for all tables |
Per-table config
Each entry in tables can be a plain string or a TableConfig object for per-table control:
postgresSync({
  sql: Bun.sql,
  tables: [
    "posts", // plain string, no filter
    {
      name: "users",
      // Only emit events for active users
      filter: (row, op) => row.active === true,
    },
    {
      name: "messages",
      // Skip deletes
      filter: (row, op) => op !== "DELETE",
      // Emit on a different realtime channel
      realtimeChannel: "feed",
    },
  ],
});
TableConfig
| Property | Type | Description |
|---|---|---|
| name | string | Table name |
| filter | (row, op) => boolean | Drop the event when this returns false. row is NEW for INSERT/UPDATE, OLD for DELETE. op is "INSERT", "UPDATE", or "DELETE". |
| realtimeChannel | string | Override the realtime channel for this table |
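To make the filter and channel-override semantics concrete, here is a minimal sketch of how a single notification might be processed. It is not the plugin's implementation: handleNotification, ChangePayload, and Emit are hypothetical names, and the payload field names table, op, and row are assumptions.

```typescript
type Op = "INSERT" | "UPDATE" | "DELETE";
type Row = Record<string, unknown>;

// Mirrors the TableConfig properties documented above.
interface TableConfig {
  name: string;
  filter?: (row: Row, op: Op) => boolean;
  realtimeChannel?: string;
}

// Assumed payload shape; the field names the plugin really uses are not documented here.
interface ChangePayload {
  table: string;
  op: Op;
  row: Row;
}

type Emit = (channel: string, event: string, data: Row) => void;

// Hypothetical handler: parse one notification, apply the table's filter, then emit.
// Returns true when an event was emitted and false when it was dropped.
function handleNotification(raw: string, config: TableConfig, emit: Emit): boolean {
  const change = JSON.parse(raw) as ChangePayload;
  if (change.table !== config.name) return false; // not for this table
  if (config.filter && !config.filter(change.row, change.op)) return false; // filtered out
  const channel = config.realtimeChannel ?? config.name;
  emit(channel, `${config.name}.${change.op.toLowerCase()}`, change.row);
  return true;
}
```

With a config of { name: "messages", filter: (row, op) => op !== "DELETE", realtimeChannel: "feed" }, an INSERT notification is emitted on the feed channel and a DELETE notification is dropped.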
Filter examples
// Only sync rows where a column matches
filter: (row) => row.status === "published"
// Drop hard deletes (a soft-delete still arrives as an update event)
filter: (row, op) => op !== "DELETE"
// Only emit inserts
filter: (row, op) => op === "INSERT"
// Combine conditions
filter: (row, op) => op !== "DELETE" && row.tenant_id === "acme"
pg API
| Method | Description |
|---|---|
| pg.start() | Install triggers and start listening. Intended to be called once; warns if already started. |
| pg.stop() | Unlisten from all tables. |