Kill Hadoop MR task on kill of Hadoop ingestion task #6828

Merged: 7 commits, Jan 25, 2019
Changes from 6 commits
15 changes: 15 additions & 0 deletions indexing-hadoop/pom.xml
@@ -130,6 +130,21 @@
<version>${hadoop.compile.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
Contributor comment: Please add provided scope here too.
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-server</artifactId>
@@ -75,6 +75,7 @@ public class DetermineHashedPartitionsJob implements Jobby
private final HadoopDruidIndexerConfig config;
private String failureCause;
private Job groupByJob;
private long startTime;

public DetermineHashedPartitionsJob(
HadoopDruidIndexerConfig config
@@ -91,7 +92,7 @@ public boolean run()
* Group by (timestamp, dimensions) so we can correctly count dimension values as they would appear
* in the final segment.
*/
final long startTime = System.currentTimeMillis();
startTime = System.currentTimeMillis();
groupByJob = Job.getInstance(
new Configuration(),
StringUtils.format("%s-determine_partitions_hashed-%s", config.getDataSource(), config.getIntervals())
@@ -125,6 +126,11 @@ public boolean run()
groupByJob.submit();
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());

// Store the jobId in the file
if (groupByJob.getJobID() != null) {
JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), groupByJob.getJobID().toString());
}

if (!groupByJob.waitForCompletion(true)) {
log.error("Job failed: %s", groupByJob.getJobID());
failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER);
@@ -161,6 +161,12 @@ public boolean run()
groupByJob.submit();
log.info("Job %s submitted, status available at: %s", groupByJob.getJobName(), groupByJob.getTrackingURL());

// Store the jobId in the file
if (groupByJob.getJobID() != null) {
JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), groupByJob.getJobID().toString());
}


if (!groupByJob.waitForCompletion(true)) {
log.error("Job failed: %s", groupByJob.getJobID());
failureCause = Utils.getFailureMessage(groupByJob, config.JSON_MAPPER);
@@ -218,6 +224,12 @@ public boolean run()
dimSelectionJob.getTrackingURL()
);

// Store the jobId in the file
if (dimSelectionJob.getJobID() != null) {
JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), dimSelectionJob.getJobID().toString());
}


if (!dimSelectionJob.waitForCompletion(true)) {
log.error("Job failed: %s", dimSelectionJob.getJobID().toString());
failureCause = Utils.getFailureMessage(dimSelectionJob, config.JSON_MAPPER);
@@ -39,6 +39,7 @@ public class HadoopDruidDetermineConfigurationJob implements Jobby
private static final Logger log = new Logger(HadoopDruidDetermineConfigurationJob.class);
private final HadoopDruidIndexerConfig config;
private Jobby job;
private String hadoopJobIdFile;

@Inject
public HadoopDruidDetermineConfigurationJob(
@@ -55,6 +56,7 @@ public boolean run()

if (config.isDeterminingPartitions()) {
job = config.getPartitionsSpec().getPartitionJob(config);
config.setHadoopJobIdFileName(hadoopJobIdFile);
return JobHelper.runSingleJob(job, config);
} else {
int shardsPerInterval = config.getPartitionsSpec().getNumShards();
@@ -109,4 +111,9 @@ public String getErrorMessage()

return job.getErrorMessage();
}

public void setHadoopJobIdFile(String hadoopJobIdFile)
{
this.hadoopJobIdFile = hadoopJobIdFile;
}
}
@@ -212,6 +212,7 @@ public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)

private HadoopIngestionSpec schema;
private PathSpec pathSpec;
private String hadoopJobIdFileName;
private final Map<Long, ShardSpecLookup> shardSpecLookups = new HashMap<>();
private final Map<Long, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = new HashMap<>();
private final Granularity rollupGran;
@@ -375,6 +376,16 @@ public int getMaxParseExceptions()
return schema.getTuningConfig().getMaxParseExceptions();
}

public void setHadoopJobIdFileName(String hadoopJobIdFileName)
{
this.hadoopJobIdFileName = hadoopJobIdFileName;
}

public String getHadoopJobIdFileName()
{
return hadoopJobIdFileName;
}

/**
* Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
* or via injected system properties) before this method is called. The {@link PathSpec} may
@@ -38,6 +38,7 @@ public class HadoopDruidIndexerJob implements Jobby
private final MetadataStorageUpdaterJob metadataStorageUpdaterJob;
private IndexGeneratorJob indexJob;
private volatile List<DataSegment> publishedSegments = null;
private String hadoopJobIdFile;

@Inject
public HadoopDruidIndexerJob(
@@ -92,7 +93,7 @@ public boolean run()
}
);


config.setHadoopJobIdFileName(hadoopJobIdFile);
return JobHelper.runJobs(jobs, config);
}

@@ -124,4 +125,9 @@ public List<DataSegment> getPublishedSegments()
}
return publishedSegments;
}

public void setHadoopJobIdFile(String hadoopJobIdFile)
{
this.hadoopJobIdFile = hadoopJobIdFile;
}
}
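To make the new setter's intent concrete: the caller sets the job id file before run(), and run() passes it to the config so JobHelper can persist the submitted MR job's id. A minimal wiring sketch, assuming the caller owns a task working directory; the helper class and file name are illustrative, not taken from this PR.

import java.io.File;

class HadoopJobIdFileWiring
{
  // Illustrative only: point the indexer job at a job id file before running it,
  // so the MR job id gets written somewhere a later kill can find it.
  static boolean runWithJobIdFile(HadoopDruidIndexerJob job, File taskDir)
  {
    job.setHadoopJobIdFile(new File(taskDir, "mapReduceJobId.json").getAbsolutePath()); // assumed filename
    return job.run();
  }
}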
@@ -97,6 +97,7 @@
import java.util.concurrent.TimeoutException;

/**
*
*/
public class IndexGeneratorJob implements Jobby
{
@@ -207,6 +208,11 @@ public boolean run()
job.submit();
log.info("Job %s submitted, status available at %s", job.getJobName(), job.getTrackingURL());

// Store the jobId in the file
if (job.getJobID() != null) {
JobHelper.writeJobIdToFile(config.getHadoopJobIdFileName(), job.getJobID().toString());
}

boolean success = job.waitForCompletion(true);

Counters counters = job.getCounters();
@@ -59,8 +59,10 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -347,6 +349,24 @@ public static void ensurePaths(HadoopDruidIndexerConfig config)
}
}

public static void writeJobIdToFile(String hadoopJobIdFileName, String hadoopJobId)
{
if (hadoopJobId != null && hadoopJobIdFileName != null) {
try {
HadoopDruidIndexerConfig.JSON_MAPPER.writeValue(
new OutputStreamWriter(new FileOutputStream(new File(hadoopJobIdFileName)), StandardCharsets.UTF_8),
hadoopJobId
);
log.info("MR job id [%s] is written to the file [%s]", hadoopJobId, hadoopJobIdFileName);
}
catch (IOException e) {
log.warn(e, "Error writing job id [%s] to the file [%s]", hadoopJobId, hadoopJobIdFileName);
}
} else {
log.info("Either job Id or File Name is null for the submitted job. Skipping writing the file [%s]", hadoopJobIdFileName);
Contributor comment: nit: suggest "job id" and "filename" without capitalization
}
}

public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config)
{
boolean succeeded = job.run();
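Persisting the job id only matters because a kill of the Druid ingestion task can then reach the underlying MR job. A minimal sketch of that read-and-kill side, assuming a hypothetical HadoopJobKiller helper: the class name, its method, and the error handling are illustrative, not this PR's actual kill path.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;

import java.io.File;

public class HadoopJobKiller
{
  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

  // Hypothetical helper: read the job id that writeJobIdToFile persisted and ask the
  // cluster to kill that job. Returns true only if a kill was actually issued.
  public static boolean maybeKillHadoopJob(String hadoopJobIdFileName, Configuration conf)
  {
    try {
      File jobIdFile = new File(hadoopJobIdFileName);
      if (!jobIdFile.exists()) {
        return false; // nothing was submitted yet, or the id file was never written
      }
      String jobId = JSON_MAPPER.readValue(jobIdFile, String.class);
      Job job = new Cluster(conf).getJob(JobID.forName(jobId));
      if (job == null) {
        return false; // job already finished and is no longer tracked by the cluster
      }
      job.killJob();
      return true;
    }
    catch (Exception e) {
      // Best effort: a failed kill attempt should not mask the task kill itself.
      return false;
    }
  }
}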
@@ -153,11 +153,14 @@ public boolean canRestore()
return false;
}

/**
* Should be called independent of canRestore so that Resource cleaning can be achieved.
Contributor comment: Resource -> resource
* If resource cleaning is required, concrete class should override this method
*/
@Override
public void stopGracefully()
{
// Should not be called when canRestore = false.
throw new UnsupportedOperationException("Cannot stop gracefully");
// Do nothing and let the concrete class handle it
}

@Override
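Finally, the relaxed stopGracefully default above only pays off when a concrete task overrides it. A minimal sketch of such an override, reusing the illustrative HadoopJobKiller from the earlier sketch; the stand-in base class, task class, and file name are assumptions, not code from this PR.

import org.apache.hadoop.conf.Configuration;

// Stand-in for the abstract task above, reduced to the one method that matters here.
abstract class StubAbstractTask
{
  public void stopGracefully()
  {
    // Do nothing and let the concrete class handle it.
  }
}

class ExampleHadoopIndexTask extends StubAbstractTask
{
  private static final String HADOOP_JOB_ID_FILE = "mapReduceJobId.json"; // assumed name, relative to the task dir

  @Override
  public void stopGracefully()
  {
    // Called on task kill even when canRestore() is false, so the MR job this task
    // submitted can be killed instead of being left running on the cluster.
    HadoopJobKiller.maybeKillHadoopJob(HADOOP_JOB_ID_FILE, new Configuration());
  }
}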