[ PROMPT_NODE_24234 ]
R2 Data Catalog 设计模式
[ SKILL_DOCUMENTATION ]
# 常见模式
适用于 R2 数据目录与 PyIceberg 的实用模式。
## PyIceberg 连接
python
import os
from pyiceberg.catalog.rest import RestCatalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError

# Connect to the R2 Data Catalog REST endpoint.
# os.environ[...] (instead of os.getenv) fails fast with a KeyError that names
# the missing variable, rather than silently passing None into RestCatalog.
catalog = RestCatalog(
    name="r2_catalog",
    warehouse=os.environ["R2_WAREHOUSE"],  # bucket name
    uri=os.environ["R2_CATALOG_URI"],  # catalog endpoint
    token=os.environ["R2_TOKEN"],  # API token
)

# Create the namespace idempotently: an already-existing namespace is not an error.
try:
    catalog.create_namespace("default")
except NamespaceAlreadyExistsError:
    pass
## 模式 1:日志分析工作流
以增量方式追加日志，并按时间范围和日志级别进行查询。
python
import pyarrow as pa
from datetime import datetime
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, TimestampType, StringType, IntegerType
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError

# Create the day-partitioned table once; re-running this snippet reuses it.
schema = Schema(
    NestedField(1, "timestamp", TimestampType(), required=True),
    NestedField(2, "level", StringType(), required=True),
    NestedField(3, "service", StringType(), required=True),
    NestedField(4, "message", StringType(), required=False),
)
# Partition by calendar day of the "timestamp" column (source_id=1 -> field 1).
partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="day")
)

# Idempotent setup: an existing namespace/table is reused instead of raising.
try:
    catalog.create_namespace("logs")
except NamespaceAlreadyExistsError:
    pass
try:
    table = catalog.create_table(("logs", "app_logs"), schema=schema, partition_spec=partition_spec)
except TableAlreadyExistsError:
    table = catalog.load_table(("logs", "app_logs"))

# Append a batch of logs (incremental).
# Build the Arrow table with the Iceberg table's own schema: pa.table() would
# infer every column as nullable, which PyIceberg's schema check rejects for
# the required=True fields above.
data = pa.Table.from_pydict(
    {
        "timestamp": [datetime(2026, 1, 27, 10, 30, 0)],
        "level": ["ERROR"],
        "service": ["auth-service"],
        "message": ["Failed login"],
    },
    schema=table.schema().as_arrow(),
)
table.append(data)

# Query by time + level. Row filters are bound against table columns, so filter
# on the source column "timestamp" (not the partition name "day"); PyIceberg
# projects this range through DayTransform to prune non-matching partitions.
scan = table.scan(
    row_filter=(
        "level = 'ERROR' "
        "AND timestamp >= '2026-01-27T00:00:00' "
        "AND timestamp < '2026-01-28T00:00:00'"
    )
)
errors = scan.to_pandas()
## 模式 2:时间旅行查询
python
from datetime import datetime, timedelta, timezone

table = catalog.load_table(("logs", "app_logs"))

# Query a specific snapshot (here: the current one).
# current_snapshot() returns None for a table that has never been written to.
current = table.current_snapshot()
if current is None:
    raise LookupError("Table has no snapshots yet")
data = table.scan(snapshot_id=current.snapshot_id).to_pandas()

# Query the table as it was 24 hours ago.
# Table.scan() takes no timestamp argument: first resolve the epoch-millisecond
# timestamp to the snapshot that was current at that moment, then scan by its id.
yesterday_ms = int((datetime.now(timezone.utc) - timedelta(days=1)).timestamp() * 1000)
snapshot = table.snapshot_as_of_timestamp(yesterday_ms)
if snapshot is None:
    raise LookupError("No snapshot exists at or before the requested timestamp")
data = table.scan(snapshot_id=snapshot.snapshot_id).to_pandas()
## 模式 3:Schema 演进
python
from pyiceberg.types import StringType
table = catalog.load_table(("users", "profiles"))
with table.update_schema() as update:
up