Queen MQ


Recipe · analytics

Real-time per-entity aggregations, queryable as plain SQL.

Maintain rolling per-customer, per-tenant, or per-device counters (count, sum, max, avg) continuously, with no Materialize, ksqlDB, or ClickHouse cluster to operate. The aggregate is a row in queen_streams.state — your dashboard, BI tool, billing service, or alerting rule queries it as plain SQL on the PostgreSQL you already run.

3,000 events produced · across 30 customers
3,000 Σ window counts · exact match
Σ exact sum of amounts · matches input to the penny
0 extra infrastructure · no Redis, no Materialize, no analytics store

Numbers are from a verified run: 30 customers × ~100 events each, produced over 12 seconds and aggregated into 5-second tumbling windows. Reproduce with examples/use-case-aggregations.js.

1. The problem

You need "per-customer activity in the last N seconds / minutes / hours", computed continuously and readable from your dashboards, BI tools, billing service, and alerting rules.

The standard answers in this space all add their own infrastructure:

System | Floor cost | Where state lives | How dashboards read it
Materialize | $400+/mo managed | Proprietary store | SQL via Materialize's wire-compatible endpoint
ksqlDB | Kafka cluster + ksqlDB nodes | Internal RocksDB + Kafka changelog | REST API or "pull queries"
Tinybird | $0–$$ as a service | Hosted ClickHouse | Generated HTTP API
Self-hosted ClickHouse + ingestor | Separate cluster + ETL | ClickHouse | SQL via ClickHouse client
Queen streams | Your existing PostgreSQL | queen_streams.state table | Plain SQL, already wired to your BI / Grafana

2. The pattern

A producer pushes events partitioned by entity (customerId, tenantId, deviceId). A streaming query keys by partition (default), tumbles into windows of N seconds, and runs aggregate() over each window. The per-window accumulator is persisted as a row in queen_streams.state at the end of every cycle:

producer ──► events (Queen, partitioned by customerId)
                 │
                 ▼
   Stream.from(events)
     .keyBy(default = partitionId = customerId)
     .windowTumbling({ seconds: 60 })
     .aggregate({ count, sum, max })
     .to(events.totals_per_customer_per_min)   ◀── (optional sink)
                 │
                 ▼
   queen_streams.state   (one row per OPEN window per customer)
     │  PK = (query_id, partition_id, state_key)
     └─► SELECT key, value FROM queen_streams.state ─► your Grafana / Metabase / billing service

Each window's accumulator is stored as a JSONB document like { acc: { count: 47, sum: 312.5, max: 99.0 }, windowStart, windowEnd }, which is the shape the SQL in section 4 reads. Workers holding the partition lease are the only writers, so there's no cross-worker contention on a customer's row. State is durable, covered by your existing PG backups and replication, and queryable from anywhere that speaks SQL.
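The per-window bookkeeping can be illustrated with a small in-memory model of one partition. Each event is assigned to a tumbling window by flooring its timestamp to a multiple of the window size, and count/sum/max are folded into an accumulator for that window. This is a sketch under assumptions, not Queen's runner: `assignWindow` and `foldEvents` are hypothetical names, timestamps are assumed to be epoch milliseconds, and the row shape mirrors what the SQL in section 4 reads.

```javascript
// Minimal in-memory sketch of tumbling-window aggregation for ONE partition.
// Hypothetical helpers; not part of the queen-mq API.

// Tumbling windows are aligned to multiples of the window size:
// an event at time t belongs to [floor(t / size) * size, that + size).
function assignWindow(tMs, sizeMs) {
  const windowStart = Math.floor(tMs / sizeMs) * sizeMs
  return { windowStart, windowEnd: windowStart + sizeMs }
}

// Fold events into one accumulator row per window, keyed by windowStart.
// Row shape: { acc: { count, sum, max }, windowStart, windowEnd }.
function foldEvents(events, sizeMs) {
  const state = new Map()
  for (const { t, amount } of events) {
    const { windowStart, windowEnd } = assignWindow(t, sizeMs)
    let row = state.get(windowStart)
    if (!row) {
      row = { acc: { count: 0, sum: 0, max: -Infinity }, windowStart, windowEnd }
      state.set(windowStart, row)
    }
    row.acc.count += 1
    row.acc.sum += amount
    row.acc.max = Math.max(row.acc.max, amount)
  }
  return state
}

// Usage: three events for one customer, 5-second windows.
const rows = foldEvents(
  [
    { t: 1_000, amount: 10 },
    { t: 4_999, amount: 25.5 },
    { t: 5_000, amount: 7 }   // exactly on the boundary: first event of the next window
  ],
  5_000
)
// rows.get(0).acc    → { count: 2, sum: 35.5, max: 25.5 }
// rows.get(5000).acc → { count: 1, sum: 7, max: 7 }
```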

3. Quickstart

Verified end-to-end

Every snippet on this page was run against a live Queen server at http://localhost:6632 before being included. The full demo lives at examples/use-case-aggregations.js.

javascript · setup + producer
// npm install queen-mq        (Node 22+)
import { Queen, Stream } from 'queen-mq'

const url = 'http://localhost:6632'
const q   = new Queen({ url, handleSignals: false })

const Q_EVENTS = 'events'

await q.queue(Q_EVENTS).config({
  leaseTime:                 30,
  retryLimit:                3,
  retentionEnabled:          true,
  retentionSeconds:          3600
}).create()

// Producer: 120 bursts × 25 events = 3,000 events spread across 30 customers.
for (let burst = 0; burst < 120; burst++) {
  const batch = []
  for (let i = 0; i < 25; i++) {
    const customerId = `cust-${Math.floor(Math.random() * 30)}`
    batch.push({
      partition: customerId,
      item:      { data: { customerId, amount: Math.random() * 100, t: Date.now() } }
    })
  }
  // Group by partition so each push targets exactly one partition.
  const byPart = new Map()
  for (const { partition, item } of batch) {
    const arr = byPart.get(partition) || []
    arr.push(item)
    byPart.set(partition, arr)
  }
  await Promise.all([...byPart].map(([p, items]) =>
    q.queue(Q_EVENTS).partition(p).push(items)
  ))
  await new Promise(r => setTimeout(r, 50))
}

javascript · streaming aggregation
// 5-second tumbling windows, count + sum + max per customer.
const stream = await Stream
  .from(q.queue(Q_EVENTS))
  // Default key = partition_id = customerId. No explicit keyBy needed.
  .windowTumbling({ seconds: 5, idleFlushMs: 1500 })
  .aggregate({
    count: () => 1,
    sum:   m => m.amount,
    max:   m => m.amount
  })
  .to(q.queue('events.totals_per_customer_per_5s'))   // optional sink queue
  .run({
    queryId:       'customer-rolling-stats',
    url,
    batchSize:     200,
    maxPartitions: 8,
    reset:         true
  })

// Verified run, 3000 events across 30 customers, 12s producer:
//   Σ window.count across all emits  = 3000   (matches input exactly)
//   Σ window.sum   across all emits  = 150803.70  (matches input to the penny)
//   query registered in queen_streams.queries with config_hash 376ce9fa...
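The "exact match" and "to the penny" checks amount to comparing two totals against the input. Here is a minimal sketch of that reconciliation. It assumes you have collected the produced amounts and the closed-window records (with `count` and `sum`) into arrays. `reconcile` is a hypothetical helper, not part of the SDK. Sums are compared after rounding to whole cents, because floating-point addition of the same values in a different order (per window versus all at once) can differ in the last bits.

```javascript
// Hypothetical reconciliation helper: do the emitted windows account for every input event?
// produced: array of input amounts; windows: array of { count, sum } from closed windows.
function reconcile(produced, windows) {
  const toCents = x => Math.round(x * 100)   // "to the penny" = equal after rounding to cents

  const inputCount = produced.length
  const inputSum = produced.reduce((a, b) => a + b, 0)

  const windowCount = windows.reduce((a, w) => a + w.count, 0)
  const windowSum = windows.reduce((a, w) => a + w.sum, 0)

  return {
    countsMatch: inputCount === windowCount,
    sumsMatch: toCents(inputSum) === toCents(windowSum),
    inputCount,
    windowCount
  }
}

// Usage: the same four amounts split across two windows and summed in a different order.
const produced = [0.1, 0.2, 0.3, 99.99]
const windows = [
  { count: 2, sum: 0.3 + 0.1 },
  { count: 2, sum: 99.99 + 0.2 }
]
const result = reconcile(produced, windows)   // countsMatch and sumsMatch are both true
console.log(result)
```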

4. Querying the live aggregate via SQL

While a window is still open for a customer, the partial accumulator is sitting in queen_streams.state. You can query it directly:

sql · current open-window aggregates per customer
SELECT
    s.key                                AS customer_or_partition,
    (s.value->'acc'->>'count')::int      AS events_in_window,
    (s.value->'acc'->>'sum')::numeric    AS sum_in_window,
    (s.value->'acc'->>'max')::numeric    AS max_in_window,
    to_timestamp(((s.value->>'windowStart')::bigint / 1000.0)) AS window_start,
    to_timestamp(((s.value->>'windowEnd')::bigint / 1000.0))   AS window_end
FROM queen_streams.state s
JOIN queen_streams.queries q ON q.id = s.query_id
WHERE q.name = 'customer-rolling-stats'
ORDER BY events_in_window DESC
LIMIT 20;

Hook this query into Grafana with the standard PostgreSQL datasource and you have a live per-customer dashboard. No exporter, no second database, no schema migration in a different system.
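A service can also read the state rows itself rather than going through Grafana, for example with a PostgreSQL client running `SELECT key, value FROM queen_streams.state ...`. In that case it can do the same projection as the SQL above in JavaScript. This is a minimal sketch under two assumptions: each row arrives as `{ key, value }`, and `value` has already been parsed from JSONB into an object shaped as the SQL expects. `toDashboardRow` and `topCustomers` are hypothetical names, and the sample rows are illustrative, not from the verified run.

```javascript
// Hypothetical projection of a queen_streams.state row, mirroring the SQL casts above.
// Assumes value = { acc: { count, sum, max }, windowStart, windowEnd } with epoch-ms timestamps.
function toDashboardRow({ key, value }) {
  return {
    customerOrPartition: key,
    eventsInWindow: Number(value.acc.count),
    sumInWindow: Number(value.acc.sum),
    maxInWindow: Number(value.acc.max),
    windowStart: new Date(Number(value.windowStart)),  // like to_timestamp(ms / 1000.0)
    windowEnd: new Date(Number(value.windowEnd))
  }
}

// Equivalent of ORDER BY events_in_window DESC LIMIT n
function topCustomers(rows, n = 20) {
  return rows
    .map(toDashboardRow)
    .sort((a, b) => b.eventsInWindow - a.eventsInWindow)
    .slice(0, n)
}

// Usage with two illustrative rows:
const sample = [
  { key: 'cust-3', value: { acc: { count: 12, sum: 480.25, max: 97.1 }, windowStart: 1700000000000, windowEnd: 1700000005000 } },
  { key: 'cust-7', value: { acc: { count: 31, sum: 1502.4, max: 99.8 }, windowStart: 1700000000000, windowEnd: 1700000005000 } }
]
const ranked = topCustomers(sample)   // cust-7 (31 events) first, then cust-3
```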

State lifecycle

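The state row for a window exists from the first event in that window until the window closes. When the window closes (windowEnd + grace period <= clock), the runner emits the final aggregate to the sink (if any) and deletes the state row in the same PG transaction. So the live table only ever holds open windows. Once a window closes, its final aggregate exists only in the sink queue, or wherever you persist it from there.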
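The close rule can be sketched in a few lines. This in-memory model treats emit-plus-delete as one step, standing in for the single PG transaction described above. `closeDueWindows`, `graceMs`, the `emit` callback and the `'cust-1:0'` state-key format are all hypothetical, not the runner's API. `nowMs` is whatever time the runner currently treats as its clock.

```javascript
// Hypothetical model of window close: emit the final aggregate and delete the state row together.
// state: Map<stateKey, { acc, windowStart, windowEnd }>
function closeDueWindows(state, nowMs, graceMs, emit) {
  // First find every window whose end plus grace has passed...
  const due = []
  for (const [key, row] of state) {
    if (row.windowEnd + graceMs <= nowMs) due.push([key, row])
  }
  // ...then emit and delete each one. In the real runner both happen in one PG
  // transaction, so a reader never observes one without the other.
  for (const [key, row] of due) {
    emit(row)
    state.delete(key)
  }
  return due.length
}

// Usage: two windows for one customer, 5 s windows, 1 s grace, clock at t = 6000.
const state = new Map([
  ['cust-1:0',    { acc: { count: 3, sum: 42, max: 20 }, windowStart: 0,    windowEnd: 5000 }],
  ['cust-1:5000', { acc: { count: 1, sum: 8,  max: 8 },  windowStart: 5000, windowEnd: 10000 }]
])
const emitted = []
const closedCount = closeDueWindows(state, 6000, 1000, row => emitted.push(row))
// Only [0, 5000) is due (5000 + 1000 <= 6000): it is emitted and removed,
// while [5000, 10000) stays in state as an open window.
```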

5. Emitting closed windows to a queue

Add .to(sinkQueue) to the stream and every closed window becomes a regular Queen message on the sink queue. That queue can fan out to multiple downstream consumers (billing, alerting, anomaly detection) through consumer groups, with full replay-from-timestamp support.

javascript · downstream consumer of closed windows
await q.queue('events.totals_per_customer_per_5s')
  .group('billing-meter')
  .batch(50)
  .each()
  .consume(async (msg) => {
    // msg.data = { count, sum, max, windowEnd, ... }   one record per closed window
    // billingSystem is a placeholder for your own billing client.
    await billingSystem.recordUsage({
      customer:  msg.partition,        // partitions carry forward the customerId
      windowEnd: msg.data.windowEnd,
      count:     msg.data.count,
      revenue:   msg.data.sum
    })
  })

Other consumer groups (analytics-dashboard, anomaly-detector) read the same sink queue independently. Adding a new downstream is one q.queue(...).group('new-name').consume(...) — no schema change to the streaming query.
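Consumer groups don't interfere with each other because each group keeps its own position in the same ordered log. Below is a toy in-memory model of that idea. `SinkLog`, `readBatch` and the cursor map are hypothetical and do not reflect how Queen stores group offsets. In this toy, a newly added group starts from the beginning of the log. In Queen, where a new group starts is governed by its subscription settings.

```javascript
// Toy model: one append-only sink log, independent cursors per consumer group.
class SinkLog {
  constructor() {
    this.messages = []          // closed-window records, in append order
    this.cursors = new Map()    // group name -> index of the next unread message
  }
  append(msg) {
    this.messages.push(msg)
  }
  // Each group advances only its own cursor; other groups are unaffected.
  readBatch(group, max) {
    const from = this.cursors.get(group) ?? 0
    const batch = this.messages.slice(from, from + max)
    this.cursors.set(group, from + batch.length)
    return batch
  }
}

// Usage: billing and analytics read the same three closed windows independently.
const log = new SinkLog()
for (const count of [47, 12, 30]) log.append({ count })

const billing1 = log.readBatch('billing-meter', 2)          // first two windows
const analytics = log.readBatch('analytics-dashboard', 50)  // all three, from the start
const billing2 = log.readBatch('billing-meter', 2)          // the remaining one
const late = log.readBatch('anomaly-detector', 50)          // added later, still sees all three
```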

6. vs. Materialize / ksqlDB / Tinybird

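When you pick this approach over a dedicated streaming SQL engine, you give up SQL expressiveness and join semantics (the first two rows below) in exchange for the operational simplicity in the remaining rows: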

Dimension | Materialize / ksqlDB | Queen streams
SQL expressiveness | Full SQL on the streaming query | Functional builder (map / filter / keyBy / window / reduce / aggregate) in JS, Python, Go
Join semantics | Stream-stream joins, materialized views over joins | Single-stream operators; cross-stream joins require the repartition pattern
Infrastructure | Separate cluster ($400+/mo managed, more if self-hosted) | Your existing PG
State location | Proprietary store, queried via wire protocol | Plain PG table, queried via plain SQL
Backup / restore | Separate backup pipeline | Same as your existing PG backup
Backfill from history | Reset the materialized view | subscriptionFrom on a new consumer group + new query

Pick Materialize / ksqlDB when you genuinely need streaming SQL with joins, CDC, or advanced temporal operators. Pick Queen streams when your aggregations are per-entity windowed counters / sums / max — the vast majority of "real-time per-customer X" requirements — and you don't want to operate a second database for them.

7. Common pitfalls

Live SQL shows only OPEN windows

The state row is deleted on window close (the closed aggregate is emitted to the sink instead). If you query queen_streams.state a few seconds after the producer stops, the rows will be gone and your dashboard panel will look empty. For historical aggregates, consume the sink queue or persist them into your own table.
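If you choose to persist closed windows into your own table, the consumer can write each closed window keyed by entity and window start. A redelivered message then overwrites its row instead of being counted twice. The sketch below uses a `Map` as a stand-in for that table. `recordClosedWindow` is hypothetical. The message shape (`msg.partition` plus `msg.data`) follows the consumer in section 5, and the assumption that `msg.data` also carries `windowStart` is unverified. Against PostgreSQL, the same idea is an `INSERT ... ON CONFLICT (...) DO UPDATE` upsert on that key.

```javascript
// In-memory stand-in for a history table with PRIMARY KEY (customer, windowStart).
const history = new Map()

// Hypothetical handler for one sink message: msg.partition = customerId,
// msg.data = the closed window's aggregate (assumed to include windowStart).
function recordClosedWindow(msg) {
  const pk = `${msg.partition}|${msg.data.windowStart}`
  // Upsert: a redelivered message replaces the existing row instead of adding a second one.
  history.set(pk, {
    customer: msg.partition,
    windowStart: msg.data.windowStart,
    count: msg.data.count,
    sum: msg.data.sum,
    max: msg.data.max
  })
}

// Usage: the same closed window delivered twice (e.g. after a retry) yields one row.
const msg = { partition: 'cust-4', data: { windowStart: 1700000000000, count: 9, sum: 311.2, max: 88.4 } }
recordClosedWindow(msg)
recordClosedWindow(msg)
const totalCount = [...history.values()].reduce((a, r) => a + r.count, 0)   // 9, not 18
```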

Idle-flush is the only way windows close on quiet entities

In processing-time mode, a window closes only when either (a) a later event arrives whose created_at is past the window end, or (b) the idle-flush timer fires. If a customer goes quiet, its window stays open until the timer sweeps it. Set idleFlushMs to the maximum staleness you can tolerate for the aggregate (the quickstart uses 1500 ms).
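The two close triggers can be written as a single predicate. This sketch rests on assumptions. Event time advances only with event `created_at` values. The idle-flush sweep falls back to the wall clock once no event has arrived for `idleFlushMs`. The grace period is ignored for brevity. `windowCanClose` and its parameters are hypothetical; the real sweep lives in the runner.

```javascript
// Hypothetical predicate: may this open window close now? (grace period ignored)
// win:            { windowEnd } in epoch ms
// lastCreatedAt:  created_at of the newest event seen on the partition
// lastArrivalAt:  wall-clock time at which that newest event was processed
function windowCanClose(win, { lastCreatedAt, lastArrivalAt, nowMs, idleFlushMs }) {
  // (a) a later event has moved event time past the window end
  if (lastCreatedAt >= win.windowEnd) return true
  // (b) idle flush: nothing new for idleFlushMs, so fall back to the wall clock
  const idle = nowMs - lastArrivalAt >= idleFlushMs
  return idle && nowMs >= win.windowEnd
}

// Usage: a customer's last event lands at t = 9800, just before its window ends at 10000.
const win = { windowEnd: 10_000 }
const quiet = { lastCreatedAt: 9_800, lastArrivalAt: 9_800, idleFlushMs: 1_500 }
const at10500 = windowCanClose(win, { ...quiet, nowMs: 10_500 })   // false: idle only 700 ms
const at11300 = windowCanClose(win, { ...quiet, nowMs: 11_300 })   // true: idle flush fires
// A later event created at 10200 would have closed it immediately:
const byEvent = windowCanClose(win, { lastCreatedAt: 10_200, lastArrivalAt: 10_200, nowMs: 10_200, idleFlushMs: 1_500 })
```

Under these assumptions, a quiet entity's last window lingers for up to about idleFlushMs past its end, plus however often the sweep runs. That lag is the number to weigh when tuning the setting.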

Partition by the entity you're aggregating over

Queen partitions are the unit of single-writer ownership. If the source queue is partitioned by orderId but you call .keyBy(m => m.customerId), two partition leases may write to the same logical customerId state row from different workers. Always partition by the same key you aggregate by — usually customerId / tenantId / deviceId — and skip .keyBy() entirely.
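One quick way to see the hazard is to map which source partitions would feed each aggregation key. This is a minimal sketch that assumes you can sample `(partition, data)` pairs from the source queue. `writersPerKey` is a hypothetical helper. Any key fed by more than one partition can be written by more than one lease holder.

```javascript
// Hypothetical check: which aggregation keys would receive writes from >1 source partition?
// samples: array of { partition, data }; keyFn: the function you would pass to .keyBy()
function writersPerKey(samples, keyFn) {
  const partitionsByKey = new Map()
  for (const { partition, data } of samples) {
    const key = keyFn(data)
    if (!partitionsByKey.has(key)) partitionsByKey.set(key, new Set())
    partitionsByKey.get(key).add(partition)
  }
  // Keep only keys with multiple writers: these are the cross-worker contention risks.
  return [...partitionsByKey]
    .filter(([, parts]) => parts.size > 1)
    .map(([key, parts]) => ({ key, partitions: [...parts] }))
}

// Usage: source partitioned by orderId, aggregated by customerId.
const samples = [
  { partition: 'order-1', data: { orderId: 'order-1', customerId: 'cust-9' } },
  { partition: 'order-2', data: { orderId: 'order-2', customerId: 'cust-9' } },
  { partition: 'order-3', data: { orderId: 'order-3', customerId: 'cust-2' } }
]
const conflicts = writersPerKey(samples, m => m.customerId)
// cust-9 is fed by order-1 and order-2. Partitioning by customerId instead
// maps every key to exactly one partition, so the list would be empty.
```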

Aggregate cardinality matters

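One open window = one PG row. Aggregating per customer with 1M customers and 1-minute windows means up to 1M open state rows. PostgreSQL handles that, but size your hardware for it. For very large cardinalities, coarser keying is what reduces the open-row count. Larger windows don't lower it, since each key still has roughly one open window at a time. They do cut write churn, because every window close means one emit and one row delete.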
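Here is a back-of-the-envelope sizing of the state table. It assumes a window's row lingers only for the grace period after windowEnd, while the next window is already open. It also assumes every key is active in every window. `estimateStateLoad` is a hypothetical helper, and the inputs below simply restate the example from the text.

```javascript
// Hypothetical sizing helper for per-key tumbling windows.
function estimateStateLoad({ keys, windowSeconds, graceSeconds = 0 }) {
  // A key's window stays open until windowEnd + grace while the next one has
  // already opened, so at most 1 + ceil(grace / window) windows per key are open.
  const openPerKey = 1 + Math.ceil(graceSeconds / windowSeconds)
  return {
    maxOpenRows: keys * openPerKey,
    // Assuming every key is active in every window: one close (row delete, plus a
    // sink message if .to() is configured) per key per window length.
    closesPerHour: keys * (3600 / windowSeconds)
  }
}

// The example from the text: 1M customers, 1-minute windows, no grace.
const oneMinute = estimateStateLoad({ keys: 1_000_000, windowSeconds: 60 })
// → { maxOpenRows: 1000000, closesPerHour: 60000000 }
// Same keys with 1-hour windows: same open-row ceiling, 60x fewer closes.
const oneHour = estimateStateLoad({ keys: 1_000_000, windowSeconds: 3600 })
// → { maxOpenRows: 1000000, closesPerHour: 1000000 }
```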

8. References

What | Where
Full demo (verified live) | examples/use-case-aggregations.js
Reference: per-customer windowed sum (streams SDK) | streams/examples/02-per-customer-windowed-sum.js
Window operators source | client-v2/streams/operators/
Streams schema | lib/schema/procedures/019_streams_schema.sql
Cycle commit (atomic state + push + ack) | 021_streams_cycle_v1.sql
