
[Proposal] Kill Hadoop MR task on kill of ingestion task and resume ability for Hadoop ingestion tasks #6803

Open
ankit0811 opened this issue Jan 3, 2019 · 6 comments

Comments

@ankit0811
Contributor

We plan to implement these features as a two-phase solution:

Phase I: Implement the kill task feature

Currently, killing a Hadoop ingestion task from the overlord UI does not kill the underlying MR job, resulting in unnecessary waste of resources.

We are thinking of doing this by writing the job ID of the MR job to a file, mapReduceJobId.json, stored under the taskBaseDir.
A separate file is required because the MR job runs in a different JVM and has no way to communicate with the kill-task code path, so writing the required info to a file was the only option.
For now, this file will only store the running job ID; when someone wishes to kill the ingestion task, the kill path can read the currently running MR job (if any) and issue a yarn kill command, roughly as sketched below.
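A minimal sketch of the file handling, assuming the layout described above; the class name, JSON shape, and use of Jackson are illustrative assumptions rather than a description of the actual implementation:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

public class MapReduceJobIdFile
{
  private static final String FILE_NAME = "mapReduceJobId.json";
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Called from the JVM running the MR job: record the id of the job that was just submitted.
  public static void writeJobId(File taskBaseDir, String jobId) throws IOException
  {
    MAPPER.writeValue(new File(taskBaseDir, FILE_NAME), Collections.singletonMap("jobId", jobId));
  }

  // Called from the kill path: return the recorded id, or null if no MR job has been started yet.
  public static String readJobId(File taskBaseDir) throws IOException
  {
    File file = new File(taskBaseDir, FILE_NAME);
    if (!file.exists()) {
      return null;
    }
    Map<?, ?> contents = MAPPER.readValue(file, Map.class);
    return (String) contents.get("jobId");
  }
}
```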

Phase II: Implement the resume ability for Hadoop ingestion tasks

The above file will now store the job ID of each intermediate MR job, so that we can track up to which step the ingestion has completed and resume from the following step instead of executing everything from the beginning. A rough sketch of this idea follows.
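As an illustration only (the stage names and record shape below are hypothetical, not part of the proposal), the file could map each completed intermediate MR job to the stage that produced it, so a resumed task can skip stages whose jobs already succeeded:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HadoopIngestionProgress
{
  // e.g. "determine_partitions" -> "job_1546500000000_0001"
  private final Map<String, String> completedStageToJobId = new LinkedHashMap<>();

  public void recordCompletedStage(String stage, String jobId)
  {
    completedStageToJobId.put(stage, jobId);
  }

  // On resume, a stage that already has a successful job recorded can be skipped.
  public boolean isStageCompleted(String stage)
  {
    return completedStageToJobId.containsKey(stage);
  }
}
```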

@jon-wei
Contributor

jon-wei commented Jan 3, 2019

Phase I sounds pretty useful. What entity would issue the yarn kill command? Do you have more details on the interfaces/code areas you plan to change for that?

For Phase II, can you describe the use case in more detail? When would a task resume occur? What happens if the input to the ingestion task changes between retries (e.g. the task is reading from some s3 bucket which gets some new files)?

@jihoonson
Contributor

Thanks @ankit0811. Sounds useful!

For Phase 1, I think we need a unified way to support various platforms like Hadoop, Spark, and so on, so I would suggest changing the way Druid tasks are killed. Currently, the overlord sends a kill request to the middleManager where the task is running, and the middleManager simply destroys the task process. As a result, the task never gets a chance to prepare for stopping, such as cleaning up its resources or killing Hadoop jobs. Instead, I think the task could clean up its resources before stopping if we change how the middleManager kills the task. This approach makes more sense to me because the Hadoop job would then be started and killed in the same place (the Druid Hadoop task).

Fortunately, some of this logic is already implemented. First, there's stopGracefully in Task:

```java
  /**
   * Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
   * {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
   * extreme prejudice.
   */
  void stopGracefully();
```

This is currently only called for restorable tasks, so you may want to make the Hadoop task restorable (maybe related to Phase 2?). The stopGracefully method is currently called in SingleTaskBackgroundRunner.stop(), which in turn is called when the output stream of the task process is closed (see ForkingTaskRunner.stop() and ExecutorLifecycle.start()). So, it would work if you make the Hadoop task restorable and change the kill path to close the output stream of the task process instead of destroying it directly. What do you think? A rough sketch of the shape this could take is below.
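A minimal sketch of that shape, assuming the Hadoop task is made restorable so that SingleTaskBackgroundRunner.stop() ends up invoking stopGracefully() when the middleManager closes the task process's output stream; only the canRestore()/stopGracefully() names come from the Task interface quoted above, and the two private helpers are hypothetical placeholders for the Phase I file handling and kill logic:

```java
public class HadoopTaskStopSketch
{
  // Must return true so that stopGracefully() is called on shutdown (see the Javadoc above).
  public boolean canRestore()
  {
    return true;
  }

  // Clean up the external MR job before the task process exits.
  public void stopGracefully()
  {
    String jobId = readRunningJobId();   // hypothetical: read mapReduceJobId.json from the task dir
    if (jobId != null) {
      killMapReduceJob(jobId);           // hypothetical: issue the kill, e.g. via ToolRunner
    }
  }

  private String readRunningJobId()
  {
    return null;  // placeholder for the Phase I file read
  }

  private void killMapReduceJob(String jobId)
  {
    // placeholder for the Phase I kill logic
  }
}
```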

@ankit0811
Contributor Author

ankit0811 commented Jan 4, 2019

@jon-wei HadoopIndexTask will issue the kill command using the ToolRunner util class (a rough sketch is below).
We have a working implementation ready for 0.12.2 and have been testing it for quite some time now. The classes/interfaces affected will be JobHelper, IndexGeneratorJob, HadoopDruidIndexGeneratorJob, HadoopDruidIndexerConfig, HadoopDruidDetermineConfigurationJob, DetermineHashedPartitionsJob, Jobby, Task, and ForkingTaskRunner.
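A rough sketch of how a kill could be issued through ToolRunner; the class name is hypothetical and the Cluster/Job lookup is just one plausible approach, not a description of the working implementation mentioned above:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceJobKiller extends Configured implements Tool
{
  @Override
  public int run(String[] args) throws Exception
  {
    // args[0] is the job id recorded in mapReduceJobId.json, e.g. "job_1546500000000_0001"
    Cluster cluster = new Cluster(getConf());
    Job job = cluster.getJob(JobID.forName(args[0]));
    if (job == null) {
      return 1;  // the job already finished or was never recorded
    }
    job.killJob();
    return 0;
  }

  public static void main(String[] args) throws Exception
  {
    System.exit(ToolRunner.run(new Configuration(), new MapReduceJobKiller(), args));
  }
}
```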

For Phase II, I will share a detailed use case soon.

@ankit0811
Contributor Author

@jihoonson So you are suggesting going for Phase II first and then building unified kill task support?

@jihoonson
Contributor

@ankit0811 I'm also not sure what restoring means for the Hadoop index task or how it would be useful. I just mean you may need to make the Hadoop index task restorable no matter what it does on restore(). Or maybe it's better to extend stopGracefully() so that it's called for non-restorable tasks as well.

@github-actions

This issue has been marked as stale due to 280 days of inactivity.
It will be closed in 4 weeks if no further activity occurs. If this issue is still
relevant, please simply write any comment. Even if closed, you can still revive the
issue at any time or discuss it on the [email protected] list.
Thank you for your contributions.

github-actions bot added the stale label Jul 31, 2023