Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #7502: write run_results.json for run operation #7655

Merged
merged 10 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20230522-132924.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: write run_results.json for run operation
time: 2023-05-22T13:29:24.182612-07:00
custom:
Author: aranke
Issue: "7502"
4 changes: 1 addition & 3 deletions core/dbt/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
from dbt.contracts.results import (
CatalogArtifact,
RunExecutionResult,
RunOperationResultsArtifact,
)
from dbt.events.base_types import EventMsg
from dbt.task.build import BuildTask
Expand Down Expand Up @@ -53,8 +52,7 @@ class dbtRunnerResult:
List[str], # list/ls
Manifest, # parse
None, # clean, deps, init, source
RunExecutionResult, # build, compile, run, seed, snapshot, test
RunOperationResultsArtifact, # run-operation
RunExecutionResult, # build, compile, run, seed, snapshot, test, run-operation
] = None


Expand Down
34 changes: 0 additions & 34 deletions core/dbt/contracts/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,40 +247,6 @@ def write(self, path: str):
write_json(path, self.to_dict(omit_none=False))


@dataclass
class RunOperationResult(ExecutionResult):
success: bool


@dataclass
class RunOperationResultMetadata(BaseArtifactMetadata):
dbt_schema_version: str = field(
default_factory=lambda: str(RunOperationResultsArtifact.dbt_schema_version)
)


@dataclass
@schema_version("run-operation-result", 1)
class RunOperationResultsArtifact(RunOperationResult, ArtifactMixin):
@classmethod
def from_success(
cls,
success: bool,
elapsed_time: float,
generated_at: datetime,
):
meta = RunOperationResultMetadata(
dbt_schema_version=str(cls.dbt_schema_version),
generated_at=generated_at,
)
return cls(
metadata=meta,
results=[],
elapsed_time=elapsed_time,
success=success,
)


# due to issues with typing.Union collapsing subclasses, this can't subclass
# PartialResult

Expand Down
62 changes: 53 additions & 9 deletions core/dbt/task/run_operation.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
from datetime import datetime
import os
import threading
import traceback
from datetime import datetime

import agate

from .base import ConfiguredTask

import dbt.exceptions
from dbt.adapters.factory import get_adapter
from dbt.contracts.results import RunOperationResultsArtifact
from dbt.contracts.files import FileHash
from dbt.contracts.graph.nodes import HookNode
from dbt.contracts.results import RunResultsArtifact, RunResult, RunStatus, TimingInfo
from dbt.events.functions import fire_event
from dbt.events.types import (
RunningOperationCaughtError,
RunningOperationUncaughtError,
LogDebugStackTrace,
)
from dbt.node_types import NodeType
from .base import ConfiguredTask
aranke marked this conversation as resolved.
Show resolved Hide resolved

RESULT_FILE_NAME = "run_results.json"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming you don't need to define the name here? the RESULT_FILE_NAME should be defined somewhere else?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, they're defined inline:

RESULT_FILE_NAME = "run_results.json"

RESULT_FILE_NAME = "sources.json"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should put those constants in base task file or somewhere and import them in other tasks so the string is only being defined once. I will leave it to you to either do it here, a follow up PR, or not do anything.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll leave it as-is for now.



class RunOperationTask(ConfiguredTask):
Expand All @@ -22,10 +28,13 @@ def _get_macro_parts(self):
if "." in macro_name:
package_name, macro_name = macro_name.split(".", 1)
else:
package_name = None
package_name = self.config.project_name

return package_name, macro_name

def result_path(self):
aranke marked this conversation as resolved.
Show resolved Hide resolved
return os.path.join(self.config.target_path, RESULT_FILE_NAME)

def _run_unsafe(self) -> agate.Table:
adapter = get_adapter(self.config)

Expand All @@ -40,7 +49,7 @@ def _run_unsafe(self) -> agate.Table:

return res

