
Do not submit: Multinode training seems to be working #1314


Draft: wants to merge 1 commit into main
80 changes: 80 additions & 0 deletions slurm_b200
@@ -0,0 +1,80 @@
#!/bin/bash
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.

# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# --- This script is optimized for AWS with EFA
# --- adjust NCCL_BUFFSIZE if you encounter memory
# --- constraint issues or to tune for improved performance.
# ---

#SBATCH --job-name=ahmads_titan1

#SBATCH --ntasks=2

#SBATCH --nodes=2

#SBATCH --cpus-per-task=96
# Use this to run on specific nodes:
# --nodelist=slurm-compute-node-[3-8,22-31,33-55,57-74,76-80,82-92,94,96,98,187,193-194,200,202-203,205,207-212,214-215,217-225,227-234,236,238-248]
# and --exclude=slurm-compute-node-[node ids list here]

# Not needed for B200: export NCCL_IB_HCA=^mlx5_0:1
# export NCCL_TOPO_FILE=/etc/crusoe/nccl_topo/h200-141gb-sxm-ib-cloud-hypervisor.xml
export UCX_NET_DEVICES=ens7

nodes=( $(scontrol show hostnames "$SLURM_JOB_NODELIST") )
head_node=${nodes[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

echo "All nodes: ${nodes[@]}"
echo "Node IP: $head_node_ip"
export LOGLEVEL=INFO

# export NCCL_TOPO_FILE=/etc/crusoe/nccl_topo/h200-141gb-sxm-ib-cloud-hypervisor.xml
# Enable for A100
# export FI_PROVIDER="efa"
# Ensure that P2P is available
# export NCCL_P2P_DISABLE=1
export NCCL_IB_DISABLE=0

# debugging flags (optional)
export NCCL_DEBUG=WARN
export PYTHONFAULTHANDLER=1
# optional debug settings
# export NCCL_DEBUG=INFO
# export NCCL_DEBUG_SUBSYS=INIT,GRAPH,ENV

#export LD_LIBRARY_PATH=/opt/amazon/efa/lib:$LD_LIBRARY_PATH
#export LD_LIBRARY_PATH=/usr/local/lib/:$LD_LIBRARY_PATH
export CUDA_LAUNCH_BLOCKING=0
export WANDB_PROJECT="b200_titan"
# on your cluster you might need these:
# set the network interface
export NCCL_SOCKET_IFNAME="eth0,en,eth,em,bond"
export NCCL_BUFFSIZE=2097152
#export TORCH_DIST_INIT_BARRIER=1
export FI_EFA_SET_CUDA_SYNC_MEMOPS=0

TITAN_CONFIG_FILE=${TITAN_CONFIG_FILE:-"./torchtitan/models/llama3/train_configs/llama3_8b.toml"}

# dcgmi profile --pause
# Adjust the sbatch --ntasks and --nodes values above (and --nnodes below)
# to your node count, and update the target launch file.
# This works with multiple nodes now:
#srun conda run -n ahmads_titan torchrun --nnodes 2 --nproc_per_node 8 --rdzv_id 101 --rdzv_backend c10d --rdzv_endpoint "$head_node_ip:29500" -m torchtitan.train --job.config_file ${TITAN_CONFIG_FILE}
#exit
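# A minimal sketch (assuming one srun task per node and 8 GPUs per node, as in
# this job) of deriving the counts from Slurm instead of hardcoding them, so
# the launch line stays in sync with the #SBATCH directives above:
#   NNODES=${SLURM_JOB_NUM_NODES:-2}
#   GPUS_PER_NODE=8
#   srun conda run -n ahmads_titan torchrun --nnodes "$NNODES" --nproc_per_node "$GPUS_PER_NODE" \
#       --rdzv_backend c10d --rdzv_endpoint "$head_node_ip:29500" \
#       -m torchtitan.train --job.config_file "${TITAN_CONFIG_FILE}"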

# This is the baseline and it works.
#export NGPU=8
#srun conda run -n ahmads_titan ./run_train.sh

export LOCAL_WORLD_SIZE=8
export NUM_HOSTS=${SLURM_JOB_NUM_NODES:-2}  # keep in sync with #SBATCH --nodes above
export MASTER_ADDR=$head_node_ip
export MASTER_PORT=12347

srun --nodes=$NUM_HOSTS --ntasks=$NUM_HOSTS conda run -n ahmads_titan python -m torchtitan.train_monarch --job.config_file ./torchtitan/models/llama3/train_configs/llama3_8b.toml
96 changes: 96 additions & 0 deletions slurm_hyperactor
@@ -0,0 +1,96 @@
#!/bin/bash
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.

# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

# --- This script is optimized for AWS with EFA
# --- adjust NCCL_BUFFSIZE if you encounter memory
# --- constraint issues or to tune for improved performance.
# ---

#SBATCH --job-name=ahmads_titan1

#SBATCH --ntasks=8

#SBATCH --nodes=8

#SBATCH --cpus-per-task=96
# Use this to run on specific nodes:
# --nodelist=slurm-compute-node-[3-8,22-31,33-55,57-74,76-80,82-92,94,96,98,187,193-194,200,202-203,205,207-212,214-215,217-225,227-234,236,238-248]
# and --exclude=slurm-compute-node-[node ids list here]

# Not needed for B200: export NCCL_IB_HCA=^mlx5_0:1
# export NCCL_TOPO_FILE=/etc/crusoe/nccl_topo/h200-141gb-sxm-ib-cloud-hypervisor.xml
export UCX_NET_DEVICES=ens7

nodes=( $(scontrol show hostnames "$SLURM_JOB_NODELIST") )
head_node=${nodes[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)

echo "All nodes: ${nodes[@]}"
echo "Node IP: $head_node_ip"

node_ips=()
for node in "${nodes[@]}"
do
node_ip=$(getent hosts "$node" | awk '{ print $1 }')
node_ips+=($node_ip)
done

echo "All ip addresses"
echo "${node_ips[@]}"


export LOGLEVEL=INFO

# export NCCL_TOPO_FILE=/etc/crusoe/nccl_topo/h200-141gb-sxm-ib-cloud-hypervisor.xml
# Enable for A100
# export FI_PROVIDER="efa"
# Ensure that P2P is available
# export NCCL_P2P_DISABLE=1
export NCCL_IB_DISABLE=0

# debugging flags (optional)
export NCCL_DEBUG=WARN
export PYTHONFAULTHANDLER=1
# optional debug settings
# export NCCL_DEBUG=INFO
# export NCCL_DEBUG_SUBSYS=INIT,GRAPH,ENV

#export LD_LIBRARY_PATH=/opt/amazon/efa/lib:$LD_LIBRARY_PATH
#export LD_LIBRARY_PATH=/usr/local/lib/:$LD_LIBRARY_PATH
export CUDA_LAUNCH_BLOCKING=0
export WANDB_PROJECT="b200_titan"
# on your cluster you might need these:
# set the network interface
export NCCL_SOCKET_IFNAME="eth0,en,eth,em,bond"
export NCCL_BUFFSIZE=2097152
#export TORCH_DIST_INIT_BARRIER=1
export FI_EFA_SET_CUDA_SYNC_MEMOPS=0

TITAN_CONFIG_FILE=${TITAN_CONFIG_FILE:-"./torchtitan/models/llama3/train_configs/llama3_8b.toml"}

# dcgmi profile --pause
# Adjust the sbatch --ntasks and --nodes values above (and --nnodes below)
# to your node count, and update the target launch file.
# This works with multiple nodes now:
#srun conda run -n ahmads_titan torchrun --nnodes 2 --nproc_per_node 8 --rdzv_id 101 --rdzv_backend c10d --rdzv_endpoint "$head_node_ip:29500" -m torchtitan.train --job.config_file ${TITAN_CONFIG_FILE}
#exit

# This is the baseline and it works.
#export NGPU=8
#srun conda run -n ahmads_titan ./run_train.sh

export LOCAL_WORLD_SIZE=8
export NUM_HOSTS=8
export MASTER_ADDR=$head_node_ip
export MASTER_PORT=12347
export PYTHONPATH=$PYTHONPATH:$(readlink -f .)

echo "About to run process allocator"
# Run a process allocator in all hosts
srun --nodes=$NUM_HOSTS --ntasks=$NUM_HOSTS conda run -n ahmads_titan3 process_allocator --program=monarch_bootstrap
#python -m torchtitan.train_monarch --job.config_file ./torchtitan/models/llama3/train_configs/llama3_8b.toml
5 changes: 5 additions & 0 deletions torchtitan/train.py
@@ -32,6 +32,8 @@
    maybe_enable_profiling,
)

# Debug override in this draft: route info-level logging calls through
# logger.error so they are emitted at ERROR severity and always show up.
logger.info = logger.error


class Trainer(torch.distributed.checkpoint.stateful.Stateful):
    job_config: JobConfig
@@ -536,6 +538,9 @@ def close(self) -> None:
    config_manager = ConfigManager()
    config = config_manager.parse_args()
    trainer: Optional[Trainer] = None
    # Debug aid in this draft: dump the environment from one arbitrary rank.
    rank = int(os.environ["RANK"])
    if rank == 15:
        print(os.environ)

    try:
        trainer = Trainer(config)
141 changes: 141 additions & 0 deletions torchtitan/train_monarch.py
@@ -0,0 +1,141 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import asyncio
import importlib
import os
import pickle
import socket
import sys
import threading
import time
from datetime import timedelta
from logging import getLogger
from typing import Any, Generator, Iterable, Optional
import torch
import torchtitan.components.ft as ft
import torchtitan.protocols.train_spec as train_spec_module
from monarch._rust_bindings.monarch_hyperactor.proc_mesh import ProcMesh as HyProcMesh
from monarch.actor_mesh import Actor, current_rank, endpoint
from monarch.proc_mesh import proc_mesh, ProcMesh
from monarch_meta._monarch_meta import hyperactor_meta
from torchtitan.config_manager import ConfigManager, JobConfig
from torchtitan.tools.logging import init_logger, logger
from .train import Trainer

def pretend_you_are_torchrun(global_rank):
    """
    Set the environment variables that torchrun would normally provide.
    Eventually Monarch should handle all of this, but it is needed for now
    because the job is not launched with torchrun. There are also better ways
    to avoid hardcoding this, but it's a demo and this will do for now.
    """
    # task_id = int(os.environ["TW_TASK_ID"])
    # global_rank = task_id * 8 + (global_rank % 8)
    task_id = int(os.environ["SLURM_NODEID"])
    local_world_size = int(os.environ["LOCAL_WORLD_SIZE"])
    num_hosts = int(os.environ["NUM_HOSTS"])

    global_rank = task_id * local_world_size + global_rank

    world_size = num_hosts * local_world_size
    local_rank = min(world_size, global_rank % local_world_size)

    group_rank = global_rank // local_world_size
    # Ceiling division: one rank group per host.
    group_world_size = (world_size + local_world_size - 1) // local_world_size

    env = {
        # "MASTER_ADDR": get_master_addr(),
        # "MASTER_PORT": str(20101),
        "RANK": str(global_rank),
        "LOCAL_RANK": str(local_rank),
        # Note that LOCAL_WORLD_SIZE is already set.
        "GROUP_RANK": str(group_rank),
        "GROUP_WORLD_SIZE": str(group_world_size),
        "ROLE_RANK": str(global_rank),
        "ROLE_WORLD_SIZE": str(world_size),
        "ROLE_NAME": "rank",
        "WORLD_SIZE": str(world_size),
    }
    print(f" AHMAD: {global_rank=} {env}")
    os.environ.update(env)
    if global_rank == 0:
        print(f" AHMAD: {global_rank=} {os.environ}")


class TrainerActorWrapper(Actor):
    def __init__(self, job_config: JobConfig):
        self.job_config = job_config
        self.rank = current_rank().rank
        hostname = socket.gethostname()
        print(f" ===> AHMAD: {self.rank} {hostname=} {current_rank()=}")
        pretend_you_are_torchrun(self.rank)

    @endpoint
    def train(self):
        print("Starting training")
        pretend_you_are_torchrun(self.rank)
        config = self.job_config
        trainer: Optional[Trainer] = None

        try:
            trainer = Trainer(config)
            # trainer = self.trainer
            tid = threading.get_native_id()
            logger.error(f"AHMAD tid in train: {self.rank=} {tid=}")

            if config.checkpoint.create_seed_checkpoint:
                assert (
                    int(os.environ["WORLD_SIZE"]) == 1
                ), "Must create seed checkpoint using a single device, to disable sharding."
                assert (
                    config.checkpoint.enable_checkpoint
                ), "Must enable checkpointing when creating a seed checkpoint."
                trainer.checkpointer.save(curr_step=0, force=True)
                logger.info("Created seed checkpoint")
            else:
                trainer.train()
        finally:
            if trainer:
                trainer.close()

            if torch.distributed.is_initialized():
                torch.distributed.destroy_process_group()
                logger.info("Process group destroyed.")
            print("Done training")

async def async_main(job_config: JobConfig):
    torch.use_deterministic_algorithms(True)
    local_world_size = int(os.environ["LOCAL_WORLD_SIZE"])
    num_hosts = int(os.environ["NUM_HOSTS"])
    master_addr = os.environ["MASTER_ADDR"]
    master_port = os.environ["MASTER_PORT"]
    world_size = local_world_size * num_hosts

    local_proc_mesh = await proc_mesh(
        gpus=local_world_size,
        env={
            "MASTER_ADDR": master_addr,
            "MASTER_PORT": master_port,
        },
    )
    print(job_config)
    trainer_actor = await local_proc_mesh.spawn(
        "trainer_actor", TrainerActorWrapper, job_config
    )
    await trainer_actor.train.call()


if __name__ == "__main__":
    init_logger()
    config_manager = ConfigManager()
    config = config_manager.parse_args()
    asyncio.run(async_main(config))
    sys.exit(0)
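# For reference, this entry point is launched with one task per host by the
# Slurm script (see slurm_b200), roughly:
#   srun --nodes=$NUM_HOSTS --ntasks=$NUM_HOSTS \
#       conda run -n ahmads_titan python -m torchtitan.train_monarch \
#       --job.config_file ./torchtitan/models/llama3/train_configs/llama3_8b.toml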