Skip to content

Commit

Permalink
openlineage: replace dt.now with airflow.utils.timezone.utcnow (#40887)
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Jul 22, 2024
1 parent 0212f67 commit 12e17d1
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions airflow/providers/openlineage/plugins/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import logging
import os
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime
from typing import TYPE_CHECKING

import psutil
Expand All @@ -43,6 +42,7 @@
)
from airflow.settings import configure_orm
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.timeout import timeout

if TYPE_CHECKING:
Expand Down Expand Up @@ -145,7 +145,7 @@ def on_running():
with Stats.timer(f"ol.extract.{event_type}.{operator_name}"):
task_metadata = self.extractor_manager.extract_metadata(dagrun, task)

start_date = task_instance.start_date if task_instance.start_date else datetime.now()
start_date = task_instance.start_date if task_instance.start_date else timezone.utcnow()
data_interval_start = (
dagrun.data_interval_start.isoformat() if dagrun.data_interval_start else None
)
Expand Down Expand Up @@ -224,7 +224,7 @@ def on_success():
dagrun, task, complete=True, task_instance=task_instance
)

end_date = task_instance.end_date if task_instance.end_date else datetime.now()
end_date = task_instance.end_date if task_instance.end_date else timezone.utcnow()

redacted_event = self.adapter.complete_task(
run_id=task_uuid,
Expand Down Expand Up @@ -318,7 +318,7 @@ def on_failure():
dagrun, task, complete=True, task_instance=task_instance
)

end_date = task_instance.end_date if task_instance.end_date else datetime.now()
end_date = task_instance.end_date if task_instance.end_date else timezone.utcnow()

redacted_event = self.adapter.fail_task(
run_id=task_uuid,
Expand Down

0 comments on commit 12e17d1

Please sign in to comment.