Try fixed https://github.com/sutugin/spark-streaming-jdbc-source/issu… #6

Open · wants to merge 1 commit into base: master
JDBCStreamSource.scala

@@ -4,11 +4,10 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.offset._
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{Column, DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
 
-import scala.util.{Failure, Success, Try}
+//import scala.util.{Failure, Success, Try}
 
 class JDBCStreamSource(
     sqlContext: SQLContext,
@@ -18,7 +17,6 @@ class JDBCStreamSource(
     with Logging {
 
   import JDBCStreamSource._
-  import sqlContext.implicits._
 
   // ToDo: implement
   // private val maxOffsetsPerTrigger = None
@@ -32,22 +30,31 @@
 
   private var currentOffset: Option[JDBCOffset] = None
 
-  private def getOffsetValue(sortFunc: String => Column) =
-    Try { df.select(offsetColumn).orderBy(sortFunc(offsetColumn)).as[String].first } match {
-      case Success(value) => Some(value)
-      case Failure(ex) => logWarning(s"Not found offset ${ex.getStackTrace.mkString("\n")}"); None
-    }
+  private def getOffsetValues: (Option[String], Option[String]) = {
+    val tableKey = "dbtable"
+    val dbTable = parameters(tableKey)
+    val minMaxQuery = s"(select max($offsetColumn) as max_$offsetColumn, min($offsetColumn) as min_$offsetColumn from $dbTable) minMaxTable"
+    val jdbOps = parameters - tableKey + (tableKey -> minMaxQuery)
+    val mm = df.sparkSession.read.format("jdbc").options(jdbOps).load().collect()(0)
+    val min = Option(mm.get(1)).map(x => x.toString)
+    val max = Option(mm.get(0)).map(x => x.toString)
+    (min, max)
+  }
 
+
+
   private def initFirstOffset(): Unit = {
+    val (min, max) = getOffsetValues
     val start = startingOffset match {
-      case EarliestOffsetRangeLimit => SpecificOffsetRangeLimit(getOffsetValue(asc).get)
-      case LatestOffsetRangeLimit => SpecificOffsetRangeLimit(getOffsetValue(desc).get)
-      case SpecificOffsetRangeLimit(p) => SpecificOffsetRangeLimit(p)
+      case EarliestOffsetRangeLimit => min.map(x => SpecificOffsetRangeLimit(x).toString)
+      case LatestOffsetRangeLimit => max.map(x => SpecificOffsetRangeLimit(x).toString)
+      case SpecificOffsetRangeLimit(p) => Some(SpecificOffsetRangeLimit(p).toString)
     }
-    val end = SpecificOffsetRangeLimit(getOffsetValue(desc).get)
+    val end = max.map(x => SpecificOffsetRangeLimit(x).toString)
 
-    val offsetRange = OffsetRange(Some(start.toString), Some(end.toString))
-    currentOffset = Some(JDBCOffset(offsetColumn, offsetRange))
+    if (start.nonEmpty && end.nonEmpty){
+      currentOffset = Some(JDBCOffset(offsetColumn, OffsetRange(start, end)))
+    }
   }
 
   private val startingOffset = {
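
For context: getOffsetValues replaces the removed getOffsetValue helper. Instead of loading the table and sorting it in Spark, it rewrites the dbtable option into a parenthesized min/max aggregate subquery, so the database returns both boundary values in a single row. A self-contained sketch of that pattern follows; the JDBC URL, credentials, table name and offset column are placeholders, not values taken from this PR.

// Standalone sketch of the min/max pushdown performed by getOffsetValues.
// Connection details, table and column names are placeholders.
import org.apache.spark.sql.SparkSession

object MinMaxOffsetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("min-max-offset").master("local[*]").getOrCreate()

    val offsetColumn = "id"       // offset column of the source table
    val dbTable = "public.events" // original "dbtable" option
    // Wrap the aggregate in a derived table so the database computes it,
    // rather than shipping and sorting the whole table in Spark.
    val minMaxQuery =
      s"(select max($offsetColumn) as max_$offsetColumn, min($offsetColumn) as min_$offsetColumn from $dbTable) minMaxTable"

    val row = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/db") // placeholder URL
      .option("user", "user")
      .option("password", "secret")
      .option("dbtable", minMaxQuery)
      .load()
      .collect()(0)

    // Column 0 is the max and column 1 the min, mirroring the select order above.
    val max = Option(row.get(0)).map(_.toString)
    val min = Option(row.get(1)).map(_.toString)
    println(s"min=$min max=$max")
    spark.stop()
  }
}

Because the aggregate is wrapped in its own derived table, the rewrite also composes with a dbtable that is itself a subquery, which the new test at the bottom of this diff exercises.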
@@ -112,10 +119,17 @@
         )
 
       case _ =>
-        val batchRange = resolveBatchRange(start, end)
-        val batchData = getBatchData(batchRange)
-        batchData
-
+        if (end.asInstanceOf[JDBCOffset].range.end.isEmpty){
+          sqlContext.internalCreateDataFrame(
+            sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"),
+            schema,
+            isStreaming = true
+          )
+        } else {
+          val batchRange = resolveBatchRange(start, end)
+          val batchData = getBatchData(batchRange)
+          batchData
+        }
     }
   }
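
The branch above returns an empty streaming batch when the resolved end offset is missing, i.e. the source table has produced no rows yet, instead of failing inside resolveBatchRange. The PR builds that batch with the internal sqlContext.internalCreateDataFrame(..., isStreaming = true), which is what a streaming Source requires; the sketch below only illustrates the guard itself, using public APIs and hypothetical names.

// Sketch of the "no end offset => empty batch" guard; batchOrEmpty is an illustrative name.
// A real Source must return a streaming DataFrame (isStreaming = true), which needs the
// internal API used in the PR; the public createDataFrame here is for demonstration only.
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType

def batchOrEmpty(spark: SparkSession,
                 schema: StructType,
                 endOffset: Option[String])(loadBatch: => DataFrame): DataFrame =
  endOffset match {
    case None    => spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema) // nothing to read yet
    case Some(_) => loadBatch                                                       // normal ranged query
  }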

@@ -132,15 +146,16 @@
   override def getOffset: Option[Offset] =
     if (currentOffset.isEmpty) {
       logInfo("No offset, will try to get it from the source.")
-      Try(initFirstOffset()) match {
-        case Success(_) => logInfo(s"Offsets retrieved from data: '$currentOffset'."); currentOffset
-        case Failure(ex) => logWarning(s"Not found offset in source table${ex.getStackTrace.mkString("\n")}"); None
-      }
+      initFirstOffset()
+      currentOffset
     } else {
-      getOffsetValue(desc) match {
-        case Some(candidateNewEndOffset) if candidateNewEndOffset != currentOffset.get.range.end.get =>
-          updateCurrentOffsets(newEndOffset = candidateNewEndOffset)
-          logInfo(s"New offset found: '$currentOffset'.")
+      val (_, max) = getOffsetValues
+      max match {
+        case Some(candidateNewEndOffset) =>
+          if (currentOffset.isEmpty || candidateNewEndOffset != currentOffset.get.range.end.get) {
+            updateCurrentOffsets(newEndOffset = candidateNewEndOffset)
+            logInfo(s"New offset found: '$currentOffset'.")
+          }
           currentOffset
         case _ =>
           logDebug(s"No new offset found. Previous offset: $currentOffset")
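
With this change every getOffset call polls the table for its current maximum through getOffsetValues, and the tracked range is advanced only when that maximum differs from the end already recorded (or when no offset exists yet). A condensed sketch of that decision, with illustrative names that are not identifiers from this PR:

// OffsetWindow and nextWindow are hypothetical stand-ins for the JDBCOffset/OffsetRange handling.
final case class OffsetWindow(start: Option[String], end: Option[String])

def nextWindow(current: Option[OffsetWindow],
               latestMax: Option[String],
               earliestMin: Option[String]): Option[OffsetWindow] =
  (current, latestMax) match {
    case (None, Some(max))                            => Some(OffsetWindow(earliestMin, Some(max))) // first offset observed
    case (Some(w), Some(max)) if !w.end.contains(max) => Some(w.copy(end = Some(max)))              // table grew: advance the end
    case _                                            => current                                    // nothing new, or table still empty
  }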
JDBCStreamSourceTest.scala

@@ -100,6 +100,8 @@ class JDBCStreamSourceTest
     saveStreamingDataToTempDir(jdbc, tmpCheckpoint, tmpOutputDir, spark)
 
     val actual = spark.read.schema(expected.schema).json(tmpOutputDir)
+    actual.show(false)
+    expected.show(false)
 
     assertSmallDatasetEquality(actualDS = actual, expectedDS = expected, orderedComparison = false)
   }
@@ -275,4 +277,20 @@ class JDBCStreamSourceTest
     val actual = spark.read.schema(schema).json(tmpOutputDir)
     assertSmallDatasetEquality(actualDS = actual, expectedDS = expected, orderedComparison = false)
   }
+
+  it should "work with custom dbtable" in {
+    import spark.implicits._
+    val offsetColumn = "dt"
+    val jdbcTableName = s"tbl${java.util.UUID.randomUUID.toString.replace('-', 'n')}"
+    val expected = inputData.toDF(columns: _*)
+    val jdbc = jdbcDefaultParams(jdbcTableName, offsetColumn)
+    writeToJDBC(jdbc, expected, SaveMode.Append)
+    val jdbc2 = jdbcDefaultParams(s"(select * from $jdbcTableName) t", offsetColumn)
+    val tmpCheckpoint = s"${createLocalTempDir("checkopoint")}"
+    val tmpOutputDir = s"${createLocalTempDir("output")}"
+    saveStreamingDataToTempDir(jdbc2, tmpCheckpoint, tmpOutputDir, spark)
+    val actual = spark.read.schema(expected.schema).json(tmpOutputDir)
+    assertSmallDatasetEquality(actualDS = actual, expectedDS = expected, orderedComparison = false)
+
+  }
 }
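
The added test exercises a dbtable value that is a parenthesized subquery with an alias rather than a bare table name, a form Spark's JDBC reader also accepts for plain batch reads. A minimal batch-read example of the same option, suitable for pasting into spark-shell; the URL, credentials and query are placeholders, not values from the test:

// The JDBC "dbtable" option may be a derived table of the form "(subquery) alias".
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("dbtable-subquery").master("local[*]").getOrCreate()

val filtered = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/db")
  .option("user", "user")
  .option("password", "secret")
  .option("dbtable", "(select * from events where dt is not null) t") // subquery instead of a table name
  .load()

filtered.printSchema()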