Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add thread_id context var #7942

Merged
merged 6 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230623-173357.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Add thread_id context var
time: 2023-06-23T17:33:57.412102-07:00
custom:
Author: NiallRees
Issue: "7941"
6 changes: 6 additions & 0 deletions core/dbt/context/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import os
from typing import Any, Dict, NoReturn, Optional, Mapping, Iterable, Set, List
import threading

from dbt.flags import get_flags
import dbt.flags as flags_module
Expand Down Expand Up @@ -596,6 +597,11 @@ def invocation_id(self) -> Optional[str]:
"""
return get_invocation_id()

@contextproperty
def thread_id(self) -> str:
"""thread_id outputs an ID for the current thread (useful for auditing)"""
return threading.current_thread().name

@contextproperty
def modules(self) -> Dict[str, Any]:
"""The `modules` variable in the Jinja context contains useful Python
Expand Down
3 changes: 2 additions & 1 deletion tests/adapter/dbt/tests/adapter/hooks/data/seed_model.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ create table {schema}.on_model_hook (
target_pass TEXT,
target_threads INTEGER,
run_started_at TEXT,
invocation_id TEXT
invocation_id TEXT,
thread_id TEXT
);
3 changes: 2 additions & 1 deletion tests/adapter/dbt/tests/adapter/hooks/data/seed_run.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ create table {schema}.on_run_hook (
target_pass TEXT,
target_threads INTEGER,
run_started_at TEXT,
invocation_id TEXT
invocation_id TEXT,
thread_id TEXT
);
52 changes: 35 additions & 17 deletions tests/adapter/dbt/tests/adapter/hooks/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
target_pass,
target_threads,
run_started_at,
invocation_id
invocation_id,
thread_id
) VALUES (
'{{ state }}',
'{{ target.dbname }}',
Expand All @@ -52,7 +53,8 @@
'{{ target.get("pass", "") }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
'{{ invocation_id }}',
'{{ thread_id }}'
)

{% endmacro %}
Expand Down Expand Up @@ -83,7 +85,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'start',\
'{{ target.dbname }}',\
Expand All @@ -95,7 +98,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)",
"post-hook": "\
insert into {{this.schema}}.on_model_hook (\
Expand All @@ -109,7 +113,9 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id

) VALUES (\
'end',\
'{{ target.dbname }}',\
Expand All @@ -121,7 +127,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)"
})
}}
Expand All @@ -144,7 +151,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'start',\
'{{ target.dbname }}',\
Expand All @@ -156,7 +164,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'
'{{ invocation_id }}',\
'{{ thread_id }}'
)",
"pre-hook": "\
insert into {{this.schema}}.on_model_hook (\
Expand All @@ -170,7 +179,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'start',\
'{{ target.dbname }}',\
Expand All @@ -182,7 +192,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'
'{{ invocation_id }}',\
'{{ thread_id }}'
)",
"post-hook": "\
insert into {{this.schema}}.on_model_hook (\
Expand All @@ -196,7 +207,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'end',\
'{{ target.dbname }}',\
Expand All @@ -208,7 +220,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)"
})
}}
Expand All @@ -231,7 +244,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id
invocation_id,\
thread_id
) VALUES (\
'start',\
'{{ target.dbname }}',\
Expand All @@ -243,7 +257,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)",
post_hook="\
insert into {{this.schema}}.on_model_hook (\
Expand All @@ -257,7 +272,8 @@
target_pass,\
target_threads,\
run_started_at,\
invocation_id\
invocation_id,\
thread_id\
) VALUES (\
'end',\
'{{ target.dbname }}',\
Expand All @@ -269,7 +285,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}',\
{{ target.threads }},\
'{{ run_started_at }}',\
'{{ invocation_id }}'\
'{{ invocation_id }}',\
'{{ thread_id }}'\
)"
)
}}
Expand All @@ -292,7 +309,8 @@
'{{ target.get(\\"pass\\", \\"\\") }}' as target_pass,\
{{ target.threads }} as target_threads,\
'{{ run_started_at }}' as run_started_at,\
'{{ invocation_id }}' as invocation_id
'{{ invocation_id }}' as invocation_id,\
'{{ thread_id }}' as thread_id
from {{ ref('pre') }}\
"
})
Expand Down
17 changes: 12 additions & 5 deletions tests/adapter/dbt/tests/adapter/hooks/test_model_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
target_pass,
target_threads,
run_started_at,
invocation_id
invocation_id,
thread_id
) VALUES (
'start',
'{{ target.dbname }}',
Expand All @@ -47,7 +48,8 @@
'{{ target.get("pass", "") }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
'{{ invocation_id }}',
'{{ thread_id }}'
)
"""

Expand All @@ -63,7 +65,8 @@
target_pass,
target_threads,
run_started_at,
invocation_id
invocation_id,
thread_id
) VALUES (
'end',
'{{ target.dbname }}',
Expand All @@ -75,7 +78,8 @@
'{{ target.get("pass", "") }}',
{{ target.threads }},
'{{ run_started_at }}',
'{{ invocation_id }}'
'{{ invocation_id }}',
'{{ thread_id }}'
)
"""

Expand All @@ -98,6 +102,7 @@ def get_ctx_vars(self, state, count, project):
"target_pass",
"run_started_at",
"invocation_id",
"thread_id",
]
field_list = ", ".join(['"{}"'.format(f) for f in fields])
query = f"select {field_list} from {project.test_schema}.on_model_hook where test_state = '{state}'"
Expand Down Expand Up @@ -127,6 +132,7 @@ def check_hooks(self, state, project, host, count=1):
assert (
ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0
), "invocation_id was not set"
assert ctx["thread_id"].startswith("Thread-")


class TestPrePostModelHooks(BaseTestPrePost):
Expand Down Expand Up @@ -204,7 +210,8 @@ def project_config_update(self):
'{{ target.get(pass, "") }}' as target_pass,
{{ target.threads }} as target_threads,
'{{ run_started_at }}' as run_started_at,
'{{ invocation_id }}' as invocation_id
'{{ invocation_id }}' as invocation_id,
'{{ thread_id }}' as thread_id
from {{ ref('post') }}""".strip()
],
}
Expand Down
2 changes: 2 additions & 0 deletions tests/adapter/dbt/tests/adapter/hooks/test_run_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def get_ctx_vars(self, state, project):
"target_pass",
"run_started_at",
"invocation_id",
"thread_id",
]
field_list = ", ".join(['"{}"'.format(f) for f in fields])
query = f"select {field_list} from {project.test_schema}.on_run_hook where test_state = '{state}'"
Expand Down Expand Up @@ -119,6 +120,7 @@ def check_hooks(self, state, project, host):
assert (
ctx["invocation_id"] is not None and len(ctx["invocation_id"]) > 0
), "invocation_id was not set"
assert ctx["thread_id"].startswith("Thread-") or ctx["thread_id"] == "MainThread"

def test_pre_and_post_run_hooks(self, setUp, project, dbt_profile_target):
run_dbt(["run"])
Expand Down
2 changes: 2 additions & 0 deletions tests/functional/context_methods/test_env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
-- runtime variables
'{{ run_started_at }}' as run_started_at,
'{{ invocation_id }}' as invocation_id,
'{{ thread_id }}' as thread_id,

'{{ env_var("DBT_TEST_ENV_VAR") }}' as env_var,
'{{ env_var("DBT_TEST_IGNORE_DEFAULT", "ignored_default_val") }}' as env_var_ignore_default,
Expand Down Expand Up @@ -114,6 +115,7 @@ def get_ctx_vars(self, project):
"target.pass",
"run_started_at",
"invocation_id",
"thread_id",
"env_var",
]
field_list = ", ".join(['"{}"'.format(f) for f in fields])
Expand Down
1 change: 1 addition & 0 deletions tests/functional/context_methods/test_secret_env_vars.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def test_disallow_secret(self, project):
-- runtime variables
'{{ run_started_at }}' as run_started_at,
'{{ invocation_id }}' as invocation_id,
'{{ thread_id }}' as thread_id,

'{{ env_var("DBT_TEST_ENV_VAR") }}' as env_var,
'secret_variable' as env_var_secret, -- make sure the value itself is scrubbed from the logs
Expand Down
1 change: 1 addition & 0 deletions tests/unit/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def assert_has_keys(required_keys: Set[str], maybe_keys: Set[str], ctx: Dict[str
"log",
"run_started_at",
"invocation_id",
"thread_id",
"modules",
"flags",
"print",
Expand Down