Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.avro

import scala.util.control.NonFatal

import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.io.{BinaryDecoder, DecoderFactory}

Expand Down Expand Up @@ -57,8 +56,7 @@ case class AvroDataToCatalyst(

private lazy val avroOptions = AvroOptions(options)

@transient private lazy val actualSchema =
new Schema.Parser().setValidateDefaults(false).parse(jsonFormatSchema)
@transient private lazy val actualSchema = AvroUtils.parseAvroSchema(jsonFormatSchema)

@transient private lazy val expectedSchema = avroOptions.schema.getOrElse(actualSchema)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.sql.avro

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumWriter
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}

Expand All @@ -35,7 +34,7 @@ case class CatalystDataToAvro(

@transient private lazy val avroType =
jsonFormatSchema
.map(new Schema.Parser().setValidateDefaults(false).parse)
.map(AvroUtils.parseAvroSchema)
.getOrElse(SchemaConverters.toAvroType(child.dataType, child.nullable))

@transient private lazy val serializer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ case class SchemaOfAvro(

@transient private lazy val avroOptions = AvroOptions(options)

@transient private lazy val actualSchema =
new Schema.Parser().setValidateDefaults(false).parse(jsonFormatSchema)
@transient private lazy val actualSchema = AvroUtils.parseAvroSchema(jsonFormatSchema)

@transient private lazy val expectedSchema = avroOptions.schema.getOrElse(actualSchema)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,53 @@ class AvroCatalystDataConversionSuite extends SparkFunSuite
checkDeserialization(avroSchema, avroRecord, Some(expected))
checkDeserialization(avroSchema, avroRecord, Some(expected))
}

test("SPARK-56043: AvroDataToCatalyst with unresolvable schema reference") {
  // Avro 1.12.x raises NPE out of ParseContext.resolve() for undefined named types.
  // The fix wraps that NPE in SchemaParseException so the existing parseMode error
  // handling applies (FAILFAST -> MALFORMED_AVRO_MESSAGE reported as SparkException).
  val schemaWithDanglingRef =
    """
      |{
      | "type": "record",
      | "name": "TestRecord",
      | "fields": [
      | {"name": "value", "type": "UndefinedType"}
      | ]
      |}
    """.stripMargin

  val payload = Literal(Array[Byte](1, 2, 3))
  intercept[SparkException] {
    AvroDataToCatalyst(payload, schemaWithDanglingRef, Map("mode" -> "FAILFAST")).eval()
  }
}

test("SPARK-56043: AvroDataToCatalyst with malformed JSON schema") {
  // A schema string that is not even valid JSON must still surface through the
  // FAILFAST path as a SparkException rather than an unwrapped parser error.
  val payload = Literal(Array[Byte](1, 2, 3))
  intercept[SparkException] {
    AvroDataToCatalyst(payload, "not valid json", Map("mode" -> "FAILFAST")).eval()
  }
}

test("SPARK-56043: bare string schema reference triggers NPE in Avro 1.12.x") {
  // A bare quoted name is the minimal reproducer: in Avro 1.12.x it makes
  // ParseContext.resolve() throw NPE, whereas 1.11.x threw SchemaParseException.
  // The fix wraps the NPE in SchemaParseException.
  val bareStringRef = "\"com.test.Missing\""

  // Step 1: confirm the unpatched Avro parser still NPEs on this input. If this
  // assertion ever fails, the workaround in AvroUtils.parseAvroSchema may no
  // longer be necessary.
  val fromRawParser = intercept[Exception] {
    new Schema.Parser().setValidateDefaults(false).parse(bareStringRef)
  }
  assert(fromRawParser.isInstanceOf[NullPointerException],
    s"Expected NullPointerException from Avro 1.12.x ParseContext.resolve(), " +
    s"but got ${fromRawParser.getClass.getName}: ${fromRawParser.getMessage}")

  // Step 2: confirm parseAvroSchema rewraps the NPE as SchemaParseException
  // while preserving the original NPE as the cause.
  val fromUtils = intercept[org.apache.avro.SchemaParseException] {
    AvroUtils.parseAvroSchema(bareStringRef)
  }
  assert(fromUtils.getCause.isInstanceOf[NullPointerException])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -743,4 +743,37 @@ class AvroFunctionsSuite extends QueryTest with SharedSparkSession {
from_avro($"avro", avroStructSchema).as("schedule"))
checkAnswer(readBack, df)
}

test("SPARK-56043: from_avro with invalid schema should not throw NPE") {
  // A schema referencing an undefined named type must not leak the Avro library's
  // raw NullPointerException (from ParseContext.resolve()) to from_avro callers;
  // it should instead be reported as MALFORMED_AVRO_MESSAGE.
  val schemaWithDanglingRef =
    """
      |{
      | "type": "record",
      | "name": "TestRecord",
      | "fields": [
      | {"name": "value", "type": "UndefinedType"}
      | ]
      |}
    """.stripMargin

  val frame = spark.range(1).select(lit(Array[Byte](1, 2, 3)).as("data"))
  val thrown = intercept[Exception] {
    frame.select(from_avro($"data", schemaWithDanglingRef)).collect()
  }
  assert(!thrown.isInstanceOf[NullPointerException],
    s"Should not throw NPE, but got: ${thrown.getClass.getName}: ${thrown.getMessage}")
}

test("SPARK-56043: from_avro with completely unparseable schema should not throw NPE") {
  // Even a schema that is not JSON at all must fail with something other than a
  // raw NullPointerException when fed to from_avro.
  val frame = spark.range(1).select(lit(Array[Byte](1, 2, 3)).as("data"))
  val thrown = intercept[Exception] {
    frame.select(from_avro($"data", "this is not valid JSON at all")).collect()
  }
  assert(!thrown.isInstanceOf[NullPointerException],
    s"Should not throw NPE, but got: ${thrown.getClass.getName}: ${thrown.getMessage}")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private[sql] class AvroOptions(
* instead of "string" type in the default converted schema.
*/
val schema: Option[Schema] = {
parameters.get(AVRO_SCHEMA).map(new Schema.Parser().setValidateDefaults(false).parse).orElse({
parameters.get(AVRO_SCHEMA).map(AvroUtils.parseAvroSchema).orElse({
val avroUrlSchema = parameters.get(AVRO_SCHEMA_URL).map(url => {
log.debug("loading avro schema from url: " + url)
val fs = FileSystem.get(new URI(url), conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.avro

import org.apache.avro.Schema
import org.apache.avro.mapreduce.AvroJob
import org.apache.hadoop.mapreduce.TaskAttemptContext

Expand All @@ -37,7 +36,7 @@ private[sql] class AvroOutputWriterFactory(
avroSchemaAsJsonString: String,
positionalFieldMatching: Boolean) extends OutputWriterFactory {

private lazy val avroSchema = new Schema.Parser().parse(avroSchemaAsJsonString)
private lazy val avroSchema = AvroUtils.parseAvroSchema(avroSchemaAsJsonString)

override def getFileExtension(context: TaskAttemptContext): String = {
val codec = context.getConfiguration.get(AvroJob.CONF_OUTPUT_CODEC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.Locale

import scala.jdk.CollectionConverters._

import org.apache.avro.{Schema, SchemaFormatter}
import org.apache.avro.{Schema, SchemaFormatter, SchemaParseException}
import org.apache.avro.file.{DataFileReader, FileReader}
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.{AvroOutputFormat, FsInput}
Expand All @@ -48,6 +48,30 @@ private[sql] object AvroUtils extends Logging {
val JSON_INLINE_FORMAT: String = "json/inline"
val JSON_PRETTY_FORMAT: String = "json/pretty"

/**
 * Parses an Avro schema from its JSON string representation, normalizing parser failures.
 *
 * Avro 1.12.x's `Schema.Parser` can throw a raw NullPointerException out of
 * ParseContext.resolve() when the schema refers to a named type that is never defined.
 * This helper rewraps such NPEs as [[SchemaParseException]] (keeping the NPE as the
 * cause) so that the existing error handling in from_avro/to_avro — which catches
 * NonFatal exceptions and reports them via the parseMode mechanism — works correctly.
 *
 * @param jsonFormatSchema the Avro schema in JSON string format
 * @return the parsed Avro Schema
 * @throws SchemaParseException if the schema cannot be parsed
 */
def parseAvroSchema(jsonFormatSchema: String): Schema = {
  val parser = new Schema.Parser().setValidateDefaults(false)
  try {
    parser.parse(jsonFormatSchema)
  } catch {
    case npe: NullPointerException =>
      // SchemaParseException has no (message, cause) constructor, so attach the
      // cause explicitly before rethrowing.
      val wrapped = new SchemaParseException(s"Failed to parse Avro schema: ${npe.getMessage}")
      wrapped.initCause(npe)
      throw wrapped
  }
}

def inferSchema(
spark: SparkSession,
options: Map[String, String],
Expand Down