
Commit 817d1d7

Yaohua628 authored and cloud-fan committed
[SPARK-37769][SQL][FOLLOWUP] Filtering files if metadata columns are present in the data filter
### What changes were proposed in this pull request?

Follow-up PR of apache#34575. Filtering files if metadata columns are present in the data filter.

### Why are the changes needed?

Performance improvements.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs and a new UT.

Closes apache#35055 from Yaohua628/spark-37769.

Authored-by: yaohua <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 61abae3 commit 817d1d7
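
To make the optimization concrete, here is an illustrative query of the kind this follow-up speeds up. This is a hedged sketch, not code from the commit: the path, schema, and file name are assumptions. When the data filter references only the hidden `_metadata` struct, non-matching files can now be dropped during file listing instead of being scanned and filtered row by row.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumes a local SparkSession and a hypothetical parquet dataset at /tmp/people.
val spark = SparkSession.builder().appName("metadata-filter-demo").getOrCreate()
val df = spark.read.format("parquet").load("/tmp/people")

// The predicate touches only file metadata, so files whose metadata cannot
// satisfy it are pruned at listing time.
df.select(col("name"), col("age"), col("_metadata.file_name"))
  .where(col("_metadata.file_name") === "part-00000.parquet")
  .show()
```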

4 files changed (+110, -17 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala

Lines changed: 31 additions & 0 deletions

@@ -30,6 +30,7 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
+import org.apache.spark.unsafe.types.UTF8String


 /**
@@ -192,6 +193,36 @@ object FileFormat {

   // create a file metadata struct col
   def createFileMetadataCol: AttributeReference = MetadataAttribute(METADATA_NAME, METADATA_STRUCT)
+
+  // create an internal row given required metadata fields and file information
+  def createMetadataInternalRow(
+      fieldNames: Seq[String],
+      filePath: Path,
+      fileSize: Long,
+      fileModificationTime: Long): InternalRow =
+    updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames,
+      filePath, fileSize, fileModificationTime)
+
+  // update an internal row given required metadata fields and file information
+  def updateMetadataInternalRow(
+      row: InternalRow,
+      fieldNames: Seq[String],
+      filePath: Path,
+      fileSize: Long,
+      fileModificationTime: Long): InternalRow = {
+    fieldNames.zipWithIndex.foreach { case (name, i) =>
+      name match {
+        case FILE_PATH => row.update(i, UTF8String.fromString(filePath.toString))
+        case FILE_NAME => row.update(i, UTF8String.fromString(filePath.getName))
+        case FILE_SIZE => row.update(i, fileSize)
+        case FILE_MODIFICATION_TIME =>
+          // the modificationTime from the file is in millisecond,
+          // while internally, the TimestampType `file_modification_time` is stored in microsecond
+          row.update(i, fileModificationTime * 1000L)
+      }
+    }
+    row
+  }
 }

 /**
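
As a side note, a minimal standalone sketch of what `createMetadataInternalRow` computes. These are plain Scala stand-ins, not Spark's `InternalRow`/`Path`; the field names mirror the `_metadata` struct fields and the file values are made up.

```scala
object MetadataRowSketch {
  // Stand-in for Hadoop's FileStatus: path, length in bytes, modification time in ms.
  case class FakeFileStatus(path: String, len: Long, modificationTimeMs: Long)

  // Mirrors the field-by-field mapping in the diff above: path and name as strings,
  // size in bytes, and the millisecond modification time scaled to microseconds
  // because TimestampType values are stored in microseconds.
  def metadataRow(fieldNames: Seq[String], f: FakeFileStatus): Seq[Any] =
    fieldNames.map {
      case "file_path" => f.path
      case "file_name" => f.path.substring(f.path.lastIndexOf('/') + 1)
      case "file_size" => f.len
      case "file_modification_time" => f.modificationTimeMs * 1000L // ms -> us
    }

  def main(args: Array[String]): Unit = {
    val row = metadataRow(
      Seq("file_path", "file_name", "file_size", "file_modification_time"),
      FakeFileStatus("/tmp/people/part-00000.parquet", 1024L, 1640995200000L))
    println(row.mkString(", "))
  }
}
```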

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala

Lines changed: 2 additions & 13 deletions

@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.datasources.FileFormat._
 import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types.{LongType, StringType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.NextIterator

 /**
@@ -136,18 +135,8 @@ class FileScanRDD(
      */
    private def updateMetadataRow(): Unit = {
      if (metadataColumns.nonEmpty && currentFile != null) {
-        val path = new Path(currentFile.filePath)
-        metadataColumns.zipWithIndex.foreach { case (attr, i) =>
-          attr.name match {
-            case FILE_PATH => metadataRow.update(i, UTF8String.fromString(path.toString))
-            case FILE_NAME => metadataRow.update(i, UTF8String.fromString(path.getName))
-            case FILE_SIZE => metadataRow.update(i, currentFile.fileSize)
-            case FILE_MODIFICATION_TIME =>
-              // the modificationTime from the file is in millisecond,
-              // while internally, the TimestampType is stored in microsecond
-              metadataRow.update(i, currentFile.modificationTime * 1000L)
-          }
-        }
+        updateMetadataInternalRow(metadataRow, metadataColumns.map(_.name),
+          new Path(currentFile.filePath), currentFile.fileSize, currentFile.modificationTime)
      }
    }


sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala

Lines changed: 33 additions & 2 deletions

@@ -27,6 +27,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.execution.datasources.FileFormat.createMetadataInternalRow
 import org.apache.spark.sql.types.StructType

 /**
@@ -71,8 +72,37 @@ abstract class PartitioningAwareFileIndex(
     def isNonEmptyFile(f: FileStatus): Boolean = {
       isDataPath(f.getPath) && f.getLen > 0
     }
+
+    // retrieve the file metadata filters and reduce to a final filter expression
+    val fileMetadataFilterOpt = dataFilters.filter(_.references.forall {
+      case MetadataAttribute(_) => true
+      case _ => false
+    }).reduceOption(expressions.And)
+
+    // - create a bound references for filters: put the metadata struct at 0 position for each file
+    // - retrieve the final metadata struct (could be pruned) from filters
+    val boundedFilterMetadataStructOpt = fileMetadataFilterOpt.map { fileMetadataFilter =>
+      val metadataStruct = fileMetadataFilter.references.head.dataType
+      val boundedFilter = Predicate.createInterpreted(fileMetadataFilter.transform {
+        case _: AttributeReference => BoundReference(0, metadataStruct, nullable = true)
+      })
+      (boundedFilter, metadataStruct)
+    }
+
+    def matchFileMetadataPredicate(f: FileStatus): Boolean = {
+      // use option.forall, so if there is no filter no metadata struct, return true
+      boundedFilterMetadataStructOpt.forall { case (boundedFilter, metadataStruct) =>
+        val row = InternalRow.fromSeq(Seq(
+          createMetadataInternalRow(metadataStruct.asInstanceOf[StructType].names,
+            f.getPath, f.getLen, f.getModificationTime)
+        ))
+        boundedFilter.eval(row)
+      }
+    }
+
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
+      PartitionDirectory(InternalRow.empty, allFiles()
+        .filter(f => isNonEmptyFile(f) && matchFileMetadataPredicate(f))) :: Nil
     } else {
       if (recursiveFileLookup) {
         throw new IllegalArgumentException(
@@ -83,7 +113,8 @@ abstract class PartitioningAwareFileIndex(
         val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
           case Some(existingDir) =>
             // Directory has children files in it, return them
-            existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f))
+            existingDir.filter(f => matchPathPattern(f) && isNonEmptyFile(f) &&
+              matchFileMetadataPredicate(f))

           case None =>
             // Directory does not exist, or has no children files
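
For intuition, the listing-time pruning above can be summarized with a plain-Scala sketch. The types are stand-ins, not Catalyst's `Predicate`/`BoundReference`, and the example predicates and file names are assumptions.

```scala
object MetadataPruningSketch {
  // Stand-in for Hadoop's FileStatus.
  case class FileMeta(path: String, size: Long, modificationTimeMs: Long)

  // Stand-in for the data filters that reference only metadata attributes
  // (fileMetadataFilterOpt in the diff above), already "bound" to FileMeta.
  val metadataPredicates: Seq[FileMeta => Boolean] = Seq(
    m => m.path.endsWith("part-00000.parquet"), // e.g. a _metadata.file_name filter
    m => m.size > 0                             // e.g. a _metadata.file_size filter
  )

  // reduceOption(And) analogue: with no metadata predicates, every file passes,
  // which matches the Option.forall behaviour of matchFileMetadataPredicate.
  def matchFileMetadataPredicate(m: FileMeta): Boolean =
    metadataPredicates.forall(p => p(m))

  def main(args: Array[String]): Unit = {
    val files = Seq(
      FileMeta("/tmp/people/part-00000.parquet", 1024L, 1640995200000L),
      FileMeta("/tmp/people/part-00001.parquet", 2048L, 1640995200000L))
    // Files that cannot satisfy the metadata-only predicate never reach the scan.
    val selected = files.filter(matchFileMetadataPredicate)
    println(selected.map(_.path).mkString(", "))
  }
}
```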

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala

Lines changed: 44 additions & 2 deletions

@@ -279,16 +279,58 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
   }

   metadataColumnsTest("filter", schema) { (df, f0, _) =>
+    val filteredDF = df.select("name", "age", METADATA_FILE_NAME)
+      .where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME))
+
+    // check the filtered file
+    val partitions = filteredDF.queryExecution.sparkPlan.collectFirst {
+      case p: FileSourceScanExec => p.selectedPartitions
+    }.get
+
+    assert(partitions.length == 1) // 1 partition
+    assert(partitions.head.files.length == 1) // 1 file in that partition
+    assert(partitions.head.files.head.getPath.toString == f0(METADATA_FILE_PATH)) // the file is f0
+
+    // check result
     checkAnswer(
-      df.select("name", "age", METADATA_FILE_NAME)
-        .where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME)),
+      filteredDF,
       Seq(
         // _file_name == f0's name, so we will only have 1 row
         Row("jack", 24, f0(METADATA_FILE_NAME))
       )
     )
   }

+  metadataColumnsTest("filter on metadata and user data", schema) { (df, _, f1) =>
+
+    val filteredDF = df.select("name", "age", "info",
+      METADATA_FILE_NAME, METADATA_FILE_PATH,
+      METADATA_FILE_SIZE, METADATA_FILE_MODIFICATION_TIME)
+      // mix metadata column + user column
+      .where(Column(METADATA_FILE_NAME) === f1(METADATA_FILE_NAME) and Column("name") === "lily")
+      // only metadata columns
+      .where(Column(METADATA_FILE_PATH) === f1(METADATA_FILE_PATH))
+      // only user column
+      .where("age == 31")
+
+    // check the filtered file
+    val partitions = filteredDF.queryExecution.sparkPlan.collectFirst {
+      case p: FileSourceScanExec => p.selectedPartitions
+    }.get
+
+    assert(partitions.length == 1) // 1 partition
+    assert(partitions.head.files.length == 1) // 1 file in that partition
+    assert(partitions.head.files.head.getPath.toString == f1(METADATA_FILE_PATH)) // the file is f1
+
+    // check result
+    checkAnswer(
+      filteredDF,
+      Seq(Row("lily", 31, Row(54321L, "ucb"),
+        f1(METADATA_FILE_NAME), f1(METADATA_FILE_PATH),
+        f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
+    )
+  }
+
   Seq(true, false).foreach { caseSensitive =>
     metadataColumnsTest(s"upper/lower case when case " +
       s"sensitive is $caseSensitive", schemaWithNameConflicts) { (df, f0, f1) =>
