diff --git a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml index 1194fc6407902..bf7f48908f55c 100644 --- a/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml +++ b/.github/ISSUE_TEMPLATE/airflow_providers_bug_report.yml @@ -112,6 +112,7 @@ body: - vertica - weaviate - yandex + - ydb - zendesk validations: required: true diff --git a/.github/boring-cyborg.yml b/.github/boring-cyborg.yml index 94d01f93d7dbb..a6c1ed09199b9 100644 --- a/.github/boring-cyborg.yml +++ b/.github/boring-cyborg.yml @@ -523,6 +523,12 @@ labelPRBasedOnFilePath: - tests/providers/yandex/**/* - tests/system/providers/yandex/**/* + provider:ydb: + - airflow/providers/ydb/**/* + - docs/apache-airflow-providers-ydb/**/* + - tests/providers/ydb/**/* + - tests/system/providers/ydb/**/* + provider:zendesk: - airflow/providers/zendesk/**/* - docs/apache-airflow-providers-zendesk/**/* diff --git a/INSTALL b/INSTALL index d148079a81d82..b5ca100f7d013 100644 --- a/INSTALL +++ b/INSTALL @@ -279,7 +279,7 @@ influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.psrp, micro mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, pagerduty, papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, samba, segment, sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular, telegram, -teradata, trino, vertica, weaviate, yandex, zendesk +teradata, trino, vertica, weaviate, yandex, ydb, zendesk # END PROVIDER EXTRAS HERE diff --git a/airflow/providers/ydb/CHANGELOG.rst b/airflow/providers/ydb/CHANGELOG.rst new file mode 100644 index 0000000000000..0b538144e1687 --- /dev/null +++ b/airflow/providers/ydb/CHANGELOG.rst @@ -0,0 +1,33 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. NOTE TO CONTRIBUTORS: + Please, only add notes to the Changelog just below the "Changelog" header when there are some breaking changes + and you want to add an explanation to the users on how they are supposed to deal with them. + The changelog is updated and maintained semi-automatically by release manager. + +``apache-airflow-providers-ydb`` + + +Changelog +--------- + +1.0.0 +..... + +Initial version of the provider. diff --git a/airflow/providers/ydb/__init__.py b/airflow/providers/ydb/__init__.py new file mode 100644 index 0000000000000..bbed709177aa9 --- /dev/null +++ b/airflow/providers/ydb/__init__.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# +# NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE +# OVERWRITTEN WHEN PREPARING DOCUMENTATION FOR THE PACKAGES. +# +# IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE +# `PROVIDER__INIT__PY_TEMPLATE.py.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY +# +from __future__ import annotations + +import packaging.version + +from airflow import __version__ as airflow_version + +__all__ = ["__version__"] + +__version__ = "1.0.0" + +if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse( + "2.7.0" +): + raise RuntimeError( + f"The package `apache-airflow-providers-ydb:{__version__}` needs Apache Airflow 2.7.0+" + ) diff --git a/airflow/providers/ydb/hooks/__init__.py b/airflow/providers/ydb/hooks/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/ydb/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/airflow/providers/ydb/hooks/_vendor/__init__.py b/airflow/providers/ydb/hooks/_vendor/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/ydb/hooks/_vendor/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
class YdbDBApi:
    """DB-API style module object exposing connection factories and exception classes."""

    def __init__(self):
        # Publish the exception hierarchy first, then the module-level DB-API globals.
        self._init_dbapi_attributes()
        self.apilevel = "1.0"
        self.threadsafety = 0
        self.paramstyle = "pyformat"

    def _init_dbapi_attributes(self):
        """Attach every DB-API exception class to this instance under its standard name."""
        exported = (
            ("Warning", Warning),
            ("Error", Error),
            ("InterfaceError", InterfaceError),
            ("DatabaseError", DatabaseError),
            ("DataError", DataError),
            ("OperationalError", OperationalError),
            ("IntegrityError", IntegrityError),
            ("InternalError", InternalError),
            ("ProgrammingError", ProgrammingError),
            ("NotSupportedError", NotSupportedError),
        )
        for attribute, exception_class in exported:
            setattr(self, attribute, exception_class)

    def connect(self, *args, **kwargs) -> Connection:
        """Build a synchronous ``Connection``; all arguments are forwarded unchanged."""
        connection = Connection(*args, **kwargs)
        return connection

    def async_connect(self, *args, **kwargs) -> AsyncConnection:
        """Build an ``AsyncConnection``; all arguments are forwarded unchanged."""
        connection = AsyncConnection(*args, **kwargs)
        return connection
class IsolationLevel:
    """Names of the isolation levels understood by ``Connection.set_isolation_level``."""

    SERIALIZABLE = "SERIALIZABLE"
    ONLINE_READONLY = "ONLINE READONLY"
    ONLINE_READONLY_INCONSISTENT = "ONLINE READONLY INCONSISTENT"
    STALE_READONLY = "STALE READONLY"
    SNAPSHOT_READONLY = "SNAPSHOT READONLY"
    AUTOCOMMIT = "AUTOCOMMIT"


class Connection:
    """DB-API style connection on top of a YDB driver and session pool.

    The connection either creates (and later stops) its own driver and session
    pool, or reuses a pool passed through the ``ydb_session_pool`` keyword, in
    which case the pool and its driver are left running on ``close``.
    """

    _await = staticmethod(util.await_only)

    _is_async = False
    _ydb_driver_class = ydb.Driver
    _ydb_session_pool_class = ydb.SessionPool
    _ydb_table_client_class = ydb.TableClient
    _cursor_class = Cursor

    def __init__(
        self,
        host: str = "",
        port: str = "",
        database: str = "",
        **conn_kwargs: Any,
    ):
        """Open the connection.

        :param host: host part of the ``grpc://host:port`` endpoint.
        :param port: port part of the endpoint.
        :param database: database path, also used as the root for table paths.
        :param conn_kwargs: extra options; ``credentials``, ``ydb_table_path_prefix``
            and ``ydb_session_pool`` are consumed here, the rest is kept in
            ``self.conn_kwargs``.
        :raises InterfaceError: if an own driver is created and cannot connect.
        """
        self.endpoint = f"grpc://{host}:{port}"
        self.database = database
        self.conn_kwargs = conn_kwargs
        self.credentials = self.conn_kwargs.pop("credentials", None)
        self.table_path_prefix = self.conn_kwargs.pop("ydb_table_path_prefix", "")

        if "ydb_session_pool" in self.conn_kwargs:  # Use session pool managed manually
            self._shared_session_pool = True
            self.session_pool: ydb.SessionPool = self.conn_kwargs.pop("ydb_session_pool")
            # The driver is reached through private attributes of the pool; which one
            # exists depends on the pool implementation, hence the hasattr probe.
            self.driver = (
                self.session_pool._driver
                if hasattr(self.session_pool, "_driver")
                else self.session_pool._pool_impl._driver
            )
            self.driver.table_client = self._ydb_table_client_class(self.driver, self._get_table_client_settings())
        else:
            self._shared_session_pool = False
            self.driver = self._create_driver()
            self.session_pool = self._ydb_session_pool_class(self.driver, size=5)

        self.interactive_transaction: bool = False  # AUTOCOMMIT
        self.tx_mode: ydb.AbstractTransactionModeBuilder = ydb.SerializableReadWrite()
        self.tx_context: Optional[ydb.TxContext] = None

    def cursor(self):
        """Return a new cursor bound to the current pool, transaction mode and transaction."""
        return self._cursor_class(self.session_pool, self.tx_mode, self.tx_context, self.table_path_prefix)

    def describe(self, table_path: str) -> ydb.TableDescription:
        """Describe the table at ``table_path`` (relative to database and table path prefix)."""
        abs_table_path = posixpath.join(self.database, self.table_path_prefix, table_path)
        cursor = self.cursor()
        return cursor.describe_table(abs_table_path)

    def check_exists(self, table_path: str) -> bool:
        """Return whether ``table_path`` (relative to database and prefix) exists.

        The result is whatever ``Cursor.check_exists`` returns, i.e. a boolean.
        """
        abs_table_path = posixpath.join(self.database, self.table_path_prefix, table_path)
        cursor = self.cursor()
        return cursor.check_exists(abs_table_path)

    def get_table_names(self) -> List[str]:
        """List table paths below the database/prefix directory, relative to that directory."""
        abs_dir_path = posixpath.join(self.database, self.table_path_prefix)
        cursor = self.cursor()
        return [posixpath.relpath(path, abs_dir_path) for path in cursor.get_table_names(abs_dir_path)]

    def set_isolation_level(self, isolation_level: str):
        """Select the transaction mode used by subsequent transactions.

        :param isolation_level: one of the ``IsolationLevel`` constants.
        :raises KeyError: if ``isolation_level`` is not a known level.
        :raises InternalError: if a transaction with an id is currently open.
        """

        class IsolationSettings(NamedTuple):
            ydb_mode: ydb.AbstractTransactionModeBuilder
            interactive: bool

        ydb_isolation_settings_map = {
            IsolationLevel.AUTOCOMMIT: IsolationSettings(ydb.SerializableReadWrite(), interactive=False),
            IsolationLevel.SERIALIZABLE: IsolationSettings(ydb.SerializableReadWrite(), interactive=True),
            IsolationLevel.ONLINE_READONLY: IsolationSettings(ydb.OnlineReadOnly(), interactive=False),
            IsolationLevel.ONLINE_READONLY_INCONSISTENT: IsolationSettings(
                ydb.OnlineReadOnly().with_allow_inconsistent_reads(), interactive=False
            ),
            IsolationLevel.STALE_READONLY: IsolationSettings(ydb.StaleReadOnly(), interactive=False),
            IsolationLevel.SNAPSHOT_READONLY: IsolationSettings(ydb.SnapshotReadOnly(), interactive=True),
        }
        ydb_isolation_settings = ydb_isolation_settings_map[isolation_level]
        if self.tx_context and self.tx_context.tx_id:
            raise InternalError("Failed to set transaction mode: transaction is already began")
        self.tx_mode = ydb_isolation_settings.ydb_mode
        self.interactive_transaction = ydb_isolation_settings.interactive

    def get_isolation_level(self) -> str:
        """Map the current transaction mode back to an ``IsolationLevel`` constant.

        :raises NotSupportedError: if the current mode matches none of the known modes.
        """
        if self.tx_mode.name == ydb.SerializableReadWrite().name:
            # The same ydb mode backs both levels; the interactive flag tells them apart.
            if self.interactive_transaction:
                return IsolationLevel.SERIALIZABLE
            else:
                return IsolationLevel.AUTOCOMMIT
        elif self.tx_mode.name == ydb.OnlineReadOnly().name:
            if self.tx_mode.settings.allow_inconsistent_reads:
                return IsolationLevel.ONLINE_READONLY_INCONSISTENT
            else:
                return IsolationLevel.ONLINE_READONLY
        elif self.tx_mode.name == ydb.StaleReadOnly().name:
            return IsolationLevel.STALE_READONLY
        elif self.tx_mode.name == ydb.SnapshotReadOnly().name:
            return IsolationLevel.SNAPSHOT_READONLY
        else:
            raise NotSupportedError(f"{self.tx_mode.name} is not supported")

    def begin(self):
        """Start an explicit transaction when the isolation level is interactive.

        For non-interactive levels only ``tx_context`` is reset.
        """
        self.tx_context = None
        if self.interactive_transaction:
            session = self._maybe_await(self.session_pool.acquire)
            self.tx_context = session.transaction(self.tx_mode)
            self._maybe_await(self.tx_context.begin)

    def commit(self):
        """Commit the open transaction, if any, and hand its session back to the pool."""
        self._finish_transaction("commit")

    def rollback(self):
        """Roll back the open transaction, if any, and hand its session back to the pool."""
        self._finish_transaction("rollback")

    def _finish_transaction(self, action: str) -> None:
        """Run ``tx_context.<action>`` when a transaction id exists, then release the session.

        The session acquired in ``begin`` is always returned to the pool, even when
        the commit/rollback call raises or the transaction never obtained an id;
        otherwise the pool would leak one session per failure.
        """
        tx_context = self.tx_context
        if tx_context is None:
            return
        try:
            if tx_context.tx_id:
                self._maybe_await(getattr(tx_context, action))
        finally:
            self.tx_context = None
            self._maybe_await(self.session_pool.release, tx_context.session)

    def close(self):
        """Roll back any open transaction and stop the pool and driver this connection owns."""
        try:
            self.rollback()
        finally:
            # A shared pool (and its driver) belongs to the caller and is left running.
            if not self._shared_session_pool:
                self._maybe_await(self.session_pool.stop)
                self._stop_driver()

    @classmethod
    def _maybe_await(cls, callee: collections.abc.Callable, *args, **kwargs) -> Any:
        """Call ``callee``; for async subclasses pass its result through ``cls._await``."""
        if cls._is_async:
            return cls._await(callee(*args, **kwargs))
        return callee(*args, **kwargs)

    def _get_table_client_settings(self) -> ydb.TableClientSettings:
        """Build table client settings: native date/datetime/timestamp/interval on, native JSON off."""
        return (
            ydb.TableClientSettings()
            .with_native_date_in_result_sets(True)
            .with_native_datetime_in_result_sets(True)
            .with_native_timestamp_in_result_sets(True)
            .with_native_interval_in_result_sets(True)
            .with_native_json_in_result_sets(False)
        )

    def _create_driver(self):
        """Create a driver for ``self.endpoint`` and wait (5 s, fail fast) until it is usable.

        :raises InterfaceError: if waiting for the driver fails; the driver is
            stopped before the error is raised so it does not linger.
        """
        driver_config = ydb.DriverConfig(
            endpoint=self.endpoint,
            database=self.database,
            table_client_settings=self._get_table_client_settings(),
            credentials=self.credentials,
        )
        driver = self._ydb_driver_class(driver_config)
        try:
            self._maybe_await(driver.wait, timeout=5, fail_fast=True)
        except ydb.Error as e:
            self._maybe_await(driver.stop)
            raise InterfaceError(e.message, original_error=e) from e
        except Exception as e:
            self._maybe_await(driver.stop)
            raise InterfaceError(f"Failed to connect to YDB, details {driver.discovery_debug_details()}") from e
        return driver

    def _stop_driver(self):
        """Stop the driver held by this connection."""
        self._maybe_await(self.driver.stop)