def run(self) -> RunOperationResultsArtifact:
def run(self) -> RunResultsArtifact:
start = datetime.utcnow()
self.compile_manifest()
try:
Expand All @@ -56,11 +65,46 @@ def run(self) -> RunOperationResultsArtifact:
else:
success = True
end = datetime.utcnow()
return RunOperationResultsArtifact.from_success(

package_name, macro_name = self._get_macro_parts()
fqn = [NodeType.Operation, package_name, macro_name]
unique_id = ".".join(fqn)

run_result = RunResult(
adapter_response={},
status=RunStatus.Success if success else RunStatus.Error,
execution_time=(end - start).total_seconds(),
failures=0 if success else 1,
message=None,
node=HookNode(
alias=macro_name,
checksum=FileHash.from_contents(unique_id),
database=self.config.credentials.database,
schema=self.config.credentials.schema,
resource_type=NodeType.Operation,
fqn=fqn,
name=macro_name,
unique_id=unique_id,
package_name=package_name,
path="",
original_file_path="",
),
thread_id=threading.current_thread().name,
timing=[TimingInfo(name=macro_name, started_at=start, completed_at=end)],
)

results = RunResultsArtifact.from_execution_results(
generated_at=end,
elapsed_time=(end - start).total_seconds(),
success=success,
args={
k: v
for k, v in self.args.__dict__.items()
if k.islower() and type(v) in (str, int, float, bool, list, dict)
},
results=[run_result],
)
results.write(self.result_path())
aranke marked this conversation as resolved.
Show resolved Hide resolved
return results

def interpret_results(self, results):
return results.success
return results.results[0].status == RunStatus.Success
54 changes: 50 additions & 4 deletions tests/functional/artifacts/test_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import dbt
import jsonschema

from dbt.tests.util import run_dbt, get_artifact, check_datetime_between
from dbt.tests.util import run_dbt, get_artifact, check_datetime_between, run_dbt_and_capture
from tests.functional.artifacts.expected_manifest import (
expected_seeded_manifest,
expected_references_manifest,
Expand All @@ -17,7 +17,7 @@
)

from dbt.contracts.graph.manifest import WritableManifest
from dbt.contracts.results import RunResultsArtifact
from dbt.contracts.results import RunResultsArtifact, RunStatus

models__schema_yml = """
version: 2
Expand Down Expand Up @@ -129,6 +129,17 @@
select * from {{ ref('seed') }}
"""

models__model_with_pre_hook_sql = """
{{
config(
pre_hook={
"sql": "{{ alter_timezone(timezone='Etc/UTC') }}"
}
)
}}
select current_setting('timezone') as timezone
"""

seed__schema_yml = """
version: 2
seeds:
Expand Down Expand Up @@ -184,6 +195,17 @@
{% endtest %}
"""

macros__alter_timezone_sql = """
{% macro alter_timezone(timezone='America/Los_Angeles') %}
{% set sql %}
SET TimeZone='{{ timezone }}';
{% endset %}

{% do run_query(sql) %}
{% do log("Timezone set to: " + timezone, info=True) %}
{% endmacro %}
"""

snapshot__snapshot_seed_sql = """
{% snapshot snapshot_seed %}
{{
Expand Down Expand Up @@ -328,7 +350,6 @@

"""


versioned_models__schema_yml = """
version: 2

Expand Down Expand Up @@ -508,7 +529,7 @@ def verify_run_results(project, expected_run_results, start_time, run_results_sc
# sort the results so we can make reasonable assertions
run_results["results"].sort(key=lambda r: r["unique_id"])
assert run_results["results"] == expected_run_results
set(run_results) == {"elapsed_time", "results", "metadata"}
assert set(run_results) == {"elapsed_time", "results", "metadata", "args"}


class BaseVerifyProject:
Expand Down Expand Up @@ -649,3 +670,28 @@ def test_versions(self, project, manifest_schema_path, run_results_schema_path):
verify_run_results(
project, expected_versions_run_results(), start_time, run_results_schema_path
)


class TestVerifyRunOperation(BaseVerifyProject):
@pytest.fixture(scope="class")
def macros(self):
return {"alter_timezone.sql": macros__alter_timezone_sql}

@pytest.fixture(scope="class")
def models(self):
return {
"model_with_pre_hook.sql": models__model_with_pre_hook_sql,
}

def test_run_operation(self, project):
results, log_output = run_dbt_and_capture(["run-operation", "alter_timezone"])
assert len(results) == 1
assert results[0].status == RunStatus.Success
assert results[0].unique_id == "operation.test.alter_timezone"
assert "Timezone set to: America/Los_Angeles" in log_output

def test_run_model(self, project):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this test is added? Seems like a test for Run not runOperation

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The run-operation is a pre-hook on the model, so I'm testing two things:

  1. That the pre-hook ran
  2. That its result isn't in run_results.json

I'm going to leave this test in unless you have a strong objection here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving the test makes sense! Can we rename the test to something like run model with operation and add a bit more comment about why run operation should not be in results in this case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jtcohen6 Can you confirm this is the expected behavior here? If not, I can file a follow-up ticket to fix.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed live: Macros run as pre/post hooks on a single model aren't recorded in run_results.json — that tracks for me, given how the run results artifact works today. (Run results also don't account for multi-step materializations.)

results, log_output = run_dbt_and_capture(["run", "--select", "model_with_pre_hook"])
assert len(results) == 1
assert results[0].status == RunStatus.Success
assert "Timezone set to: Etc/UTC" in log_output
3 changes: 2 additions & 1 deletion tests/functional/run_query/test_types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import pytest

from dbt.contracts.results import NodeStatus
from dbt.tests.util import run_dbt

macros_sql = """
Expand Down Expand Up @@ -30,4 +31,4 @@ def macros(self):

def test_nested_types(self, project):
result = run_dbt(["run-operation", "test_array_results"])
assert result.success
assert result.results[0].status == NodeStatus.Success