Queen MQ


Recipe · integration

Idempotent webhook deduplication, without Redis.

Stripe, Booking, Octorate, Twilio, GitHub all redeliver webhooks — sometimes within minutes, sometimes hours later. Your handler running twice means a double charge, a duplicate email, a duplicate row. The standard fix is a Redis SET with a TTL; the standard problem is that now you have Redis to operate, an eviction story to design, and an outage waiting when Redis fills up. Queen replaces all of that with a streaming window whose seen-set state lives in PostgreSQL and is garbage-collected automatically when the window closes.

1,000 unique webhook events across 5 providers
300 duplicates interleaved (redelivery simulation)
1,000 / 0: unique events through / duplicates leaked to downstream
23.1% of incoming events dropped (matches the injected ratio)

Numbers are from a verified run with 1,000 events + 300 random duplicates over 5 providers, 10-second tumbling windows. Reproduce with examples/use-case-dedup.js.

1. The problem

External providers retry webhook deliveries on any non-2xx response, on read timeouts, and sometimes — especially during platform-side incidents — at random. The contract is "at-least-once". Real redelivery patterns we've seen:

The standard fix:

// The naïve handler — broken under redelivery.
app.post('/webhooks/stripe', async (req, res) => {
  await charge(req.body)             // ← runs twice on retry
  res.send('ok')
})

// The standard fix — adds Redis to your stack.
app.post('/webhooks/stripe', async (req, res) => {
  if (await redis.set(`seen:${req.body.id}`, 1, 'EX', 3600, 'NX') === null) {
    return res.send('ok')            // already processed
  }
  await charge(req.body)
  res.send('ok')
})

Now you operate Redis. You worry about its eviction policy as it approaches maxmemory, about eviction kicking in mid-incident (which can drop seen-keys before their TTL expires), and about it being a single point of failure for every webhook integration on your platform.

2. The pattern

A webhook ingestor pushes raw webhook bodies onto a Queen queue partitioned by provider. A streaming query tumbles into N-second windows per provider, holds a seen-set of eventIds in state, and emits each event downstream only on its first observation within that window. State lives in PostgreSQL; when the window closes, the row is deleted in the same transaction that emits the final accumulator.

HTTP ingestor ──► incoming (Queen, partitioned by providerId)
                      │
                      ▼
          Stream.from(incoming)
            .keyBy(default = providerId)
            .windowTumbling({ seconds: 600 })          ← 10-min dedup window
            .reduce(
              (acc, msg) => acc.seen.includes(msg.eventId)
                ? acc                                   ← already seen → drop
                : { seen:   [...acc.seen, msg.eventId],
                    firsts: [...acc.firsts, msg] },     ← first time → keep
              { seen: [], firsts: [] }
            )
            .flatMap(window => window.firsts)          ← emit only the firsts
            .to(verified)
                      │
                      ▼
          downstream consumer (charge, email, etc.)
          sees one event per unique eventId
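The reducer in the diagram is an ordinary fold, so its behaviour can be checked without a Queen server. The following is a minimal sketch in plain JavaScript; the event payloads are hypothetical and only `eventId` matters.

```javascript
// Same reducer shape as in the diagram, applied to an in-memory array.
const dedupReducer = (acc, msg) =>
  acc.seen.includes(msg.eventId)
    ? acc                                    // already seen: drop
    : { seen:   [...acc.seen, msg.eventId],  // first time: remember the id
        firsts: [...acc.firsts, msg] }       // and keep the message

// Hypothetical deliveries for one provider inside one window.
const deliveries = [
  { eventId: 'evt_1', type: 'charge.succeeded' },
  { eventId: 'evt_2', type: 'charge.refunded' },
  { eventId: 'evt_1', type: 'charge.succeeded' },  // redelivery
  { eventId: 'evt_3', type: 'invoice.paid' },
  { eventId: 'evt_2', type: 'charge.refunded' }    // redelivery
]

const windowState = deliveries.reduce(dedupReducer, { seen: [], firsts: [] })
const forwarded = windowState.firsts.map(m => m.eventId)
console.log(forwarded)   // [ 'evt_1', 'evt_2', 'evt_3' ]
```

Five deliveries go in, three come out, and the order of first observation is preserved.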

Two key properties:

3. Quickstart

Verified end-to-end

Every snippet on this page was run against a live Queen server at http://localhost:6632 before being included. The full demo lives at examples/use-case-dedup.js.

javascript · setup
import { Queen, Stream } from 'queen-mq'

const url = 'http://localhost:6632'
const q   = new Queen({ url, handleSignals: false })

const Q_INCOMING = 'webhooks.incoming'
const Q_VERIFIED = 'webhooks.verified'

await q.queue(Q_INCOMING).config({
  leaseTime:        30,
  retryLimit:       3,
  retentionEnabled: true,
  retentionSeconds: 3600
}).create()
await q.queue(Q_VERIFIED).config({
  leaseTime:        30,
  retentionEnabled: true,
  retentionSeconds: 3600
}).create()

javascript · the dedup stream
// 10-minute tumbling windows, per-provider seen-set, emit firsts to verified.
const stream = await Stream
  .from(q.queue(Q_INCOMING))
  // Default key = partition_id = providerId — what we want.
  .windowTumbling({ seconds: 600, idleFlushMs: 5000 })
  .reduce(
    (acc, msg) => {
      if (acc.seen.includes(msg.eventId)) return acc       // dup → drop
      return {
        seen:   [...acc.seen,   msg.eventId],
        firsts: [...acc.firsts, msg]
      }
    },
    { seen: [], firsts: [] }
  )
  .flatMap(window => window.firsts)                       // explode firsts
  .to(q.queue(Q_VERIFIED))
  .run({
    queryId:       'webhook-dedup',
    url,
    batchSize:     200,
    maxPartitions: 8,
    maxWaitMillis: 200,
    reset:         true
  })

javascript · downstream consumer of verified events
// Downstream sees each unique webhook event exactly once per window.
await q.queue(Q_VERIFIED)
  .group('charge-processor')
  .batch(50)
  .each()
  .consume(async (msg) => {
    await chargeCustomer(msg.data)   // safe to run — deduplicated upstream
  })

// Verified run, 1000 unique + 300 duplicates injected over 5 providers:
//   events arriving at incoming:  1300
//   events through to verified:   1000   (expected 1000)
//   duplicates leaked downstream: 0      (expected 0)
//   dedup ratio:                  23.1%  (matches injected ratio)
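
The figures in the comment above can be sanity-checked with a local simulation. This sketch does not talk to Queen: it assumes every event falls into a single window per provider, keeps one accumulator per provider in a `Map`, and uses made-up event ids. It only shows that the counts and the 23.1% ratio follow from 1,000 unique events plus 300 duplicates.

```javascript
// Local simulation only: one window per provider, no Queen server.
const PROVIDERS = ['stripe', 'booking', 'octorate', 'twilio', 'github']

const keepFirsts = (acc, msg) =>
  acc.seen.includes(msg.eventId)
    ? acc
    : { seen: [...acc.seen, msg.eventId], firsts: [...acc.firsts, msg] }

// 1,000 unique events spread round-robin over the 5 providers.
const unique = Array.from({ length: 1000 }, (_, i) => ({
  providerId: PROVIDERS[i % PROVIDERS.length],
  eventId: `evt_${i}`
}))

// 300 redeliveries, each a copy of a randomly chosen original.
const dupes = Array.from({ length: 300 }, () =>
  ({ ...unique[Math.floor(Math.random() * unique.length)] }))

// Interleave originals and duplicates with a Fisher-Yates shuffle.
const incoming = [...unique, ...dupes]
for (let i = incoming.length - 1; i > 0; i--) {
  const j = Math.floor(Math.random() * (i + 1))
  const tmp = incoming[i]
  incoming[i] = incoming[j]
  incoming[j] = tmp
}

// One seen-set per provider, mirroring the per-partition state.
const stateByProvider = new Map()
for (const msg of incoming) {
  const acc = stateByProvider.get(msg.providerId) ?? { seen: [], firsts: [] }
  stateByProvider.set(msg.providerId, keepFirsts(acc, msg))
}

const verified = [...stateByProvider.values()].flatMap(acc => acc.firsts)
const dropped  = incoming.length - verified.length
const ratio    = (100 * dropped / incoming.length).toFixed(1)
console.log(incoming.length, verified.length, dropped, `${ratio}%`)
// 1300 1000 300 23.1%
```

The counts are deterministic even though the duplicates and the ordering are random, because every duplicate shares an `eventId` with exactly one original.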

4. Inspecting state via SQL

While a window is open, the seen-set for that provider is a JSONB field in queen_streams.state. You can monitor dedup effectiveness directly:

sql · how many distinct events have we seen per provider this window
SELECT
    s.key                                              AS provider_or_partition,
    jsonb_array_length(s.value->'acc'->'seen')         AS distinct_events_in_window,
    jsonb_array_length(s.value->'acc'->'firsts')       AS forwarded_downstream,
    to_timestamp(((s.value->>'windowStart')::bigint / 1000.0)) AS window_start,
    to_timestamp(((s.value->>'windowEnd')::bigint / 1000.0))   AS window_end
FROM queen_streams.state s
JOIN queen_streams.queries q ON q.id = s.query_id
WHERE q.name = 'webhook-dedup'
ORDER BY distinct_events_in_window DESC;

Connect this to your existing dashboards as you would any PG table. When the window closes, the row disappears and the window's first-seen events are emitted to webhooks.verified as regular Queen messages for downstream consumers to process.

5. Window sizing — how long to dedup for

The window size determines the maximum gap between an original delivery and a redelivery that the system will still deduplicate. Match it to the redelivery window of the upstream provider:

| Upstream | Redelivery window | Suggested seconds |
|---|---|---|
| Stripe | Up to 3 days | 259,200 (3d), but see below for large windows |
| Twilio | 11 retries over 24h | 86,400 (24h) |
| GitHub manual redelivery | Hours; manual | 3,600–86,400 (1h–24h) |
| OTA channel managers | Seconds to minutes | 60–600 (1min–10min) |

Large windows = large seen-sets

The seen-set is held entirely in state during the window's lifetime. For a provider sending 100 req/s with a 24h window, the seen-set has up to 8.6M entries. PG JSONB handles this, but the per-cycle cost of serialising/deserialising the accumulator grows with the set size. For very long windows, consider keeping deduplication state in your own application table (one row per eventId with a TTL index) instead of in stream state.
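
The 8.6M figure is request rate multiplied by window length. A small helper makes it easy to run the same estimate for other providers. The bytes-per-id value below is an assumption for illustration, not a measured number, and the estimate covers only the `seen` array (the `firsts` array additionally holds the full message bodies).

```javascript
// Upper bound on seen-set entries: every request in the window is unique.
function maxSeenSetEntries(requestsPerSecond, windowSeconds) {
  return requestsPerSecond * windowSeconds
}

// Rough size of the `seen` array as JSON text.
// bytesPerId is an assumption; measure your real ids before relying on it.
function approxSeenSetBytes(entries, bytesPerId = 32) {
  return entries * (bytesPerId + 3)   // +3 for two quotes and a comma
}

const entries = maxSeenSetEntries(100, 86_400)   // 100 req/s, 24h window
console.log(entries)                              // 8640000
console.log((approxSeenSetBytes(entries) / 1e6).toFixed(0) + ' MB')   // 302 MB
```

Running the same numbers with a 10-minute window gives 60,000 entries, which is why short windows are the comfortable case for in-state seen-sets.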

6. vs. Redis / outbox / hand-rolled

| | Redis SET + TTL | App-side DB table + TTL job | Queen streams |
|---|---|---|---|
| Extra infra to operate | Yes: Redis | No, but TTL job to maintain | No: same PG |
| Durability of seen-set | Volatile (unless AOF) | Durable | Durable (PG-backed) |
| State eviction | TTL + max-memory policy | Your own cron job | Automatic on window close |
| SPOF risk | Yes: Redis down kills dedup | Same as your DB | Same as your DB |
| Per-provider isolation | Manual (key prefixes) | Manual | Free: partition by providerId |
| Audit / inspection | SCAN with cursor | SQL on your table | SQL on queen_streams.state |

7. Common pitfalls

Deduplication is per-window, not global

An event that arrives once in window N and once again in window N+1 will pass through twice. The window boundary is the dedup horizon. If you need stricter "exactly-once forever" semantics, use a long-lived dedup table in your application schema instead.
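
The boundary effect is easy to see with timestamps. The sketch below assumes windows are aligned to multiples of the window length since the Unix epoch; Queen's actual alignment may differ, but a redelivery that straddles a boundary behaves the same way under any alignment.

```javascript
// Assumption: tumbling windows aligned to multiples of the window length
// since the Unix epoch. Only the boundary effect matters here.
const windowIndex = (timestampMs, windowSeconds) =>
  Math.floor(timestampMs / (windowSeconds * 1000))

const WINDOW_SECONDS = 600

// Two deliveries five minutes apart, both inside 12:00:00-12:10:00 UTC.
const a1 = Date.UTC(2024, 0, 1, 12, 1, 0)
const a2 = Date.UTC(2024, 0, 1, 12, 6, 0)
const withinWindow =
  windowIndex(a1, WINDOW_SECONDS) === windowIndex(a2, WINDOW_SECONDS)

// Two deliveries two seconds apart that straddle the 12:10:00 boundary.
const original   = Date.UTC(2024, 0, 1, 12, 9, 59)
const redelivery = Date.UTC(2024, 0, 1, 12, 10, 1)
const sameWindow =
  windowIndex(original, WINDOW_SECONDS) === windowIndex(redelivery, WINDOW_SECONDS)

console.log(withinWindow)   // true: the second delivery is dropped
console.log(sameWindow)     // false: the duplicate lands in the next window
```

A gap of two seconds can leak a duplicate while a gap of five minutes does not, so downstream handlers that cannot tolerate any repeat still need their own idempotency check.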

Partition by the provider, not by the event

The seen-set is sharded by partition_id. If you partition the source queue by eventId instead of providerId, every event lands on its own partition with a seen-set of size 1 — no dedup ever happens. Always partition by the entity whose seen-set you want to share (the provider).

Tune idleFlushMs for quiet providers

A window only closes when (a) a later event arrives that's past the window end, or (b) the idle-flush timer fires. If a provider goes silent, the window stays open until the timer sweeps it. The default in this recipe is 5 seconds — fast enough that you see closed windows promptly even on quiet providers, slow enough that the flush sweep doesn't dominate CPU.

Choosing eventId

Use the most authoritative identifier the upstream gives you. Stripe: event.id. Twilio: MessageSid. GitHub: X-GitHub-Delivery. If the upstream doesn't expose a stable identifier, hash a canonical subset of the payload — but be aware that hash collisions effectively become false-positive duplicates.
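
One way to pick the identifier, with a hash fallback, is sketched below. The provider field and header names come from the list above; the function names, the `stableFields` parameter and the example payload fields are hypothetical. The fallback serialises a chosen subset of fields with sorted keys and hashes it with SHA-256 from Node's built-in `node:crypto` module.

```javascript
import { createHash } from 'node:crypto'

// Serialise with sorted keys so logically equal payloads hash identically.
function canonicalJson(value) {
  if (Array.isArray(value)) return `[${value.map(canonicalJson).join(',')}]`
  if (value !== null && typeof value === 'object') {
    const body = Object.keys(value).sort()
      .map(k => `${JSON.stringify(k)}:${canonicalJson(value[k])}`)
      .join(',')
    return `{${body}}`
  }
  return JSON.stringify(value)
}

// Hash only the fields that identify the event, not volatile ones
// such as delivery timestamps or retry counters.
function fallbackEventId(payload, stableFields) {
  const subset = Object.fromEntries(stableFields.map(f => [f, payload[f] ?? null]))
  return createHash('sha256').update(canonicalJson(subset)).digest('hex')
}

// Hypothetical helper: prefer the provider's own identifier, else hash.
function eventIdFor(provider, headers, body, stableFields = []) {
  switch (provider) {
    case 'stripe': return body.id                       // event.id
    case 'twilio': return body.MessageSid
    case 'github': return headers['x-github-delivery']  // Node lowercases header names
    default:       return fallbackEventId(body, stableFields)
  }
}

// Same logical event, different key order and a different delivery timestamp.
const first  = { reservation: 'R1', status: 'confirmed', sentAt: 1 }
const second = { sentAt: 2, status: 'confirmed', reservation: 'R1' }
const fields = ['reservation', 'status']
console.log(eventIdFor('octorate', {}, first, fields) ===
            eventIdFor('octorate', {}, second, fields))   // true
```

Choosing `stableFields` is the judgment call: too few fields and distinct events collapse into one id, too many and a redelivery with a changed timestamp looks new.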

8. References

| What | Where |
|---|---|
| Full demo (verified live) | examples/use-case-dedup.js |
| In-window dedup reference (streams SDK) | streams/examples/04-in-window-dedup.js |
| Reduce operator (the seen-set accumulator) | ReduceOperator.js |
| Window operators | client-v2/streams/operators/ |
| Streams schema (where the seen-set lives) | 019_streams_schema.sql |
| Cycle commit (atomic state + push + ack) | 021_streams_cycle_v1.sql |
