from pyspark.sql import *
from pyspark import SparkConf
from pyspark.sql import Row, Column, DataFrame as SDataFrame
from pyspark.sql.types import *
import pandas as pd
import os, sys
from collections import namedtuple as ntup


# Print a message, optionally followed by the repr of an object
def p(msg, o=None):
    omsg = ": %s" % repr(o) if o is not None else ""
    print("%s%s\n" % (msg, omsg))

def setupSpark():
    os.environ["PYSPARK_SUBMIT_ARGS"] = "pyspark-shell"
    # Pandas display options used when printing DataFrames
    pd.set_option('display.max_rows', 20)
    pd.set_option('display.max_colwidth', None)  # None (rather than -1) disables truncation in current pandas
    pd.set_option('display.max_columns', None)
    pd.set_option('expand_frame_repr', False)
    spark = SparkSession.builder.appName("pyspark_utils").master("local").getOrCreate()
    return spark

# Get the SparkSession, creating a local one on first use
#
def getSpark():
    # Reuse a module-level 'spark' if it already exists (e.g. in a pyspark shell),
    # otherwise create one and cache it for subsequent calls
    global spark
    if 'spark' not in globals():
        spark = setupSpark()
    return spark

def describeWithNulls(df, doPrint=True):
    # e.g. df = pd.DataFrame({'a': [1,2,3], 'b': ['a','b','c'], 'c': [99.5,11.2,433.1], 'd': [123,'abc',None]})
    desc = df.describe()  # returns a DataFrame with the stats in the row index
    # Prepend a per-column null count to the transposed describe() output
    combo = pd.concat([df.isna().sum(), desc.T], axis=1) \
              .set_axis(['Nulls'] + list(desc.index), axis=1)
    if doPrint:
        p(combo.head(100))
    return combo
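
# Example (illustrative): the 'Nulls' count comes first, followed by the describe()
# stats, which are NaN for non-numeric columns since describe() only summarises
# numeric ones by default.
#
#   combo = describeWithNulls(pd.DataFrame({'a': [1, 2, None], 'b': ['x', 'y', 'z']}))
#   combo.loc['a', 'Nulls']   # -> 1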

# Read a Parquet file into a Spark DataFrame, register it as a temp view,
# and also create a pandas DataFrame (optionally after running an extra SQL step)
#
def toPandas(path, tname, sql=None, count=False):
    df = getSpark().read.parquet(path)
    if count: p(tname + ": count=" + str(df.count()))
    df.createOrReplaceTempView(tname)
    if sql is not None:
        df = getSpark().sql(sql)
        df.createOrReplaceTempView(tname)
    pdf = df.toPandas()
    describeWithNulls(pdf)
    return df, pdf
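
# Example usage (hypothetical path, view name, and query; illustrative only):
#
#   sdf, pdf = toPandas("/data/events.parquet", "events",
#                       sql="SELECT * FROM events WHERE status = 'ok'", count=True)
#   # sdf is the Spark DataFrame (also registered as temp view 'events'); pdf is pandas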

# Run a Spark SQL statement and return both the Spark and pandas DataFrames
#
def sparkSql(sql, tname=None, count=True, describe=False):
    sdf = getSpark().sql(sql)
    if count and tname: p(tname + ": count=" + str(sdf.count()))
    if tname: sdf.createOrReplaceTempView(tname)
    pdf = sdf.toPandas()
    with pd.option_context('display.max_rows', None, 'display.max_columns', None):  # print the full frame
        print(pdf)
    if describe:
        describeWithNulls(pdf)
    return sdf, pdf
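
# Example usage (assumes a view, here called 'events', has already been registered,
# e.g. by toPandas or fromCsvSpark):
#
#   counts_sdf, counts_pdf = sparkSql("SELECT status, count(*) AS n FROM events GROUP BY status",
#                                     tname="status_counts")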

# Read a CSV into a Spark DataFrame, register it as a temp view,
# and also create a pandas DataFrame (optionally after running an extra SQL step)
#
def fromCsvSpark(path, tname, header=True, count=True, sql=None):
    p('Reading from %s..' % path)
    df = getSpark().read.csv(path, header=header)
    if count: p(tname + ": count=" + str(df.count()))
    df.createOrReplaceTempView(tname)
    if sql is not None:
        df = getSpark().sql(sql)
        df.createOrReplaceTempView(tname)
    pdf = df.toPandas()
    describeWithNulls(pdf)
    return df, pdf
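
# Example usage (hypothetical path; note that without inferSchema=True Spark reads
# every CSV column as a string):
#
#   sdf, pdf = fromCsvSpark("/data/users.csv", "users",
#                           sql="SELECT id, name FROM users WHERE name IS NOT NULL")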

# Return selected columns of a pandas DF
#
def cols(df, cnames):
    cnames = cnames if isinstance(cnames, list) else [cnames]
    return df.loc[:, cnames]
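
# Example usage (illustrative): accepts a single column name or a list, and always
# returns a DataFrame rather than a Series.
#
#   cols(pdf, 'status')           # one column
#   cols(pdf, ['status', 'n'])    # several columns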

# Tokenization Spark UDF using a comma delimiter
#
def tokenize(txt):
    def stripx(x): return x.strip()
    # Always return a list of stripped tokens so the registered ArrayType UDF
    # gets a consistent return type, even when there is no comma
    if txt.find(',') < 0:
        return [stripx(txt)]
    toks = list(map(stripx, txt.split(',')))
    # return [toks[0], toks]
    return toks
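
# Example (plain Python; no Spark session needed):
#
#   tokenize("a, b ,c")   # -> ['a', 'b', 'c']
#   tokenize("solo")      # -> ['solo']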

if __name__ == '__main__':
    sampleCode = """
from pyspark.sql.types import *
from pyspark.sql.functions import udf
tokenize_udf = udf(tokenize, ArrayType(StringType()))
spark.udf.register("tokenize", tokenize_udf)
"""