[ PROMPT_NODE_24200 ]
Pipelines 配置说明
[ SKILL_DOCUMENTATION ]
# Pipelines 配置
## Worker 绑定
c
// wrangler.jsonc
{
"pipelines": [
{ "pipeline": "", "binding": "STREAM" }
]
}
获取流 ID: `npx wrangler pipelines streams list`
## Schema(结构化流)
{
"fields": [
{ "name": "user_id", "type": "string", "required": true },
{ "name": "event_type", "type": "string", "required": true },
{ "name": "amount", "type": "float64", "required": false },
{ "name": "timestamp", "type": "timestamp", "required": true }
]
}
**类型:** `string`, `int32`, `int64`, `float32`, `float64`, `bool`, `timestamp`, `json`, `binary`, `list`, `struct`
## 流设置
bash
# 带 Schema
npx wrangler pipelines streams create my-stream --schema-file schema.json
# 非结构化(无验证)
npx wrangler pipelines streams create my-stream
# 列出/获取/删除
npx wrangler pipelines streams list
npx wrangler pipelines streams get
npx wrangler pipelines streams delete
## 接收器配置
**R2 数据目录 (Iceberg):**
bash
npx wrangler pipelines sinks create my-sink
--type r2-data-catalog
--bucket my-bucket --namespace default --table events
--catalog-token $TOKEN
--compression zstd --roll-interval 60
**R2 Raw (Parquet):**
bash
npx wrangler pipelines sinks create my-sink
--type r2 --bucket my-bucket --format parquet
--path analytics/events
--partitioning "year=%Y/month=%m/day=%d"
--access-key-id $KEY --secret-access-key $SECRET
| 选项 | 值 | 指导 |
|--------|--------|----------|
| `--compression` | `zstd`, `snappy`, `gzip` | `zstd` 压缩比最好,`snappy` 最快 |
| `--roll-interval` | 秒 | 低延迟: 10-60, 查询性能: 300 |
| `--roll-size` | MB | 更大 = 更好的压缩 |
## 管道创建
bash
npx wrangler pipelines create my-pipeline
--sql "INSERT INTO my_sink SELECT * FROM my_stream WHERE event_type = 'purchase'"
**⚠️ 管道是不可变的** - 无法修改 SQL。必须删除并重新创建。
## 凭据
| 类型 | 权限 | 获取方式 |
|------|------------|----------|
| Catalog token | R2 Admin Read & Write | 仪表盘 → R2 → API tokens |
| R2 凭据 | Object Read & Write | `wrangler r2 bucket create` 输出 |
| HTTP 摄取令牌 | Workers Pipeline Send | 仪表盘 → Workers → API tokens |
## 完整示例
bash
npx wrangler r2 bucket create my-bucket
npx wrangler r2 bucket catalog enable my-bucket
npx wrangler pipelines streams create my-stream