class AsyncConnection(Connection):
    """``Connection`` variant wired to the ``ydb.aio`` driver, pool, table client and ``AsyncCursor``."""

    _is_async = True
    _ydb_driver_class = ydb.aio.Driver
    _ydb_session_pool_class = ydb.aio.SessionPool
    _ydb_table_client_class = ydb.aio.table.TableClient
    _cursor_class = AsyncCursor
# Lower-case keyword names, kept as a mutable set exactly like the original literal.
# NOTE(review): presumably the YQL reserved words - confirm against upstream ydb-sqlalchemy.
YDB_KEYWORDS = set(
    """
    abort action add after all alter analyze and ansi any array as asc assume async attach
    autoincrement before begin bernoulli between bitcast by cascade case cast changefeed check
    collate column columns commit compact conditional conflict constraint consumer cover create
    cross cube current current_date current_time current_timestamp data database decimal declare
    default deferrable deferred define delete desc detach disable discard distinct do drop each
    else empty empty_action encrypted end erase error escape evaluate except exclude exclusion
    exclusive exists explain export external fail family filter flatten following for foreign
    from full function glob group grouping groups hash having hop if ignore ilike immediate
    import in index indexed inherits initially inner insert instead intersect into is isnull
    join json_exists json_query json_value key left like limit local match natural no not
    notnull null nulls object of offset on only or order others outer over partition passing
    password plan pragma preceding presort primary process raise range reduce references regexp
    reindex release rename replace replication reset respect restrict result return returning
    revert right rlike rollback rollup row rows sample savepoint schema select semi sets source
    stream subquery symbols sync system table tablesample tablestore temp temporary then ties to
    topic transaction trigger type unbounded unconditional union unique unknown update upsert
    use user using vacuum values view virtual when where window with without wrapper xor
    """.split()
)
logger = logging.getLogger(__name__)


def get_column_type(type_obj: Any) -> str:
    """Return the string form of ``ydb.convert.type_to_native(type_obj)``."""
    return str(ydb.convert.type_to_native(type_obj))


@dataclasses.dataclass
class YdbQuery:
    """A YQL statement plus the metadata the cursor needs to execute it."""

    # Statement text; the cursor may prepend a TablePathPrefix pragma to it.
    yql_text: str
    # Declared parameter types; when non-empty (and not DDL) a ydb.DataQuery is built.
    parameters_types: Dict[str, Union[ydb.PrimitiveType, ydb.AbstractTypeBuilder]] = dataclasses.field(
        default_factory=dict
    )
    # True sends the statement through the scheme (DDL) execution path.
    is_ddl: bool = False


def _handle_ydb_errors(func):
    """Decorate ``func`` so that ydb (and any other) exceptions surface as DB-API errors.

    Specific ``ydb.issues`` classes are mapped first; any remaining ``ydb.Error``
    and finally any other exception becomes ``DatabaseError``.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except (ydb.issues.AlreadyExists, ydb.issues.PreconditionFailed) as e:
            raise IntegrityError(e.message, original_error=e) from e
        except (ydb.issues.Unsupported, ydb.issues.Unimplemented) as e:
            raise NotSupportedError(e.message, original_error=e) from e
        except (ydb.issues.BadRequest, ydb.issues.SchemeError) as e:
            raise ProgrammingError(e.message, original_error=e) from e
        except (
            ydb.issues.TruncatedResponseError,
            ydb.issues.ConnectionError,
            ydb.issues.Aborted,
            ydb.issues.Unavailable,
            ydb.issues.Overloaded,
            ydb.issues.Undetermined,
            ydb.issues.Timeout,
            ydb.issues.Cancelled,
            ydb.issues.SessionBusy,
            ydb.issues.SessionExpired,
            ydb.issues.SessionPoolEmpty,
        ) as e:
            raise OperationalError(e.message, original_error=e) from e
        except ydb.issues.GenericError as e:
            raise DataError(e.message, original_error=e) from e
        except ydb.issues.InternalError as e:
            raise InternalError(e.message, original_error=e) from e
        except ydb.Error as e:
            raise DatabaseError(e.message, original_error=e) from e
        except Exception as e:
            raise DatabaseError("Failed to execute query") from e

    return wrapper


class Cursor:
    """DB-API style cursor that runs ``YdbQuery`` objects through a YDB session pool."""

    def __init__(
        self,
        session_pool: Union[ydb.SessionPool, ydb.aio.SessionPool],
        tx_mode: ydb.AbstractTransactionModeBuilder,
        tx_context: Optional[ydb.BaseTxContext] = None,
        table_path_prefix: str = "",
    ):
        """Store the pool, transaction settings and table path prefix.

        :param session_pool: pool used for every operation outside an explicit transaction.
        :param tx_mode: transaction mode for statements executed without ``tx_context``.
        :param tx_context: open transaction to execute DML in, or ``None``.
        :param table_path_prefix: value emitted as ``PRAGMA TablePathPrefix`` when non-empty.
        """
        self.session_pool = session_pool
        self.tx_mode = tx_mode
        self.tx_context = tx_context
        self.description = None
        self.arraysize = 1
        self.rows = None
        self._rows_prefetched = None
        self.root_directory = table_path_prefix

    @_handle_ydb_errors
    def describe_table(self, abs_table_path: str) -> ydb.TableDescription:
        """Return the description of the table at ``abs_table_path``."""
        return self._retry_operation_in_pool(self._describe_table, abs_table_path)

    def check_exists(self, abs_table_path: str) -> bool:
        """Return ``True`` if describing the path succeeds, ``False`` on ``ydb.SchemeError``."""
        try:
            self._retry_operation_in_pool(self._describe_path, abs_table_path)
            return True
        except ydb.SchemeError:
            return False

    @_handle_ydb_errors
    def get_table_names(self, abs_dir_path: str) -> List[str]:
        """Recursively collect absolute paths of all tables below ``abs_dir_path``.

        Directories whose name starts with ``.`` are not descended into.
        """
        directory: ydb.Directory = self._retry_operation_in_pool(self._list_directory, abs_dir_path)
        result = []
        for child in directory.children:
            child_abs_path = posixpath.join(abs_dir_path, child.name)
            if child.is_table():
                result.append(child_abs_path)
            elif child.is_directory() and not child.name.startswith("."):
                result.extend(self.get_table_names(child_abs_path))
        return result

    def execute(self, operation: YdbQuery, parameters: Optional[Mapping[str, Any]] = None):
        """Execute ``operation`` and make its rows available to the fetch methods.

        Rows of a previous, not yet consumed result are kept in front of the new ones.
        """
        query = self._get_ydb_query(operation)

        logger.info("execute sql: %s, params: %s", query, parameters)
        if operation.is_ddl:
            chunks = self._execute_ddl(query)
        else:
            chunks = self._execute_dml(query, parameters)

        rows = self._rows_iterable(chunks)
        # Prefetch the description:
        # pulling the first row runs the generator far enough to fill self.description.
        try:
            first_row = next(rows)
        except StopIteration:
            pass
        else:
            rows = itertools.chain((first_row,), rows)
        if self.rows is not None:
            rows = itertools.chain(self.rows, rows)

        self.rows = rows

    def _get_ydb_query(self, operation: YdbQuery) -> Union[ydb.DataQuery, str]:
        """Return the text (or a ``ydb.DataQuery`` for typed DML) to send for ``operation``."""
        pragma = ""
        if self.root_directory:
            pragma = f'PRAGMA TablePathPrefix = "{self.root_directory}";\n'

        yql_with_pragma = pragma + operation.yql_text

        if operation.is_ddl or not operation.parameters_types:
            return yql_with_pragma

        return self._make_data_query(yql_with_pragma, operation.parameters_types)

    def _make_data_query(
        self,
        yql_text: str,
        parameters_types: Dict[str, Union[ydb.PrimitiveType, ydb.AbstractTypeBuilder]],
    ) -> ydb.DataQuery:
        """
        ydb.DataQuery uses hashed SQL text as cache key, which may cause issues if parameters change type within
        the same session, so we include parameter types to the key to prevent false positive cache hit.
        """

        sorted_parameters = sorted(parameters_types.items())  # dict keys are unique, so the sorting is stable

        yql_with_params = yql_text + "".join([k + str(v) for k, v in sorted_parameters])
        name = hashlib.sha256(yql_with_params.encode("utf-8")).hexdigest()
        return ydb.DataQuery(yql_text, parameters_types, name=name)

    @_handle_ydb_errors
    def _execute_dml(
        self, query: Union[ydb.DataQuery, str], parameters: Optional[Mapping[str, Any]] = None
    ) -> ydb.convert.ResultSets:
        """Run a data query, inside ``tx_context`` when set, otherwise in a pooled session."""
        prepared_query = query
        # Plain-text queries that carry parameters are prepared first.
        if isinstance(query, str) and parameters:
            if self.tx_context:
                prepared_query = self._run_operation_in_session(self._prepare, query)
            else:
                prepared_query = self._retry_operation_in_pool(self._prepare, query)

        if self.tx_context:
            return self._run_operation_in_tx(self._execute_in_tx, prepared_query, parameters)

        return self._retry_operation_in_pool(self._execute_in_session, self.tx_mode, prepared_query, parameters)

    @_handle_ydb_errors
    def _execute_ddl(self, query: str) -> ydb.convert.ResultSets:
        """Run a scheme (DDL) query in a pooled session."""
        return self._retry_operation_in_pool(self._execute_scheme, query)

    @staticmethod
    def _execute_scheme(session: ydb.Session, query: str) -> ydb.convert.ResultSets:
        return session.execute_scheme(query)

    @staticmethod
    def _describe_table(session: ydb.Session, abs_table_path: str) -> ydb.TableDescription:
        return session.describe_table(abs_table_path)

    @staticmethod
    def _describe_path(session: ydb.Session, table_path: str) -> ydb.SchemeEntry:
        return session._driver.scheme_client.describe_path(table_path)

    @staticmethod
    def _list_directory(session: ydb.Session, abs_dir_path: str) -> ydb.Directory:
        return session._driver.scheme_client.list_directory(abs_dir_path)

    @staticmethod
    def _prepare(session: ydb.Session, query: str) -> ydb.DataQuery:
        return session.prepare(query)

    @staticmethod
    def _execute_in_tx(
        tx_context: ydb.TxContext, prepared_query: ydb.DataQuery, parameters: Optional[Mapping[str, Any]]
    ) -> ydb.convert.ResultSets:
        # Inside an explicit transaction the commit is left to the connection.
        return tx_context.execute(prepared_query, parameters, commit_tx=False)

    @staticmethod
    def _execute_in_session(
        session: ydb.Session,
        tx_mode: ydb.AbstractTransactionModeBuilder,
        prepared_query: ydb.DataQuery,
        parameters: Optional[Mapping[str, Any]],
    ) -> ydb.convert.ResultSets:
        # Without an explicit transaction each statement commits on its own.
        return session.transaction(tx_mode).execute(prepared_query, parameters, commit_tx=True)

    def _run_operation_in_tx(self, callee: collections.abc.Callable, *args, **kwargs):
        """Call ``callee`` with the current transaction context as first argument."""
        return callee(self.tx_context, *args, **kwargs)

    def _run_operation_in_session(self, callee: collections.abc.Callable, *args, **kwargs):
        """Call ``callee`` with the session of the current transaction as first argument."""
        return callee(self.tx_context.session, *args, **kwargs)

    def _retry_operation_in_pool(self, callee: collections.abc.Callable, *args, **kwargs):
        """Run ``callee`` through the pool's ``retry_operation_sync`` (second argument ``None``)."""
        return self.session_pool.retry_operation_sync(callee, None, *args, **kwargs)

    def _rows_iterable(self, chunks_iterable: ydb.convert.ResultSets):
        """Yield rows of every result chunk, updating ``description`` from each chunk's columns."""
        try:
            for chunk in chunks_iterable:
                self.description = [
                    (
                        col.name,
                        get_column_type(col.type),
                        None,
                        None,
                        None,
                        None,
                        None,
                    )
                    for col in chunk.columns
                ]
                for row in chunk.rows:
                    # returns tuple to be compatible with SqlAlchemy and because
                    # of this PEP to return a sequence: https://www.python.org/dev/peps/pep-0249/#fetchmany
                    yield row[::]
        except ydb.Error as e:
            raise DatabaseError(e.message, original_error=e) from e

    def _ensure_prefetched(self):
        """Materialize pending rows once and return the cached list (``None`` if there are no rows)."""
        if self.rows is not None and self._rows_prefetched is None:
            self._rows_prefetched = list(self.rows)
            self.rows = iter(self._rows_prefetched)
        return self._rows_prefetched

    def executemany(self, operation: YdbQuery, seq_of_parameters: Optional[Sequence[Mapping[str, Any]]]):
        """Execute ``operation`` once per parameter mapping; ``None`` or empty input is a no-op."""
        for parameters in seq_of_parameters or ():
            self.execute(operation, parameters)

    def executescript(self, script):
        """Execute ``script`` exactly like ``execute``."""
        return self.execute(script)

    def fetchone(self):
        """Return the next row, or ``None`` when the rows are exhausted or nothing was executed."""
        if self.rows is None:
            return None
        return next(self.rows, None)

    def fetchmany(self, size=None):
        """Return up to ``size`` (default ``arraysize``) rows; empty list if nothing was executed."""
        if self.rows is None:
            return []
        return list(itertools.islice(self.rows, size or self.arraysize))

    def fetchall(self):
        """Return all remaining rows; empty list if nothing was executed."""
        if self.rows is None:
            return []
        return list(self.rows)

    def nextset(self):
        """Consume the remaining rows; returns ``None``."""
        self.fetchall()

    def setinputsizes(self, sizes):
        pass

    def setoutputsize(self, column=None):
        pass

    def close(self):
        """Drop any pending and prefetched rows."""
        self.rows = None
        self._rows_prefetched = None

    @property
    def rowcount(self):
        """Number of prefetched rows, or ``-1`` when there is no result to count (PEP 249)."""
        prefetched = self._ensure_prefetched()
        if prefetched is None:
            return -1
        return len(prefetched)


