Skip to content

Commit

Permalink
Feat: Improve log file handling for connector failures (#333)
Browse files Browse the repository at this point in the history
Co-authored-by: octavia-squidington-iii <[email protected]>
  • Loading branch information
aaronsteers and octavia-squidington-iii authored Aug 9, 2024
1 parent 383d89c commit 23da920
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 34 deletions.
17 changes: 9 additions & 8 deletions airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
)

from airbyte import exceptions as exc
from airbyte._util import meta
from airbyte._util.telemetry import (
EventState,
log_config_validation_result,
log_connector_check_result,
)
from airbyte._util.temp_files import as_temp_files
from airbyte.constants import AIRBYTE_LOGGING_ROOT


if TYPE_CHECKING:
Expand Down Expand Up @@ -307,16 +307,22 @@ def _init_logger(self) -> logging.Logger:
# Prevent logging to stderr by stopping propagation to the root logger
logger.propagate = False

if AIRBYTE_LOGGING_ROOT is None:
# No temp directory available, so return a basic logger
return logger

# Else, configure the logger to write to a file

# Remove any existing handlers
for handler in logger.handlers:
logger.removeHandler(handler)

folder = meta.get_logging_root() / self.name
folder = AIRBYTE_LOGGING_ROOT / self.name
folder.mkdir(parents=True, exist_ok=True)

# Create and configure file handler
handler = logging.FileHandler(
filename=folder / f"{ulid.ULID()!s}-run-log.txt",
filename=folder / f"connector-log-{ulid.ULID()!s}.txt",
encoding="utf-8",
)
handler.setFormatter(
Expand All @@ -329,11 +335,6 @@ def _init_logger(self) -> logging.Logger:
logger.addHandler(handler)
return logger

def _new_log_file(self, verb: str = "run") -> Path:
folder = meta.get_logging_root() / self.name
folder.mkdir(parents=True, exist_ok=True)
return folder / f"{ulid.ULID()!s}-{self.name}-{verb}-log.txt"

def _peek_airbyte_message(
self,
message: AirbyteMessage,
Expand Down
16 changes: 0 additions & 16 deletions airbyte/_util/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import os
import sys
import tempfile
from contextlib import suppress
from functools import lru_cache
from pathlib import Path
Expand All @@ -21,21 +20,6 @@
"""URL to get the current Google Colab session information."""


@lru_cache
def get_logging_root() -> Path:
"""Return the root directory for logs.
This is the directory where logs are stored.
"""
if "AIRBYTE_LOGGING_ROOT" in os.environ:
log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"])
else:
log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs"

log_root.mkdir(parents=True, exist_ok=True)
return log_root


def get_colab_release_version() -> str | None:
if "COLAB_RELEASE_TAG" in os.environ:
return os.environ["COLAB_RELEASE_TAG"]
Expand Down
49 changes: 49 additions & 0 deletions airbyte/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@

from __future__ import annotations

import os
import tempfile
import warnings
from functools import lru_cache
from pathlib import Path


DEBUG_MODE = False # Set to True to enable additional debug logging.

Expand Down Expand Up @@ -41,3 +47,46 @@

DEFAULT_ARROW_MAX_CHUNK_SIZE = 100_000
"""The default number of records to include in each batch of an Arrow dataset."""


@lru_cache
def _get_logging_root() -> Path | None:
"""Return the root directory for logs.
Returns `None` if no valid path can be found.
This is the directory where logs are stored.
"""
if "AIRBYTE_LOGGING_ROOT" in os.environ:
log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"])
else:
log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs"

try:
# Attempt to create the log root directory if it does not exist
log_root.mkdir(parents=True, exist_ok=True)
except OSError:
# Handle the error by returning None
warnings.warn(
(
f"Failed to create PyAirbyte logging directory at `{log_root}`. "
"You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` "
"environment variable."
),
category=UserWarning,
stacklevel=0,
)
return None
else:
return log_root


AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root()
"""The root directory for Airbyte logs.
This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable.
If not provided, PyAirbyte will use `/tmp/airbyte/logs/` where `/tmp/` is the OS's default
temporary directory. If the directory cannot be created, PyAirbyte will log a warning and
set this value to `None`.
"""
26 changes: 16 additions & 10 deletions airbyte/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class PyAirbyteError(Exception):
guidance: str | None = None
help_url: str | None = None
log_text: str | list[str] | None = None
log_file: Path | None = None
context: dict[str, Any] | None = None
message: str | None = None

Expand All @@ -81,7 +82,7 @@ def get_message(self) -> str:

def __str__(self) -> str:
"""Return a string representation of the exception."""
special_properties = ["message", "guidance", "help_url", "log_text", "context"]
special_properties = ["message", "guidance", "help_url", "log_text", "context", "log_file"]
display_properties = {
k: v
for k, v in self.__dict__.items()
Expand All @@ -99,13 +100,16 @@ def __str__(self) -> str:
if isinstance(self.log_text, list):
self.log_text = "\n".join(self.log_text)

exception_str += f"\nLog output: \n {indent(self.log_text, ' ')}"
exception_str += f"\n Log output: \n {indent(self.log_text, ' ')}"

if self.log_file:
exception_str += f"\n Log file: {self.log_file.absolute()!s}"

if self.guidance:
exception_str += f"\nSuggestion: {self.guidance}"
exception_str += f"\n Suggestion: {self.guidance}"

if self.help_url:
exception_str += f"\nMore info: {self.help_url}"
exception_str += f"\n More info: {self.help_url}"

return exception_str

Expand Down Expand Up @@ -263,13 +267,13 @@ class AirbyteConnectorError(PyAirbyteError):
connector_name: str | None = None

def __post_init__(self) -> None:
"""Log the error message when the exception is raised."""
"""Set the log file path for the connector."""
self.log_file = self._get_log_file()

def _get_log_file(self) -> Path | None:
"""Return the log file path for the connector."""
if self.connector_name:
logger = logging.getLogger(f"airbyte.{self.connector_name}")
if self.connector_name:
logger.error(str(self))
else:
logger.error(str(self))

log_paths: list[Path] = [
Path(handler.baseFilename).absolute()
Expand All @@ -278,7 +282,9 @@ def __post_init__(self) -> None:
]

if log_paths:
print(f"Connector logs: {', '.join(str(path) for path in log_paths)}")
return log_paths[0]

return None


class AirbyteConnectorExecutableNotFoundError(AirbyteConnectorError):
Expand Down

0 comments on commit 23da920

Please sign in to comment.