Queen MQ

← Use cases

Recipe · orchestration

Transactional multi-stage pipelines.

Chain multiple processing stages together — translation → routing → notification, validate → charge → ship, ingest → enrich → publish — with one PostgreSQL transaction per handoff. The transaction().ack().push().commit() primitive guarantees that a worker crash, network hiccup, or PG hiccup between stages cannot duplicate a message into the next stage, nor lose it from the current one. Per-stage retries and DLQ are built into the queue config — no orchestrator process, no Temporal cluster.

100
messages produced
(14 poisoned on purpose)
86 / 86
routed → notified
(exact through-count)
14 / 14
poison messages
landed in DLQ
42
total failed attempts
(14 × 3 retries, then DLQ)

Numbers are from a verified run with deliberately-poisoned messages. Reproduce with examples/use-case-pipeline.js.

1. The problem

Real-world workloads are rarely a single processing step. A chat message goes through translation, intent routing, and delivery. An order goes through validation, charging, fulfilment, and notification. Each stage is a different team's code, different scaling profile, different failure mode. The standard implementation is a chain of queues. The standard bug in that implementation is one of these two race windows:

WhereRace windowVisible failure
Push next, then ack Worker crashes after pushing to stage N+1 but before acking stage N. Message is delivered twice: re-popped from stage N, processed again, pushed to stage N+1 a second time.
Ack first, then push Worker crashes after acking stage N but before pushing to stage N+1. Message is lost: stage N's offset has advanced, stage N+1 never saw it.

Both failure modes silently corrupt downstream state. They happen rarely enough to escape testing — on a heavily loaded system, weekly. The fix in most systems is "implement saga / outbox / 2PC" which doubles the surface area. In Queen the fix is one line of code: transaction().ack().push().commit().

2. The transactional handoff

Worker holds lease on stage N's partition │ ▼ result = await stageNHandler(msg) │ ▼ q.transaction() .ack(msg) ──┐ .queue('stageN+1').push([result]) ├ ONE PG TRANSACTION .commit() ──┘ │ ▼ Either both happened, or neither did. If COMMIT fails, lease expires, message redelivered. If COMMIT succeeds, stage N+1's queue has it AND stage N's cursor advanced.

The queen.transaction() builder packages the ack and the next-stage push into a single payload posted to POST /api/v1/transaction. On the broker side this is one call to queen.execute_transaction_v2: one BEGIN, one COMMIT, both operations inside. The PG transaction is the integrity boundary; there is no intermediate state that can leak.

3. What happens without the transaction

To make the property concrete, compare the two natural orderings:

javascript · the wrong way (ack-then-push)
// ❌ DANGEROUS — message can be lost between stages on a worker crash.
await q.queue('stageN').consume(async (msg) => {
  const out = await stageNHandler(msg)
  await q.ack(msg, true, { group: 'stageN-worker' })
  // ←─ worker dies HERE: msg is acked on stageN, never pushed to stageN+1
  await q.queue('stageN+1').push([{ data: out }])
})
javascript · the also-wrong way (push-then-ack)
// ❌ DANGEROUS — message can be duplicated to stageN+1 on a worker crash.
await q.queue('stageN').consume(async (msg) => {
  const out = await stageNHandler(msg)
  await q.queue('stageN+1').push([{ data: out }])
  // ←─ worker dies HERE: stageN+1 has it, but stageN's lease expires
  //                      and the broker redelivers msg → another push to stageN+1
  await q.ack(msg, true, { group: 'stageN-worker' })
})
javascript · the right way
// ✅ Both operations commit together inside one PG transaction.
await q.queue('stageN').autoAck(false).consume(async (msg) => {
  const out = await stageNHandler(msg)
  await q.transaction()
    .ack(msg, 'completed', { consumerGroup: 'stageN-worker' })
    .queue('stageN+1').partition(msg.data.entityId).push([{
      transactionId: `${msg.transactionId}-next`,
      data:          out
    }])
    .commit()
})

A worker crash anywhere in this handler now rolls back to the last successful commit(): either both the ack and the push happened (the message has moved to stage N+1, exactly once) or neither happened (the message stays in stage N with its retry counter intact, and the broker redelivers when the lease expires).

4. Quickstart — a 3-stage pipeline

Setup: three queues, one for each stage, all with retryLimit: 2 and dlqAfterMaxRetries: true. Messages are partitioned by chatId so all messages for one conversation stay strictly ordered.

Verified end-to-end

Every snippet on this page was run against a live Queen server at http://localhost:6632 before being included. The full demo lives at examples/use-case-pipeline.js and runs as part of the docs build.

javascript · setup
import { Queen } from 'queen-mq'

const q = new Queen({ url: 'http://localhost:6632', handleSignals: false })

const Q_TRANSLATE = 'chat.translate'
const Q_ROUTE     = 'chat.route'
const Q_NOTIFY    = 'chat.notify'