class AsyncCursor(Cursor):
    """``Cursor`` whose session operations are coroutines driven through ``util.await_only``."""

    _await = staticmethod(util.await_only)

    @staticmethod
    async def _describe_table(session: ydb.aio.table.Session, abs_table_path: str) -> ydb.TableDescription:
        return await session.describe_table(abs_table_path)

    @staticmethod
    async def _describe_path(session: ydb.aio.table.Session, abs_table_path: str) -> ydb.SchemeEntry:
        return await session._driver.scheme_client.describe_path(abs_table_path)

    @staticmethod
    async def _list_directory(session: ydb.aio.table.Session, abs_dir_path: str) -> ydb.Directory:
        return await session._driver.scheme_client.list_directory(abs_dir_path)

    @staticmethod
    async def _execute_scheme(session: ydb.aio.table.Session, query: str) -> ydb.convert.ResultSets:
        return await session.execute_scheme(query)

    @staticmethod
    async def _prepare(session: ydb.aio.table.Session, query: str) -> ydb.DataQuery:
        return await session.prepare(query)

    @staticmethod
    async def _execute_in_tx(
        tx_context: ydb.aio.table.TxContext, prepared_query: ydb.DataQuery, parameters: Optional[Mapping[str, Any]]
    ) -> ydb.convert.ResultSets:
        return await tx_context.execute(prepared_query, parameters, commit_tx=False)

    @staticmethod
    async def _execute_in_session(
        session: ydb.aio.table.Session,
        tx_mode: ydb.AbstractTransactionModeBuilder,
        prepared_query: ydb.DataQuery,
        parameters: Optional[Mapping[str, Any]],
    ) -> ydb.convert.ResultSets:
        return await session.transaction(tx_mode).execute(prepared_query, parameters, commit_tx=True)

    def _run_operation_in_tx(self, callee: collections.abc.Coroutine, *args, **kwargs):
        """Await ``callee`` called with the current transaction context."""
        return self._await(callee(self.tx_context, *args, **kwargs))

    def _run_operation_in_session(self, callee: collections.abc.Coroutine, *args, **kwargs):
        """Await ``callee`` called with the session of the current transaction."""
        return self._await(callee(self.tx_context.session, *args, **kwargs))

    def _retry_operation_in_pool(self, callee: collections.abc.Coroutine, *args, **kwargs):
        """Await the pool's ``retry_operation`` for ``callee``."""
        return self._await(self.session_pool.retry_operation(callee, *args, **kwargs))
class Warning(Exception):
    """DB-API ``Warning`` exception (intentionally shadows the builtin of the same name)."""


class Error(Exception):
    """Base class of the DB-API error hierarchy.

    ``original_error``, ``issues``, ``message`` and ``status`` are always set, so
    handlers can read them regardless of whether a ydb error was wrapped.
    """

    def __init__(
        self,
        message: str,
        original_error: "Optional[ydb.Error]" = None,
    ):
        """Create the error.

        :param message: text used for ``str(error)`` and as fallback for ``self.message``.
        :param original_error: wrapped ydb error; when given, its ``issues`` and
            ``status`` are copied and ``self.message`` becomes the formatted issue
            tree if that tree is non-empty.
        """
        super().__init__(message)

        self.original_error = original_error
        # Defaults for errors raised without an underlying ydb error.
        self.issues = None
        self.message = message
        self.status = None
        if original_error:
            pretty_issues = _pretty_issues(original_error.issues)
            self.issues = original_error.issues
            self.message = pretty_issues or message
            self.status = original_error.status


class InterfaceError(Error):
    pass


class DatabaseError(Error):
    pass


class DataError(DatabaseError):
    pass


class OperationalError(DatabaseError):
    pass


class IntegrityError(DatabaseError):
    pass


class InternalError(DatabaseError):
    pass


class ProgrammingError(DatabaseError):
    pass


class NotSupportedError(DatabaseError):
    pass


def _pretty_issues(issues: "Optional[List[Message]]") -> Optional[str]:
    """Format a list of issues as text starting with a newline.

    Returns ``None`` when there are no issues or when any issue tree is nested
    deeper than ``_get_messages`` allows, so callers can fall back to their own
    message.
    """
    # An empty list used to yield "\n", which hid the caller's fallback message.
    if not issues:
        return None

    children_messages = [_get_messages(issue, root=True) for issue in issues]

    if None in children_messages:
        return None

    return "\n" + "\n".join(children_messages)


def _get_messages(
    issue: "Message", max_depth: int = 100, indent: int = 2, depth: int = 0, root: bool = False
) -> Optional[str]:
    """Render ``issue`` and its nested issues as indented text.

    :param issue: object exposing ``message``, ``severity``, ``issue_code`` and ``issues``.
    :param max_depth: nesting depth at which rendering gives up.
    :param indent: number of spaces per nesting level.
    :param depth: current nesting level.
    :param root: ``True`` for top-level issues, which are never collapsed.
    :return: the rendered text, or ``None`` if ``max_depth`` was reached anywhere in the tree.
    """
    if depth >= max_depth:
        return None

    margin_str = " " * depth * indent
    pre_message = ""
    children = ""

    if issue.issues:
        # Chains of single-child issues are folded into one comma-separated line.
        collapsed_messages = []
        while not root and len(issue.issues) == 1:
            collapsed_messages.append(issue.message)
            issue = issue.issues[0]

        if collapsed_messages:
            pre_message = f"{margin_str}{', '.join(collapsed_messages)}\n"
            depth += 1
            margin_str = " " * depth * indent

        children_messages = [
            _get_messages(iss, max_depth=max_depth, indent=indent, depth=depth + 1) for iss in issue.issues
        ]

        if None in children_messages:
            return None

        children = "\n".join(children_messages)

    return (
        f"{pre_message}{margin_str}{issue.message}\n{margin_str}"
        f"severity level: {issue.severity}\n{margin_str}"
        f"issue code: {issue.issue_code}\n{children}"
    )
