From 2a084ee8d7fb27cbc3ad28f4845c5d20c82f0cbe Mon Sep 17 00:00:00 2001 From: Leonard Meerwood Date: Tue, 14 Jun 2022 21:24:07 +1000 Subject: [PATCH] Update Oracle library to latest version (#24311) --- .../google/cloud/transfers/oracle_to_gcs.py | 20 ++--- airflow/providers/oracle/hooks/oracle.py | 53 +++++------- .../operators/transfer/oracle_to_gcs.rst | 2 +- .../connections/oracle.rst | 4 +- .../apache-airflow-providers-oracle/index.rst | 2 +- docs/conf.py | 2 +- setup.py | 2 +- .../cloud/transfers/test_oracle_to_gcs.py | 6 +- .../test_oracle_to_azure_data_lake.py | 8 +- tests/providers/oracle/hooks/test_oracle.py | 85 +++++-------------- .../oracle/transfers/test_oracle_to_oracle.py | 4 +- 11 files changed, 69 insertions(+), 119 deletions(-) diff --git a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py index 3306c9801063a..d5fa696c50732 100644 --- a/airflow/providers/google/cloud/transfers/oracle_to_gcs.py +++ b/airflow/providers/google/cloud/transfers/oracle_to_gcs.py @@ -22,7 +22,7 @@ from decimal import Decimal from typing import Dict -import cx_Oracle +import oracledb from airflow.providers.google.cloud.transfers.sql_to_gcs import BaseSQLToGCSOperator from airflow.providers.oracle.hooks.oracle import OracleHook @@ -45,15 +45,15 @@ class OracleToGCSOperator(BaseSQLToGCSOperator): ui_color = '#a0e08c' type_map = { - cx_Oracle.DB_TYPE_BINARY_DOUBLE: 'DECIMAL', - cx_Oracle.DB_TYPE_BINARY_FLOAT: 'DECIMAL', - cx_Oracle.DB_TYPE_BINARY_INTEGER: 'INTEGER', - cx_Oracle.DB_TYPE_BOOLEAN: 'BOOLEAN', - cx_Oracle.DB_TYPE_DATE: 'TIMESTAMP', - cx_Oracle.DB_TYPE_NUMBER: 'NUMERIC', - cx_Oracle.DB_TYPE_TIMESTAMP: 'TIMESTAMP', - cx_Oracle.DB_TYPE_TIMESTAMP_LTZ: 'TIMESTAMP', - cx_Oracle.DB_TYPE_TIMESTAMP_TZ: 'TIMESTAMP', + oracledb.DB_TYPE_BINARY_DOUBLE: 'DECIMAL', # type: ignore + oracledb.DB_TYPE_BINARY_FLOAT: 'DECIMAL', # type: ignore + oracledb.DB_TYPE_BINARY_INTEGER: 'INTEGER', # type: ignore + oracledb.DB_TYPE_BOOLEAN: 'BOOLEAN', # type: ignore + oracledb.DB_TYPE_DATE: 'TIMESTAMP', # type: ignore + oracledb.DB_TYPE_NUMBER: 'NUMERIC', # type: ignore + oracledb.DB_TYPE_TIMESTAMP: 'TIMESTAMP', # type: ignore + oracledb.DB_TYPE_TIMESTAMP_LTZ: 'TIMESTAMP', # type: ignore + oracledb.DB_TYPE_TIMESTAMP_TZ: 'TIMESTAMP', # type: ignore } def __init__(self, *, oracle_conn_id='oracle_default', ensure_utc=False, **kwargs): diff --git a/airflow/providers/oracle/hooks/oracle.py b/airflow/providers/oracle/hooks/oracle.py index c018c7c6c04bf..57b5bead41042 100644 --- a/airflow/providers/oracle/hooks/oracle.py +++ b/airflow/providers/oracle/hooks/oracle.py @@ -21,7 +21,7 @@ from datetime import datetime from typing import Dict, List, Optional, Union -import cx_Oracle +import oracledb try: import numpy @@ -57,7 +57,7 @@ class OracleHook(DbApiHook): supports_autocommit = True - def get_conn(self) -> 'OracleHook': + def get_conn(self) -> oracledb.Connection: """ Returns a oracle connection object Optional parameters for using a custom DSN connection @@ -84,8 +84,8 @@ def get_conn(self) -> 'OracleHook': ) } - see more param detail in - `cx_Oracle.connect `_ + see more param detail in `oracledb.connect + `_ """ @@ -98,9 +98,9 @@ def get_conn(self) -> 'OracleHook': service_name = conn.extra_dejson.get('service_name') port = conn.port if conn.port else 1521 if conn.host and sid and not service_name: - conn_config['dsn'] = cx_Oracle.makedsn(conn.host, port, sid) + conn_config['dsn'] = oracledb.makedsn(conn.host, port, sid) elif conn.host and service_name and not sid: - conn_config['dsn'] = cx_Oracle.makedsn(conn.host, port, service_name=service_name) + conn_config['dsn'] = oracledb.makedsn(conn.host, port, service_name=service_name) else: dsn = conn.extra_dejson.get('dsn') if dsn is None: @@ -119,51 +119,40 @@ def get_conn(self) -> 'OracleHook': dsn += "/" + conn.schema conn_config['dsn'] = dsn - if 'encoding' in conn.extra_dejson: - conn_config['encoding'] = conn.extra_dejson.get('encoding') - # if `encoding` is specific but `nencoding` is not - # `nencoding` should use same values as `encoding` to set encoding, inspired by - # https://github.com/oracle/python-cx_Oracle/issues/157#issuecomment-371877993 - if 'nencoding' not in conn.extra_dejson: - conn_config['nencoding'] = conn.extra_dejson.get('encoding') - if 'nencoding' in conn.extra_dejson: - conn_config['nencoding'] = conn.extra_dejson.get('nencoding') - if 'threaded' in conn.extra_dejson: - conn_config['threaded'] = conn.extra_dejson.get('threaded') if 'events' in conn.extra_dejson: conn_config['events'] = conn.extra_dejson.get('events') mode = conn.extra_dejson.get('mode', '').lower() if mode == 'sysdba': - conn_config['mode'] = cx_Oracle.SYSDBA + conn_config['mode'] = oracledb.AUTH_MODE_SYSDBA elif mode == 'sysasm': - conn_config['mode'] = cx_Oracle.SYSASM + conn_config['mode'] = oracledb.AUTH_MODE_SYSASM elif mode == 'sysoper': - conn_config['mode'] = cx_Oracle.SYSOPER + conn_config['mode'] = oracledb.AUTH_MODE_SYSOPER elif mode == 'sysbkp': - conn_config['mode'] = cx_Oracle.SYSBKP + conn_config['mode'] = oracledb.AUTH_MODE_SYSBKP elif mode == 'sysdgd': - conn_config['mode'] = cx_Oracle.SYSDGD + conn_config['mode'] = oracledb.AUTH_MODE_SYSDGD elif mode == 'syskmt': - conn_config['mode'] = cx_Oracle.SYSKMT + conn_config['mode'] = oracledb.AUTH_MODE_SYSKMT elif mode == 'sysrac': - conn_config['mode'] = cx_Oracle.SYSRAC + conn_config['mode'] = oracledb.AUTH_MODE_SYSRAC purity = conn.extra_dejson.get('purity', '').lower() if purity == 'new': - conn_config['purity'] = cx_Oracle.ATTR_PURITY_NEW + conn_config['purity'] = oracledb.PURITY_NEW elif purity == 'self': - conn_config['purity'] = cx_Oracle.ATTR_PURITY_SELF + conn_config['purity'] = oracledb.PURITY_SELF elif purity == 'default': - conn_config['purity'] = cx_Oracle.ATTR_PURITY_DEFAULT + conn_config['purity'] = oracledb.PURITY_DEFAULT - conn = cx_Oracle.connect(**conn_config) + conn = oracledb.connect(**conn_config) if mod is not None: conn.module = mod # if Connection.schema is defined, set schema after connecting successfully # cannot be part of conn_config - # https://cx-oracle.readthedocs.io/en/latest/api_manual/connection.html?highlight=schema#Connection.current_schema + # https://python-oracledb.readthedocs.io/en/latest/api_manual/connection.html?highlight=schema#Connection.current_schema # Only set schema when not using conn.schema as Service Name if schema and service_name: conn.current_schema = schema @@ -184,7 +173,7 @@ def insert_rows( the whole set of inserts is treated as one transaction Changes from standard DbApiHook implementation: - - Oracle SQL queries in cx_Oracle can not be terminated with a semicolon (`;`) + - Oracle SQL queries in oracledb can not be terminated with a semicolon (`;`) - Replace NaN values with NULL using `numpy.nan_to_num` (not using `is_nan()` because of input types error for strings) - Coerce datetime cells to Oracle DATETIME format during insert @@ -245,7 +234,7 @@ def bulk_insert_rows( commit_every: int = 5000, ): """ - A performant bulk insert for cx_Oracle + A performant bulk insert for oracledb that uses prepared statements via `executemany()`. For best performance, pass in `rows` as an iterator. @@ -307,7 +296,7 @@ def callproc( provided `parameters` argument. See - https://cx-oracle.readthedocs.io/en/latest/api_manual/cursor.html#Cursor.var + https://python-oracledb.readthedocs.io/en/latest/api_manual/cursor.html#Cursor.var for further reference. """ if parameters is None: diff --git a/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst b/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst index 9299d417a5643..2c23209ae46f3 100644 --- a/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst +++ b/docs/apache-airflow-providers-google/operators/transfer/oracle_to_gcs.rst @@ -49,5 +49,5 @@ Reference --------- For further information, look at: -* `cx_Oracle Documentation `__ +* `oracledb Documentation `__ * `Google Cloud Storage Documentation `__ diff --git a/docs/apache-airflow-providers-oracle/connections/oracle.rst b/docs/apache-airflow-providers-oracle/connections/oracle.rst index 4a26c36b3142a..3934b6648e3ac 100644 --- a/docs/apache-airflow-providers-oracle/connections/oracle.rst +++ b/docs/apache-airflow-providers-oracle/connections/oracle.rst @@ -82,8 +82,8 @@ Extra (optional) Schema = "orcl" - More details on all Oracle connect parameters supported can be found in `cx_Oracle documentation - `_. + More details on all Oracle connect parameters supported can be found in `oracledb documentation + `_. Information on creating an Oracle Connection through the web user interface can be found in Airflow's :doc:`Managing Connections Documentation `. diff --git a/docs/apache-airflow-providers-oracle/index.rst b/docs/apache-airflow-providers-oracle/index.rst index c2792fff698b8..09f041922a05b 100644 --- a/docs/apache-airflow-providers-oracle/index.rst +++ b/docs/apache-airflow-providers-oracle/index.rst @@ -79,7 +79,7 @@ Requirements PIP package Version required ================== ================== ``apache-airflow`` ``>=2.2.0`` -``cx_Oracle`` ``>=5.1.2`` +``oracledb`` ``>=1.0.0`` ================== ================== .. include:: ../../airflow/providers/oracle/CHANGELOG.rst diff --git a/docs/conf.py b/docs/conf.py index 47334381fc8e4..72080b43cba0e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -551,7 +551,6 @@ def _get_params(root_schema: dict, prefix: str = "", default_section: str = "") 'celery', 'cloudant', 'cryptography', - 'cx_Oracle', 'datadog', 'distributed', 'docker', @@ -567,6 +566,7 @@ def _get_params(root_schema: dict, prefix: str = "", default_section: str = "") 'kubernetes', 'msrestazure', 'oss2', + 'oracledb', 'pandas', 'pandas_gbq', 'paramiko', diff --git a/setup.py b/setup.py index a07368d127732..d56e3e6233dc8 100644 --- a/setup.py +++ b/setup.py @@ -463,7 +463,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version 'opsgenie-sdk>=2.1.5', ] oracle = [ - 'cx_Oracle>=5.1.2', + 'oracledb>=1.0.0', ] pagerduty = [ 'pdpyras>=4.1.2', diff --git a/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py b/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py index b90510cbae19c..38a46a8d997d6 100644 --- a/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py +++ b/tests/providers/google/cloud/transfers/test_oracle_to_gcs.py @@ -19,7 +19,7 @@ import unittest from unittest import mock -import cx_Oracle +import oracledb from airflow.providers.google.cloud.transfers.oracle_to_gcs import OracleToGCSOperator @@ -32,8 +32,8 @@ ROWS = [('mock_row_content_1', 42), ('mock_row_content_2', 43), ('mock_row_content_3', 44)] CURSOR_DESCRIPTION = ( - ('some_str', cx_Oracle.DB_TYPE_VARCHAR, None, None, None, None, None), - ('some_num', cx_Oracle.DB_TYPE_NUMBER, None, None, None, None, None), + ('some_str', oracledb.DB_TYPE_VARCHAR, None, None, None, None, None), # type: ignore + ('some_num', oracledb.DB_TYPE_NUMBER, None, None, None, None, None), # type: ignore ) NDJSON_LINES = [ b'{"some_num": 42, "some_str": "mock_row_content_1"}\n', diff --git a/tests/providers/microsoft/azure/transfers/test_oracle_to_azure_data_lake.py b/tests/providers/microsoft/azure/transfers/test_oracle_to_azure_data_lake.py index 507e143a03774..17d308273c80f 100644 --- a/tests/providers/microsoft/azure/transfers/test_oracle_to_azure_data_lake.py +++ b/tests/providers/microsoft/azure/transfers/test_oracle_to_azure_data_lake.py @@ -44,8 +44,8 @@ def test_write_temp_file(self): delimiter = '|' encoding = 'utf-8' cursor_description = [ - ('id', "", 39, None, 38, 0, 0), - ('description', "", 60, 240, None, None, 1), + ('id', "", 39, None, 38, 0, 0), + ('description', "", 60, 240, None, None, 1), ] cursor_rows = [[1, 'description 1'], [2, 'description 2']] mock_cursor = MagicMock() @@ -95,8 +95,8 @@ def test_execute(self, mock_data_lake_hook, mock_oracle_hook): delimiter = '|' encoding = 'latin-1' cursor_description = [ - ('id', "", 39, None, 38, 0, 0), - ('description', "", 60, 240, None, None, 1), + ('id', "", 39, None, 38, 0, 0), + ('description', "", 60, 240, None, None, 1), ] cursor_rows = [[1, 'description 1'], [2, 'description 2']] cursor_mock = MagicMock() diff --git a/tests/providers/oracle/hooks/test_oracle.py b/tests/providers/oracle/hooks/test_oracle.py index db2e9f41ade98..d33dbf79f721a 100644 --- a/tests/providers/oracle/hooks/test_oracle.py +++ b/tests/providers/oracle/hooks/test_oracle.py @@ -28,12 +28,12 @@ from airflow.providers.oracle.hooks.oracle import OracleHook try: - import cx_Oracle + import oracledb except ImportError: - cx_Oracle = None + oracledb = None # type: ignore -@unittest.skipIf(cx_Oracle is None, 'cx_Oracle package not present') +@unittest.skipIf(oracledb is None, 'oracledb package not present') class TestOracleHookConn(unittest.TestCase): def setUp(self): super().setUp() @@ -46,7 +46,7 @@ def setUp(self): self.db_hook.get_connection = mock.Mock() self.db_hook.get_connection.return_value = self.connection - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') + @mock.patch('airflow.providers.oracle.hooks.oracle.oracledb.connect') def test_get_conn_host(self, mock_connect): self.db_hook.get_conn() assert mock_connect.call_count == 1 @@ -56,7 +56,7 @@ def test_get_conn_host(self, mock_connect): assert kwargs['password'] == 'password' assert kwargs['dsn'] == 'host:1521/schema' - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') + @mock.patch('airflow.providers.oracle.hooks.oracle.oracledb.connect') def test_get_conn_host_alternative_port(self, mock_connect): self.connection.port = 1522 self.db_hook.get_conn() @@ -67,7 +67,7 @@ def test_get_conn_host_alternative_port(self, mock_connect): assert kwargs['password'] == 'password' assert kwargs['dsn'] == 'host:1522/schema' - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') + @mock.patch('airflow.providers.oracle.hooks.oracle.oracledb.connect') def test_get_conn_sid(self, mock_connect): dsn_sid = {'dsn': 'ignored', 'sid': 'sid'} self.connection.extra = json.dumps(dsn_sid) @@ -75,9 +75,9 @@ def test_get_conn_sid(self, mock_connect): assert mock_connect.call_count == 1 args, kwargs = mock_connect.call_args assert args == () - assert kwargs['dsn'] == cx_Oracle.makedsn("host", self.connection.port, dsn_sid['sid']) + assert kwargs['dsn'] == oracledb.makedsn("host", self.connection.port, dsn_sid['sid']) - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') + @mock.patch('airflow.providers.oracle.hooks.oracle.oracledb.connect') def test_get_conn_service_name(self, mock_connect): dsn_service_name = {'dsn': 'ignored', 'service_name': 'service_name'} self.connection.extra = json.dumps(dsn_service_name) @@ -85,49 +85,19 @@ def test_get_conn_service_name(self, mock_connect): assert mock_connect.call_count == 1 args, kwargs = mock_connect.call_args assert args == () - assert kwargs['dsn'] == cx_Oracle.makedsn( + assert kwargs['dsn'] == oracledb.makedsn( "host", self.connection.port, service_name=dsn_service_name['service_name'] ) - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') - def test_get_conn_encoding_without_nencoding(self, mock_connect): - self.connection.extra = json.dumps({'encoding': 'UTF-8'}) - self.db_hook.get_conn() - assert mock_connect.call_count == 1 - args, kwargs = mock_connect.call_args - assert args == () - assert kwargs['encoding'] == 'UTF-8' - assert kwargs['nencoding'] == 'UTF-8' - - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') - def test_get_conn_encoding_with_nencoding(self, mock_connect): - self.connection.extra = json.dumps({'encoding': 'UTF-8', 'nencoding': 'gb2312'}) - self.db_hook.get_conn() - assert mock_connect.call_count == 1 - args, kwargs = mock_connect.call_args - assert args == () - assert kwargs['encoding'] == 'UTF-8' - assert kwargs['nencoding'] == 'gb2312' - - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') - def test_get_conn_nencoding(self, mock_connect): - self.connection.extra = json.dumps({'nencoding': 'UTF-8'}) - self.db_hook.get_conn() - assert mock_connect.call_count == 1 - args, kwargs = mock_connect.call_args - assert args == () - assert 'encoding' not in kwargs - assert kwargs['nencoding'] == 'UTF-8' - - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') + @mock.patch('airflow.providers.oracle.hooks.oracle.oracledb.connect') def test_get_conn_mode(self, mock_connect): mode = { - 'sysdba': cx_Oracle.SYSDBA, - 'sysasm': cx_Oracle.SYSASM, - 'sysoper': cx_Oracle.SYSOPER, - 'sysbkp': cx_Oracle.SYSBKP, - 'sysdgd': cx_Oracle.SYSDGD, - 'syskmt': cx_Oracle.SYSKMT, + 'sysdba': oracledb.AUTH_MODE_SYSDBA, + 'sysasm': oracledb.AUTH_MODE_SYSASM, + 'sysoper': oracledb.AUTH_MODE_SYSOPER, + 'sysbkp': oracledb.AUTH_MODE_SYSBKP, + 'sysdgd': oracledb.AUTH_MODE_SYSDGD, + 'syskmt': oracledb.AUTH_MODE_SYSKMT, } first = True for mod in mode: @@ -140,16 +110,7 @@ def test_get_conn_mode(self, mock_connect): assert args == () assert kwargs['mode'] == mode.get(mod) - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') - def test_get_conn_threaded(self, mock_connect): - self.connection.extra = json.dumps({'threaded': True}) - self.db_hook.get_conn() - assert mock_connect.call_count == 1 - args, kwargs = mock_connect.call_args - assert args == () - assert kwargs['threaded'] is True - - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') + @mock.patch('airflow.providers.oracle.hooks.oracle.oracledb.connect') def test_get_conn_events(self, mock_connect): self.connection.extra = json.dumps({'events': True}) self.db_hook.get_conn() @@ -158,12 +119,12 @@ def test_get_conn_events(self, mock_connect): assert args == () assert kwargs['events'] is True - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') + @mock.patch('airflow.providers.oracle.hooks.oracle.oracledb.connect') def test_get_conn_purity(self, mock_connect): purity = { - 'new': cx_Oracle.ATTR_PURITY_NEW, - 'self': cx_Oracle.ATTR_PURITY_SELF, - 'default': cx_Oracle.ATTR_PURITY_DEFAULT, + 'new': oracledb.PURITY_NEW, + 'self': oracledb.PURITY_SELF, + 'default': oracledb.PURITY_DEFAULT, } first = True for pur in purity: @@ -176,14 +137,14 @@ def test_get_conn_purity(self, mock_connect): assert args == () assert kwargs['purity'] == purity.get(pur) - @mock.patch('airflow.providers.oracle.hooks.oracle.cx_Oracle.connect') + @mock.patch('airflow.providers.oracle.hooks.oracle.oracledb.connect') def test_set_current_schema(self, mock_connect): self.connection.schema = "schema_name" self.connection.extra = json.dumps({'service_name': 'service_name'}) assert self.db_hook.get_conn().current_schema == self.connection.schema -@unittest.skipIf(cx_Oracle is None, 'cx_Oracle package not present') +@unittest.skipIf(oracledb is None, 'oracledb package not present') class TestOracleHook(unittest.TestCase): def setUp(self): super().setUp() diff --git a/tests/providers/oracle/transfers/test_oracle_to_oracle.py b/tests/providers/oracle/transfers/test_oracle_to_oracle.py index a036a3e990df0..7970448c18bf0 100644 --- a/tests/providers/oracle/transfers/test_oracle_to_oracle.py +++ b/tests/providers/oracle/transfers/test_oracle_to_oracle.py @@ -33,8 +33,8 @@ def test_execute(): source_sql_params = {':p_data': "2018-01-01"} rows_chunk = 5000 cursor_description = [ - ('id', "", 39, None, 38, 0, 0), - ('description', "", 60, 240, None, None, 1), + ('id', "", 39, None, 38, 0, 0), + ('description', "", 60, 240, None, None, 1), ] cursor_rows = [[1, 'description 1'], [2, 'description 2']]