Queen MQ


Recipe · ops

Subscription replay — rewind a stream to any timestamp.

Your consumer had a bug last Tuesday. You shipped a fix today. You need to reprocess every message since the bug went live — without disturbing the production consumer that's still running. With Queen this is one line of configuration: spawn a new consumer group with subscriptionFrom set to the moment the bug went live. The new group reads from that exact timestamp; the production group's offset is untouched.

1,500 events pushed across 16 partitions
1,000 received by the rewound CG (exact)
idx 500 first event seen (matches deploy boundary)
0 interference with the production consumer group

Numbers are from a verified run on a single Mac with one PostgreSQL 18 container. Reproduce with examples/use-case-replay.js.

1. The problem

The "I need to reprocess events from a point in time" problem comes up constantly.

How the major alternatives handle it:

| System | Reprocess from timestamp? | Affects production? |
|---|---|---|
| SQS | No: messages are deleted on ack | n/a |
| Kafka | Yes (new consumer group + seek) | Possibly: seek on the same group disturbs production |
| RabbitMQ | No: messages are gone after consumption | n/a |
| Pulsar | Yes (subscription seek) | Same caveat as Kafka |
| Queen | Yes: new CG with subscriptionFrom | Zero: each CG has its own offset row in PG |

2. The pattern

Queen stores each consumer group's progress as a row in queen.partition_consumers keyed by (partition_id, consumer_group). To replay, you spawn a brand-new consumer group that has never been seen before. Its first pop registers it in queen.consumer_groups_metadata with the subscription anchor: one of 'all', 'new', or a specific ISO timestamp.

queue: order-events  (16 partitions, all messages retained)
                  │
        ┌─────────┴─────────┐
        │                   │
        ▼                   ▼
    CG 'prod'          CG 'reprocess'   ← new, never seen before
   (subscription       (subscription
    mode='all')         anchor: '2026-05-11T14:00:00Z')
        │                   │
   own offset row      own offset row
   in PG               in PG
        │                   │
        ▼                   ▼
  sees everything      sees only events
  (no disruption)      created at-or-after the anchor

Because the two consumer groups have separate offset rows, they read the same partitions completely independently. The production worker doesn't pause, restart, or notice anything. The replay worker is isolated by construction.
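To make the isolation concrete, here is a toy in-memory model of the idea. It is a sketch only, not Queen's implementation: the `OffsetTable` class and its `pop` method are hypothetical names invented for this illustration. The one thing it borrows from the text above is the key shape, one row per (partition_id, consumer_group).

```javascript
// Toy in-memory model (NOT Queen's implementation): one offset row per
// (partitionId, consumerGroup) pair, mirroring the key described above.
class OffsetTable {
  constructor() {
    // key: `${partitionId}:${consumerGroup}` -> number of messages consumed so far
    this.rows = new Map()
  }

  key(partitionId, consumerGroup) {
    return `${partitionId}:${consumerGroup}`
  }

  // Return up to `limit` messages after this group's own offset,
  // and advance only that group's row.
  pop(partition, consumerGroup, limit) {
    const k = this.key(partition.id, consumerGroup)
    const start = this.rows.get(k) ?? 0
    const batch = partition.messages.slice(start, start + limit)
    this.rows.set(k, start + batch.length)
    return batch
  }
}

const demoPartition = { id: 'p0', messages: ['m0', 'm1', 'm2', 'm3'] }
const offsets = new OffsetTable()

// 'prod' consumes three messages; only the p0:prod row moves.
const prodBatch = offsets.pop(demoPartition, 'prod', 3)

// 'reprocess' starts from its own row, unaffected by what 'prod' has read.
const replayBatch = offsets.pop(demoPartition, 'reprocess', 2)
```

In this model `prodBatch` holds `m0..m2` and `replayBatch` holds `m0..m1`: the second group begins at its own starting point regardless of how far the first has advanced.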

3. Quickstart

Minimal reproducible setup: push 1,500 events with a deploy boundary marked between idx 499 and idx 500. One CG reads everything; a new CG spawned afterwards reads only what came at or after the boundary.

Verified end-to-end

Every snippet on this page was run against a live Queen server at http://localhost:6632 before being included. The full demo lives at examples/use-case-replay.js and is run as part of the docs build.

javascript · production CG sees everything
// npm install queen-mq        (Node 22+)
import { Queen } from 'queen-mq'

const url = 'http://localhost:6632'
const q   = new Queen({ url, handleSignals: false })

const QUEUE = 'order-events'

await q.queue(QUEUE).config({
  leaseTime:                 30,
  retryLimit:                3,
  retentionEnabled:          true,
  retentionSeconds:          3600,     // keep history for 1 hour
  completedRetentionSeconds: 600
}).create()

// Production consumer — sees every message ever pushed.
const ac = new AbortController()
;(async () => {
  await q.queue(QUEUE)
    .group('prod')
    .batch(50)
    .each()
    .subscriptionMode('all')   // ★ first pop registers prod with mode='all'
    .partitions(8)
    .consume(async (msg) => {
      // handle in production
    }, { signal: ac.signal })
})()

// Producer (interleaved with prod consumer running):
//   - push 500 events  ───►  sleep 800 ms  ───►  capture deployAt  ───►  sleep 800 ms
//   - push 500 more events
//   - sleep 600 ms
//   - push 500 more events
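
The producer timeline in the comments above can be sketched as follows, mainly to show where `deployAt` comes from. The `push` callback is a hypothetical stand-in, not part of the queen-mq API: wire it to whatever producer call you actually use. `sleep` is injectable so the timeline can be exercised without real delays.

```javascript
// Sketch of the producer timeline. `push` is a HYPOTHETICAL callback
// (event) => Promise supplied by the caller; it is not a queen-mq function.
const defaultSleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

async function runProducer({ push, sleep = defaultSleep }) {
  let idx = 0
  const pushBatch = async (count) => {
    for (let i = 0; i < count; i++) {
      await push({ idx: idx++ })
    }
  }

  await pushBatch(500)                       // idx 0..499, before the deploy
  await sleep(800)
  const deployAt = new Date().toISOString()  // replay anchor, captured between batches
  await sleep(800)
  await pushBatch(500)                       // idx 500..999
  await sleep(600)
  await pushBatch(500)                       // idx 1000..1499

  return deployAt
}
```

`runProducer({ push })` resolves to the ISO string that the next snippet passes to subscriptionFrom. The pauses on either side of the capture keep the boundary unambiguous: every event with idx below 500 is created before `deployAt`, every later one after it.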

Now the deploy fix has shipped and we want to reprocess every event created at or after deployAt. We spawn a new consumer group with that timestamp:

javascript · spawn a rewound consumer group
// New CG, never seen before. First pop locks in the subscription anchor.
// deployAt is the ISO timestamp the producer captured at the deploy boundary.
const reprocessAc = new AbortController()
;(async () => {
  await q.queue(QUEUE)
    .group('reprocess')
    .batch(50)
    .each()
    .subscriptionFrom(deployAt)   // ★ ISO timestamp — picks up the 'timestamp' mode
    .partitions(8)
    .consume(async (msg) => {
      // re-run your fixed pipeline for this message
    }, { signal: reprocessAc.signal })
})()

// Verified run (1,500 events total, deploy boundary at idx 500):
//   'prod' CG received:       1500  (expected 1500)
//   'reprocess' CG received:  1000  (expected ≈ 1000)
//   smallest idx in reprocess: 500  (matches deploy boundary)

Note that 'prod' and 'reprocess' consume the same queue, on the same broker, against the same database — there is no contention or impact between them. The same pattern works at any scale: the reprocess CG can be a separate Pod, a separate cluster, a notebook, or a CLI invocation.

4. Cleaning up a consumer group when you're done

Once the reprocess completes, you typically want to delete the one-shot CG so its registration doesn't linger. Use the admin API:

javascript · delete a consumer group's metadata
// Delete CG state + subscription metadata for one queue:
await q.admin.deleteConsumerGroupForQueue(
  'reprocess',     // consumer group
  'order-events',  // queue
  true             // deleteMetadata: also drop the subscription_timestamp row
)

queue.delete() intentionally does not cascade to queen.consumer_groups_metadata, because consumer groups subscribe across multiple queues (and across namespace/task selectors). To wipe the metadata too, pass deleteMetadata: true. This is the same pattern as queenctl cg delete --queue ....

5. The three subscription modes

| Mode | How to set it | What the CG sees on first encounter |
|---|---|---|
| 'all' (default) | .subscriptionMode('all') or omit entirely | Every message ever pushed (full history, until retention prunes it). |
| 'new' | .subscriptionMode('new') | Only messages pushed after the CG's first pop. Skips history. |
| 'timestamp' | .subscriptionFrom('2026-05-11T14:00:00Z') | Only messages whose created_at >= timestamp. |

The mode is locked in on the consumer group's first pop. Subsequent pops by the same CG honour the recorded anchor — they ignore any subscriptionMode / subscriptionFrom you pass. To change the anchor, you either rename the CG, or delete it via the admin API (see §4) and let the next pop re-register it.
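
The three modes amount to three filters over the retained history. The sketch below models that on a plain array. It is an illustration of the semantics in the table, not Queen's pop procedure, and `visibleOnFirstPop` and the message shape `{ idx, createdAt }` are assumptions made up for the example.

```javascript
// Toy model of what a brand-new CG sees on its first pop.
// Messages are { idx, createdAt } with createdAt in epoch milliseconds.
function visibleOnFirstPop(messages, subscription, firstPopAt) {
  if (subscription.mode === 'all') {
    return messages // full retained history
  }
  if (subscription.mode === 'new') {
    // only messages pushed after the first pop
    return messages.filter((m) => m.createdAt > firstPopAt)
  }
  if (subscription.mode === 'timestamp') {
    // created_at >= anchor, as in the table above
    const anchor = Date.parse(subscription.from)
    return messages.filter((m) => m.createdAt >= anchor)
  }
  throw new Error(`unknown subscription mode: ${subscription.mode}`)
}

const history = [
  { idx: 0, createdAt: Date.parse('2026-05-11T13:59:59Z') },
  { idx: 1, createdAt: Date.parse('2026-05-11T14:00:00Z') },
  { idx: 2, createdAt: Date.parse('2026-05-11T14:00:01Z') },
]
const firstPopAt = Date.parse('2026-05-11T14:00:00.500Z')
const toIdx = (list) => list.map((m) => m.idx)

const seenAll = toIdx(visibleOnFirstPop(history, { mode: 'all' }, firstPopAt))
const seenNew = toIdx(visibleOnFirstPop(history, { mode: 'new' }, firstPopAt))
const seenFrom = toIdx(
  visibleOnFirstPop(history, { mode: 'timestamp', from: '2026-05-11T14:00:00Z' }, firstPopAt)
)
```

With this history, 'all' yields idx 0, 1, 2; 'new' yields only idx 2; and the timestamp anchor yields idx 1 and 2, because the comparison is inclusive.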

6. Why the two CGs don't interfere

Per-CG isolation is enforced at the schema level. Every (partition_id, consumer_group) pair gets its own row in queen.partition_consumers holding that CG's last_consumed_id. The pop stored procedure (pop_unified_batch_v4) filters by that row alone — no cross-CG read or write happens anywhere on the hot path.

sql · independent offset rows per CG
SELECT consumer_group,
       count(*)                AS partitions_consumed,
       sum(total_messages_consumed) AS lifetime_messages,
       max(last_consumed_at)   AS last_seen
FROM   queen.partition_consumers pc
JOIN   queen.partitions p   ON p.id = pc.partition_id
JOIN   queen.queues     q   ON q.id = p.queue_id
WHERE  q.name = 'order-events'
GROUP  BY consumer_group;

You'll see one row per CG. Each one advances independently as that CG processes messages. There's no shared "topic offset" that one CG can overwrite for another.

7. Common pitfalls

Subscription anchor is locked at first pop

Once a consumer group has popped once, its anchor is fixed. Re-running your script with a different subscriptionFrom against the same CG name does nothing — the recorded subscription_timestamp wins. Either rename the CG or delete it via q.admin.deleteConsumerGroupForQueue(cg, queue, true).
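
The first-pop-wins rule is easy to misread, so here is a minimal model of it. The `SubscriptionRegistry` class is hypothetical and exists only for this illustration. In Queen the anchor lives in queen.consumer_groups_metadata and deletion goes through the admin API.

```javascript
// Toy registry (not Queen's schema): the first pop records the anchor;
// later pops with a different anchor are ignored until the CG is deleted.
class SubscriptionRegistry {
  constructor() {
    this.anchors = new Map() // consumerGroup -> recorded anchor
  }

  // Returns the anchor that is actually in effect for this pop.
  registerOnPop(consumerGroup, requestedAnchor) {
    if (!this.anchors.has(consumerGroup)) {
      this.anchors.set(consumerGroup, requestedAnchor)
    }
    return this.anchors.get(consumerGroup)
  }

  delete(consumerGroup) {
    this.anchors.delete(consumerGroup)
  }
}

const registry = new SubscriptionRegistry()
const firstAnchor = registry.registerOnPop('reprocess', '2026-05-11T14:00:00Z')
// Same CG name, different anchor: the recorded one still wins.
const secondAnchor = registry.registerOnPop('reprocess', '2026-05-12T09:00:00Z')
// After deletion, the next pop re-registers with the new anchor.
registry.delete('reprocess')
const thirdAnchor = registry.registerOnPop('reprocess', '2026-05-12T09:00:00Z')
```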

Retention can erase the data you want to replay

If the source queue has an aggressive retentionSeconds setting, the messages you want to rewind to may already be gone. Before relying on replay for a critical window, confirm the messages still exist, for example with SELECT count(*) FROM queen.messages WHERE created_at >= '...'.
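
A quick arithmetic pre-flight check can catch the obvious cases before you even query the database. The helper below is a rough sketch under one stated assumption: that retention is measured from each message's creation time. `replayWindowStatus` is a hypothetical name, and the result is only an estimate. The SQL count remains the authoritative check.

```javascript
// Rough pre-flight check. ASSUMPTION: retention is counted from created_at;
// confirm against your queue's actual retention behaviour.
function replayWindowStatus(replayFromIso, retentionSeconds, now = new Date()) {
  const replayFrom = Date.parse(replayFromIso)
  if (Number.isNaN(replayFrom)) {
    throw new Error(`invalid timestamp: ${replayFromIso}`)
  }
  const oldestRetained = now.getTime() - retentionSeconds * 1000
  return {
    fullyRetained: replayFrom >= oldestRetained,
    oldestRetainedIso: new Date(oldestRetained).toISOString(),
  }
}

// With the quickstart's retentionSeconds of 3600, checked at 15:30 UTC:
const checkedAt = new Date('2026-05-11T15:30:00Z')
const recentWindow = replayWindowStatus('2026-05-11T15:00:00Z', 3600, checkedAt)
const staleWindow = replayWindowStatus('2026-05-11T14:00:00Z', 3600, checkedAt)
```

Here `recentWindow.fullyRetained` is true and `staleWindow.fullyRetained` is false, since anything created before 14:30 UTC falls outside a one-hour window.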

Don't reuse a "reprocess" name for permanent consumers

The natural pattern is one fresh CG per replay job (e.g. reprocess-2026-05-11). That way each replay starts clean. Deleting and re-creating the same CG with a different anchor is a 2-step dance you can avoid by naming.
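
One way to keep the names consistent is to derive them from the anchor itself. This is a small convenience sketch. `replayGroupName` is a hypothetical helper, and the `reprocess-YYYY-MM-DD` shape simply follows the example above.

```javascript
// Derive a one-shot CG name from the replay anchor (UTC date part).
function replayGroupName(anchor, prefix = 'reprocess') {
  const date = new Date(anchor)
  if (Number.isNaN(date.getTime())) {
    throw new Error(`invalid anchor: ${anchor}`)
  }
  return `${prefix}-${date.toISOString().slice(0, 10)}`
}

const replayCg = replayGroupName('2026-05-11T14:00:00Z') // 'reprocess-2026-05-11'
```

If you run more than one replay per day, add a time component or job id to the prefix so the names stay unique.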

Replay scales just like normal pop

The rewound CG uses the exact same wildcard pop path (maxPartitions, lease, batch, etc.) as production. You can run multiple parallel workers on a replay CG — Queen's per-partition lease guarantees no double-processing.
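
The idea behind a per-partition lease can be illustrated with a toy lease table. This is not Queen's implementation: `PartitionLeases` is a hypothetical class, and the 30-second value assumes the quickstart's leaseTime of 30 is in seconds.

```javascript
// Toy lease table (an illustration only): at most one worker holds a
// partition's lease at a time, so two workers never read it concurrently.
class PartitionLeases {
  constructor(leaseSeconds) {
    this.leaseMs = leaseSeconds * 1000
    this.leases = new Map() // partitionId -> { worker, expiresAt }
  }

  // Returns true if `worker` holds the lease for `partitionId` after the call.
  tryAcquire(partitionId, worker, nowMs) {
    const current = this.leases.get(partitionId)
    const heldByOther =
      current && current.expiresAt > nowMs && current.worker !== worker
    if (heldByOther) return false
    this.leases.set(partitionId, { worker, expiresAt: nowMs + this.leaseMs })
    return true
  }
}

const leases = new PartitionLeases(30)
const gotA = leases.tryAcquire('p0', 'worker-a', 0)       // free partition
const gotB = leases.tryAcquire('p0', 'worker-b', 10000)   // p0 still leased to worker-a
const gotC = leases.tryAcquire('p1', 'worker-b', 10000)   // different partition
```

In this model `gotA` and `gotC` are true and `gotB` is false: parallel workers on the same replay CG spread across partitions rather than competing for one.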

8. References

| What | Where |
|---|---|
| Full demo (verified live) | examples/use-case-replay.js |
| Pop procedure that enforces subscription anchors | lib/schema/procedures/002d_pop_unified_v4.sql |
| Consumer-group admin SPs | lib/schema/procedures/008_consumer_groups.sql |
| Consumer-group developer docs | developer/13-consumer-groups.md |
| queenctl CLI equivalents | queenctl cg ls / describe / delete / seek |
