Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Airflow Operator - dry_run and support for constructor to create job_request #3889

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions docs/python_airflow_operator.md
Original file line number Diff line number Diff line change
@@ -12,7 +12,7 @@ This class provides integration with Airflow and Armada
## armada.operators.armada module


### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, \*\*kwargs)
### _class_ armada.operators.armada.ArmadaOperator(name, channel_args, armada_queue, job_request, job_set_prefix='', lookout_url_template=None, poll_interval=30, container_logs=None, k8s_token_retriever=None, deferrable=False, job_acknowledgement_timeout=300, dry_run=False, \*\*kwargs)
Bases: `BaseOperator`, `LoggingMixin`

An Airflow operator that manages Job submission to Armada.
@@ -33,7 +33,7 @@ and handles job cancellation if the Airflow task is killed.
* **armada_queue** (*str*) –


* **job_request** (*JobSubmitRequestItem*) –
* **job_request** (*JobSubmitRequestItem** | **Callable**[**[**Context**, **jinja2.Environment**]**, **JobSubmitRequestItem**]*) –


* **job_set_prefix** (*Optional**[**str**]*) –
@@ -57,6 +57,9 @@ and handles job cancellation if the Airflow task is killed.
* **job_acknowledgement_timeout** (*int*) –


* **dry_run** (*bool*) –



#### execute(context)
Submits the job to Armada and polls for completion.
@@ -138,7 +141,7 @@ Initializes a new ArmadaOperator.
* **armada_queue** (*str*) – The name of the Armada queue to which the job will be submitted.


* **job_request** (*JobSubmitRequestItem*) – The job to be submitted to Armada.
* **job_request** (*JobSubmitRequestItem** | **Callable**[**[**Context**, **jinja2.Environment**]**, **JobSubmitRequestItem**]*) – The job to be submitted to Armada.


* **job_set_prefix** (*Optional**[**str**]*) – A string to prepend to the jobSet name.
@@ -162,6 +165,8 @@ for asynchronous execution.
:param job_acknowledgement_timeout: The timeout in seconds to wait for a job to be
acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param kwargs: Additional keyword arguments to pass to the BaseOperator.


26 changes: 23 additions & 3 deletions third_party/airflow/armada/operators/armada.py
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@
import datetime
import os
import time
from typing import Any, Dict, Optional, Sequence, Tuple
from typing import Any, Callable, Dict, Optional, Sequence, Tuple

import jinja2
from airflow.configuration import conf
@@ -79,7 +79,8 @@ class ArmadaOperator(BaseOperator, LoggingMixin):
:param armada_queue: The name of the Armada queue to which the job will be submitted.
:type armada_queue: str
:param job_request: The job to be submitted to Armada.
:type job_request: JobSubmitRequestItem
:type job_request: JobSubmitRequestItem | \
Callable[[Context, jinja2.Environment], JobSubmitRequestItem]
:param job_set_prefix: A string to prepend to the jobSet name.
:type job_set_prefix: Optional[str]
:param lookout_url_template: Template for creating lookout links. If not specified
@@ -98,6 +99,8 @@ class ArmadaOperator(BaseOperator, LoggingMixin):
:param job_acknowledgement_timeout: The timeout in seconds to wait for a job to be
acknowledged by Armada.
:type job_acknowledgement_timeout: int
:param dry_run: Run Operator in dry-run mode - render Armada request and terminate.
:type dry_run: bool
:param kwargs: Additional keyword arguments to pass to the BaseOperator.
"""

@@ -106,7 +109,10 @@ def __init__(
name: str,
channel_args: GrpcChannelArgs,
armada_queue: str,
job_request: JobSubmitRequestItem,
job_request: (
JobSubmitRequestItem
| Callable[[Context, jinja2.Environment], JobSubmitRequestItem]
),
job_set_prefix: Optional[str] = "",
lookout_url_template: Optional[str] = None,
poll_interval: int = 30,
@@ -116,6 +122,7 @@ def __init__(
"operators", "default_deferrable", fallback=True
),
job_acknowledgement_timeout: int = 5 * 60,
dry_run: bool = False,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -131,6 +138,7 @@ def __init__(
self.k8s_token_retriever = k8s_token_retriever
self.deferrable = deferrable
self.job_acknowledgement_timeout = job_acknowledgement_timeout
self.dry_run = dry_run
self.job_context = None

if self.container_logs and self.k8s_token_retriever is None:
@@ -153,6 +161,13 @@ def execute(self, context) -> None:

self._annotate_job_request(context, self.job_request)

if self.dry_run:
self.log.info(
f"Running in dry_run mode. job_set_id: {self.job_set_id} \n"
f"{self.job_request}"
)
return

# Submit job or reattach to previously submitted job.
# Always do this synchronously.
self.job_context = self._reattach_or_submit_job(
@@ -183,6 +198,11 @@ def render_template_fields(
:param context: Airflow Context dict wi1th values to apply on content
:param jinja_env: jinja’s environment to use for rendering.
"""
if callable(self.job_request):
if not jinja_env:
jinja_env = self.get_template_env()
self.job_request = self.job_request(context, jinja_env)

self.job_request = MessageToDict(
self.job_request, preserving_proto_field_name=True
)
2 changes: 1 addition & 1 deletion third_party/airflow/pyproject.toml
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "armada_airflow"
version = "1.0.1"
version = "1.0.2"
description = "Armada Airflow Operator"
readme='README.md'
authors = [{name = "Armada-GROSS", email = "[email protected]"}]