Queen MQ

← Use cases

Recipe · backpressure

Rate limiter for external APIs.

Build a production-grade rate limiter that throttles outbound calls to a third-party API (an OTA, an LLM provider, a payment gateway), preserves per-tenant FIFO order on retries, has zero coordination overhead between multiple consumers, and scales linearly. The pattern uses the .gate() primitive shipped in the queen-mq package, with the per-partition lease as natural backpressure — no deferred queue, no timers, no Redis.

9,431 msg/s
aggregate throughput
(8 runners, 100 tenants)
500,000
events processed in
53s · 0 errors
±3%
per-tenant rate accuracy
vs. configured target
94%
efficiency vs. theoretical
minimum drain time

Numbers are from a stress run on a single 10-core M-series Mac with one PostgreSQL 18 container. Reproduce with streams/examples/08-rate-limiter-stress.js.

1. The problem

Many APIs we depend on impose strict rate limits, and exceeding them costs money or service:

A naïve approach (fire requests as fast as they arrive, retry on 429) is brittle: it floods the upstream, burns retries, and cannot enforce fairness across tenants. A correct rate limiter has to:

  1. Hold the surplus durably (no in-memory queue that dies on restart).
  2. Preserve per-key FIFO order across retries (otherwise a stale "set room available = false" can overwrite a fresh "set true").
  3. Coordinate across multiple worker processes without race conditions on the budget counter.
  4. Expose state for ops debugging.

Queen + streams gives all four properties out of the box.

2. The pattern

A producer pushes outbound API calls onto a Queen queue partitioned by the rate-limit key (typically tenant, or tenant + endpoint). A streaming pipeline reads that queue through a .gate() operator that runs a token bucket per partition. Allowed messages are forwarded to a sink queue from which the actual API caller drains.

producer ──► requests (Queen, partitioned by tenant key, leaseTime=2s) │ ▼ Stream .gate(tokenBucketGate({ capacity, refillPerSec, costFn })) │ ALLOWED ───┼─── DENIED │ │ ▼ └─► no ack, no commit approved lease expires (Queen) message redelivered │ in original FIFO order ▼ OTA worker calls the upstream API

When the gate denies a message, the runner skips the cycle entirely (no state mutation, no ack, no sink push) and lets the source-queue lease expire naturally. The broker then redelivers the un-acked tail of the batch in its original order. This is what makes ordering FIFO on retries — the same partition lease sits with the runner across the deny → expiry → redeliver cycle, so denied messages can never overtake later messages.

Why not a deferred queue?

Pushing denied messages onto a "retry later" queue (the typical Kafka-shaped pattern) breaks per-key FIFO order: by the time the deferred message rejoins the main queue, newer messages may have already passed through. Queen's per-partition lease lets us avoid this entirely — the broker itself is the backpressure mechanism.

3. Quickstart

The smallest working example. One tenant, 30 messages pushed in burst, bucket of 10 with 5/sec refill. The first 10 should arrive almost instantly, the rest at 5/sec.

Verified end-to-end

Every snippet on this page was run against a live Queen server at http://localhost:6632 before being included. The quickstart drains in ~6 seconds with all 30 messages arriving in seq order.

One install, one import — JavaScript / Python / Go

The broker client and the streaming SDK ship in the same package on every supported runtime:

# Node.js (npm)
npm install queen-mq

# Python (pip)
pip install queen-mq

# Go
go get github.com/smartpricing/queen/clients/client-go
// JavaScript
import { Queen, Stream, tokenBucketGate, slidingWindowGate } from 'queen-mq'
# Python
from queen import Queen, Stream, token_bucket_gate, sliding_window_gate
// Go
import (
    queen "github.com/smartpricing/queen/clients/client-go"
    "github.com/smartpricing/queen/clients/client-go/streams"
    "github.com/smartpricing/queen/clients/client-go/streams/helpers"
)

The same .gate() primitive, helper factories, and config_hash wire format work identically across all three languages. A query registered by the JS client can be resumed by a worker written in Go (or vice versa) — the SHA-256 of the operator chain matches across runtimes.

