[ PROMPT_NODE_24222 ]
Queues 常见陷阱
[ SKILL_DOCUMENTATION ]
# 队列陷阱与故障排除
## 关键:生产环境常见错误
### 1. “单条错误导致整个批次重试”
**问题:** 在队列处理器中抛出未捕获的错误会重试整个批次,而不仅仅是失败的消息。
**原因:** 未捕获的异常传播到运行时,触发了批次级别的重试。
**解决方案:** 始终将单个消息处理逻辑包装在 try/catch 中,并显式调用 `msg.retry()`。
typescript
// ❌ 错误:抛出错误,重试整个批次
async queue(batch: MessageBatch): Promise {
for (const msg of batch.messages) {
await riskyOperation(msg.body); // 如果此处抛出异常,整个批次重试
msg.ack();
}
}
// ✅ 正确:按消息捕获,单独处理
async queue(batch: MessageBatch): Promise {
for (const msg of batch.messages) {
try {
await riskyOperation(msg.body);
msg.ack();
} catch (error) {
msg.retry({ delaySeconds: 60 });
}
}
}
### 2. “消息无限期重试”
**问题:** 未显式确认 (ack) 或重试的消息将无限期自动重试。
**原因:** 运行时默认行为会重试未处理的消息,直到达到 `max_retries`。
**解决方案:** 始终为每条消息调用 `msg.ack()` 或 `msg.retry()`。切勿留下未处理的消息。
typescript
// ❌ 错误:跳过的消息会无限期自动重试
async queue(batch: MessageBatch): Promise {
for (const msg of batch.messages) {
if (shouldProcess(msg.body)) {
await process(msg.body);
msg.ack();
}
// 缺失:跳过消息的 msg.ack() - 它们会重试!
}
}
// ✅ 正确:显式处理所有消息
async queue(batch: MessageBatch): Promise {
for (const msg of batch.messages) {
if (shouldProcess(msg.body)) {
await process(msg.body);
msg.ack();
} else {
msg.ack(); // 即使不处理也要显式确认
}
}
}
## 常见错误
### “重复消息处理”
**问题:** 同一条消息被处理多次。
**原因:** 至少一次送达保证意味着在重试期间可能会出现重复。
**解决方案:** 通过在 KV 中跟踪已处理消息的 ID(带过期 TTL)来设计幂等消费者。
typescript
async queue(batch: MessageBatch, env: Env): Promise {
for (const msg of batch.messages) {
const processed = await env.PROCESSED_KV.get(msg.id);
if (processed) {
msg.ack();
continue;
}
await processMessage(msg.body);
await env.PROCESSED_KV.put(msg.id, '1'