Real-time SQL subscriptions for PostgreSQL. Subscribe to any query, get live updates via NATS.

⚠️ Alpha — Under active development
CI Release Docker Rust PostgreSQL NATS MIT

Quick Start

// Subscribe to a query
const res = await nats.request('livequery.orders.subscribe', {
  query: "SELECT * FROM orders WHERE status = 'pending'",
  mode: 'events'
});

// Receive real-time updates
const sub = nats.subscribe(res.subject);
for await (const msg of sub) {
  console.log('Update:', JSON.parse(msg.data));
}
import nats
import json

# Subscribe to a query
nc = await nats.connect("nats://localhost:4222")
res = await nc.request("livequery.orders.subscribe", json.dumps({
    "query": "SELECT * FROM orders WHERE status = 'pending'",
    "mode": "events"
}).encode())
data = json.loads(res.data)

# Receive real-time updates
async for msg in nc.subscribe(data["subject"]):
    print("Changes:", json.loads(msg.data)["events"])
// Subscribe to a query
nc, _ := nats.Connect("nats://localhost:4222")
res, _ := nc.Request("livequery.orders.subscribe", []byte(`{
  "query": "SELECT * FROM orders WHERE status = 'pending'",
  "mode": "events"
}`), time.Second*10)

var resp struct{ Subject string }
json.Unmarshal(res.Data, &resp)

// Receive real-time updates
nc.Subscribe(resp.Subject, func(msg *nats.Msg) {
    fmt.Println("Changes:", string(msg.Data))
})
// Subscribe to a query
let nc = async_nats::connect("nats://localhost:4222").await?;
let res = nc.request("livequery.orders.subscribe", r#"{
  "query": "SELECT * FROM orders WHERE status = 'pending'",
  "mode": "events"
}"#.into()).await?;

let data: serde_json::Value = serde_json::from_slice(&res.payload)?;

// Receive real-time updates
let mut sub = nc.subscribe(data["subject"].as_str().unwrap()).await?;
while let Some(msg) = sub.next().await {
    println!("Changes: {:?}", msg.payload);
}

Subscription Modes

events Recommended

Differential updates inspired by Materialize SUBSCRIBE. mz_diff: +1 = insert, -1 = delete. Update = delete + insert pair.

// Insert
{ "seq": 1, "ts": 1706464800000, "events": [
  { "mz_diff": 1, "data": { "id": 1, "status": "pending" } }
]}
// Update = delete old + insert new
{ "seq": 2, "ts": 1706464801000, "events": [
  { "mz_diff": -1, "data": { "id": 1, "status": "pending" } },
  { "mz_diff": 1, "data": { "id": 1, "status": "shipped" } }
]}
snapshot

Receive full query result on every change. Just replace your data. Best for small datasets or guaranteed consistency.

{ "seq": 1, "ts": 1706464800000, "snapshot": [
  { "id": 1, "status": "pending" },
  { "id": 2, "status": "shipped" }
]}

Features

Plain SQL

No new query languages. Write standard SQL and subscribe to results in real-time.

Low Latency

Native PostgreSQL WAL streaming with sub-10ms change detection.

Shared Queries

Identical queries share one snapshot. Scales efficiently with many clients.

NATS Native

Built on NATS for lightweight, high-performance pub/sub messaging.

Install

Docker

docker pull ghcr.io/sstepanchuk/livequery:latest

Cargo

cargo install --git https://github.com/sstepanchuk/livequery

Binary

curl -LO https://github.com/sstepanchuk/livequery/releases/latest/download/livequery-linux-x86_64.tar.gz