[SPARK-38787][SS] Replace found value with non-null element in the remaining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins

### What changes were proposed in this pull request?

In stream-stream joins, old state is removed based on the watermark by calling the `removeByValue` function with a removal condition. Within the iterator it returns, if the matched value sits at a non-last index and the last element is null, we currently neither swap nor remove the matched value. With this change, we find the first non-null value from the end, swap the current index with that value, remove all elements from that index + 1 to the end, and then drop the last element as before.
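The swap-and-trim logic above can be sketched on a plain buffer of nullable slots. This is a minimal model with hypothetical names (`RemoveByValueSketch`, `removeAt`), not the actual `keyWithIndexToValue` store API:

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model of the fix: per-key values live in index-addressed slots,
// some of which may be null. Removing the value at `index` swaps in the
// rightmost non-null value and trims the trailing nulls, so no null entry is
// ever left below a live value.
object RemoveByValueSketch {
  def removeAt(slots: ArrayBuffer[Integer], index: Int): Unit = {
    // Mirrors getRightMostNonNullIndex: scan from the end down to index + 1.
    val nonNullIndex = (slots.length - 1 to index + 1 by -1)
      .find(i => slots(i) != null)
      .getOrElse(index)
    if (nonNullIndex != index) {
      // Swap the rightmost non-null value into the removed slot.
      slots(index) = slots(nonNullIndex)
    }
    // Remove nonNullIndex + 1 .. end (all nulls), then drop the last element
    // itself, as in the patch; one range removal covers both steps.
    slots.remove(nonNullIndex, slots.length - nonNullIndex)
  }

  def main(args: Array[String]): Unit = {
    val slots = ArrayBuffer[Integer](100, 200, 300, null, null)
    removeAt(slots, 0) // rightmost non-null is index 2 (value 300)
    assert(slots == ArrayBuffer[Integer](300, 200))
  }
}
```

Before the fix, the model would stop at the null last slot and leave the matched value in place; the scan-from-the-end step is what the patch adds.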

### Why are the changes needed?

This change fixes a bug in the symmetric hash join code where `removeByValue` failed to replace found/matching values when it encountered null entries.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test for this change that exercises null values. Sample output:
```
Executing tests from //sql/core:org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite-hive-2.3__hadoop-3.2
-----------------------------------------------------------------------------
2022-04-01 21:11:59,641 INFO  CodeGenerator - Code generated in 225.884757 ms
2022-04-01 21:11:59,662 INFO  CodeGenerator - Code generated in 10.870786 ms
Run starting. Expected test count is: 4
…
===== TEST OUTPUT FOR o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====

2022-04-01 21:12:03,487 INFO  StateStore - State Store maintenance task started
2022-04-01 21:12:03,508 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp
2022-04-01 21:12:03,524 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema
2022-04-01 21:12:03,525 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef374ccb9
2022-04-01 21:12:03,525 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,525 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] for update
2022-04-01 21:12:03,525 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,541 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp
2022-04-01 21:12:03,556 INFO  CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema
2022-04-01 21:12:03,558 INFO  StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef1ea930eb
2022-04-01 21:12:03,559 INFO  StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue),47925997-9891-4025-a36a-3e18bc758b50) is active
2022-04-01 21:12:03,559 INFO  HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] for update
2022-04-01 21:12:03,559 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,564 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/.1.delta.86db3ac9-aa68-4a6b-9729-df93dc4b8a45.tmp
2022-04-01 21:12:03,568 INFO  CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/.1.delta.9673bc1b-2bbe-412d-a0af-69f237cde31e.tmp
2022-04-01 21:12:03,572 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 4 at current key [false,40,10.0].
2022-04-01 21:12:03,574 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,40,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,60,10.0].
2022-04-01 21:12:03,576 WARN  SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 1 at current key [false,40,10.0].
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyToNumValuesStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues]
2022-04-01 21:12:03,577 INFO  SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue)
2022-04-01 21:12:03,577 INFO  HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue]
2022-04-01 21:12:03,580 INFO  StateStore - StateStore stopped
2022-04-01 21:12:03,580 INFO  SymmetricHashJoinStateManagerSuite -

===== FINISHED o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' =====
…
2022-04-01 21:12:04,205 INFO  StateStore - StateStore stopped
Run completed in 5 seconds, 908 milliseconds.
Total number of tests run: 4
Suites: completed 1, aborted 0
Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Shutdown hook called
2022-04-01 21:12:04,605 INFO  ShutdownHookManager - Deleting directory /tmp/spark-37347802-bee5-4e7f-bffe-acb13eda1c5c
2022-04-01 21:12:04,608 INFO  ShutdownHookManager - Deleting directory /tmp/spark-9e79a2e1-cec7-4fbf-804a-92e63913f516
```

Closes #36073 from anishshri-db/bfix/SPARK-38787.

Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit 6d9bfb6)
Signed-off-by: Jungtaek Lim <[email protected]>
anishshri-db authored and HeartSaVioR committed Apr 6, 2022
1 parent 72a0562 commit 6f6eb3f
Showing 2 changed files with 97 additions and 9 deletions.
@@ -256,6 +256,16 @@ class SymmetricHashJoinStateManager(
return null
}

/**
* Find the first non-null value index, starting from the end
* and going up to stopIndex.
*/
private def getRightMostNonNullIndex(stopIndex: Long): Option[Long] = {
(numValues - 1 to stopIndex by -1).find { idx =>
keyWithIndexToValue.get(currentKey, idx) != null
}
}

override def getNext(): KeyToValuePair = {
val currentValue = findNextValueForIndex()

@@ -272,12 +282,33 @@
if (index != numValues - 1) {
val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1)
if (valuePairAtMaxIndex != null) {
// Likely case where last element is non-null and we can simply swap with index.
keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value,
valuePairAtMaxIndex.matched)
} else {
val projectedKey = getInternalRowOfKeyWithIndex(currentKey)
logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " +
s"at current key $projectedKey.")
// Find the rightmost non-null index and swap values with that index,
// if the index returned is not the same as the one passed in
val nonNullIndex = getRightMostNonNullIndex(index + 1).getOrElse(index)
if (nonNullIndex != index) {
val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex)
keyWithIndexToValue.put(currentKey, index, valuePair.value,
valuePair.matched)
}

// If nulls were found at the end, log a warning for the range of null indices.
if (nonNullIndex != numValues - 1) {
logWarning(s"`keyWithIndexToValue` returns a null value for indices " +
s"with range from startIndex=${nonNullIndex + 1} " +
s"and endIndex=${numValues - 1}.")
}

// Remove all null values from nonNullIndex + 1 onwards
// The nonNullIndex itself will be handled as removing the last entry,
// similar to finding the value as the last element
(numValues - 1 to nonNullIndex + 1 by -1).foreach { removeIndex =>
keyWithIndexToValue.remove(currentKey, removeIndex)
numValues -= 1
}
}
}
keyWithIndexToValue.remove(currentKey, numValues - 1)
@@ -324,6 +355,15 @@ class SymmetricHashJoinStateManager(
)
}

/**
* Update number of values for a key.
* NOTE: this function is only intended for use in unit tests
* to simulate null values.
*/
private[state] def updateNumValuesTestOnly(key: UnsafeRow, numValues: Long): Unit = {
keyToNumValues.put(key, numValues)
}

/*
=====================================================
Private methods and inner classes
@@ -46,6 +46,12 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
}
}

SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
test(s"StreamingJoinStateManager V${version} - all operations with nulls") {
testAllOperationsWithNulls(version)
}
}

SymmetricHashJoinStateManager.supportedVersions.foreach { version =>
test(s"SPARK-35689: StreamingJoinStateManager V${version} - " +
"printable key of keyWithIndexToValue") {
@@ -68,7 +74,6 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
}
}


private def testAllOperations(stateFormatVersion: Int): Unit = {
withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager =>
implicit val mgr = manager
@@ -99,11 +104,6 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
assert(get(30) === Seq.empty) // should remove 30
assert(numRows === 0)

def appendAndTest(key: Int, values: Int*): Unit = {
values.foreach { value => append(key, value)}
require(get(key) === values)
}

appendAndTest(40, 100, 200, 300)
appendAndTest(50, 125)
appendAndTest(60, 275) // prepare for testing removeByValue
@@ -130,6 +130,43 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
assert(numRows === 0)
}
}

/* Test removeByValue with nulls simulated by updating numValues on the state manager */
private def testAllOperationsWithNulls(stateFormatVersion: Int): Unit = {
withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager =>
implicit val mgr = manager

appendAndTest(40, 100, 200, 300)
appendAndTest(50, 125)
appendAndTest(60, 275) // prepare for testing removeByValue
assert(numRows === 5)

updateNumValues(40, 5) // update total values to 5 to create 2 nulls
removeByValue(125)
assert(get(40) === Seq(200, 300))
assert(get(50) === Seq.empty)
assert(get(60) === Seq(275)) // should remove only some values, not all and nulls
assert(numRows === 3)

append(40, 50)
assert(get(40) === Seq(50, 200, 300))
assert(numRows === 4)
updateNumValues(40, 4) // update total values to 4 to create 1 null

removeByValue(200)
assert(get(40) === Seq(300))
assert(get(60) === Seq(275)) // should remove only some values, not all and nulls
assert(numRows === 2)
updateNumValues(40, 2) // update total values to simulate nulls
updateNumValues(60, 4)

removeByValue(300)
assert(get(40) === Seq.empty)
assert(get(60) === Seq.empty) // should remove all values now including nulls
assert(numRows === 0)
}
}

val watermarkMetadata = new MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10).build()
val inputValueSchema = new StructType()
.add(StructField("time", IntegerType, metadata = watermarkMetadata))
@@ -157,6 +194,17 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter
manager.append(toJoinKeyRow(key), toInputValue(value), matched = false)
}

def appendAndTest(key: Int, values: Int*)
(implicit manager: SymmetricHashJoinStateManager): Unit = {
values.foreach { value => append(key, value)}
require(get(key) === values)
}

def updateNumValues(key: Int, numValues: Long)
(implicit manager: SymmetricHashJoinStateManager): Unit = {
manager.updateNumValuesTestOnly(toJoinKeyRow(key), numValues)
}

def get(key: Int)(implicit manager: SymmetricHashJoinStateManager): Seq[Int] = {
manager.get(toJoinKeyRow(key)).map(toValueInt).toSeq.sorted
}
