Skip to content

Commit 38c6ef4

Browse files
jovanm-db authored and cloud-fan committed
[SPARK-50529][SQL] Change char/varchar behavior under the spark.sql.preserveCharVarcharTypeInfo config
### What changes were proposed in this pull request?

This PR changes char/varchar behaviour under the `PRESERVE_CHAR_VARCHAR_TYPE_INFO` configuration flag (exposed as `spark.sql.preserveCharVarcharTypeInfo`).

### Why are the changes needed?

This PR enables the improvement of char/varchar types in a backward-compatible way.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests in:
- `RowEncoderSuite`
- `LiteralExpressionSuite`
- `CharVarcharTestSuite`

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#49128 from jovanm-db/char_varchar_conf.

Authored-by: Jovan Markovic <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 5c075c3 commit 38c6ef4

File tree

19 files changed

+265
-33
lines changed

19 files changed

+265
-33
lines changed

sql/api/src/main/scala/org/apache/spark/sql/Encoders.scala

+14
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,20 @@ object Encoders {
8181
*/
8282
def DOUBLE: Encoder[java.lang.Double] = BoxedDoubleEncoder
8383

84+
/**
85+
* An encoder for nullable char type.
86+
*
87+
* @since 4.0.0
88+
*/
89+
def CHAR(length: Int): Encoder[java.lang.String] = CharEncoder(length)
90+
91+
/**
92+
* An encoder for nullable varchar type.
93+
*
94+
* @since 4.0.0
95+
*/
96+
def VARCHAR(length: Int): Encoder[java.lang.String] = VarcharEncoder(length)
97+
8498
/**
8599
* An encoder for nullable string type.
86100
*

sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala

+2
Original file line numberDiff line numberDiff line change
@@ -231,6 +231,8 @@ object AgnosticEncoders {
231231
// Nullable leaf encoders
232232
case object NullEncoder extends LeafEncoder[java.lang.Void](NullType)
233233
case object StringEncoder extends LeafEncoder[String](StringType)
234+
case class CharEncoder(length: Int) extends LeafEncoder[String](CharType(length))
235+
case class VarcharEncoder(length: Int) extends LeafEncoder[String](VarcharType(length))
234236
case object BinaryEncoder extends LeafEncoder[Array[Byte]](BinaryType)
235237
case object ScalaBigIntEncoder extends LeafEncoder[BigInt](DecimalType.BigIntDecimal)
236238
case object JavaBigIntEncoder extends LeafEncoder[JBigInt](DecimalType.BigIntDecimal)

sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala

+6-6
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import scala.collection.mutable
2121
import scala.reflect.classTag
2222

2323
import org.apache.spark.sql.{AnalysisException, Row}
24-
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VariantEncoder, YearMonthIntervalEncoder}
24+
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, TimestampEncoder, UDTEncoder, VarcharEncoder, VariantEncoder, YearMonthIntervalEncoder}
2525
import org.apache.spark.sql.errors.{DataTypeErrorsBase, ExecutionErrors}
2626
import org.apache.spark.sql.internal.SqlApiConf
2727
import org.apache.spark.sql.types._
@@ -80,11 +80,11 @@ object RowEncoder extends DataTypeErrorsBase {
8080
case DoubleType => BoxedDoubleEncoder
8181
case dt: DecimalType => JavaDecimalEncoder(dt, lenientSerialization = true)
8282
case BinaryType => BinaryEncoder
83-
case CharType(_) | VarcharType(_) =>
84-
throw new AnalysisException(
85-
errorClass = "UNSUPPORTED_DATA_TYPE_FOR_ENCODER",
86-
messageParameters = Map("dataType" -> toSQLType(dataType)))
87-
case _: StringType => StringEncoder
83+
case CharType(length) if SqlApiConf.get.preserveCharVarcharTypeInfo =>
84+
CharEncoder(length)
85+
case VarcharType(length) if SqlApiConf.get.preserveCharVarcharTypeInfo =>
86+
VarcharEncoder(length)
87+
case s: StringType if s.constraint == NoConstraint => StringEncoder
8888
case TimestampType if SqlApiConf.get.datetimeJava8ApiEnabled => InstantEncoder(lenient)
8989
case TimestampType => TimestampEncoder(lenient)
9090
case TimestampNTZType => LocalDateTimeEncoder

sql/api/src/main/scala/org/apache/spark/sql/catalyst/util/SparkCharVarcharUtils.scala

+1-2
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ trait SparkCharVarcharUtils {
5454
StructType(fields.map { field =>
5555
field.copy(dataType = replaceCharVarcharWithString(field.dataType))
5656
})
57-
case _: CharType => StringType
58-
case _: VarcharType => StringType
57+
case CharType(_) | VarcharType(_) if !SqlApiConf.get.preserveCharVarcharTypeInfo => StringType
5958
case _ => dt
6059
}
6160
}

sql/api/src/main/scala/org/apache/spark/sql/internal/SqlApiConf.scala

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ private[sql] trait SqlApiConf {
4040
def timestampType: AtomicType
4141
def allowNegativeScaleOfDecimalEnabled: Boolean
4242
def charVarcharAsString: Boolean
43+
def preserveCharVarcharTypeInfo: Boolean
4344
def datetimeJava8ApiEnabled: Boolean
4445
def sessionLocalTimeZone: String
4546
def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value
@@ -80,6 +81,7 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
8081
override def timestampType: AtomicType = TimestampType
8182
override def allowNegativeScaleOfDecimalEnabled: Boolean = false
8283
override def charVarcharAsString: Boolean = false
84+
override def preserveCharVarcharTypeInfo: Boolean = false
8385
override def datetimeJava8ApiEnabled: Boolean = false
8486
override def sessionLocalTimeZone: String = TimeZone.getDefault.getID
8587
override def legacyTimeParserPolicy: LegacyBehaviorPolicy.Value = LegacyBehaviorPolicy.CORRECTED

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala

+29
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ object CatalystTypeConverters {
6666
case arrayType: ArrayType => ArrayConverter(arrayType.elementType)
6767
case mapType: MapType => MapConverter(mapType.keyType, mapType.valueType)
6868
case structType: StructType => StructConverter(structType)
69+
case CharType(length) => new CharConverter(length)
70+
case VarcharType(length) => new VarcharConverter(length)
6971
case _: StringType => StringConverter
7072
case DateType if SQLConf.get.datetimeJava8ApiEnabled => LocalDateConverter
7173
case DateType => DateConverter
@@ -296,6 +298,33 @@ object CatalystTypeConverters {
296298
toScala(row.getStruct(column, structType.size))
297299
}
298300

301+
private class CharConverter(length: Int) extends CatalystTypeConverter[Any, String, UTF8String] {
302+
override def toCatalystImpl(scalaValue: Any): UTF8String =
303+
CharVarcharCodegenUtils.charTypeWriteSideCheck(
304+
StringConverter.toCatalystImpl(scalaValue), length)
305+
override def toScala(catalystValue: UTF8String): String = if (catalystValue == null) {
306+
null
307+
} else {
308+
CharVarcharCodegenUtils.charTypeWriteSideCheck(catalystValue, length).toString
309+
}
310+
override def toScalaImpl(row: InternalRow, column: Int): String =
311+
CharVarcharCodegenUtils.charTypeWriteSideCheck(row.getUTF8String(column), length).toString
312+
}
313+
314+
private class VarcharConverter(length: Int)
315+
extends CatalystTypeConverter[Any, String, UTF8String] {
316+
override def toCatalystImpl(scalaValue: Any): UTF8String =
317+
CharVarcharCodegenUtils.varcharTypeWriteSideCheck(
318+
StringConverter.toCatalystImpl(scalaValue), length)
319+
override def toScala(catalystValue: UTF8String): String = if (catalystValue == null) {
320+
null
321+
} else {
322+
CharVarcharCodegenUtils.varcharTypeWriteSideCheck(catalystValue, length).toString
323+
}
324+
override def toScalaImpl(row: InternalRow, column: Int): String =
325+
CharVarcharCodegenUtils.varcharTypeWriteSideCheck(row.getUTF8String(column), length).toString
326+
}
327+
299328
private object StringConverter extends CatalystTypeConverter[Any, String, UTF8String] {
300329
override def toCatalystImpl(scalaValue: Any): UTF8String = scalaValue match {
301330
case str: String => UTF8String.fromString(str)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/DeserializerBuildHelper.scala

+32-2
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ package org.apache.spark.sql.catalyst
2020
import org.apache.spark.sql.catalyst.{expressions => exprs}
2121
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedExtractValue}
2222
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, Codec, JavaSerializationCodec, KryoSerializationCodec}
23-
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, YearMonthIntervalEncoder}
23+
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedLeafEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, OptionEncoder, PrimitiveBooleanEncoder, PrimitiveByteEncoder, PrimitiveDoubleEncoder, PrimitiveFloatEncoder, PrimitiveIntEncoder, PrimitiveLongEncoder, PrimitiveShortEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
2424
import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder}
2525
import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField, IsNull, Literal, MapKeys, MapValues, UpCast}
2626
import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, CreateExternalRow, DecodeUsingSerializer, InitializeJavaBean, Invoke, NewInstance, StaticInvoke, UnresolvedCatalystToExternalMap, UnresolvedMapObjects, WrapOption}
27-
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, IntervalUtils}
27+
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CharVarcharCodegenUtils, DateTimeUtils, IntervalUtils}
2828
import org.apache.spark.sql.types._
2929

3030
object DeserializerBuildHelper {
@@ -80,6 +80,32 @@ object DeserializerBuildHelper {
8080
returnNullable = false)
8181
}
8282

83+
def createDeserializerForChar(
84+
path: Expression,
85+
returnNullable: Boolean,
86+
length: Int): Expression = {
87+
val expr = StaticInvoke(
88+
classOf[CharVarcharCodegenUtils],
89+
StringType,
90+
"charTypeWriteSideCheck",
91+
path :: Literal(length) :: Nil,
92+
returnNullable = returnNullable)
93+
createDeserializerForString(expr, returnNullable)
94+
}
95+
96+
def createDeserializerForVarchar(
97+
path: Expression,
98+
returnNullable: Boolean,
99+
length: Int): Expression = {
100+
val expr = StaticInvoke(
101+
classOf[CharVarcharCodegenUtils],
102+
StringType,
103+
"varcharTypeWriteSideCheck",
104+
path :: Literal(length) :: Nil,
105+
returnNullable = returnNullable)
106+
createDeserializerForString(expr, returnNullable)
107+
}
108+
83109
def createDeserializerForString(path: Expression, returnNullable: Boolean): Expression = {
84110
Invoke(path, "toString", ObjectType(classOf[java.lang.String]),
85111
returnNullable = returnNullable)
@@ -258,6 +284,10 @@ object DeserializerBuildHelper {
258284
"withName",
259285
createDeserializerForString(path, returnNullable = false) :: Nil,
260286
returnNullable = false)
287+
case CharEncoder(length) =>
288+
createDeserializerForChar(path, returnNullable = false, length)
289+
case VarcharEncoder(length) =>
290+
createDeserializerForVarchar(path, returnNullable = false, length)
261291
case StringEncoder =>
262292
createDeserializerForString(path, returnNullable = false)
263293
case _: ScalaDecimalEncoder =>

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SerializerBuildHelper.scala

+22-2
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ import scala.language.existentials
2222
import org.apache.spark.sql.catalyst.{expressions => exprs}
2323
import org.apache.spark.sql.catalyst.DeserializerBuildHelper.expressionWithNullSafety
2424
import org.apache.spark.sql.catalyst.encoders.{AgnosticEncoder, AgnosticEncoders, Codec, JavaSerializationCodec, KryoSerializationCodec}
25-
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, YearMonthIntervalEncoder}
25+
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{ArrayEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLeafEncoder, BoxedLongEncoder, BoxedShortEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, InstantEncoder, IterableEncoder, JavaBeanEncoder, JavaBigIntEncoder, JavaDecimalEncoder, JavaEnumEncoder, LocalDateEncoder, LocalDateTimeEncoder, MapEncoder, OptionEncoder, PrimitiveLeafEncoder, ProductEncoder, ScalaBigIntEncoder, ScalaDecimalEncoder, ScalaEnumEncoder, StringEncoder, TimestampEncoder, TransformingEncoder, UDTEncoder, VarcharEncoder, YearMonthIntervalEncoder}
2626
import org.apache.spark.sql.catalyst.encoders.EncoderUtils.{externalDataTypeFor, isNativeEncoder, lenientExternalDataTypeFor}
2727
import org.apache.spark.sql.catalyst.expressions.{BoundReference, CheckOverflow, CreateNamedStruct, Expression, IsNull, KnownNotNull, Literal, UnsafeArrayData}
2828
import org.apache.spark.sql.catalyst.expressions.objects._
29-
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, GenericArrayData, IntervalUtils}
29+
import org.apache.spark.sql.catalyst.util.{ArrayData, CharVarcharCodegenUtils, DateTimeUtils, GenericArrayData, IntervalUtils}
3030
import org.apache.spark.sql.internal.SQLConf
3131
import org.apache.spark.sql.types._
3232
import org.apache.spark.unsafe.types.UTF8String
@@ -63,6 +63,24 @@ object SerializerBuildHelper {
6363
Invoke(inputObject, "doubleValue", DoubleType)
6464
}
6565

66+
def createSerializerForChar(inputObject: Expression, length: Int): Expression = {
67+
StaticInvoke(
68+
classOf[CharVarcharCodegenUtils],
69+
CharType(length),
70+
"charTypeWriteSideCheck",
71+
createSerializerForString(inputObject) :: Literal(length) :: Nil,
72+
returnNullable = false)
73+
}
74+
75+
def createSerializerForVarchar(inputObject: Expression, length: Int): Expression = {
76+
StaticInvoke(
77+
classOf[CharVarcharCodegenUtils],
78+
VarcharType(length),
79+
"varcharTypeWriteSideCheck",
80+
createSerializerForString(inputObject) :: Literal(length) :: Nil,
81+
returnNullable = false)
82+
}
83+
6684
def createSerializerForString(inputObject: Expression): Expression = {
6785
StaticInvoke(
6886
classOf[UTF8String],
@@ -298,6 +316,8 @@ object SerializerBuildHelper {
298316
case BoxedDoubleEncoder => createSerializerForDouble(input)
299317
case JavaEnumEncoder(_) => createSerializerForJavaEnum(input)
300318
case ScalaEnumEncoder(_, _) => createSerializerForScalaEnum(input)
319+
case CharEncoder(length) => createSerializerForChar(input, length)
320+
case VarcharEncoder(length) => createSerializerForVarchar(input, length)
301321
case StringEncoder => createSerializerForString(input)
302322
case ScalaDecimalEncoder(dt) => createSerializerForBigDecimal(input, dt)
303323
case JavaDecimalEncoder(dt, false) => createSerializerForBigDecimal(input, dt)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -283,9 +283,11 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
283283
plan.foreachUp {
284284
case p if p.analyzed => // Skip already analyzed sub-plans
285285

286-
case leaf: LeafNode if leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
286+
case leaf: LeafNode if !SQLConf.get.preserveCharVarcharTypeInfo &&
287+
leaf.output.map(_.dataType).exists(CharVarcharUtils.hasCharVarchar) =>
287288
throw SparkException.internalError(
288-
"Logical plan should not have output of char/varchar type: " + leaf)
289+
s"Logical plan should not have output of char/varchar type when " +
290+
s"${SQLConf.PRESERVE_CHAR_VARCHAR_TYPE_INFO.key} is false: " + leaf)
289291

290292
case u: UnresolvedNamespace =>
291293
u.schemaNotFound(u.multipartIdentifier)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,8 @@ object ExpressionEncoder {
8787
}
8888
constructProjection(row).get(0, anyObjectType).asInstanceOf[T]
8989
} catch {
90-
case e: SparkRuntimeException if e.getCondition == "NOT_NULL_ASSERT_VIOLATION" =>
90+
case e: SparkRuntimeException if e.getCondition == "NOT_NULL_ASSERT_VIOLATION" ||
91+
e.getCondition == "EXCEED_LIMIT_LENGTH" =>
9192
throw e
9293
case e: Exception =>
9394
throw QueryExecutionErrors.expressionDecodingError(e, expressions)
@@ -115,7 +116,8 @@ object ExpressionEncoder {
115116
inputRow(0) = t
116117
extractProjection(inputRow)
117118
} catch {
118-
case e: SparkRuntimeException if e.getCondition == "NOT_NULL_ASSERT_VIOLATION" =>
119+
case e: SparkRuntimeException if e.getCondition == "NOT_NULL_ASSERT_VIOLATION" ||
120+
e.getCondition == "EXCEED_LIMIT_LENGTH" =>
119121
throw e
120122
case e: Exception =>
121123
throw QueryExecutionErrors.expressionEncodingError(e, expressions)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala

+9-3
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ object Literal {
166166
case _: DayTimeIntervalType if v.isInstanceOf[Duration] =>
167167
Literal(CatalystTypeConverters.createToCatalystConverter(dataType)(v), dataType)
168168
case _: ObjectType => Literal(v, dataType)
169+
case CharType(_) | VarcharType(_) if SQLConf.get.preserveCharVarcharTypeInfo =>
170+
Literal(CatalystTypeConverters.createToCatalystConverter(dataType)(v), dataType)
169171
case _ => Literal(CatalystTypeConverters.convertToCatalyst(v), dataType)
170172
}
171173
}
@@ -196,9 +198,13 @@ object Literal {
196198
case TimestampNTZType => create(0L, TimestampNTZType)
197199
case it: DayTimeIntervalType => create(0L, it)
198200
case it: YearMonthIntervalType => create(0, it)
199-
case CharType(_) | VarcharType(_) =>
200-
throw QueryExecutionErrors.noDefaultForDataTypeError(dataType)
201-
case st: StringType => Literal(UTF8String.fromString(""), st)
201+
case CharType(length) =>
202+
create(CharVarcharCodegenUtils.charTypeWriteSideCheck(UTF8String.fromString(""), length),
203+
dataType)
204+
case VarcharType(length) =>
205+
create(CharVarcharCodegenUtils.varcharTypeWriteSideCheck(UTF8String.fromString(""), length),
206+
dataType)
207+
case st: StringType if st.constraint == NoConstraint => Literal(UTF8String.fromString(""), st)
202208
case BinaryType => Literal("".getBytes(StandardCharsets.UTF_8))
203209
case CalendarIntervalType => Literal(new CalendarInterval(0, 0, 0))
204210
case arr: ArrayType => create(Array(), arr)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CharVarcharUtils.scala

+10-2
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,23 @@ object CharVarcharUtils extends Logging with SparkCharVarcharUtils {
164164
case CharType(length) if charFuncName.isDefined =>
165165
StaticInvoke(
166166
classOf[CharVarcharCodegenUtils],
167-
StringType,
167+
if (SQLConf.get.preserveCharVarcharTypeInfo) {
168+
CharType(length)
169+
} else {
170+
StringType
171+
},
168172
charFuncName.get,
169173
expr :: Literal(length) :: Nil,
170174
returnNullable = false)
171175

172176
case VarcharType(length) if varcharFuncName.isDefined =>
173177
StaticInvoke(
174178
classOf[CharVarcharCodegenUtils],
175-
StringType,
179+
if (SQLConf.get.preserveCharVarcharTypeInfo) {
180+
VarcharType(length)
181+
} else {
182+
StringType
183+
},
176184
varcharFuncName.get,
177185
expr :: Literal(length) :: Nil,
178186
returnNullable = false)

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

+10
Original file line numberDiff line numberDiff line change
@@ -4938,6 +4938,14 @@ object SQLConf {
49384938
.booleanConf
49394939
.createWithDefault(false)
49404940

4941+
val PRESERVE_CHAR_VARCHAR_TYPE_INFO = buildConf("spark.sql.preserveCharVarcharTypeInfo")
4942+
.doc("When true, Spark does not replace CHAR/VARCHAR types with the STRING type, which is the " +
4943+
"default behavior of Spark 3.0 and earlier versions. This means the length checks for " +
4944+
"CHAR/VARCHAR types are enforced and CHAR type is also properly padded.")
4945+
.version("4.0.0")
4946+
.booleanConf
4947+
.createWithDefault(false)
4948+
49414949
val READ_SIDE_CHAR_PADDING = buildConf("spark.sql.readSideCharPadding")
49424950
.doc("When true, Spark applies string padding when reading CHAR type columns/fields, " +
49434951
"in addition to the write-side padding. This config is true by default to better enforce " +
@@ -6343,6 +6351,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
63436351

63446352
def charVarcharAsString: Boolean = getConf(SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING)
63456353

6354+
def preserveCharVarcharTypeInfo: Boolean = getConf(SQLConf.PRESERVE_CHAR_VARCHAR_TYPE_INFO)
6355+
63466356
def readSideCharPadding: Boolean = getConf(SQLConf.READ_SIDE_CHAR_PADDING)
63476357

63486358
def cliPrintHeader: Boolean = getConf(SQLConf.CLI_PRINT_HEADER)

0 commit comments

Comments
 (0)