Kill Hadoop MR task on kill of Hadoop ingestion task #6828
Conversation
@jihoonson @jon-wei can you please look at this PR for the kill job proposal #6803
@@ -229,6 +231,9 @@ public TaskStatus call()
      final File taskDir = taskConfig.getTaskDir(task.getId());
      final File attemptDir = new File(taskDir, attemptUUID);

      task.getContext().put(INDEX_TASK_DIR, taskDir.toString());
I don't think you need these context params; you can get the `TaskConfig` by calling `toolbox.getConfig()` in `HadoopIndexTask.run()`.
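For illustration, a minimal sketch of the suggested approach (assuming `TaskConfig#getTaskDir(String)` as used earlier in this diff; the filename is the one introduced by this PR):

// Inside HadoopIndexTask.run(TaskToolbox toolbox): derive the task directory
// from the injected TaskConfig instead of a task context param.
final File taskDir = toolbox.getConfig().getTaskDir(getId());
final File hadoopJobIdFile = new File(taskDir, "mapReduceJobId.json");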
public String runTask(String[] args) throws Exception
{
  int res = ToolRunner.run(new JobClient(), args);
  return res == 0 ? "Sucess" : "Fail";
Sucess -> Success
@@ -585,6 +662,7 @@ public String runTask(String[] args) throws Exception
{
  final String schema = args[0];
  String version = args[1];
  final String HadoopJobIdFile = args[2];
HadoopJobIdFile -> hadoopJobIdFile
    new Object[]{buildKillJobInput}
);

log.info(String.format(Locale.ENGLISH, "Tried killing job %s , status: %s", jobId, killStatusString));
Should use `StringUtils.format` instead.
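For reference, the suggested form (a sketch; `StringUtils` here is Druid's `org.apache.druid.java.util.common.StringUtils`, whose `format` is locale-safe):

log.info(StringUtils.format("Tried killing job %s, status: %s", jobId, killStatusString));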
@@ -218,6 +224,15 @@ public String getClasspathPrefix()
  return classpathPrefix;
}

public String getHadoopJobIdFileName()
{
  String hadoopJobIdFileName = "mapReduceJobId.json";
Let's make this a constant
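Something like (a sketch; the constant name is illustrative):

// Hoist the literal into a named constant instead of a local variable.
private static final String HADOOP_JOB_ID_FILE_NAME = "mapReduceJobId.json";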
log.info("MR job id is written to jobId file"); | ||
} | ||
catch (IOException e) { | ||
log.error("Error wriritng job id to jobId file. Exception %s ", Throwables.getStackTraceAsString(e)); |
`log.error` accepts a Throwable directly, so the `getStackTraceAsString` is unnecessary; likewise for `runSingleJob`.
Also, wriritng -> writing
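The corrected call might look like (sketch; Druid's `Logger` takes the `Throwable` as its first argument):

catch (IOException e) {
  // Pass the exception itself; the logger prints the stack trace.
  log.error(e, "Error writing job id to jobId file");
}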
objectMapper.writeValue(new OutputStreamWriter(
    new FileOutputStream(new File(hadoopJobIdFileName)), StandardCharsets.UTF_8),
    hadoopJobId);
log.info("MR job id is written to jobId file");
Suggest putting the full job id path in the log message
@@ -412,6 +430,63 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
  }
}

@Override
public boolean canRestore()
Rather than tying the MR job cleanup to the restore functionality, I think it would be better to change the `Task.stopGracefully()` contract such that it's always called. Regardless of whether the task would be restored, I think it makes sense to call `stopGracefully()` in case the task wants to attempt to clean up any open resources, like in this situation.
Also, as is, the MR job termination won't run unless the user has enabled `restoreTasksOnRestart`, which is false by default:

if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
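In other words, the runner would drop the `canRestore()` gate before invoking the cleanup hook; a sketch of the direction being proposed (not the exact patch):

// Before: cleanup only ran for restorable tasks with restore enabled.
// if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {
//   task.stopGracefully();
// }

// After: always give the task a chance to release open resources
// (e.g. terminate the underlying MR job).
task.stopGracefully();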
…lying MR job, thus saving unnecessary compute

Run in jobby is now split into 2:
1. submitAndGetHadoopJobId, which is responsible for submitting the job and returning the jobId as a string
2. run, which monitors this job for completion

JobHelper writes this jobId to the path provided by HadoopIndexTask, which in turn is provided by the ForkingTaskRunner. HadoopIndexTask reads this path when kill task is clicked to get the jobId and fire the kill command via the YARN API. This is taken care of in the stopGracefully method, which is called in SingleTaskBackgroundRunner. The `canRestore` method now returns `true` for HadoopIndexTask in order for the stopGracefully method to be called. Hadoop*Job files have been changed to incorporate the changes to jobby.
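A sketch of the interim `Jobby` split this commit describes (the design is revisited later in the review; the default body here is an assumption):

public interface Jobby
{
  /** Runs (or, after this change, monitors) the submitted Hadoop job(s). */
  boolean run();

  /**
   * Submits the Hadoop job without waiting for it, returning its job id
   * so the task can persist it for later kill requests.
   */
  default String submitAndGetHadoopJobId()
  {
    throw new UnsupportedOperationException("submitAndGetHadoopJobId not implemented");
  }
}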
97cf628 to 9b79c3a
…ully(): `SingleTaskBackgroundRunner` calls stopGracefully in stop() and then checks the canRestore condition to return the status of the task
@jon-wei Thanks for reviewing
@ankit0811 thanks, I'll take a look today.
 *
 * @return A string represtenting the jobId of the actual MR job.
 * Run method is now divided into two parts. The first one being submitAndGetHadoopJobId which just submits the job and returns the job ID
 * Run then monitors this job for completion
Looks like the last two lines are descriptions for the `run` method. Please move them to the proper method.
Also, please annotate this method with `@Nullable` and add a description of when this is null and where null is checked.
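Putting both comments together, the annotated method might read something like this (a sketch only):

/**
 * Submits the Hadoop job and returns immediately; run() then monitors the
 * submitted job for completion.
 *
 * @return the job id of the submitted MR job, or null when no Hadoop job was
 *         submitted; callers (e.g. JobHelper.runSingleJob) must null-check
 *         before writing the id to the job id file.
 */
@Nullable
default String submitAndGetHadoopJobId()
{
  return null;
}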
@@ -349,6 +352,22 @@ public static void ensurePaths(HadoopDruidIndexerConfig config)

public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
{
  String hadoopJobId = job.submitAndGetHadoopJobId();
  ObjectMapper objectMapper = new ObjectMapper();
Please use `HadoopDruidIndexerConfig.JSON_MAPPER` instead.
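i.e., something like (sketch; `JSON_MAPPER` is the shared, preconfigured mapper on `HadoopDruidIndexerConfig`, and `hadoopJobIdFileName` is the path handed down by the task):

// Reuse the shared mapper instead of constructing a new ObjectMapper.
String hadoopJobId = job.submitAndGetHadoopJobId();
HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(new File(hadoopJobIdFileName), hadoopJobId);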
@@ -372,7 +391,23 @@ public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config)
{
  boolean succeeded = true;
  ObjectMapper objectMapper = new ObjectMapper();
Please use `HadoopDruidIndexerConfig.JSON_MAPPER` instead.
  return groupByJob.getJobID().toString();
}
catch (Exception e) {
  throw Throwables.propagate(e);
Please `throw new RuntimeException(e)` instead.
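i.e. (Guava's `Throwables.propagate` is deprecated in newer Guava versions, so wrapping directly is preferred):

catch (Exception e) {
  throw new RuntimeException(e);
}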
 * Run method is now divided into two parts. The first one being submitAndGetHadoopJobId which just submits the job and returns the job ID
 * Run then monitors this job for completion
 */
default String submitAndGetHadoopJobId()
`DeterminePartitionsJob` should implement this method and use `JobHelper`.
Thanks @jihoonson, that was a miss from my side.
I see DeterminePartitionsJob basically runs 2 jobs:
determine_partitions_groupby
determine_partitions_dimselection
Is it alright to split this into two separate methods and then handle their job ids in JobHelper.runSingleJob() by casting the job?
Sorry, I'm not sure what you mean. Would you tell me more details on how to split `DeterminePartitionsJob`?
So currently the run() issues two MR jobs. I need these jobs to return their jobIds first and then check for their status, hence I'm planning to split run into two parts.
Basically, in the case when job is an instance of DeterminePartitionsJob, JobHelper.runSingleJob() will look like this:

if (job instanceof DeterminePartitionsJob) {
  String hadoopJobId = ((DeterminePartitionsJob) job).submitAndGetHadoopJobIdForDeterminePartitionsGroupBy();
  ((DeterminePartitionsJob) job).RunDeterminePartitionsGroupBy();

The above will take care of the determine_partitions_groupby job, followed by submitAndGetHadoopJobId and run() (the normal code flow), which will take care of the determine_partitions_dimselection job.
Let me know if that makes sense. Thanks
Thanks, but I think it's too specific for an implementation of `DeterminePartitionsJob`, which can't handle any other custom `Jobby` implementations running two or more Hadoop jobs.
I think maybe it's not a good idea to add `submitAndGetHadoopJobId` to `Jobby` because a single `Jobby` can run 0, 1, or more Hadoop jobs. How about removing `submitAndGetHadoopJobId` from `Jobby` but adding `writeHadoopJobId` to `JobHelper`? Every `Jobby` running one or more Hadoop jobs should use this method.
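A sketch of the proposed helper (the PR's later commits name it `writeJobIdToFile`; the exact signature here is assumed, and the log lines mirror the ones that appear in the diffs below):

// In JobHelper: every Jobby that launches Hadoop jobs calls this after submit.
public static void writeJobIdToFile(@Nullable String hadoopJobIdFileName, @Nullable String hadoopJobId)
{
  if (hadoopJobId != null && hadoopJobIdFileName != null) {
    try {
      HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(new File(hadoopJobIdFileName), hadoopJobId);
      log.info("MR job id [%s] is written to the file [%s]", hadoopJobId, hadoopJobIdFileName);
    }
    catch (IOException e) {
      log.warn(e, "Error writing job id [%s] to the file [%s]", hadoopJobId, hadoopJobIdFileName);
    }
  } else {
    log.info("Either job id or filename is null for the submitted job. Skipping writing the file [%s]", hadoopJobIdFileName);
  }
}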
Yes
That simplifies things 👍
Thanks. Will make the changes
}
catch (Exception e) {
  log.info("Exeption while reading json file from path: " + hadoopJobIdFile);
  log.error(Throwables.getStackTraceAsString(e));
You don't need to print two logs. Please replace them with `log.warn(e, "Exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);`.
ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
    taskConfig.getDefaultHadoopCoordinates());

Object killMRJobInnerProcessingRunner = getForeignClassloaderObject("org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
Format:
Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
"org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
loader
);
@@ -163,8 +163,8 @@ default int getPriority()
  boolean canRestore();

  /**
-  * Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
-  * {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
+  * Asks a task to arrange for its "run" method to exit promptly. This method will be called, whether
Let's just remove "This method will be called, whether {@link #canRestore()} returns true/false."
log.info("Starting graceful shutdown of task[%s].", task.getId()); | ||
// stopGracefully for resource cleaning, independent of the fact whether the task is restorable or not | ||
// Attempt graceful shutdown. | ||
graceful = true; |
`graceful` is always true. Please remove it and set `true` for the metric like below.
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent
.builder()
.setDimension("task", task.getId())
.setDimension("dataSource", task.getDataSource())
.setDimension("graceful", "true") // for backward compatibility
.setDimension("error", String.valueOf(error));
indexing-service/pom.xml
@@ -90,6 +90,16 @@
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
We really shouldn't add hadoop dependencies to druid core because 1) it's unnecessary if we don't use hadoop and 2) it might cause some errors because of the version mismatch when we use a different version of Hadoop.
Please move them to `indexing-hadoop`. The version must not be specified and the scope should be `provided`.
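So the dependency blocks in `indexing-hadoop/pom.xml` would look roughly like this (no `<version>` element, inherited from the parent's dependency management):

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <scope>provided</scope>
</dependency>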
1. Formatting
2. Removing `submitAndGetHadoopJobId` from `Jobby` and calling writeJobIdToFile in the job itself
1. POM change. Moving hadoop dependency to indexing-hadoop
Generally LGTM, but I think this needs one more major change:
Since `stopGracefully` is always called now, we should adjust the tasks that previously implemented graceful shutdown so that they continue to only do graceful shutdown when restore is enabled.
I also left some minor comments re: log messages.
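Concretely, a sketch of what such a task-side adjustment could look like (the eventual commit passes `TaskConfig` into `stopGracefully`; the guarded helper here is hypothetical):

// e.g. in RealtimeIndexTask: keep the old restore-only behavior now that
// stopGracefully() is invoked unconditionally by the runner.
@Override
public void stopGracefully(TaskConfig taskConfig)
{
  if (taskConfig.isRestoreTasksOnRestart()) {
    // pre-existing graceful-shutdown path: persist state so the task
    // can be restored after restart
    gracefulStop();  // hypothetical helper holding the old logic
  }
  // otherwise do nothing; the runner will stop the task forcefully as before
}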
    log.warn(e, "Error writing job id [%s] to the file [%s]", hadoopJobId, hadoopJobIdFileName);
  }
} else {
  log.info("Either job Id or File Name is null for the submitted job. Skipping writing the file [%s]", hadoopJobIdFileName);
nit: suggest "job id" and "filename" without capitalization
@@ -153,11 +153,14 @@ public boolean canRestore()
  return false;
}

/**
 * Should be called independent of canRestore so that Resource cleaning can be achieved.
Resource -> resource
  }
}
catch (Exception e) {
  log.warn(e, "Exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);
Exception -> exception
// Attempt graceful shutdown.
graceful = true;
log.info("Starting graceful shutdown of task[%s].", task.getId());
// stopGracefully for resource cleaning, independent of the fact whether the task is restorable or not
nit: can delete the "independent of the fact" portion and the later "Attempt graceful shutdown" comment
@@ -223,7 +221,7 @@ public void stop()
    .builder()
    .setDimension("task", task.getId())
    .setDimension("dataSource", task.getDataSource())
-   .setDimension("graceful", String.valueOf(graceful))
+   .setDimension("graceful", "true") // for backword compatibility
backword -> backward
Just wanted to be sure,
I would change AbstractTask so that stopGracefully() no longer throws an exception but just does nothing.
For the tasks that did have a stopGracefully() implementation (AppenderatorDriverRealtimeIndexTask, RealtimeIndexTask, SeekableStreamIndexTask), this used to be called only when `restoreTasksOnRestart` was enabled. You could change the stopGracefully() method to have the task runner pass in the TaskConfig object for access to `isRestoreTasksOnRestart()`.
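A sketch of that AbstractTask change (the no-op body is an assumption consistent with the description above):

// AbstractTask: the default stopGracefully becomes a no-op instead of
// throwing, since the runner now invokes it for every task.
@Override
public void stopGracefully(TaskConfig taskConfig)
{
  // no-op by default; tasks holding external resources (e.g. a Hadoop MR job)
  // override this to clean up
}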
@jon-wei thanks for catching it!
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
Please add `provided` scope here too.
1. stopGracefully now accepts TaskConfig as a param
2. Handling isRestoreOnRestart in stopGracefully for `AppenderatorDriverRealtimeIndexTask`, `RealtimeIndexTask`, `SeekableStreamIndexTask`
3. Changing tests to set the TaskConfig param isRestoreOnRestart to true
@jihoonson @jon-wei
LGTM
KillTask from overlord UI now makes sure that it terminates the underlying MR job, thus saving unnecessary compute #6803

- Run in `jobby` is now split into 2: `submitAndGetHadoopJobId` is responsible for submitting the job and returning the jobId as a string; `run` monitors this job for completion.
- `JobHelper` writes this jobId to the path provided by HadoopIndexTask, which in turn is provided by the ForkingTaskRunner.
- `HadoopIndexTask` reads this path when kill task is clicked to get the jobId and fire the kill command via the YARN API. This is taken care of in the stopGracefully method, which is called in SingleTaskBackgroundRunner. The `canRestore` method now returns `true` for HadoopIndexTask in order for the stopGracefully method to be called.
- `Hadoop*Job` classes have been changed to incorporate the changes to `jobby`.