Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,21 @@ case class StreamingSymmetricHashJoinExec(
case _ => (_: InternalRow) => Iterator.empty
}

// Decide whether joining against the other side's state must also flip the matched flag of
// the rows found there. The flag is only read in two places: on the outer side, when
// evicting unmatched rows, and on the left side of a left semi join, when removing rows
// that have already matched. Writing it anywhere else is an unnecessary state store write.
val needToUpdateMatchedOnOtherSide = joinType match {
  case Inner => false
  // The flag lives on the left side, which is "the other side" only for right-side input.
  case LeftOuter | LeftSemi => joinSide == RightSide
  case RightOuter => joinSide == LeftSide
  // Every remaining join type, including FullOuter, keeps updating the flag.
  case _ => true
}
// The skip is only enabled for state format V4. It would be valid for every version, but
// turning it on for older versions would be a behavioral change.
val skipUpdatingMatchedFlag = !needToUpdateMatchedOnOtherSide && stateFormatVersion == 4

val generateOutputIter: (InternalRow, Iterator[JoinedRow]) => Iterator[InternalRow] =
joinSide match {
case LeftSide if joinType == LeftSemi =>
Expand Down Expand Up @@ -758,7 +773,8 @@ case class StreamingSymmetricHashJoinExec(
otherSideJoiner.joinStateManager.getJoinedRows(
key,
thatRow => generateJoinedRow(thisRow, thatRow),
postJoinFilter)
postJoinFilter,
skipUpdatingMatchedFlag)
}
val outputIter = generateOutputIter(thisRow, joinedRowIter)
new AddingProcessedRowToStateCompletionIterator(key, thisRow, outputIter)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,17 @@ trait SymmetricHashJoinStateManager {
* The matched flag will be updated to true for the values being returned, if it is semantically
* required to do so.
*
* For skipUpdatingMatchedFlag = true, the method will skip updating the matched flag for the
* values being returned. This is useful for the non-outer side of stream-stream join where
* the matched flag is never checked during state eviction.
*
* It is the caller's responsibility to consume the whole iterator.
*/
def getJoinedRows(
key: UnsafeRow,
generateJoinedRow: InternalRow => JoinedRow,
predicate: JoinedRow => Boolean): Iterator[JoinedRow]
predicate: JoinedRow => Boolean,
skipUpdatingMatchedFlag: Boolean = false): Iterator[JoinedRow]

