
Commit

formatting
emattia committed Nov 2, 2023
1 parent afa1c04 commit 611e282
Showing 9 changed files with 105 additions and 77 deletions.
1 change: 1 addition & 0 deletions examples/multi-node/flow.py
@@ -3,6 +3,7 @@
N_NODES = 2
N_GPU = 2


class MultiNodeTensorFlow(FlowSpec):
tarfile = "mnist.tar.gz"
local_model_dir = "model"
7 changes: 4 additions & 3 deletions examples/multi-node/reload.ipynb
@@ -18,8 +18,8 @@
"metadata": {},
"outputs": [],
"source": [
"FLOW_NAME='MultiNodeTensorFlow'\n",
"model_unzip_path = 'model'"
"FLOW_NAME = \"MultiNodeTensorFlow\"\n",
"model_unzip_path = \"model\""
]
},
{
@@ -63,7 +63,8 @@
"outputs": [],
"source": [
"import tensorflow as tf\n",
"(x_train, y_train), (x_valid, y_valid) = tf.keras.datasets.mnist.load_data()\n",
"\n",
"(x_train, y_train), (x_valid, y_valid) = tf.keras.datasets.mnist.load_data()\n",
"probs = model.predict(x_valid)\n",
"preds = probs.argmax(axis=1)\n",
"correct_pred_ct = (preds == y_valid).sum()\n",
22 changes: 12 additions & 10 deletions examples/single-node/flow.py
@@ -1,31 +1,33 @@
from metaflow import FlowSpec, step, batch, conda, environment

N_GPU=2
N_GPU = 2

class SingleNodeTensorFlow(FlowSpec):

local_model_dir = 'model'
local_tar_name = 'mnist.tar.gz'
class SingleNodeTensorFlow(FlowSpec):
local_model_dir = "model"
local_tar_name = "mnist.tar.gz"

@step
def start(self):
self.next(self.foo)

@environment(vars={"TF_CPP_MIN_LOG_LEVEL": "2"})
@batch(gpu=N_GPU, image='tensorflow/tensorflow:latest-gpu')
@batch(gpu=N_GPU, image="tensorflow/tensorflow:latest-gpu")
@step
def foo(self):
from mnist_mirrored_strategy import main

main(
run = self,
local_model_dir = self.local_model_dir,
local_tar_name = self.local_tar_name
run=self,
local_model_dir=self.local_model_dir,
local_tar_name=self.local_tar_name,
)
self.next(self.end)

@step
def end(self):
pass


if __name__ == "__main__":
SingleNodeTensorFlow()
SingleNodeTensorFlow()
108 changes: 64 additions & 44 deletions examples/single-node/mnist_mirrored_strategy.py
@@ -1,19 +1,27 @@
import os
import tensorflow as tf

try:
import tensorflow_datasets as tfds
except ImportError:
import os
import subprocess

with open(os.devnull, "wb") as devnull:
subprocess.check_call(["pip", "install", "tensorflow-datasets"], stdout=devnull, stderr=subprocess.STDOUT)
subprocess.check_call(
["pip", "install", "tensorflow-datasets"],
stdout=devnull,
stderr=subprocess.STDOUT,
)
import tensorflow_datasets as tfds


def scale(image, label):
image = tf.cast(image, tf.float32)
image /= 255
return image, label


def decay(epoch):
if epoch < 3:
return 1e-3
@@ -24,76 +32,84 @@ def decay(epoch):


def keras_model_path_to_tar(
local_model_dir: str = '/model',
local_tar_name = 'model.tar.gz'
local_model_dir: str = "/model", local_tar_name="model.tar.gz"
):
import tarfile

with tarfile.open(local_tar_name, mode="w:gz") as _tar:
_tar.add(local_model_dir, recursive=True)
return local_tar_name


def main(
checkpoint_dir = './training_checkpoints',
local_model_dir = '/tmp',
local_tar_name = 'model.tar.gz',
run=None
checkpoint_dir="./training_checkpoints",
local_model_dir="/tmp",
local_tar_name="model.tar.gz",
run=None,
):

# download data
datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets['train'], datasets['test']
datasets, info = tfds.load(name="mnist", with_info=True, as_supervised=True)
mnist_train, mnist_test = datasets["train"], datasets["test"]

# Define the distribution strategy
strategy = tf.distribute.MirroredStrategy()
print('Number of devices: {}'.format(strategy.num_replicas_in_sync))
print("Number of devices: {}".format(strategy.num_replicas_in_sync))

# Set up the input pipeline
num_train_examples = info.splits['train'].num_examples
num_test_examples = info.splits['test'].num_examples
num_train_examples = info.splits["train"].num_examples
num_test_examples = info.splits["test"].num_examples

BUFFER_SIZE = 10000

BATCH_SIZE_PER_REPLICA = 64
BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync

train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
train_dataset = (
mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
)
eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE)

# Create the model and instantiate the optimizer
with strategy.scope():
model = tf.keras.Sequential([
tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dense(10)
])

model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
metrics=['accuracy'])



