Transactional multi-stage pipelines.
Chain multiple processing stages together — translation → routing →
notification, validate → charge → ship, ingest → enrich → publish —
with one PostgreSQL transaction per handoff. The
transaction().ack().push().commit() primitive guarantees
that a worker crash, network hiccup, or PG hiccup between stages
cannot duplicate a message into the next stage, nor lose it from the
current one. Per-stage retries and DLQ are built into the queue
config — no orchestrator process, no Temporal cluster.
(14 poisoned on purpose)
(exact through-count)
landed in DLQ
(14 × 3 retries, then DLQ)
Numbers are from a verified run with deliberately-poisoned messages.
Reproduce with examples/use-case-pipeline.js.
1. The problem
Real-world workloads are rarely a single processing step. A chat message goes through translation, intent routing, and delivery. An order goes through validation, charging, fulfilment, and notification. Each stage is a different team's code, different scaling profile, different failure mode. The standard implementation is a chain of queues. The standard bug in that implementation is one of these two race windows:
| Where | Race window | Visible failure |
|---|---|---|
| Push next, then ack | Worker crashes after pushing to stage N+1 but before acking stage N. | Message is delivered twice: re-popped from stage N, processed again, pushed to stage N+1 a second time. |
| Ack first, then push | Worker crashes after acking stage N but before pushing to stage N+1. | Message is lost: stage N's offset has advanced, stage N+1 never saw it. |
Both failure modes silently corrupt downstream state. They happen
rarely enough to escape testing — on a heavily loaded system, weekly.
The fix in most systems is "implement saga / outbox / 2PC" which
doubles the surface area. In Queen the fix is one line of code:
transaction().ack().push().commit().
2. The transactional handoff
The queen.transaction() builder packages the ack and the
next-stage push into a single payload posted to
POST /api/v1/transaction.
On the broker side this is one call to
queen.execute_transaction_v2:
one BEGIN, one COMMIT, both operations
inside. The PG transaction is the integrity boundary; there is no
intermediate state that can leak.
3. What happens without the transaction
To make the property concrete, compare the two natural orderings:
A worker crash anywhere in this handler now rolls back to the last
successful commit(): either both the ack and the push
happened (the message has moved to stage N+1, exactly once) or neither
happened (the message stays in stage N with its retry counter intact,
and the broker redelivers when the lease expires).
4. Quickstart — a 3-stage pipeline
Setup: three queues, one for each stage, all with
retryLimit: 2 and dlqAfterMaxRetries: true.
Messages are partitioned by chatId so all messages for one
conversation stay strictly ordered.
Every snippet on this page was run against a live Queen server at
http://localhost:6632 before being included. The
full demo lives at
examples/use-case-pipeline.js
and runs as part of the docs build.
Stage 2 (route) and stage 3 (notify) follow
the same shape. Stage 3 is terminal — it doesn't push anywhere, just
calls a downstream API.
5. Retries and DLQ per stage
Each queue's retryLimit applies independently to that
stage. When a handler throws, the manual nack
(q.ack(msg, false)) marks the attempt as failed; the
broker decrements the retry budget and redelivers when the lease
expires. On exhaustion, the message is moved to that queue's DLQ — not
to any global one — so debugging stays scoped to the stage that broke.
Verified live run (numbers in the stat cards at the top):
| Counter | Value | Expected |
|---|---|---|
| Messages produced | 100 | — |
| Poisoned (every 7th) | 14 | — |
| Translate handler attempts (incl. retries) | 128 | ≥ 86 + 14×3 = 128 |
| Translate handler failures | 42 | 14 × 3 = 42 |
| Routed (stage 2 throughput) | 86 | 86 |
| Notified (stage 3 throughput) | 86 | 86 |
DLQ for chat.translate | 14 | 14 |
Or via the dashboard: each queue's detail page shows its DLQ inline with a "Replay" action that re-pushes the message to the original queue.
6. Per-entity ordering via partitions
When you call .partition(msg.data.chatId) on each stage's
push, all messages for one chat stay on the same partition through the
entire pipeline. Combined with Queen's per-partition lease, that
guarantees:
- Stage N processes messages for chat-X strictly in order.
- Stage N+1 receives messages for chat-X strictly in order.
- Different chats are processed concurrently across partitions without contention.
This is the smartchat / order-processing / IoT-per-device pattern. The partition is the unit of ordered work. Add a new chat → a new partition appears on first push, with no preallocation.
7. Common pitfalls
The whole point of transaction() is that ack and push
commit together. Calling q.ack(msg) first defeats it.
Use the builder form everywhere except terminal stages.
consumerGroup on ack and consume
If your consumer uses .group('translate-worker'), then
transaction().ack(msg, 'completed', { consumerGroup: 'translate-worker' })
must use the same name. Mismatches cause "Invalid or expired lease"
errors and the message is redelivered.
leaseTime longer than your handler's p99
If the handler runs longer than leaseTime seconds the
broker assumes it crashed and redelivers — and your eventual
transaction commit will fail with "Invalid or expired lease". Either
raise leaseTime or use
.renewLease(true, intervalMillis) on the consumer.
Each stage has its own DLQ inspected at
/api/v1/dlq?queue=<name>. If a stage 3 handler
poisons every message, those poison messages land in
chat.notify's DLQ, not in chat.translate's.
This is deliberate — different stages have different operational
owners.
Temporal replays workflow code deterministically from an event log. Queen pipelines don't — they pop a message, run the handler, commit the next-stage push. There's no replay harness, no determinism contract on your handler code. The trade-off is simpler workers (use any library, any clock, any random number generator) but no automatic time-travel debugging. Most real pipelines don't need determinism; they need atomic handoffs.
8. References
| What | Where |
|---|---|
| Full demo (verified live) | examples/use-case-pipeline.js |
| Older chat-style example (manual transactions) | examples/00-chat.js |
| Transaction stored procedure | lib/schema/procedures/004_transaction.sql |
| Transaction HTTP route | server/src/routes/transactions.cpp |
| Transaction client builder | client-v2/builders/TransactionBuilder.js |
| DLQ + retry developer docs | developer/13-consumer-groups.md |