javascript · single-tenant token bucket
// npm install queen-mq        (Node 22+)
import { Queen, Stream, tokenBucketGate } from 'queen-mq'

const url = 'http://localhost:6632'
const q = new Queen({ url, handleSignals: false })

await q.queue('rl.requests').config({ leaseTime: 2, retryLimit: 1000 }).create()
await q.queue('rl.approved').create()

// Push 30 messages on partition 'tenant-A' as a burst.
await q.queue('rl.requests').partition('tenant-A').push(
  Array.from({ length: 30 }, (_, i) => ({ data: { seq: i } }))
)

// Token bucket: burst 10, refill 5/sec.
const stream = await Stream
  .from(q.queue('rl.requests'))
  .gate(tokenBucketGate({ capacity: 10, refillPerSec: 5 }))
  .to(q.queue('rl.approved'))
  .run({
    queryId:   'docs-rl-quickstart',
    url,
    batchSize: 10,
    reset:     true
  })

// Drain the approved queue and observe arrival order + timing.
const t0 = Date.now()
const arrivals = []
const cg = 'docs-rl-quickstart-cg'
while (arrivals.length < 30) {
  const batch = await q.queue('rl.approved')
    .group(cg).batch(10).wait(true).timeoutMillis(2000).pop()
  if (!batch || batch.length === 0) continue
  for (const m of batch) arrivals.push({ seq: m.data.seq, at: Date.now() - t0 })
  await q.ack(batch, true, { group: cg })
}
await stream.stop()

console.log('first 10 ms:', arrivals.slice(0, 10).map(a => a.at).join(' '))
console.log('last  10 ms:', arrivals.slice(-10).map(a => a.at).join(' '))
// Output (verified):
//   first 10 ms: 128 128 128 128 128 128 128 128 128 128
//   last  10 ms: 5961 5961 5961 5961 5961 5961 5961 5961 5961 5961
//   30 messages, all in seq order [0..29].

The bucket takes ~6s to drain 30 messages: 10 burst (instant) + 20 more at 5/sec ≈ 4s. The remaining ~2s is broker round-trip overhead from the lease cycle.

python · single-tenant token bucket
# pip install queen-mq        (Python 3.10+)
import asyncio
from queen import Queen, Stream, token_bucket_gate

URL = "http://localhost:6632"

async def main():
    async with Queen(url=URL) as q:
        await q.queue("rl.requests").config(lease_time=2, retry_limit=1000).create()
        await q.queue("rl.approved").create()

        # Push 30 messages on partition 'tenant-A' as a burst.
        await q.queue("rl.requests").partition("tenant-A").push(
            [{"data": {"seq": i}} for i in range(30)]
        )

        # Token bucket: burst 10, refill 5/sec.
        stream = await (
            Stream.from_(q.queue("rl.requests"))
            .gate(token_bucket_gate(capacity=10, refill_per_sec=5))
            .to(q.queue("rl.approved"))
            .run(query_id="docs-rl-quickstart-py", url=URL, batch_size=10, reset=True)
        )

        cg = "docs-rl-quickstart-cg"
        arrivals = []
        while len(arrivals) < 30:
            batch = await q.queue("rl.approved").group(cg).batch(10).wait(True).timeout_millis(2000).pop()
            if not batch:
                continue
            for m in batch:
                arrivals.append(m.get("data", {}).get("seq"))
            await q.ack(batch, True, {"group": cg})
        await stream.stop()
        print("seqs:", arrivals)
        # All 30 messages, in seq order [0..29].

asyncio.run(main())
go · single-tenant token bucket
// go get github.com/smartpricing/queen/clients/client-go
package main

import (
    "context"
    "fmt"

    queen "github.com/smartpricing/queen/clients/client-go"
    "github.com/smartpricing/queen/clients/client-go/streams"
    "github.com/smartpricing/queen/clients/client-go/streams/helpers"
)

