Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into aj/feat/cloud-deploy-…
Browse files Browse the repository at this point in the history
…features-from-experimental
  • Loading branch information
aaronsteers committed Dec 6, 2024
2 parents 6799b29 + 1eef889 commit 1d4f53e
Show file tree
Hide file tree
Showing 24 changed files with 2,037 additions and 1,525 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pypi_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ jobs:
file_glob: true

- name: Publish
uses: pypa/gh-action-pypi-publish@v1.10.2
uses: pypa/gh-action-pypi-publish@v1.12.2
22 changes: 22 additions & 0 deletions .github/workflows/python_pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,25 @@ jobs:
with:
name: py${{ matrix.python-version }}-${{ matrix.os }}-test-coverage
path: htmlcov/

dependency-analysis:
name: Dependency Analysis with Deptry
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'
- name: Set up Poetry
uses: Gr1N/setup-poetry@v9
with:
poetry-version: "1.7.1"
- name: Install dependencies
run: poetry install

# Job-specific step(s):
- name: Run Deptry
run: |
poetry run deptry .
51 changes: 6 additions & 45 deletions .github/workflows/test-pr-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,10 @@ jobs:
name: Append 'Starting' Comment
runs-on: ubuntu-latest
steps:
- name: Get PR JSON
id: pr-info
env:
GH_TOKEN: ${{ github.token }}
run: |
PR_JSON=$(gh api repos/${{ github.repository }}/pulls/${{ github.event.inputs.pr }})
echo "$PR_JSON" > pr-info.json
echo "sha=$(cat pr-info.json | jq -r .head.sha)" >> $GITHUB_OUTPUT
echo "run-url=https://github.com/$GITHUB_REPOSITORY/actions/runs/$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
- name: Upload PR details as artifact
uses: actions/upload-artifact@v4
with:
name: pr-info
path: pr-info.json

- name: Create URL to the run output
id: vars
run: echo "run-url=https://github.com/$GITHUB_REPOSITORY/actions/runs/$GITHUB_RUN_ID" >> $GITHUB_OUTPUT
- name: Append comment with job run link
id: first-comment-action
uses: peter-evans/create-or-update-comment@v4
Expand All @@ -44,7 +34,7 @@ jobs:
> PR test job started... [Check job output.][1]
[1]: ${{ steps.pr-info.outputs.run-url }}
[1]: ${{ steps.vars.outputs.run-url }}
# This is copied from the `python_pytest.yml` file.
# Only the first two steps of the job are different, and they check out the PR's branch.
Expand All @@ -71,13 +61,6 @@ jobs:

# Custom steps to fetch the PR and checkout the code:

- name: Download PR info
# This puts the `pr-info.json` file in the current directory.
# We need this to get the PR's SHA at the time of the workflow run.
uses: actions/download-artifact@v4
with:
name: pr-info

- name: Checkout PR
uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -108,29 +91,7 @@ jobs:
run: >
poetry run pytest
--verbose
-m "not super_slow and not flaky"
- name: Run Pytest (Flaky Only)
continue-on-error: true
timeout-minutes: 60
env:
GCP_GSM_CREDENTIALS: ${{ secrets.GCP_GSM_CREDENTIALS }}
run: >
poetry run pytest
--verbose
-m "flaky and not super_slow"
- name: Post CI Success to GitHub
run: |
curl --request POST \
--url https://api.github.com/repos/${{ github.repository }}/statuses/$(cat pr-info.json | jq -r .head.sha) \
--header 'authorization: Bearer ${{ secrets.GITHUB_TOKEN }}' \
--header 'content-type: application/json' \
--data '{
"state": "success",
"context": "Pytest (All, Python ${{ matrix.python-version }}, ${{ matrix.os }})",
"target_url": "https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}",
}' \
-m "not super_slow"
log-success-comment:
name: Append 'Success' Comment
Expand Down
2 changes: 2 additions & 0 deletions airbyte/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@

from airbyte import (
caches,
callbacks,
# cli, # Causes circular import if included
cloud,
constants,
Expand Down Expand Up @@ -157,6 +158,7 @@
__all__ = [
# Modules
"caches",
"callbacks",
# "cli", # Causes circular import if included
"cloud",
"constants",
Expand Down
15 changes: 14 additions & 1 deletion airbyte/_connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from airbyte_protocol.models import (
AirbyteMessage,
ConnectorSpecification,
OrchestratorType,
Status,
TraceType,
Type,
Expand All @@ -40,6 +41,7 @@

from airbyte._executors.base import Executor
from airbyte._message_iterators import AirbyteMessageIterator
from airbyte.callbacks import ConfigChangeCallback
from airbyte.progress import ProgressTracker


Expand All @@ -56,13 +58,15 @@ def __init__(
executor: Executor,
name: str,
config: dict[str, Any] | None = None,
config_change_callback: ConfigChangeCallback | None = None,
*,
validate: bool = False,
) -> None:
"""Initialize the source.
If config is provided, it will be validated against the spec if validate is True.
"""
self.config_change_callback = config_change_callback
self.executor = executor
self._name = name
self._config_dict: dict[str, Any] | None = None
Expand Down Expand Up @@ -361,7 +365,8 @@ def _peek_airbyte_message(
This method handles reading Airbyte messages and taking action, if needed, based on the
message type. For instance, log messages are logged, records are tallied, and errors are
raised as exceptions if `raise_on_error` is True.
raised as exceptions if `raise_on_error` is True. If a config change message is received,
the config change callback is called.
Raises:
AirbyteConnectorFailedError: If a TRACE message of type ERROR is emitted.
Expand All @@ -380,6 +385,14 @@ def _peek_airbyte_message(
)
return

if (
message.type == Type.CONTROL
and message.control.type == OrchestratorType.CONNECTOR_CONFIG
and self.config_change_callback is not None
):
self.config_change_callback(message.control.connectorConfig.config)
return

def _execute(
self,
args: list[str],
Expand Down
19 changes: 17 additions & 2 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from airbyte._executors.python import VenvExecutor
from airbyte._util.meta import which
from airbyte._util.telemetry import EventState, log_install_state # Non-public API
from airbyte.constants import TEMP_DIR_OVERRIDE
from airbyte.constants import AIRBYTE_OFFLINE_MODE, TEMP_DIR_OVERRIDE
from airbyte.sources.registry import ConnectorMetadata, InstallType, get_connector_metadata
from airbyte.version import get_version

Expand Down Expand Up @@ -115,7 +115,7 @@ def _get_local_executor(
)


def get_connector_executor( # noqa: PLR0912, PLR0913 # Too complex
def get_connector_executor(  # noqa: PLR0912, PLR0913, PLR0915 # Too many branches/arguments/statements
name: str,
*,
version: str | None = None,
Expand Down Expand Up @@ -161,6 +161,21 @@ def get_connector_executor( # noqa: PLR0912, PLR0913 # Too complex
# Fail the install.
log_install_state(name, state=EventState.FAILED, exception=ex)
raise
except requests.exceptions.ConnectionError as ex:
if not AIRBYTE_OFFLINE_MODE:
# If the user has not enabled offline mode, raise an error.
raise exc.AirbyteConnectorRegistryError(
message="Failed to connect to the connector registry.",
context={"connector_name": name},
guidance=(
"\nThere was a problem connecting to the Airbyte connector registry. "
"Please check your internet connection and try again.\nTo operate "
"offline, set the `AIRBYTE_OFFLINE_MODE` environment variable to `1`."
"This will prevent errors related to registry connectivity and disable "
"telemetry. \nIf you have a custom registry, set `_REGISTRY_ENV_VAR` "
"environment variable to the URL of your custom registry."
),
) from ex

if install_method_count == 0:
# User has not specified how to install the connector.
Expand Down
11 changes: 11 additions & 0 deletions airbyte/_processors/sql/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,17 @@ class SnowflakeConfig(SqlConfig):
database: str
role: str
schema_name: str = Field(default=DEFAULT_CACHE_SCHEMA_NAME)
data_retention_time_in_days: int | None = None

@overrides
def get_create_table_extra_clauses(self) -> list[str]:
    """Return extra SQL clauses to append to `CREATE TABLE` statements.

    Emits a Snowflake `DATA_RETENTION_TIME_IN_DAYS` clause when the
    `data_retention_time_in_days` config option is set; otherwise returns
    an empty list.
    """
    extra_clauses: list[str] = []
    if self.data_retention_time_in_days is not None:
        extra_clauses.append(
            f"DATA_RETENTION_TIME_IN_DAYS = {self.data_retention_time_in_days}"
        )
    return extra_clauses

@overrides
def get_database_name(self) -> str:
Expand Down
5 changes: 3 additions & 2 deletions airbyte/_util/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
WriterRuntimeInfo,
)
from airbyte._util.hashing import one_way_hash
from airbyte.constants import AIRBYTE_OFFLINE_MODE
from airbyte.version import get_version


Expand Down Expand Up @@ -89,7 +90,7 @@ def _setup_analytics() -> str | bool:
anonymous_user_id: str | None = None
issues: list[str] = []

if os.environ.get(DO_NOT_TRACK):
if os.environ.get(DO_NOT_TRACK) or AIRBYTE_OFFLINE_MODE:
# User has opted out of tracking.
return False

Expand Down Expand Up @@ -207,7 +208,7 @@ def send_telemetry(
exception: Exception | None = None,
) -> None:
# If DO_NOT_TRACK is set, we don't send any telemetry
if os.environ.get(DO_NOT_TRACK):
if os.environ.get(DO_NOT_TRACK) or AIRBYTE_OFFLINE_MODE:
return

payload_props: dict[str, str | int | dict] = {
Expand Down
2 changes: 0 additions & 2 deletions airbyte/caches/_state_backend_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import abc
from typing import TYPE_CHECKING

from airbyte_protocol.models import AirbyteStreamState


if TYPE_CHECKING:
from airbyte_protocol.models import (
Expand Down
5 changes: 1 addition & 4 deletions airbyte/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
from airbyte.datasets._sql import CachedDataset
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.shared.sql_processor import (
SqlConfig,
SqlProcessorBase,
)
from airbyte.shared.sql_processor import SqlConfig
from airbyte.shared.state_writers import StdOutStateWriter


Expand Down
50 changes: 50 additions & 0 deletions airbyte/callbacks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Callbacks for working with PyAirbyte."""

from __future__ import annotations

from collections.abc import Callable
from typing import Any


ConfigChangeCallback = Callable[[dict[str, Any]], None]
"""Callback invoked when a running connector reports a configuration change.

Pass this callback to supporting factory functions such as
`airbyte.get_source()` and `airbyte.get_destination()` to react whenever the
connector emits an updated configuration. The callback receives a single
argument: the full new configuration dict (all fields, not only the ones that
changed).

The typical use case is OAuth-based connectors that rotate refresh tokens and
need the new token persisted before the old one expires.

Example Usage:
```python
import airbyte as ab
import yaml
from pathlib import Path

config_file = Path("path/to/my/config.yaml")
config_dict = yaml.safe_load(config_file.read_text())

# Define the callback function:
def config_callback(new_config: dict[str, Any]) -> None:
    # Write new config back to config file
    config_file.write_text(yaml.safe_dump(new_config))

# Pass in the callback function when creating the source:
source = get_source(
    "source-faker",
    config=config_dict,
    config_change_callback=config_callback,
)

# Now read as usual. If config changes during sync, the callback will be called.
source.read()
```

For more information on the underlying Airbyte protocol, please see documentation on the
[`CONNECTOR_CONFIG`](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#airbytecontrolconnectorconfigmessage)
control messages.
"""
Loading

0 comments on commit 1d4f53e

Please sign in to comment.