Five clients. One mental model.
Every Queen client speaks the same fluent grammar:
queue(name).partition(p).group(g).…push() / .pop() / .consume().
Switch languages, the verbs stay the same. Every snippet on this page was verified
end-to-end against a live Queen server at http://localhost:6632.
| Client | Min runtime | Install | Source |
|---|---|---|---|
| JavaScript · Node and browser | Node.js 22+ | npm install queen-mq | client-js/ |
| Python | Python 3.8+ | pip install queen-mq | client-py/ |
| Go | Go 1.24+ | go get github.com/smartpricing/queen/client-go | client-go/ |
| PHP / Laravel | PHP 8.3+ | composer require smartpricing/queen-mq | client-laravel/ |
| C++ · header-only | C++17 | copy queen_client.hpp | client-cpp/ |
Install
npm install queen-mq
pip install queen-mq
go get github.com/smartpricing/queen/client-go
composer require smartpricing/queen-mq
# Laravel only, publish config and set env:
php artisan vendor:publish --tag=queen-config
# Header-only client.
curl -O https://raw.githubusercontent.com/smartpricing/queen/master/clients/client-cpp/queen_client.hpp
# Plus cpp-httplib + nlohmann/json (header-only deps).
brew install cpp-httplib # macOS
# or just download httplib.h alongside queen_client.hpp

Connect
import { Queen } from 'queen-mq'
// 1) Single server
const queen = new Queen('http://localhost:6632')
// 2) Multiple servers, automatic failover + load balancing
const queenHA = new Queen([
'http://queen-a:6632',
'http://queen-b:6632',
])
// 3) Full config (JWT, custom headers, retry, affinity)
const queenFull = new Queen({
urls: ['http://queen-a:6632', 'http://queen-b:6632'],
loadBalancingStrategy: 'affinity', // 'round-robin' | 'session' | 'affinity'
enableFailover: true,
retryAttempts: 3,
bearerToken: process.env.QUEEN_TOKEN,
})

from queen import Queen
# Use as async context manager, closes the HTTP pool cleanly.
async with Queen('http://localhost:6632') as queen:
...
# Multiple servers
async with Queen(['http://queen-a:6632', 'http://queen-b:6632']) as queen:
...
# Full config
queen = Queen({
'urls': ['http://queen-a:6632', 'http://queen-b:6632'],
'load_balancing_strategy': 'affinity',
'enable_failover': True,
'retry_attempts': 3,
'bearer_token': os.environ.get('QUEEN_TOKEN'),
})

import queen "github.com/smartpricing/queen/client-go"
// Single
client, _ := queen.New("http://localhost:6632")
defer client.Close(ctx)
// HA + auth
client, _ = queen.New(queen.ClientConfig{
URLs: []string{"http://queen-a:6632", "http://queen-b:6632"},
LoadBalancingStrategy: "affinity",
EnableFailover: true,
RetryAttempts: 3,
BearerToken: os.Getenv("QUEEN_TOKEN"),
})

use Queen\Queen;
// Standalone
$queen = new Queen('http://localhost:6632');
// HA + auth
$queen = new Queen([
'urls' => ['http://queen-a:6632', 'http://queen-b:6632'],
'loadBalancingStrategy' => 'affinity',
'bearerToken' => env('QUEEN_TOKEN'),
]);
// Laravel, auto-resolves the singleton from config/queen.php
use Queen\Laravel\QueenFacade as Queen;
Queen::queue('orders')->push([['data' => ['ok' => true]]])->execute();

#include "queen_client.hpp"
using namespace queen;
QueenClient client("http://localhost:6632");
// HA
ClientConfig cfg;
cfg.load_balancing_strategy = "round-robin";
cfg.enable_failover = true;
cfg.retry_attempts = 3;
QueenClient cluster({"http://queen-a:6632", "http://queen-b:6632"}, cfg);

Configure a queue
Queues are created on first push by default. Configure them explicitly when you need non-default lease, retry, retention, or encryption.
await queen.queue('orders')
.config({
leaseTime: 300,
retryLimit: 3,
retentionSeconds: 86400,
encryptionEnabled: false,
})
.create()

await queen.queue('orders').config({
'leaseTime': 300,
'retryLimit': 3,
'retentionSeconds': 86400,
'encryptionEnabled': False,
}).create()

_, err := client.Queue("orders").Config(queen.QueueConfig{
LeaseTime: 300,
RetryLimit: 3,
RetentionSeconds: 86400,
}).Create().Execute(ctx)

