[ PROMPT_NODE_24204 ]
Cloudflare Pipelines 设计模式
[ SKILL_DOCUMENTATION ]
# Pipelines 设计模式
## 即发即弃 (Fire-and-Forget)
```typescript
export default {
async fetch(request, env, ctx) {
const event = { user_id: '...', event_type: 'page_view', timestamp: new Date().toISOString() };
ctx.waitUntil(env.STREAM.send([event])); // 不阻塞响应
return new Response('OK');
}
};
```
## 使用 Zod 进行架构验证
```typescript
import { z } from 'zod';
const EventSchema = z.object({
user_id: z.string(),
event_type: z.enum(['purchase', 'view']),
amount: z.number().positive().optional()
});
const validated = EventSchema.parse(rawEvent); // 验证失败时抛出异常
await env.STREAM.send([validated]);
```
**原因:** 结构化流会静默丢弃无效事件。客户端验证可提供即时反馈。
## SQL 转换模式
```sql
-- 尽早过滤(减少存储占用)
INSERT INTO my_sink
SELECT user_id, event_type, amount
FROM my_stream
WHERE event_type = 'purchase' AND amount > 10
-- 仅选择所需字段
INSERT INTO my_sink
SELECT user_id, event_type, timestamp FROM my_stream
-- 使用 CASE 进行数据增强
INSERT INTO my_sink
SELECT user_id, amount,
CASE WHEN amount > 1000 THEN 'vip' ELSE 'standard' END as tier
FROM my_stream
```
## Pipelines + Queues 扇出模式
```typescript
await Promise.all([
env.ANALYTICS_STREAM.send([event]), // 长期存储
env.PROCESS_QUEUE.send(event) // 立即处理
]);
```
| 需求 | 使用方式 |
|------|-----|
| 长期存储、SQL 查询 | Pipelines |
| 立即处理、重试机制 | Queues |
| 两者兼顾 | 扇出模式 (Fan-out) |
## 性能调优
| 目标 | 配置 |
|------|--------|
| 低延迟 | `--roll-interval 10` |
| 查询性能 | `--roll-interval 300 --roll-size 100` |
| 成本优化 | `--compression zstd --roll-interval 300` |
## 架构演进
Pipelines 是不可变的。请使用版本管理:
```bash
# 创建 v2 版本的 stream/sink/pipeline
npx wrangler pipelines streams create events-v2 --schema-file v2.json
# 过渡期间进行双写
await Promise.all([env.EVENTS_V1.send([event]), env.EVENTS_V2.send([event])]);
# 使用 UNION ALL 跨版本进行查询
```