[SPARK-38787] [SS] Replace found value with non-null element in the remaining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins #36073
…emaining list for key and remove remaining null elements from values in keyWithIndexToValue store
If we find null at the end for a found value at a non-last index, we are currently not removing and swapping the found value. With this change, we find the first non-null value from the end, swap it into the current index, remove all the null elements after that non-null index through the end, and then drop the last element as before.
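The swap-and-trim step described above can be sketched on a plain in-memory map. This is only an illustrative model under stated assumptions, not the code in `SymmetricHashJoinStateManager`: the object `RemoveWithNullsSketch`, the method `removeAt`, and the use of a `mutable.Map[Long, String]` to stand in for one key's entries in the `keyWithIndexToValue` store are all hypothetical, and a missing or null entry is used to model a null read.

```scala
import scala.collection.mutable

// Hypothetical model of one key's values, addressed by index 0 until numValues.
object RemoveWithNullsSketch {

  /** Removes the value at `index` and returns the new number of values.
    * A missing or null entry in `store` models a null read from the store.
    */
  def removeAt(store: mutable.Map[Long, String], numValues: Long, index: Long): Long = {
    require(index >= 0 && index < numValues, "index out of range")
    var n = numValues
    if (index != n - 1) {
      // Scan from the end toward `index` for the rightmost non-null entry.
      var j = n - 1
      while (j > index && store.getOrElse(j, null) == null) j -= 1
      // j == index means every later entry is null, so there is nothing to swap in.
      if (j != index) store(index) = store(j)
      // Drop the null entries after j; j itself is removed below as the last element.
      while (n - 1 > j) {
        store.remove(n - 1)
        n -= 1
      }
    }
    // Drop the last element, as in the ordinary (no nulls) path.
    store.remove(n - 1)
    n - 1
  }
}
```

For example, with values `a, b, c, null, null` at indices 0 to 4, removing index 1 moves `c` into index 1, trims the two trailing nulls, drops the old slot of `c`, and leaves `a, c` with a count of 2. Scanning from the end keeps the remaining values densely indexed, which is what the count kept alongside the values relies on.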
@HeartSaVioR - Could you please review? Thx
Probably clarify in the PR comment that this relates to stream-stream joins.
...ain/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
Done - Updated the PR title as well as the description, noting that the change relates to stream-stream joins.
Can one of the admins verify this patch?
+1 except a nit. Once the nit is addressed I'm going to merge this.
…maining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins ### What changes were proposed in this pull request? In stream-stream joins, for removing old state (watermark by value), we call the `removeByValue` function with a removal condition. Within the iterator returned, if we find null at the end for matched value at non-last index, we are currently not removing and swapping the matched value. With this change, we will find the first non-null value from end and swap current index with that value and remove all elements from index + 1 to the end and then drop the last element as before. ### Why are the changes needed? This change fixes a bug where we were not replacing found/matching values for `removeByValue` when encountering nulls in the symmetric hash join code. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test for this change with nulls added. Here is a sample output: ``` Executing tests from //sql/core:org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite-hive-2.3__hadoop-3.2 ----------------------------------------------------------------------------- 2022-04-01 21:11:59,641 INFO CodeGenerator - Code generated in 225.884757 ms 2022-04-01 21:11:59,662 INFO CodeGenerator - Code generated in 10.870786 ms Run starting. 
Expected test count is: 4 … ===== TEST OUTPUT FOR o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' ===== 2022-04-01 21:12:03,487 INFO StateStore - State Store maintenance task started 2022-04-01 21:12:03,508 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp 2022-04-01 21:12:03,524 INFO CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema 2022-04-01 21:12:03,525 INFO StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef374ccb9 2022-04-01 21:12:03,525 INFO StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues),47925997-9891-4025-a36a-3e18bc758b50) is active 2022-04-01 21:12:03,525 INFO HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] for update 2022-04-01 21:12:03,525 INFO SymmetricHashJoinStateManager$KeyToNumValuesStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues) 2022-04-01 21:12:03,541 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp 2022-04-01 
21:12:03,556 INFO CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema 2022-04-01 21:12:03,558 INFO StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef1ea930eb 2022-04-01 21:12:03,559 INFO StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue),47925997-9891-4025-a36a-3e18bc758b50) is active 2022-04-01 21:12:03,559 INFO HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] for update 2022-04-01 21:12:03,559 INFO SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue) 2022-04-01 21:12:03,564 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/.1.delta.86db3ac9-aa68-4a6b-9729-df93dc4b8a45.tmp 2022-04-01 21:12:03,568 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/.1.delta.9673bc1b-2bbe-412d-a0af-69f237cde31e.tmp 2022-04-01 21:12:03,572 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 4 at current key [false,40,10.0]. 2022-04-01 21:12:03,574 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,40,10.0]. 
2022-04-01 21:12:03,576 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,60,10.0]. 2022-04-01 21:12:03,576 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 1 at current key [false,40,10.0]. 2022-04-01 21:12:03,577 INFO SymmetricHashJoinStateManager$KeyToNumValuesStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues) 2022-04-01 21:12:03,577 INFO HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] 2022-04-01 21:12:03,577 INFO SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue) 2022-04-01 21:12:03,577 INFO HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] 2022-04-01 21:12:03,580 INFO StateStore - StateStore stopped 2022-04-01 21:12:03,580 INFO SymmetricHashJoinStateManagerSuite - ===== FINISHED o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' ===== … 2022-04-01 21:12:04,205 INFO StateStore - StateStore stopped Run completed in 5 seconds, 908 milliseconds. Total number of tests run: 4 Suites: completed 1, aborted 0 Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0 All tests passed. 2022-04-01 21:12:04,605 INFO ShutdownHookManager - Shutdown hook called 2022-04-01 21:12:04,605 INFO ShutdownHookManager - Deleting directory /tmp/spark-37347802-bee5-4e7f-bffe-acb13eda1c5c 2022-04-01 21:12:04,608 INFO ShutdownHookManager - Deleting directory /tmp/spark-9e79a2e1-cec7-4fbf-804a-92e63913f516 ``` Closes #36073 from anishshri-db/bfix/SPARK-38787. 
Authored-by: Anish Shrigondekar <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]> (cherry picked from commit 6d9bfb6) Signed-off-by: Jungtaek Lim <[email protected]>
cc. @MaxGekk This got into Spark 3.3, since it's part of a correctness bug fix. Sorry, I forgot to mention the release manager.
…maining list for key and remove remaining null elements from values in keyWithIndexToValue store for stream-stream joins ### What changes were proposed in this pull request? In stream-stream joins, for removing old state (watermark by value), we call the `removeByValue` function with a removal condition. Within the iterator returned, if we find null at the end for matched value at non-last index, we are currently not removing and swapping the matched value. With this change, we will find the first non-null value from end and swap current index with that value and remove all elements from index + 1 to the end and then drop the last element as before. ### Why are the changes needed? This change fixes a bug where we were not replacing found/matching values for `removeByValue` when encountering nulls in the symmetric hash join code. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test for this change with nulls added. Here is a sample output: ``` Executing tests from //sql/core:org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite-hive-2.3__hadoop-3.2 ----------------------------------------------------------------------------- 2022-04-01 21:11:59,641 INFO CodeGenerator - Code generated in 225.884757 ms 2022-04-01 21:11:59,662 INFO CodeGenerator - Code generated in 10.870786 ms Run starting. 
Expected test count is: 4 … ===== TEST OUTPUT FOR o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' ===== 2022-04-01 21:12:03,487 INFO StateStore - State Store maintenance task started 2022-04-01 21:12:03,508 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp 2022-04-01 21:12:03,524 INFO CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/.schema.9bcc39c9-721e-4ee0-b369-fb4f516c4fd6.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/_metadata/schema 2022-04-01 21:12:03,525 INFO StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef374ccb9 2022-04-01 21:12:03,525 INFO StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues),47925997-9891-4025-a36a-3e18bc758b50) is active 2022-04-01 21:12:03,525 INFO HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] for update 2022-04-01 21:12:03,525 INFO SymmetricHashJoinStateManager$KeyToNumValuesStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues) 2022-04-01 21:12:03,541 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp 2022-04-01 
21:12:03,556 INFO CheckpointFileManager - Renamed temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/.schema.fcde2229-a4fa-409b-b3eb-751572f06c08.tmp to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/_metadata/schema 2022-04-01 21:12:03,558 INFO StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef1ea930eb 2022-04-01 21:12:03,559 INFO StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue),47925997-9891-4025-a36a-3e18bc758b50) is active 2022-04-01 21:12:03,559 INFO HDFSBackedStateStoreProvider - Retrieved version 0 of HDFSStateStoreProvider[id = (op=0,part=0),dir = /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] for update 2022-04-01 21:12:03,559 INFO SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Loaded store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue) 2022-04-01 21:12:03,564 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue/.1.delta.86db3ac9-aa68-4a6b-9729-df93dc4b8a45.tmp 2022-04-01 21:12:03,568 INFO CheckpointFileManager - Writing atomically to /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/1.delta using temp file /tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues/.1.delta.9673bc1b-2bbe-412d-a0af-69f237cde31e.tmp 2022-04-01 21:12:03,572 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 4 at current key [false,40,10.0]. 2022-04-01 21:12:03,574 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,40,10.0]. 
2022-04-01 21:12:03,576 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 3 at current key [false,60,10.0]. 2022-04-01 21:12:03,576 WARN SymmetricHashJoinStateManager - `keyWithIndexToValue` returns a null value for index 1 at current key [false,40,10.0]. 2022-04-01 21:12:03,577 INFO SymmetricHashJoinStateManager$KeyToNumValuesStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyToNumValues) 2022-04-01 21:12:03,577 INFO HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyToNumValues] 2022-04-01 21:12:03,577 INFO SymmetricHashJoinStateManager$KeyWithIndexToValueStore - Aborted store StateStoreId(/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292,0,0,left-keyWithIndexToValue) 2022-04-01 21:12:03,577 INFO HDFSBackedStateStoreProvider - Aborted version 1 for HDFSStateStore[id=(op=0,part=0),dir=/tmp/spark-d94b9f11-e04c-4871-aeea-1d0b5c62e292/0/0/left-keyWithIndexToValue] 2022-04-01 21:12:03,580 INFO StateStore - StateStore stopped 2022-04-01 21:12:03,580 INFO SymmetricHashJoinStateManagerSuite - ===== FINISHED o.a.s.sql.execution.streaming.state.SymmetricHashJoinStateManagerSuite: 'StreamingJoinStateManager V2 - all operations with nulls' ===== … 2022-04-01 21:12:04,205 INFO StateStore - StateStore stopped Run completed in 5 seconds, 908 milliseconds. Total number of tests run: 4 Suites: completed 1, aborted 0 Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0 All tests passed. 2022-04-01 21:12:04,605 INFO ShutdownHookManager - Shutdown hook called 2022-04-01 21:12:04,605 INFO ShutdownHookManager - Deleting directory /tmp/spark-37347802-bee5-4e7f-bffe-acb13eda1c5c 2022-04-01 21:12:04,608 INFO ShutdownHookManager - Deleting directory /tmp/spark-9e79a2e1-cec7-4fbf-804a-92e63913f516 ``` Closes apache#36073 from anishshri-db/bfix/SPARK-38787. 
Authored-by: Anish Shrigondekar <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]> (cherry picked from commit 6d9bfb6) Signed-off-by: Jungtaek Lim <[email protected]> (cherry picked from commit 6f6eb3f) Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?
In stream-stream joins, old state is removed (watermark by value) by calling the `removeByValue` function with a removal condition. Within the iterator it returns, if the value at the end of the list is null when a matched value is found at a non-last index, the matched value is currently neither swapped nor removed. With this change, we find the first non-null value from the end, swap it into the current index, remove all the null elements after it through the end of the list, and then drop the last element as before.
### Why are the changes needed?
This change fixes a bug where found/matching values were not replaced in `removeByValue` when nulls were encountered in the symmetric hash join code.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added a unit test for this change with nulls added. Here is a sample output: