JDBC operator not logging errors #16295

Closed
stijn-meersman opened this issue Jun 7, 2021 · 7 comments · Fixed by #21540

@stijn-meersman
Contributor

stijn-meersman commented Jun 7, 2021

Hi,

Since Airflow 2.0, we have been having issues with logging for the JDBC operator. When such a task fails, we only see
INFO - Task exited with return code 1
The actual error and stack trace are not present.
The task also does not seem to be retried: it runs only once even though my max_tries is 3.

I am using a Local Executor, and logs are also stored locally.
This issue occurs for both local installations and Docker.

full log:

*** Reading local file: /home/stijn/airflow/logs/airflow_incr/fmc_mtd/2021-06-01T15:00:00+00:00/1.log
[2021-06-01 18:00:13,389] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: airflow_incr.fmc_mtd 2021-06-01T15:00:00+00:00 [queued]>
[2021-06-01 18:00:13,592] {taskinstance.py:876} INFO - Dependencies all met for <TaskInstance: airflow_incr.fmc_mtd 2021-06-01T15:00:00+00:00 [queued]>
[2021-06-01 18:00:13,592] {taskinstance.py:1067} INFO - 
--------------------------------------------------------------------------------
[2021-06-01 18:00:13,592] {taskinstance.py:1068} INFO - Starting attempt 1 of 4
[2021-06-01 18:00:13,593] {taskinstance.py:1069} INFO - 
--------------------------------------------------------------------------------
[2021-06-01 18:00:13,975] {taskinstance.py:1087} INFO - Executing <Task(JdbcOperator): fmc_mtd> on 2021-06-01T15:00:00+00:00
[2021-06-01 18:00:13,980] {standard_task_runner.py:52} INFO - Started process 957 to run task
[2021-06-01 18:00:13,983] {standard_task_runner.py:76} INFO - Running: ['airflow', 'tasks', 'run', 'airflow_incr', 'fmc_mtd', '2021-06-01T15:00:00+00:00', '--job-id', '2841', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/100_FL_DAG_airflow_incr_20210531_122511.py', '--cfg-path', '/tmp/tmp67h9tgso', '--error-file', '/tmp/tmp4w35rr0g']
[2021-06-01 18:00:13,990] {standard_task_runner.py:77} INFO - Job 2841: Subtask fmc_mtd
[2021-06-01 18:00:15,336] {logging_mixin.py:104} INFO - Running <TaskInstance: airflow_incr.fmc_mtd 2021-06-01T15:00:00+00:00 [running]> on host DESKTOP-VNC70B9.localdomain
[2021-06-01 18:00:17,757] {taskinstance.py:1282} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=Vaultspeed
AIRFLOW_CTX_DAG_ID=airflow_incr
AIRFLOW_CTX_TASK_ID=fmc_mtd
AIRFLOW_CTX_EXECUTION_DATE=2021-06-01T15:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-06-01T15:00:00+00:00
[2021-06-01 18:00:17,757] {jdbc.py:70} INFO - Executing: ['INSERT INTO "moto_fmc"."fmc_loading_history" \n\t\tSELECT \n\t\t\t\'airflow_incr\',\n\t\t\t\'airflow\',\n\t\t\t35,\n\t\t\tTO_TIMESTAMP(\'2021-06-01 16:00:00.000000\', \'YYYY-MM-DD HH24:MI:SS.US\'::varchar),\n\t\t\t"fmc_begin_lw_timestamp" + -15 * interval\'1 minute\',\n\t\t\tTO_TIMESTAMP(\'2021-06-01 16:00:00.000000\', \'YYYY-MM-DD HH24:MI:SS.US\'::varchar),\n\t\t\tTO_TIMESTAMP(\'2021-06-01 15:59:59.210732\', \'YYYY-MM-DD HH24:MI:SS.US\'::varchar),\n\t\t\tnull,\n\t\t\tnull\n\t\tFROM (\n\t\t\tSELECT MAX("fmc_end_lw_timestamp") as "fmc_begin_lw_timestamp" \n\t\t\tFROM "moto_fmc"."fmc_loading_history" \n\t\t\tWHERE "src_bk" = \'airflow\' \n\t\t\tAND "success_flag" = 1\n\t\t\tAND "load_cycle_id" < 35\n\t\t) SRC_WINDOW\n\t\tWHERE NOT EXISTS(SELECT 1 FROM "moto_fmc"."fmc_loading_history" WHERE "load_cycle_id" = 35)', 'TRUNCATE TABLE "airflow_mtd"."load_cycle_info" ', 'INSERT INTO "airflow_mtd"."load_cycle_info"("load_cycle_id","load_date") \n\t\t\tSELECT 35,TO_TIMESTAMP(\'2021-06-01 16:00:00.000000\', \'YYYY-MM-DD HH24:MI:SS.US\'::varchar)', 'TRUNCATE TABLE "airflow_mtd"."fmc_loading_window_table" ', 'INSERT INTO "airflow_mtd"."fmc_loading_window_table"("fmc_begin_lw_timestamp","fmc_end_lw_timestamp") \n\t\t\tSELECT "fmc_begin_lw_timestamp" + -15 * interval\'1 minute\', TO_TIMESTAMP(\'2021-06-01 16:00:00.000000\', \'YYYY-MM-DD HH24:MI:SS.US\'::varchar)\n\t\t\tFROM (\n\t\t\t\tSELECT MAX("fmc_end_lw_timestamp") as "fmc_begin_lw_timestamp" \n\t\t\t\tFROM "moto_fmc"."fmc_loading_history" \n\t\t\t\tWHERE "src_bk" = \'airflow\' \n\t\t\t\tAND "success_flag" = 1\n\t\t\t\tAND "load_cycle_id" < 35\n\t\t\t) SRC_WINDOW']
[2021-06-01 18:00:18,097] {base.py:78} INFO - Using connection to: id: test_dv. Host: jdbc:postgresql://localhost:5432/test_dv_stijn, Port: None, Schema: , Login: postgres, Password: ***, extra: {'extra__jdbc__drv_path': '/home/stijn/airflow/jdbc/postgresql-9.4.1212.jar', 'extra__jdbc__drv_clsname': 'org.postgresql.Driver', 'extra__google_cloud_platform__project': '', 'extra__google_cloud_platform__key_path': '', 'extra__google_cloud_platform__keyfile_dict': '', 'extra__google_cloud_platform__scope': '', 'extra__google_cloud_platform__num_retries': 5, 'extra__grpc__auth_type': '', 'extra__grpc__credential_pem_file': '', 'extra__grpc__scopes': '', 'extra__yandexcloud__service_account_json': '', 'extra__yandexcloud__service_account_json_path': '', 'extra__yandexcloud__oauth': '', 'extra__yandexcloud__public_ssh_key': '', 'extra__yandexcloud__folder_id': '', 'extra__kubernetes__in_cluster': False, 'extra__kubernetes__kube_config': '', 'extra__kubernetes__namespace': ''}
[2021-06-01 18:00:18,530] {local_task_job.py:151} INFO - Task exited with return code 1

stijn-meersman added the kind:bug label Jun 7, 2021
@boring-cyborg

boring-cyborg bot commented Jun 7, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@hubert-pietron
Contributor

Hi, what is the status of this issue, @eladkal? Can you assign it to me?

@potiuk
Member

potiuk commented Jan 9, 2022

Assigned you

@hubert-pietron
Contributor

I need some advice.
I think this happens because jaydebeapi first handles exceptions internally and then re-raises them (and somehow the task can't see that).
I found a way to surface the exception by overriding _run_command from airflow/hooks/dbapi.py in airflow/providers/jdbc/hooks/jdbc.py. It would look like this:

def _run_command(self, cur, sql_statement, parameters):
    """Runs a statement using an already open jaydebeapi cursor."""
    self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
    try:
        if parameters:
            cur.execute(sql_statement, parameters)
        else:
            cur.execute(sql_statement)
    except (jaydebeapi.DatabaseError, jaydebeapi.InterfaceError) as e:
        self.log.error(e)
        raise
    # According to PEP 249, this is -1 when query result is not applicable.
    if cur.rowcount >= 0:
        self.log.info("Rows affected: %s", cur.rowcount)

Example log after change:

[2022-01-15, 16:02:49 UTC] {base.py:70} INFO - Using connection to: id: jdbc_test. Host: jdbc:postgresql://172.19.0.1:25433/***, Port: None, Schema: , Login: postgres, Password: ***, extra: {'extra__jdbc__drv_clsname': 'org.postgresql.Driver', 'extra__jdbc__drv_path': '/opt/***/postgresql-42.2.24.jre6.jar'}
[2022-01-15, 16:02:49 UTC] {jdbc.py:106} INFO - Running statement: select 1 from x , parameters: None
[2022-01-15, 16:02:49 UTC] {jdbc.py:113} ERROR - org.postgresql.util.PSQLException: ERROR: relation "x" does not exist
  Position: 15
[2022-01-15, 16:02:49 UTC] {local_task_job.py:153} INFO - Task exited with return code 1

Please let me know if it's okay.

@uranusjr
Member

Is it not possible to do this instead?

def _run_command(self, cur, sql_statement, parameters):
    try:
        return super()._run_command(cur, sql_statement, parameters)
    except (jaydebeapi.DatabaseError, jaydebeapi.InterfaceError):
        self.log.exception("Failed to execute statement in JDBC")
        raise
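
For reference, the delegate-then-log-and-re-raise pattern suggested above can be tried without Airflow or a JVM. This is only a minimal sketch under stated assumptions: DatabaseError, BaseHookSketch, JdbcHookSketch, FailingCursor and demo are hypothetical stand-ins invented for illustration, not the real jaydebeapi or Airflow classes. It shows that log.exception records the traceback along with the message, while the exception still propagates to the caller.

```python
import io
import logging


class DatabaseError(Exception):
    """Hypothetical stand-in for jaydebeapi.DatabaseError."""


class BaseHookSketch:
    """Hypothetical stand-in for the generic DB-API hook (not the Airflow class)."""

    log = logging.getLogger("sketch.hook")

    def _run_command(self, cur, sql_statement, parameters):
        if parameters:
            cur.execute(sql_statement, parameters)
        else:
            cur.execute(sql_statement)


class JdbcHookSketch(BaseHookSketch):
    """Delegates to the parent, logs any driver error, then re-raises it."""

    def _run_command(self, cur, sql_statement, parameters):
        try:
            return super()._run_command(cur, sql_statement, parameters)
        except DatabaseError:
            # log.exception logs at ERROR level and appends the traceback.
            self.log.exception("Failed to execute statement in JDBC")
            raise


class FailingCursor:
    """Hypothetical cursor that always fails, to trigger the error path."""

    def execute(self, *args):
        raise DatabaseError('relation "x" does not exist')


def demo():
    """Run the failing statement; return (was_reraised, captured_log_text)."""
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    logger = logging.getLogger("sketch.hook")
    logger.addHandler(handler)
    logger.propagate = False  # keep the demo output out of the root logger
    reraised = False
    try:
        JdbcHookSketch()._run_command(FailingCursor(), "select 1 from x", None)
    except DatabaseError:
        reraised = True
    finally:
        logger.removeHandler(handler)
    return reraised, stream.getvalue()
```

Calling demo() returns whether the error reached the caller together with the captured log text, so both halves of the pattern (logged and re-raised) can be checked in one place.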

@uranusjr
Member

Also, please investigate how far up the stack the exception bubbles. At what point is the exception lost, so that “somehow task can't see that”? It’d be awesome if we could get a reason better than “somehow” 🙂

@hubert-pietron
Contributor

Thanks for the response. Of course, I will explain in the PR why and when the exception is lost. I have a theory about that, but right now I am having issues configuring the debugger in PyCharm and can't check it properly.
