Skip to content

[SPARK-52326][SQL] Add partitions related ExternalCatalogEvent and post them in corresponding operations #51030

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,10 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
table: String,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
val partSpecs = parts.map(_.spec)
postToAll(CreatePartitionsPreEvent(db, table, partSpecs))
delegate.createPartitions(db, table, parts, ignoreIfExists)
postToAll(CreatePartitionsEvent(db, table, partSpecs))
}

override def dropPartitions(
Expand All @@ -208,22 +211,29 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean): Unit = {
postToAll(DropPartitionsPreEvent(db, table, partSpecs))
delegate.dropPartitions(db, table, partSpecs, ignoreIfNotExists, purge, retainData)
postToAll(DropPartitionsEvent(db, table, partSpecs))
}

override def renamePartitions(
db: String,
table: String,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
postToAll(RenamePartitionsPreEvent(db, table, specs, newSpecs))
delegate.renamePartitions(db, table, specs, newSpecs)
postToAll(RenamePartitionsEvent(db, table, specs, newSpecs))
}

override def alterPartitions(
db: String,
table: String,
parts: Seq[CatalogTablePartition]): Unit = {
val partSpecs = parts.map(_.spec)
postToAll(AlterPartitionsPreEvent(db, table, partSpecs))
delegate.alterPartitions(db, table, parts)
postToAll(AlterPartitionsEvent(db, table, partSpecs))
}

override def getPartition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.catalog

import org.apache.spark.scheduler.SparkListenerEvent
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec

/**
* Event emitted by the external catalog when it is modified. Events are either fired before or
Expand Down Expand Up @@ -202,3 +203,87 @@ case class RenameFunctionEvent(
name: String,
newName: String)
extends FunctionEvent

/**
* Event fired when some partitions (of a table) are created, dropped, renamed, altered.
*/
trait PartitionsEvent extends TableEvent {
/**
* Specs of the partitions which are touched.
*/
val partSpecs: Seq[TablePartitionSpec]
}

/**
* Event fired before some partitions (of a table) are created.
*/
case class CreatePartitionsPreEvent(
database: String,
name /* of table */: String,
partSpecs: Seq[TablePartitionSpec])
extends PartitionsEvent

/**
* Event fired after some partitions (of a table) have been created.
*/
case class CreatePartitionsEvent(
database: String,
name /* of table */: String,
partSpecs: Seq[TablePartitionSpec])
extends PartitionsEvent

/**
* Event fired before some partitions (of a table) are dropped.
*/
case class DropPartitionsPreEvent(
database: String,
name /* of table */ : String,
partSpecs: Seq[TablePartitionSpec])
extends PartitionsEvent

/**
* Event fired after some partitions (of a table) have been dropped.
*/
case class DropPartitionsEvent(
database: String,
name /* of table */ : String,
partSpecs: Seq[TablePartitionSpec])
extends PartitionsEvent

/**
* Event fired before some partitions (of a table) are renamed.
*/
case class RenamePartitionsPreEvent(
database: String,
name /* of table */ : String,
partSpecs: Seq[TablePartitionSpec],
newPartSpecs: Seq[TablePartitionSpec])
extends PartitionsEvent

/**
* Event fired after some partitions (of a table) have been renamed.
*/
case class RenamePartitionsEvent(
database: String,
name /* of table */ : String,
partSpecs: Seq[TablePartitionSpec],
newPartSpecs: Seq[TablePartitionSpec])
extends PartitionsEvent

/**
* Event fired before some partitions (of a table) are altered.
*/
case class AlterPartitionsPreEvent(
database: String,
name /* of table */ : String,
partSpecs: Seq[TablePartitionSpec])
extends PartitionsEvent

/**
* Event fired after some partitions (of a table) have been altered.
*/
case class AlterPartitionsEvent(
database: String,
name /* of table */ : String,
partSpecs: Seq[TablePartitionSpec])
extends PartitionsEvent
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,112 @@ class ExternalCatalogEventSuite extends SparkFunSuite {
catalog.dropFunction("db5", "fn4")
checkEvents(DropFunctionPreEvent("db5", "fn4") :: DropFunctionEvent("db5", "fn4") :: Nil)
}

