diff --git a/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py
new file mode 100644
index 000000000000..355bf24f71e2
--- /dev/null
+++ b/src/integrations/prefect-dbt/prefect_dbt/core/profiles.py
@@ -0,0 +1,155 @@
+"""
+Utilities for working with dbt profiles.yml files, including resolving
+block document and variable references.
+"""
+
+import contextlib
+import os
+import tempfile
+from pathlib import Path
+from typing import (
+    Any,
+    AsyncGenerator,
+    Generator,
+    Optional,
+)
+
+import slugify
+import yaml
+
+from prefect.utilities.asyncutils import run_coro_as_sync
+from prefect.utilities.templating import (
+    resolve_block_document_references,
+    resolve_variables,
+)
+
+
+def get_profiles_dir() -> str:
+    """Get the dbt profiles directory from environment or default location."""
+    profiles_dir = os.getenv("DBT_PROFILES_DIR")
+    if not profiles_dir:
+        profiles_dir = os.path.expanduser("~/.dbt")
+    return profiles_dir
+
+
+def load_profiles_yml(profiles_dir: Optional[str]) -> dict[str, Any]:
+    """
+    Load and parse the profiles.yml file.
+
+    Args:
+        profiles_dir: Path to the directory containing profiles.yml.
+            If None, uses the default profiles directory.
+
+    Returns:
+        Dict containing the parsed profiles.yml contents (empty if the file is empty)
+
+    Raises:
+        ValueError: If profiles.yml is not found
+    """
+    if profiles_dir is None:
+        profiles_dir = get_profiles_dir()
+
+    profiles_path = os.path.join(profiles_dir, "profiles.yml")
+    if not os.path.exists(profiles_path):
+        raise ValueError(f"No profiles.yml found at {profiles_path}")
+
+    with open(profiles_path, "r", encoding="utf-8") as f:
+        return yaml.safe_load(f) or {}
+
+
+def replace_with_env_var_call(placeholder: str, value: Any) -> str:
+    """
+    A block reference replacement function that stores the value in os.environ and returns template text for an env var call.
+
+    Args:
+        placeholder: The placeholder text to replace
+        value: The value to replace the placeholder with
+
+    Returns:
+        The template text for an env var call
+    """
+    env_var_name = slugify.slugify(placeholder, separator="_").upper()
+
+    os.environ[env_var_name] = str(value)
+
+    template_text = f"{{{{ env_var('{env_var_name}') }}}}"
+
+    return template_text
+
+
+@contextlib.asynccontextmanager
+async def aresolve_profiles_yml(
+    profiles_dir: Optional[str] = None,
+) -> AsyncGenerator[str, None]:
+    """
+    Asynchronous context manager that creates a temporary directory with a resolved profiles.yml file.
+
+    Args:
+        profiles_dir: Path to the directory containing profiles.yml.
+            If None, uses the default profiles directory.
+
+    Yields:
+        str: Path to temporary directory containing the resolved profiles.yml.
+            Directory and contents are automatically cleaned up after context exit.
+
+    Example:
+        ```python
+        async with aresolve_profiles_yml() as temp_dir:
+            # temp_dir contains resolved profiles.yml
+            # use temp_dir for dbt operations
+        # temp_dir is automatically cleaned up
+        ```
+    """
+    with tempfile.TemporaryDirectory() as temp_dir:
+        temp_dir_path = Path(temp_dir)
+        profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir)
+        profiles_yml = await resolve_block_document_references(
+            profiles_yml, value_transformer=replace_with_env_var_call
+        )
+        profiles_yml = await resolve_variables(profiles_yml)
+
+        temp_profiles_path = temp_dir_path / "profiles.yml"
+        temp_profiles_path.write_text(
+            yaml.dump(profiles_yml, default_style=None, default_flow_style=False)
+        )
+        yield str(temp_dir_path)
+
+
+@contextlib.contextmanager
+def resolve_profiles_yml(
+    profiles_dir: Optional[str] = None,
+) -> Generator[str, None, None]:
+    """
+    Synchronous context manager that creates a temporary directory with a resolved profiles.yml file.
+
+    Args:
+        profiles_dir: Path to the directory containing profiles.yml.
+            If None, uses the default profiles directory.
+
+    Yields:
+        str: Path to temporary directory containing the resolved profiles.yml.
+            Directory and contents are automatically cleaned up after context exit.
+
+    Example:
+        ```python
+        with resolve_profiles_yml() as temp_dir:
+            # temp_dir contains resolved profiles.yml
+            # use temp_dir for dbt operations
+        # temp_dir is automatically cleaned up
+        ```
+    """
+    with tempfile.TemporaryDirectory() as temp_dir:
+        temp_dir_path = Path(temp_dir)
+        profiles_yml: dict[str, Any] = load_profiles_yml(profiles_dir)
+        profiles_yml = run_coro_as_sync(
+            resolve_block_document_references(
+                profiles_yml, value_transformer=replace_with_env_var_call
+            )
+        )
+        profiles_yml = run_coro_as_sync(resolve_variables(profiles_yml))
+
+        temp_profiles_path = temp_dir_path / "profiles.yml"
+        temp_profiles_path.write_text(
+            yaml.dump(profiles_yml, default_style=None, default_flow_style=False)
+        )
+        yield str(temp_dir_path)
diff --git a/src/integrations/prefect-dbt/tests/core/test_profiles.py b/src/integrations/prefect-dbt/tests/core/test_profiles.py
new file mode 100644
index 000000000000..7c049975115f
--- /dev/null
+++ b/src/integrations/prefect-dbt/tests/core/test_profiles.py
@@ -0,0 +1,291 @@
+import os
+from pathlib import Path
+
+import pytest
+import yaml
+from prefect_dbt.core.profiles import (
+    aresolve_profiles_yml,
+    get_profiles_dir,
+    load_profiles_yml,
+    replace_with_env_var_call,
+    resolve_profiles_yml,
+)
+
+from prefect.client.orchestration import get_client
+
+SAMPLE_PROFILE = {
+    "jaffle_shop": {
+        "outputs": {
+            "dev": {
+                "type": "duckdb",
+                "path": "jaffle_shop.duckdb",
+                "schema": "main",
+                "threads": 4,
+            }
+        }
+    }
+}
+
+VARIABLES_PROFILE = {
+    "jaffle_shop_with_variable_reference": {
+        "outputs": {
+            "dev": {
+                "type": "duckdb",
+                "path": "jaffle_shop.duckdb",
+                "schema": "main",
+                "threads": 4,
+            }
+        },
+        "target": "{{ prefect.variables.target }}",
+    }
+}
+
+BLOCKS_PROFILE = {
+    "jaffle_shop_with_blocks_reference": {
+        "outputs": {
+            "dev": {
+                "type": "duckdb",
+                "path": "jaffle_shop.duckdb",
+                "schema": "main",
+                "threads": 4,
+                "password": "{{ prefect.blocks.secret.my-password }}",
+            }
+        },
+        "target": "dev",
+    }
+}
+
+
+@pytest.fixture
+def temp_profiles_dir(tmp_path):
+    profiles_dir = tmp_path / ".dbt"
+    profiles_dir.mkdir()
+
+    profiles_path = profiles_dir / "profiles.yml"
+    with open(profiles_path, "w") as f:
+        yaml.dump(SAMPLE_PROFILE, f)
+
+    return str(profiles_dir)
+
+
+@pytest.fixture
+def temp_variables_profiles_dir(tmp_path):
+    profiles_dir = tmp_path / ".dbt"
+    profiles_dir.mkdir()
+
+    profiles_path = profiles_dir / "profiles.yml"
+    with open(profiles_path, "w") as f:
+        yaml.dump(VARIABLES_PROFILE, f)
+
+    return str(profiles_dir)
+
+
+@pytest.fixture
+def temp_blocks_profiles_dir(tmp_path):
+    """Create a temporary profiles directory with a profile that references a block."""
+    profiles_dir = tmp_path / ".dbt"
+    profiles_dir.mkdir()
+
+    profiles_path = profiles_dir / "profiles.yml"
+    with open(profiles_path, "w") as f:
+        yaml.dump(BLOCKS_PROFILE, f)
+
+    return str(profiles_dir)
+
+
+def test_get_profiles_dir_default(monkeypatch):
+    # monkeypatch restores the variable afterwards; a raw `del` leaks into other tests
+    monkeypatch.delenv("DBT_PROFILES_DIR", raising=False)
+
+    expected = os.path.expanduser("~/.dbt")
+    assert get_profiles_dir() == expected
+
+
+def test_get_profiles_dir_from_env(monkeypatch):
+    test_path = "/custom/path"
+    monkeypatch.setenv("DBT_PROFILES_DIR", test_path)
+    assert get_profiles_dir() == test_path
+
+
+def test_load_profiles_yml_success(temp_profiles_dir):
+    profiles = load_profiles_yml(temp_profiles_dir)
+    assert profiles == SAMPLE_PROFILE
+
+
+def test_load_profiles_yml_default_dir(monkeypatch, temp_profiles_dir):
+    monkeypatch.setenv("DBT_PROFILES_DIR", temp_profiles_dir)
+    profiles = load_profiles_yml(None)
+    assert profiles == SAMPLE_PROFILE
+
+
+def test_load_profiles_yml_file_not_found():
+    nonexistent_dir = "/path/that/does/not/exist"
+    expected_path = os.path.join(nonexistent_dir, "profiles.yml")
+    # Compare literally: `match=` is a regex, so "." and "\" in a path would be misread
+    with pytest.raises(ValueError) as exc_info:
+        load_profiles_yml(nonexistent_dir)
+    assert str(exc_info.value) == f"No profiles.yml found at {expected_path}"
+
+
+def test_load_profiles_yml_invalid_yaml(temp_profiles_dir):
+    profiles_path = Path(temp_profiles_dir) / "profiles.yml"
+    with open(profiles_path, "w") as f:
+        f.write("invalid: yaml: content:\nindentation error")
+
+    with pytest.raises(yaml.YAMLError):
+        load_profiles_yml(temp_profiles_dir)
+
+
+async def test_aresolve_profiles_yml_success(
+    temp_profiles_dir,
+):
+    """Test that aresolve_profiles_yml creates and cleans up temporary directory."""
+    async with aresolve_profiles_yml(temp_profiles_dir) as temp_dir:
+        temp_path = Path(temp_dir)
+        profiles_path = temp_path / "profiles.yml"
+
+        # Check temporary directory exists and contains profiles.yml
+        assert temp_path.exists()
+        assert profiles_path.exists()
+
+        # Verify contents match expected profiles
+        loaded_profiles = yaml.safe_load(profiles_path.read_text())
+        assert loaded_profiles == SAMPLE_PROFILE
+
+    # Verify cleanup happened
+    assert not temp_path.exists()
+
+
+def test_resolve_profiles_yml_success(temp_profiles_dir):
+    """Test that resolve_profiles_yml creates and cleans up temporary directory."""
+    with resolve_profiles_yml(temp_profiles_dir) as temp_dir:
+        temp_path = Path(temp_dir)
+        profiles_path = temp_path / "profiles.yml"
+
+        # Check temporary directory exists and contains profiles.yml
+        assert temp_path.exists()
+        assert profiles_path.exists()
+
+        # Verify contents match expected profiles
+        loaded_profiles = yaml.safe_load(profiles_path.read_text())
+        assert loaded_profiles == SAMPLE_PROFILE
+
+    # Verify cleanup happened
+    assert not temp_path.exists()
+
+
+async def test_aresolve_profiles_yml_error_cleanup(temp_profiles_dir):
+    """Test that temporary directory is cleaned up even if an error occurs."""
+    temp_path = None
+
+    with pytest.raises(ValueError):
+        async with aresolve_profiles_yml(temp_profiles_dir) as temp_dir:
+            temp_path = Path(temp_dir)
+            assert temp_path.exists()
+            raise ValueError("Test error")
+
+    # Verify cleanup happened despite error
+    assert temp_path is not None
+    assert not temp_path.exists()
+
+
+def test_resolve_profiles_yml_error_cleanup(temp_profiles_dir):
+    """Test that temporary directory is cleaned up even if an error occurs."""
+    temp_path = None
+
+    with pytest.raises(ValueError):
+        with resolve_profiles_yml(temp_profiles_dir) as temp_dir:
+            temp_path = Path(temp_dir)
+            assert temp_path.exists()
+            raise ValueError("Test error")
+
+    # Verify cleanup happened despite error
+    assert temp_path is not None
+    assert not temp_path.exists()
+
+
+async def test_aresolve_profiles_yml_resolves_variables(temp_variables_profiles_dir):
+    """Test that variables in profiles.yml are properly resolved."""
+    # Create a variable
+    async with get_client() as client:
+        await client._client.post(
+            "/variables/", json={"name": "target", "value": "dev"}
+        )
+
+    # Use the context manager and verify variable resolution
+    async with aresolve_profiles_yml(temp_variables_profiles_dir) as temp_dir:
+        temp_path = Path(temp_dir)
+        profiles_path = temp_path / "profiles.yml"
+
+        # Verify contents have resolved variables
+        loaded_profiles = yaml.safe_load(profiles_path.read_text())
+        assert loaded_profiles["jaffle_shop_with_variable_reference"]["target"] == "dev"
+
+
+def test_resolve_profiles_yml_resolves_variables(temp_variables_profiles_dir):
+    with resolve_profiles_yml(temp_variables_profiles_dir) as temp_dir:
+        temp_path = Path(temp_dir)
+        profiles_path = temp_path / "profiles.yml"
+
+        loaded_profiles = yaml.safe_load(profiles_path.read_text())
+        assert loaded_profiles["jaffle_shop_with_variable_reference"]["target"] == "dev"
+
+
+async def test_aresolve_profiles_yml_resolves_blocks(temp_blocks_profiles_dir):
+    from prefect.blocks.system import Secret
+
+    secret_block = Secret(value="super-secret-password")
+    await secret_block.save("my-password", overwrite=True)
+
+    async with aresolve_profiles_yml(temp_blocks_profiles_dir) as temp_dir:
+        temp_path = Path(temp_dir)
+        profiles_path = temp_path / "profiles.yml"
+
+        loaded_profiles = yaml.safe_load(profiles_path.read_text())
+        assert (
+            loaded_profiles["jaffle_shop_with_blocks_reference"]["outputs"]["dev"][
+                "password"
+            ]
+            == "{{ env_var('PREFECT_BLOCKS_SECRET_MY_PASSWORD') }}"
+        )
+
+
+def test_resolve_profiles_yml_resolves_blocks(temp_blocks_profiles_dir):
+    from prefect.blocks.system import Secret
+
+    secret_block = Secret(value="super-secret-password")
+    secret_block.save("my-password", overwrite=True)
+
+    with resolve_profiles_yml(temp_blocks_profiles_dir) as temp_dir:
+        temp_path = Path(temp_dir)
+        profiles_path = temp_path / "profiles.yml"
+
+        loaded_profiles = yaml.safe_load(profiles_path.read_text())
+        assert (
+            loaded_profiles["jaffle_shop_with_blocks_reference"]["outputs"]["dev"][
+                "password"
+            ]
+            == "{{ env_var('PREFECT_BLOCKS_SECRET_MY_PASSWORD') }}"
+        )
+
+
+def test_replace_with_env_var_call():
+    """Test that replace_with_env_var_call properly creates env vars and returns template text."""
+    # Test with a simple block reference
+    result = replace_with_env_var_call(
+        "prefect.blocks.secret.my-password.value", "test-value"
+    )
+    assert result == "{{ env_var('PREFECT_BLOCKS_SECRET_MY_PASSWORD_VALUE') }}"
+    assert os.environ["PREFECT_BLOCKS_SECRET_MY_PASSWORD_VALUE"] == "test-value"
+
+    # Test with a complex block instance name
+    result = replace_with_env_var_call(
+        "prefect.blocks.json.complex-name!@123.value", "complex-value"
+    )
+    assert result == "{{ env_var('PREFECT_BLOCKS_JSON_COMPLEX_NAME_123_VALUE') }}"
+    assert os.environ["PREFECT_BLOCKS_JSON_COMPLEX_NAME_123_VALUE"] == "complex-value"
+
+    # Test with non-string value
+    result = replace_with_env_var_call("prefect.blocks.json.number-config.value", 42)
+    assert result == "{{ env_var('PREFECT_BLOCKS_JSON_NUMBER_CONFIG_VALUE') }}"
+    assert os.environ["PREFECT_BLOCKS_JSON_NUMBER_CONFIG_VALUE"] == "42"
diff --git a/src/prefect/utilities/templating.py b/src/prefect/utilities/templating.py
index a009e7d03366..c721478d65b5 100644
--- a/src/prefect/utilities/templating.py
+++ b/src/prefect/utilities/templating.py
@@ -4,6 +4,7 @@
 from typing import (
     TYPE_CHECKING,
     Any,
+    Callable,
     Literal,
     NamedTuple,
     Optional,
@@ -197,11 +198,13 @@ def apply_values(
 
 @inject_client
 async def resolve_block_document_references(
-    template: T, client: Optional["PrefectClient"] = None
+    template: T,
+    client: Optional["PrefectClient"] = None,
+    value_transformer: Optional[Callable[[str, Any], Any]] = None,
 ) -> Union[T, dict[str, Any]]:
     """
     Resolve block document references in a template by replacing each reference with
-    the data of the block document.
+    its value or the return value of the transformer function if provided.
 
     Recursively searches for block document references in dictionaries and lists.
 
@@ -258,6 +261,7 @@ async def resolve_block_document_references(
 
     Args:
         template: The template to resolve block documents in
+        value_transformer: A function that takes the block placeholder and the block value and returns replacement text for the template
 
     Returns:
         The template with block documents resolved
@@ -275,13 +279,15 @@ async def resolve_block_document_references(
         updated_template: dict[str, Any] = {}
         for key, value in template.items():
             updated_value = await resolve_block_document_references(
-                value, client=client
+                value, value_transformer=value_transformer, client=client
             )
             updated_template[key] = updated_value
         return updated_template
     elif isinstance(template, list):
         return [
-            await resolve_block_document_references(item, client=client)
+            await resolve_block_document_references(
+                item, value_transformer=value_transformer, client=client
+            )
             for item in template
         ]
     elif isinstance(template, str):
@@ -326,6 +332,9 @@ async def resolve_block_document_references(
                     )
                 value = from_dict
 
+            if value_transformer:
+                value = value_transformer(placeholder.full_match, value)
+
             return value
         else:
             raise ValueError(