[SPARK-50854][SS] Make path fully qualified before passing it to FileStreamSink
vrozov committed Jan 24, 2025
1 parent fab0cca commit 9bd12dc
Showing 5 changed files with 54 additions and 18 deletions.
QueryExecutionErrors.scala
@@ -2939,4 +2939,8 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
)
)
}

def notAbsolutePathError(path: Path): SparkException = {
SparkException.internalError(s"$path is not absolute path.")
}
}
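For orientation, here is a minimal sketch of what the new helper produces. QueryExecutionErrors is private[sql], so the call itself only compiles inside Spark's sql package; the equivalent construction via SparkException.internalError is shown instead, and the exact message prefix is an assumption about how internal errors are rendered.

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException

// Equivalent of QueryExecutionErrors.notAbsolutePathError(new Path("test.parquet")):
val path = new Path("test.parquet")
val err: SparkException = SparkException.internalError(s"$path is not absolute path.")
// err.getMessage is expected to carry the INTERNAL_ERROR error class plus the text,
// e.g. "[INTERNAL_ERROR] test.parquet is not absolute path."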
DataSource.scala
@@ -118,6 +118,11 @@ case class DataSource(
private def newHadoopConfiguration(): Configuration =
sparkSession.sessionState.newHadoopConfWithOptions(options)

private def makeQualified(path: Path): Path = {
val fs = path.getFileSystem(newHadoopConfiguration())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}

lazy val sourceInfo: SourceInfo = sourceSchema()
private val caseInsensitiveOptions = CaseInsensitiveMap(options)
private val equality = sparkSession.sessionState.conf.resolver
@@ -317,9 +322,9 @@ case class DataSource(
s.createSink(sparkSession.sqlContext, caseInsensitiveOptions, partitionColumns, outputMode)

case fileFormat: FileFormat =>
val path = caseInsensitiveOptions.getOrElse("path", {
val path = makeQualified(new Path(caseInsensitiveOptions.getOrElse("path", {
throw QueryExecutionErrors.dataPathNotSpecifiedError()
})
}))).toString
if (outputMode != OutputMode.Append) {
throw QueryCompilationErrors.dataSourceOutputModeUnsupportedError(className, outputMode)
}
@@ -454,11 +459,11 @@ case class DataSource(
// 3. It's OK that the output path doesn't exist yet;
val allPaths = paths ++ caseInsensitiveOptions.get("path")
val outputPath = if (allPaths.length == 1) {
val path = new Path(allPaths.head)
val fs = path.getFileSystem(newHadoopConfiguration())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
} else {
makeQualified(new Path(allPaths.head))
} else if (allPaths.length > 1) {
throw QueryExecutionErrors.multiplePathsSpecifiedError(allPaths)
} else {
throw QueryExecutionErrors.dataPathNotSpecifiedError()
}

val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
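For readers unfamiliar with Hadoop path qualification, a minimal standalone sketch (plain Hadoop API, outside Spark; directory names are invented) of what the new makeQualified helper does to a relative sink path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Resolve a possibly relative path against the filesystem's URI and working directory,
// mirroring DataSource.makeQualified above.
val conf = new Configuration()               // assumes the default (local) filesystem
val relativeSink = new Path("stream-output") // e.g. the string handed to .start(...)
val fs = relativeSink.getFileSystem(conf)
val qualified = relativeSink.makeQualified(fs.getUri, fs.getWorkingDirectory)
println(qualified)                           // e.g. file:/home/user/work/stream-output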
FileStreamSink.scala
@@ -133,6 +133,9 @@ class FileStreamSink(

private val hadoopConf = sparkSession.sessionState.newHadoopConf()
private val basePath = new Path(path)
if (!basePath.isAbsolute) {
throw QueryExecutionErrors.notAbsolutePathError(basePath)
}
private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
sparkSession.sessionState.conf)
private val retention = options.get("retention").map(Utils.timeStringAsMs)
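Because DataSource now qualifies the path before constructing the sink, the guard only has to reject scheme-less relative paths. A small illustration of Hadoop's Path.isAbsolute (not part of the change) showing which inputs would trip the new check:

import org.apache.hadoop.fs.Path

assert(!new Path("test.parquet").isAbsolute)  // relative -> would hit notAbsolutePathError
assert(new Path("/tmp/out").isAbsolute)       // absolute local path passes
assert(new Path("file:/tmp/out").isAbsolute)  // qualified URI: its path component is absolute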
AsyncProgressTrackingMicroBatchExecutionSuite.scala
@@ -1101,7 +1101,7 @@ class AsyncProgressTrackingMicroBatchExecutionSuite
.start("/tmp")
}

e.getMessage should equal("Sink FileSink[/tmp] does not support async progress tracking")
e.getMessage should equal("Sink FileSink[file:/tmp] does not support async progress tracking")
}

test("with log purging") {
FileStreamSinkSuite.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.streaming

import java.io.{File, IOException}
import java.nio.file.Files
import java.nio.file.{Files, Paths}
import java.util.Locale

import scala.collection.mutable.ArrayBuffer
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
import org.apache.hadoop.mapreduce.JobContext

import org.apache.spark.SparkConf
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.paths.SparkPath
@@ -36,6 +36,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation, FileScan, FileTable}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
@@ -292,35 +293,48 @@ abstract class FileStreamSinkSuite extends StreamTest {
test("parquet") {
testFormat(None) // should not throw error as default format parquet when not specified
testFormat(Some("parquet"))
testFormat(None, relativizeOutputPath = true)
testFormat(Some("parquet"), relativizeOutputPath = true)
}

test("orc") {
testFormat(Some("orc"))
testFormat(Some("orc"), relativizeOutputPath = true)
}

test("text") {
testFormat(Some("text"))
testFormat(Some("text"), relativizeOutputPath = true)
}

test("json") {
testFormat(Some("json"))
testFormat(Some("json"), relativizeOutputPath = true)
}

def testFormat(format: Option[String]): Unit = {
val inputData = MemoryStream[Int]
def testFormat(format: Option[String], relativizeOutputPath: Boolean = false): Unit = {
val inputData = MemoryStream[String] // text format only supports String
val ds = inputData.toDS()

val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val tempDir = Utils.createTempDir(namePrefix = "stream.output");
val outputPath = if (relativizeOutputPath) {
Paths.get("").toAbsolutePath.relativize(tempDir.toPath).toString
} else {
tempDir.getCanonicalPath
}
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

var query: StreamingQuery = null
val writer = ds.toDF("value").writeStream
.option("checkpointLocation", checkpointDir)
if (format.nonEmpty) {
writer.format(format.get)
}

var query: StreamingQuery = null
try {
val writer = ds.map(i => (i, i * 1000)).toDF("id", "value").writeStream
if (format.nonEmpty) {
writer.format(format.get)
}
query = writer.option("checkpointLocation", checkpointDir).start(outputDir)
query = writer.start(outputPath)
inputData.addData("data")
query.processAllAvailable()
} finally {
if (query != null) {
query.stop()
@@ -664,6 +678,16 @@ abstract class FileStreamSinkSuite extends StreamTest {
s" $path."))
}
}

test("SPARK-50854: Make path fully qualified before passing it to FileStreamSink") {
val fileFormat = new ParquetFileFormat() // any valid FileFormat
val partitionColumnNames = Seq.empty[String]
val options = Map.empty[String, String]
val exception = intercept[SparkException] {
new FileStreamSink(spark, "test.parquet", fileFormat, partitionColumnNames, options)
}
assert(exception.getMessage.contains("is not absolute path."))
}
}

object PendingCommitFilesTrackingManifestFileCommitProtocol {
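The relativizeOutputPath variants above drive the sink with a path that is relative to the working directory, which is exactly the case the qualification in DataSource now handles. A hedged sketch of the java.nio relativization the test relies on (directory names are invented):

import java.nio.file.Paths

val cwd = Paths.get("").toAbsolutePath              // current working directory
// Assume the temp dir lives under the working directory, as in the test:
val tempDir = cwd.resolve("target/tmp/stream.output-42")
val relativeOutputPath = cwd.relativize(tempDir).toString  // "target/tmp/stream.output-42"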
