# Ray Train Multi-Node Setup
## Ray Cluster Architecture
Ray Train runs on a **Ray cluster** with one head node and multiple worker nodes.
**Components**:
- **Head node**: Coordinates workers, runs scheduling
- **Worker nodes**: Execute training tasks
- **Object store**: Distributed shared memory (Plasma, built on Apache Arrow); each node hosts a local store, and objects are fetched between nodes on demand
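As a quick illustration of how these pieces interact, here is a minimal sketch (assuming an already-running cluster): an object placed in the store by the driver is readable by a task scheduled on any node.
```python
import ray

ray.init(address="auto")  # attach to the running cluster

# Objects in the store are addressable from every node in the cluster
ref = ray.put({"weights": [0.1, 0.2, 0.3]})

@ray.remote
def inspect(obj):
    # Ray fetches the object to whichever node runs this task
    return len(obj["weights"])

print(ray.get(inspect.remote(ref)))  # -> 3
```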
## Local Multi-Node Setup
### Manual Cluster Setup
**Head node**:
```bash
# Start Ray head
ray start --head --port=6379 --dashboard-host=0.0.0.0
# Output:
# Started Ray on this node with:
# - Head node IP: 192.168.1.100
# - Dashboard: http://192.168.1.100:8265
```
**Worker nodes**:
```bash
# Connect to head node
ray start --address=192.168.1.100:6379
# Output:
# Started Ray on this node.
# Connected to Ray cluster.
```
**Training script**:
```python
import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

# Connect to cluster
ray.init(address='auto')  # Auto-detects cluster

# Train across all nodes
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=16,              # Total workers across all nodes
        use_gpu=True,
        placement_strategy="SPREAD"  # Spread across nodes
    )
)
result = trainer.fit()
```
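The examples in this document pass a `train_func` without defining it. A minimal per-worker function might look like the following sketch; the model, data, and epoch count are placeholders, not part of the original.
```python
import torch
import torch.nn as nn
from ray import train
from ray.train.torch import prepare_model, get_device

def train_func(config=None):
    # Ray Train has already set up the process group; prepare_model
    # wraps the model in DistributedDataParallel on this worker's device
    model = prepare_model(nn.Linear(10, 1))  # placeholder model
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    device = get_device()
    for epoch in range(3):
        x = torch.randn(32, 10, device=device)  # placeholder batch
        y = torch.randn(32, 1, device=device)
        loss = nn.functional.mse_loss(model(x), y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # Report metrics back to the driver after each epoch
        train.report({"epoch": epoch, "loss": loss.item()})
```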
### Check Cluster Status
```bash
# View cluster status
ray status
# Output:
# ======== Cluster Status ========
# Nodes: 4
# Total CPUs: 128
# Total GPUs: 32
# Total memory: 512 GB
```
**Python API**:
```python
import ray
ray.init(address='auto')
# Get cluster resources
print(ray.cluster_resources())
# {'CPU': 128.0, 'GPU': 32.0, 'memory': 549755813888, 'node:192.168.1.100': 1.0, ...}
# Get available resources
print(ray.available_resources())
```
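When workers join asynchronously, it can help to block until the expected node count is reached before launching training. A small sketch; the node count of 4 is an assumption:
```python
import time
import ray

ray.init(address="auto")

expected_nodes = 4  # assumed cluster size
while sum(1 for node in ray.nodes() if node["Alive"]) < expected_nodes:
    print("Waiting for nodes to join...")
    time.sleep(5)
print(f"Cluster ready: {ray.cluster_resources()}")
```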
## Cloud Deployments
### AWS EC2 Cluster
**Cluster config** (`cluster.yaml`):
```yaml
cluster_name: ray-train-cluster
max_workers: 3  # 3 worker nodes

provider:
  type: aws
  region: us-west-2
  availability_zone: us-west-2a

auth:
  ssh_user: ubuntu

head_node_type: head_node

available_node_types:
  head_node:
    node_config:
      InstanceType: p3.2xlarge  # V100 GPU
      ImageId: ami-0a2363a9cff180a64  # Deep Learning AMI
    resources: {"CPU": 8, "GPU": 1}
    min_workers: 0
    max_workers: 0
  worker_node:
    node_config:
      InstanceType: p3.8xlarge  # 4x V100
      ImageId: ami-0a2363a9cff180a64
    resources: {"CPU": 32, "GPU": 4}
    min_workers: 3
    max_workers: 3

setup_commands:
  - pip install -U "ray[train]" torch transformers

head_setup_commands:
  - pip install -U "ray[default]"
```
**Launch cluster**:
```bash
# Start cluster
ray up cluster.yaml
# SSH to head node
ray attach cluster.yaml
# Run training
python train.py
# Teardown
ray down cluster.yaml
```
**Auto-submit job**:
```bash
# Submit job from local machine
ray job submit \
  --address http://<head-node-ip>:8265 \
  --working-dir . \
  -- python train.py
```
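The same submission can be done programmatically with the Ray Jobs Python SDK. A sketch, assuming the dashboard is reachable at `<head-node-ip>:8265`:
```python
from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient("http://<head-node-ip>:8265")

# Upload the working directory and run the entrypoint on the cluster
job_id = client.submit_job(
    entrypoint="python train.py",
    runtime_env={"working_dir": "."},
)
print(client.get_job_status(job_id))
```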
### GCP Cluster
**Cluster config** (`gcp-cluster.yaml`):
```yaml
cluster_name: ray-train-gcp

provider:
  type: gcp
  region: us-central1
  availability_zone: us-central1-a
  project_id: my-project-id

auth:
  ssh_user: ubuntu

head_node_type: head_node

available_node_types:
  head_node:
    node_config:
      machineType: n1-standard-8
      disks:
        - boot: true
          autoDelete: true
          type: PERSISTENT
          initializeParams:
            diskSizeGb: 50
            sourceImage: projects/deeplearning-platform-release/global/images/family/pytorch-latest-gpu
      guestAccelerators:
        - acceleratorType: nvidia-tesla-v100
          acceleratorCount: 1
    resources: {"CPU": 8, "GPU": 1}
  worker_node:
    node_config:
      machineType: n1-highmem-16
      disks:
        - boot: true
          autoDelete: true
          type: PERSISTENT
          initializeParams:
            diskSizeGb: 100
            sourceImage: projects/deeplearning-platform-release/global/images/family/pytorch-latest-gpu
      guestAccelerators:
        - acceleratorType: nvidia-tesla-v100
          acceleratorCount: 4
    resources: {"CPU": 16, "GPU": 4}
    min_workers: 2
    max_workers: 10

setup_commands:
  - pip install -U "ray[train]" torch transformers
```
**Launch**:
```bash
ray up gcp-cluster.yaml --yes
```
### Azure Cluster
**Cluster config** (`azure-cluster.yaml`):
```yaml
cluster_name: ray-train-azure

provider:
  type: azure
  location: eastus
  resource_group: ray-cluster-rg
  subscription_id: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx

auth:
  ssh_user: ubuntu
  ssh_private_key: ~/.ssh/id_rsa

head_node_type: head_node

available_node_types:
  head_node:
    node_config:
      azure_arm_parameters:
        vmSize: Standard_NC6  # K80 GPU
        imagePublisher: microsoft-dsvm
        imageOffer: ubuntu-1804
        imageSku: 1804-gen2
        imageVersion: latest
    resources: {"CPU": 6, "GPU": 1}
  worker_node:
    node_config:
      azure_arm_parameters:
        vmSize: Standard_NC24  # 4x K80
        imagePublisher: microsoft-dsvm
        imageOffer: ubuntu-1804
        imageSku: 1804-gen2
        imageVersion: latest
    resources: {"CPU": 24, "GPU": 4}
    min_workers: 2
    max_workers: 10
```
## Kubernetes Deployment
### KubeRay Operator
**Install KubeRay**:
```bash
# Add Helm repo
helm repo add kuberay https://ray-project.github.io/kuberay-helm/
# Install operator
helm install kuberay-operator kuberay/kuberay-operator --version 0.6.0
```
**RayCluster manifest** (`ray-cluster.yaml`):
```yaml
apiVersion: ray.io/v1alpha1
kind: RayCluster
metadata:
  name: ray-train-cluster
spec:
  rayVersion: '2.40.0'
  headGroupSpec:
    rayStartParams:
      dashboard-host: '0.0.0.0'
    template:
      spec:
        containers:
          - name: ray-head
            image: rayproject/ray:2.40.0-py310-gpu
            resources:
              limits:
                cpu: "8"
                memory: "32Gi"
                nvidia.com/gpu: "1"
              requests:
                cpu: "8"
                memory: "32Gi"
                nvidia.com/gpu: "1"
            ports:
              - containerPort: 6379
                name: gcs-server
              - containerPort: 8265
                name: dashboard
              - containerPort: 10001
                name: client
  workerGroupSpecs:
    - replicas: 4
      minReplicas: 2
      maxReplicas: 10
      groupName: gpu-workers
      rayStartParams: {}
      template:
        spec:
          containers:
            - name: ray-worker
              image: rayproject/ray:2.40.0-py310-gpu
              resources:
                limits:
                  cpu: "16"
                  memory: "64Gi"
                  nvidia.com/gpu: "4"
                requests:
                  cpu: "16"
                  memory: "64Gi"
                  nvidia.com/gpu: "4"
```
**Deploy**:
```bash
kubectl apply -f ray-cluster.yaml
# Check status
kubectl get rayclusters
# Access dashboard
kubectl port-forward service/ray-train-cluster-head-svc 8265:8265
# Open http://localhost:8265
```
**Submit training job**:
```bash
# Port-forward Ray client port
kubectl port-forward service/ray-train-cluster-head-svc 10001:10001
# Submit from local machine
RAY_ADDRESS="ray://localhost:10001" python train.py
```
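The training script itself can connect through the forwarded Ray Client port by passing a `ray://` address to `ray.init`. A minimal connectivity check:
```python
import ray

# Connect through the port-forwarded Ray Client endpoint
ray.init(address="ray://localhost:10001")
print(ray.cluster_resources())
```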
## SLURM Integration
### SLURM Job Script
**Launch Ray cluster** (`ray_cluster.sh`):
```bash
#!/bin/bash
#SBATCH --job-name=ray-train
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=1
#SBATCH --cpus-per-task=32
#SBATCH --gres=gpu:8
#SBATCH --time=24:00:00
#SBATCH --output=ray_train_%j.out

# Load modules
module load cuda/11.8
module load python/3.10

# Activate environment
source ~/venv/bin/activate

# Resolve the allocated nodes; the first one hosts the Ray head
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname -I | awk '{print $1}')

# Start the Ray head on the first node
echo "Starting Ray head node at $head_node_ip"
srun --nodes=1 --ntasks=1 -w "$head_node" \
    ray start --head --node-ip-address="$head_node_ip" \
    --port=6379 \
    --dashboard-host=0.0.0.0 \
    --num-cpus="$SLURM_CPUS_PER_TASK" \
    --num-gpus="$SLURM_GPUS_ON_NODE" \
    --block &
sleep 10

# Start Ray workers on the remaining nodes
for ((i = 1; i < SLURM_JOB_NUM_NODES; i++)); do
    worker_node=${nodes_array[$i]}
    echo "Starting Ray worker on $worker_node"
    srun --nodes=1 --ntasks=1 -w "$worker_node" \
        ray start --address="$head_node_ip:6379" \
        --num-cpus="$SLURM_CPUS_PER_TASK" \
        --num-gpus="$SLURM_GPUS_ON_NODE" \
        --block &
done
sleep 5

# Run the training driver (the batch script executes on the first node)
echo "Running training..."
python train.py --address="$head_node_ip:6379"
```
**Submit job**:
```bash
sbatch ray_cluster.sh
```
**Training script** (`train.py`):
```python
import argparse

import ray
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

def main(args):
    # Connect to Ray cluster
    ray.init(address=args.address)

    # Train across all SLURM nodes
    trainer = TorchTrainer(
        train_func,  # same training function as in the earlier example
        scaling_config=ScalingConfig(
            num_workers=32,  # 4 nodes x 8 GPUs
            use_gpu=True,
            placement_strategy="SPREAD"
        )
    )
    result = trainer.fit()
    print(f"Training complete: {result.metrics}")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--address', required=True)
    args = parser.parse_args()
    main(args)
```
## Autoscaling
### Enable Autoscaling
**Cluster config with autoscaling**:
```yaml
cluster_name: ray-autoscale
max_workers: 10          # Maximum worker nodes
idle_timeout_minutes: 5  # Shut down idle workers after 5 min

provider:
  type: aws
  region: us-west-2

available_node_types:
  worker_node:
    min_workers: 2   # Always keep 2 workers
    max_workers: 10  # Scale up to 10
    resources: {"CPU": 32, "GPU": 4}
    node_config:
      InstanceType: p3.8xlarge
```
**Training with autoscaling**:
```python
from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig, RunConfig

# Request resources; the Ray autoscaler adds nodes as needed
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=40,  # Ray will autoscale to 10 nodes (40 GPUs)
        use_gpu=True,
        trainer_resources={"CPU": 0}  # Trainer itself reserves no resources
    ),
    run_config=RunConfig(
        name="autoscale-training",
        storage_path="s3://my-bucket/ray-results"
    )
)
result = trainer.fit()
```
## Network Configuration
### Firewall Rules
**Required ports**:
- **6379**: Ray GCS (Global Control Store)
- **8265**: Ray Dashboard
- **10001**: Ray Client
- **8000-9000**: Worker communication (configurable)
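To verify these ports are open from a prospective worker host, a small TCP probe can help. A sketch; replace `<head-node-ip>` with the head node's address:
```python
import socket

HEAD = "<head-node-ip>"  # replace with the head node's address

for port in (6379, 8265, 10001):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(2)
        try:
            sock.connect((HEAD, port))
            print(f"port {port}: reachable")
        except OSError:
            print(f"port {port}: blocked or closed")
```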
**AWS Security Group**:
```bash
# Allow Ray ports within cluster
aws ec2 authorize-security-group-ingress \
  --group-id sg-xxxxx \
  --source-group sg-xxxxx \
  --protocol tcp \
  --port 6379

aws ec2 authorize-security-group-ingress \
  --group-id sg-xxxxx \
  --source-group sg-xxxxx \
  --protocol tcp \
  --port 8000-9000
```
### High-Performance Networking
**Enable InfiniBand/RDMA** (on-prem):
```bash
# Verbose Ray backend logging (useful while debugging network setup)
export RAY_BACKEND_LOG_LEVEL=debug
# Point NCCL at the InfiniBand interface
export NCCL_SOCKET_IFNAME=ib0  # InfiniBand interface
export NCCL_IB_DISABLE=0       # Enable InfiniBand
# Bind Ray to the InfiniBand interface's address
ray start --head --node-ip-address=$(ip addr show ib0 | grep 'inet ' | awk '{print $2}' | cut -d/ -f1)
```
**AWS Enhanced Networking**:
```yaml
# ENA (Elastic Network Adapter) is enabled by default on p3dn instances;
# request EFA explicitly for the highest-bandwidth fabric
worker_node:
  node_config:
    InstanceType: p3dn.24xlarge  # 100 Gbps networking
    EbsOptimized: true
    NetworkInterfaces:
      - DeviceIndex: 0
        DeleteOnTermination: true
        InterfaceType: efa  # Elastic Fabric Adapter
```
## Monitoring and Debugging
### Ray Dashboard
**Access dashboard**:
```bash
# Local: http://localhost:8265
# Remote: http://<head-node-ip>:8265

# SSH tunnel for secure access
ssh -L 8265:localhost:8265 user@<head-node-ip>
```
**Dashboard features**:
- Cluster utilization (CPU, GPU, memory)
- Running tasks and actors
- Object store usage
- Logs and errors
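Much of the same information is available programmatically through Ray's state API (Ray 2.x). A hedged sketch; field names follow the `ray.util.state` schema:
```python
from ray.util.state import list_nodes, list_tasks

# Nodes currently in the cluster and their liveness
for node in list_nodes():
    print(node.node_ip, node.state)

# Recently scheduled tasks
for task in list_tasks(limit=10):
    print(task.name, task.state)
```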
### Cluster Logs
**View logs**:
```bash
# Head node logs
tail -f /tmp/ray/session_latest/logs/monitor.log

# Worker node logs
tail -f /tmp/ray/session_latest/logs/raylet.out

# List log files on the head node
ray logs
```
**Python logging**:
```python
import logging

import ray

logger = logging.getLogger("ray")
logger.setLevel(logging.DEBUG)

# In training function
def train_func(config):
    logger.info(f"Worker {ray.get_runtime_context().get_worker_id()} starting")
    # Training...
```
## Best Practices
### 1. Placement Strategies
```python
# PACK: Pack workers on fewer nodes (better for communication)
ScalingConfig(num_workers=16, placement_strategy="PACK")
# SPREAD: Spread across nodes (better for fault tolerance)
ScalingConfig(num_workers=16, placement_strategy="SPREAD")
# STRICT_SPREAD: Exactly one worker per node
ScalingConfig(num_workers=4, placement_strategy="STRICT_SPREAD")
```
### 2. Resource Allocation
```python
# Reserve resources per worker
ScalingConfig(
    num_workers=8,
    use_gpu=True,
    resources_per_worker={"CPU": 8, "GPU": 1},  # Explicit allocation
    trainer_resources={"CPU": 2}  # Reserve for trainer
)
```
### 3. Fault Tolerance
```python
from ray.train import RunConfig, FailureConfig
from ray.train.torch import TorchTrainer

trainer = TorchTrainer(
    train_func,
    run_config=RunConfig(
        failure_config=FailureConfig(
            max_failures=3  # Retry up to 3 times on worker failure
        )
    )
)
```
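If a run is interrupted beyond `max_failures`, it can often be resumed from its experiment directory. A sketch, assuming the `storage_path` and run name used in the autoscaling example above:
```python
from ray.train.torch import TorchTrainer

# Experiment directory = storage_path + run name from RunConfig
path = "s3://my-bucket/ray-results/autoscale-training"
if TorchTrainer.can_restore(path):
    trainer = TorchTrainer.restore(path)
    result = trainer.fit()
```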
## Resources
- Ray Cluster Launcher: https://docs.ray.io/en/latest/cluster/getting-started.html
- KubeRay: https://docs.ray.io/en/latest/cluster/kubernetes/index.html
- SLURM: https://docs.ray.io/en/latest/cluster/vms/user-guides/launching-clusters/slurm.html
- Autoscaling: https://docs.ray.io/en/latest/cluster/vms/user-guides/configuring-autoscaling.html
Source: claude-code-templates (MIT).