-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmlp.py
66 lines (47 loc) · 2.56 KB
/
mlp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import DoubleType, IntegerType
if __name__ == "__main__":
    # Train a multilayer perceptron on the Titanic passenger CSV and print
    # its accuracy on a held-out 20% split.

    # The context is created explicitly so the job keeps the 'local' master
    # and the 'mlp' application name; getOrCreate() below reuses it.
    sc = SparkContext('local', 'mlp')
    spark = SparkSession\
        .builder\
        .appName("MLPClassifier")\
        .getOrCreate()

    try:
        # Read the CSV as a DataFrame with the built-in CSV source; every
        # column arrives as a string and is cast below.
        dataset = spark.read.csv('./data/titanic.csv', header=True)
        dataset = dataset.drop('PassengerId', 'Name', 'Ticket', 'Cabin')

        # Set column types.
        column_types = [
            ("Survived", IntegerType()),
            ("Pclass", IntegerType()),
            ("Age", DoubleType()),
            ("SibSp", IntegerType()),
            ("Parch", IntegerType()),
            ("Fare", DoubleType()),
        ]
        for column_name, column_type in column_types:
            dataset = dataset.withColumn(
                column_name, dataset[column_name].cast(column_type))

        # Fill missing ages with the mean age, then drop rows that still
        # contain a null in any column.
        avg_age = dataset.groupBy().avg("Age").collect()[0][0]
        if avg_age is not None:
            # avg() yields None when no row has an Age; round(None) would
            # raise, and na.drop() below removes those rows anyway.
            dataset = dataset.na.fill({'Age': round(avg_age, 2)})
        dataset = dataset.na.drop()

        # Map categorical string columns to numeric indices.
        for source_col, indexed_col in (("Sex", "SexInd"),
                                        ("Embarked", "EmbarkedInd")):
            indexer = StringIndexer(inputCol=source_col, outputCol=indexed_col)
            dataset = indexer.fit(dataset).transform(dataset)

        # Assemble the seven model inputs into a single feature vector.
        assembler = VectorAssembler(
            inputCols=["Age", "Pclass", "SexInd", "SibSp", "Parch", "Fare",
                       "EmbarkedInd"],
            outputCol="features")
        dataset = assembler.transform(dataset)

        # Seed the split so the reported accuracy is reproducible, matching
        # the seeded classifier below.
        (trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed=0)

        # MLP
        layers = [7, 8, 4, 2]  # input: 7 features; output: 2 classes
        mlp = MultilayerPerceptronClassifier(
            maxIter=100, layers=layers, labelCol="Survived",
            featuresCol="features", blockSize=128, seed=0)
        model = mlp.fit(trainingData)

        result = model.transform(testData)
        prediction_label = result.select("prediction", "Survived")
        evaluator = MulticlassClassificationEvaluator(
            labelCol="Survived", predictionCol="prediction",
            metricName="accuracy")
        accuracy = evaluator.evaluate(prediction_label)
        # print() with one parenthesized argument works on Python 2 and 3.
        print("MLP test accuracy: " + str(accuracy))
    finally:
        # Stops the session and the underlying SparkContext even on failure.
        spark.stop()