fix and enhance multi-gpu example #3

Open · wants to merge 1 commit into `main`

2 changes: 1 addition & 1 deletion README.md
@@ -26,7 +26,7 @@ The examples in this repository are based on the [original TensorFlow Examples](
| Directory | TensorFlow script description |
| :--- | ---: |
| [MirroredStrategy](examples/single-node/README.md) | Synchronous distributed training on multiple GPUs on one machine. |
-| [MultiWorkerMirroredStrategy](examples/multi-node/README.md) | Synchronous distributed training across multiple workers, each with potentially multiple GPUs. |
+| [MultiWorkerMirroredStrategy](examples/multi_node/README.md) | Synchronous distributed training across multiple workers, each with potentially multiple GPUs. |

#### Parameter Server
Not yet tested; please reach out to the Outerbounds team if you need help.
4 changes: 0 additions & 4 deletions examples/multi-node/README.md

This file was deleted.

14 changes: 14 additions & 0 deletions examples/multi_node/README.md
@@ -0,0 +1,14 @@
# Introduction

The following four files showcase how to leverage TensorFlow's `MultiWorkerMirroredStrategy` with `@kubernetes` and `@tensorflow`, enabling distributed training across multiple GPUs on multiple machines.

1. `gpu_profile.py` contains the `@gpu_profile` decorator and is available [here](https://github.com/outerbounds/metaflow-gpu-profile). It is used in `flow.py`.

2. `train_mnist.py` contains the core training code showing how to use the `MultiWorkerMirroredStrategy` while training a model on the MNIST dataset (see the sketch after this list).

3. `flow.py` contains a flow that runs the training code from `train_mnist.py` and uses the Docker image `tensorflow/tensorflow:2.15.0-gpu` for GPU support.

- This can be run using `python flow.py --environment=pypi run`.
- If you are on the [Outerbounds](https://outerbounds.com/) platform, you can leverage `fast-bakery` for blazingly fast Docker image builds: `python flow.py --environment=fast-bakery run`.

4. `reload.ipynb` showcases how to use the trained model for inference later on (a reload sketch follows below). Make sure `tensorflow==2.15.1` is installed locally to run this notebook correctly.
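
For orientation, here is a minimal sketch of the `MultiWorkerMirroredStrategy` pattern that `train_mnist.py` follows. It is illustrative only: the model, batch sizes, and function signature are assumptions, not the file's exact contents.

```python
import tensorflow as tf

def main(num_workers, run=None, local_model_dir="model", local_tar_name="mnist.tar.gz"):
    # The @tensorflow decorator sets TF_CONFIG on each worker, which is how the
    # strategy discovers its peers and its own role in the cluster.
    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    # Scale the global batch size with the total number of replicas
    # (GPUs across all workers).
    global_batch = 64 * strategy.num_replicas_in_sync

    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    dataset = (
        tf.data.Dataset.from_tensor_slices((x_train / 255.0, y_train))
        .shuffle(60_000)
        .batch(global_batch)
    )

    # Variables created inside the scope are mirrored across replicas and
    # kept in sync with all-reduce during training.
    with strategy.scope():
        model = tf.keras.Sequential([
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dense(10),
        ])
        model.compile(
            loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
            optimizer="adam",
            metrics=["accuracy"],
        )

    model.fit(dataset, epochs=3)
```

Similarly, a hedged sketch of the inference step from `reload.ipynb`; the artifact name `model_tar` below is hypothetical, so inspect the run's data to find the actual name used by the flow.

```python
import tarfile
import tensorflow as tf
from metaflow import Flow

# Grab the latest successful run of the flow defined in flow.py.
run = Flow("MultiNodeTensorFlow").latest_successful_run

# "model_tar" is a hypothetical artifact name; check run.data for the real one.
with open("mnist.tar.gz", "wb") as f:
    f.write(run.data.model_tar)
with tarfile.open("mnist.tar.gz") as tar:
    tar.extractall(".")  # assumes the tarball contains the "model" directory

model = tf.keras.models.load_model("model")
```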
21 changes: 11 additions & 10 deletions examples/multi-node/flow.py → examples/multi_node/flow.py
@@ -1,30 +1,31 @@
-from metaflow import FlowSpec, step, batch, conda, tensorflow, environment
-
-N_NODES = 2
-N_GPU = 2
+from metaflow import FlowSpec, step, kubernetes, environment, pypi, tensorflow
+from gpu_profile import gpu_profile


 class MultiNodeTensorFlow(FlowSpec):
-    tarfile = "mnist.tar.gz"
+    local_model_dir = "model"
+    local_tar_name = "mnist.tar.gz"

     @step
     def start(self):
-        self.next(self.train, num_parallel=N_NODES)
+        self.next(self.train, num_parallel=2)

+    @gpu_profile(interval=1)
     @environment(vars={"TF_CPP_MIN_LOG_LEVEL": "2"})
-    @batch(gpu=N_GPU, image="tensorflow/tensorflow:latest-gpu")
+    @kubernetes(gpu=2, image="registry.hub.docker.com/tensorflow/tensorflow:2.15.0-gpu")
+    @pypi(packages={"matplotlib": "3.10.0",})
     @tensorflow
     @step
     def train(self):
-        from mnist import main
+        from train_mnist import main

         main(
-            num_workers=N_NODES,
+            num_workers=2,
             run=self,
             local_model_dir=self.local_model_dir,
-            local_tar_name=self.tarfile,
+            local_tar_name=self.local_tar_name,
         )

         self.next(self.join)

     @step
324 changes: 324 additions & 0 deletions examples/multi_node/gpu_profile.py
@@ -0,0 +1,324 @@
import re
import os
from tempfile import TemporaryFile
from subprocess import check_call, check_output, Popen
from datetime import datetime
from functools import wraps

# Card plot styles
MEM_COLOR = "#0c64d6"
GPU_COLOR = "#ff69b4"
AXES_COLOR = "#666"
LABEL_COLOR = "#333"
FONTSIZE = 10
AXES_LINEWIDTH = 0.5
PLOT_LINEWIDTH = 1.2
WIDTH = 12
HEIGHT = 8

DRIVER_VER = re.compile(b"Driver Version: (.+?) ")
CUDA_VER = re.compile(b"CUDA Version:(.*) ")

MONITOR_FIELDS = [
    "timestamp",
    "gpu_utilization",
    "memory_used",
    "memory_total",
]
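
# Shell loop that samples nvidia-smi once per `interval` seconds for as long as
# the parent process (`pid`) is alive; its CSV output is parsed by _read_monitor.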

MONITOR = """
set -e;
while kill -0 {pid} 2>/dev/null;
do
nvidia-smi
--query-gpu=pci.bus_id,timestamp,utilization.gpu,memory.used,memory.total
--format=csv,noheader,nounits;
sleep {interval};
done
""".replace(
"\n", " "
)


class GPUProfiler:
    def __init__(self, interval=1):
        self.driver_ver, self.cuda_ver, self.error = self._read_versions()
        if self.error:
            self.devices = []
        else:
            self.devices = self._read_devices()
            self._monitor_out = TemporaryFile()
            cmd = MONITOR.format(interval=interval, pid=os.getpid())
            self._monitor_proc = Popen(["bash", "-c", cmd], stdout=self._monitor_out)

    def finish(self):
        ret = {
            "error": self.error,
            "cuda_version": self.cuda_ver,
            "driver_version": self.driver_ver,
        }
        if self.error:
            return ret
        else:
            self._monitor_proc.terminate()
            ret["devices"] = self.devices
            ret["profile"] = self._read_monitor()
            return ret

    def _read_monitor(self):
        devdata = {}
        self._monitor_out.seek(0)
        for line in self._monitor_out:
            fields = [f.strip() for f in line.decode("utf-8").split(",")]
            if len(fields) == len(MONITOR_FIELDS) + 1:
                # strip subsecond resolution from timestamps that doesn't align across devices
                fields[1] = fields[1].split(".")[0]
                if fields[0] in devdata:
                    data = devdata[fields[0]]
                else:
                    devdata[fields[0]] = data = {}

                for i, field in enumerate(MONITOR_FIELDS):
                    if field not in data:
                        data[field] = []
                    data[field].append(fields[i + 1])
            else:
                # expect that the last line may be truncated
                break
        return devdata

    def _read_versions(self):
        def parse(r, s):
            return r.search(s).group(1).strip().decode("utf-8")

        try:
            out = check_output(["nvidia-smi"])
            return parse(DRIVER_VER, out), parse(CUDA_VER, out), None
        except FileNotFoundError:
            return None, None, "nvidia-smi not found"
        except AttributeError:
            return None, None, "nvidia-smi output is unexpected"
        except:
            return None, None, "nvidia-smi error"

    def _read_devices(self):
        out = check_output(
            [
                "nvidia-smi",
                "--query-gpu=name,pci.bus_id,memory.total",
                "--format=csv,noheader",
            ]
        )
        return [
            dict(
                zip(("name", "device_id", "memory"), (x.strip() for x in l.split(",")))
            )
            for l in out.decode("utf-8").splitlines()
        ]


class gpu_profile:
    def __init__(
        self,
        with_card=True,
        include_artifacts=True,
        artifact_prefix="gpu_profile_",
        interval=1,
    ):
        self.with_card = with_card
        self.include_artifacts = include_artifacts
        self.artifact_prefix = artifact_prefix
        self.interval = interval

    def __call__(self, f):
        @wraps(f)
        def func(s):
            prof = GPUProfiler(interval=self.interval)
            if self.include_artifacts:
                setattr(s, self.artifact_prefix + "num_gpus", len(prof.devices))
            try:
                f(s)
            finally:
                try:
                    results = prof.finish()
                except:
                    results = {"error": "couldn't read profiler results"}
                if self.include_artifacts:
                    setattr(s, self.artifact_prefix + "data", results)
                if self.with_card:
                    try:
                        make_card(results, self.artifact_prefix + "data")
                    except:
                        pass

        if self.with_card:
            from metaflow import card

            return card(type="blank", id="gpu_profile")(func)
        else:
            return func
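
# Typical usage on a Metaflow step (as in this example's flow.py):
#
#     @gpu_profile(interval=1)
#     @step
#     def train(self):
#         ...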


def make_plot(
    tstamps,
    vals,
    y_label,
    legend,
    line_color=None,
    secondary_y_factor=None,
    secondary_y_label="",
):
    import matplotlib.dates as mdates
    import matplotlib.ticker as mtick
    import matplotlib.pyplot as plt

    first = tstamps[0]

    def seconds_since_start(x):
        return (x - mdates.date2num(first)) * (24 * 60 * 60)

    with plt.rc_context(
        {
            "axes.edgecolor": AXES_COLOR,
            "axes.linewidth": AXES_LINEWIDTH,
            "xtick.color": AXES_COLOR,
            "ytick.color": AXES_COLOR,
            "text.color": LABEL_COLOR,
            "font.size": FONTSIZE,
        }
    ):
        fig = plt.figure(figsize=(WIDTH, HEIGHT))
        ax = fig.add_subplot(111)

        # left Y axis shows %
        ax.yaxis.set_major_formatter(mtick.PercentFormatter())
        ax.set_ylabel(y_label, labelpad=20)

        # top X axis shows seconds since start
        topax = ax.secondary_xaxis("top", functions=(seconds_since_start, lambda _: _))
        topax.set_xlabel("Seconds since task start", labelpad=20)
        # strange bug - secondary x axis becomes slightly thicker without this
        topax.spines["top"].set_linewidth(0)

        # bottom X axis shows timestamp
        ax.xaxis.set_major_formatter(
            mdates.ConciseDateFormatter(ax.xaxis.get_major_locator())
        )
        ax.set_xlabel("Time", labelpad=20)

        if secondary_y_factor is not None:
            # right Y axis maps utilization % to absolute MBs; the second
            # function must be the inverse of the first (MBs back to %)
            rightax = ax.secondary_yaxis(
                "right",
                functions=(
                    lambda x: (x / 100) * secondary_y_factor,
                    lambda x: (x * 100) / secondary_y_factor,
                ),
            )
            rightax.set_ylabel(secondary_y_label, labelpad=20)

        line = ax.plot(tstamps, vals, linewidth=PLOT_LINEWIDTH, color=line_color)
        ax.legend([legend], loc="upper left")
        return ax


def profile_plots(device_id, profile_data):
    data = profile_data[device_id]
    tstamps = [datetime.strptime(t, "%Y/%m/%d %H:%M:%S") for t in data["timestamp"]]
    gpu = list(map(float, data["gpu_utilization"]))
    mem = [
        100.0 * float(used) / float(total)
        for used, total in zip(data["memory_used"], data["memory_total"])
    ]
    gpu_plot = make_plot(
        tstamps, gpu, "GPU utilization", "device: %s" % device_id, line_color=GPU_COLOR
    )
    mem_plot = make_plot(
        tstamps,
        mem,
        "Memory utilization",
        "device: %s" % device_id,
        line_color=MEM_COLOR,
        secondary_y_factor=float(data["memory_total"][0]),
        secondary_y_label="Memory usage in MBs",
    )
    return gpu_plot, mem_plot


def make_card(results, artifact_name):
    from metaflow import current
    from metaflow.cards import Table, Markdown, Image

    els = []

    def _error():
        els.append(Markdown(f"## GPU profiler failed:\n```\n{results['error']}\n```"))

    def _drivers():
        els.append(Markdown("## Drivers"))
        els.append(
            Table(
                [[results["cuda_version"], results["driver_version"]]],
                headers=["CUDA version", "NVidia driver version"],
            )
        )

    def _devices():
        els.append(Markdown("## Devices"))
        rows = [[d["device_id"], d["name"], d["memory"]] for d in results["devices"]]
        els.append(Table(rows, headers=["Device ID", "Device type", "GPU memory"]))

    def _utilization():
        els.append(Markdown("## Maximum utilization"))
        rows = []
        for device, data in results["profile"].items():
            max_gpu = max(map(float, data["gpu_utilization"]))
            max_mem = max(map(float, data["memory_used"]))
            rows.append([device, "%2.1f%%" % max_gpu, "%dMB" % max_mem])
        els.append(Table(rows, headers=["Device ID", "Max GPU %", "Max memory"]))
        els.append(Markdown(f"Detailed data saved in an artifact `{artifact_name}`"))

    def _plots():
        rows = []
        for device in results["profile"]:
            gpu_plot, mem_plot = profile_plots(device, results["profile"])
            rows.append(
                [
                    device,
                    Image.from_matplotlib(gpu_plot),
                    Image.from_matplotlib(mem_plot),
                ]
            )
        els.append(
            Table(rows, headers=["Device ID", "GPU Utilization", "Memory usage"])
        )

    els.append(Markdown(f"# GPU profile for `{current.pathspec}`"))
    if results["error"]:
        _error()
    else:
        _drivers()
        _devices()
        _utilization()

        try:
            import matplotlib
        except:
            els.append(Markdown("Install `matplotlib` to enable plots"))
        else:
            try:
                _plots()
            except:
                els.append(Markdown("Couldn't create plots"))

    for el in els:
        current.card["gpu_profile"].append(el)


if __name__ == "__main__":
    prof = GPUProfiler()
    import time

    time.sleep(5 * 60)
    import json

    print(json.dumps(prof.finish()))