func main() {
    url := "http://localhost:6632"
    q, _ := queen.New(url)
    defer q.Close(context.Background())

    _, _ = q.Queue("rl.requests").Config(queen.QueueConfig{
        LeaseTime: 2, RetryLimit: 1000,
    }).Create().Execute(context.Background())
    _, _ = q.Queue("rl.approved").Create().Execute(context.Background())

    // Push 30 messages on partition 'tenant-A' as a burst.
    for i := 0; i < 30; i++ {
        _, _ = q.Queue("rl.requests").Partition("tenant-A").Push(
            map[string]interface{}{"seq": i},
        ).Execute(context.Background())
    }

    // Token bucket: burst 10, refill 5/sec.
    gateFn := helpers.TokenBucketGate(helpers.TokenBucketGateOptions{
        Capacity: 10, RefillPerSec: 5,
    })
    stream := streams.From(q.Queue("rl.requests").AsStreamSource()).
        Gate(gateFn).
        To(q.Queue("rl.approved"))

    runner, _ := stream.Run(context.Background(), streams.RunOptions{
        QueryID: "docs-rl-quickstart-go", URL: url, BatchSize: 10, Reset: true,
    })

    cg := "docs-rl-quickstart-cg"
    drained := 0
    for drained < 30 {
        batch, _ := q.Queue("rl.approved").Group(cg).Batch(10).Wait(true).TimeoutMillis(2000).Pop(context.Background())
        if len(batch) == 0 { continue }
        for _, m := range batch {
            fmt.Println("seq:", m.Data["seq"])
        }
        drained += len(batch)
        _, _ = q.Ack(context.Background(), batch, true, queen.AckOptions{ConsumerGroup: cg})
    }
    runner.Stop()
}

Same wire protocol, same config_hash, same per-partition lease back-pressure semantics. Pick whichever runtime fits your service; the bucket state is shared via PostgreSQL so workers written in different languages can co-process the same query.

4. Four canonical models

Real APIs measure rate limits in different units. The same .gate() primitive expresses all four common shapes — what changes is only the costFn and the queue contents:

Model Real-world example Queue contains costFn
A — req/sec
"100 calls/sec, batch arbitrary inside"
Airbnb StandardCalendars; Google Hotel uploads Pre-batched HTTP requests
each carrying an items[]
() => 1
B — msg/sec
"100 ops/sec, regardless of batching"
WhatsApp Cloud per-phone tier; Twilio; SendGrid Individual messages
(one logical operation per item)
() => 1
C — 1:1
"each call carries exactly one op"
Legacy VRBO/Expedia "one operation per call" endpoints Individual operations () => 1
D — cost-based
"weight per request varies"
Expedia weight-per-endpoint; OpenAI TPM Requests with a weight field msg => msg.weight

Models A, B, and C share the same costFn — they differ only in what you put on the queue. Model D shows the only place where the gate code itself changes: the cost function reads the message payload to compute how many tokens to charge.

Model D — cost-based example

100 requests with weights 1..5 (avg 3, total 300). Bucket of 100, refill 50/sec. Drains in ~6 seconds.

javascript · variable-cost token bucket
import { Queen, Stream, tokenBucketGate } from 'queen-mq'

const q = new Queen({ url: 'http://localhost:6632', handleSignals: false })

await q.queue('rl.cost.requests').config({ leaseTime: 2, retryLimit: 1000 }).create()
await q.queue('rl.cost.approved').create()

// 100 requests, weight 1..5 each (deterministic for reproducibility).
const items = Array.from({ length: 100 }, (_, i) => ({
  data: { seq: i, weight: (i % 5) + 1 }
}))
await q.queue('rl.cost.requests').partition('tenant-A').push(items)

const stream = await Stream
  .from(q.queue('rl.cost.requests'))
  .gate(tokenBucketGate({
    capacity:     100,
    refillPerSec: 50,
    costFn:       msg => msg.weight        // ★ cost varies per request
  }))
  .to(q.queue('rl.cost.approved'))
  .run({ queryId: 'docs-rl-cost', url: 'http://localhost:6632', batchSize: 30, reset: true })

// Verified: drained 100 requests, 300 total weight in 5.71s
//           weight rate sustained: 52.6 weight/sec (target 50)
//           order intact: true

5. Two limits at once (model A + B cascade)