/**
* Retrieve all joined rows for the given key and remove the matched rows from state. The joined
Expand Down Expand Up @@ -343,7 +348,8 @@ class SymmetricHashJoinStateManagerV4(
override def getJoinedRows(
key: UnsafeRow,
generateJoinedRow: InternalRow => JoinedRow,
predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
predicate: JoinedRow => Boolean,
skipUpdatingMatchedFlag: Boolean = false): Iterator[JoinedRow] = {
// TODO: [SPARK-55147] We could improve this method to get the scope of timestamp and scan keys
// more efficiently. For now, we just get all values for the key.
def getJoinedRowsFromTsAndValues(
Expand All @@ -361,7 +367,7 @@ class SymmetricHashJoinStateManagerV4(

val joinedRow = generateJoinedRow(vmp.value)
if (predicate(joinedRow)) {
if (!vmp.matched) {
if (!skipUpdatingMatchedFlag && !vmp.matched) {
valuesAndMatched(currentIndex) = vmp.copy(matched = true)
shouldUpdateValuesIntoStateStore = true
}
Expand All @@ -383,7 +389,6 @@ class SymmetricHashJoinStateManagerV4(

override protected def close(): Unit = {
if (shouldUpdateValuesIntoStateStore) {
// Update back to the state store
val updatedValuesWithMatched = valuesAndMatched.map { vmp =>
(vmp.value, vmp.matched)
}.toSeq
Expand Down Expand Up @@ -1047,16 +1052,20 @@ abstract class SymmetricHashJoinStateManagerBase(
/**
* Get all the matched values for the given join condition, marking them as matched.
* This method is designed to mark joined rows properly without exposing internal index of row.
*
* @param skipUpdatingMatchedFlag If true, do not update the matched flag even when the row
* matches.
*/
def getJoinedRows(
key: UnsafeRow,
generateJoinedRow: InternalRow => JoinedRow,
predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
predicate: JoinedRow => Boolean,
skipUpdatingMatchedFlag: Boolean = false): Iterator[JoinedRow] = {
val numValues = keyToNumValues.get(key)
keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
val joinedRow = generateJoinedRow(keyIdxToValue.value)
if (predicate(joinedRow)) {
if (!keyIdxToValue.matched) {
if (!skipUpdatingMatchedFlag && !keyIdxToValue.matched) {
keyWithIndexToValue.put(key, keyIdxToValue.valueIndex, keyIdxToValue.value,
matched = true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,50 @@ class SymmetricHashJoinStateManagerEventTimeInKeySuite
}
}
}

// V1 is left out: its converter does not persist matched flags (SPARK-26154).
for (ver <- versionsInTest if ver >= 2) {
  test(s"StreamingJoinStateManager V$ver - skipUpdatingMatchedFlag skips matched flag update") {
    withTempDir { checkpointDir =>
      // Store version 0: populate state, probe key 20 with the skip flag on, then commit.
      withJoinStateManagerWithCheckpointDir(
        inputValueAttributes,
        joinKeyExpressions,
        stateFormatVersion = ver,
        checkpointDir,
        storeVersion = 0,
        changelogCheckpoint = false) { manager =>
        implicit val mgr = manager

        Seq(20 -> 2, 20 -> 3, 30 -> 1).foreach { case (k, v) => append(k, v) }

        val emptyOtherSideRow = new GenericInternalRow(0)
        // The predicate accepts only the row holding 2, so exactly one joined row comes back.
        // Materializing with toSeq also drains the iterator, as getJoinedRows requires.
        val joinedRows = manager.getJoinedRows(
          toJoinKeyRow(20),
          stateRow => new JoinedRow(stateRow, emptyOtherSideRow),
          joined => joined.getInt(1) == 2,
          skipUpdatingMatchedFlag = true
        ).toSeq
        assert(joinedRows.size == 1)

        mgr.commit()
      }

      // Reopen from the same checkpoint directory at store version 1 and inspect the matched
      // flag carried by the rows evicted from state.
      withJoinStateManagerWithCheckpointDir(
        inputValueAttributes,
        joinKeyExpressions,
        stateFormatVersion = ver,
        checkpointDir,
        storeVersion = 1,
        changelogCheckpoint = false) { manager =>
        implicit val mgr = manager

        val matchedFlagByValue = removeAndReturnByKey(25)
          .map(pair => toValueInt(pair.value) -> pair.matched)
          .toSeq
          .toMap
        // Had skipUpdatingMatchedFlag not been set, the flag for value 2 would be true.
        assert(matchedFlagByValue(2) === false)
        assert(matchedFlagByValue(3) === false)

        mgr.commit()
      }
    }
  }
}

}

class SymmetricHashJoinStateManagerEventTimeInValueSuite
Expand Down Expand Up @@ -1009,4 +1053,45 @@ class SymmetricHashJoinStateManagerEventTimeInValueSuite
}
}
}

// V1 is left out: its converter does not persist matched flags (SPARK-26154).
for (ver <- versionsInTest if ver >= 2) {
  test(s"StreamingJoinStateManager V$ver - skipUpdatingMatchedFlag skips matched flag update") {
    withTempDir { checkpointDir =>
      // Store version 0: populate state, probe key 40 with the skip flag on, then commit.
      withJoinStateManagerWithCheckpointDir(
        inputValueAttributes,
        joinKeyExpressions,
        stateFormatVersion = ver,
        checkpointDir,
        storeVersion = 0,
        changelogCheckpoint = false) { manager =>
        implicit val mgr = manager

        appendAndTest(40, 100, 200, 300)

        val emptyOtherSideRow = new GenericInternalRow(0)
        // The predicate accepts only the row holding 100, so exactly one joined row comes
        // back. Materializing with toSeq also drains the iterator, as getJoinedRows requires.
        val joinedRows = manager.getJoinedRows(
          toJoinKeyRow(40),
          stateRow => new JoinedRow(stateRow, emptyOtherSideRow),
          joined => joined.getInt(1) == 100,
          skipUpdatingMatchedFlag = true
        ).toSeq
        assert(joinedRows.size == 1)

        mgr.commit()
      }

      // Reopen from the same checkpoint directory at store version 1 and inspect the matched
      // flag carried by the rows evicted from state.
      withJoinStateManagerWithCheckpointDir(
        inputValueAttributes,
        joinKeyExpressions,
        stateFormatVersion = ver,
        checkpointDir,
        storeVersion = 1,
        changelogCheckpoint = false) { manager =>
        implicit val mgr = manager

        val matchedFlagByValue = removeAndReturnByValue(125)
          .map(pair => toValueInt(pair.value) -> pair.matched)
          .toSeq
          .toMap
        // Had skipUpdatingMatchedFlag not been set, the flag for value 100 would be true.
        assert(matchedFlagByValue(100) === false)

        mgr.commit()
      }
    }
  }
}

}