
Add support in AWS Batch Operator for multinode jobs #29522

Merged · 27 commits · Apr 12, 2023

Conversation

@vandonr-amz (Contributor)

Picking up #28321 after it was somewhat abandoned by the original author.
I addressed my own comment about the empty array, and it should be good to go, I think.

Initial description from @camilleanne:

  • Adds support for AWS Batch multinode jobs by allowing a node_overrides JSON object to be passed through to the boto3 submit_job method (see the sketch below).

  • Adds support for multinode jobs by properly parsing the output of describe_jobs (whose shape differs between container and multinode jobs) to extract the log stream name.

closes: #25522
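For context, a minimal sketch of the boto3 call that node_overrides ultimately feeds into; the names and values here are illustrative, not taken from the PR:

import boto3

client = boto3.client("batch")

# nodeOverrides is the SubmitJob parameter that the new node_overrides
# operator argument is passed through to; all names below are hypothetical.
response = client.submit_job(
    jobName="my-multinode-job",
    jobQueue="my-job-queue",
    jobDefinition="my-multinode-job-definition",
    nodeOverrides={
        "numNodes": 3,
        "nodePropertyOverrides": [
            {
                "targetNodes": "0:",  # apply to all nodes
                "containerOverrides": {"command": ["echo", "hello from a node"]},
            }
        ],
    },
)
print(response["jobId"])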

@boring-cyborg (bot) added the area:providers and provider:amazon (AWS/Amazon - related issues) labels on Feb 14, 2023
@Taragolis (Contributor)

@vandonr-amz just an idea, not a strong opinion. What if we create a separate operator for multinode jobs?
node_overrides conflicts with container_overrides (the current overrides), and the operator has different logic for obtaining logs. In addition, BatchOperator uses its own waiter implementation (a separate additional hook 🙄), and I'm not sure whether it would work with these changes.

@vandonr-amz (Contributor, Author)

> @vandonr-amz just an idea, not a strong opinion. What if we create a separate operator for multinode jobs? node_overrides conflicts with container_overrides (the current overrides), and the operator has different logic for obtaining logs. In addition, BatchOperator uses its own waiter implementation (a separate additional hook 🙄), and I'm not sure whether it would work with these changes.

I'm not a super fan of it; it's the same boto operation behind it, just with different behavior... And as a user, I think I'd be surprised if I had to use a different operator for this.
I think an operator should "do a thing", and the parameters should be about the specifics of how that thing is done. I don't think launching a batch job and launching a multinode batch job are really different "things", but it's up for debate, I guess.

@Taragolis (Contributor) commented Feb 14, 2023

There are 3 different sets of parameters for SubmitJob:

  1. containerOverrides, which runs the batch job on either EC2 or Fargate
  2. nodeOverrides, which runs the batch job on EC2. This property includes its own containerOverrides
  3. eksPropertiesOverride, which runs the batch job on an EKS cluster

I also guess that arrayProperties is only applicable with containerOverrides.

IMHO, SubmitJob is pretty complicated. One potential benefit of keeping everything in one operator is the ability to have an upstream task create kwargs for BatchOperator.partial(...).expand_kwargs(...) (see the sketch below). But right now BatchOperator can't work with Dynamic Tasks.
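For reference, the mapping pattern being referred to would look roughly like this; a sketch with hypothetical DAG, queue, and definition names, and (as noted above) not something BatchOperator supported at the time:

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.operators.batch import BatchOperator

with DAG(
    dag_id="example_batch_mapped",  # hypothetical
    schedule_interval=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    catchup=False,
):
    @task
    def build_job_kwargs():
        # Upstream task producing per-job kwargs (hypothetical payloads)
        return [
            {"job_name": f"job-{i}", "container_overrides": {"command": ["echo", str(i)]}}
            for i in range(3)
        ]

    BatchOperator.partial(
        task_id="submit_batch_job",
        job_queue="my-job-queue",       # hypothetical
        job_definition="my-job-definition",  # hypothetical
        aws_conn_id=None,
    ).expand_kwargs(build_job_kwargs())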

@vandonr-amz (Contributor, Author)

Ok, maybe you're right after all. I'll give it a better look. I'm actually not that familiar with batch jobs 😅

@vandonr-amz (Contributor, Author)

After taking a closer look at it, I think having 2 (or more) operators would duplicate a lot of code without removing much complexity.
I see your arguments about how submitting a batch job can mean very different things, but it's also an operation that takes very similar parameters, and for which the actions to take on our side of the API are very similar.

Also, maybe the user isn't always right, but the initial form of this PR comes from an actual user of the operator, so I'd tend to follow their way of thinking (not being a user myself).

@Taragolis (Contributor)

As I mentioned before, I don't have a strict opinion about whether it should be a single operator or 3 operators (Regular, Node, EKS). I have used the combination of Airflow + Batch since Sept 2019, and they cover a lot of each other's limitations. For example, the same capability as Dynamic Task Mapping has been available in Batch for years through arrayProperties; on the other hand, dependencies between Batch jobs are not as good as in Airflow.

Not many changes have happened in the Batch operator since then, and the design of the hooks and BatchOperator still dates from the pre-provider era; it looks ugly now, even if it does have an exclusive backoff API caller.

My main concern is that most of the parameters potentially apply exclusively to the containerOverrides option, and we don't check that right now. I'm not a user of nodeOverrides, because that kind of architecture usually suits a Hadoop cluster better, and for that purpose EMR is the better fit. Different users, different points of view.

I'll try to check those options and get back after the weekend.

@vandonr-amz (Contributor, Author)

did you have time to check the options?
If we want to do it, I think rewriting the whole Batch hook and operator(s) should probably be separated from this PR, which is just about resolving a user's issue.

@Taragolis (Contributor)

Sorry, not yet. Hectic days. I will try tomorrow morning.

I also have a question about log links, but let me check it first.

Comment on lines 421 to 422
"""
job_container_desc = self.get_job_description(job_id=job_id).get("container", {})
log_configuration = job_container_desc.get("logConfiguration", {})
job_desc = self.get_job_description(job_id=job_id)
Contributor

Let me add a bit more context on what's going on here. Because everything executes outside of Airflow, users don't get any information about logs in AWS Batch.

For a regular batch job we have 0 or 1 dicts of CloudWatch information: log group, region name, log stream. This information is mainly used to generate the operator extra link that is visible in the UI:

[screenshots: the extra link shown in Grid View and in Graph View]

Right now, 0 can happen in different situations:

  1. The user doesn't use CloudWatch.
  2. This is an array job.
  3. For some reason the AWS API doesn't return the CloudWatch link. I personally haven't hit this situation, but it could potentially happen if the job finishes very quickly. That's also the reason why we check this at the end of operator execution.

If the user uses nodeProperties, then the job runs in multiple places and there are 0..many log streams. In that case we can't utilise the Operator Extra Link, so the best we can do here is print all the CloudWatch links in the Airflow log; with the current implementation, only one would be returned.
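For illustration, a rough sketch of how the log stream name(s) could be pulled out of a DescribeJobs job description, following the response shapes described above; the helper name and exact field access are assumptions, not this PR's actual code:

def get_log_stream_names(job_desc: dict) -> list:
    """Collect CloudWatch log stream name(s) from one DescribeJobs job description."""
    container = job_desc.get("container")
    if container is not None:
        # Regular (container) job: 0 or 1 log stream
        stream = container.get("logStreamName")
        return [stream] if stream else []
    # Multinode job: each attempt carries its own container/log stream
    streams = []
    for attempt in job_desc.get("attempts", []):
        stream = attempt.get("container", {}).get("logStreamName")
        if stream:
            streams.append(stream)
    return streams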

Contributor

@vandonr-amz can you please add this context to the PR description ^^^ It would be great for future users to be able to immediately understand what's going on here.

@vandonr-amz (Contributor, Author) commented Mar 14, 2023

ok I see your point, but should there really be more than one log link?
I'm looking at it, and it seems that in the case of a multinode job there are multiple log_configurations (one per node), but from each log config we get

  • the log group
  • the region

I'd imagine that multinode batch jobs would not be multi-region? So that would be a constant across all nodes.
And also, I suppose that in the overwhelming majority of cases the log group would be the same for all nodes (it would be very weird if it wasn't).

Then we get the stream name from the attempts, but this does not depend on the number of nodes. I imagine in most cases there would be one attempt. If there are more, we make the choice of returning the stream name for the last attempt, which makes sense.

The job runs on many nodes, but the logs all end up in the same log stream.

What we can do is iterate over the log configs to make sure they are all sending logs

  • to AWS
  • in the same region
  • in the same group

and log a warning if that's not the case (a sketch of such a check follows below).
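Something along these lines, assuming the ECS awslogs driver option names; the helper name is hypothetical, and this is a sketch of the proposed warning, not the code that was merged:

import logging

log = logging.getLogger(__name__)

def warn_on_inconsistent_log_configs(log_configs):
    """Warn if the per-node log configurations don't all point at one CloudWatch destination."""
    if not log_configs:
        return
    drivers = {cfg.get("logDriver") for cfg in log_configs}
    groups = {cfg.get("options", {}).get("awslogs-group") for cfg in log_configs}
    regions = {cfg.get("options", {}).get("awslogs-region") for cfg in log_configs}
    if drivers != {"awslogs"}:
        log.warning("Not all nodes are sending their logs to CloudWatch: %s", drivers)
    elif len(groups) > 1 or len(regions) > 1:
        log.warning("Nodes log to different groups/regions: %s / %s", groups, regions)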

Contributor

> I'd imagine that multinode batch jobs would not be multi-region?

AFAIK, Batch resources are region-specific for any type of job:

  • Compute Environment (ECS or EKS clusters)
  • Job Definition
  • Job Queues

You could configure logging to another region (CloudWatch) or to other supported log drivers, but that is configured when the Batch Job Definition is created (registered) and can't be changed by submit_job, so it should all be stored in one destination.
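For illustration, a minimal sketch of where that logging configuration lives, using hypothetical names and values; the structure follows the boto3 register_job_definition API for multinode jobs:

import boto3

client = boto3.client("batch")
client.register_job_definition(
    jobDefinitionName="batch-nodes",  # hypothetical
    type="multinode",
    nodeProperties={
        "numNodes": 5,
        "mainNode": 0,
        "nodeRangeProperties": [
            {
                "targetNodes": "0:",  # applies to every node
                "container": {
                    "image": "public.ecr.aws/amazonlinux/amazonlinux:latest",
                    "vcpus": 2,
                    "memory": 2048,
                    # The log destination is fixed here; submit_job cannot change it.
                    "logConfiguration": {
                        "logDriver": "awslogs",
                        "options": {"awslogs-region": "us-east-1"},
                    },
                },
            }
        ],
    },
)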

> The job runs on many nodes, but the logs all end up in the same log stream.

Nope, each node has its own logs in a unique log stream:

[video attachment: batch-multinode-jobs.mp4]

Contributor Author

wow ok that's hmm... surprising.

Contributor Author

added a commit that logs links to all the log streams


@Taragolis (Contributor)

I've tested this simple DAG on my own AWS account:

import pendulum
from airflow import DAG
from airflow.providers.amazon.aws.operators.batch import BatchOperator

JOB_NAME = "multi-node-sample"
JOB_DEFINITION = "batch-nodes"
JOB_QUEUE = "multinode-job-queue"
CONTAINER_OVERRIDES = None
ARRAY_PROPERTIES = None
NODE_OVERRIDES = {
    "numNodes": 5
}

with DAG(
    dag_id="example_batch_submit_job_multi_node",
    schedule_interval=None,
    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),
    tags=["example", "amazon-provider", "batch", "multi-node"],
    catchup=False,
):
    submit_batch_job = BatchOperator(
        task_id="submit_batch_job",
        job_name=JOB_NAME,
        job_queue=JOB_QUEUE,
        job_definition=JOB_DEFINITION,
        container_overrides=CONTAINER_OVERRIDES,
        array_properties=ARRAY_PROPERTIES,
        node_overrides=NODE_OVERRIDES,
        aws_conn_id=None,
    )

If I only set NODE_OVERRIDES, it runs mostly successfully; from time to time one or more nodes fail during the run for no apparent reason. But that points either to a misconfiguration of the multinode environment or to some AWS internals.

If I set CONTAINER_OVERRIDES to any value other than None (even {}), I get:

botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the SubmitJob operation: Container overrides and node overrides are mutually exclusive, only one can be set.

If I set ARRAY_PROPERTIES, I get:

botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the SubmitJob operation: Multinode Array Job not supported.

@potiuk requested a review from dimberman on March 20, 2023
@vandonr-amz (Contributor, Author)

> If I set CONTAINER_OVERRIDES to any value other than None (even {}), I get:
>
> botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the SubmitJob operation: Container overrides and node overrides are mutually exclusive, only one can be set.

do you mean while still keeping NODE_OVERRIDES set? That'd be normal, and the error message explains it. If you want to use container overrides, you need to unset the node overrides.

> If I set ARRAY_PROPERTIES, I get:
>
> botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the SubmitJob operation: Multinode Array Job not supported.

Same story here: boto is telling you that you cannot set NODE_OVERRIDES (which implies a multi-node job) and ARRAY_PROPERTIES at the same time, though in a less clear way.

I'm not super familiar with batch jobs, but I think the valid combinations are:

  • container_overrides
  • container_overrides + array_properties
  • node_overrides

I haven't tested the array properties, but container and node overrides both work well when not mixed. (A sketch of a client-side check for these combinations follows below.)
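A minimal sketch of what such a client-side guard could look like, mirroring the SubmitJob errors above; the function name is hypothetical, and whether the operator should validate this at all was still being discussed:

from airflow.exceptions import AirflowException

def validate_overrides(container_overrides, array_properties, node_overrides):
    """Reject parameter combinations that the SubmitJob API refuses anyway."""
    if node_overrides and container_overrides:
        raise AirflowException(
            "container_overrides and node_overrides are mutually exclusive."
        )
    if node_overrides and array_properties:
        raise AirflowException(
            "array_properties cannot be combined with a multinode job (node_overrides)."
        )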

@Taragolis (Contributor)

> do you mean while still keeping NODE_OVERRIDES set? That'd be normal, and the error message explains it. If you want to use container overrides, you need to unset the node overrides.

Yep, array_properties is only allowed with container_overrides, not with node overrides.

> I'm not super familiar with batch jobs, but I think the valid combinations are

And I also think all of them would conflict with Batch jobs on EKS 🤣 But let's keep that as a surprise for future implementations.
Personally I don't use multi-node batch jobs (due to all the limitations), but I guess some users might find them useful in specific cases.

@vandonr-amz (Contributor, Author)

> Personally I don't use multi-node batch jobs (due to all the limitations), but I guess some users might find them useful in specific cases

well yes, this comes from #25522, which was opened by a user!

@vandonr-amz (Contributor, Author)

@dimberman @Taragolis do you think you could take a look at the latest changes and see if they look OK to you?

@dimberman (Contributor) left a comment

Thank you for addressing the changes @vandonr-amz ! LGTM 👍
