Real-time per-entity aggregations, queryable as plain SQL.
Maintain rolling per-customer, per-tenant, or per-device counters
(count, sum, max, avg) continuously, with no Materialize, ksqlDB, or
ClickHouse cluster to operate. The aggregate is a row in
queen_streams.state, so your dashboard, BI tool, billing service, or
alerting rule queries it as plain SQL on the PostgreSQL you already run.
Results: totals across 30 customers are an exact match (matches input to the penny); infrastructure: no analytics store.
Numbers are from a verified run with 30 customers × ~100 events each
over 12 seconds, tumbling windows of 5 seconds.
Reproduce with examples/use-case-aggregations.js.
1. The problem
You need "per-customer activity in the last N seconds / minutes / hours" continuously, accessible from:
- A real-time dashboard (Grafana panel, custom React UI).
- A billing service that meters usage per window.
- An alerting rule that fires if any customer crosses a threshold.
- An ML feature pipeline that needs rolling stats as inputs.
The standard answers in this space all add their own infrastructure:
| System | Floor cost | Where state lives | How dashboards read it |
|---|---|---|---|
| Materialize | $400+/mo managed | Proprietary store | SQL via Materialize's wire-compatible endpoint |
| ksqlDB | Kafka cluster + ksqlDB nodes | Internal RocksDB + Kafka changelog | REST API or "pull queries" |
| Tinybird | $0–$$ as a service | ClickHouse hosted | Generated HTTP API |
| Self-hosted ClickHouse + ingestor | Separate cluster + ETL | ClickHouse | SQL via ClickHouse client |
| Queen streams | Your existing PostgreSQL | queen_streams.state table | Plain SQL, already wired to your BI / Grafana |
2. The pattern
A producer pushes events partitioned by entity (customerId, tenantId,
deviceId). A streaming query keys by partition (default), tumbles into
windows of N seconds, and runs aggregate() over each
window. The per-window accumulator is persisted as a row in
queen_streams.state at the end of every cycle:
Each window's accumulator is a JSONB document like
{ count: 47, sum: 312.5, max: 99.0, windowStart, windowEnd }.
Workers holding the partition lease are the only writers, so there's
no cross-worker contention on a customer's row. State is durable,
replicated by your PG backup, and queryable from anywhere that speaks
SQL.
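To make the windowing concrete, here is a minimal in-memory sketch of a tumbling-window fold. It is not the Queen streams SDK: `windowStartFor`, `applyEvent`, the `amount` field, and the `customerId|windowStart` row key are hypothetical names chosen for illustration, and it assumes windows are aligned to multiples of the window size. A `Map` stands in for queen_streams.state.

```javascript
// Assumption: windows are aligned to multiples of the window size since epoch.
function windowStartFor(tsMs, windowMs) {
  return Math.floor(tsMs / windowMs) * windowMs;
}

// Fold one event into the accumulator for its (customer, window) pair.
// The Map is an in-memory stand-in for rows in queen_streams.state.
function applyEvent(state, event, windowMs) {
  const windowStart = windowStartFor(event.ts, windowMs);
  const rowKey = `${event.customerId}|${windowStart}`; // hypothetical key layout
  const acc = state.get(rowKey) ?? {
    count: 0,
    sum: 0,
    max: -Infinity,
    windowStart,
    windowEnd: windowStart + windowMs,
  };
  acc.count += 1;
  acc.sum += event.amount;
  acc.max = Math.max(acc.max, event.amount);
  state.set(rowKey, acc);
  return state;
}

const state = new Map();
const events = [
  { customerId: 'cust-1', ts: 1000, amount: 10.5 },
  { customerId: 'cust-1', ts: 4999, amount: 99.0 },
  { customerId: 'cust-1', ts: 5000, amount: 2.0 }, // lands in the next 5 s window
  { customerId: 'cust-2', ts: 1200, amount: 7.25 },
];
for (const e of events) applyEvent(state, e, 5000);

console.log(state.get('cust-1|0'));
// { count: 2, sum: 109.5, max: 99, windowStart: 0, windowEnd: 5000 }
```

The average is not stored here; it can be derived as `sum / count` when the row is read.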
3. Quickstart
Every snippet on this page was run against a live Queen server at
http://localhost:6632 before being included. The
full demo lives at
examples/use-case-aggregations.js.
4. Querying the live aggregate via SQL
While a window is still open for a customer, the partial
accumulator is sitting in queen_streams.state. You can
query it directly:
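A minimal sketch of such a query, wrapped as a `{ text, values }` config object of the kind node-postgres accepts in `client.query(...)`. The column names `query_id`, `key`, and `value` are assumptions, not taken from this page; check them against the streams schema file listed in the references before use. `buildLiveAggregateQuery` is a hypothetical helper.

```javascript
// Hypothetical helper. Column names (query_id, key, value) are assumed;
// verify them against the streams schema before running this.
function buildLiveAggregateQuery(queryId, limit = 50) {
  return {
    text: `
      SELECT key,
             (value->>'count')::bigint AS event_count,
             (value->>'sum')::numeric  AS total,
             (value->>'max')::numeric  AS largest,
             value->>'windowStart'     AS window_start,
             value->>'windowEnd'       AS window_end
      FROM queen_streams.state
      WHERE query_id = $1
      ORDER BY total DESC
      LIMIT $2`,
    values: [queryId, limit],
  };
}

const liveQuery = buildLiveAggregateQuery('totals-per-customer', 20);
console.log(liveQuery.values); // [ 'totals-per-customer', 20 ]
```

The JSONB fields read here (`count`, `sum`, `max`, `windowStart`, `windowEnd`) are the ones shown in the accumulator document above; only the surrounding column names are guesses.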
Hook this query into Grafana with the standard PostgreSQL datasource and you have a live per-customer dashboard. No exporter, no second database, no schema migration in a different system.
The state row for a window exists from the first event in that window until the window closes. When the window closes (windowEnd + grace period <= clock), the runner emits the final aggregate to the sink (if any) and deletes the state row in the same PG transaction. So:
- Live SELECTs see currently open windows for each customer.
- The sink queue (events.totals_per_customer_per_5s above) receives one push per closed window; that's your durable history.
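The close rule can be sketched in a few lines. This is an in-memory illustration under stated assumptions, not the runner's implementation: `closeRipeWindows` is a hypothetical name, a `Map` stands in for queen_streams.state, and the returned array stands in for pushes to the sink queue.

```javascript
// Emit and delete every window whose end plus grace period has passed.
// In Queen the emit and the delete happen in one PG transaction; here they
// are simply adjacent statements on an in-memory Map.
function closeRipeWindows(state, clockMs, graceMs) {
  const emitted = [];
  for (const [rowKey, acc] of state) {
    if (acc.windowEnd + graceMs <= clockMs) {
      emitted.push({ rowKey, ...acc });
      state.delete(rowKey); // deleting the current entry while iterating a Map is safe
    }
  }
  return emitted;
}

const openWindows = new Map([
  ['cust-1|0', { count: 2, sum: 109.5, max: 99, windowStart: 0, windowEnd: 5000 }],
  ['cust-1|5000', { count: 1, sum: 2, max: 2, windowStart: 5000, windowEnd: 10000 }],
]);

const closed = closeRipeWindows(openWindows, 6000, 500);
console.log(closed.length);    // 1 (the 0..5000 window)
console.log(openWindows.size); // 1 (the 5000..10000 window is still live)
```

After the call, a live SELECT would see only the still-open window, and the sink would have received the closed one.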
5. Emitting closed windows to a queue
Pair this with the .to(sinkQueue) on the stream and every
closed window becomes a regular Queen message on a sink queue. That
queue can fan out to multiple downstream consumers (billing,
alerting, anomaly detection) using consumer groups, with full
replay-from-timestamp support.
Other consumer groups (analytics-dashboard,
anomaly-detector) read the same sink queue independently.
Adding a new downstream is one
q.queue(...).group('new-name').consume(...) — no schema
change to the streaming query.
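The independence of consumer groups can be pictured with a toy model in which each group keeps its own cursor over the same log. `SinkQueueSim` is a hypothetical class, not the Queen client, and it starts every new group at the beginning of the log, which may differ from Queen's default subscription behaviour.

```javascript
// Toy model: one append-only log, one read cursor per consumer group.
class SinkQueueSim {
  constructor() {
    this.log = [];
    this.cursors = new Map();
  }

  push(message) {
    this.log.push(message);
  }

  // Reading as one group advances only that group's cursor.
  poll(group, max = 10) {
    const from = this.cursors.get(group) ?? 0;
    const batch = this.log.slice(from, from + max);
    this.cursors.set(group, from + batch.length);
    return batch;
  }
}

const sink = new SinkQueueSim();
sink.push({ customerId: 'cust-1', windowStart: 0, sum: 109.5 });
sink.push({ customerId: 'cust-2', windowStart: 0, sum: 7.25 });
sink.push({ customerId: 'cust-1', windowStart: 5000, sum: 2 });

const billingFirst = sink.poll('billing', 2);
const dashboardAll = sink.poll('analytics-dashboard');
const billingRest = sink.poll('billing');

console.log(billingFirst.length, dashboardAll.length, billingRest.length); // 2 3 1
```

Adding a third group to this model requires no change to the producer side, which mirrors the point that a new downstream needs no change to the streaming query.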
6. vs. Materialize / ksqlDB / Tinybird
When you pick this approach over a dedicated streaming SQL engine, you're making two trade-offs:
| | Materialize / ksqlDB | Queen streams |
|---|---|---|
| SQL expressiveness | Full SQL on the streaming query | Functional builder (map/filter/keyBy/window/reduce/aggregate) in JS / Python / Go |
| Join semantics | Stream-stream joins, materialised views over joins | Single-stream operators; cross-stream join requires the repartition pattern |
| Infrastructure | Separate cluster ($400+/mo managed, more self-hosted) | Your existing PG |
| State location | Proprietary store, queried via wire protocol | Plain PG table, queried via plain SQL |
| Backup / restore | Separate backup pipeline | Same as your existing PG backup |
| Backfill from history | Reset the materialized view | subscriptionFrom on a new CG + new query |
Pick Materialize / ksqlDB when you genuinely need streaming SQL with joins, CDC, or advanced temporal operators. Pick Queen streams when your aggregations are per-entity windowed counters / sums / max — the vast majority of "real-time per-customer X" requirements — and you don't want to operate a second database for them.
7. Common pitfalls
The state row is deleted on window close (the closed aggregate is
emitted to the sink instead). If you query
queen_streams.state a few seconds after the producer
stops, the rows will be gone and your dashboard panel will look
empty. For historical aggregates, consume the sink queue or persist
them into your own table.
In processing-time mode, a window only closes when either (a) a
later event arrives whose created_at is past the window
end, or (b) the idle-flush timer fires. If a customer goes quiet,
the window stays open until the timer sweeps it. Tune
idleFlushMs based on how stale you can tolerate the
aggregate being.
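The two close triggers can be written as a single predicate. This is a sketch under assumptions: `shouldCloseWindow`, `lastCreatedAt`, and `lastArrivalAt` are hypothetical names, and the idle-flush rule modelled here (wall clock past the window end and no arrivals for `idleFlushMs`) is a guess at the timer's behaviour rather than its documented semantics.

```javascript
// Decide whether a processing-time window may close.
function shouldCloseWindow(win, partition, nowMs, idleFlushMs) {
  // (a) a later event whose created_at is at or past the window end has arrived
  const laterEventArrived = partition.lastCreatedAt >= win.windowEnd;
  // (b) assumed idle-flush rule: the window end has passed on the wall clock
  //     and the partition has been quiet for at least idleFlushMs
  const idleTimerFired =
    nowMs >= win.windowEnd && nowMs - partition.lastArrivalAt >= idleFlushMs;
  return laterEventArrived || idleTimerFired;
}

const win = { windowStart: 0, windowEnd: 5000 };
const quiet = { lastCreatedAt: 4200, lastArrivalAt: 4200 };
const active = { lastCreatedAt: 5100, lastArrivalAt: 5100 };

console.log(shouldCloseWindow(win, quiet, 6000, 3000));  // false: quiet for only 1800 ms
console.log(shouldCloseWindow(win, quiet, 7200, 3000));  // true: idle flush sweeps it
console.log(shouldCloseWindow(win, active, 5100, 3000)); // true: a later event arrived
```

In this model a quiet customer's final window closes roughly `idleFlushMs` after its last event, which is the staleness the tuning advice refers to.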
Queen partitions are the unit of single-writer ownership. If the
source queue is partitioned by orderId but you call
.keyBy(m => m.customerId), two partition leases may
write to the same logical customerId state row from
different workers. Always partition by the same key you aggregate
by — usually customerId / tenantId /
deviceId — and skip .keyBy() entirely.
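A small sketch shows why the mismatch matters. It counts how many distinct partitions carry events for each customer under two partitioning choices; `partitionsPerCustomer` is a hypothetical helper and the partition key is used directly as the partition name for simplicity.

```javascript
// Count how many distinct partitions carry events for each customer.
function partitionsPerCustomer(events, partitionKeyOf) {
  const seen = new Map();
  for (const e of events) {
    const parts = seen.get(e.customerId) ?? new Set();
    parts.add(partitionKeyOf(e));
    seen.set(e.customerId, parts);
  }
  return new Map([...seen].map(([customer, parts]) => [customer, parts.size]));
}

const orders = [
  { orderId: 'o-1', customerId: 'cust-1' },
  { orderId: 'o-2', customerId: 'cust-1' },
  { orderId: 'o-3', customerId: 'cust-2' },
];

const byOrder = partitionsPerCustomer(orders, (e) => e.orderId);
const byCustomer = partitionsPerCustomer(orders, (e) => e.customerId);

console.log(byOrder.get('cust-1'));    // 2: two leases could write cust-1's state
console.log(byCustomer.get('cust-1')); // 1: a single writer per customer
```

Any customer whose count is above 1 can have its state row written by more than one lease holder, which is the contention the single-writer model is meant to rule out.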
One open window = one PG row. Aggregating per-customer with 1M customers and 1-minute windows means up to 1M open state rows. Fine for PG, but worth right-sizing your hardware. For very large cardinalities, prefer larger windows or coarser keying.
8. References
| What | Where |
|---|---|
| Full demo (verified live) | examples/use-case-aggregations.js |
| Reference: per-customer windowed sum (streams SDK) | streams/examples/02-per-customer-windowed-sum.js |
| Window operators source | client-v2/streams/operators/ |
| Streams schema | lib/schema/procedures/019_streams_schema.sql |
| Cycle commit (atomic state + push + ack) | 021_streams_cycle_v1.sql |
