# Databricks notebook source
- import numpy as np
- import pandas as pd
+ from pyspark.mllib.stat import Statistics
+ from pyspark.sql.functions import *
+ import pyspark.pandas as ps
import matplotlib.pyplot as plt

- from pyspark.ml.clustering import KMeans
- from pyspark.ml.evaluation import ClusteringEvaluator
- from pyspark.ml.feature import VectorAssembler
- from pyspark.ml.feature import StandardScaler
+ %sql
+ Use deltabase

- import pyspark.pandas as ps
+ %sql
+ set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
+ set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
+ set spark.sql.cbo.enabled = true;

- # COMMAND ----------
-
- data_2005 = spark.sql("SELECT AAPL_adjClose, \
- AA_adjClose, \
- AAL_adjClose, \
- AAP_adjClose, \
- A_adjClose, \
- ABBV_adjClose, \
- ABC_adjClose, \
- ABMD_adjClose, \
- ABT_adjClose, \
- ACN_adjClose, \
- ACV_adjClose \
- FROM deltabase.aapl_30min_delta \
- FULL JOIN deltabase.aa_30min_delta ON AAPL_dateTime = AA_dateTime \
- FULL JOIN deltabase.aal_30min_delta ON AAPL_dateTime = AAL_dateTime \
- FULL JOIN deltabase.aap_30min_delta ON AAPL_dateTime = AAP_dateTime \
- FULL JOIN deltabase.a_30min_delta ON AAPL_dateTime = A_dateTime \
- FULL JOIN deltabase.abbv_30min_delta ON AAPL_dateTime = ABBV_dateTime \
- FULL JOIN deltabase.abc_30min_delta ON AAPL_dateTime = ABC_dateTime \
- FULL JOIN deltabase.abmd_30min_delta ON AAPL_dateTime = ABMD_dateTime \
- FULL JOIN deltabase.abt_30min_delta ON AAPL_dateTime = ABT_dateTime \
- FULL JOIN deltabase.acn_30min_delta ON AAPL_dateTime = ACN_dateTime \
- FULL JOIN deltabase.acv_30min_delta ON AAPL_dateTime = ACV_dateTime \
- WHERE DATE_FORMAT(AAPL_dateTime,'HHmm') between '0930' and '1630' and DATE_FORMAT(AAPL_dateTime,'yyyy-MM-dd') between '2021-01-03' and '2022-12-31' \
- ORDER BY AAPL_dateTime asc \
- ")
-
- display(data_2005)
+ # Enable Arrow-based columnar data transfers
+ spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
+ #spark.conf.set("spark.sql.execution.arrow.pyspark.fallback.enabled", "true");
+ ps.set_option('compute.max_rows', 10000000)

# COMMAND ----------

- data_2005 = spark.sql("SELECT AAPL_adjClose, \
+ df = spark.sql("SELECT AAPL_adjClose, \
AA_adjClose, \
AAL_adjClose, \
AAP_adjClose, \

[... unchanged query lines omitted (old lines 49-1345 / new lines 26-1322): the remaining adjClose columns and per-ticker FULL JOIN clauses ...]

FULL JOIN deltabase.zbra_30min_delta ON AAPL_dateTime = ZBRA_dateTime \
FULL JOIN deltabase.zion_30min_delta ON AAPL_dateTime = ZION_dateTime \
FULL JOIN deltabase.zts_30min_delta ON AAPL_dateTime = ZTS_dateTime \
- WHERE DATE_FORMAT(AAPL_dateTime,'HHmm') between '0930' and '1630' and DATE_FORMAT(AAPL_dateTime,'yyyy-MM-dd') between '2005-01-03' and '2005-12-31' \
+ WHERE DATE_FORMAT(AAPL_dateTime,'HHmm') == '1600' and DATE_FORMAT(AAPL_dateTime,'yyyy-MM-dd') between '2005-01-03' and '2022-04-29' \
ORDER BY AAPL_dateTime asc \
- ")
+ ")
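
A quick shape check of the reworked query (a sketch, not part of the commit; it assumes the df defined in the cell above and a live Spark session): the new WHERE clause keeps only the 16:00 bar per trading day from 2005-01-03 through 2022-04-29, so df should come back with roughly one row per trading day.

# Hypothetical follow-up cell: rough sanity check of the filtered result's shape.
print(df.count(), "rows x", len(df.columns), "columns")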

# COMMAND ----------

- data_2005_dropna = data_2005.dropna()
+ # Spark Kmeans Clustering

- #assemble = VectorAssembler(inputCols=["AAPL_adjClose","AA_adjClose","AAL_adjClose","AAP_adjClose","A_adjClose","ABBV_adjClose","ABC_adjClose","ABMD_adjClose","ABT_adjClose","ACN_adjClose","ACV_adjClose"], outputCol='features')
+ import mlflow
+ from pyspark.ml.feature import VectorAssembler
+ from pyspark.ml.feature import Normalizer
+ from pyspark.ml.feature import Imputer
+ from pyspark.ml.feature import PCA
+ from pyspark.ml.clustering import KMeans
+ from pyspark.ml.clustering import BisectingKMeans
+ from pyspark.ml.evaluation import ClusteringEvaluator
+ from pyspark.sql import DataFrame
+ import pyspark.pandas as ps
+ import pandas as pd
+ from sklearn.impute import SimpleImputer
+ from databricks import feature_store
+ from databricks.feature_store import feature_table, FeatureLookup

- assemble = VectorAssembler(inputCols=data_2005_dropna.columns, outputCol='features')
+ # Calculate percent change
+ df_ps_pct = df.pandas_api().pct_change().to_spark()

- assembled_data = assemble.transform(data_2005_dropna)
+ # Create Delta Lake table for saving the percent-changed data.
+ df_ps_pct.write.format("delta").mode("overwrite").option("overwriteSchema", "True").saveAsTable('sp500_components_pctChanged_delta')

- assembled_data.show()
+ # Option 1: Drop rows with NaN values
+ #df_ps_pct_FilteredNA = df_ps_pct.dropna()

- # COMMAND ----------
+ # Option 2: Use an imputer for NaN values
+ imputer = Imputer(strategy='mean')
+ imputer.setInputCols(df_ps_pct.columns)

- scale = StandardScaler(inputCol='features', outputCol='standardized')
+ output_col = []
+ for col in df_ps_pct.columns:
+     output_col.append("out_" + col)
+ print(output_col)

- data_scale = scale.fit(assembled_data)
+ imputer.setOutputCols(output_col)

- data_scale_output = data_scale.transform(assembled_data)
+ model = imputer.fit(df_ps_pct)
+ df_spark_pct_FilteredNA = model.transform(df_ps_pct)

- data_scale_output.show(2)
+ # Create Delta Lake table for saving the imputed percent-changed data.
+ df_spark_pct_FilteredNA.write.format("delta").mode("overwrite").option("overwriteSchema", "True").saveAsTable('sp500_components_imputed_pctChanged_delta')

- # COMMAND ----------
+ df_spark_pct_FilteredNA.pandas_api().transpose()

- silhouette_score = []
- evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized', \
-                                 metricName='silhouette', distanceMeasure='squaredEuclidean')
+ # Start mlflow
+ mlflow.autolog(exclusive=False)

- kmean_range = range(7, 20)
+ with mlflow.start_run() as run:
+     # Define vector assembler
+     #assembler = VectorAssembler(inputCols=df_spark_pct_FilteredNA.columns, outputCol="features")
+     #assembled_df = assembler.transform(df_spark_pct_FilteredNA)
+     assembled_df = VectorAssembler(inputCols=df_spark_pct_FilteredNA.columns, outputCol="features").transform(df_spark_pct_FilteredNA)

- for i in kmean_range:
-
-     KMeans_algo = KMeans(featuresCol='standardized', k=i)
-
-     KMeans_fit = KMeans_algo.fit(data_scale_output)
-
-     output = KMeans_fit.transform(data_scale_output)
-
-     score = evaluator.evaluate(output)
+     # Define normalizer
+     #normalizer = Normalizer(inputCol="features", outputCol="normalized_features")
+     #normalized_df = normalizer.transform(assembled_df)
+     normalized_df = Normalizer(inputCol="features", outputCol="normalized_features").transform(assembled_df)

-     silhouette_score.append(score)
-
-     print("Silhouette Score:", score)
+     # Visualize the results
+     pca = PCA(k=2, inputCol="normalized_features", outputCol="reduced_features")
+     #model = pca.fit(normalized_df)
+     #reduced_data = model.transform(normalized_df).select("reduced_features")
+     reduced_data = pca.fit(normalized_df).transform(normalized_df).select("reduced_features")

- # COMMAND ----------
+     # Run K-means on reduced data
+     kmeans = BisectingKMeans(k=11, featuresCol="reduced_features")
+     model = kmeans.fit(reduced_data)
+     prediction = model.transform(reduced_data)
+     labels = prediction.select("prediction")
+
+     # Evaluate clustering by computing Silhouette score
+     evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol="reduced_features")
+     silhouette = evaluator.evaluate(prediction)
+     print("Silhouette with squared euclidean distance = " + str(silhouette))
+
+     centers = model.clusterCenters()
+     print("Cluster Centers: ")
+     for center in centers:
+         print(center)
+
+     # Create DataFrame aligning cluster labels & companies
+     df_result = pd.DataFrame({'labels': labels.collect(), 'companies': df_ps_pct.columns})
+     df_result = spark.createDataFrame(df_result)

- # Visualizing the silhouette scores in a plot
+     # Display df sorted by cluster labels
+     display(df_result.orderBy("labels"))

- fig, ax = plt.subplots(1, 1, figsize=(8, 6))
- ax.plot(kmean_range, silhouette_score)
- ax.set_xlabel('k')
- ax.set_ylabel('cost')
+     df_result.write.format("delta").mode("overwrite").option("overwriteSchema", "True").saveAsTable('sp500_components_features')
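
The final added line persists the cluster assignments to the sp500_components_features Delta table. A minimal sketch of a follow-up read-back (assuming the same workspace and table name; this cell is not part of the commit above):

# Hypothetical follow-up cell: read the saved cluster assignments back from Delta
# and count how many companies land in each cluster.
clusters = spark.read.table("sp500_components_features")
display(clusters.groupBy("labels").count().orderBy("labels"))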