Some APIs impose both a request-per-second limit AND an operations-per-second limit at the same time. Booking.com is the canonical example: "≤ 100 calls/sec AND ≤ 1000 ops/sec per partner". Both must be satisfied simultaneously.

Solution: chain two .gate() stages via an intermediate queue. The MIN of the two binding rates wins. Per-partition order is preserved across the chain because each stage maintains its own lease on the same partition key.

javascript · two gates in series
import { Queen, Stream, tokenBucketGate } from 'queen-mq'

const q = new Queen({ url: 'http://localhost:6632', handleSignals: false })

await q.queue('rl.cascade.src').config({ leaseTime: 2, retryLimit: 1000 }).create()
await q.queue('rl.cascade.mid').config({ leaseTime: 2, retryLimit: 1000 }).create()
await q.queue('rl.cascade.ok').create()

// 60 requests, each carrying 5 items.
const items = Array.from({ length: 60 }, (_, i) => ({
  data: {
    seq: i,
    items: Array.from({ length: 5 }, (_, j) => ({ id: `${i}-${j}` }))
  }
}))
await q.queue('rl.cascade.src').partition('tenant-A').push(items)

// Gate 1: 10 req/sec
const gate1 = await Stream
  .from(q.queue('rl.cascade.src'))
  .gate(tokenBucketGate({
    capacity:     20,
    refillPerSec: 10,
    costFn:       () => 1                     // 1 token per request
  }))
  .to(q.queue('rl.cascade.mid'))
  .run({ queryId: 'docs-rl-casc-1', url: 'http://localhost:6632',
         batchSize: 50, reset: true })

// Gate 2: 30 msg/sec (cost = items.length)
const gate2 = await Stream
  .from(q.queue('rl.cascade.mid'))
  .gate(tokenBucketGate({
    capacity:     60,
    refillPerSec: 30,
    costFn:       msg => msg.items.length     // weight by batch size
  }))
  .to(q.queue('rl.cascade.ok'))
  .run({ queryId: 'docs-rl-casc-2', url: 'http://localhost:6632',
         batchSize: 50, reset: true })

// Verified: 60 requests in 10.00s
//           req rate: 6.0/sec (gate1 limit 10)   ← gate2 is binding
//           msg rate: 30.0/sec (gate2 limit 30)
//           order intact: true

