Description
I am writing Python code in Zeppelin 0.7.3 with Spark 2.3.0 on an EMR cluster (emr-5.13.0) to use SageMaker's XGBoost algorithm. The input data is a CSV file; the first column holds the target class (0 or 1) and there is no header line. The first three lines of the file are:
0,9.6071,2,1,1,2,1,1,1,1,3,1,0,0,0,0,3,0,0,3,0,0,3,0,2,1,1,1
0,2.7296,3,1,1,1,1,1,0,0,8,1,0,0,0,0,3,0,0,3,0,0,3,0,1,1,1,1
0,10.3326,1,0,1,2,1,1,0,0,4,1,1,0,1,0,3,0,0,3,0,0,3,0,0,3,0,0
Following the example, I import:
%pyspark
from pyspark import SparkContext, SparkConf
from sagemaker_pyspark import IAMRole, classpath_jars
from sagemaker_pyspark.algorithms import XGBoostSageMakerEstimator
I initialize the estimator:
%pyspark
xgboost_estimator = XGBoostSageMakerEstimator(
    trainingInstanceType="ml.m3.xlarge",
    trainingInstanceCount=1,
    endpointInstanceType="ml.m3.xlarge",
    endpointInitialInstanceCount=1)
xgboost_estimator.setObjective('multi:softprob')
xgboost_estimator.setNumRound(25)
xgboost_estimator.setNumClasses(2)
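(An aside I am unsure about: since the target has only two classes, I wonder whether 'multi:softprob' with setNumClasses(2) is the right configuration at all. The variant below is a sketch based on my reading of the XGBoost docs, where the binary objective takes no class count; I have not verified this against SageMaker.)

%pyspark
# Hypothetical alternative: a binary objective instead of multi:softprob + setNumClasses(2)
xgboost_estimator.setObjective('binary:logistic')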
I read the CSV file with:
training_data = spark.read.csv("s3://poc.sagemaker.myfile/myfile.csv", sep=",", header="false", inferSchema="true")
training_data.show() gives:
+---+-------+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|    _c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|
+---+-------+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  0| 7.1732|  1|  0|  1|  2|  2|  2|  0|  0|   5|   1|   1|   0|   1|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|
|  0| 1.3087|  1|  0|  1|  2|  1|  1|  0|  0|   2|   1|   1|   0|   2|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|
|  0| 3.3539|  1|  0|  1|  2|  2|  1|  0|  0|   6|   1|   1|   0|   0|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|
|  0| 1.9767|  1|  0|  1|  1|  1|  1|  1|  1|  73|   1|   0|   0|   1|   0|   3|   0|   0|   3|   0|   1|   0|   1|   1|   0|   1|   1|
|  0| 5.7194|  1|  0|  1|  2|  1|  1|  0|  0|   3|   1|   0|   0|   0|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|
|  0| 9.8398|  3|  1|  1|  2|  1|  1|  0|  0|   2|   1|   1|   0|   1|   0|   3|   0|   0|   3|   0|   2|   1|   1|   2|   1|   1|   1|
|  0| 2.4942|  1|  0|  1|  2|  1|  1|  0|  0| 377|   1|   1|   0|   2|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|   3|   0|   0|
|  0| 7.9179|  4|  1|  1|  2|  1|  1|  0|  0|   4|   1|   1|   0|   2|   0|   3|   0|   0|   3|   0|   2|   0|   1|   2|   1|   1|   1|
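For reference, the inferred schema (it matches the StructType in the stack trace below) is integer everywhere except _c1:

%pyspark
training_data.printSchema()
# root
#  |-- _c0: integer (nullable = true)
#  |-- _c1: double (nullable = true)
#  |-- _c2: integer (nullable = true)
#  ... (_c3 through _c27 are integer as well)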
When I try to fit the XGBoost model with:
xgboost_model = xgboost_estimator.fit(training_data)
The following exception is raised:
Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 367, in <module>
    raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-8068283221541252178.py", line 360, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/SageMakerEstimator.py", line 253, in fit
    return self._call_java("fit", dataset)
  File "/usr/local/lib/python2.7/site-packages/sagemaker_pyspark/wrapper.py", line 76, in _call_java
    java_value = super(SageMakerJavaWrapper, self)._call_java(name, *java_args)
  File "/usr/lib/spark/python/pyspark/ml/wrapper.py", line 51, in _call_java
    return _java2py(sc, m(*java_args))
  File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o130.fit.
: java.io.IOException: Illegal schema for libsvm data, schema=StructType(StructField(_c0,IntegerType,true), StructField(_c1,DoubleType,true), StructField(_c2,IntegerType,true), StructField(_c3,IntegerType,true), StructField(_c4,IntegerType,true), StructField(_c5,IntegerType,true), StructField(_c6,IntegerType,true), StructField(_c7,IntegerType,true), StructField(_c8,IntegerType,true), StructField(_c9,IntegerType,true), StructField(_c10,IntegerType,true), StructField(_c11,IntegerType,true), StructField(_c12,IntegerType,true), StructField(_c13,IntegerType,true), StructField(_c14,IntegerType,true), StructField(_c15,IntegerType,true), StructField(_c16,IntegerType,true), StructField(_c17,IntegerType,true), StructField(_c18,IntegerType,true), StructField(_c19,IntegerType,true), StructField(_c20,IntegerType,true), StructField(_c21,IntegerType,true), StructField(_c22,IntegerType,true), StructField(_c23,IntegerType,true), StructField(_c24,IntegerType,true), StructField(_c25,IntegerType,true), StructField(_c26,IntegerType,true), StructField(_c27,IntegerType,true))
  at org.apache.spark.ml.source.libsvm.LibSVMFileFormat.verifySchema(LibSVMRelation.scala:86)
  at org.apache.spark.ml.source.libsvm.LibSVMFileFormat.prepareWrite(LibSVMRelation.scala:122)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:654)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:654)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
  at com.amazonaws.services.sagemaker.sparksdk.internal.DataUploader.writeData(DataUploader.scala:111)
  at com.amazonaws.services.sagemaker.sparksdk.internal.DataUploader.uploadData(DataUploader.scala:90)
  at com.amazonaws.services.sagemaker.sparksdk.SageMakerEstimator.fit(SageMakerEstimator.scala:299)
  at com.amazonaws.services.sagemaker.sparksdk.SageMakerEstimator.fit(SageMakerEstimator.scala:175)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:214)
  at java.lang.Thread.run(Thread.java:748)
From the stack trace, it looks like fit() first writes the DataFrame out to S3 in libsvm format (DataUploader.writeData ends up in LibSVMFileFormat.verifySchema), and that writer rejects my 28 plain CSV columns. Did I miss a step that would let the estimator process CSV input?
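In case it helps, my current guess from the "Illegal schema for libsvm data" message is that the estimator expects a DoubleType label column plus a Vector features column rather than the raw CSV columns. Below is a minimal sketch of the reshaping I have in mind; the output column names "label" and "features" are my assumption of the SDK defaults, which I have not been able to confirm:

%pyspark
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

# Assemble the 27 feature columns (_c1.._c27) into one vector column and
# cast the first column to a double-typed label.
assembler = VectorAssembler(
    inputCols=["_c{}".format(i) for i in range(1, 28)],
    outputCol="features")

training_transformed = (assembler
                        .transform(training_data)
                        .withColumn("label", col("_c0").cast("double"))
                        .select("label", "features"))

xgboost_model = xgboost_estimator.fit(training_transformed)

Is something like this the intended way to feed CSV data to XGBoostSageMakerEstimator, or is there a supported CSV input path that I am missing?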