[SPARK-24552][core][SQL] Use task ID instead of attempt number for writes. #21606
Conversation
Credit here should go to @rdblue when merging.
      committer: FileCommitProtocol,
      iterator: Iterator[(K, V)]): TaskCommitMessage = {
    // Set up a task.
    val taskContext = config.createTaskAttemptContext(
-     jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
+     jobTrackerId, commitJobId, sparkPartitionId, sparkTaskId.toInt)
is it safe?
The task ID is unique across the entire Spark application life cycle, which means we may have very large task IDs in a long-running micro-batch streaming application.
If we do need an int here, I'd suggest we combine stageAttemptNumber and taskAttemptNumber into an int, which is much less risky. (Spark won't have a lot of stage/task attempts.)
Streaming still generates separate jobs / stages for each batch, right?
In that case this should be fine; this would only be a problem if a single stage has enough tasks to cover all the integer space (4 billion tasks). That shouldn't even be possible, since I doubt you'd be able to have more than Integer.MAX_VALUE tasks (and even that is unlikely to ever happen).
I could use abs here (and in the SQL code) to avoid a negative value (potentially avoiding weird file names).
I don't follow; the task IDs increment across jobs, so if you have a very long-running application that continues to start new jobs, you could potentially run out.
But what does "run out" mean?
If your task ID goes past Int.MaxValue, you'll start getting negative values here. Eventually you'll get to a long value that wraps back around to 0 when cast to an integer:
(2L + Int.MaxValue + Int.MaxValue).toInt
res2: Int = 0
So for this to "not work", meaning you'd have a conflict where two tasks generate the same output file name based on all these values (stage, task, partition, etc.), you'd need about 4 billion tasks in the same stage.
In other situations you may get weird values because of the cast, but it should still work.
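For reference, a quick REPL sketch (not from the PR) of what the cast does once the TID grows past Int.MaxValue; only the low 32 bits of the long survive:

scala> (Int.MaxValue.toLong + 1).toInt   // first value past Int.MaxValue wraps to a negative
res0: Int = -2147483648

scala> ((1L << 32) | 42L).toInt          // after 2^32 task IDs, the low bits repeat
res1: Int = 42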
Ah, I see what you are saying; we just need to make sure that it going negative doesn't cause any side effects or anything unexpected.
I commented before I saw this thread, but I think it is better to use the TID because that is already exposed in the UI, so it is easier to match UI tasks with logs. The combined attempt number isn't used anywhere, so this would introduce another number to identify a task. And shifting by 16 means that these grow huge anyway.
To backport this, can we use the .toInt version? I think that should be safe.
   * @param epochId A monotonically increasing id for streaming queries that are split in to
   *                discrete periods of execution. For non-streaming queries,
   *                this ID will always be 0.
   */
-  DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
+  DataWriter<T> createDataWriter(int partitionId, int taskId, long epochId);
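Purely as an illustration (not code from this PR), a minimal sketch of how an implementation of createDataWriter might fold the task ID into its staging file name so that retried attempts can never collide; the helper name and file layout below are made up:

// Hypothetical naming helper inside a DataWriterFactory implementation.
// partitionId, taskId and epochId are the arguments passed to createDataWriter();
// since the TID is unique per task attempt, a retried stage cannot produce a
// name that clashes with (and later aborts) another attempt's output file.
def stagingFileName(partitionId: Int, taskId: Int, epochId: Long): String =
  f"part-$partitionId%05d-epoch-$epochId-tid-$taskId.parquet"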
In SparkHadoopWriter we must have an int, but here why not just change the type to long? Data source v2 is still evolving, and we have already made a lot of changes in the master branch.
I'm fine with that if you're ok with it, but wouldn't that make backporting this to 2.3 a little fishy? Yeah it's evolving, but it's still a little sub-optimal to break things in a maintenance release.
If this patch targets 2.3, I'd say we should not change any API or documentation; just pass taskId.toInt as attemptNumber and add comments to explain this hacky workaround.
Just so I understand, what's the reason for not changing the parameter name and API docs? The name is not a public API in Java, so it doesn't break anything.
And regardless of the parameter name, the API documentation is wrong (since it says you can have multiple tasks with the same ID, but different attempts, which does not happen).
The V2 commit stuff is not in 2.3
Hmm, interesting. But there is an API in 2.3:
DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
which I guess would still suffer from the problem Ryan describes in the bug. In any case, that means this can't be cleanly backported, so we can make the type change here.
+1 for the type change.
-      logInfo(s"Writer for stage $stageId / $stageAttempt, " +
-        s"task $partId.$attemptId is authorized to commit.")
+      logInfo(s"Writer for stage $stageId.$stageAttempt, " +
+        s"task $partId.$taskId is authorized to commit.")
this we want to leave as attemptNumber
       logInfo(message)
       // throwing CommitDeniedException will trigger the catch block for abort
-      throw new CommitDeniedException(message, stageId, partId, attemptId)
+      throw new CommitDeniedException(message, stageId, partId, taskId)
I think these and the messages above should use the attempt number to match the output committer.
I guess it depends on how picky we want to be; there are other places that use attemptNumber that we could update to task ID: InternalRowDataWriterFactory, memoryV2, and SimpleWritableDataSource, i.e. the places that implement createDataWriter.
    // The first two are currently the case in Spark, while the last one is very unlikely to
    // occur. If it does, two task IDs on a single stage could have a clashing integer value,
    // which could lead to code that generates clashing file names for different tasks. Still,
    // if the commit coordinator is enabled, only one task would be allowed to commit.
Since it's not a simple toInt anymore, how about we combine the stage and task attempt numbers?

val stageAttemptNumber = ...
val taskAttemptNumber = ...
assert(stageAttemptNumber <= Short.MaxValue)
assert(taskAttemptNumber <= Short.MaxValue)
val sparkAttemptNumber = (stageAttemptNumber << 16) | taskAttemptNumber
We can also remove the asserts and assume that, even if we have that many attempts, they are not all active.
Ok, I'll use that. I think Spark might fail everything before you even go that high in attempt numbers anyway...
Test build #92181 has finished for PR 21606 at commit
Test build #92187 has finished for PR 21606 at commit
Test build #92189 has finished for PR 21606 at commit
Test build #92190 has finished for PR 21606 at commit
Test build #92192 has finished for PR 21606 at commit
test this please
@@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging {
     // Try to write all RDD partitions as a Hadoop OutputFormat.
     try {
       val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+        // SPARK-24552: Generate a unique "task ID" based on the stage and task attempt numbers.
+        // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+        val taskId = (context.stageAttemptNumber << 16) | context.attemptNumber
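A side note, not part of the patch: the packed value above is easy to take apart again if you ever need to map a Hadoop task/attempt ID from the logs back to Spark's attempt counters. A minimal sketch, assuming the (stageAttemptNumber << 16) | attemptNumber layout shown in the diff:

def unpack(composite: Int): (Int, Int) = {
  val stageAttempt = composite >>> 16    // high 16 bits: stage attempt number
  val taskAttempt  = composite & 0xFFFF  // low 16 bits: task attempt number
  (stageAttempt, taskAttempt)
}
// unpack((3 << 16) | 7) == (3, 7)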
Perhaps we should rename taskId to something more distinctive so we don't confuse it with the real Spark task ID.
Maybe just something like uniqueTaskId or specialTaskId, but it's not a big deal.
Test build #92214 has finished for PR 21606 at commit
+1
@@ -76,13 +76,17 @@ object SparkHadoopWriter extends Logging {
     // Try to write all RDD partitions as a Hadoop OutputFormat.
     try {
       val ret = sparkContext.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) => {
+        // SPARK-24552: Generate a unique "attempt ID" based on the stage and task attempt numbers.
+        // Assumes that there won't be more than Short.MaxValue attempts, at least not concurrently.
+        val attemptId = (context.stageAttemptNumber << 16) | context.attemptNumber
I don't think we should generate an ID this way. We already have a unique ID that is exposed in the Spark UI. I'd much rather make it clear that the TID passed to committers as an attempt ID is the same as the TID in the stage view. That makes debugging easier. Going with this approach just introduces yet another number to track an attempt.
The problem is that the task ID is a long, and we can't change the Hadoop API to accept that; to me it's quite possible to have a valid task ID > 2^32. It might not be ideal to do it this way, but I think it's a good bug fix, especially for now; we can file a follow-on to improve it if we have ideas or want to change the interface.
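To make the constraint concrete, here is a small illustrative sketch (not from the patch) of the Hadoop IDs a writer ultimately has to construct; the constructors only accept ints, so whatever Spark passes down has to fit in 32 bits. The concrete values are made up:

import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}

val jobId     = new JobID("20180622162153", 0)            // job tracker ID string is arbitrary
val taskId    = new TaskID(jobId, TaskType.MAP, 3)         // partition id: must be an Int
val attemptId = new TaskAttemptID(taskId, (1 << 16) | 0)   // attempt number: must be an Int too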
Okay, that makes sense if this is just for Hadoop attempt IDs. Maybe that's a good thing to put in the comment as well?
         s"task $partId.$attemptId is authorized to commit.")
       dataWriter.commit()
     } else {
-      val message = s"Stage $stageId / $stageAttempt, " +
+      val message = s"Stage $stageId.$stageAttempt, " +
Should these logs use TID instead of attempt number? The format used in other log messages is s"Task $taskId (TID $tid)", I think.
(This is for the next line, sorry for the confusion)
I'll change these log messages a bit. I think the attempt is still helpful while we haven't changed the coordinator API (SPARK-24611), and it doesn't hurt to have it there even after we do.
+1 Thanks!
+1
A general comment about the log messages: it seems pretty noisy to have logInfo messages for every task (logging only in the failure paths would be better, in my opinion), but I'm keeping the current log level.
Also, since this patch won't backport cleanly, I'll go ahead and send versions of it for branch-2.3 and branch-2.2 (which I think will be enough to also backport to 2.1).
Just an FYI: I was looking at backporting to 2.2, and it looks like at least some write calls don't have the issue. Looks like that was lost when things were refactored. In fact there's a test for it; not sure why that was added.
Interesting. But I found the same code in a different place.
Test build #92218 has finished for PR 21606 at commit
Yeah, so things like saveAsTextFile in 2.2 are OK, but other functions like saveAsNewAPIHadoopFile and the DataFrame writers have the issue, so we do need to backport.
Test build #92225 has finished for PR 21606 at commit
Test build #92235 has finished for PR 21606 at commit
@@ -125,12 +124,12 @@ object DataWritingSparkTask extends Logging {
       val coordinator = SparkEnv.get.outputCommitCoordinator
       val commitAuthorized = coordinator.canCommit(stageId, stageAttempt, partId, attemptId)
A note for the follow-up: since we decided to use taskId as a unique identifier for write tasks, the output commit coordinator can also use taskId instead of stage and task attempts.
Given the deafening silence, I'll merge the PRs myself, since there are a bunch of +1s from others.
…ites.

This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.

For v1 / Hadoop writes, generate a unique ID based on available attempt numbers to avoid a similar problem.

Closes apache#21558

Author: Marcelo Vanzin <[email protected]>
Author: Ryan Blue <[email protected]>

Closes apache#21606 from vanzin/SPARK-24552.2.

Ref: LIHADOOP-48531