$queen->queue('orders')
->config(['leaseTime' => 300, 'retryLimit' => 3, 'retentionSeconds' => 86400])
->create()
->execute();

QueueConfig cfg;
cfg.lease_time = 300;
cfg.retry_limit = 3;
cfg.retention_seconds = 86400;
cfg.encryption_enabled = false;
client.queue("orders").config(cfg).create();

curl -X POST http://localhost:6632/api/v1/configure \
-H 'Content-Type: application/json' \
-d '{
"queue":"orders",
"options":{"leaseTime":300,"retryLimit":3,"retentionSeconds":86400}
}'

Push
// Plain push
await queen.queue('orders').push([
{ data: { orderId: 1, amount: 99.99 } },
])
// Push to a partition for ordering
await queen.queue('orders').partition('user-123').push([
{ data: { orderId: 2, amount: 50 } },
])
// With a transactionId for exactly-once dedup
await queen.queue('orders').push([
{ transactionId: 'order-2-v1', data: { orderId: 2, amount: 50 } },
])
// Many at once
await queen.queue('orders').push([
{ data: { orderId: 3 } },
{ data: { orderId: 4 } },
{ data: { orderId: 5 } },
])

await queen.queue('orders').push([
{'data': {'orderId': 1, 'amount': 99.99}},
])
await queen.queue('orders').partition('user-123').push([
{'data': {'orderId': 2, 'amount': 50}},
])
await queen.queue('orders').push([
{'transactionId': 'order-2-v1', 'data': {'orderId': 2}},
])

_, err := client.Queue("orders").Push([]interface{}{
map[string]interface{}{"orderId": 1, "amount": 99.99},
}).Execute(ctx)
_, err = client.Queue("orders").Partition("user-123").Push([]interface{}{
map[string]interface{}{"orderId": 2, "amount": 50},
}).Execute(ctx)
_, err = client.Queue("orders").
Push([]interface{}{map[string]interface{}{"orderId": 2}}).
TransactionID("order-2-v1").
Execute(ctx)

$queen->queue('orders')->push([
['data' => ['orderId' => 1, 'amount' => 99.99]],
])->execute();
$queen->queue('orders')->partition('user-123')->push([
['data' => ['orderId' => 2, 'amount' => 50]],
])->execute();
$queen->queue('orders')->push([
['transactionId' => 'order-2-v1', 'data' => ['orderId' => 2]],
])->execute();

client.queue("orders").push({
{{"data", {{"orderId", 1}, {"amount", 99.99}}}},
});
client.queue("orders").partition("user-123").push({
{{"data", {{"orderId", 2}, {"amount", 50}}}},
});

curl -X POST http://localhost:6632/api/v1/push \
-H 'Content-Type: application/json' \
-d '{
"items":[
{"queue":"orders","partition":"user-123",
"payload":{"orderId":2,"amount":50},
"transactionId":"order-2-v1"}
]
}'

Pop & ack
Lower-level than consume(). Pops a batch, hands it to you, and you ack
each message yourself. Use this when you need full control over ack timing or want to
bridge into your own framework.
const messages = await queen.queue('orders').batch(10).pop()
for (const m of messages) {
try {
await processOrder(m.data)
await queen.ack(m, true) // success
} catch (e) {
await queen.ack(m, false, { error: e.message }) // retry
}
}

messages = await queen.queue('orders').batch(10).pop()
for m in messages:
try:
await process_order(m['data'])
await queen.ack(m, True)
except Exception as e:
await queen.ack(m, False, error=str(e))

messages, _ := client.Queue("orders").Batch(10).Pop(ctx)
for _, m := range messages {
if err := processOrder(m.Data); err != nil {
client.Ack(ctx, m, false, queen.AckOptions{Error: err.Error()})
} else {
client.Ack(ctx, m, true, queen.AckOptions{})
}
}

$messages = $queen->queue('orders')->batch(10)->pop();
foreach ($messages as $m) {
try {
processOrder($m['data']);
$queen->ack($m, true);
} catch (\Throwable $e) {
$queen->ack($m, false, ['error' => $e->getMessage()]);
}
}

auto messages = client.queue("orders").batch(10).wait(false).pop();
for (const auto& m : messages) {
try {
processOrder(m["data"]);
client.ack(m, true);
} catch (const std::exception& e) {
client.ack(m, false, {{"error", e.what()}});
}
}

curl 'http://localhost:6632/api/v1/pop/queue/orders?batch=10'
# returns {"messages":[{"transactionId":"...","partitionId":"...",...}, ...]}
curl -X POST http://localhost:6632/api/v1/ack \
-H 'Content-Type: application/json' \
-d '{
"transactionId":"<tx>",
"partitionId":"<pid>",
"status":"completed"
}'

Consume (auto-managed worker)
The high-level path. consume() takes care of the loop, batching, lease
renewal, error handling, and ack semantics for you.
await queen.queue('orders')
.concurrency(10) // 10 parallel workers
.batch(20) // up to 20 messages per poll
.each() // handler receives one message at a time
.consume(async (m) => {
await processOrder(m.data)
// auto-ack on resolve, auto-nack on throw
})

async def handler(m):
await process_order(m['data'])
await (queen.queue('orders')
.concurrency(10)
.batch(20)
.consume(handler)) # batch=1 → handler receives single msg
# batch>1 → handler receives a list

err := client.Queue("orders").
Concurrency(10).
Batch(20).
Consume(ctx, func(c context.Context, m *queen.Message) error {
return processOrder(m.Data)
}).Execute(ctx)

$consumer = $queen->queue('orders')
->group('processors')
->batch(1)
->autoAck(false)
->getConsumer();
$consumer->subscribe();
while (!$consumer->isClosed()) {
$m = $consumer->consume(1000); // ms timeout
if ($m === null) continue;
try {
processOrder($m['data']);
$consumer->ack($m);
} catch (\Throwable $e) {
$consumer->nack($m, $e->getMessage());
}
}

client.queue("orders")
.concurrency(10)
.batch(20)
.consume([](const json& msgs) {
for (const auto& m : msgs) {
processOrder(m["data"]);
}
});

Multi-partition pop
Drain up to N partitions in a single network round-trip with
.partitions(N). Designed for queues with many partitions where each
partition only has a handful of new messages per polling interval — there, claiming one
partition per call wastes bandwidth. With .partitions(N),
.batch(B) becomes a global cap on total messages
returned across all claimed partitions, and every claimed partition shares a single
lease (one renew() call extends them all).
Default is .partitions(1), single-partition behaviour, byte-equivalent
to a normal wildcard pop. Each returned message carries its own
partitionId, partition, leaseId, and
consumerGroup, so ACK and renew always work message-by-message
regardless of how many partitions the batch spans.
// One round-trip drains up to 200 messages spread across up to 50 partitions.
// batch(200) is the GLOBAL cap; partitions(50) is the partition fan-out cap.
const messages = await queen.queue('events')
.batch(200)
.partitions(50)
.wait(true)
.pop()
for (const m of messages) {
console.log(`from ${m.partition}:`, m.data) // m.partitionId / m.leaseId baked in
}
await queen.renew(messages) // one renew extends every claimed partition's lease
// Same builder works for long-running consumers:
await queen.queue('events')
.batch(100)
.partitions(8)
.consume(async (msgs) => {
// msgs may span multiple partitions; FIFO is preserved within each partition
for (const m of msgs) await process(m.data)
})

# One round-trip drains up to 200 messages from up to 50 partitions.
messages = await (queen.queue('events')
.batch(200)
.partitions(50)
.wait(True)
.pop())
for m in messages:
print(f"from {m['partition']}:", m['data']) # partitionId / leaseId baked in
await queen.renew(messages) # one renew extends every claimed partition's lease
# Same builder works for long-running consumers:
async def handler(msgs):
for m in msgs:
await process(m['data'])
await (queen.queue('events')
.batch(100)
.partitions(8)
.consume(handler))

// One round-trip drains up to 200 messages from up to 50 partitions.
messages, _ := client.Queue("events").
Batch(200).
Partitions(50).
Wait(true).
Pop(ctx)
for _, m := range messages {
fmt.Printf("from %s: %v\n", m.Partition, m.Data)
}
client.Renew(ctx, messages) // one renew extends every claimed partition's lease
// Same builder works for long-running consumers:
client.Queue("events").
Batch(100).
Partitions(8).
Consume(ctx, func(c context.Context, m *queen.Message) error {
return process(m.Data)
}).Execute(ctx)

// One round-trip drains up to 200 messages from up to 50 partitions.
$messages = $queen->queue('events')
->batch(200)
->partitions(50)
->wait(true)
->pop();
foreach ($messages as $m) {
echo "from {$m['partition']}: " . json_encode($m['data']) . "\n";
}
$queen->renew($messages); // one renew extends every claimed partition's lease
// Same builder works for the high-level consumer:
$consumer = $queen->queue('events')
->batch(100)
->partitions(8)
->getConsumer();

// One round-trip drains up to 200 messages from up to 50 partitions.
auto messages = client.queue("events")
.batch(200)
.partitions(50)
.wait(true)
.pop();
for (const auto& m : messages) {
std::cout << "from " << m["partition"] << ": " << m["data"].dump() << "\n";
}
client.renew(messages); // one renew extends every claimed partition's lease
// Same builder works for long-running consumers:
client.queue("events")
.batch(100)
.partitions(8)
.consume([](const json& msgs) {
for (const auto& m : msgs) process(m["data"]);
});

# Wildcard pop with multi-partition fan-out: drain up to 200 messages from up
# to 50 partitions in a single call. partitions=1 (default) preserves legacy
# single-partition behaviour.
curl 'http://localhost:6632/api/v1/pop/queue/events?batch=200&partitions=50&wait=true'
# Response shape (note partitionsClaimed and per-message partitionId):
# {
# "success": true,
# "queue": "events",
# "partition": "p7", # first claimed partition (back-compat)
# "partitionId": "...", # first claimed partitionId (back-compat)
# "leaseId": "worker-uuid", # shared across all claimed partitions
# "messages": [
# {"transactionId":"...","partitionId":"...","partition":"p7","leaseId":"...", ...},
# {"transactionId":"...","partitionId":"...","partition":"p12","leaseId":"...", ...},
# ...
# ],
# "partitionsClaimed": 8 # how many partitions actually contributed
# }
When to use it. Many partitions, each with few messages per polling
interval (per-customer event streams, per-tenant work queues, per-device telemetry).
For a queue with P active partitions and k messages per partition
per cycle, .partitions(N) reduces round-trips from P to
P / N while preserving per-partition FIFO ordering.
When not to use it. Few partitions, or each one busy enough to fill
.batch(B) on its own — default .partitions(1) is already
optimal there. .partitions(N) only affects wildcard
pops; specifying .partition('name') ignores the cap.
Consumer groups
Same group → workers share the load. Different groups → fan-out (each group sees every
message). Add .subscriptionMode('new') to skip historical messages.
// Group A: emailer (load-balanced across two workers)
queen.queue('events').group('emailer').consume(emailHandler1)
queen.queue('events').group('emailer').consume(emailHandler2)
// Group B: analytics, sees the same messages independently
queen.queue('events').group('analytics').consume(analyticsHandler)
// Skip backlog, only process messages pushed after this group joins
queen.queue('events').group('realtime')
.subscriptionMode('new')
.consume(rtHandler)
// Replay from a timestamp
queen.queue('events').group('replay-2024')
.subscriptionFrom('2024-01-01T00:00:00Z')
.consume(replayHandler)

await queen.queue('events').group('emailer').consume(email_handler)
await queen.queue('events').group('analytics').consume(analytics_handler)
await (queen.queue('events').group('realtime')
.subscription_mode('new')
.consume(rt_handler))
await (queen.queue('events').group('replay-2024')
.subscription_from('2024-01-01T00:00:00Z')
.consume(replay_handler))

client.Queue("events").Group("emailer").
Consume(ctx, emailHandler).Execute(ctx)
client.Queue("events").Group("realtime").
SubscriptionMode(queen.SubscriptionModeNew).
Consume(ctx, rtHandler).Execute(ctx)

# Pop with consumer group + subscription mode
curl 'http://localhost:6632/api/v1/pop/queue/events?consumerGroup=emailer&batch=10'
curl 'http://localhost:6632/api/v1/pop/queue/events?consumerGroup=realtime&subscriptionMode=new'
# Ack within a group (REQUIRED for cg semantics)
curl -X POST http://localhost:6632/api/v1/ack \
-H 'Content-Type: application/json' \
-d '{"transactionId":"<tx>","partitionId":"<pid>","status":"completed","consumerGroup":"emailer"}'

Transactions, atomic ack + push
await queen
.transaction()
.ack(message, 'completed', { consumerGroup: 'analytics' })
.queue('next-stage').partition('p2').push([
{ data: derived, transactionId: message.transactionId + '-next' },
])
.commit()

await (queen.transaction()
.ack(message, 'completed', consumer_group='analytics')
.queue('next-stage').partition('p2').push([
{'data': derived, 'transactionId': message['transactionId'] + '-next'},
])
.commit())

