From 6f6eb3f628dd3c4adaec2ef5cb0a6b052664b220 Mon Sep 17 00:00:00 2001 From: Anish Shrigondekar Date: Thu, 7 Apr 2022 05:51:57 +0900 Subject: [PATCH] [SPARK-38787][SS] Replace found value with non-null element in the remaining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? In stream-stream joins, for removing old state (watermark by value), we call the `removeByValue` function with a removal condition. Within the iterator returned, if the value at the last index is null when a matching value is found at a non-last index, the matching value is currently neither replaced nor removed; only the trailing null entry is dropped. With this change, we find the first non-null value scanning from the end, copy it into the matched index (if such a value exists), remove all null elements after that non-null index, and then drop the last remaining element as before. ### Why are the changes needed? This change fixes a bug where we were not replacing found/matching values for `removeByValue` when encountering nulls in the symmetric hash join code. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test for this change with nulls added. Here is a sample output: ``` Executing tests from //sql/core:org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite-hive-2.3__hadoop-3.2 ----------------------------------------------------------------------------- 2022-04-01 21:11:59,641 INFO CodeGenerator - Code generated in 225.884757 ms 2022-04-01 21:11:59,662 INFO CodeGenerator - Code generated in 10.870786 ms Run starting. 
Expected test count is: 4 … ===== TEST OUTPUT FOR o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' ===== 2022-04-01 21:12:03,487 INFO StateStore - State Store maintenance task started 2022-04-01 21:12:03,508 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp 2022-04-01 21:12:03,524 INFO CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema 2022-04-01 21:12:03,525 INFO StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef374ccb9 2022-04-01 21:12:03,525 INFO StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues),47925997-9891-4025-a36a-3e18bc758b50) is active 2022-04-01 21:12:03,525 INFO HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] for update 2022-04-01 21:12:03,525 INFO SymmetricHashJoinStateManager$KeyToNumValuesStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues) 2022-04-01 21:12:03,541 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp 2022-04-01 
21:12:03,556 INFO CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema 2022-04-01 21:12:03,558 INFO StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef1ea930eb 2022-04-01 21:12:03,559 INFO StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue),47925997-9891-4025-a36a-3e18bc758b50) is active 2022-04-01 21:12:03,559 INFO HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] for update 2022-04-01 21:12:03,559 INFO SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue) 2022-04-01 21:12:03,564 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/.1.delta.86db3ac9-aa68-4a6b-9729-df93dc4b8a45.tmp 2022-04-01 21:12:03,568 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/.1.delta.9673bc1b-2bbe-412d-a0af-69f237cde31e.tmp 2022-04-01 21:12:03,572 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 4 at current key [false,40,10.0]. 2022-04-01 21:12:03,574 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,40,10.0]. 
2022-04-01 21:12:03,576 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,60,10.0]. 2022-04-01 21:12:03,576 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 1 at current key [false,40,10.0]. 2022-04-01 21:12:03,577 INFO SymmetricHashJoinStateManager$KeyToNumValuesStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues) 2022-04-01 21:12:03,577 INFO HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] 2022-04-01 21:12:03,577 INFO SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue) 2022-04-01 21:12:03,577 INFO HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] 2022-04-01 21:12:03,580 INFO StateStore - StateStore stopped 2022-04-01 21:12:03,580 INFO SymmetricHashJoinStateManagerSuite - ===== FINISHED o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' ===== … 2022-04-01 21:12:04,205 INFO StateStore - StateStore stopped Run completed in 5 seconds, 908 milliseconds. Total number of tests run: 4 Suites: completed 1, aborted 0 Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0 All tests passed. 2022-04-01 21:12:04,605 INFO ShutdownHookManager - Shutdown hook called 2022-04-01 21:12:04,605 INFO ShutdownHookManager - Deleting directory /tmp/spark-37347802-bee5-4e7f-bffe-acb13eda1c5c 2022-04-01 21:12:04,608 INFO ShutdownHookManager - Deleting directory /tmp/spark-9e79a2e1-cec7-4fbf-804a-92e63913f516 ``` Closes #36073 from anishshri-db/bfix/SPARK-38787. 
Authored-by: Anish Shrigondekar Signed-off-by: Jungtaek Lim (cherry picked from commit 6d9bfb675f3e58c6e7d9facd8cf3f22069c4cc48) Signed-off-by: Jungtaek Lim --- .../state/SymmetricHashJoinStateManager.scala | 46 +++++++++++++- .../SymmetricHashJoinStateManagerSuite.scala | 60 +++++++++++++++++-- 2 files changed, 97 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala index f301d233cb0a0..56c47d564a3b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala @@ -256,6 +256,16 @@ class SymmetricHashJoinStateManager( return null } + /** + * Find the first non-null value index starting from end + * and going up-to stopIndex. + */ + private def getRightMostNonNullIndex(stopIndex: Long): Option[Long] = { + (numValues - 1 to stopIndex by -1).find { idx => + keyWithIndexToValue.get(currentKey, idx) != null + } + } + override def getNext(): KeyToValuePair = { val currentValue = findNextValueForIndex() @@ -272,12 +282,33 @@ class SymmetricHashJoinStateManager( if (index != numValues - 1) { val valuePairAtMaxIndex = keyWithIndexToValue.get(currentKey, numValues - 1) if (valuePairAtMaxIndex != null) { + // Likely case where last element is non-null and we can simply swap with index. 
keyWithIndexToValue.put(currentKey, index, valuePairAtMaxIndex.value, valuePairAtMaxIndex.matched) } else { - val projectedKey = getInternalRowOfKeyWithIndex(currentKey) - logWarning(s"`keyWithIndexToValue` returns a null value for index ${numValues - 1} " + - s"at current key $projectedKey.") + // Find the rightmost non null index and swap values with that index, + // if index returned is not the same as the passed one + val nonNullIndex = getRightMostNonNullIndex(index + 1).getOrElse(index) + if (nonNullIndex != index) { + val valuePair = keyWithIndexToValue.get(currentKey, nonNullIndex) + keyWithIndexToValue.put(currentKey, index, valuePair.value, + valuePair.matched) + } + + // If nulls were found at the end, log a warning for the range of null indices. + if (nonNullIndex != numValues - 1) { + logWarning(s"`keyWithIndexToValue` returns a null value for indices " + + s"with range from startIndex=${nonNullIndex + 1} " + + s"and endIndex=${numValues - 1}.") + } + + // Remove all null values from nonNullIndex + 1 onwards + // The nonNullIndex itself will be handled as removing the last entry, + // similar to finding the value as the last element + (numValues - 1 to nonNullIndex + 1 by -1).foreach { removeIndex => + keyWithIndexToValue.remove(currentKey, removeIndex) + numValues -= 1 + } } } keyWithIndexToValue.remove(currentKey, numValues - 1) @@ -324,6 +355,15 @@ class SymmetricHashJoinStateManager( ) } + /** + * Update number of values for a key. + * NOTE: this function is only intended for use in unit tests + * to simulate null values. 
+ */ + private[state] def updateNumValuesTestOnly(key: UnsafeRow, numValues: Long): Unit = { + keyToNumValues.put(key, numValues) + } + /* ===================================================== Private methods and inner classes diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala index 8a03d46d00007..deeebe1fc42bf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManagerSuite.scala @@ -46,6 +46,12 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter } } + SymmetricHashJoinStateManager.supportedVersions.foreach { version => + test(s"StreamingJoinStateManager V${version} - all operations with nulls") { + testAllOperationsWithNulls(version) + } + } + SymmetricHashJoinStateManager.supportedVersions.foreach { version => test(s"SPARK-35689: StreamingJoinStateManager V${version} - " + "printable key of keyWithIndexToValue") { @@ -68,7 +74,6 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter } } - private def testAllOperations(stateFormatVersion: Int): Unit = { withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager => implicit val mgr = manager @@ -99,11 +104,6 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter assert(get(30) === Seq.empty) // should remove 30 assert(numRows === 0) - def appendAndTest(key: Int, values: Int*): Unit = { - values.foreach { value => append(key, value)} - require(get(key) === values) - } - appendAndTest(40, 100, 200, 300) appendAndTest(50, 125) appendAndTest(60, 275) // prepare for testing removeByValue @@ -130,6 +130,43 @@ class SymmetricHashJoinStateManagerSuite extends 
StreamTest with BeforeAndAfter assert(numRows === 0) } } + + /* Test removeByValue with nulls simulated by updating numValues on the state manager */ + private def testAllOperationsWithNulls(stateFormatVersion: Int): Unit = { + withJoinStateManager(inputValueAttribs, joinKeyExprs, stateFormatVersion) { manager => + implicit val mgr = manager + + appendAndTest(40, 100, 200, 300) + appendAndTest(50, 125) + appendAndTest(60, 275) // prepare for testing removeByValue + assert(numRows === 5) + + updateNumValues(40, 5) // update total values to 5 to create 2 nulls + removeByValue(125) + assert(get(40) === Seq(200, 300)) + assert(get(50) === Seq.empty) + assert(get(60) === Seq(275)) // should remove only some values, not all and nulls + assert(numRows === 3) + + append(40, 50) + assert(get(40) === Seq(50, 200, 300)) + assert(numRows === 4) + updateNumValues(40, 4) // update total values to 4 to create 1 null + + removeByValue(200) + assert(get(40) === Seq(300)) + assert(get(60) === Seq(275)) // should remove only some values, not all and nulls + assert(numRows === 2) + updateNumValues(40, 2) // update total values to simulate nulls + updateNumValues(60, 4) + + removeByValue(300) + assert(get(40) === Seq.empty) + assert(get(60) === Seq.empty) // should remove all values now including nulls + assert(numRows === 0) + } + } + val watermarkMetadata = new MetadataBuilder().putLong(EventTimeWatermark.delayKey, 10).build() val inputValueSchema = new StructType() .add(StructField("time", IntegerType, metadata = watermarkMetadata)) @@ -157,6 +194,17 @@ class SymmetricHashJoinStateManagerSuite extends StreamTest with BeforeAndAfter manager.append(toJoinKeyRow(key), toInputValue(value), matched = false) } + def appendAndTest(key: Int, values: Int*) + (implicit manager: SymmetricHashJoinStateManager): Unit = { + values.foreach { value => append(key, value)} + require(get(key) === values) + } + + def updateNumValues(key: Int, numValues: Long) + (implicit manager: 
SymmetricHashJoinStateManager): Unit = { + manager.updateNumValuesTestOnly(toJoinKeyRow(key), numValues) + } + def get(key: Int)(implicit manager: SymmetricHashJoinStateManager): Seq[Int] = { manager.get(toJoinKeyRow(key)).map(toValueInt).toSeq.sorted }