[ PROMPT_NODE_22254 ]
Data Processing Ray Data 集成
[ SKILL_DOCUMENTATION ]
# Ray Data 集成指南
与 Ray Train 及机器学习框架的集成。
## Ray Train 集成
### 使用数据集进行基础训练
python
import ray
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
# 创建数据集
train_ds = ray.data.read_parquet("s3://data/train/")
val_ds = ray.data.read_parquet("s3://data/val/")
def train_func(config):
# 获取数据集分片
train_ds = ray.train.get_dataset_shard("train")
val_ds = ray.train.get_dataset_shard("val")
for epoch in range(config["epochs"]):
# 遍历批次
for batch in train_ds.iter_batches(batch_size=32):
# 在批次上进行训练
pass
# 启动训练
trainer = TorchTrainer(
train_func,
train_loop_config={"epochs": 10},
datasets={"train": train_ds, "val": val_ds},
scaling_config=ScalingConfig(num_workers=4, use_gpu=True)
)
result = trainer.fit()
## PyTorch 集成
### 转换为 PyTorch 数据集
python
# 选项 1: to_torch (推荐)
torch_ds = ds.to_torch(
label_column="label",
batch_size=32,
drop_last=True
)
for batch in torch_ds:
inputs = batch["features"]
labels = batch["label"]
# 训练模型
# 选项 2: iter_torch_batches
for batch in ds.iter_torch_batches(batch_size=32):
# batch 是张量字典
pass
## TensorFlow 集成
python
tf_ds = ds.to_tf(
feature_columns=["image", "text"],
label_column="label",
batch_size=32
)
for features, labels in tf_ds:
# 训练 TensorFlow 模型
pass
## 最佳实践
1. **在 Ray Train 中分片数据集** - 使用 `get_dataset_shard()` 自动完成
2. **使用流式处理** - 不要将整个数据集加载到内存中
3. **在 Ray Data 中进行预处理** - 在集群中分布式执行预处理
4. **缓存预处理数据** - 写入 Parquet,在训练时读取