Skip to content

Commit

Permalink
[feat] support watching custom CloudWatch job streams for Batch jobs (#…
Browse files Browse the repository at this point in the history
…21)

* [feat] support watching custom CloudWatch job streams for Batch jobs
  • Loading branch information
nh13 authored Jun 2, 2022
1 parent e765e40 commit 1f04066
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
8 changes: 7 additions & 1 deletion pyfgaws/batch/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,12 @@ def stream(self) -> Optional[str]:
"""The log stream for the job, if available."""
return self.describe_job()["container"].get("logStreamName")

@property
def group(self) -> Optional[str]:
"""The log group for the job, if available."""
options = self.describe_job()["container"]["logConfiguration"]["options"] # type: ignore
return options.get("awslogs-group") # type: ignore

def submit(self) -> SubmitJobResponseTypeDef:
"""Submits this job."""

Expand Down Expand Up @@ -325,7 +331,7 @@ def get_status(self) -> Optional[Status]:
return Status.from_string(self.describe_job()["status"])

def describe_job(self) -> JobDetailTypeDef:
"""Gets detauled information about this job."""
"""Gets detailed information about this job."""
jobs_response = self.client.describe_jobs(jobs=[self.job_id])
job_statuses = jobs_response["jobs"]
assert len(job_statuses) == 1
Expand Down
13 changes: 10 additions & 3 deletions pyfgaws/batch/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def watch_job(
logger.info(f"Watching job with name '{job.name}' and id '{job.job_id}'")
if print_logs:
_log_it(region_name=region_name, job=job, logger=logger, delay=delay)
if delay is None:
time.sleep(DEFAULT_LOGS_POLLING_INTERVAL)
else:
time.sleep(delay)

job.wait_on_complete(delay=delay)
end_status = job.get_status()
Expand Down Expand Up @@ -202,12 +206,15 @@ def _watch_logs(
client: Optional[logs.Client] = boto3.client(
service_name="logs", region_name=region_name # type: ignore
)
log: Log = Log(client=client, group="/aws/batch/job", stream=job.stream)
log: Log = Log(client=client, group=job.group, stream=job.stream)

try:
while True:
for line in log:
logger.info(line)
try:
for line in log:
logger.info(line)
except client.exceptions.ResourceNotFoundException:
logger.warning("The log stream has not been created, will try again.")
time.sleep(polling_interval)
log.reset()
if not indefinitely:
Expand Down

0 comments on commit 1f04066

Please sign in to comment.