Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration test that tests driver failure with receivers in an end-to-end manner #25

Open
wants to merge 2 commits into
base: driver-ha-rbt
Choose a base branch
from

Conversation

tdas
Copy link
Owner

@tdas tdas commented Oct 31, 2014

No description provided.

asfgit pushed a commit to apache/spark that referenced this pull request Nov 5, 2014
…recover received block metadata on driver failures

As part of the initiative of preventing data loss on driver failure, this JIRA tracks the sub task of modifying the streaming driver to reliably save received block metadata, and recover them on driver restart.

This was solved by introducing a `ReceivedBlockTracker` that takes all the responsibility of managing the metadata of received blocks (i.e. `ReceivedBlockInfo`, and any actions on them (e.g, allocating blocks to batches, etc.). All actions to block info get written out to a write ahead log (using `WriteAheadLogManager`). On recovery, all the actions are replaying to recreate the pre-failure state of the `ReceivedBlockTracker`, which include the batch-to-block allocations and the unallocated blocks.

Furthermore, the `ReceiverInputDStream` was modified to create `WriteAheadLogBackedBlockRDD`s when file segment info is present in the `ReceivedBlockInfo`. After recovery of all the block info (through recovery `ReceivedBlockTracker`), the `WriteAheadLogBackedBlockRDD`s gets recreated with the recovered info, and jobs submitted. The data of the blocks gets pulled from the write ahead logs, thanks to the segment info present in the `ReceivedBlockInfo`.

This is still a WIP. Things that are missing here are.

- *End-to-end integration tests:* Unit tests that tests the driver recovery, by killing and restarting the streaming context, and verifying all the input data gets processed. This has been implemented but not included in this PR yet. A sneak peek of that DriverFailureSuite can be found in this PR (on my personal repo): tdas#25 I can either include it in this PR, or submit that as a separate PR after this gets in.

- *WAL cleanup:* Cleaning up the received data write ahead log, by calling `ReceivedBlockHandler.cleanupOldBlocks`. This is being worked on.

Author: Tathagata Das <[email protected]>

Closes #3026 from tdas/driver-ha-rbt and squashes the following commits:

a8009ed [Tathagata Das] Added comment
1d704bb [Tathagata Das] Enabled storing recovered WAL-backed blocks to BM
2ee2484 [Tathagata Das] More minor changes based on PR
47fc1e3 [Tathagata Das] Addressed PR comments.
9a7e3e4 [Tathagata Das] Refactored ReceivedBlockTracker API a bit to make things a little cleaner for users of the tracker.
af63655 [Tathagata Das] Minor changes.
fce2b21 [Tathagata Das] Removed commented lines
59496d3 [Tathagata Das] Changed class names, made allocation more explicit and added cleanup
19aec7d [Tathagata Das] Fixed casting bug.
f66d277 [Tathagata Das] Fix line lengths.
cda62ee [Tathagata Das] Added license
25611d6 [Tathagata Das] Minor changes before submitting PR
7ae0a7f [Tathagata Das] Transferred changes from driver-ha-working branch
asfgit pushed a commit to apache/spark that referenced this pull request Nov 5, 2014
…recover received block metadata on driver failures

As part of the initiative of preventing data loss on driver failure, this JIRA tracks the sub task of modifying the streaming driver to reliably save received block metadata, and recover them on driver restart.

This was solved by introducing a `ReceivedBlockTracker` that takes all the responsibility of managing the metadata of received blocks (i.e. `ReceivedBlockInfo`, and any actions on them (e.g, allocating blocks to batches, etc.). All actions to block info get written out to a write ahead log (using `WriteAheadLogManager`). On recovery, all the actions are replaying to recreate the pre-failure state of the `ReceivedBlockTracker`, which include the batch-to-block allocations and the unallocated blocks.

Furthermore, the `ReceiverInputDStream` was modified to create `WriteAheadLogBackedBlockRDD`s when file segment info is present in the `ReceivedBlockInfo`. After recovery of all the block info (through recovery `ReceivedBlockTracker`), the `WriteAheadLogBackedBlockRDD`s gets recreated with the recovered info, and jobs submitted. The data of the blocks gets pulled from the write ahead logs, thanks to the segment info present in the `ReceivedBlockInfo`.

This is still a WIP. Things that are missing here are.

- *End-to-end integration tests:* Unit tests that tests the driver recovery, by killing and restarting the streaming context, and verifying all the input data gets processed. This has been implemented but not included in this PR yet. A sneak peek of that DriverFailureSuite can be found in this PR (on my personal repo): tdas#25 I can either include it in this PR, or submit that as a separate PR after this gets in.

- *WAL cleanup:* Cleaning up the received data write ahead log, by calling `ReceivedBlockHandler.cleanupOldBlocks`. This is being worked on.

Author: Tathagata Das <[email protected]>

Closes #3026 from tdas/driver-ha-rbt and squashes the following commits:

a8009ed [Tathagata Das] Added comment
1d704bb [Tathagata Das] Enabled storing recovered WAL-backed blocks to BM
2ee2484 [Tathagata Das] More minor changes based on PR
47fc1e3 [Tathagata Das] Addressed PR comments.
9a7e3e4 [Tathagata Das] Refactored ReceivedBlockTracker API a bit to make things a little cleaner for users of the tracker.
af63655 [Tathagata Das] Minor changes.
fce2b21 [Tathagata Das] Removed commented lines
59496d3 [Tathagata Das] Changed class names, made allocation more explicit and added cleanup
19aec7d [Tathagata Das] Fixed casting bug.
f66d277 [Tathagata Das] Fix line lengths.
cda62ee [Tathagata Das] Added license
25611d6 [Tathagata Das] Minor changes before submitting PR
7ae0a7f [Tathagata Das] Transferred changes from driver-ha-working branch

(cherry picked from commit 5f13759)
Signed-off-by: Tathagata Das <[email protected]>
tdas pushed a commit that referenced this pull request Apr 23, 2015
Invoking .size on arrays is valid, but requires an implicit conversion to SeqLike. This incurs a compile time overhead and more importantly a runtime overhead, as the Array must be wrapped before the method can be invoked. For example, the difference in generated byte code is:

  public int withSize();
    Code:
       0: getstatic     #23                 // Field scala/Predef$.MODULE$:Lscala/Predef$;
       3: aload_0
       4: invokevirtual #25                 // Method array:()[I
       7: invokevirtual #29                 // Method scala/Predef$.intArrayOps:([I)Lscala/collection/mutable/ArrayOps;
      10: invokeinterface #34,  1           // InterfaceMethod scala/collection/mutable/ArrayOps.size:()I
      15: ireturn

  public int withLength();
    Code:
       0: aload_0
       1: invokevirtual #25                 // Method array:()[I
       4: arraylength
       5: ireturn

Author: sksamuel <[email protected]>

Closes apache#5376 from sksamuel/master and squashes the following commits:

77ec261 [sksamuel] Replace use of .size with .length for Arrays.
tdas pushed a commit that referenced this pull request Jan 11, 2016
Add a Reader/Writer Interface for Streaming
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant