[ PROMPT_NODE_24218 ]
Queues API 参考
[ SKILL_DOCUMENTATION ]
# 队列 API 参考
## 生产者:发送消息
typescript
// 基础发送
await env.MY_QUEUE.send({ url: request.url, timestamp: Date.now() });
// 选项:delay(最长 43200 秒),contentType (json|text|bytes|v8)
await env.MY_QUEUE.send(message, { delaySeconds: 600 });
await env.MY_QUEUE.send(message, { delaySeconds: 0 }); // 覆盖队列默认值
// 批处理(最多 100 条消息或 256 KB)
await env.MY_QUEUE.sendBatch([
{ body: 'msg1' },
{ body: 'msg2' },
{ body: 'msg3', options: { delaySeconds: 300 } }
]);
// 使用 ctx.waitUntil 非阻塞发送 - 发送在响应后继续
ctx.waitUntil(env.MY_QUEUE.send({ data: 'async' }));
// 队列消费者中的后台任务
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise {
for (const msg of batch.messages) {
await processMessage(msg.body);
// 即发即弃的分析(不阻塞确认)
ctx.waitUntil(
env.ANALYTICS_QUEUE.send({ messageId: msg.id, processedAt: Date.now() })
);
msg.ack();
}
}
};
## 消费者:基于推送(Worker)
typescript
// 使用 ExportedHandler 的类型安全处理器
interface Env {
MY_QUEUE: Queue;
DB: D1Database;
}
export default {
async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise {
// batch.queue, batch.messages.length
for (const msg of batch.messages) {
// msg.id, msg.body, msg.timestamp, msg.attempts
try {
await processMessage(msg.body);
msg.ack();
} catch (error) {
msg.retry({ delaySeconds: 600 });
}
}
}
} satisfies ExportedHandler;
**关键警告:**
1. **未显式确认 (ack) 或重试的消息将无限期自动重试**,直到达到 `max_retries`。请务必为每条消息调用 `msg.ack()` 或 `msg.retry()`。
2. **抛出未捕获的错误会重试整个批次**,而不仅仅是失败的消息。请务必将单个消息处理逻辑包装在 try/catch 中,并显式调用 `msg.retry()`。
typescript
// ❌ 错误:未捕获的错误会导致整个批次重试
async queue(batch: MessageBatch): Promise {
for (const msg of batch.messages) {
await riskyOperation(msg.body); // 如果此处抛出异常,整个批次重试
msg.ack();
}
}
// ✅ 正确:按消息捕获,单独处理
async queue(batch: MessageBatch): Promise {
for (const msg of batch.messages) {
try {
await riskyOperation(msg.body);
msg.ack();