From 611e282d239b9e6ce8c3016a849593b9a9077838 Mon Sep 17 00:00:00 2001 From: Eddie Mattia Date: Thu, 2 Nov 2023 16:12:26 -0700 Subject: [PATCH] formatting --- examples/multi-node/flow.py | 1 + examples/multi-node/reload.ipynb | 7 +- examples/single-node/flow.py | 22 ++-- .../single-node/mnist_mirrored_strategy.py | 108 +++++++++++------- examples/single-node/reload.ipynb | 7 +- examples/test_import.py | 8 +- .../plugins/mfextinit_tensorflow.py | 4 +- .../plugins/tensorflow_decorator.py | 21 ++-- setup.py | 4 +- 9 files changed, 105 insertions(+), 77 deletions(-) diff --git a/examples/multi-node/flow.py b/examples/multi-node/flow.py index bf9c6d5..4e6f54f 100644 --- a/examples/multi-node/flow.py +++ b/examples/multi-node/flow.py @@ -3,6 +3,7 @@ N_NODES = 2 N_GPU = 2 + class MultiNodeTensorFlow(FlowSpec): tarfile = "mnist.tar.gz" local_model_dir = "model" diff --git a/examples/multi-node/reload.ipynb b/examples/multi-node/reload.ipynb index 6448f63..f5875b2 100644 --- a/examples/multi-node/reload.ipynb +++ b/examples/multi-node/reload.ipynb @@ -18,8 +18,8 @@ "metadata": {}, "outputs": [], "source": [ - "FLOW_NAME='MultiNodeTensorFlow'\n", - "model_unzip_path = 'model'" + "FLOW_NAME = \"MultiNodeTensorFlow\"\n", + "model_unzip_path = \"model\"" ] }, { @@ -63,7 +63,8 @@ "outputs": [], "source": [ "import tensorflow as tf\n", - "(x_train, y_train), (x_valid, y_valid) = tf.keras.datasets.mnist.load_data()\n", + "\n", + "(x_train, y_train), (x_valid, y_valid) = tf.keras.datasets.mnist.load_data()\n", "probs = model.predict(x_valid)\n", "preds = probs.argmax(axis=1)\n", "correct_pred_ct = (preds == y_valid).sum()\n", diff --git a/examples/single-node/flow.py b/examples/single-node/flow.py index 245d1ae..f4487d2 100644 --- a/examples/single-node/flow.py +++ b/examples/single-node/flow.py @@ -1,25 +1,26 @@ from metaflow import FlowSpec, step, batch, conda, environment -N_GPU=2 +N_GPU = 2 -class SingleNodeTensorFlow(FlowSpec): - local_model_dir = 'model' - local_tar_name = 'mnist.tar.gz' +class SingleNodeTensorFlow(FlowSpec): + local_model_dir = "model" + local_tar_name = "mnist.tar.gz" @step def start(self): self.next(self.foo) - + @environment(vars={"TF_CPP_MIN_LOG_LEVEL": "2"}) - @batch(gpu=N_GPU, image='tensorflow/tensorflow:latest-gpu') + @batch(gpu=N_GPU, image="tensorflow/tensorflow:latest-gpu") @step def foo(self): from mnist_mirrored_strategy import main + main( - run = self, - local_model_dir = self.local_model_dir, - local_tar_name = self.local_tar_name + run=self, + local_model_dir=self.local_model_dir, + local_tar_name=self.local_tar_name, ) self.next(self.end) @@ -27,5 +28,6 @@ def foo(self): def end(self): pass + if __name__ == "__main__": - SingleNodeTensorFlow() \ No newline at end of file + SingleNodeTensorFlow() diff --git a/examples/single-node/mnist_mirrored_strategy.py b/examples/single-node/mnist_mirrored_strategy.py index 4835822..fee9acc 100644 --- a/examples/single-node/mnist_mirrored_strategy.py +++ b/examples/single-node/mnist_mirrored_strategy.py @@ -1,19 +1,27 @@ import os import tensorflow as tf + try: import tensorflow_datasets as tfds except ImportError: import os import subprocess + with open(os.devnull, "wb") as devnull: - subprocess.check_call(["pip", "install", "tensorflow-datasets"], stdout=devnull, stderr=subprocess.STDOUT) + subprocess.check_call( + ["pip", "install", "tensorflow-datasets"], + stdout=devnull, + stderr=subprocess.STDOUT, + ) import tensorflow_datasets as tfds + def scale(image, label): image = tf.cast(image, tf.float32) image /= 255 return 
image, label + def decay(epoch): if epoch < 3: return 1e-3 @@ -24,76 +32,84 @@ def decay(epoch): def keras_model_path_to_tar( - local_model_dir: str = '/model', - local_tar_name = 'model.tar.gz' + local_model_dir: str = "/model", local_tar_name="model.tar.gz" ): import tarfile + with tarfile.open(local_tar_name, mode="w:gz") as _tar: _tar.add(local_model_dir, recursive=True) return local_tar_name def main( - checkpoint_dir = './training_checkpoints', - local_model_dir = '/tmp', - local_tar_name = 'model.tar.gz', - run=None + checkpoint_dir="./training_checkpoints", + local_model_dir="/tmp", + local_tar_name="model.tar.gz", + run=None, ): - # download data - datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True) - mnist_train, mnist_test = datasets['train'], datasets['test'] + datasets, info = tfds.load(name="mnist", with_info=True, as_supervised=True) + mnist_train, mnist_test = datasets["train"], datasets["test"] # Define the distribution strategy strategy = tf.distribute.MirroredStrategy() - print('Number of devices: {}'.format(strategy.num_replicas_in_sync)) + print("Number of devices: {}".format(strategy.num_replicas_in_sync)) # Set up the input pipeline - num_train_examples = info.splits['train'].num_examples - num_test_examples = info.splits['test'].num_examples + num_train_examples = info.splits["train"].num_examples + num_test_examples = info.splits["test"].num_examples BUFFER_SIZE = 10000 BATCH_SIZE_PER_REPLICA = 64 BATCH_SIZE = BATCH_SIZE_PER_REPLICA * strategy.num_replicas_in_sync - train_dataset = mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE) + train_dataset = ( + mnist_train.map(scale).cache().shuffle(BUFFER_SIZE).batch(BATCH_SIZE) + ) eval_dataset = mnist_test.map(scale).batch(BATCH_SIZE) # Create the model and instantiate the optimizer with strategy.scope(): - model = tf.keras.Sequential([ - tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)), - tf.keras.layers.MaxPooling2D(), - tf.keras.layers.Flatten(), - tf.keras.layers.Dense(64, activation='relu'), - tf.keras.layers.Dense(10) - ]) - - model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), - optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), - metrics=['accuracy']) - - - + model = tf.keras.Sequential( + [ + tf.keras.layers.Conv2D( + 32, 3, activation="relu", input_shape=(28, 28, 1) + ), + tf.keras.layers.MaxPooling2D(), + tf.keras.layers.Flatten(), + tf.keras.layers.Dense(64, activation="relu"), + tf.keras.layers.Dense(10), + ] + ) + + model.compile( + loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), + optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), + metrics=["accuracy"], + ) + # Define the name of the checkpoint files. checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}") # Define a callback for printing the learning rate at the end of each epoch. class PrintLR(tf.keras.callbacks.Callback): def on_epoch_end(self, epoch, logs=None): - print('\nLearning rate for epoch {} is {}'.format(epoch + 1, model.optimizer.lr.numpy())) + print( + "\nLearning rate for epoch {} is {}".format( + epoch + 1, model.optimizer.lr.numpy() + ) + ) # Put all the callbacks together. 
callbacks = [ - tf.keras.callbacks.TensorBoard(log_dir='./logs'), + tf.keras.callbacks.TensorBoard(log_dir="./logs"), tf.keras.callbacks.ModelCheckpoint( - filepath=checkpoint_prefix, - save_weights_only=True + filepath=checkpoint_prefix, save_weights_only=True ), tf.keras.callbacks.LearningRateScheduler(decay), PrintLR(), - tf.keras.callbacks.BackupAndRestore(backup_dir='/tmp/backup') + tf.keras.callbacks.BackupAndRestore(backup_dir="/tmp/backup"), ] # Train and evaluate @@ -106,16 +122,17 @@ def on_epoch_end(self, epoch, logs=None): # Restore the latest checkpoint model.load_weights(tf.train.latest_checkpoint(checkpoint_dir)) eval_loss, eval_acc = model.evaluate(eval_dataset) - print('Eval loss: {}, Eval accuracy: {}'.format(eval_loss, eval_acc)) + print("Eval loss: {}, Eval accuracy: {}".format(eval_loss, eval_acc)) # Save the model model.save(local_model_dir) # Zip the model dir and send to S3 for future use from metaflow import S3 + tar_file = keras_model_path_to_tar(local_model_dir, local_tar_name) - key = tar_file.split('/')[-1] - s3 = S3(run=run) + key = tar_file.split("/")[-1] + s3 = S3(run=run) s3.put_files([(key, tar_file)]) # load model without scope @@ -124,21 +141,24 @@ def on_epoch_end(self, epoch, logs=None): unreplicated_model.compile( loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer=tf.keras.optimizers.Adam(), - metrics=['accuracy'] + metrics=["accuracy"], ) eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset) - print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc)) + print("Eval loss: {}, Eval Accuracy: {}".format(eval_loss, eval_acc)) # load model with scope with strategy.scope(): replicated_model = tf.keras.models.load_model(local_model_dir) - replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), - optimizer=tf.keras.optimizers.Adam(), - metrics=['accuracy']) + replicated_model.compile( + loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), + optimizer=tf.keras.optimizers.Adam(), + metrics=["accuracy"], + ) eval_loss, eval_acc = replicated_model.evaluate(eval_dataset) - print ('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc)) + print("Eval loss: {}, Eval Accuracy: {}".format(eval_loss, eval_acc)) + -if __name__ == '__main__': - main() \ No newline at end of file +if __name__ == "__main__": + main() diff --git a/examples/single-node/reload.ipynb b/examples/single-node/reload.ipynb index 771ee6e..4e21e09 100644 --- a/examples/single-node/reload.ipynb +++ b/examples/single-node/reload.ipynb @@ -18,8 +18,8 @@ "metadata": {}, "outputs": [], "source": [ - "FLOW_NAME='SingleNodeTensorFlow'\n", - "model_unzip_path = 'model'" + "FLOW_NAME = \"SingleNodeTensorFlow\"\n", + "model_unzip_path = \"model\"" ] }, { @@ -63,7 +63,8 @@ "outputs": [], "source": [ "import tensorflow as tf\n", - "(x_train, y_train), (x_valid, y_valid) = tf.keras.datasets.mnist.load_data()\n", + "\n", + "(x_train, y_train), (x_valid, y_valid) = tf.keras.datasets.mnist.load_data()\n", "probs = model.predict(x_valid)\n", "preds = probs.argmax(axis=1)\n", "correct_pred_ct = (preds == y_valid).sum()\n", diff --git a/examples/test_import.py b/examples/test_import.py index 5cbd311..0612668 100644 --- a/examples/test_import.py +++ b/examples/test_import.py @@ -1,17 +1,18 @@ from metaflow import FlowSpec, step, kubernetes, conda, tensorflow_parallel -class TestImport(FlowSpec): +class TestImport(FlowSpec): @step def start(self): self.next(self.foo, num_parallel=2) 
@conda(libraries={"tensorflow": "2.12.1"}) - @kubernetes # comment this out and ensure it runs locally too. + @kubernetes # comment this out and ensure it runs locally too. @tensorflow_parallel @step def foo(self): import tensorflow as tf + self.next(self.join) @step @@ -22,5 +23,6 @@ def join(self, inputs): def end(self): pass + if __name__ == "__main__": - TestImport() \ No newline at end of file + TestImport() diff --git a/metaflow_extensions/tensorflow/plugins/mfextinit_tensorflow.py b/metaflow_extensions/tensorflow/plugins/mfextinit_tensorflow.py index 3003a7f..446bf40 100644 --- a/metaflow_extensions/tensorflow/plugins/mfextinit_tensorflow.py +++ b/metaflow_extensions/tensorflow/plugins/mfextinit_tensorflow.py @@ -1 +1,3 @@ -STEP_DECORATORS_DESC = [("tensorflow", ".tensorflow_decorator.TensorFlowParallelDecorator")] \ No newline at end of file +STEP_DECORATORS_DESC = [ + ("tensorflow", ".tensorflow_decorator.TensorFlowParallelDecorator") +] diff --git a/metaflow_extensions/tensorflow/plugins/tensorflow_decorator.py b/metaflow_extensions/tensorflow/plugins/tensorflow_decorator.py index c1fcd7b..48542ea 100644 --- a/metaflow_extensions/tensorflow/plugins/tensorflow_decorator.py +++ b/metaflow_extensions/tensorflow/plugins/tensorflow_decorator.py @@ -11,10 +11,13 @@ from threading import Thread from metaflow.exception import MetaflowException from metaflow.unbounded_foreach import UBF_CONTROL -from metaflow.plugins.parallel_decorator import ParallelDecorator, _local_multinode_control_task_step_func +from metaflow.plugins.parallel_decorator import ( + ParallelDecorator, + _local_multinode_control_task_step_func, +) -class TensorFlowParallelDecorator(ParallelDecorator): +class TensorFlowParallelDecorator(ParallelDecorator): name = "tensorflow" defaults = {} IS_PARALLEL = True @@ -22,7 +25,6 @@ class TensorFlowParallelDecorator(ParallelDecorator): def task_decorate( self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context ): - def _empty_mapper_task(): pass @@ -32,10 +34,10 @@ def _empty_mapper_task(): return partial( _local_multinode_control_task_step_func, flow, - env_to_use, + env_to_use, step_func, retry_count, - ) + ) return partial(_empty_mapper_task) return super().task_decorate( @@ -45,8 +47,8 @@ def _empty_mapper_task(): def setup_distributed_env(self, flow): setup_tf_distributed(flow) -def setup_tf_distributed(run): +def setup_tf_distributed(run): from metaflow import current, S3 num_nodes = current.parallel.num_nodes @@ -77,10 +79,7 @@ def setup_tf_distributed(run): my_task = {"type": "worker", "index": node_index} cluster = { - "worker": [ - all_workers[node_id]["address"] - for node_id in range(num_nodes) - ] + "worker": [all_workers[node_id]["address"] for node_id in range(num_nodes)] } json_config = json.dumps({"cluster": cluster, "task": my_task}) - os.environ["TF_CONFIG"] = json_config \ No newline at end of file + os.environ["TF_CONFIG"] = json_config diff --git a/setup.py b/setup.py index 20238ff..0fa308e 100644 --- a/setup.py +++ b/setup.py @@ -12,5 +12,5 @@ py_modules=[ "metaflow_extensions", ], - install_requires=[] -) \ No newline at end of file + install_requires=[], +)