Queen MQ
Clients

Five clients. One mental model.

Every Queen client speaks the same fluent grammar: queue(name).partition(p).group(g)… ending in .push(), .pop(), or .consume(). Switch languages and the verbs stay the same. Every snippet on this page was verified end-to-end against a live Queen server at http://localhost:6632.

Client Min runtime Install Source
JavaScript · Node and browser Node.js 22+ npm install queen-mq client-js/
Python Python 3.8+ pip install queen-mq client-py/
Go Go 1.24+ go get github.com/smartpricing/queen/client-go client-go/
PHP / Laravel PHP 8.3+ composer require smartpricing/queen-mq client-laravel/
C++ · header-only C++17 copy queen_client.hpp client-cpp/

Install

npm install queen-mq
Pure ESM. Works in Node 22+ and modern browsers (no native deps).
pip install queen-mq
Async-first (httpx). Sync wrappers available where natural.
go get github.com/smartpricing/queen/client-go
Single module. No CGO.
composer require smartpricing/queen-mq

# Laravel only, publish config and set env:
php artisan vendor:publish --tag=queen-config
The standalone client works in any PHP 8.3+ project. The Laravel service provider, facade, and artisan command are auto-discovered.
# Header-only client.
curl -O https://raw.githubusercontent.com/smartpricing/queen/master/clients/client-cpp/queen_client.hpp

# Plus cpp-httplib + nlohmann/json (header-only deps).
brew install cpp-httplib   # macOS
# or just download httplib.h alongside queen_client.hpp
C++17. Includes a built-in thread pool, async buffer manager, and graceful shutdown.

Connect

import { Queen } from 'queen-mq'

// 1) Single server
const queen = new Queen('http://localhost:6632')

// 2) Multiple servers, automatic failover + load balancing
const queenHA = new Queen([
  'http://queen-a:6632',
  'http://queen-b:6632',
])

// 3) Full config (JWT, custom headers, retry, affinity)
const queenFull = new Queen({
  urls: ['http://queen-a:6632', 'http://queen-b:6632'],
  loadBalancingStrategy: 'affinity',  // 'round-robin' | 'session' | 'affinity'
  enableFailover: true,
  retryAttempts: 3,
  bearerToken: process.env.QUEEN_TOKEN,
})
import os

from queen import Queen

# Use as an async context manager; it closes the HTTP pool cleanly.
async with Queen('http://localhost:6632') as queen:
    ...

# Multiple servers
async with Queen(['http://queen-a:6632', 'http://queen-b:6632']) as queen:
    ...

# Full config
queen = Queen({
    'urls': ['http://queen-a:6632', 'http://queen-b:6632'],
    'load_balancing_strategy': 'affinity',
    'enable_failover': True,
    'retry_attempts': 3,
    'bearer_token': os.environ.get('QUEEN_TOKEN'),
})
import queen "github.com/smartpricing/queen/client-go"

// Single
client, _ := queen.New("http://localhost:6632")
defer client.Close(ctx)

// HA + auth
client, _ = queen.New(queen.ClientConfig{
    URLs: []string{"http://queen-a:6632", "http://queen-b:6632"},
    LoadBalancingStrategy: "affinity",
    EnableFailover:        true,
    RetryAttempts:         3,
    BearerToken:           os.Getenv("QUEEN_TOKEN"),
})
use Queen\Queen;

// Standalone
$queen = new Queen('http://localhost:6632');

// HA + auth
$queen = new Queen([
    'urls' => ['http://queen-a:6632', 'http://queen-b:6632'],
    'loadBalancingStrategy' => 'affinity',
    'bearerToken' => env('QUEEN_TOKEN'),
]);

// Laravel: auto-resolves the singleton from config/queen.php
use Queen\Laravel\QueenFacade as Queen;
Queen::queue('orders')->push([['data' => ['ok' => true]]])->execute();
#include "queen_client.hpp"
using namespace queen;

QueenClient client("http://localhost:6632");

// HA
ClientConfig cfg;
cfg.load_balancing_strategy = "round-robin";
cfg.enable_failover = true;
cfg.retry_attempts = 3;
QueenClient cluster({"http://queen-a:6632", "http://queen-b:6632"}, cfg);

Configure a queue

Queues are created on first push by default. Configure them explicitly when you need non-default lease, retry, retention, or encryption settings.

await queen.queue('orders')
  .config({
    leaseTime: 300,
    retryLimit: 3,
    retentionSeconds: 86400,
    encryptionEnabled: false,
  })
  .create()
await queen.queue('orders').config({
    'leaseTime': 300,
    'retryLimit': 3,
    'retentionSeconds': 86400,
    'encryptionEnabled': False,
}).create()
_, err := client.Queue("orders").Config(queen.QueueConfig{
    LeaseTime:        300,
    RetryLimit:       3,
    RetentionSeconds: 86400,
}).Create().Execute(ctx)
$queen->queue('orders')
    ->config(['leaseTime' => 300, 'retryLimit' => 3, 'retentionSeconds' => 86400])
    ->create()
    ->execute();
QueueConfig cfg;
cfg.lease_time = 300;
cfg.retry_limit = 3;
cfg.retention_seconds = 86400;
cfg.encryption_enabled = false;

client.queue("orders").config(cfg).create();
curl -X POST http://localhost:6632/api/v1/configure \
  -H 'Content-Type: application/json' \
  -d '{
    "queue":"orders",
    "options":{"leaseTime":300,"retryLimit":3,"retentionSeconds":86400}
  }'

Push

// Plain push
await queen.queue('orders').push([
  { data: { orderId: 1, amount: 99.99 } },
])

// Push to a partition for ordering
await queen.queue('orders').partition('user-123').push([
  { data: { orderId: 2, amount: 50 } },
])

// With a transactionId for exactly-once dedup
await queen.queue('orders').push([
  { transactionId: 'order-2-v1', data: { orderId: 2, amount: 50 } },
])

// Many at once
await queen.queue('orders').push([
  { data: { orderId: 3 } },
  { data: { orderId: 4 } },
  { data: { orderId: 5 } },
])
await queen.queue('orders').push([
    {'data': {'orderId': 1, 'amount': 99.99}},
])

await queen.queue('orders').partition('user-123').push([
    {'data': {'orderId': 2, 'amount': 50}},
])

await queen.queue('orders').push([
    {'transactionId': 'order-2-v1', 'data': {'orderId': 2}},
])
_, err := client.Queue("orders").Push([]interface{}{
    map[string]interface{}{"orderId": 1, "amount": 99.99},
}).Execute(ctx)

_, err = client.Queue("orders").Partition("user-123").Push([]interface{}{
    map[string]interface{}{"orderId": 2, "amount": 50},
}).Execute(ctx)

_, err = client.Queue("orders").
    Push([]interface{}{map[string]interface{}{"orderId": 2}}).
    TransactionID("order-2-v1").
    Execute(ctx)
$queen->queue('orders')->push([
    ['data' => ['orderId' => 1, 'amount' => 99.99]],
])->execute();

$queen->queue('orders')->partition('user-123')->push([
    ['data' => ['orderId' => 2, 'amount' => 50]],
])->execute();

$queen->queue('orders')->push([
    ['transactionId' => 'order-2-v1', 'data' => ['orderId' => 2]],
])->execute();
client.queue("orders").push({
    {{"data", {{"orderId", 1}, {"amount", 99.99}}}},
});

client.queue("orders").partition("user-123").push({
    {{"data", {{"orderId", 2}, {"amount", 50}}}},
});
curl -X POST http://localhost:6632/api/v1/push \
  -H 'Content-Type: application/json' \
  -d '{
    "items":[
      {"queue":"orders","partition":"user-123",
       "payload":{"orderId":2,"amount":50},
       "transactionId":"order-2-v1"}
    ]
  }'
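A common pattern with transactionId dedup is deriving the id deterministically from business data, so a retried push of the same logical event carries the same key. A minimal sketch — the txId helper and its hashing scheme are illustrative assumptions, not part of the Queen client:

```javascript
import { createHash } from 'node:crypto'

// Hypothetical helper: hash the business key plus a version tag into a stable
// transactionId, so re-pushing the same logical event dedupes server-side.
function txId(orderId, version) {
  return createHash('sha256')
    .update(`order-${orderId}-v${version}`)
    .digest('hex')
    .slice(0, 32)
}

// Same inputs always produce the same id; bump the version to force a new one.
// await queen.queue('orders').push([{ transactionId: txId(2, 1), data: { orderId: 2 } }])
```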

Pop & ack

Lower-level than consume(). Pops a batch, hands it to you, and you ack each message yourself. Use this when you need full control over ack timing or want to bridge into your own framework.

const messages = await queen.queue('orders').batch(10).pop()

for (const m of messages) {
  try {
    await processOrder(m.data)
    await queen.ack(m, true)             // success
  } catch (e) {
    await queen.ack(m, false, { error: e.message })  // retry
  }
}
messages = await queen.queue('orders').batch(10).pop()
for m in messages:
    try:
        await process_order(m['data'])
        await queen.ack(m, True)
    except Exception as e:
        await queen.ack(m, False, error=str(e))
messages, _ := client.Queue("orders").Batch(10).Pop(ctx)
for _, m := range messages {
    if err := processOrder(m.Data); err != nil {
        client.Ack(ctx, m, false, queen.AckOptions{Error: err.Error()})
    } else {
        client.Ack(ctx, m, true, queen.AckOptions{})
    }
}
$messages = $queen->queue('orders')->batch(10)->pop();

foreach ($messages as $m) {
    try {
        processOrder($m['data']);
        $queen->ack($m, true);
    } catch (\Throwable $e) {
        $queen->ack($m, false, ['error' => $e->getMessage()]);
    }
}
auto messages = client.queue("orders").batch(10).wait(false).pop();

for (const auto& m : messages) {
    try {
        processOrder(m["data"]);
        client.ack(m, true);
    } catch (const std::exception& e) {
        client.ack(m, false, {{"error", e.what()}});
    }
}
curl 'http://localhost:6632/api/v1/pop/queue/orders?batch=10'

# returns {"messages":[{"transactionId":"...","partitionId":"...",...}, ...]}

curl -X POST http://localhost:6632/api/v1/ack \
  -H 'Content-Type: application/json' \
  -d '{
    "transactionId":"<tx>",
    "partitionId":"<pid>",
    "status":"completed"
  }'

Consume (auto-managed worker)

The high-level path. consume() takes care of the loop, batching, lease renewal, error handling, and ack semantics for you.

await queen.queue('orders')
  .concurrency(10)        // 10 parallel workers
  .batch(20)              // up to 20 messages per poll
  .each()                 // handler receives one message at a time
  .consume(async (m) => {
    await processOrder(m.data)
    // auto-ack on resolve, auto-nack on throw
  })
async def handler(m):
    await process_order(m['data'])

await (queen.queue('orders')
       .concurrency(10)
       .batch(20)
       .consume(handler))   # batch=1 → handler receives single msg
                            # batch>1 → handler receives a list
err := client.Queue("orders").
    Concurrency(10).
    Batch(20).
    Consume(ctx, func(c context.Context, m *queen.Message) error {
        return processOrder(m.Data)
    }).Execute(ctx)
$consumer = $queen->queue('orders')
    ->group('processors')
    ->batch(1)
    ->autoAck(false)
    ->getConsumer();

$consumer->subscribe();

while (!$consumer->isClosed()) {
    $m = $consumer->consume(1000); // ms timeout
    if ($m === null) continue;

    try {
        processOrder($m['data']);
        $consumer->ack($m);
    } catch (\Throwable $e) {
        $consumer->nack($m, $e->getMessage());
    }
}
client.queue("orders")
    .concurrency(10)
    .batch(20)
    .consume([](const json& msgs) {
        for (const auto& m : msgs) {
            processOrder(m["data"]);
        }
    });

Multi-partition pop

Drain up to N partitions in a single network round-trip with .partitions(N). It is designed for queues with many partitions where each partition gets only a handful of new messages per polling interval; there, claiming one partition per call wastes bandwidth. With .partitions(N), .batch(B) becomes a global cap on the total messages returned across all claimed partitions, and every claimed partition shares a single lease (one renew() call extends them all).

The default is .partitions(1): single-partition behaviour, byte-equivalent to a normal wildcard pop. Each returned message carries its own partitionId, partition, leaseId, and consumerGroup, so ack and renew always work message-by-message regardless of how many partitions the batch spans.

// One round-trip drains up to 200 messages spread across up to 50 partitions.
// batch(200) is the GLOBAL cap; partitions(50) is the partition fan-out cap.
const messages = await queen.queue('events')
  .batch(200)
  .partitions(50)
  .wait(true)
  .pop()

for (const m of messages) {
  console.log(`from ${m.partition}:`, m.data)   // m.partitionId / m.leaseId baked in
}

await queen.renew(messages)   // one renew extends every claimed partition's lease

// Same builder works for long-running consumers:
await queen.queue('events')
  .batch(100)
  .partitions(8)
  .consume(async (msgs) => {
    // msgs may span multiple partitions; FIFO is preserved within each partition
    for (const m of msgs) await process(m.data)
  })
# One round-trip drains up to 200 messages from up to 50 partitions.
messages = await (queen.queue('events')
                  .batch(200)
                  .partitions(50)
                  .wait(True)
                  .pop())

for m in messages:
    print(f"from {m['partition']}:", m['data'])  # partitionId / leaseId baked in

await queen.renew(messages)   # one renew extends every claimed partition's lease

# Same builder works for long-running consumers:
async def handler(msgs):
    for m in msgs:
        await process(m['data'])

await (queen.queue('events')
       .batch(100)
       .partitions(8)
       .consume(handler))
// One round-trip drains up to 200 messages from up to 50 partitions.
messages, _ := client.Queue("events").
    Batch(200).
    Partitions(50).
    Wait(true).
    Pop(ctx)

for _, m := range messages {
    fmt.Printf("from %s: %v\n", m.Partition, m.Data)
}

client.Renew(ctx, messages)  // one renew extends every claimed partition's lease

// Same builder works for long-running consumers:
client.Queue("events").
    Batch(100).
    Partitions(8).
    Consume(ctx, func(c context.Context, m *queen.Message) error {
        return process(m.Data)
    }).Execute(ctx)
// One round-trip drains up to 200 messages from up to 50 partitions.
$messages = $queen->queue('events')
    ->batch(200)
    ->partitions(50)
    ->wait(true)
    ->pop();

foreach ($messages as $m) {
    echo "from {$m['partition']}: " . json_encode($m['data']) . "\n";
}

$queen->renew($messages);  // one renew extends every claimed partition's lease

// Same builder works for the high-level consumer:
$consumer = $queen->queue('events')
    ->batch(100)
    ->partitions(8)
    ->getConsumer();
// One round-trip drains up to 200 messages from up to 50 partitions.
auto messages = client.queue("events")
    .batch(200)
    .partitions(50)
    .wait(true)
    .pop();

for (const auto& m : messages) {
    std::cout << "from " << m["partition"] << ": " << m["data"].dump() << "\n";
}

client.renew(messages);  // one renew extends every claimed partition's lease

// Same builder works for long-running consumers:
client.queue("events")
    .batch(100)
    .partitions(8)
    .consume([](const json& msgs) {
        for (const auto& m : msgs) process(m["data"]);
    });
# Wildcard pop with multi-partition fan-out: drain up to 200 messages from up
# to 50 partitions in a single call. partitions=1 (default) preserves legacy
# single-partition behaviour.
curl 'http://localhost:6632/api/v1/pop/queue/events?batch=200&partitions=50&wait=true'

# Response shape (note partitionsClaimed and per-message partitionId):
# {
#   "success": true,
#   "queue": "events",
#   "partition": "p7",                    # first claimed partition (back-compat)
#   "partitionId": "...",                 # first claimed partitionId (back-compat)
#   "leaseId": "worker-uuid",             # shared across all claimed partitions
#   "messages": [
#     {"transactionId":"...","partitionId":"...","partition":"p7","leaseId":"...", ...},
#     {"transactionId":"...","partitionId":"...","partition":"p12","leaseId":"...", ...},
#     ...
#   ],
#   "partitionsClaimed": 8                # how many partitions actually contributed
# }

When to use it. Many partitions, each with few messages per polling interval (per-customer event streams, per-tenant work queues, per-device telemetry). For a queue with P active partitions, .partitions(N) reduces round-trips per polling cycle from P to roughly P / N while preserving per-partition FIFO ordering.

When not to use it. Few partitions, or partitions busy enough to fill .batch(B) on their own; the default .partitions(1) is already optimal there. .partitions(N) only affects wildcard pops; specifying .partition('name') ignores the cap.
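The round-trip arithmetic is easy to check with a throwaway sketch — roundTripsPerCycle is illustrative, not a client API:

```javascript
// Illustrative: network round-trips needed to visit P active partitions when
// each wildcard pop may claim up to N of them (ceil, because a partial final
// group of partitions still costs one call).
function roundTripsPerCycle(activePartitions, partitionsPerPop) {
  return Math.ceil(activePartitions / partitionsPerPop)
}

roundTripsPerCycle(200, 1)   // 200 calls with the default .partitions(1)
roundTripsPerCycle(200, 50)  // 4 calls with .partitions(50)
```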

Consumer groups

Same group → workers share the load. Different groups → fan-out (each group sees every message). Add .subscriptionMode('new') to skip historical messages.

// Group A: emailer (load-balanced across two workers)
queen.queue('events').group('emailer').consume(emailHandler1)
queen.queue('events').group('emailer').consume(emailHandler2)

// Group B: analytics, sees the same messages independently
queen.queue('events').group('analytics').consume(analyticsHandler)

// Skip backlog, only process messages pushed after this group joins
queen.queue('events').group('realtime')
  .subscriptionMode('new')
  .consume(rtHandler)

// Replay from a timestamp
queen.queue('events').group('replay-2024')
  .subscriptionFrom('2024-01-01T00:00:00Z')
  .consume(replayHandler)
await queen.queue('events').group('emailer').consume(email_handler)
await queen.queue('events').group('analytics').consume(analytics_handler)

await (queen.queue('events').group('realtime')
       .subscription_mode('new')
       .consume(rt_handler))

await (queen.queue('events').group('replay-2024')
       .subscription_from('2024-01-01T00:00:00Z')
       .consume(replay_handler))
client.Queue("events").Group("emailer").
    Consume(ctx, emailHandler).Execute(ctx)

client.Queue("events").Group("realtime").
    SubscriptionMode(queen.SubscriptionModeNew).
    Consume(ctx, rtHandler).Execute(ctx)
# Pop with consumer group + subscription mode
curl 'http://localhost:6632/api/v1/pop/queue/events?consumerGroup=emailer&batch=10'
curl 'http://localhost:6632/api/v1/pop/queue/events?consumerGroup=realtime&subscriptionMode=new'

# Ack within a group (required for consumer-group semantics)
curl -X POST http://localhost:6632/api/v1/ack \
  -H 'Content-Type: application/json' \
  -d '{"transactionId":"<tx>","partitionId":"<pid>","status":"completed","consumerGroup":"emailer"}'
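The delivery rule can be sketched in memory — deliver, workers, and cursor below are made up for the sketch and say nothing about the server's implementation. Within a group each message reaches exactly one worker; across groups, every group gets its own copy:

```javascript
// Illustrative sketch of consumer-group semantics: one delivery per group,
// round-robin across that group's workers.
function deliver(groups) {
  return Object.entries(groups).map(([name, g]) => {
    const worker = g.workers[g.cursor % g.workers.length]
    g.cursor += 1
    return { group: name, worker }
  })
}

const groups = {
  emailer: { workers: ['email-1', 'email-2'], cursor: 0 },  // load-balanced pair
  analytics: { workers: ['analytics-1'], cursor: 0 },       // independent copy
}

deliver(groups)  // message 1: email-1 and analytics-1 each get a copy
deliver(groups)  // message 2: email-2 (round-robin) and analytics-1 again
```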

Transactions (atomic ack + push)

await queen
  .transaction()
  .ack(message, 'completed', { consumerGroup: 'analytics' })
  .queue('next-stage').partition('p2').push([
    { data: derived, transactionId: message.transactionId + '-next' },
  ])
  .commit()
await (queen.transaction()
       .ack(message, 'completed', consumer_group='analytics')
       .queue('next-stage').partition('p2').push([
           {'data': derived, 'transactionId': message['transactionId'] + '-next'},
       ])
       .commit())
_, err := client.Transaction().
    Ack(message, "completed", queen.AckOptions{ConsumerGroup: "analytics"}).
    Queue("next-stage").Partition("p2").
    Push([]interface{}{
        map[string]interface{}{"derived": derived},
    }).
    Commit(ctx)
client.transaction()
    .ack(message)
    .queue("next-stage").partition("p2")
    .push({{{"data", derived}}})
    .commit();
curl -X POST http://localhost:6632/api/v1/transaction \
  -H 'Content-Type: application/json' \
  -d '{
    "operations":[
      {"type":"ack","transactionId":"<tx>","partitionId":"<pid>",
       "status":"completed","consumerGroup":"analytics"},
      {"type":"push","items":[
        {"queue":"next-stage","partition":"p2","payload":{"derived":true}}
      ]}
    ]
  }'

Long polling

// Server holds the connection until a message arrives or 30s elapses.
const messages = await queen.queue('jobs')
  .batch(10)
  .wait(true)
  .pop()
messages = await queen.queue('jobs').batch(10).wait(True).pop()
messages, _ := client.Queue("jobs").
    Batch(10).Wait(true).TimeoutMillis(30000).Pop(ctx)
curl 'http://localhost:6632/api/v1/pop/queue/jobs?batch=10&wait=true&timeout=30000'

Lease renewal

For tasks that may run longer than leaseTime, the client renews the lease in the background so the message stays owned by you.

await queen.queue('long-jobs')
  .renewLease(true, 2000)   // renew every 2 seconds
  .autoAck(false)
  .each()
  .consume(async (m) => {
    await runLongJob(m.data)   // can take minutes
  })
await (queen.queue('long-jobs')
       .renew_lease(True, 2000)
       .auto_ack(False)
       .consume(handler))
client.Queue("long-jobs").
    RenewLease(true, 2000).
    AutoAck(false).
    Consume(ctx, handler).
    Execute(ctx)
// Manual lease renewal in your worker thread
client.renew(message);   // call periodically while job runs
curl -X POST http://localhost:6632/api/v1/lease/<leaseId>/extend \
  -H 'Content-Type: application/json' \
  -d '{"seconds":60}'

Client-side buffering (10×–100× push throughput)

When you're firing many small messages, ask the client to buffer them and flush in batches. Flushing triggers on message count or max wait time, whichever comes first.

await queen.queue('events')
  .buffer({ messageCount: 100, timeMillis: 100 })
  .push([{ data: event }])

// Flush at shutdown
await queen.flushAllBuffers()
await (queen.queue('events')
       .buffer({'messageCount': 100, 'timeMillis': 100})
       .push([{'data': event}]))

await queen.flush_all_buffers()
client.Queue("events").Buffer(queen.BufferConfig{
    MessageCount: 100,
    TimeMillis:   100,
}).Push(event).Execute(ctx)

client.FlushAllBuffers(ctx)
$queen->queue('events')
    ->buffer(['messageCount' => 100, 'timeMillis' => 100])
    ->push([['data' => $event]])
    ->execute();

$queen->flushAllBuffers();
BufferOptions opts;
opts.message_count = 100;
opts.time_millis = 100;

client.queue("events").buffer(opts).push({{{"data", event}}});
client.flush_all_buffers();
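The count-or-time flush rule can be sketched as follows — an illustrative model only; the real clients' internals may differ:

```javascript
// Illustrative sketch of the buffer flush rule: flush when messageCount is
// reached, or when timeMillis elapses since the first buffered item,
// whichever comes first.
class PushBuffer {
  constructor({ messageCount, timeMillis }, flush) {
    this.messageCount = messageCount
    this.timeMillis = timeMillis
    this.flush = flush
    this.items = []
    this.timer = null
  }
  add(item) {
    this.items.push(item)
    if (this.items.length >= this.messageCount) {
      this.drain()                                    // count threshold hit
    } else if (!this.timer) {
      this.timer = setTimeout(() => this.drain(), this.timeMillis)
    }
  }
  drain() {
    if (this.timer) { clearTimeout(this.timer); this.timer = null }
    if (this.items.length) this.flush(this.items.splice(0))
  }
}
```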

Dead-letter queue

const dlq = await queen.queue('orders').dlq().limit(50).get()
console.log('failed:', dlq.total)
for (const m of dlq.messages) {
  console.log(m.errorMessage, m.data)
}
dlq = await queen.queue('orders').dlq().limit(50).get()
print('failed:', dlq['total'])
dlq, _ := client.Queue("orders").DLQ("").Limit(50).Get(ctx)
fmt.Println("failed:", dlq.Total)
curl 'http://localhost:6632/api/v1/dlq?queue=orders&limit=50'

Per-client deep dives

Every client has a comprehensive README in its source folder. They are the canonical reference for type signatures, edge cases, and language-specific idioms.

JavaScript

client-js / README

Browser support, SSE streaming, admin API, full type definitions. npm install queen-mq.

Python

client-py / README

Async-first API, sync helpers, asyncio context-manager lifecycle. pip install queen-mq.

Go

client-go / README

Idiomatic context.Context, builder-style API, batch consumers. go get github.com/smartpricing/queen/client-go.

PHP / Laravel

client-laravel / README

Standalone PHP and Laravel facade, KafkaConsumer-style high-level consumer. composer require smartpricing/queen-mq.

C++

client-cpp / README

Header-only, thread-pool based consumer, manual buffer/lease/transaction control.

Examples

examples/, 30+ runnable scripts

Chat pipeline, transactional pipeline, consumer groups, window buffers, DLQ monitoring, load tests.