_, err := client.Transaction().
Ack(message, "completed", queen.AckOptions{ConsumerGroup: "analytics"}).
Queue("next-stage").Partition("p2").
Push([]interface{}{
map[string]interface{}{"derived": derived},
}).
Commit(ctx)

client.transaction()
.ack(message)
.queue("next-stage").partition("p2")
.push({{{"data", derived}}})
.commit();

curl -X POST http://localhost:6632/api/v1/transaction \
-H 'Content-Type: application/json' \
-d '{
"operations":[
{"type":"ack","transactionId":"<tx>","partitionId":"<pid>",
"status":"completed","consumerGroup":"analytics"},
{"type":"push","items":[
{"queue":"next-stage","partition":"p2","payload":{"derived":true}}
]}
]
}'

Long polling
// Server holds the connection until a message arrives or 30s elapses.
const messages = await queen.queue('jobs')
.batch(10)
.wait(true)
.pop()

messages = await queen.queue('jobs').batch(10).wait(True).pop()

messages, _ := client.Queue("jobs").
Batch(10).Wait(true).TimeoutMillis(30000).Pop(ctx)

curl 'http://localhost:6632/api/v1/pop/queue/jobs?batch=10&wait=true&timeout=30000'

Lease renewal
For tasks that may run longer than leaseTime, the client renews the
lease in the background so the message stays owned by you.
await queen.queue('long-jobs')
.renewLease(true, 2000) // renew every 2 seconds
.autoAck(false)
.each()
.consume(async (m) => {
await runLongJob(m.data) // can take minutes
})

await (queen.queue('long-jobs')
.renew_lease(True, 2000)
.auto_ack(False)
.consume(handler))

client.Queue("long-jobs").
RenewLease(true, 2000).
AutoAck(false).
Consume(ctx, handler).
Execute(ctx)

// Manual lease renewal in your worker thread
client.renew(message); // call periodically while job runs

curl -X POST http://localhost:6632/api/v1/lease/<leaseId>/extend \
-H 'Content-Type: application/json' \
-d '{"seconds":60}'

Client-side buffering (10×–100× push throughput)
When you're firing many small messages, ask the client to buffer them and flush in batches. Tunable by message count or max wait time, whichever comes first.
await queen.queue('events')
.buffer({ messageCount: 100, timeMillis: 100 })
.push([{ data: event }])
// Flush at shutdown
await queen.flushAllBuffers()

await (queen.queue('events')
.buffer({'messageCount': 100, 'timeMillis': 100})
.push([{'data': event}]))
await queen.flush_all_buffers()

client.Queue("events").Buffer(queen.BufferConfig{
MessageCount: 100,
TimeMillis: 100,
}).Push(event).Execute(ctx)
client.FlushAllBuffers(ctx)

$queen->queue('events')
->buffer(['messageCount' => 100, 'timeMillis' => 100])
->push([['data' => $event]])
->execute();
$queen->flushAllBuffers();

BufferOptions opts;
opts.message_count = 100;
opts.time_millis = 100;
client.queue("events").buffer(opts).push({{{"data", event}}});
client.flush_all_buffers();

Dead-letter queue
const dlq = await queen.queue('orders').dlq().limit(50).get()
console.log('failed:', dlq.total)
for (const m of dlq.messages) {
console.log(m.errorMessage, m.data)
}

dlq = await queen.queue('orders').dlq().limit(50).get()
print('failed:', dlq['total'])

dlq, _ := client.Queue("orders").DLQ("").Limit(50).Get(ctx)
fmt.Println("failed:", dlq.Total)

curl 'http://localhost:6632/api/v1/dlq?queue=orders&limit=50'

Per-client deep dives
Every client has a comprehensive README in its source folder. They are the canonical reference for type signatures, edge cases, and language-specific idioms.
client-js / README
Browser support, SSE streaming, admin API, full type definitions. npm install queen-mq.
client-py / README
Async-first API, sync helpers, asyncio context-manager lifecycle. pip install queen-mq.
client-go / README
Idiomatic context.Context, builder-style API, batch consumers. go get github.com/smartpricing/queen/client-go.
client-laravel / README
Standalone PHP and Laravel facade, KafkaConsumer-style high-level consumer. composer require smartpricing/queen-mq.
client-cpp / README
Header-only, thread-pool based consumer, manual buffer/lease/transaction control.
examples/, 30+ runnable scripts
Chat pipeline, transactional pipeline, consumer groups, window buffers, DLQ monitoring, load tests.