for (const name of [Q_TRANSLATE, Q_ROUTE, Q_NOTIFY]) {
  await q.queue(name).config({
    leaseTime:                 8,     // fast lease for fast retries
    retryLimit:                2,     // 1 original + 2 retries, then DLQ
    deadLetterQueue:           true,
    dlqAfterMaxRetries:        true,
    retentionEnabled:          true,
    retentionSeconds:          3600
  }).create()
}
javascript · stage 1 (translate) with transactional handoff
await q.queue(Q_TRANSLATE)
  .group('translate-worker')
  .batch(1)
  .each()
  .autoAck(false)
  .consume(async (msg) => {
    const translated = {
      ...msg.data,
      translatedText: `[en] ${msg.data.text}`,
      translatedAt:   Date.now()
    }
    await q.transaction()
      .ack(msg, 'completed', { consumerGroup: 'translate-worker' })
      .queue(Q_ROUTE).partition(msg.data.chatId).push([{
        transactionId: `${msg.data.chatId}-routed`,
        data:          translated
      }])
      .commit()
  })
  .onError(async (msg, err) => {
    // Manual nack so retry/DLQ kicks in.
    await q.ack(msg, false, {
      group: 'translate-worker',
      error: err.message
    })
  })

Stage 2 (route) and stage 3 (notify) follow the same shape. Stage 3 is terminal — it doesn't push anywhere, just calls a downstream API.

5. Retries and DLQ per stage

Each queue's retryLimit applies independently to that stage. When a handler throws, the manual nack (q.ack(msg, false)) marks the attempt as failed; the broker decrements the retry budget and redelivers when the lease expires. On exhaustion, the message is moved to that queue's DLQ — not to any global one — so debugging stays scoped to the stage that broke.

produce(100 messages, 14 are 'poisoned') │ ▼ ┌───────────────┐ │ translate │ retry_limit=2, DLQ=on └───────────────┘ │ ──── 14 poisons fail × 3 attempts each = 42 attempts ────► chat.translate DLQ │ 86 pass ▼ ┌───────────────┐ │ route │ retry_limit=2, DLQ=on └───────────────┘ │ 86 pass ▼ ┌───────────────┐ │ notify │ └───────────────┘ │ ▼ 86 completed.

Verified live run (numbers in the stat cards at the top):

CounterValueExpected
Messages produced100
Poisoned (every 7th)14
Translate handler attempts (incl. retries)128≥ 86 + 14×3 = 128
Translate handler failures4214 × 3 = 42
Routed (stage 2 throughput)8686
Notified (stage 3 throughput)8686
DLQ for chat.translate1414
http · inspect a stage's DLQ
curl -s "http://localhost:6632/api/v1/dlq?queue=chat.translate&limit=20" \
  | jq '.messages[] | { transactionId, retryCount, error: .errorMessage }'

Or via the dashboard: each queue's detail page shows its DLQ inline with a "Replay" action that re-pushes the message to the original queue.

6. Per-entity ordering via partitions

When you call .partition(msg.data.chatId) on each stage's push, all messages for one chat stay on the same partition through the entire pipeline. Combined with Queen's per-partition lease, that guarantees:

This is the smartchat / order-processing / IoT-per-device pattern. The partition is the unit of ordered work. Add a new chat → a new partition appears on first push, with no preallocation.

7. Common pitfalls

Don't ack inside the handler, then transaction-push after

The whole point of transaction() is that ack and push commit together. Calling q.ack(msg) first defeats it. Use the builder form everywhere except terminal stages.

Match consumerGroup on ack and consume

If your consumer uses .group('translate-worker'), then transaction().ack(msg, 'completed', { consumerGroup: 'translate-worker' }) must use the same name. Mismatches cause "Invalid or expired lease" errors and the message is redelivered.

Set leaseTime longer than your handler's p99

If the handler runs longer than leaseTime seconds the broker assumes it crashed and redelivers — and your eventual transaction commit will fail with "Invalid or expired lease". Either raise leaseTime or use .renewLease(true, intervalMillis) on the consumer.

DLQ is per-queue, not per-pipeline

Each stage has its own DLQ inspected at /api/v1/dlq?queue=<name>. If a stage 3 handler poisons every message, those poison messages land in chat.notify's DLQ, not in chat.translate's. This is deliberate — different stages have different operational owners.

Why this isn't Temporal

Temporal replays workflow code deterministically from an event log. Queen pipelines don't — they pop a message, run the handler, commit the next-stage push. There's no replay harness, no determinism contract on your handler code. The trade-off is simpler workers (use any library, any clock, any random number generator) but no automatic time-travel debugging. Most real pipelines don't need determinism; they need atomic handoffs.

8. References

WhatWhere
Full demo (verified live) examples/use-case-pipeline.js
Older chat-style example (manual transactions) examples/00-chat.js
Transaction stored procedure lib/schema/procedures/004_transaction.sql
Transaction HTTP route server/src/routes/transactions.cpp
Transaction client builder client-v2/builders/TransactionBuilder.js
DLQ + retry developer docs developer/13-consumer-groups.md

← Back to use cases