testWithCatalog("partitions") { (catalog, checkEvents) =>
// Prepare db
val db = "db1"
val dbUri = preparePath(Files.createTempDirectory(db + "_"))
val dbDefinition = CatalogDatabase(
name = db,
description = "",
locationUri = dbUri,
properties = Map.empty)

catalog.createDatabase(dbDefinition, ignoreIfExists = false)
checkEvents(
CreateDatabasePreEvent(db) ::
CreateDatabaseEvent(db) :: Nil)

// Prepare table
val table = "table1"
val tableUri = preparePath(Files.createTempDirectory(table + "_"))
val tableDefinition = CatalogTable(
identifier = TableIdentifier(table, Some(db)),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty.copy(locationUri = Option(tableUri)),
schema = new StructType()
.add("year", "int")
.add("month", "int")
.add("sales", "long"))

catalog.createTable(tableDefinition, ignoreIfExists = false)
checkEvents(
CreateTablePreEvent(db, table) ::
CreateTableEvent(db, table) :: Nil)

// Prepare partitions
val storageFormat = CatalogStorageFormat(
locationUri = Some(tableUri),
inputFormat = Some("tableInputFormat"),
outputFormat = Some("tableOutputFormat"),
serde = None,
compressed = false,
properties = Map.empty)
val parts = Seq(CatalogTablePartition(Map("year" -> "2025", "month" -> "Jan"), storageFormat))
val partSpecs = parts.map(_.spec)

val newPartSpecs = Seq(Map("year" -> "2026", "month" -> "Feb"))

// CREATE
catalog.createPartitions(db, table, parts, ignoreIfExists = false)
checkEvents(
CreatePartitionsPreEvent(db, table, partSpecs) ::
CreatePartitionsEvent(db, table, partSpecs) :: Nil)

// Re-create with ignoreIfExists as true
catalog.createPartitions(db, table, parts, ignoreIfExists = true)
checkEvents(
CreatePartitionsPreEvent(db, table, partSpecs) ::
CreatePartitionsEvent(db, table, partSpecs) :: Nil)

// createPartitions() failed because re-creating with ignoreIfExists as false, so PreEvent only
intercept[AnalysisException] {
catalog.createPartitions(db, table, parts, ignoreIfExists = false)
}
checkEvents(CreatePartitionsPreEvent(db, table, partSpecs) :: Nil)

// ALTER
catalog.alterPartitions(db, table, parts)
checkEvents(
AlterPartitionsPreEvent(db, table, partSpecs) ::
AlterPartitionsEvent(db, table, partSpecs) ::
Nil)

// RENAME
catalog.renamePartitions(db, table, partSpecs, newPartSpecs)
checkEvents(
RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs) ::
RenamePartitionsEvent(db, table, partSpecs, newPartSpecs) :: Nil)

// renamePartitions() failed because partitions have been renamed according to newPartSpecs,
// so PreEvent only
intercept[AnalysisException] {
catalog.renamePartitions(db, table, partSpecs, newPartSpecs)
}
checkEvents(RenamePartitionsPreEvent(db, table, partSpecs, newPartSpecs) :: Nil)

// DROP
// dropPartitions() failed
// because partition of (old) partSpecs do not exist and ignoreIfNotExists is false,
// So PreEvent only
intercept[AnalysisException] {
catalog.dropPartitions(db, table, partSpecs,
ignoreIfNotExists = false, purge = true, retainData = true)
}
checkEvents(DropPartitionsPreEvent(db, table, partSpecs) :: Nil)

// Drop the renamed partitions
catalog.dropPartitions(db, table, newPartSpecs,
ignoreIfNotExists = false, purge = true, retainData = true)
checkEvents(
DropPartitionsPreEvent(db, table, newPartSpecs) ::
DropPartitionsEvent(db, table, newPartSpecs) :: Nil)

// Re-drop with ignoreIfNotExists being true
catalog.dropPartitions(db, table, newPartSpecs,
ignoreIfNotExists = true, purge = true, retainData = true)
checkEvents(
DropPartitionsPreEvent(db, table, newPartSpecs) ::
DropPartitionsEvent(db, table, newPartSpecs) :: Nil)
}
}