model = tf.keras.Sequential(
[
tf.keras.layers.Conv2D(
32, 3, activation="relu", input_shape=(28, 28, 1)
),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(64, activation="relu"),
tf.keras.layers.Dense(10),
]
)

model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
metrics=["accuracy"],
)

# Define the name of the checkpoint files.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")

# Define a callback for printing the learning rate at the end of each epoch.
class PrintLR(tf.keras.callbacks.Callback):
def on_epoch_end(self, epoch, logs=None):
print('\nLearning rate for epoch {} is {}'.format(epoch + 1, model.optimizer.lr.numpy()))
print(
"\nLearning rate for epoch {} is {}".format(
epoch + 1, model.optimizer.lr.numpy()
)
)

# Put all the callbacks together.
callbacks = [
tf.keras.callbacks.TensorBoard(log_dir='./logs'),
tf.keras.callbacks.TensorBoard(log_dir="./logs"),
tf.keras.callbacks.ModelCheckpoint(
filepath=checkpoint_prefix,
save_weights_only=True
filepath=checkpoint_prefix, save_weights_only=True
),
tf.keras.callbacks.LearningRateScheduler(decay),
PrintLR(),
tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup')
tf.keras.callbacks.BackupAndRestore(backup_dir="/tmp/backup"),
]

# Train and evaluate
@@ -106,16 +122,17 @@ def on_epoch_end(self, epoch, logs=None):
# Restore the latest checkpoint
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
eval_loss, eval_acc = model.evaluate(eval_dataset)
print('Eval loss: {}, Eval accuracy: {}'.format(eval_loss, eval_acc))
print("Eval loss: {}, Eval accuracy: {}".format(eval_loss, eval_acc))

# Save the model
model.save(local_model_dir)

# Zip the model dir and send to S3 for future use
from metaflow import S3

tar_file = keras_model_path_to_tar(local_model_dir, local_tar_name)
key = tar_file.split('/')[-1]
s3 = S3(run=run)
key = tar_file.split("/")[-1]
s3 = S3(run=run)
s3.put_files([(key, tar_file)])

# load model without scope
@@ -124,21 +141,24 @@ def on_epoch_end(self, epoch, logs=None):
unreplicated_model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy']
metrics=["accuracy"],
)

eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)
print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
print("Eval loss: {}, Eval Accuracy: {}".format(eval_loss, eval_acc))

# load model with scope
with strategy.scope():
replicated_model = tf.keras.models.load_model(local_model_dir)
replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=['accuracy'])
replicated_model.compile(
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
optimizer=tf.keras.optimizers.Adam(),
metrics=["accuracy"],
)

eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
print ('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
print("Eval loss: {}, Eval Accuracy: {}".format(eval_loss, eval_acc))


if __name__ == '__main__':
main()
if __name__ == "__main__":
main()
7 changes: 4 additions & 3 deletions examples/single-node/reload.ipynb
@@ -18,8 +18,8 @@
"metadata": {},
"outputs": [],
"source": [
"FLOW_NAME='SingleNodeTensorFlow'\n",
"model_unzip_path = 'model'"
"FLOW_NAME = \"SingleNodeTensorFlow\"\n",
"model_unzip_path = \"model\""
]
},
{
@@ -63,7 +63,8 @@
"outputs": [],
"source": [
"import tensorflow as tf\n",
"(x_train, y_train), (x_valid, y_valid) = tf.keras.datasets.mnist.load_data()\n",
"\n",
"(x_train, y_train), (x_valid, y_valid) = tf.keras.datasets.mnist.load_data()\n",
"probs = model.predict(x_valid)\n",
"preds = probs.argmax(axis=1)\n",
"correct_pred_ct = (preds == y_valid).sum()\n",
8 changes: 5 additions & 3 deletions examples/test_import.py
@@ -1,17 +1,18 @@
from metaflow import FlowSpec, step, kubernetes, conda, tensorflow_parallel

class TestImport(FlowSpec):

class TestImport(FlowSpec):
@step
def start(self):
self.next(self.foo, num_parallel=2)

@conda(libraries={"tensorflow": "2.12.1"})
@kubernetes # comment this out and ensure it runs locally too.
@kubernetes # comment this out and ensure it runs locally too.
@tensorflow_parallel
@step
def foo(self):
import tensorflow as tf

self.next(self.join)

@step
@@ -22,5 +23,6 @@ def join(self, inputs):
def end(self):
pass


if __name__ == "__main__":
TestImport()
TestImport()
@@ -1 +1,3 @@
STEP_DECORATORS_DESC = [("tensorflow", ".tensorflow_decorator.TensorFlowParallelDecorator")]
STEP_DECORATORS_DESC = [
("tensorflow", ".tensorflow_decorator.TensorFlowParallelDecorator")
]
21 changes: 10 additions & 11 deletions metaflow_extensions/tensorflow/plugins/tensorflow_decorator.py
@@ -11,18 +11,20 @@
from threading import Thread
from metaflow.exception import MetaflowException
from metaflow.unbounded_foreach import UBF_CONTROL
from metaflow.plugins.parallel_decorator import ParallelDecorator, _local_multinode_control_task_step_func
from metaflow.plugins.parallel_decorator import (
ParallelDecorator,
_local_multinode_control_task_step_func,
)

class TensorFlowParallelDecorator(ParallelDecorator):

class TensorFlowParallelDecorator(ParallelDecorator):
name = "tensorflow"
defaults = {}
IS_PARALLEL = True

def task_decorate(
self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context
):

def _empty_mapper_task():
pass

@@ -32,10 +34,10 @@ def _empty_mapper_task():
return partial(
_local_multinode_control_task_step_func,
flow,
env_to_use,
env_to_use,
step_func,
retry_count,
)
)
return partial(_empty_mapper_task)

return super().task_decorate(
@@ -45,8 +47,8 @@ def _empty_mapper_task():
def setup_distributed_env(self, flow):
setup_tf_distributed(flow)

def setup_tf_distributed(run):

def setup_tf_distributed(run):
from metaflow import current, S3

num_nodes = current.parallel.num_nodes
@@ -77,10 +79,7 @@ def setup_tf_distributed(run):

my_task = {"type": "worker", "index": node_index}
cluster = {
"worker": [
all_workers[node_id]["address"]
for node_id in range(num_nodes)
]
"worker": [all_workers[node_id]["address"] for node_id in range(num_nodes)]
}
json_config = json.dumps({"cluster": cluster, "task": my_task})
os.environ["TF_CONFIG"] = json_config
os.environ["TF_CONFIG"] = json_config
