random_forest.py
import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from sklearn.preprocessing import LabelEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

if __name__ == "__main__":
    # set up the Spark context and session
    sc = SparkContext('local', 'dataframe')
    sqlContext = SQLContext(sc)
    spark = SparkSession\
        .builder\
        .appName("RandomForestClassifier")\
        .getOrCreate()

    # alternative loaders, kept for reference:
    #dataset = sc.textFile("./data/titanic.csv", 1)
    #dataset = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('./data/titanic.csv')

    # load the Titanic data with pandas and drop identifier-like columns
    dataset = pd.read_csv("./data/titanic.csv")
    dataset = dataset.drop(['PassengerId', 'Name', 'Ticket', 'Cabin'], axis=1)
    # encode categorical (object-typed) columns as integers
    for col in dataset.columns:
        if dataset[col].dtype == 'object':
            lbl = LabelEncoder()
            lbl.fit(dataset[col])
            dataset[col] = lbl.transform(dataset[col])

    # fill missing Age values with the median age
    median_age = dataset['Age'].dropna().median()
    dataset['Age'] = dataset['Age'].fillna(median_age)
    #dataset.isnull().sum()
    # convert the pandas frame to a Spark DataFrame via an RDD of Rows
    rdd_data = sc.parallelize(dataset.values)
    rdd_row = rdd_data.map(lambda p: Row(survived=int(p[0]), pclass=int(p[1]), sex=int(p[2]),
                                         age=float(p[3]), sibsp=int(p[4]), parch=int(p[5]),
                                         fare=float(p[6]), embarked=int(p[7])))
    titanic = spark.createDataFrame(rdd_row)

    # assemble the predictor columns into a single feature vector
    assembler = VectorAssembler(
        inputCols=["age", "embarked", "fare", "parch", "pclass", "sex", "sibsp"],
        outputCol="features")
    titanic = assembler.transform(titanic)

    # 80/20 train/test split
    (trainingData, testData) = titanic.randomSplit([0.8, 0.2])
    # train the random forest classifier
    rfc = RandomForestClassifier(numTrees=100, maxDepth=6, labelCol="survived", featuresCol="features", seed=0)
    model = rfc.fit(trainingData)

    # feature importances
    print(model.featureImportances)

    # predictions on the held-out test set
    predictions = model.transform(testData)
    predictions.select("survived", "probability", "prediction").show(truncate=False)
    # evaluate accuracy on the test set
    evaluator = MulticlassClassificationEvaluator(labelCol="survived", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator.evaluate(predictions)
    print("RFC accuracy = %2.4f" % accuracy)
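
# A minimal sketch of how this script might be launched (assuming PySpark,
# scikit-learn, and ./data/titanic.csv are available locally; the file path
# and the 'local' master are those used above):
#   spark-submit random_forest.py
# or, since the script builds its own local SparkContext:
#   python random_forest.py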