# Databricks notebook source
# MAGIC %md
# MAGIC This solution accelerator notebook is available at [Databricks Industry Solutions](https://github.com/databricks-industry-solutions/daxs).
# COMMAND ----------
# MAGIC %md
# MAGIC # Train Anomaly Detection Models for 10,000 Turbines using ECOD and Pandas UDFs
# COMMAND ----------
# MAGIC %md
# MAGIC In this notebook, we will demonstrate how ECOD models can be trained on a large dataset by utilizing Pandas UDFs with Spark.
# COMMAND ----------
# MAGIC %md
# MAGIC ## 1. Cluster setup
# MAGIC We recommend using a multi-node CPU cluster with [Databricks Runtime 15.4 LTS for ML](https://docs.databricks.com/en/release-notes/runtime/15.4lts-ml.html) or above. This notebook will leverage [Pandas UDFs](https://docs.databricks.com/en/udf/pandas.html) and will utilize all the available resources (i.e., cores). Make sure to set the following Spark configurations before you start your cluster: [`spark.sql.execution.arrow.enabled true`](https://spark.apache.org/docs/3.0.1/sql-pyspark-pandas-with-arrow.html#enabling-for-conversion-tofrom-pandas) and [`spark.sql.adaptive.enabled false`](https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution). You can do this by specifying [Spark configuration](https://docs.databricks.com/en/compute/configure.html#spark-configuration) in the advanced options on the cluster creation page.
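# COMMAND ----------
# MAGIC %md
# MAGIC As a quick sanity check (a minimal sketch, assuming the two settings were applied at cluster creation as described above), both configurations can be read back from the notebook with `spark.conf.get`.
# COMMAND ----------
# DBTITLE 1,Optional: verify the recommended Spark configurations
# Expect "true" for Arrow-based conversion and "false" for adaptive query execution if the cluster was configured as recommended.
print(spark.conf.get("spark.sql.execution.arrow.enabled", "not set"))
print(spark.conf.get("spark.sql.adaptive.enabled", "not set"))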
# COMMAND ----------
# DBTITLE 1,Install necessary libraries
# MAGIC %pip install -r requirements.txt --quiet
# MAGIC dbutils.library.restartPython()
# COMMAND ----------
# DBTITLE 1,Run utility functions
# MAGIC %run ./99_utilities
# COMMAND ----------
# DBTITLE 1,Import necessary libraries
import numpy as np
import pandas as pd
from pyod.models.ecod import ECOD
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, TimestampType, ArrayType
from pyspark.sql import functions as F
import mlflow
from base64 import urlsafe_b64encode, urlsafe_b64decode
import pickle
# Get the current user name and store it in a variable
current_user_name = spark.sql("SELECT current_user()").collect()[0][0]
# Set the experiment name
mlflow.set_experiment(f"/Users/{current_user_name}/elevator_anomaly_detection")
# COMMAND ----------
# DBTITLE 1,Create the catalog and schema if they don't exist
catalog = "daxs"
db = "default"
# Make sure that the catalog exists
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
# Make sure that the schema exists
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## 2. Synthetic data creation
# MAGIC
# MAGIC To demonstrate how ECOD efficiently handles large-scale operations, we will generate a synthetic dataset and use ECOD to perform model fitting and prediction. The dataset simulates a fictitious scenario where thousands of wind turbines are monitored in the field. Each turbine is equipped with a hundred sensors, generating readings sampled at a one-minute frequency. Our objective is to train thousands of individual ECOD models and later use them for inference. To achieve this, we leverage [Pandas UDFs](https://docs.databricks.com/en/udf/pandas.html) for distributed processing.
# COMMAND ----------
num_turbines = 10000 # number of turbines
num_sensors = 100 # number of sensors in each turbine
samples_per_turbine = 1440 # corresponds to 1 day of data
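# COMMAND ----------
# MAGIC %md
# MAGIC For a rough sense of scale (simple arithmetic on the parameters above), the training set amounts to roughly 14.4 million rows, each carrying 100 sensor readings.
# COMMAND ----------
# Rough scale implied by the parameters above
total_rows = num_turbines * samples_per_turbine  # 10,000 turbines x 1,440 samples = 14,400,000 rows
total_readings = total_rows * num_sensors        # ~1.44 billion individual sensor readings
print(f"{total_rows:,} rows, {total_readings:,} sensor readings")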
# COMMAND ----------
# MAGIC %md
# MAGIC We will set the number of [Spark shuffle partitions](https://spark.apache.org/docs/3.5.1/sql-performance-tuning.html) equal to the number of turbines. This ensures that one Spark task is created per turbine by the `groupBy` operation that precedes `applyInPandas`, which allows us to utilize cluster resources efficiently.
# COMMAND ----------
spark.conf.set("spark.sql.shuffle.partitions", num_turbines)
# COMMAND ----------
# MAGIC %md
# MAGIC We run a custom function, `create_turbine_dataset`, defined in the `99_utilities` notebook to create the dataset. Under the hood, the function leverages a Pandas UDF to generate data from a hundred sensors across thousands of turbines. Once generated, the dataset is written to a Delta table: `{catalog}.{db}.turbine_data_train_{num_turbines}`.
# COMMAND ----------
create_turbine_dataset(catalog, db, num_turbines, num_sensors, samples_per_turbine, start_date='2025-01-01')
# COMMAND ----------
# Read the data from the Delta table to Spark DataFrame
spark_df = spark.read.table(f"{catalog}.{db}.turbine_data_train_{num_turbines}")
# Display the first few rows of the DataFrame
display(spark_df.filter("turbine_id='Turbine_1'"))
# COMMAND ----------
# MAGIC %md
# MAGIC ## 3. Train many ECOD models using Pandas UDFs
# MAGIC
# MAGIC Pandas UDF is a feature in PySpark that combines the distributed processing power of Spark with the data manipulation capabilities of pandas. It uses [Apache Arrow](https://spark.apache.org/docs/latest/api/python/user_guide/sql/arrow_pandas.html) to efficiently transfer data between JVM and Python processes, allowing for vectorized operations that can significantly improve performance compared to traditional row-at-a-time UDFs. The first step in utilizing a Pandas UDF is to define a function.
# COMMAND ----------
# Define the Pandas UDF for training an ECOD model for each turbine
def train_ecod_model(turbine_pdf: pd.DataFrame) -> pd.DataFrame:
    from pyod.models.ecod import ECOD         # Import the ECOD model from PyOD
    from base64 import urlsafe_b64encode      # Import base64 encoding for model serialization
    import pickle                             # Import pickle for model serialization

    # Extract the turbine ID
    turbine_id = turbine_pdf['turbine_id'].iloc[0]

    # Identify feature columns by excluding non-feature columns
    feature_columns = turbine_pdf.columns.drop(['turbine_id', 'timestamp'])

    # Prepare training data by selecting feature columns and filling missing values with 0
    X_train = turbine_pdf[feature_columns].fillna(0)

    # Count the number of records used for training
    n_used = turbine_pdf.shape[0]

    # Initialize and train the ECOD model
    clf = ECOD(contamination=0.1, n_jobs=1)
    clf.fit(X_train)

    # Serialize the trained model using pickle and URL-safe base64 encoding
    model_encoder = urlsafe_b64encode(pickle.dumps(clf)).decode("utf-8")

    # Create a return DataFrame with turbine ID, number of records used, and encoded model
    returnDF = pd.DataFrame(
        [[turbine_id, n_used, model_encoder]],
        columns=["turbine_id", "n_used", "encode_model"]
    )

    # Return the result as a single-row DataFrame
    return returnDF
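# COMMAND ----------
# MAGIC %md
# MAGIC Because `train_ecod_model` is a plain Python function, it can optionally be smoke-tested on a small pandas DataFrame on the driver before being handed to `applyInPandas`. The sketch below uses hypothetical sensor column names (`sensor_1` to `sensor_3`); the real column names come from the generated Delta table.
# COMMAND ----------
# DBTITLE 1,Optional: local smoke test of the training function
# Build a tiny single-turbine frame with illustrative sensor columns and run the function directly on the driver.
_test_pdf = pd.DataFrame({
    "turbine_id": ["Turbine_1"] * 50,
    "timestamp": pd.date_range("2025-01-01", periods=50, freq="min"),
    **{f"sensor_{i}": np.random.rand(50) for i in range(1, 4)},  # hypothetical column names
})
display(train_ecod_model(_test_pdf))  # expect one row: turbine_id, n_used=50, encode_model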
# COMMAND ----------
# MAGIC %md
# MAGIC Next, we define the output schema for the Pandas UDF.
# COMMAND ----------
schema = StructType([
    StructField('turbine_id', StringType(), True),
    StructField('n_used', IntegerType(), True),
    StructField('encode_model', StringType(), True)
])
# COMMAND ----------
# MAGIC %md
# MAGIC Finally, we execute the `applyInPandas` method using the previously defined function and schema. The output of this operation is then written to a Delta table. Note that we add the current timestamp to the DataFrame; this makes it possible to distinguish the latest version of the models from previous ones.
# MAGIC
# MAGIC Databricks recommends [MLflow](https://www.databricks.com/product/managed-mlflow) as the best practice for tracking models and experiment runs. However, at the scale of this exercise, logging and tracking thousands of runs in a short time can be challenging due to resource [limitations](https://docs.databricks.com/en/resources/limits.html) on the MLflow Tracking Server. To address this, we use a Delta table to track runs and models instead. That said, we will still log aggregated information to MLflow.
# COMMAND ----------
from pyspark.sql.functions import current_timestamp
with mlflow.start_run(run_name="ECOD_models_batch_training") as run:
    # Group the data by turbine_id and train models using applyInPandas
    model_df = spark_df.groupBy('turbine_id').applyInPandas(train_ecod_model, schema=schema)
    # Write the output of applyInPandas to a Delta table
    (
        model_df
        .withColumn("created_at", current_timestamp())
        .write.mode("overwrite")
        .saveAsTable(f"{catalog}.{db}.models")
    )
    mlflow.log_param("contamination", 0.1)
    mlflow.log_param("source", f"{catalog}.{db}.turbine_data_train_{num_turbines}")
    mlflow.log_param("target", f"{catalog}.{db}.models")
# COMMAND ----------
# MAGIC %md
# MAGIC Let's take a peek at the output table. The `n_used` column shows the number of samples used to train each model, while the `encode_model` column contains the pickled, base64-encoded representation of each model, which can be easily decoded and used for inference (as will be demonstrated shortly).
# COMMAND ----------
display(spark.sql(f"""
SELECT turbine_id, n_used, LEFT(encode_model, 20) AS encode_model, created_at FROM {catalog}.{db}.models LIMIT 10
"""))
# COMMAND ----------
# MAGIC %md
# MAGIC ## 4. Wrap up
# MAGIC
# MAGIC In this notebook, we demonstrated how ECOD can be used to fit thousands of models to a large dataset by utilizing Pandas UDFs with Spark.
# MAGIC
# MAGIC To execute this notebook, we used a multi-node interactive cluster consisting of 8 workers, each equipped with 4 cores and 32 GB of memory. The setup corresponds to [r8g.xlarge](https://www.databricks.com/product/pricing/product-pricing/instance-types) (memory optimized) instances on AWS (27 DBU/h) or [Standard_E4d_v4](https://www.databricks.com/product/pricing/product-pricing/instance-types) instances on Azure (18 DBU/h). Training individual models for 1,440 time points across 10,000 turbines with 100 sensors took approximately 2 minutes.
# MAGIC
# MAGIC An efficient implementation of ECOD combined with Pandas UDFs allows these [embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel) operations to scale proportionally with the size of the cluster, i.e., the number of cores.
# COMMAND ----------
# MAGIC %md
# MAGIC © 2024 Databricks, Inc. All rights reserved. The source in this notebook is provided subject to the Databricks License. All included or referenced third-party libraries and datasets are subject to the licenses set forth below.
# MAGIC
# MAGIC | library / dataset | description | license | source |
# MAGIC |-------------------|-------------|---------|--------|
# MAGIC | pyod | A Comprehensive and Scalable Python Library for Outlier Detection (Anomaly Detection) | BSD License | https://pypi.org/project/pyod/ |