
Kill Hadoop MR task on kill of Hadoop ingestion task #6828

Merged
7 commits merged on Jan 25, 2019

Conversation

ankit0811
Contributor

KillTask from the overlord UI now makes sure that it terminates the underlying MR job, thus saving unnecessary compute #6803

Run in Jobby is now split into two:

  1. submitAndGetHadoopJobId, which submits the job and returns the jobId as a string
  2. run, which monitors this job for completion

JobHelper writes this jobId to the path provided by HadoopIndexTask, which in turn is provided by the ForkingTaskRunner

HadoopIndexTask reads this path when the kill task is clicked, gets the jobId, and fires the kill command via the YARN API. This is taken care of in the stopGracefully method, which is called in SingleTaskBackgroundRunner. The canRestore method has been changed to return true for HadoopIndexTask so that stopGracefully gets called

Hadoop*Job classes have been changed to incorporate the changes to Jobby
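A rough sketch of this flow, for orientation (illustrative only; `hadoopJobIdFile` stands for the path handed down by the ForkingTaskRunner, and the YARN CLI call is just one possible way to issue the kill):

    // 1) the Jobby submits the MR job and records its id instead of only blocking in run()
    Job groupByJob = Job.getInstance(new Configuration(), "example-group-by");
    groupByJob.submit();
    String hadoopJobId = groupByJob.getJobID().toString();   // e.g. job_1547000000000_0001

    // 2) JobHelper persists the id to the file whose path HadoopIndexTask received from the ForkingTaskRunner
    HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(new File(hadoopJobIdFile), hadoopJobId);

    // 3) on kill, HadoopIndexTask.stopGracefully() reads the id back and asks YARN to kill the backing application
    String jobId = HadoopDruidIndexerConfig.JSON_MAPPER.readValue(new File(hadoopJobIdFile), String.class);
    String appId = jobId.replace("job_", "application_");
    new ProcessBuilder("yarn", "application", "-kill", appId).start();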

@ankit0811
Contributor Author

@jihoonson @jon-wei can you please look at this PR for the kill job proposal #6803
Thanks

@ankit0811 ankit0811 changed the title from "Kill Hadoop MR task on kill of ingestion task and resume ability for Hadoop ingestion tasks" to "Kill Hadoop MR task on kill of Hadoop ingestion task" on Jan 10, 2019
@@ -229,6 +231,9 @@ public TaskStatus call()
final File taskDir = taskConfig.getTaskDir(task.getId());
final File attemptDir = new File(taskDir, attemptUUID);

task.getContext().put(INDEX_TASK_DIR, taskDir.toString());
Contributor

I don't think you need these context params, you can get the TaskConfig by calling toolbox.getConfig() in HadoopIndexTask.run().
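For instance, something like this inside HadoopIndexTask.run() should work instead of the context param (a minimal sketch, assuming the toolbox is in scope):

    final TaskConfig taskConfig = toolbox.getConfig();
    final File taskDir = taskConfig.getTaskDir(getId());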

public String runTask(String[] args) throws Exception
{
int res = ToolRunner.run(new JobClient(), args);
return res == 0 ? "Sucess" : "Fail";
Contributor

Sucess -> Success

@@ -585,6 +662,7 @@ public String runTask(String[] args) throws Exception
{
final String schema = args[0];
String version = args[1];
final String HadoopJobIdFile = args[2];
Contributor

HadoopJobIdFile -> hadoopJobIdFile

new Object[]{buildKillJobInput}
);

log.info(String.format(Locale.ENGLISH, "Tried killing job %s , status: %s", jobId, killStatusString));
Contributor

Should use StringUtils.format instead
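That is, either of these (the second relies on Druid's Logger doing its own formatting of the arguments):

    log.info(StringUtils.format("Tried killing job %s, status: %s", jobId, killStatusString));
    // or simply
    log.info("Tried killing job %s, status: %s", jobId, killStatusString);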

@@ -218,6 +224,15 @@ public String getClasspathPrefix()
return classpathPrefix;
}

public String getHadoopJobIdFileName()
{
String hadoopJobIdFileName = "mapReduceJobId.json";
Contributor

Let's make this a constant
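e.g. (name is illustrative):

    private static final String HADOOP_JOB_ID_FILE_NAME = "mapReduceJobId.json";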

log.info("MR job id is written to jobId file");
}
catch (IOException e) {
log.error("Error wriritng job id to jobId file. Exception %s ", Throwables.getStackTraceAsString(e));
Contributor

log.error accepts a Throwable directly, the getStackTraceAsString is unnecessary, likewise for runSingleJob

also, wriritng -> writing
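That is, something along the lines of:

    catch (IOException e) {
      log.error(e, "Error writing job id to jobId file");
    }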

objectMapper.writeValue(new OutputStreamWriter(
new FileOutputStream(new File(hadoopJobIdFileName)), StandardCharsets.UTF_8),
hadoopJobId);
log.info("MR job id is written to jobId file");
Contributor

Suggest putting the full job id path in the log message
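e.g. (mirroring the message used in the later revision quoted further down):

    log.info("MR job id [%s] is written to the file [%s]", hadoopJobId, hadoopJobIdFileName);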

@@ -412,6 +430,63 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
}
}

@Override
public boolean canRestore()
Contributor

Rather than tying the MR job cleanup to the restore functionality, I think it would be better to change the Task.stopGracefully() contract such that it's always called.

Regardless of whether the task would be restored, I think it makes sense to call stopGracefully() in case the task wants to attempt to clean up any open resources, like in this situation.

Also, as is, the MR job termination won't run unless the user has enabled restoreTasksOnRestart which is false by default:

      if (taskConfig.isRestoreTasksOnRestart() && task.canRestore()) {

…lying MR job, thus saving unnecessary compute

…ully()

`SingleTaskBackgroundRunner` calls stopGracefully in stop() and then checks for canRestore condition to return the status of the task
@ankit0811
Contributor Author

@jon-wei Thanks for reviewing
Have tried to make the necessary changes suggested
And apologies for accidentally using force-push to merge my changes

@jihoonson
Contributor

@ankit0811 thanks, I'll take a look today.

*
* @return A string represtenting the jobId of the actual MR job.
* Run method is now divided into two parts. The first one being submitAndGetHadoopJobId which just submits the job and returns the job ID
* Run then monitors this job for completion
Contributor

Looks like the last two lines are descriptions for the run method. Please move them to the proper method.

Contributor

@jihoonson jihoonson Jan 16, 2019

Also, please annotate this method with @Nullable and add a description about when this is null and how null is checked in where.

@@ -349,6 +352,22 @@ public static void ensurePaths(HadoopDruidIndexerConfig config)

public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
{
String hadoopJobId = job.submitAndGetHadoopJobId();
ObjectMapper objectMapper = new ObjectMapper();
Contributor

Please use HadoopDruidIndexerConfig.JSON_MAPPER instead.
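i.e. drop the new ObjectMapper() and reuse the shared mapper, roughly:

    HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(
        new OutputStreamWriter(new FileOutputStream(new File(hadoopJobIdFileName)), StandardCharsets.UTF_8),
        hadoopJobId
    );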

@@ -372,7 +391,23 @@ public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config)
{
boolean succeeded = true;
ObjectMapper objectMapper = new ObjectMapper();
Contributor

Please use HadoopDruidIndexerConfig.JSON_MAPPER instead.

return groupByJob.getJobID().toString();
}
catch (Exception e) {
throw Throwables.propagate(e);
Contributor

Please throw new RuntimeException(e) instead.

* Run method is now divided into two parts. The first one being submitAndGetHadoopJobId which just submits the job and returns the job ID
* Run then monitors this job for completion
*/
default String submitAndGetHadoopJobId()
Contributor

DeterminePartitionsJob should implement this method and use JobHelper.

Contributor Author

Thanks @jihoonson
That was a miss from my side
I see DeterminePartitionsJob basically runs 2 jobs:

  1. determine_partitions_groupby
  2. determine_partitions_dimselection

Is it alright to split this into two separate methods and then handle their job ids in JobHelper.runSingleJob() by casting the job?

Contributor

@jihoonson jihoonson Jan 16, 2019

Sorry, I'm not sure what you mean. Would you tell me more details about how to split DeterminePartitionsJob?

Contributor Author

@ankit0811 ankit0811 Jan 16, 2019

So currently the run() issues two MR jobs

  1. determine_partitions_groupby
  2. determine_partitions_dimselection

I need these jobs to return their jobIds first and then check for their status, hence planning to split run into two parts

Basically, in the case when job is an instance of DeterminePartitionsJob, JobHelper.runSingleJob() will look like this:

  if (job instanceof DeterminePartitionsJob) {
    String hadoopJobId = ((DeterminePartitionsJob) job).submitAndGetHadoopJobIdForDeterminePartitionsGroupBy();
    ((DeterminePartitionsJob) job).RunDeterminePartitionsGroupBy();
  }

The above will take care of the determine_partitions_groupby job, followed by submitAndGetHadoopJobId and run() (the normal code flow), which will take care of the determine_partitions_dimselection job.

Let me know if that makes sense
Thanks

Contributor

Thanks, but I think it's too specific to the DeterminePartitionsJob implementation and can't handle any other custom Jobby implementations running two or more Hadoop jobs.

I think maybe it's not a good idea to add submitAndGetHadoopJobId to Jobby because a single Jobby can run 0, 1, or more Hadoop jobs. How about removing submitAndGetHadoopJobId from Jobby but adding writeHadoopJobId to JobHelper? Every Jobby running one or more Hadoop jobs should use this method.
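A rough sketch of what such a helper could look like (names and messages mirror the later diff excerpts in this thread, but this is illustrative rather than the exact committed code):

    public static void writeJobIdToFile(String hadoopJobId, String hadoopJobIdFileName)
    {
      if (hadoopJobId != null && hadoopJobIdFileName != null) {
        try {
          HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(
              new OutputStreamWriter(new FileOutputStream(new File(hadoopJobIdFileName)), StandardCharsets.UTF_8),
              hadoopJobId
          );
          log.info("MR job id [%s] is written to the file [%s]", hadoopJobId, hadoopJobIdFileName);
        }
        catch (IOException e) {
          log.warn(e, "Error writing job id [%s] to the file [%s]", hadoopJobId, hadoopJobIdFileName);
        }
      } else {
        log.info("Either job id or filename is null for the submitted job. Skipping writing the file [%s]", hadoopJobIdFileName);
      }
    }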

Contributor Author

Yes
That simplifies things 👍
Thanks. Will make the changes

}
catch (Exception e) {
log.info("Exeption while reading json file from path: " + hadoopJobIdFile);
log.error(Throwables.getStackTraceAsString(e));
Contributor

You don't need to print two logs. Please replace them with log.warn(e, "Exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);.

ClassLoader loader = HadoopTask.buildClassLoader(getHadoopDependencyCoordinates(),
taskConfig.getDefaultHadoopCoordinates());

Object killMRJobInnerProcessingRunner = getForeignClassloaderObject("org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
Contributor

Format:

        Object killMRJobInnerProcessingRunner = getForeignClassloaderObject(
            "org.apache.druid.indexing.common.task.HadoopIndexTask$HadoopKillMRJobIdProcessingRunner",
            loader
        );

@@ -163,8 +163,8 @@ default int getPriority()
boolean canRestore();

/**
* Asks a task to arrange for its "run" method to exit promptly. This method will only be called if
* {@link #canRestore()} returns true. Tasks that take too long to stop gracefully will be terminated with
* Asks a task to arrange for its "run" method to exit promptly. This method will be called, whether
Contributor

Let's just remove This method will be called, whether {@link #canRestore()} returns true/false.

log.info("Starting graceful shutdown of task[%s].", task.getId());
// stopGracefully for resource cleaning, independent of the fact whether the task is restorable or not
// Attempt graceful shutdown.
graceful = true;
Contributor

graceful is always true. Please remove it and set true for metric like below.

      final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent
          .builder()
          .setDimension("task", task.getId())
          .setDimension("dataSource", task.getDataSource())
          .setDimension("graceful", "true") // for backward compatibility
          .setDimension("error", String.valueOf(error));

@@ -90,6 +90,16 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
Contributor

@jihoonson jihoonson Jan 16, 2019

We really shouldn't add hadoop dependencies to druid core because 1) it's unnecessary if we don't use hadoop and 2) it might cause some errors because of the version mismatch when we use a different version of Hadoop.

Please move them to indexing-hadoop. The version must not be specified and the scope should be `provided`.

 1. Formatting
 2. Removing `submitAndGetHadoopJobId` from `Jobby` and calling writeJobIdToFile in the job itself
 1. POM change. Moving hadoop dependency to indexing-hadoop
Contributor

@jon-wei jon-wei left a comment

Generally LGTM, but I think this needs one more major change:

Since stopGracefully is always called now, we should adjust the tasks that previously implemented graceful shutdown so that they continue to only do graceful shutdown when restore is enabled.

I also left some minor comments re: log messages

log.warn(e, "Error writing job id [%s] to the file [%s]", hadoopJobId, hadoopJobIdFileName);
}
} else {
log.info("Either job Id or File Name is null for the submitted job. Skipping writing the file [%s]", hadoopJobIdFileName);
Contributor

nit: suggest "job id" and "filename" without capitalization

@@ -153,11 +153,14 @@ public boolean canRestore()
return false;
}

/**
* Should be called independent of canRestore so that Resource cleaning can be achieved.
Contributor

Resource -> resource

}
}
catch (Exception e) {
log.warn(e, "Exeption while reading Hadoop Job ID from: %s", hadoopJobIdFile);
Contributor

Exception -> exception

// Attempt graceful shutdown.
graceful = true;
log.info("Starting graceful shutdown of task[%s].", task.getId());
// stopGracefully for resource cleaning, independent of the fact whether the task is restorable or not
Contributor

nit: can delete the "independent of the fact" portion and the later "Attempt graceful shutdown" comment

@@ -223,7 +221,7 @@ public void stop()
.builder()
.setDimension("task", task.getId())
.setDimension("dataSource", task.getDataSource())
.setDimension("graceful", String.valueOf(graceful))
.setDimension("graceful", "true") // for backword compatibility
Contributor

backword -> backward

@ankit0811
Contributor Author

Generally LGTM, but I think this needs one more major change:

Since stopGracefully is always called now, we should adjust the tasks that previously implemented graceful shutdown so that they continue to only do graceful shutdown when restore is enabled

Just wanted to be sure:
The only case which is not handled here is when isRestoreOnRestart = False and canRestore = True?
As realtime tasks have canRestore set to True and index tasks have canRestore set to False always

@jon-wei
Contributor

jon-wei commented Jan 17, 2019

@ankit0811

I would change AbstractTask so that stopGracefully() no longer throws an exception but just does nothing:

  public void stopGracefully()
  {
    // Should not be called when canRestore = false.
    throw new UnsupportedOperationException("Cannot stop gracefully");
  }

For the tasks that did have a stopGracefully() implementation (AppenderatorDriverRealtimeIndexTask, RealtimeIndexTask, SeekableStreamIndexTask), this used to be called only when taskConfig.isRestoreOnRestart is true, so that check should be moved now into the stopGracefully() implementations there.

You could change the stopGracefully() method to have the task runner pass in the TaskConfig object for access to restoreTasksOnRestart
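In other words, something along these lines in each of those task classes (a minimal sketch, under the assumption that the pre-existing shutdown logic stays where it was):

    @Override
    public void stopGracefully(TaskConfig taskConfig)
    {
      if (taskConfig.isRestoreTasksOnRestart()) {
        // the pre-existing graceful-shutdown logic (persist state, ask run() to exit promptly) goes here
      }
      // when restore is disabled, fall through and let the runner stop the task forcefully
    }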

Contributor

@jihoonson jihoonson left a comment

@jon-wei thanks for catching it!

</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
Contributor

Please add the `provided` scope here too.

 1. stopGracefully now accepts TaskConfig as a param
     Handling isRestoreOnRestart in stopGracefully for `AppenderatorDriverRealtimeIndexTask, RealtimeIndexTask, SeekableStreamIndexTask`
     Changing tests to set the TaskConfig param isRestoreOnRestart to true
@ankit0811
Contributor Author

@jihoonson @jon-wei
Thanks for the reviews
As per your suggestions, I have made the necessary changes to the stopGracefully() method
Can you please review?
Thanks :)

Contributor

@jon-wei jon-wei left a comment

LGTM