diff --git a/airflow/providers/ydb/hooks/ydb.py b/airflow/providers/ydb/hooks/ydb.py new file mode 100644 index 0000000000000..5d5083d238464 --- /dev/null +++ b/airflow/providers/ydb/hooks/ydb.py @@ -0,0 +1,251 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Mapping, Sequence + +import ydb +from sqlalchemy.engine import URL + +from airflow.exceptions import AirflowException +from airflow.providers.common.sql.hooks.sql import DbApiHook +from airflow.providers.ydb.hooks._vendor.dbapi.connection import Connection as DbApiConnection +from airflow.providers.ydb.hooks._vendor.dbapi.cursor import YdbQuery +from airflow.providers.ydb.utils.credentials import get_credentials_from_connection +from airflow.providers.ydb.utils.defaults import CONN_NAME_ATTR, CONN_TYPE, DEFAULT_CONN_NAME + +DEFAULT_YDB_GRPCS_PORT: int = 2135 + +if TYPE_CHECKING: + from airflow.models.connection import Connection + from airflow.providers.ydb.hooks._vendor.dbapi.cursor import Cursor as DbApiCursor + + +class YDBCursor: + """YDB cursor wrapper.""" + + def __init__(self, delegatee: DbApiCursor, is_ddl: bool): + self.delegatee: DbApiCursor = delegatee + self.is_ddl: bool = is_ddl + + def execute(self, sql: str, parameters: Mapping[str, Any] | None = None): + if parameters is not None: + raise AirflowException("parameters is not supported yet") + + q = YdbQuery(yql_text=sql, is_ddl=self.is_ddl) + return self.delegatee.execute(q, parameters) + + def executemany(self, sql: str, seq_of_parameters: Sequence[Mapping[str, Any]]): + for parameters in seq_of_parameters: + self.execute(sql, parameters) + + def executescript(self, script): + return self.execute(script) + + def fetchone(self): + return self.delegatee.fetchone() + + def fetchmany(self, size=None): + return self.delegatee.fetchmany(size=size) + + def fetchall(self): + return self.delegatee.fetchall() + + def nextset(self): + return self.delegatee.nextset() + + def setinputsizes(self, sizes): + return self.delegatee.setinputsizes(sizes) + + def setoutputsize(self, column=None): + return self.delegatee.setoutputsize(column) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + 
self.close() + + def close(self): + return self.delegatee.close() + + @property + def rowcount(self): + return self.delegatee.rowcount + + @property + def description(self): + return self.delegatee.description + + +class YDBConnection: + """YDB connection wrapper.""" + + def __init__(self, endpoint: str, database: str, credentials: Any, is_ddl: bool = False): + self.is_ddl = is_ddl + driver_config = ydb.DriverConfig( + endpoint=endpoint, + database=database, + table_client_settings=YDBConnection._get_table_client_settings(), + credentials=credentials, + ) + driver = ydb.Driver(driver_config) + # wait until driver become initialized + driver.wait(fail_fast=True, timeout=10) + ydb_session_pool = ydb.SessionPool(driver, size=5) + self.delegatee: DbApiConnection = DbApiConnection(ydb_session_pool=ydb_session_pool) + + def cursor(self) -> YDBCursor: + return YDBCursor(self.delegatee.cursor(), is_ddl=self.is_ddl) + + def begin(self) -> None: + self.delegatee.begin() + + def commit(self) -> None: + self.delegatee.commit() + + def rollback(self) -> None: + self.delegatee.rollback() + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close() + + def close(self) -> None: + self.delegatee.close() + + @staticmethod + def _get_table_client_settings() -> ydb.TableClientSettings: + return ( + ydb.TableClientSettings() + .with_native_date_in_result_sets(True) + .with_native_datetime_in_result_sets(True) + .with_native_timestamp_in_result_sets(True) + .with_native_interval_in_result_sets(True) + .with_native_json_in_result_sets(False) + ) + + +class YDBHook(DbApiHook): + """Interact with YDB.""" + + conn_name_attr: str = CONN_NAME_ATTR + default_conn_name: str = DEFAULT_CONN_NAME + conn_type: str = CONN_TYPE + hook_name: str = "YDB" + supports_autocommit: bool = True + supports_executemany: bool = True + + def __init__(self, *args, is_ddl: bool = False, **kwargs) -> None: + super().__init__(*args, **kwargs) + self.is_ddl = is_ddl 
+ + @classmethod + def get_connection_form_widgets(cls) -> dict[str, Any]: + """Return connection widgets to add to YDB connection form.""" + from flask_appbuilder.fieldwidgets import BS3PasswordFieldWidget, BS3TextFieldWidget + from flask_babel import lazy_gettext + from wtforms import BooleanField, PasswordField, StringField + + return { + "database": StringField( + lazy_gettext("Database name"), + widget=BS3TextFieldWidget(), + description="Required. YDB database name", + ), + "service_account_json": PasswordField( + lazy_gettext("Service account auth JSON"), + widget=BS3PasswordFieldWidget(), + description="Service account auth JSON. Looks like " + '{"id": "...", "service_account_id": "...", "private_key": "..."}. ' + "Will be used instead of IAM token and SA JSON file path field if specified.", + ), + "service_account_json_path": StringField( + lazy_gettext("Service account auth JSON file path"), + widget=BS3TextFieldWidget(), + description="Service account auth JSON file path. File content looks like " + '{"id": "...", "service_account_id": "...", "private_key": "..."}. ', + ), + "token": PasswordField( + lazy_gettext("IAM token"), + widget=BS3PasswordFieldWidget(), + description="User account IAM token.", + ), + "use_vm_metadata": BooleanField( + lazy_gettext("Use VM metadata"), + default=False, + description="Optional. Whether to use VM metadata to retrieve IAM token", + ), + } + + @classmethod + def get_ui_field_behaviour(cls) -> dict[str, Any]: + """Return custom UI field behaviour for YDB connection.""" + return { + "hidden_fields": ["schema", "extra"], + "relabeling": {}, + "placeholders": { + "host": "eg. grpcs://my_host or ydb.serverless.yandexcloud.net or lb.etn9txxxx.ydb.mdb.yandexcloud.net", + "login": "root", + "password": "my_password", + "database": "e.g. /local or /ru-central1/b1gtl2kg13him37quoo6/etndqstq7ne4v68n6c9b", + "service_account_json": 'e.g. 
{"id": "...", "service_account_id": "...", "private_key": "..."}', + "token": "t1.9....AAQ", + }, + } + + @property + def sqlalchemy_url(self) -> URL: + conn: Connection = self.get_connection(getattr(self, self.conn_name_attr)) + connection_extra: dict[str, Any] = conn.extra_dejson + database: str | None = connection_extra.get("database") + return URL.create( + drivername="ydb", + username=conn.login, + password=conn.password, + host=conn.host, + port=conn.port, + query={"database": database}, + ) + + def get_conn(self) -> YDBConnection: + """Establish a connection to a YDB database.""" + conn: Connection = self.get_connection(getattr(self, self.conn_name_attr)) + host: str | None = conn.host + if not host: + raise ValueError("YDB host must be specified") + port: int = conn.port or DEFAULT_YDB_GRPCS_PORT + + connection_extra: dict[str, Any] = conn.extra_dejson + database: str | None = connection_extra.get("database") + if not database: + raise ValueError("YDB database must be specified") + + endpoint = f"{host}:{port}" + credentials = get_credentials_from_connection( + endpoint=endpoint, database=database, connection=conn, connection_extra=connection_extra + ) + + return YDBConnection( + endpoint=endpoint, database=database, credentials=credentials, is_ddl=self.is_ddl + ) + + @staticmethod + def _serialize_cell(cell: object, conn: YDBConnection | None = None) -> Any: + return cell diff --git a/airflow/providers/ydb/operators/__init__.py b/airflow/providers/ydb/operators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/ydb/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/ydb/operators/ydb.py b/airflow/providers/ydb/operators/ydb.py new file mode 100644 index 0000000000000..1867f227b4eb6 --- /dev/null +++ b/airflow/providers/ydb/operators/ydb.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import Iterable, Mapping + +from airflow.exceptions import AirflowException +from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator + + +class YDBExecuteQueryOperator(SQLExecuteQueryOperator): + """ + Executes sql code in a specific YDB database. 
+ + :param sql: the SQL code to be executed as a single string, or + a list of str (sql statements), or a reference to a template file. + Template references are recognized by str ending in '.sql' + :param ydb_conn_id: The :ref:`ydb conn id ` + reference to a specific YDB cluster and database. + :param parameters: (optional) the parameters to render the SQL query with. + """ + + ui_color = "#ededed" + + def __init__( + self, + sql: str | list[str], + is_ddl: bool = False, + ydb_conn_id: str = "ydb_default", + parameters: Mapping | Iterable | None = None, + **kwargs, + ) -> None: + if parameters is not None: + raise AirflowException("parameters are not supported yet") + + if is_ddl: + hook_params = kwargs.pop("hook_params", {}) + kwargs["hook_params"] = {"is_ddl": is_ddl, **hook_params} + + super().__init__(conn_id=ydb_conn_id, sql=sql, parameters=parameters, **kwargs) diff --git a/airflow/providers/ydb/provider.yaml b/airflow/providers/ydb/provider.yaml new file mode 100644 index 0000000000000..944df92043906 --- /dev/null +++ b/airflow/providers/ydb/provider.yaml @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +--- +package-name: apache-airflow-providers-ydb +name: YDB +description: | + `YDB `__ + +state: ready +source-date-epoch: 1715384469 +# note that those versions are maintained by release manager - do not update them manually +versions: + - 1.0.0 + +dependencies: + - apache-airflow>=2.7.0 + - apache-airflow-providers-common-sql>=1.3.1 + - ydb>=3.11.3 + +integrations: + - integration-name: YDB + external-doc-url: https://ydb.tech/docs/en/ + how-to-guide: + - /docs/apache-airflow-providers-ydb/operators/ydb_operator_howto_guide.rst + logo: /integration-logos/ydb/ydb.png + tags: [software] + +operators: + - integration-name: YDB + python-modules: + - airflow.providers.ydb.operators.ydb + +hooks: + - integration-name: YDB + python-modules: + - airflow.providers.ydb.hooks.ydb + - airflow.providers.ydb.hooks._vendor.dbapi.connection + - airflow.providers.ydb.hooks._vendor.dbapi.constants + - airflow.providers.ydb.hooks._vendor.dbapi.cursor + - airflow.providers.ydb.hooks._vendor.dbapi.errors + +connection-types: + - hook-class-name: airflow.providers.ydb.hooks.ydb.YDBHook + connection-type: ydb diff --git a/airflow/providers/ydb/utils/__init__.py b/airflow/providers/ydb/utils/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/airflow/providers/ydb/utils/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/airflow/providers/ydb/utils/credentials.py b/airflow/providers/ydb/utils/credentials.py new file mode 100644 index 0000000000000..61e08ac109190 --- /dev/null +++ b/airflow/providers/ydb/utils/credentials.py @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +import ydb +import ydb.iam.auth as auth + +from airflow.exceptions import AirflowException + +if TYPE_CHECKING: + from airflow.models.connection import Connection + + +def get_credentials_from_connection( + endpoint: str, database: str, connection: Connection, connection_extra: dict[str, Any] | None = None +) -> Any: + """ + Return YDB credentials object for YDB SDK based on connection settings. 
+ + Credentials will be used with this priority: + + * login + * token + * service_account_json_path + * service_account_json + * use_vm_metadata + * anonymous + + :param endpoint: address of YDB cluster, e.g. ``grpcs://my-server.com:2135`` + :param database: YDB database name, e.g. ``/local`` + :param connection: connection object + :param connection_extra: connection extra settings + :return: YDB credentials object + """ + if connection.login: + driver_config = ydb.DriverConfig( + endpoint=endpoint, + database=database, + ) + + return ydb.StaticCredentials(driver_config, user=connection.login, password=connection.password) + + connection_extra = connection_extra or {} + token = connection_extra.get("token") + if token: + return ydb.AccessTokenCredentials(token) + + service_account_json_path = connection_extra.get("service_account_json_path") + if service_account_json_path: + return auth.BaseJWTCredentials.from_file(auth.ServiceAccountCredentials, service_account_json_path) + + service_account_json = connection_extra.get("service_account_json") + if service_account_json: + raise AirflowException("service_account_json parameter is not supported yet") + + use_vm_metadata = connection_extra.get("use_vm_metadata", False) + if use_vm_metadata: + return auth.MetadataUrlCredentials() + + return ydb.AnonymousCredentials() diff --git a/airflow/providers/ydb/utils/defaults.py b/airflow/providers/ydb/utils/defaults.py new file mode 100644 index 0000000000000..1658fbd2457b0 --- /dev/null +++ b/airflow/providers/ydb/utils/defaults.py @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +CONN_NAME_ATTR: str = "ydb_conn_id" +DEFAULT_CONN_NAME: str = "ydb_default" +CONN_TYPE: str = "ydb" diff --git a/airflow/utils/db.py b/airflow/utils/db.py index 1309c90235ec2..c7f8cbb35f8af 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -741,6 +741,16 @@ def create_default_connections(session: Session = NEW_SESSION): ), session, ) + merge_conn( + Connection( + conn_id="ydb_default", + conn_type="ydb", + host="grpc://localhost", + port=2135, + extra={"database": "/local"}, + ), + session, + ) def _get_flask_db(sql_database_uri): diff --git a/contributing-docs/12_airflow_dependencies_and_extras.rst b/contributing-docs/12_airflow_dependencies_and_extras.rst index ff2f5998235b0..c6ae334c44a56 100644 --- a/contributing-docs/12_airflow_dependencies_and_extras.rst +++ b/contributing-docs/12_airflow_dependencies_and_extras.rst @@ -187,7 +187,7 @@ influxdb, jdbc, jenkins, microsoft.azure, microsoft.mssql, microsoft.psrp, micro mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, pagerduty, papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, samba, segment, sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular, telegram, -teradata, trino, vertica, weaviate, yandex, zendesk +teradata, trino, vertica, weaviate, yandex, ydb, zendesk .. 
END PROVIDER EXTRAS HERE diff --git a/contributing-docs/testing/integration_tests.rst b/contributing-docs/testing/integration_tests.rst index 823b4a509c05f..322298d4f00c0 100644 --- a/contributing-docs/testing/integration_tests.rst +++ b/contributing-docs/testing/integration_tests.rst @@ -80,6 +80,8 @@ The following integrations are available: +--------------+----------------------------------------------------+ | trino | Integration required for Trino hooks. | +--------------+----------------------------------------------------+ +| ydb | Integration required for YDB tests. | ++--------------+----------------------------------------------------+ .. END AUTO-GENERATED INTEGRATION LIST' diff --git a/dev/breeze/doc/images/output-commands.svg b/dev/breeze/doc/images/output-commands.svg index 1381742085b64..dbc639544c32c 100644 --- a/dev/breeze/doc/images/output-commands.svg +++ b/dev/breeze/doc/images/output-commands.svg @@ -300,7 +300,7 @@ [default: 3.8]                                               --integrationIntegration(s) to enable when running (can be more than one).                        (all | all-testable | cassandra | celery | drill | kafka | kerberos | mongo | mssql  -| openlineage | otel | pinot | qdrant | redis | statsd | trino)                      +| openlineage | otel | pinot | qdrant | redis | statsd | trino | ydb)                --standalone-dag-processorRun standalone dag processor for start-airflow. --database-isolationRun airflow in database isolation mode. 
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/dev/breeze/doc/images/output_build-docs.svg b/dev/breeze/doc/images/output_build-docs.svg index ace5782d9b051..38e43f232f3f8 100644 --- a/dev/breeze/doc/images/output_build-docs.svg +++ b/dev/breeze/doc/images/output_build-docs.svg @@ -198,7 +198,7 @@ microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage |  opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis |    salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau |     -tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...                                      +tabular | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                                Build documents. diff --git a/dev/breeze/doc/images/output_build-docs.txt b/dev/breeze/doc/images/output_build-docs.txt index a3fd8a1049cde..b08fb02d2fd65 100644 --- a/dev/breeze/doc/images/output_build-docs.txt +++ b/dev/breeze/doc/images/output_build-docs.txt @@ -1 +1 @@ -51ee6f7789d0f590d99d4ea36e900376 +05f5785d302be0fb247a967da774a68a diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.svg b/dev/breeze/doc/images/output_release-management_add-back-references.svg index 5783cda41a42b..3f1523112bf5f 100644 --- a/dev/breeze/doc/images/output_release-management_add-back-references.svg +++ b/dev/breeze/doc/images/output_release-management_add-back-references.svg @@ -146,19 +146,19 @@ microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage |  opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis |    salesforce | samba | segment | sendgrid | sftp | 
singularity | slack | smtp | snowflake | sqlite | ssh | tableau |     -tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...                                      +tabular | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                                Command to add back references for documentation to make it backward compatible. ╭─ Add Back References to Docs ────────────────────────────────────────────────────────────────────────────────────────╮ -*--airflow-site-directory-aLocal directory path of cloned airflow-site repo.(DIRECTORY)[required] ---include-not-ready-providersWhether to include providers that are not yet ready to be released. ---include-removed-providersWhether to include providers that are removed. +*--airflow-site-directory-aLocal directory path of cloned airflow-site repo.(DIRECTORY)[required] +--include-not-ready-providersWhether to include providers that are not yet ready to be released. +--include-removed-providersWhether to include providers that are removed. ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ ╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮ ---dry-run-DIf dry-run is set, commands are only printed, not executed. ---verbose-vPrint verbose information about performed steps. ---help-hShow this message and exit. +--dry-run-DIf dry-run is set, commands are only printed, not executed. +--verbose-vPrint verbose information about performed steps. +--help-hShow this message and exit. 
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/dev/breeze/doc/images/output_release-management_add-back-references.txt b/dev/breeze/doc/images/output_release-management_add-back-references.txt index 7c41874af793c..0213ea275b449 100644 --- a/dev/breeze/doc/images/output_release-management_add-back-references.txt +++ b/dev/breeze/doc/images/output_release-management_add-back-references.txt @@ -1 +1 @@ -2764dbd6139c3eded9ac7065010be3c9 +4fcc46bb0f7687e962c07ee506796a64 diff --git a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg index 15d20ef0666f8..4134a4428b94a 100644 --- a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg +++ b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.svg @@ -146,21 +146,21 @@ microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai |         openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres |     presto | qdrant | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake |    -sqlite | ssh | tableau | tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...             +sqlite | ssh | tableau | tabular | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...       Generates content for issue to test the release. ╭─ Generate issue content flags ───────────────────────────────────────────────────────────────────────────────────────╮ ---disable-progressDisable progress bar ---excluded-pr-listComa-separated list of PRs to exclude from the issue.(TEXT) ---github-tokenGitHub token used to authenticate. 
You can set omit it if you have GITHUB_TOKEN env      +--disable-progressDisable progress bar +--excluded-pr-listComa-separated list of PRs to exclude from the issue.(TEXT) +--github-tokenGitHub token used to authenticate. You can set omit it if you have GITHUB_TOKEN env      variable set. Can be generated with:                                                     https://github.com/settings/tokens/new?description=Read%20sssues&scopes=repo:status      (TEXT)                                                                                   ---only-available-in-distOnly consider package ids with packages prepared in the dist folder +--only-available-in-distOnly consider package ids with packages prepared in the dist folder ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ ╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮ ---help-hShow this message and exit. +--help-hShow this message and exit. 
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt index e38768e2b8d56..e6b8e5993e950 100644 --- a/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt +++ b/dev/breeze/doc/images/output_release-management_generate-issue-content-providers.txt @@ -1 +1 @@ -fe29a2627df02339d908e1439919b86c +71426eec51048dfd92d9b5725cbd2137 diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg index 16ab20dc69312..d33f8acdf8c88 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.svg @@ -182,33 +182,33 @@ microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai |         openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres |     presto | qdrant | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake |    -sqlite | ssh | tableau | tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...             +sqlite | ssh | tableau | tabular | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...       Prepare CHANGELOG, README and COMMITS information for providers. 
╭─ Provider documentation preparation flags ───────────────────────────────────────────────────────────────────────────╮ ---base-branchBase branch to use as diff for documentation generation (used for releasing from  +--base-branchBase branch to use as diff for documentation generation (used for releasing from  old branch)                                                                       (TEXT)                                                                            ---github-repository-gGitHub repository used to pull, push run images.(TEXT)[default: apache/airflow] ---include-not-ready-providersWhether to include providers that are not yet ready to be released. ---include-removed-providersWhether to include providers that are removed. ---non-interactiveRun in non-interactive mode. Provides random answers to the type of changes and   +--github-repository-gGitHub repository used to pull, push run images.(TEXT)[default: apache/airflow] +--include-not-ready-providersWhether to include providers that are not yet ready to be released. +--include-removed-providersWhether to include providers that are removed. +--non-interactiveRun in non-interactive mode. Provides random answers to the type of changes and   confirms releasefor providers prepared for release - useful to test the script in non-interactive mode in CI.                                                       ---only-min-version-updateOnly update minimum version in __init__.py files and regenerate corresponding     +--only-min-version-updateOnly update minimum version in __init__.py files and regenerate corresponding     documentation                                                                     ---reapply-templates-onlyOnly reapply templates, do not bump version. Useful if templates were added and   +--reapply-templates-onlyOnly reapply templates, do not bump version. Useful if templates were added and   you need to regenerate documentation.                                             
---skip-git-fetchSkips removal and recreation of `apache-https-for-providers` remote in git. By    +--skip-git-fetchSkips removal and recreation of `apache-https-for-providers` remote in git. By    default, the remote is recreated and fetched to make sure that it's up to date    and that recent commits are not missing                                           ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ ╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮ ---answer-aForce answer to questions.(y | n | q | yes | no | quit) ---dry-run-DIf dry-run is set, commands are only printed, not executed. ---verbose-vPrint verbose information about performed steps. ---help-hShow this message and exit. +--answer-aForce answer to questions.(y | n | q | yes | no | quit) +--dry-run-DIf dry-run is set, commands are only printed, not executed. +--verbose-vPrint verbose information about performed steps. +--help-hShow this message and exit. 
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt index 6bb54fa2253fa..d456c7e9ed45b 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-documentation.txt @@ -1 +1 @@ -588a3418ca2e846fe5a77701e02f9924 +92a9d186f7b5f474ff17627b5d0f4361 diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg index 0fbacfc53978c..20d5c8b61c013 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.svg @@ -182,7 +182,7 @@ microsoft.azure | microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai |         openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres |     presto | qdrant | redis | salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake |    -sqlite | ssh | tableau | tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...             +sqlite | ssh | tableau | tabular | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...       Prepare sdist/whl packages of Airflow Providers. 
diff --git a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt index 5ef3821cb8f04..eb9e71ed880bd 100644 --- a/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt +++ b/dev/breeze/doc/images/output_release-management_prepare-provider-packages.txt @@ -1 +1 @@ -f4c203a55cd13a763a5fd02c6a795934 +af579eab45fdc401c93fd4eb5379ffd1 diff --git a/dev/breeze/doc/images/output_release-management_publish-docs.svg b/dev/breeze/doc/images/output_release-management_publish-docs.svg index ae583be4a4169..f8d2b62ffb5af 100644 --- a/dev/breeze/doc/images/output_release-management_publish-docs.svg +++ b/dev/breeze/doc/images/output_release-management_publish-docs.svg @@ -203,7 +203,7 @@ microsoft.mssql | microsoft.psrp | microsoft.winrm | mongo | mysql | neo4j | odbc | openai | openfaas | openlineage |  opensearch | opsgenie | oracle | pagerduty | papermill | pgvector | pinecone | postgres | presto | qdrant | redis |    salesforce | samba | segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau |     -tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk]...                                      +tabular | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk]...                                
Command to publish generated documentation to airflow-site diff --git a/dev/breeze/doc/images/output_release-management_publish-docs.txt b/dev/breeze/doc/images/output_release-management_publish-docs.txt index 22c2fb2b1f981..3a3190a4dc47a 100644 --- a/dev/breeze/doc/images/output_release-management_publish-docs.txt +++ b/dev/breeze/doc/images/output_release-management_publish-docs.txt @@ -1 +1 @@ -243a1028d1b00e884e2e16f613e8239e +24bf07ed4bfd1c282031336535e9ef29 diff --git a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg index 6a054627c0c9e..59fa6024f19e9 100644 --- a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg +++ b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.svg @@ -180,9 +180,9 @@ Generate requirements for selected provider. ╭─ Generate provider requirements flags ───────────────────────────────────────────────────────────────────────────────╮ ---pythonPython version to update sbom from. (defaults to all historical python versions) +--pythonPython version to update sbom from. 
(defaults to all historical python versions) (3.6 | 3.7 | 3.8 | 3.9 | 3.10 | 3.11 | 3.12)                                     ---provider-idProvider id to generate the requirements for                                                   +--provider-idProvider id to generate the requirements for                                                   (airbyte | alibaba | amazon | apache.beam | apache.cassandra | apache.drill | apache.druid |   apache.flink | apache.hdfs | apache.hive | apache.iceberg | apache.impala | apache.kafka |     apache.kylin | apache.livy | apache.pig | apache.pinot | apache.spark | apprise | arangodb |   @@ -193,26 +193,26 @@ | neo4j | odbc | openai | openfaas | openlineage | opensearch | opsgenie | oracle | pagerduty  | papermill | pgvector | pinecone | postgres | presto | qdrant | redis | salesforce | samba |  segment | sendgrid | sftp | singularity | slack | smtp | snowflake | sqlite | ssh | tableau |  -tabular | telegram | teradata | trino | vertica | weaviate | yandex | zendesk)                 ---provider-versionProvider version to generate the requirements for i.e `2.1.0`. `latest` is also a supported    +tabular | telegram | teradata | trino | vertica | weaviate | yandex | ydb | zendesk)           +--provider-versionProvider version to generate the requirements for i.e `2.1.0`. `latest` is also a supported    value to account for the most recent version of the provider                                   (TEXT)                                                                                         ---forceForce update providers requirements even if they already exist. +--forceForce update providers requirements even if they already exist. 
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ ╭─ Parallel running ───────────────────────────────────────────────────────────────────────────────────────────────────╮ ---run-in-parallelRun the operation in parallel on all or selected subset of parameters. ---parallelismMaximum number of processes to use while running the operation in parallel. +--run-in-parallelRun the operation in parallel on all or selected subset of parameters. +--parallelismMaximum number of processes to use while running the operation in parallel. (INTEGER RANGE)                                                             [default: 4; 1<=x<=8]                                                       ---skip-cleanupSkip cleanup of temporary files created during parallel run. ---debug-resourcesWhether to show resource information while running in parallel. ---include-success-outputsWhether to include outputs of successful parallel runs (skipped by default). +--skip-cleanupSkip cleanup of temporary files created during parallel run. +--debug-resourcesWhether to show resource information while running in parallel. +--include-success-outputsWhether to include outputs of successful parallel runs (skipped by default). ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ ╭─ Common options ─────────────────────────────────────────────────────────────────────────────────────────────────────╮ ---verbose-vPrint verbose information about performed steps. ---dry-run-DIf dry-run is set, commands are only printed, not executed. ---answer-aForce answer to questions.(y | n | q | yes | no | quit) ---help-hShow this message and exit. +--verbose-vPrint verbose information about performed steps. +--dry-run-DIf dry-run is set, commands are only printed, not executed. +--answer-aForce answer to questions.(y | n | q | yes | no | quit) +--help-hShow this message and exit. 
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt index 4402f49b86924..81bea80a19418 100644 --- a/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt +++ b/dev/breeze/doc/images/output_sbom_generate-providers-requirements.txt @@ -1 +1 @@ -08d9f4a5900b0db8f5ba575c3df75d09 +ef38b63a4007208ff85a9269ecb67073 diff --git a/dev/breeze/doc/images/output_shell.svg b/dev/breeze/doc/images/output_shell.svg index c4a7c212e20b7..ec4a34ea0444c 100644 --- a/dev/breeze/doc/images/output_shell.svg +++ b/dev/breeze/doc/images/output_shell.svg @@ -524,7 +524,7 @@ [default: 3.8]                                               --integrationIntegration(s) to enable when running (can be more than one).                        (all | all-testable | cassandra | celery | drill | kafka | kerberos | mongo | mssql  -| openlineage | otel | pinot | qdrant | redis | statsd | trino)                      +| openlineage | otel | pinot | qdrant | redis | statsd | trino | ydb)                --standalone-dag-processorRun standalone dag processor for start-airflow. --database-isolationRun airflow in database isolation mode. 
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ diff --git a/dev/breeze/doc/images/output_shell.txt b/dev/breeze/doc/images/output_shell.txt index bf6f6c56b417b..b5f8e7d6bc2f1 100644 --- a/dev/breeze/doc/images/output_shell.txt +++ b/dev/breeze/doc/images/output_shell.txt @@ -1 +1 @@ -a5bba63821f20893fc787cc71c6a5c66 +45c750eb8b454a68145b51fed5cc508f diff --git a/dev/breeze/doc/images/output_start-airflow.svg b/dev/breeze/doc/images/output_start-airflow.svg index 9ff84f31fb05c..3eacb5089f4ee 100644 --- a/dev/breeze/doc/images/output_start-airflow.svg +++ b/dev/breeze/doc/images/output_start-airflow.svg @@ -400,7 +400,7 @@ --platformPlatform for Airflow image.(linux/amd64 | linux/arm64) --integrationIntegration(s) to enable when running (can be more than one).                        (all | all-testable | cassandra | celery | drill | kafka | kerberos | mongo | mssql  -| openlineage | otel | pinot | qdrant | redis | statsd | trino)                      +| openlineage | otel | pinot | qdrant | redis | statsd | trino | ydb)                --standalone-dag-processorRun standalone dag processor for start-airflow. --database-isolationRun airflow in database isolation mode. --load-example-dags-eEnable configuration to load example DAGs when starting Airflow. 
diff --git a/dev/breeze/doc/images/output_start-airflow.txt b/dev/breeze/doc/images/output_start-airflow.txt index 7f9086008d6e3..9e0c09d38c699 100644 --- a/dev/breeze/doc/images/output_start-airflow.txt +++ b/dev/breeze/doc/images/output_start-airflow.txt @@ -1 +1 @@ -ae3a2480bdd0cce8f42234fb0b7804e6 +fe0a3267f29e2d719c96a3f88ae988d8 diff --git a/dev/breeze/doc/images/output_testing_integration-tests.svg b/dev/breeze/doc/images/output_testing_integration-tests.svg index 6801d5d67f46e..db3badfa9d708 100644 --- a/dev/breeze/doc/images/output_testing_integration-tests.svg +++ b/dev/breeze/doc/images/output_testing_integration-tests.svg @@ -213,7 +213,7 @@ --integrationIntegration(s) to enable when running (can be more than one).        (all | all-testable | cassandra | celery | drill | kafka | kerberos  | mongo | mssql | openlineage | otel | pinot | qdrant | redis |      -statsd | trino)                                                      +statsd | trino | ydb)                                                --backend-bDatabase backend to use. If 'none' is selected, breeze starts with   invalid DB configuration and no database and any attempts to connect to Airflow DB will fail.                                             diff --git a/dev/breeze/doc/images/output_testing_integration-tests.txt b/dev/breeze/doc/images/output_testing_integration-tests.txt index 4e797be3663c9..0d85cb0dbf9c0 100644 --- a/dev/breeze/doc/images/output_testing_integration-tests.txt +++ b/dev/breeze/doc/images/output_testing_integration-tests.txt @@ -1 +1 @@ -29357a3838bf8ff90520c3bfbf818bbe +1de6fbc3cb562fe6a9f7275ef28e5744 diff --git a/dev/breeze/doc/images/output_testing_tests.svg b/dev/breeze/doc/images/output_testing_tests.svg index d9b94589e6643..7fe27255df206 100644 --- a/dev/breeze/doc/images/output_testing_tests.svg +++ b/dev/breeze/doc/images/output_testing_tests.svg @@ -474,7 +474,7 @@ --integrationIntegration(s) to enable when running (can be more than one).        
(all | all-testable | cassandra | celery | drill | kafka | kerberos  | mongo | mssql | openlineage | otel | pinot | qdrant | redis |      -statsd | trino)                                                      +statsd | trino | ydb)                                                --backend-bDatabase backend to use. If 'none' is selected, breeze starts with   invalid DB configuration and no database and any attempts to connect to Airflow DB will fail.                                             diff --git a/dev/breeze/doc/images/output_testing_tests.txt b/dev/breeze/doc/images/output_testing_tests.txt index e51cb04161b32..8fbfb3ac96510 100644 --- a/dev/breeze/doc/images/output_testing_tests.txt +++ b/dev/breeze/doc/images/output_testing_tests.txt @@ -1 +1 @@ -53dce8653118d4af6c7867ae9a2d0eff +c4e37427ddb49a49268161177fdc41b3 diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index b672f8afa3732..c0c29a933b37b 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -62,6 +62,7 @@ "qdrant", "redis", "trino", + "ydb", ] OTHER_INTEGRATIONS = ["statsd", "otel", "openlineage"] ALLOWED_DEBIAN_VERSIONS = ["bookworm", "bullseye"] diff --git a/dev/breeze/tests/test_selective_checks.py b/dev/breeze/tests/test_selective_checks.py index ccdf34f1b7e73..e8031df61d1b6 100644 --- a/dev/breeze/tests/test_selective_checks.py +++ b/dev/breeze/tests/test_selective_checks.py @@ -1550,7 +1550,7 @@ def test_upgrade_to_newer_dependencies( "docs-list-as-string": "apache-airflow amazon apache.drill apache.druid apache.hive " "apache.impala apache.pinot common.sql databricks elasticsearch " "exasol google jdbc microsoft.mssql mysql odbc openlineage " - "oracle pgvector postgres presto slack snowflake sqlite teradata trino vertica", + "oracle pgvector postgres presto slack snowflake sqlite teradata trino vertica ydb", }, id="Common SQL provider package python files 
changed", ), diff --git a/docs/apache-airflow-providers-ydb/changelog.rst b/docs/apache-airflow-providers-ydb/changelog.rst new file mode 100644 index 0000000000000..801c69978c6b6 --- /dev/null +++ b/docs/apache-airflow-providers-ydb/changelog.rst @@ -0,0 +1,25 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + .. NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE + OVERWRITTEN WHEN PREPARING PACKAGES. + + .. IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE + `PROVIDER_CHANGELOG_TEMPLATE.rst.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY + +.. include:: ../../airflow/providers/ydb/CHANGELOG.rst diff --git a/docs/apache-airflow-providers-ydb/commits.rst b/docs/apache-airflow-providers-ydb/commits.rst new file mode 100644 index 0000000000000..d509e90e29486 --- /dev/null +++ b/docs/apache-airflow-providers-ydb/commits.rst @@ -0,0 +1,28 @@ + + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + .. NOTE! THIS FILE IS AUTOMATICALLY GENERATED AND WILL BE + OVERWRITTEN WHEN PREPARING PACKAGES. + + .. IF YOU WANT TO MODIFY THIS FILE, YOU SHOULD MODIFY THE TEMPLATE + `PROVIDER_COMMITS_TEMPLATE.rst.jinja2` IN the `dev/breeze/src/airflow_breeze/templates` DIRECTORY + + .. THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME! + +Package apache-airflow-providers-ydb +------------------------------------ diff --git a/docs/apache-airflow-providers-ydb/connections/ydb.rst b/docs/apache-airflow-providers-ydb/connections/ydb.rst new file mode 100644 index 0000000000000..9799189958410 --- /dev/null +++ b/docs/apache-airflow-providers-ydb/connections/ydb.rst @@ -0,0 +1,64 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + + + +.. _howto/connection:ydb: + +YDB Connection +====================== +The YDB connection type provides a connection to a YDB database. + +Configuring the Connection +-------------------------- +Host (required) + The host without port to connect to. Acceptable schemes: ``grpc/grpcs``, e.g. ``grpc://my_host``, ``ydb.serverless.yandexcloud.net`` or ``lb.etn9txxxx.ydb.mdb.yandexcloud.net``. + +Database (required) + Specify the database to connect to, e.g. ``/local`` or ``/ru-central1/b1gtl2kg13him37quoo6/etndqstq7ne4v68n6c9b``. + +Port (optional) + The port of the YDB cluster to connect to. Default is 2135. + +Login (optional) + Specify the user name to connect with. + +Password (optional) + Specify the password to connect with. + +Service account auth JSON (optional) + Service account auth JSON, e.g. {"id": "...", "service_account_id": "...", "private_key": "..."}. + +Service account auth JSON file path (optional) + Service account auth JSON file path. File content looks like: {"id": "...", "service_account_id": "...", "private_key": "..."}. + +Access Token (optional) + User account IAM token. + +Use VM metadata (optional) + Whether to use VM metadata to retrieve the access token. + + When specifying the connection as a URI (in the :envvar:`AIRFLOW_CONN_{CONN_ID}` variable) you should specify it + following the standard syntax of DB connections, where extras are passed as parameters + of the URI (note that all components of the URI should be URL-encoded). The connection could be specified as a JSON string as well. + + For example: + + ..
code-block:: bash + + AIRFLOW_CONN_YDB_DEFAULT1='ydb://grpcs://my_name:my_password@example.com:2135/?database=%2Flocal' + AIRFLOW_CONN_YDB_DEFAULT2='{"conn_type": "ydb", "host": "grpcs://example.com", "login": "my_name", "password": "my_password", "port": 2135, "extra": {"database": "/local"}}' diff --git a/docs/apache-airflow-providers-ydb/index.rst b/docs/apache-airflow-providers-ydb/index.rst new file mode 100644 index 0000000000000..bb2ac3e3a2e68 --- /dev/null +++ b/docs/apache-airflow-providers-ydb/index.rst @@ -0,0 +1,71 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +``apache-airflow-providers-ydb`` +===================================== + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Basics + + Home + Changelog + Security + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Guides + + Connection types + YDBExecuteQueryOperator types + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: References + + Python API <_api/airflow/providers/ydb/index> + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: System tests + + System Tests <_api/tests/system/providers/ydb/index> + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Resources + + Example DAGs + PyPI Repository + Installing from sources + +.. 
THE REMAINDER OF THE FILE IS AUTOMATICALLY GENERATED. IT WILL BE OVERWRITTEN AT RELEASE TIME! + + +.. toctree:: + :hidden: + :maxdepth: 1 + :caption: Commits + + Detailed list of commits diff --git a/docs/apache-airflow-providers-ydb/installing-providers-from-sources.rst b/docs/apache-airflow-providers-ydb/installing-providers-from-sources.rst new file mode 100644 index 0000000000000..b4e730f4ff21a --- /dev/null +++ b/docs/apache-airflow-providers-ydb/installing-providers-from-sources.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. include:: ../exts/includes/installing-providers-from-sources.rst diff --git a/docs/apache-airflow-providers-ydb/operators/ydb_operator_howto_guide.rst b/docs/apache-airflow-providers-ydb/operators/ydb_operator_howto_guide.rst new file mode 100644 index 0000000000000..3b7aaab44c2b0 --- /dev/null +++ b/docs/apache-airflow-providers-ydb/operators/ydb_operator_howto_guide.rst @@ -0,0 +1,182 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operators:ydb: + +How-to Guide for YDB using YDBExecuteQueryOperator +================================================== + +Introduction +------------ + +Apache Airflow has a robust trove of operators that can be used to implement the various tasks that make up your +workflow. Airflow is essentially a graph (Directed Acyclic Graph) made up of tasks (nodes) and dependencies (edges). + +A task defined or implemented by an operator is a unit of work in your data pipeline. + +The purpose of this guide is to define tasks involving interactions with a YDB database with +the :class:`~airflow.providers.ydb.operators.YDBExecuteQueryOperator`. + +Common database operations with YDBExecuteQueryOperator +------------------------------------------------------- + +YDBExecuteQueryOperator executes either a DML or a DDL query. The parameters of the operator are: + +- ``sql`` - string with the query; +- ``is_ddl`` - flag indicating that the query is DDL. Default value is ``false``; +- ``conn_id`` - YDB connection id. Default value is ``ydb_default``; +- ``params`` - parameters to be injected into the query if it is a Jinja template, more details about :doc:`params <apache-airflow:core-concepts/params>` + + +.. note:: + Parameter ``is_ddl`` could be removed in future versions of the operator. + +Creating an YDB table +--------------------- + +The code snippets below are based on Airflow-2.0 + +..
exampleinclude:: /../../tests/system/providers/ydb/example_ydb.py + :language: python + :start-after: [START ydb_operator_howto_guide] + :end-before: [END ydb_operator_howto_guide_create_pet_table] + + +Dumping SQL statements into your operator isn't very appealing and will create maintainability pains somewhere +down the road. To prevent this, Airflow offers an elegant solution. This is how it works: you simply create +a directory inside the DAG folder called ``sql`` and then put all the SQL files containing your SQL queries inside it. + +Your ``dags/sql/pet_schema.sql`` should look like this: + +:: + + -- create pet table + CREATE TABLE pet ( + pet_id INT, + name TEXT NOT NULL, + pet_type TEXT NOT NULL, + birth_date TEXT NOT NULL, + owner TEXT NOT NULL, + PRIMARY KEY (pet_id) + ); + +Now let's refactor ``create_pet_table`` in our DAG: + +.. code-block:: python + + create_pet_table = YDBExecuteQueryOperator( + task_id="create_pet_table", + sql="sql/pet_schema.sql", + ) + + +Inserting data into an YDB table +-------------------------------- + +Let's say we already have the SQL insert statement below in our ``dags/sql/pet_schema.sql`` file: + +:: + + -- populate pet table + UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner) + VALUES (1, 'Max', 'Dog', '2018-07-05', 'Jane'); + + UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner) + VALUES (2, 'Susie', 'Cat', '2019-05-01', 'Phil'); + + UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner) + VALUES (3, 'Lester', 'Hamster', '2020-06-23', 'Lily'); + + UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner) + VALUES (4, 'Quincy', 'Parrot', '2013-08-11', 'Anne'); + +We can then create a YDBExecuteQueryOperator task that populates the ``pet`` table. + +..
code-block:: python + + populate_pet_table = YDBExecuteQueryOperator( + task_id="populate_pet_table", + sql="sql/pet_schema.sql", + ) + + +Fetching records from your YDB table +------------------------------------ + +Fetching records from your YDB table can be as simple as: + +.. code-block:: python + + get_all_pets = YDBExecuteQueryOperator( + task_id="get_all_pets", + sql="SELECT * FROM pet;", + ) + + +Passing parameters into YDBExecuteQueryOperator +----------------------------------------------- + +The BaseOperator class has the ``params`` attribute which is available to the YDBExecuteQueryOperator +by virtue of inheritance. ``params`` makes it possible to dynamically pass in parameters in many +interesting ways. + +To find the pets born within a given date range: + +.. code-block:: python + + get_birth_date = YDBExecuteQueryOperator( + task_id="get_birth_date", + sql="SELECT * FROM pet WHERE birth_date BETWEEN '{{params.begin_date}}' AND '{{params.end_date}}'", + params={"begin_date": "2020-01-01", "end_date": "2020-12-31"}, + ) + +Now let's refactor our ``get_birth_date`` task. Instead of dumping SQL statements directly into our code, let's tidy things up +by creating an SQL file. + +:: + + -- dags/sql/birth_date.sql + SELECT * FROM pet WHERE birth_date BETWEEN '{{params.begin_date}}' AND '{{params.end_date}}'; + + +.. code-block:: python + + get_birth_date = YDBExecuteQueryOperator( + task_id="get_birth_date", + sql="sql/birth_date.sql", + params={"begin_date": "2020-01-01", "end_date": "2020-12-31"}, + ) + + +The complete YDB Operator DAG +----------------------------- + +When we put everything together, our DAG should look like this: + +.. exampleinclude:: /../../tests/system/providers/ydb/example_ydb.py + :language: python + :start-after: [START ydb_operator_howto_guide] + :end-before: [END ydb_operator_howto_guide] + + +Conclusion +---------- + +In this how-to guide we explored the Apache Airflow YDBExecuteQueryOperator to connect to a YDB database.
Let's quickly highlight the key takeaways. +It is best practice to create a subdirectory called ``sql`` in your ``dags`` directory where you can store your SQL files. +This will make your code more elegant and more maintainable. +And finally, we looked at a templated version of the SQL script and the usage of the ``params`` attribute. diff --git a/docs/apache-airflow-providers-ydb/security.rst b/docs/apache-airflow-providers-ydb/security.rst new file mode 100644 index 0000000000000..afa13dac6fc9b --- /dev/null +++ b/docs/apache-airflow-providers-ydb/security.rst @@ -0,0 +1,18 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +..
include:: ../exts/includes/security.rst diff --git a/docs/apache-airflow/extra-packages-ref.rst b/docs/apache-airflow/extra-packages-ref.rst index 135f26ab21171..ee234491d0294 100644 --- a/docs/apache-airflow/extra-packages-ref.rst +++ b/docs/apache-airflow/extra-packages-ref.rst @@ -240,6 +240,8 @@ These are extras that add dependencies needed for integration with external serv +---------------------+-----------------------------------------------------+-----------------------------------------------------+ | yandex | ``pip install 'apache-airflow[yandex]'`` | Yandex.cloud hooks and operators | +---------------------+-----------------------------------------------------+-----------------------------------------------------+ +| ydb | ``pip install 'apache-airflow[ydb]'`` | YDB hooks and operators | ++---------------------+-----------------------------------------------------+-----------------------------------------------------+ | zendesk | ``pip install 'apache-airflow[zendesk]'`` | Zendesk hooks | +---------------------+-----------------------------------------------------+-----------------------------------------------------+ diff --git a/docs/integration-logos/ydb/ydb.png b/docs/integration-logos/ydb/ydb.png new file mode 100644 index 0000000000000..fe10bb6bcb839 Binary files /dev/null and b/docs/integration-logos/ydb/ydb.png differ diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 3b30f20fc59ea..80d232247a588 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -1843,6 +1843,8 @@ Yandex yandex yandexcloud yarnpkg +YDB +ydb Yieldr yml youtrack diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 44fd883ef1143..9bd435e9f2fa1 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1327,6 +1327,20 @@ "excluded-python-versions": [], "state": "ready" }, + "ydb": { + "deps": [ + "apache-airflow-providers-common-sql>=1.3.1", + 
"apache-airflow>=2.7.0", + "ydb>=3.11.3" + ], + "devel-deps": [], + "plugins": [], + "cross-providers-deps": [ + "common.sql" + ], + "excluded-python-versions": [], + "state": "ready" + }, "zendesk": { "deps": [ "apache-airflow>=2.7.0", diff --git a/hatch_build.py b/hatch_build.py index 5af5b01f0355e..a51689d9e2679 100644 --- a/hatch_build.py +++ b/hatch_build.py @@ -492,7 +492,7 @@ # We should remove this dependency when Providers are limited to Airflow 2.7+ # as we replaced the usage of unicodecsv with csv in Airflow 2.7 # See https://github.com/apache/airflow/pull/31693 - # We should also remove "licenses/LICENSE-unicodecsv.txt" file when we remove this dependency + # We should also remove "3rd-party-licenses/LICENSE-unicodecsv.txt" file when we remove this dependency "unicodecsv>=0.14.1", # The Universal Pathlib provides Pathlib-like interface for FSSPEC "universal-pathlib>=0.2.2", diff --git a/pyproject.toml b/pyproject.toml index f4ab327b4571e..6112b1c6d7de2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -140,7 +140,7 @@ dynamic = ["version", "optional-dependencies", "dependencies"] # mysql, neo4j, odbc, openai, openfaas, openlineage, opensearch, opsgenie, oracle, pagerduty, # papermill, pgvector, pinecone, postgres, presto, qdrant, redis, salesforce, samba, segment, # sendgrid, sftp, singularity, slack, smtp, snowflake, sqlite, ssh, tableau, tabular, telegram, -# teradata, trino, vertica, weaviate, yandex, zendesk +# teradata, trino, vertica, weaviate, yandex, ydb, zendesk # # END PROVIDER EXTRAS HERE @@ -246,8 +246,7 @@ target-version = "py38" line-length = 110 extend-exclude = [ ".eggs", - "airflow/_vendor/*", - "airflow/providers/google/ads/_vendor/*", + "*/_vendor/*", # The files generated by stubgen aren't 100% valid syntax it turns out, and we don't ship them, so we can # ignore them in ruff "airflow/providers/common/sql/*/*.pyi", @@ -608,6 +607,10 @@ disable_error_code = [ module="airflow.migrations.*" ignore_errors = true 
+[[tool.mypy.overrides]] +module="airflow.*._vendor.*" +ignore_errors = true + [[tool.mypy.overrides]] module= [ "google.cloud.*", diff --git a/scripts/ci/docker-compose/integration-ydb.yml b/scripts/ci/docker-compose/integration-ydb.yml new file mode 100644 index 0000000000000..55f25aed04668 --- /dev/null +++ b/scripts/ci/docker-compose/integration-ydb.yml @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +--- +version: "3.8" +services: + ydb: + image: cr.yandex/yc/yandex-docker-local-ydb:latest + labels: + breeze.description: "Integration required for YDB tests." + hostname: ydb + environment: + YDB_USE_IN_MEMORY_PDISKS: true + GRPC_PORT: 2136 + restart: "on-failure" + + airflow: + environment: + - INTEGRATION_YDB=true + depends_on: + ydb: + condition: service_started diff --git a/scripts/ci/pre_commit/mypy_folder.py b/scripts/ci/pre_commit/mypy_folder.py index e17dc4795fac9..366c54aae4c12 100755 --- a/scripts/ci/pre_commit/mypy_folder.py +++ b/scripts/ci/pre_commit/mypy_folder.py @@ -88,7 +88,7 @@ if res.returncode != 0: if ci_environment: console.print( - "[yellow]You are running mypy with the folders selected. If you want to" + "[yellow]You are running mypy with the folders selected. 
If you want to " "reproduce it locally, you need to run the following command:\n" ) console.print("pre-commit run --hook-stage manual mypy- --all-files\n") diff --git a/scripts/in_container/check_environment.sh b/scripts/in_container/check_environment.sh index 8ddcaa9fcae0b..5dce3a60a939c 100755 --- a/scripts/in_container/check_environment.sh +++ b/scripts/in_container/check_environment.sh @@ -199,6 +199,10 @@ if [[ ${INTEGRATION_DRILL} == "true" ]]; then check_service "drill" "run_nc drill 8047" 50 fi +if [[ ${INTEGRATION_YDB} == "true" ]]; then + check_service "YDB Cluster" "run_nc ydb 2136" 50 +fi + if [[ ${EXIT_CODE} != 0 ]]; then echo echo "Error: some of the CI environment failed to initialize!" diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index 5fb2b819caca9..6dfabe90bd47a 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -166,6 +166,8 @@ def test_providers_modules_should_have_tests(self): modules_files = list(os.path.relpath(f, ROOT_FOLDER) for f in modules_files) # Exclude example_dags modules_files = list(f for f in modules_files if "/example_dags/" not in f) + # Exclude _vendor + modules_files = list(f for f in modules_files if "/_vendor/" not in f) # Exclude __init__.py modules_files = list(f for f in modules_files if not f.endswith("__init__.py")) # Change airflow/ to tests/ diff --git a/tests/integration/providers/ydb/__init__.py b/tests/integration/providers/ydb/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/integration/providers/ydb/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/providers/ydb/hooks/__init__.py b/tests/integration/providers/ydb/hooks/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/integration/providers/ydb/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/tests/integration/providers/ydb/operators/__init__.py b/tests/integration/providers/ydb/operators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/integration/providers/ydb/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/providers/ydb/operators/test_ydb.py b/tests/integration/providers/ydb/operators/test_ydb.py new file mode 100644 index 0000000000000..6e54ede7e7a26 --- /dev/null +++ b/tests/integration/providers/ydb/operators/test_ydb.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest.mock import MagicMock + +import pytest + +from airflow.models.connection import Connection +from airflow.models.dag import DAG +from airflow.providers.common.sql.hooks.sql import fetch_all_handler +from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2024, 1, 1) + + +@pytest.fixture(scope="module", autouse=True) +def ydb_connections(): + """Create the YDB connection used for testing purposes.""" + c = Connection( + conn_id="ydb_default", conn_type="ydb", host="grpc://ydb", port=2136, extra={"database": "/local"} + ) + + with pytest.MonkeyPatch.context() as mp: + mp.setenv("AIRFLOW_CONN_YDB_DEFAULT", c.as_json()) + yield + + +@pytest.mark.integration("ydb") +class TestYDBExecuteQueryOperator: + def setup_method(self): + args = {"owner": "airflow", "start_date": DEFAULT_DATE} + + self.dag = DAG("test_ydb_dag_id", default_args=args) + + self.mock_context = MagicMock() + + def test_execute_hello(self): + operator = YDBExecuteQueryOperator( + task_id="simple_sql", sql="select 987", is_ddl=False, handler=fetch_all_handler + ) + + results = operator.execute(self.mock_context) + assert results == [(987,)] diff --git a/tests/providers/ydb/__init__.py b/tests/providers/ydb/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/ydb/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/ydb/hooks/__init__.py b/tests/providers/ydb/hooks/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/ydb/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/tests/providers/ydb/hooks/test_ydb.py b/tests/providers/ydb/hooks/test_ydb.py new file mode 100644 index 0000000000000..f644fc88cd57c --- /dev/null +++ b/tests/providers/ydb/hooks/test_ydb.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest.mock import PropertyMock, patch + +from airflow.models import Connection +from airflow.providers.ydb.hooks.ydb import YDBHook + + +class FakeDriver: + def wait(*args, **kwargs): + pass + + +class FakeSessionPoolImpl: + def __init__(self, driver): + self._driver = driver + + +class FakeSessionPool: + def __init__(self, driver): + self._pool_impl = FakeSessionPoolImpl(driver) + + +class FakeYDBCursor: + def __init__(self, *args, **kwargs): + self.description = True + + def execute(self, operation, parameters=None): + return True + + def fetchone(self): + return 1, 2 + + def fetchmany(self, size=None): + return [(1, 2), (2, 3), (3, 4)][0:size] + + def fetchall(self): + return [(1, 2), (2, 3), (3, 4)] + + def close(self): + pass + + @property + def rowcount(self): + return 1 + + +@patch("airflow.hooks.base.BaseHook.get_connection") +@patch("ydb.Driver") +@patch("ydb.SessionPool") +@patch( + "airflow.providers.ydb.hooks._vendor.dbapi.connection.Connection._cursor_class", new_callable=PropertyMock +) +def test_execute(cursor_class, mock_session_pool, mock_driver, mock_get_connection): + mock_get_connection.return_value = Connection( + conn_type="ydb", + host="grpc://localhost", + port=2135, + login="my_user", + password="my_pwd", + extra={"database": "/my_db1"}, + ) + driver_instance = FakeDriver() + + cursor_class.return_value = FakeYDBCursor + mock_driver.return_value = driver_instance + mock_session_pool.return_value = FakeSessionPool(driver_instance) + + hook = YDBHook() + assert hook.get_uri() == "ydb://grpc://my_user:my_pwd@localhost:2135/?database=%2Fmy_db1" + with hook.get_conn() as conn: + with conn.cursor() as cur: + assert cur.execute("INSERT INTO table VALUES ('aaa'), ('bbbb')") + conn.commit() + assert cur.execute("SELECT * FROM table") + assert cur.fetchone() == (1, 2) + assert cur.fetchmany(2) == [(1, 2), (2, 3)] + assert cur.fetchall() == [(1, 2), (2, 3), (3, 4)] diff --git 
a/tests/providers/ydb/operators/__init__.py b/tests/providers/ydb/operators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/ydb/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/ydb/operators/test_ydb.py b/tests/providers/ydb/operators/test_ydb.py new file mode 100644 index 0000000000000..ad1d641e1a1da --- /dev/null +++ b/tests/providers/ydb/operators/test_ydb.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from datetime import datetime, timedelta +from unittest.mock import MagicMock, PropertyMock, patch + +import pytest + +from airflow.models import Connection +from airflow.models.dag import DAG +from airflow.providers.common.sql.hooks.sql import fetch_all_handler, fetch_one_handler +from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator +from airflow.utils import timezone + + +@pytest.mark.db_test +def test_sql_templating(create_task_instance_of_operator): + ti = create_task_instance_of_operator( + YDBExecuteQueryOperator, + sql="SELECT * FROM pet WHERE birth_date BETWEEN '{{params.begin_date}}' AND '{{params.end_date}}'", + params={"begin_date": "2020-01-01", "end_date": "2020-12-31"}, + ydb_conn_id="ydb_default1", + dag_id="test_template_body_templating_dag", + task_id="test_template_body_templating_task", + execution_date=timezone.datetime(2024, 2, 1, tzinfo=timezone.utc), + ) + ti.render_templates() + task: YDBExecuteQueryOperator = ti.task + assert task.sql == "SELECT * FROM pet WHERE birth_date BETWEEN '2020-01-01' AND '2020-12-31'" + + +class FakeDriver: + def wait(*args, **kwargs): + pass + + +class FakeSessionPoolImpl: + def __init__(self, driver): + self._driver = driver + + +class FakeSessionPool: + def __init__(self, driver): + self._pool_impl = FakeSessionPoolImpl(driver) + + +class FakeYDBCursor: + def __init__(self, *args, **kwargs): + self.description = True + + def execute(self, operation, parameters=None): + return True + + def fetchall(self): + return "fetchall: result" + + def fetchone(self): + return "fetchone: result" + + def close(self): + pass + + @property + def rowcount(self): + return 1 + + +class TestYDBExecuteQueryOperator: + def setup_method(self): + dag_id = "test_dag" + self.dag = DAG( + dag_id, + default_args={ + "owner": "airflow", + "start_date": datetime.today(), + 
"end_date": datetime.today() + timedelta(days=1), + }, + schedule="@once", + ) + + @patch("airflow.hooks.base.BaseHook.get_connection") + @patch("ydb.Driver") + @patch("ydb.SessionPool") + @patch( + "airflow.providers.ydb.hooks._vendor.dbapi.connection.Connection._cursor_class", + new_callable=PropertyMock, + ) + def test_execute_query(self, cursor_class, mock_session_pool, mock_driver, mock_get_connection): + mock_get_connection.return_value = Connection( + conn_type="ydb", host="localhost", extra={"database": "my_db"} + ) + driver_instance = FakeDriver() + + cursor_class.return_value = FakeYDBCursor + mock_driver.return_value = driver_instance + mock_session_pool.return_value = FakeSessionPool(driver_instance) + context = {"ti": MagicMock()} + operator = YDBExecuteQueryOperator( + task_id="simple_sql", sql="select 987", is_ddl=False, handler=fetch_one_handler + ) + + results = operator.execute(context) + assert results == "fetchone: result" + + operator = YDBExecuteQueryOperator( + task_id="simple_sql", sql="select 987", is_ddl=False, handler=fetch_all_handler + ) + + results = operator.execute(context) + assert results == "fetchall: result" diff --git a/tests/providers/ydb/utils/__init__.py b/tests/providers/ydb/utils/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/ydb/utils/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/ydb/utils/test_credentials.py b/tests/providers/ydb/utils/test_credentials.py new file mode 100644 index 0000000000000..864069ddbcc85 --- /dev/null +++ b/tests/providers/ydb/utils/test_credentials.py @@ -0,0 +1,158 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from unittest.mock import patch + +from airflow.models.connection import Connection +from airflow.providers.ydb.utils.credentials import get_credentials_from_connection + +TEST_ENDPOINT = "my_endpoint" +TEST_DATABASE = "/my_db" +MAGIC_CONST = 42 + + +@patch("ydb.StaticCredentials") +def test_static_creds(mock): + mock.return_value = MAGIC_CONST + c = Connection(conn_type="ydb", host="localhost", login="my_login", password="my_pwd") + credentials = get_credentials_from_connection(TEST_ENDPOINT, TEST_DATABASE, c, {}) + assert credentials == MAGIC_CONST + + assert len(mock.call_args.args) == 1 + driver_config = mock.call_args.args[0] + assert driver_config.endpoint == TEST_ENDPOINT + assert driver_config.database == TEST_DATABASE + assert mock.call_args.kwargs == {"user": "my_login", "password": "my_pwd"} + + +@patch("ydb.AccessTokenCredentials") +def test_token_creds(mock): + mock.return_value = MAGIC_CONST + c = Connection(conn_type="ydb", host="localhost") + credentials = get_credentials_from_connection(TEST_ENDPOINT, TEST_DATABASE, c, {"token": "my_token"}) + assert credentials == MAGIC_CONST + + mock.assert_called_with("my_token") + + +@patch("ydb.AnonymousCredentials") +def test_anonymous_creds(mock): + mock.return_value = MAGIC_CONST + c = Connection(conn_type="ydb", host="localhost") + credentials = get_credentials_from_connection(TEST_ENDPOINT, TEST_DATABASE, c) + assert credentials == MAGIC_CONST + mock.assert_called_once() + + +@patch("ydb.iam.auth.MetadataUrlCredentials") +def test_vm_metadata_creds(mock): + mock.return_value = MAGIC_CONST + c = Connection(conn_type="ydb", host="localhost") + credentials = get_credentials_from_connection(TEST_ENDPOINT, TEST_DATABASE, c, {"use_vm_metadata": True}) + assert credentials == MAGIC_CONST + mock.assert_called_once() + + +@patch("ydb.iam.auth.BaseJWTCredentials.from_file") +def test_service_account_json_path_creds(mock): + mock.return_value = MAGIC_CONST + c = 
Connection(conn_type="ydb", host="localhost") + + credentials = get_credentials_from_connection( + TEST_ENDPOINT, TEST_DATABASE, c, {"service_account_json_path": "my_path"} + ) + assert credentials == MAGIC_CONST + mock.assert_called_once() + + assert len(mock.call_args.args) == 2 + assert mock.call_args.args[1] == "my_path" + + +def test_creds_priority(): + # 1. static creds + with patch("ydb.StaticCredentials") as mock: + c = Connection(conn_type="ydb", host="localhost", login="my_login", password="my_pwd") + mock.return_value = MAGIC_CONST + credentials = get_credentials_from_connection( + TEST_ENDPOINT, + TEST_DATABASE, + c, + { + "service_account_json_path": "my_path", + "use_vm_metadata": True, + "token": "my_token", + }, + ) + assert credentials == MAGIC_CONST + mock.assert_called_once() + + # 2. token + with patch("ydb.AccessTokenCredentials") as mock: + c = Connection(conn_type="ydb", host="localhost", password="my_pwd") + mock.return_value = MAGIC_CONST + credentials = get_credentials_from_connection( + TEST_ENDPOINT, + TEST_DATABASE, + c, + { + "service_account_json_path": "my_path", + "use_vm_metadata": True, + "token": "my_token", + }, + ) + assert credentials == MAGIC_CONST + mock.assert_called_once() + + # 3. service account json path + with patch("ydb.iam.auth.BaseJWTCredentials.from_file") as mock: + c = Connection(conn_type="ydb", host="localhost") + mock.return_value = MAGIC_CONST + credentials = get_credentials_from_connection( + TEST_ENDPOINT, + TEST_DATABASE, + c, + { + "service_account_json_path": "my_path", + "use_vm_metadata": True, + }, + ) + assert credentials == MAGIC_CONST + mock.assert_called_once() + + # 4. 
vm metadata + with patch("ydb.iam.auth.MetadataUrlCredentials") as mock: + c = Connection(conn_type="ydb", host="localhost") + mock.return_value = MAGIC_CONST + credentials = get_credentials_from_connection( + TEST_ENDPOINT, + TEST_DATABASE, + c, + { + "use_vm_metadata": True, + }, + ) + assert credentials == MAGIC_CONST + mock.assert_called_once() + + # 5. anonymous + with patch("ydb.AnonymousCredentials") as mock: + c = Connection(conn_type="ydb", host="localhost") + mock.return_value = MAGIC_CONST + credentials = get_credentials_from_connection(TEST_ENDPOINT, TEST_DATABASE, c, {}) + assert credentials == MAGIC_CONST + mock.assert_called_once() diff --git a/tests/providers/ydb/utils/test_defaults.py b/tests/providers/ydb/utils/test_defaults.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/ydb/utils/test_defaults.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
diff --git a/tests/system/providers/ydb/__init__.py b/tests/system/providers/ydb/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/system/providers/ydb/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/system/providers/ydb/example_ydb.py b/tests/system/providers/ydb/example_ydb.py new file mode 100644 index 0000000000000..41df0333dd316 --- /dev/null +++ b/tests/system/providers/ydb/example_ydb.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import datetime +import os + +from airflow import DAG +from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator + +# [START ydb_operator_howto_guide] + + +# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by +# instantiating the YDB Operator + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") +DAG_ID = "ydb_operator_dag" + +with DAG( + dag_id=DAG_ID, + start_date=datetime.datetime(2020, 2, 2), + schedule="@once", + catchup=False, +) as dag: + # [START ydb_operator_howto_guide_create_pet_table] + create_pet_table = YDBExecuteQueryOperator( + task_id="create_pet_table", + sql=""" + CREATE TABLE pet ( + pet_id INT, + name TEXT NOT NULL, + pet_type TEXT NOT NULL, + birth_date TEXT NOT NULL, + owner TEXT NOT NULL, + PRIMARY KEY (pet_id) + ); + """, + is_ddl=True, # must be specified for DDL queries + ) + + # [END ydb_operator_howto_guide_create_pet_table] + # [START ydb_operator_howto_guide_populate_pet_table] + populate_pet_table = YDBExecuteQueryOperator( + task_id="populate_pet_table", + sql=""" + UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner) + VALUES (1, 'Max', 'Dog', '2018-07-05', 'Jane'); + + UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner) + VALUES (2, 'Susie', 'Cat', '2019-05-01', 'Phil'); + + UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner) + VALUES (3, 'Lester', 'Hamster', '2020-06-23', 'Lily'); + + UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner) + VALUES (4, 'Quincy', 'Parrot', '2013-08-11', 'Anne'); + """, + ) + # [END ydb_operator_howto_guide_populate_pet_table] + # [START ydb_operator_howto_guide_get_all_pets] + get_all_pets = YDBExecuteQueryOperator(task_id="get_all_pets", sql="SELECT * FROM pet;") + # [END ydb_operator_howto_guide_get_all_pets] + # [START ydb_operator_howto_guide_get_birth_date] + 
get_birth_date = YDBExecuteQueryOperator( + task_id="get_birth_date", + sql="SELECT * FROM pet WHERE birth_date BETWEEN '{{params.begin_date}}' AND '{{params.end_date}}'", + params={"begin_date": "2020-01-01", "end_date": "2020-12-31"}, + ) + # [END ydb_operator_howto_guide_get_birth_date] + + create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date + # [END ydb_operator_howto_guide] + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)