WIP for adding materializers to examples #1

Open · wants to merge 1 commit into main
55 changes: 55 additions & 0 deletions machine_learning/custom_materializers.py
@@ -0,0 +1,55 @@
import dataclasses
import pickle
from typing import Any, Collection, Dict, Type

import numpy as np
from sklearn import base

from hamilton import registry
from hamilton.io import utils
from hamilton.io.data_adapters import DataSaver

# TODO -- put this back in the standard library
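# Each DataSaver below declares which Python types it can persist (via
# applicable_types) and a short name; registering it with Hamilton's registry
# is what lets the to.<name> materializer calls in run.py dispatch to it.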


@dataclasses.dataclass
class NumpyMatrixToCSV(DataSaver):
    path: str
    sep: str = ","

    def __post_init__(self):
        if not self.path.endswith(".csv"):
            raise ValueError(f"CSV files must end with .csv, got {self.path}")

    def save_data(self, data: np.ndarray) -> Dict[str, Any]:
        np.savetxt(self.path, data, delimiter=self.sep)
        return utils.get_file_metadata(self.path)

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [np.ndarray]

    @classmethod
    def name(cls) -> str:
        return "csv"


@dataclasses.dataclass
class SKLearnPickler(DataSaver):
    path: str

    def save_data(self, data: base.ClassifierMixin) -> Dict[str, Any]:
        with open(self.path, "wb") as f:
            pickle.dump(data, f)
        return utils.get_file_metadata(self.path)

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [base.ClassifierMixin]

    @classmethod
    def name(cls) -> str:
        return "pickle"


for adapter in [NumpyMatrixToCSV, SKLearnPickler]:
    registry.register_adapter(adapter)
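As a quick sanity check, the savers can also be exercised directly, outside of any Hamilton DAG. A minimal sketch, assuming the module is importable from the working directory (the file names here are illustrative, not part of this PR):

import numpy as np
from sklearn.linear_model import LogisticRegression

from custom_materializers import NumpyMatrixToCSV, SKLearnPickler

# Save a small matrix; save_data returns the file metadata dict.
matrix_saver = NumpyMatrixToCSV(path="example_matrix.csv")
print(matrix_saver.save_data(np.eye(3)))

# Pickle a fitted classifier the same way.
clf = LogisticRegression().fit([[0.0], [1.0]], [0, 1])
print(SKLearnPickler(path="example_clf.pkl").save_data(clf))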
58 changes: 53 additions & 5 deletions machine_learning/run.py
@@ -1,15 +1,22 @@
import datetime
import os
import click
import json

from dagworks import driver as dw_driver
from hamilton import base as h_base
from hamilton import driver as h_driver
from hamilton.function_modifiers import source
from hamilton.io.materialization import to

from components import iris_loader
from components import feature_transforms
from components import model_fitting
from components import models

import importlib
importlib.import_module("custom_materializers")
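# Importing custom_materializers registers its DataSavers with Hamilton's
# registry (a side effect of the module-level loop at the bottom of that
# file); the to.pickle / to.csv calls below rely on this.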


from typing import Union

@@ -38,7 +45,7 @@ def run(dry_run: bool, api_key: str, config: str=None):
"""
# Load the configuration file (optional). It is used to shape the DAG.
config_loaded = _load_config(config)
dag_name = f"machine_learning_dag"
dag_name = f"machine_learning_dag_{datetime.datetime.now().isoformat()}"
# if config is not None:
# dag_name += f"_{config}"
if api_key is None:
@@ -50,9 +57,9 @@ def run(dry_run: bool, api_key: str, config: str=None):
        feature_transforms,
        model_fitting,
        models,
        username="stefan@dagworks.io",
        username="elijah@dagworks.io",
        api_key=api_key,
        project_id=29,
        project_id=68,
        dag_name=dag_name,
        tags={"change_from_previous": "hyperparameter inputs"},
        adapter=h_base.SimplePythonGraphAdapter(h_base.DictResult()),
@@ -66,8 +73,49 @@ def run(dry_run: bool, api_key: str, config: str=None):
        models,
        adapter=h_base.SimplePythonGraphAdapter(h_base.DictResult()),
    )
    inputs = {"gamma": 0.001, "penalty": "l2", "solver": "lbfgs"}
    result = dr.execute(['best_model'], inputs=inputs)
    inputs = {
        "gamma": 0.001,
        "penalty": "l2",
        "solver": "lbfgs",
        "prefit_lr_clf_file": "prefit_lr_clf.pkl",
        "prefit_svm_clf_file": "prefit_svm_clf.pkl",
        "dataset_v2_save_file": "dataset_v2.csv",
    }
    materializers = [
        # to.pickle(
        #     dependencies=["best_model"],
        #     id="best_model_params_pkl",
        #     path=source("best_model_file")),
        # classification report to .txt file
        # to.file(
        #     dependencies=["classification_report"],
        #     id="classification_report_to_txt",
        #     path=source("classification_report_file")),
        # materialize the model to a pickle file
        to.pickle(
            dependencies=["lr_model.fit_clf"],
            id="fit_clf_lr_to_pickle",
            path=source("prefit_lr_clf_file"),
        ),
        to.pickle(
            dependencies=["svm_model.fit_clf"],
            id="fit_clf_svm_to_pickle",
            path=source("prefit_svm_clf_file"),
        ),
        # materialize the predictions we made to a csv file
        to.csv(
            dependencies=["data_set_v2"],
            id="dataset_v2_save",
            path=source("dataset_v2_save_file"),
        ),
    ]
    # result = dr.execute(['best_model'], inputs=inputs)
    result = dr.materialize(
        *materializers,
        inputs=inputs,
    )

    print(result)
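If this runs end to end, the three materializers should leave artifacts at the paths passed via inputs. A minimal sketch of loading them back to verify the round trip (assuming the default paths above were not overridden):

import pickle

import numpy as np

with open("prefit_lr_clf.pkl", "rb") as f:
    lr_clf = pickle.load(f)
with open("prefit_svm_clf.pkl", "rb") as f:
    svm_clf = pickle.load(f)

# NumpyMatrixToCSV wrote with delimiter=",", so loadtxt mirrors it.
dataset_v2 = np.loadtxt("dataset_v2.csv", delimiter=",")
print(type(lr_clf).__name__, type(svm_clf).__name__, dataset_v2.shape)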