In the verified run, gate 2 is the binding limit: the request rate stabilises at 6/sec (well below gate 1's limit of 10) precisely because each request carries 5 items and gate 2 caps the total at 30 msg/sec ⇒ 30/5 = 6 req/sec.

6. Sliding-window quotas (model E)

Some APIs impose quota-style limits — "max N events in any rolling window of W seconds" — typically used for daily caps ("max 1000 emails/day per IP") or hourly bursts. The slidingWindowGate helper implements an efficient 2-bucket approximation that is accurate within ±1× the rate at boundaries (precise enough for rate-limiting; for billing-grade precision keep per-event timestamps in state).

Critical sizing rule

For sliding windows, WINDOW_SEC ≥ 5 × LEASE_SEC. If they're comparable (e.g. lease=2s ≈ window=2s) every lease expiry lands at a window boundary and the previous-bucket weight blocks the entire next window, collapsing throughput to ~half of nominal. Real quota use cases (windows of minutes, hours, days) are far above typical lease times, so this is mostly an issue for synthetic tests.

javascript · sliding-window quota
import { Queen, Stream, slidingWindowGate } from 'queen-mq'

const q = new Queen({ url: 'http://localhost:6632', handleSignals: false })

// IMPORTANT: leaseTime should be much smaller than the window (rule of thumb: ×5).
await q.queue('rl.sw.requests').config({ leaseTime: 1, retryLimit: 1000 }).create()
await q.queue('rl.sw.approved').create()

await q.queue('rl.sw.requests').partition('tenant-A').push(
  Array.from({ length: 60 }, (_, i) => ({ data: { seq: i } }))
)

const stream = await Stream
  .from(q.queue('rl.sw.requests'))
  .gate(slidingWindowGate({ limit: 20, windowSec: 5 }))   // max 20 in any 5s window
  .to(q.queue('rl.sw.approved'))
  .run({ queryId: 'docs-rl-sw', url: 'http://localhost:6632',
         batchSize: 50, reset: true })

// Verified: drained 60 messages in 12.41s
//           max arrivals in any 5s window: 29 (limit 20)
//           sustained rate: 4.75 msg/sec (target 4 = 20/5)
//           order intact: true

The 29 messages observed in the worst 5-second window (vs. the configured limit of 20) is the documented ±1× boundary inaccuracy of the 2-bucket approximation. For real-world quotas (e.g. 1000/day) the same approximation gives 1000-1500 across any 24h window, which is acceptable in nearly all use cases.

7. Sizing rule

For the token bucket, the maximum sustainable rate per partition is:

effective_rate = min(refillPerSec, capacity / leaseSec)

The reason: each lease cycle, the runner can drain at most capacity tokens before being denied; after the deny the partition is parked for leaseSec seconds before the next cycle. So if capacity is too small, the deny cadence becomes the bottleneck rather than the refill rate.

Recommendation: always set capacity ≥ refillPerSec × leaseSec. This guarantees the cycle quantum doesn't artificially clamp the bucket. The default config in streams/examples/08-rate-limiter-stress.js uses capacity = refillPerSec × leaseSec exactly.

Quick formula

For a target of R ops/sec/tenant on a 2-second source-queue lease, set refillPerSec: R and capacity: 2 × R. Then set batchSize on the streaming runner to at least capacity so a full bucket can be drained in a single cycle.

8. Scaling with multiple consumers

A single Node process running Stream.run() sustains roughly 1500–2500 msg/sec end-to-end on a modest setup, limited by HTTP round-trips and PG cycle latency. To scale higher, run multiple consumers in parallel — each calls .run() with the same queryId. Queen's per-partition lease guarantees that a given partition is processed by AT MOST ONE runner at any moment, so per-key state writes never collide and ordering stays intact.

javascript · 4 parallel runners on the same query
// Same gate definition, run N times. Only the FIRST runner uses reset:true
// so a re-run of the program starts clean. The rest attach with reset:false
// so they don't wipe state the first one already started building.
async function spawnRateLimiter({ runners, queryId, sourceQueue, sinkQueue,
                                 capacity, refillPerSec, costFn }) {
  const url = 'http://localhost:6632'
  const q = new Queen({ url, handleSignals: false })

  const buildStream = () => Stream
    .from(q.queue(sourceQueue))
    .gate(tokenBucketGate({ capacity, refillPerSec, costFn }))
    .to(q.queue(sinkQueue))

  const streams = []
  streams.push(await buildStream().run({
    queryId, url, batchSize: capacity, maxPartitions: 16, reset: true
  }))
  for (let i = 1; i < runners; i++) {
    streams.push(await buildStream().run({
      queryId, url, batchSize: capacity, maxPartitions: 16, reset: false
    }))
  }
  return streams
}

// Spawn 4 runners locally; throughput scales ~linearly.
const streams = await spawnRateLimiter({
  runners:      4,
  queryId:      'ota-rate-limiter.v1',
  sourceQueue:  'ota.requests',
  sinkQueue:    'ota.approved',
  capacity:     200,
  refillPerSec: 100,
  costFn:       () => 1
})

In production each runner is typically its own Pod. The state coordination is fully handled by PostgreSQL: every gate decision reads the bucket state from queen_streams.state within the same transaction that mutates it, and the partition lease enforces single-writer semantics across all runners.

9. Stress test results

The pattern was stress-tested at three scales. Source: streams/examples/08-rate-limiter-stress.js.

Config Total msgs Time Aggregate throughput Per-tenant rate (avg) Errors Efficiency
10 tenants × 500 msg, 1 runner 5,000 6.4s 785 msg/s 101.4 (target 100) 0 78%
50 tenants × 2,000 msg, 4 runners 100,000 22.6s 4,426 msg/s 96.9 (target 100) 0 89%
100 tenants × 5,000 msg, 8 runners 500,000 53.0s 9,431 msg/s 100.5 (target 100) 0 94%

Throughput scales nearly linearly with the number of runners. The same test was repeated for all four canonical models (A, B, C, D) plus the cascade and sliding window — all 5 verifiable against streams/examples/09-rate-limiter-all-models.js:

Model Description Drained Per-tenant rate (target) Aggregate Time Status
Areq/s100,000104.0 (100)4,702/s21.3s
Bmsg/s100,000106.0 (100)4,713/s21.2s
Dcost-based100,000 reqs
(550k weight)
1,051 weight/s (1,000)42,953 w/s12.8s
A+Bcascading50,000119.9 req/s (100)
+ 511 msg/s (500)
4,671/s10.7s
Esliding-window
(20 / 10s)
10,00011.2 (10)518/s19.3s

10. Inspecting state

All gate state lives in the queen_streams.state table — a plain PostgreSQL table you can query directly for ops debugging.

sql · current bucket state per tenant
-- All keys for a query, sorted by remaining tokens (lowest = closest to throttle).
SELECT
    s.key                         AS tenant_or_partition,
    (s.value->>'tokens')::float   AS tokens_remaining,
    (s.value->>'allowedTotal')::int AS lifetime_allowed,
    (s.value->>'consumedTotal')::int AS lifetime_cost,
    s.updated_at
FROM queen_streams.state s
JOIN queen_streams.queries q ON q.id = s.query_id
WHERE q.name = 'ota-rate-limiter.v1'
ORDER BY tokens_remaining ASC
LIMIT 20;
sql · throttle alert: who is empty?
SELECT s.key, (s.value->>'tokens')::float AS tokens
FROM queen_streams.state s
JOIN queen_streams.queries q ON q.id = s.query_id
WHERE q.name = 'ota-rate-limiter.v1'
  AND (s.value->>'tokens')::float < 1
ORDER BY s.updated_at DESC;

Because the state is plain SQL, you can hook it directly into Grafana, Metabase, or your existing alerting pipeline — no extra metrics exporter required.

11. Common pitfalls

Don't use .gate() with windowing in the same stream

The gate's partial-ack-on-deny semantics are incompatible with the full-batch atomic-cycle model that windows + reducers assume. The Stream compiler enforces this with a clear error. If you need both rate limiting AND windowed aggregation, run them as two sequential streams with an intermediate queue.

Partition by the rate-limit key, not by something else

The bucket state is sharded by partition_id in queen_streams.state. If you partition the source queue by tenant but rate-limit by tenant + endpoint (via .keyBy()), two different runners can hold different partition leases yet write the same logical key, causing cross-worker contention. Always partition by exactly the rate-limit key. The runtime warns you if you use .keyBy() to override.

Set retryLimit to a large number

A denied message goes back into the queue via lease expiry — Queen reads that as a "retry". On a heavily throttled tenant a single message can be redelivered hundreds of times before its bucket refills enough. Set retryLimit to ≥ 1000 on the source queue or messages will end up in the DLQ unfairly. (We use 100,000 in the stress tests.)

Sliding-window: window must dwarf the lease

Repeating from the sizing callout: for slidingWindowGate, ensure windowSec ≥ 5 × leaseSec on the source queue. Otherwise deny→retry round-trips collide with window boundaries and effective throughput collapses.

12. References

All helpers used in this recipe are tiny pure-JavaScript factories defined in a single file (~140 lines). You can read the full source in the time it takes to read this page:

WhatWhereRe-exported from
tokenBucketGate({ capacity, refillPerSec, costFn }) client-v2/streams/helpers/rateLimiter.js (function declaration) queen-mq top-level
(see client-v2/index.js)
slidingWindowGate({ limit, windowSec, costFn }) client-v2/streams/helpers/rateLimiter.js (function declaration) queen-mq top-level
(same)
Minimal example (used in §3 quickstart) streams/examples/07-rate-limiter.js
Stress test (used in §9 numbers) streams/examples/08-rate-limiter-stress.js
All-models test (A / B / C / D / cascade / sliding-window) streams/examples/09-rate-limiter-all-models.js
The .gate() operator (compiler integration) client-v2/streams/operators/GateOperator.js
Stored procedure (SP) implementing the cycle commit lib/schema/procedures/021_streams_cycle_v1.sql

← Back to use cases