
Commit 83bb41e

liviazhu authored and haoyangeng-db committed
[SPARK-52171][SS] StateDataSource join implementation for state v3
### What changes were proposed in this pull request?

Add an implementation of StateDataSource for state format v3, which uses virtual column families for the 4 join stores. This entails a few changes:

* Inferring the schema for joins needs to take in oldSchemaFilePaths for state format v3.
* sourceOptions need to be modified when the join store name is specified for state format v3, since the name is no longer the store name but the colFamily name. Subsequent metadata checks must also account for this.
* A new joinColFamilyOpt needs to be passed through to the StateReaderInfo, StatePartitionReader, etc. so that it can be used to read the correct column family.

### Why are the changes needed?

Enable StateDataSource for join state version 3.

### Does this PR introduce _any_ user-facing change?

Yes. Previously StateDataSource could not be used on checkpoints that use join state version 3, and now it can.

### How was this patch tested?

New unit tests, plus enabling previously disabled unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#51004 from liviazhu/liviazhu-db/statedatasourcereader-v3.

Authored-by: Livia Zhu <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
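For illustration only (not part of the commit message): a minimal sketch of the user-facing read this enables, assuming a checkpoint written by a stream-stream join query with `spark.sql.streaming.join.stateFormatVersion = 3`. The checkpoint path and operator id below are hypothetical placeholders.

```scala
// Minimal sketch: reading one side of a stream-stream join's state through the
// State Data Source. With this change the read also works when the checkpoint
// uses join state format v3, where the four join stores are virtual column
// families inside a single state store. `spark` is an existing SparkSession.
val checkpointDir = "/tmp/checkpoints/stream-stream-join" // hypothetical path

val leftJoinState = spark.read
  .format("statestore")
  .option("joinSide", "left") // "left" or "right"
  .option("operatorId", 0)    // id of the join operator in the streaming query
  .load(checkpointDir)

leftJoinState.printSchema()
leftJoinState.show(truncate = false)
```

Previously the same read failed on such checkpoints because the reader only knew how to locate the four separate physical join stores.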
1 parent c70e1a5 commit 83bb41e

File tree

15 files changed: +364 -147 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSource.scala

Lines changed: 153 additions & 79 deletions
Large diffs are not rendered by default.
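Since the full diff for this file is collapsed above, here is a rough, hypothetical sketch of the option-remapping idea from the description (the helper name and signature are illustrative only, not the actual code in this file):

```scala
// Hypothetical sketch: under join state format v3 a user-supplied store name such
// as "left-keyToNumValues" no longer identifies a physical store, so it is carried
// along as the column family to read out of the single default store instead.
def resolveJoinStoreName(
    userStoreName: String,
    usesVirtualColumnFamilies: Boolean): (String, Option[String]) = {
  if (usesVirtualColumnFamilies && userStoreName != "default") {
    ("default", Some(userStoreName)) // v3: default store + column family name
  } else {
    (userStoreName, None)            // v1/v2: separate physical store, no column family
  }
}
```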

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 20 additions & 17 deletions
@@ -42,19 +42,20 @@ class StatePartitionReaderFactory(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
     stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
-    stateSchemaProviderOpt: Option[StateSchemaProvider])
+    stateSchemaProviderOpt: Option[StateSchemaProvider],
+    joinColFamilyOpt: Option[String])
   extends PartitionReaderFactory {
 
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
     val stateStoreInputPartition = partition.asInstanceOf[StateStoreInputPartition]
     if (stateStoreInputPartition.sourceOptions.readChangeFeed) {
       new StateStoreChangeDataPartitionReader(storeConf, hadoopConf,
         stateStoreInputPartition, schema, keyStateEncoderSpec, stateVariableInfoOpt,
-        stateStoreColFamilySchemaOpt, stateSchemaProviderOpt)
+        stateStoreColFamilySchemaOpt, stateSchemaProviderOpt, joinColFamilyOpt)
     } else {
       new StatePartitionReader(storeConf, hadoopConf,
         stateStoreInputPartition, schema, keyStateEncoderSpec, stateVariableInfoOpt,
-        stateStoreColFamilySchemaOpt, stateSchemaProviderOpt)
+        stateStoreColFamilySchemaOpt, stateSchemaProviderOpt, joinColFamilyOpt)
     }
   }
 }
@@ -71,7 +72,8 @@ abstract class StatePartitionReaderBase(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
     stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
-    stateSchemaProviderOpt: Option[StateSchemaProvider])
+    stateSchemaProviderOpt: Option[StateSchemaProvider],
+    joinColFamilyOpt: Option[String])
   extends PartitionReader[InternalRow] with Logging {
   // Used primarily as a placeholder for the value schema in the context of
   // state variables used within the transformWithState operator.
@@ -98,11 +100,7 @@ abstract class StatePartitionReaderBase(
       partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
 
-    val useColFamilies = if (stateVariableInfoOpt.isDefined) {
-      true
-    } else {
-      false
-    }
+    val useColFamilies = stateVariableInfoOpt.isDefined || joinColFamilyOpt.isDefined
 
     val useMultipleValuesPerKey = SchemaUtil.checkVariableType(stateVariableInfoOpt,
       StateVariableType.ListState)
@@ -164,10 +162,11 @@ class StatePartitionReader(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
     stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
-    stateSchemaProviderOpt: Option[StateSchemaProvider])
+    stateSchemaProviderOpt: Option[StateSchemaProvider],
+    joinColFamilyOpt: Option[String])
   extends StatePartitionReaderBase(storeConf, hadoopConf, partition, schema,
     keyStateEncoderSpec, stateVariableInfoOpt, stateStoreColFamilySchemaOpt,
-    stateSchemaProviderOpt) {
+    stateSchemaProviderOpt, joinColFamilyOpt) {
 
   private lazy val store: ReadStateStore = {
     partition.sourceOptions.fromSnapshotOptions match {
@@ -186,17 +185,18 @@ class StatePartitionReader(
   }
 
   override lazy val iter: Iterator[InternalRow] = {
-    val stateVarName = stateVariableInfoOpt
-      .map(_.stateName).getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME)
+    val colFamilyName = stateStoreColFamilySchemaOpt
+      .map(_.colFamilyName).getOrElse(
+        joinColFamilyOpt.getOrElse(StateStore.DEFAULT_COL_FAMILY_NAME))
 
     if (stateVariableInfoOpt.isDefined) {
       val stateVariableInfo = stateVariableInfoOpt.get
       val stateVarType = stateVariableInfo.stateVariableType
-      SchemaUtil.processStateEntries(stateVarType, stateVarName, store,
+      SchemaUtil.processStateEntries(stateVarType, colFamilyName, store,
         keySchema, partition.partition, partition.sourceOptions)
     } else {
       store
-        .iterator(stateVarName)
+        .iterator(colFamilyName)
         .map { pair =>
           SchemaUtil.unifyStateRowPair((pair.key, pair.value), partition.partition)
         }
@@ -221,10 +221,11 @@ class StateStoreChangeDataPartitionReader(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
     stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
-    stateSchemaProviderOpt: Option[StateSchemaProvider])
+    stateSchemaProviderOpt: Option[StateSchemaProvider],
+    joinColFamilyOpt: Option[String])
   extends StatePartitionReaderBase(storeConf, hadoopConf, partition, schema,
     keyStateEncoderSpec, stateVariableInfoOpt, stateStoreColFamilySchemaOpt,
-    stateSchemaProviderOpt) {
+    stateSchemaProviderOpt, joinColFamilyOpt) {
 
   private lazy val changeDataReader:
     NextIterator[(RecordType.Value, UnsafeRow, UnsafeRow, Long)] = {
@@ -235,6 +236,8 @@ class StateStoreChangeDataPartitionReader(
 
     val colFamilyNameOpt = if (stateVariableInfoOpt.isDefined) {
       Some(stateVariableInfoOpt.get.stateName)
+    } else if (joinColFamilyOpt.isDefined) {
+      Some(joinColFamilyOpt.get)
     } else {
       None
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala

Lines changed: 13 additions & 6 deletions
@@ -45,9 +45,11 @@ class StateScanBuilder(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
     stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
-    stateSchemaProviderOpt: Option[StateSchemaProvider]) extends ScanBuilder {
+    stateSchemaProviderOpt: Option[StateSchemaProvider],
+    joinColFamilyOpt: Option[String]) extends ScanBuilder {
   override def build(): Scan = new StateScan(session, schema, sourceOptions, stateStoreConf,
-    keyStateEncoderSpec, stateVariableInfoOpt, stateStoreColFamilySchemaOpt, stateSchemaProviderOpt)
+    keyStateEncoderSpec, stateVariableInfoOpt, stateStoreColFamilySchemaOpt, stateSchemaProviderOpt,
+    joinColFamilyOpt)
 }
 
 /** An implementation of [[InputPartition]] for State Store data source. */
@@ -65,7 +67,8 @@ class StateScan(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
     stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
-    stateSchemaProviderOpt: Option[StateSchemaProvider])
+    stateSchemaProviderOpt: Option[StateSchemaProvider],
+    joinColFamilyOpt: Option[String])
   extends Scan with Batch {
 
   // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
@@ -120,24 +123,28 @@ class StateScan(
   override def createReaderFactory(): PartitionReaderFactory = sourceOptions.joinSide match {
     case JoinSideValues.left =>
       val userFacingSchema = schema
+      val oldSchemaFilePaths = StateDataSource.getOldSchemaFilePaths(sourceOptions,
+        hadoopConfBroadcast.value.value)
       val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
         sourceOptions.stateCheckpointLocation.toString, sourceOptions.operatorId, LeftSide,
-        excludeAuxColumns = false)
+        oldSchemaFilePaths, excludeAuxColumns = false)
       new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
         hadoopConfBroadcast.value, userFacingSchema, stateSchema)
 
     case JoinSideValues.right =>
       val userFacingSchema = schema
+      val oldSchemaFilePaths = StateDataSource.getOldSchemaFilePaths(sourceOptions,
+        hadoopConfBroadcast.value.value)
      val stateSchema = StreamStreamJoinStateHelper.readSchema(session,
         sourceOptions.stateCheckpointLocation.toString, sourceOptions.operatorId, RightSide,
-        excludeAuxColumns = false)
+        oldSchemaFilePaths, excludeAuxColumns = false)
       new StreamStreamJoinStatePartitionReaderFactory(stateStoreConf,
         hadoopConfBroadcast.value, userFacingSchema, stateSchema)
 
     case JoinSideValues.none =>
       new StatePartitionReaderFactory(stateStoreConf, hadoopConfBroadcast.value, schema,
         keyStateEncoderSpec, stateVariableInfoOpt, stateStoreColFamilySchemaOpt,
-        stateSchemaProviderOpt)
+        stateSchemaProviderOpt, joinColFamilyOpt)
   }
 
   override def toBatch: Batch = this

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateTable.scala

Lines changed: 4 additions & 2 deletions
@@ -44,7 +44,8 @@ class StateTable(
     keyStateEncoderSpec: KeyStateEncoderSpec,
     stateVariableInfoOpt: Option[TransformWithStateVariableInfo],
     stateStoreColFamilySchemaOpt: Option[StateStoreColFamilySchema],
-    stateSchemaProviderOpt: Option[StateSchemaProvider])
+    stateSchemaProviderOpt: Option[StateSchemaProvider],
+    joinColFamilyOpt: Option[String])
   extends Table with SupportsRead with SupportsMetadataColumns {
 
   import StateTable._
@@ -85,7 +86,8 @@ class StateTable(
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
     new StateScanBuilder(session, schema, sourceOptions, stateConf, keyStateEncoderSpec,
-      stateVariableInfoOpt, stateStoreColFamilySchemaOpt, stateSchemaProviderOpt)
+      stateVariableInfoOpt, stateStoreColFamilySchemaOpt, stateSchemaProviderOpt,
+      joinColFamilyOpt)
 
   override def properties(): util.Map[String, String] = Map.empty[String, String].asJava

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStateHelper.scala

Lines changed: 65 additions & 21 deletions
@@ -18,8 +18,12 @@ package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.util.UUID
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.JoinSide
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager
+import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.{JoinSide, LeftSide}
 import org.apache.spark.sql.execution.streaming.state.{StateSchemaCompatibilityChecker, StateStore, StateStoreId, StateStoreProviderId, SymmetricHashJoinStateManager}
 import org.apache.spark.sql.types.{BooleanType, StructType}
 
@@ -35,52 +39,92 @@ object StreamStreamJoinStateHelper {
       stateCheckpointLocation: String,
       operatorId: Int,
       side: JoinSide,
+      oldSchemaFilePaths: List[Path],
       excludeAuxColumns: Boolean = true): StructType = {
     val (keySchema, valueSchema) = readKeyValueSchema(session, stateCheckpointLocation,
-      operatorId, side, excludeAuxColumns)
+      operatorId, side, oldSchemaFilePaths, excludeAuxColumns)
 
     new StructType()
       .add("key", keySchema)
       .add("value", valueSchema)
   }
 
+  // Returns whether the checkpoint uses stateFormatVersion 3 which uses VCF for the join.
+  def usesVirtualColumnFamilies(
+      hadoopConf: Configuration,
+      stateCheckpointLocation: String,
+      operatorId: Int): Boolean = {
+    // If the schema exists for operatorId/partitionId/left-keyToNumValues, it is not
+    // stateFormatVersion 3.
+    val partitionId = StateStore.PARTITION_ID_TO_CHECK_SCHEMA
+    val storeId = new StateStoreId(stateCheckpointLocation, operatorId,
+      partitionId, SymmetricHashJoinStateManager.allStateStoreNames(LeftSide).toList.head)
+    val schemaFilePath = StateSchemaCompatibilityChecker.schemaFile(
+      storeId.storeCheckpointLocation())
+    val fm = CheckpointFileManager.create(schemaFilePath, hadoopConf)
+    !fm.exists(schemaFilePath)
+  }
+
   def readKeyValueSchema(
       session: SparkSession,
       stateCheckpointLocation: String,
       operatorId: Int,
       side: JoinSide,
+      oldSchemaFilePaths: List[Path],
       excludeAuxColumns: Boolean = true): (StructType, StructType) = {
 
+    val newHadoopConf = session.sessionState.newHadoopConf()
+    val partitionId = StateStore.PARTITION_ID_TO_CHECK_SCHEMA
     // KeyToNumValuesType, KeyWithIndexToValueType
     val storeNames = SymmetricHashJoinStateManager.allStateStoreNames(side).toList
 
-    val partitionId = StateStore.PARTITION_ID_TO_CHECK_SCHEMA
-    val storeIdForKeyToNumValues = new StateStoreId(stateCheckpointLocation, operatorId,
-      partitionId, storeNames(0))
-    val providerIdForKeyToNumValues = new StateStoreProviderId(storeIdForKeyToNumValues,
-      UUID.randomUUID())
-
-    val storeIdForKeyWithIndexToValue = new StateStoreId(stateCheckpointLocation,
-      operatorId, partitionId, storeNames(1))
-    val providerIdForKeyWithIndexToValue = new StateStoreProviderId(storeIdForKeyWithIndexToValue,
-      UUID.randomUUID())
-
-    val newHadoopConf = session.sessionState.newHadoopConf()
-
-    // read the key schema from the keyToNumValues store for the join keys
-    val manager = new StateSchemaCompatibilityChecker(providerIdForKeyToNumValues, newHadoopConf)
-    val keySchema = manager.readSchemaFile().head.keySchema
-
-    // read the value schema from the keyWithIndexToValue store for the values
-    val manager2 = new StateSchemaCompatibilityChecker(providerIdForKeyWithIndexToValue,
-      newHadoopConf)
-    val valueSchema = manager2.readSchemaFile().head.valueSchema
+    val (keySchema, valueSchema) =
+      if (!usesVirtualColumnFamilies(
+          newHadoopConf, stateCheckpointLocation, operatorId)) {
+        val storeIdForKeyToNumValues = new StateStoreId(stateCheckpointLocation, operatorId,
+          partitionId, storeNames(0))
+        val providerIdForKeyToNumValues = new StateStoreProviderId(storeIdForKeyToNumValues,
+          UUID.randomUUID())
+
+        val storeIdForKeyWithIndexToValue = new StateStoreId(stateCheckpointLocation,
+          operatorId, partitionId, storeNames(1))
+        val providerIdForKeyWithIndexToValue = new StateStoreProviderId(
+          storeIdForKeyWithIndexToValue, UUID.randomUUID())
+
+        // read the key schema from the keyToNumValues store for the join keys
+        val manager = new StateSchemaCompatibilityChecker(
+          providerIdForKeyToNumValues, newHadoopConf, oldSchemaFilePaths)
+        val kSchema = manager.readSchemaFile().head.keySchema
+
+        // read the value schema from the keyWithIndexToValue store for the values
+        val manager2 = new StateSchemaCompatibilityChecker(providerIdForKeyWithIndexToValue,
+          newHadoopConf, oldSchemaFilePaths)
+        val vSchema = manager2.readSchemaFile().head.valueSchema
+
+        (kSchema, vSchema)
+      } else {
+        val storeId = new StateStoreId(stateCheckpointLocation, operatorId,
+          partitionId, StateStoreId.DEFAULT_STORE_NAME)
+        val providerId = new StateStoreProviderId(storeId, UUID.randomUUID())
+
+        val manager = new StateSchemaCompatibilityChecker(
+          providerId, newHadoopConf, oldSchemaFilePaths)
+        val kSchema = manager.readSchemaFile().find { schema =>
+          schema.colFamilyName == storeNames(0)
+        }.map(_.keySchema).get
+
+        val vSchema = manager.readSchemaFile().find { schema =>
+          schema.colFamilyName == storeNames(1)
+        }.map(_.valueSchema).get
+
+        (kSchema, vSchema)
+      }
 
     val maybeMatchedColumn = valueSchema.last
 
     if (excludeAuxColumns
-        && maybeMatchedColumn.name == "matched"
-        && maybeMatchedColumn.dataType == BooleanType) {
+      && maybeMatchedColumn.name == "matched"
+      && maybeMatchedColumn.dataType == BooleanType) {
       // remove internal column `matched` for format version 2
       (keySchema, StructType(valueSchema.dropRight(1)))
    } else {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StreamStreamJoinStatePartitionReader.scala

Lines changed: 12 additions & 2 deletions
@@ -80,8 +80,18 @@ class StreamStreamJoinStatePartitionReader(
   private val (inputAttributes, formatVersion) = {
     val maybeMatchedColumn = valueSchema.last
     val (fields, version) = {
+      // If there is a matched column, version is either 2 or 3. We need to drop the matched
+      // column from the value schema to get the actual fields.
       if (maybeMatchedColumn.name == "matched" && maybeMatchedColumn.dataType == BooleanType) {
-        (valueSchema.dropRight(1), 2)
+        // If checkpoint is using one store and virtual column families, version is 3
+        if (StreamStreamJoinStateHelper.usesVirtualColumnFamilies(
+          hadoopConf.value,
+          partition.sourceOptions.stateCheckpointLocation.toString,
+          partition.sourceOptions.operatorId)) {
+          (valueSchema.dropRight(1), 3)
+        } else {
+          (valueSchema.dropRight(1), 2)
+        }
       } else {
         (valueSchema, 1)
       }
@@ -137,7 +147,7 @@ class StreamStreamJoinStatePartitionReader(
       inputAttributes)
 
     joinStateManager.iterator.map { pair =>
-      if (formatVersion == 2) {
+      if (formatVersion >= 2) {
         val row = valueWithMatchedRowGenerator(pair.value)
         row.setBoolean(indexOrdinalInValueWithMatchedRow, pair.matched)
         unifyStateRowPair(pair.key, row)

sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkExec.scala

Lines changed: 3 additions & 3 deletions
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{CoGroupedIterator, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.execution.python.ArrowPythonRunner
 import org.apache.spark.sql.execution.python.PandasGroupUtils.{executePython, groupAndProject, resolveArgOffsets}
-import org.apache.spark.sql.execution.streaming.{DriverStatefulProcessorHandleImpl, StatefulOperatorStateInfo, StatefulProcessorHandleImpl, TransformWithStateExecBase, TransformWithStateVariableInfo}
+import org.apache.spark.sql.execution.streaming.{DriverStatefulProcessorHandleImpl, StatefulOperatorStateInfo, StatefulOperatorsUtils, StatefulProcessorHandleImpl, TransformWithStateExecBase, TransformWithStateVariableInfo}
 import org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper.StateStoreAwareZipPartitionsHelper
 import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, RocksDBStateStoreProvider, StateSchemaValidationResult, StateStore, StateStoreColFamilySchema, StateStoreConf, StateStoreId, StateStoreOps, StateStoreProvider, StateStoreProviderId}
 import org.apache.spark.sql.internal.SQLConf
@@ -95,9 +95,9 @@ case class TransformWithStateInPySparkExec(
   override def shortName: String = if (
     userFacingDataType == TransformWithStateInPySpark.UserFacingDataType.PANDAS
   ) {
-    "transformWithStateInPandasExec"
+    StatefulOperatorsUtils.TRANSFORM_WITH_STATE_IN_PANDAS_EXEC_OP_NAME
   } else {
-    "transformWithStateInPySparkExec"
+    StatefulOperatorsUtils.TRANSFORM_WITH_STATE_IN_PYSPARK_EXEC_OP_NAME
   }
 
   private val pythonUDF = functionExpr.asInstanceOf[PythonUDF]

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/StreamingSymmetricHashJoinExec.scala

Lines changed: 1 addition & 1 deletion
@@ -236,7 +236,7 @@ case class StreamingSymmetricHashJoinExec(
     case _ => throwBadJoinTypeException()
   }
 
-  override def shortName: String = "symmetricHashJoin"
+  override def shortName: String = StatefulOperatorsUtils.SYMMETRIC_HASH_JOIN_EXEC_OP_NAME
 
   override val stateStoreNames: Seq[String] = _stateStoreNames

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/join/SymmetricHashJoinStateManager.scala

Lines changed: 2 additions & 2 deletions
@@ -737,9 +737,9 @@ abstract class SymmetricHashJoinStateManager(
     if (useVirtualColumnFamilies) {
       stateStore.createColFamilyIfAbsent(
         colFamilyName,
-        keySchema,
+        keyWithIndexSchema,
         valueRowConverter.valueAttributes.toStructType,
-        NoPrefixKeyStateEncoderSpec(keySchema)
+        NoPrefixKeyStateEncoderSpec(keyWithIndexSchema)
       )
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala

Lines changed: 13 additions & 0 deletions
@@ -1546,3 +1546,16 @@ trait SchemaValidationUtils extends Logging {
       schemaEvolutionEnabled = usingAvro && schemaEvolutionEnabledForOperator))
   }
 }
+
+object StatefulOperatorsUtils {
+  val TRANSFORM_WITH_STATE_EXEC_OP_NAME = "transformWithStateExec"
+  val TRANSFORM_WITH_STATE_IN_PANDAS_EXEC_OP_NAME = "transformWithStateInPandasExec"
+  val TRANSFORM_WITH_STATE_IN_PYSPARK_EXEC_OP_NAME = "transformWithStateInPySparkExec"
+  // Seq of operator names who uses state schema v3 and TWS related options.
+  val TRANSFORM_WITH_STATE_OP_NAMES: Seq[String] = Seq(
+    TRANSFORM_WITH_STATE_EXEC_OP_NAME,
+    TRANSFORM_WITH_STATE_IN_PANDAS_EXEC_OP_NAME,
+    TRANSFORM_WITH_STATE_IN_PYSPARK_EXEC_OP_NAME
+  )
+  val SYMMETRIC_HASH_JOIN_EXEC_OP_NAME = "symmetricHashJoin"
+}
