Add support in AWS Batch Operator for multinode jobs #29522
Conversation
@vandonr-amz just an idea, not a strong opinion: what if we create a separate Operator for multinode Jobs?
I'm not a super fan of it; it's the same boto operation behind it, just with a different behavior... And as a user I think I would be surprised if I had to use a different operator for this.
There are 3 different sets of parameters for SubmitJob.
I also guess that arrayProperties is only applicable to container overrides. IMHO SubmitJob is pretty complicated. One potential benefit of keeping it all in one operator is the ability to have an upstream task create the kwargs for it.
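For context, a minimal sketch (plain boto3; the queue, definition, and job names are placeholders) of the three submission styles discussed here, which `SubmitJob` accepts through largely mutually exclusive parameters:

```python
import boto3

batch = boto3.client("batch")  # assumes AWS credentials/region are configured
common = {"jobQueue": "my-queue", "jobDefinition": "my-job-def"}  # placeholder names

# 1. Regular container job: tweak the container at submit time.
batch.submit_job(jobName="container-job",
                 containerOverrides={"command": ["echo", "hi"]}, **common)

# 2. Array job: arrayProperties fans out N copies of the same container job.
batch.submit_job(jobName="array-job",
                 arrayProperties={"size": 10}, **common)

# 3. Multinode job: nodeOverrides, against a "multinode"-type job definition.
batch.submit_job(jobName="multinode-job",
                 nodeOverrides={"numNodes": 5}, **common)
```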
Ok, maybe you're right after all. I'll give it a better look. I'm actually not that familiar with batch jobs 😅
After taking a closer look at it, I think having 2 (or more) operators would duplicate a lot of code without removing much complexity. Also, maybe the user isn't always right, but the initial form of this PR comes from an actual user of the operator, so I'd tend to follow their way of thinking (not being a user myself).
As I mentioned before, I do not have a strict opinion on whether it should be a single operator or 3 operators (Regular, Node, EKS). I have used the combination Airflow + Batch since Sept 2019, and the two cover a lot of each other's limitations: the same capability as Dynamic Task Mapping has been available in Batch for years through arrayProperties, while on the other side dependencies between Batch Jobs are not as good as in Airflow. Not many changes have happened in the Batch operator since that time, and the design of the Hooks and BatchOperator is still from the pre-provider era; it looks ugly now, even if it has an exclusive backoff API caller. My main concern is that potentially most of the parameters are exclusively containerOverrides options and we do not check that right now. I'm not a user of nodeOverrides, because such an architecture usually suits a Hadoop Cluster better, and for that purpose it is better to use EMR. Different users, different points of view. I will try to check those options and get back after the weekend.
did you have time to check the options?
Sorry, not yet. Hectic days. I will try tomorrow morning. I also have a question about log links, but let me check it first.
""" | ||
job_container_desc = self.get_job_description(job_id=job_id).get("container", {}) | ||
log_configuration = job_container_desc.get("logConfiguration", {}) | ||
job_desc = self.get_job_description(job_id=job_id) |
Let me add a bit more context about what is going on here.
Because everything is executed outside of Airflow, users do not have any information about the logs in AWS Batch.
For a regular batch job we have 0 or 1 dicts of CloudWatch information: log group, region name, log stream.
This information is mainly used to generate the operator extra link that is visible in the UI.
Right now 0 can happen in several situations:
- The user doesn't use CloudWatch
- It is an Array Job
- For some reason the AWS API does not return the CloudWatch link. I personally have not hit this situation, but it could potentially happen if the job finishes very quickly. That is also the reason why we check this at the end of operator execution.

If the user uses nodeProperties, then the job runs in multiple places and there are 0..many log configurations; in this case we can't utilise an Operator Extra Link. The best we can do here is print all the CloudWatch links in the Airflow log, but with the current implementation only one would be returned.
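To make this concrete, a rough sketch (plain boto3, not the provider's actual code; the `/aws/batch/job` default group is an assumption) of collecting those 0..many CloudWatch entries from a `DescribeJobs` response:

```python
import boto3

def collect_awslogs_info(job_id: str) -> list[dict]:
    """Best effort: one {group, region, stream} dict per CloudWatch stream, possibly empty."""
    batch = boto3.client("batch")
    job = batch.describe_jobs(jobs=[job_id])["jobs"][0]

    if job.get("nodeProperties"):
        # Multinode job: one log configuration per node range, one stream per attempt.
        log_configs = [
            rng.get("container", {}).get("logConfiguration", {})
            for rng in job["nodeProperties"].get("nodeRangeProperties", [])
        ]
        streams = [att.get("container", {}).get("logStreamName") for att in job.get("attempts", [])]
    else:
        # Regular container job: at most one configuration and one stream.
        container = job.get("container", {})
        log_configs = [container.get("logConfiguration", {})]
        streams = [container.get("logStreamName")]

    infos = []
    for cfg in log_configs:
        if cfg.get("logDriver") != "awslogs":
            continue  # not shipping to CloudWatch, nothing to link to
        opts = cfg.get("options", {})
        for stream in filter(None, streams):
            infos.append({
                "awslogs_group": opts.get("awslogs-group", "/aws/batch/job"),  # assumed default
                "awslogs_region": opts.get("awslogs-region", batch.meta.region_name),
                "awslogs_stream_name": stream,
            })
    return infos
```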
@vandonr-amz can you please add this context to the PR description ^^^ It would be great for future users to be able to immediately understand what's going on here.
ok I see your point, but should there really be more than one log link?
I'm looking at it, and it seems that in the case of a multinode job there are multiple log_configurations
(one per node), but from each log config we get
- the log group
- the region
I'd imagine that multinode batch jobs would not be multi-region? So that would be constant across all nodes.
And also, I suppose in an overwhelming majority of the cases, the log group would be the same for all nodes (it would be very weird if it wasn't).
Then we get the stream name from the attempts, but this does not depend on the number of nodes. I imagine in most cases there would be one attempt. If there are more, we make the choice of returning the stream name for the last attempt, which makes sense.
The job runs on many nodes, but the logs all end up in the same log stream.
What we can do is iterate on the log configs to make sure they are all sending logs
- to aws
- in the same region
- in the same group
and log a warning if it's not the case.
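Something like this hypothetical helper (the function name and logger are illustrative):

```python
import logging

log = logging.getLogger(__name__)

def check_log_configs_consistent(log_configs: list[dict]) -> bool:
    """Warn unless every node ships logs to CloudWatch, into one region and one group."""
    if any(cfg.get("logDriver") != "awslogs" for cfg in log_configs):
        log.warning("Some nodes do not log to CloudWatch ('awslogs'); log links may be incomplete.")
        return False
    regions = {cfg.get("options", {}).get("awslogs-region") for cfg in log_configs}
    groups = {cfg.get("options", {}).get("awslogs-group") for cfg in log_configs}
    if len(regions) > 1 or len(groups) > 1:
        log.warning("Nodes log to multiple regions/groups (%s / %s); a single link would be ambiguous.",
                    regions, groups)
        return False
    return True
```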
> I'd imagine that multinode batch jobs would not be multi-region?
AFAIK, Batch resources are region-specific for any type of job:
- Compute Environment (ECS or EKS clusters)
- Job Definition
- Job Queues

You could configure logging to another region (CloudWatch) or to other supported log drivers, but this is configured during creation (registration) of the Batch Job Definition and cannot be changed by submitting a job. So it should all be stored in one destination.
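For example, with plain boto3 the log destination is fixed at registration time (all names and values below are placeholders), and `SubmitJob` has no field to override it:

```python
import boto3

batch = boto3.client("batch")

# The log destination is part of the job definition, per node range:
batch.register_job_definition(
    jobDefinitionName="batch-nodes",  # placeholder name
    type="multinode",
    nodeProperties={
        "numNodes": 2,
        "mainNode": 0,
        "nodeRangeProperties": [
            {
                "targetNodes": "0:",  # all nodes
                "container": {
                    "image": "amazonlinux",
                    "command": ["echo", "hello"],
                    "resourceRequirements": [
                        {"type": "VCPU", "value": "1"},
                        {"type": "MEMORY", "value": "512"},
                    ],
                    "logConfiguration": {
                        "logDriver": "awslogs",
                        "options": {
                            "awslogs-group": "/aws/batch/job",
                            "awslogs-region": "us-east-1",
                        },
                    },
                },
            },
        ],
    },
)
# submit_job's nodeOverrides can change numNodes or the container command,
# but it has no parameter for logConfiguration.
```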
> The job runs on many nodes, but the logs all end up in the same log stream.
Nope, each node has its own logs within a unique log stream:
(attached video: batch-multinode-jobs.mp4)
wow ok that's hmm... surprising.
added a commit to log the links to all logs
""" | ||
job_container_desc = self.get_job_description(job_id=job_id).get("container", {}) | ||
log_configuration = job_container_desc.get("logConfiguration", {}) | ||
job_desc = self.get_job_description(job_id=job_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vandonr-amz can you please add this context to the PR description ^^^ It would be great to have future users able to immediately understand what's going on here.
I've tested this simple DAG on my own AWS Account:

```python
import pendulum

from airflow import DAG
from airflow.providers.amazon.aws.operators.batch import BatchOperator

JOB_NAME = "multi-node-sample"
JOB_DEFINITION = "batch-nodes"
JOB_QUEUE = "multinode-job-queue"
CONTAINER_OVERRIDES = None
ARRAY_PROPERTIES = None
NODE_OVERRIDES = {
    "numNodes": 5
}

with DAG(
    dag_id="example_batch_submit_job_multi_node",
    schedule_interval=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    tags=["example", "amazon-provider", "batch", "multi-node"],
    catchup=False,
):
    submit_batch_job = BatchOperator(
        task_id="submit_batch_job",
        job_name=JOB_NAME,
        job_queue=JOB_QUEUE,
        job_definition=JOB_DEFINITION,
        container_overrides=CONTAINER_OVERRIDES,
        array_properties=ARRAY_PROPERTIES,
        node_overrides=NODE_OVERRIDES,
        aws_conn_id=None,
    )
```

If only node_overrides is set, everything works fine.

If I set both container_overrides and node_overrides:

```
botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the SubmitJob operation: Container overrides and node overrides are mutually exclusive, only one can be set.
```

If I set both array_properties and node_overrides:

```
botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the SubmitJob operation: Multinode Array Job not supported.
```
do you mean while still keeping …?
Same story here, boto is telling you that you cannot set both. I'm not super familiar with batch jobs, but I think the valid combinations are:
- container_overrides alone
- container_overrides + array_properties
- node_overrides alone
I haven't tested the array properties, but container and node overrides both work well when not mixed.
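A tiny hypothetical guard mirroring the two ClientException messages above, so the failure surfaces before the API call:

```python
def validate_submit_kwargs(container_overrides=None, node_overrides=None, array_properties=None):
    """Fail fast on combinations that AWS Batch rejects at SubmitJob time."""
    if container_overrides and node_overrides:
        raise ValueError("container_overrides and node_overrides are mutually exclusive.")
    if array_properties and node_overrides:
        raise ValueError("array_properties (array jobs) cannot be combined with multinode jobs.")
```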
Yep, array_properties is only allowed with container_overrides, not with node_overrides.
And also I think all of them would conflict with Batch jobs on EKS 🤣 But let's keep that as a surprise for future implementations.
well yes, this comes from #25522 which was opened by a user!
@dimberman @Taragolis do you think you can take a look at the latest changes and see if they look OK to you?
Thank you for addressing the changes @vandonr-amz ! LGTM 👍
Picking up #28321 after it was somewhat abandoned by the original author.
Addressed my own comment about empty array, and it should be good to go I think.
Initial description from @camilleanne: