[ PROMPT_NODE_24196 ]
Pipelines 说明文档
[ SKILL_DOCUMENTATION ]
# Cloudflare Pipelines
用于将数据摄取、转换并加载到 R2(支持 SQL 转换)的 ETL 流式处理平台。
## 概览
Pipelines 提供:
- **Streams(流)**: 持久化事件缓冲区(HTTP/Workers 摄取)
- **Pipelines(管道)**: 基于 SQL 的转换
- **Sinks(接收器)**: R2 目标(Iceberg 表或 Parquet/JSON 文件)
**状态**: 公开测试版 (Workers 付费计划)
**定价**: 除标准 R2 存储/操作费用外无额外费用
## 架构
数据源 → 流 → 管道 (SQL) → 接收器 → R2
↑ ↓ ↓
HTTP/Workers 转换 Iceberg/Parquet
| 组件 | 用途 | 关键特性 |
|-----------|---------|-------------|
| Streams | 事件摄取 | 结构化(已验证)或非结构化 |
| Pipelines | SQL 转换 | 创建后不可变 |
| Sinks | 写入 R2 | 精确一次交付 |
## 快速入门
bash
# 交互式设置(推荐)
npx wrangler pipelines setup
**最小化 Worker 示例:**
typescript
interface Env {
STREAM: Pipeline;
}
export default {
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise {
const event = { user_id: "123", event_type: "purchase", amount: 29.99 };
// 即发即弃模式
ctx.waitUntil(env.STREAM.send([event]));
return new Response('OK');
}
} satisfies ExportedHandler;
## 如何选择接收器类型?
需要对数据进行 SQL 查询?
→ R2 数据目录 (Iceberg)
✅ ACID 事务、时间旅行、模式演进
❌ 设置较复杂(命名空间、表、目录令牌)
仅需文件存储/归档?
→ R2 存储 (Parquet)
✅ 简单、直接的文件访问
❌ 无内置 SQL 查询
使用外部工具 (Spark/Athena)?
→ R2 存储 (带分区的 Parquet)
✅ 标准格式,分区修剪以提升性能
❌ 必须自行管理模式兼容性
## 常见用例
- **分析管道**: 点击流、遥测、服务器日志
- **数据仓库**: ETL 到可查询的 Iceberg 表
- **事件处理**: 带有数据增强的移动端/IoT 事件
## 阅读顺序
**刚接触 Pipelines?** 从这里开始:
1. [configuration.md](./configuration.md) - 设置流、接收器、管道
2. [api.md](./api.md) - 发送事件、TypeScript 类型、SQL 函数
3. [patterns.md](./patterns.md) - 最佳实践、集成、完整示例
4. [gotchas.md](./gotchas.md) - 关键警告、故障排除
**