diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py index 23e56f9bf2fa2..17c1be69cb378 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_partition_sets.py @@ -1,4 +1,3 @@ -import yaml from graphql.execution.base import ResolveInfo import dagster._check as check @@ -9,6 +8,7 @@ ) from dagster.core.storage.pipeline_run import RunsFilter from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG, TagType, get_tag_type +from dagster.utils.yaml_utils import dump_run_config_yaml from .utils import capture_error @@ -95,9 +95,7 @@ def get_partition_config(graphene_info, repository_handle, partition_set_name, p partition_name, ) - return GraphenePartitionRunConfig( - yaml=yaml.safe_dump(result.run_config, default_flow_style=False) - ) + return GraphenePartitionRunConfig(yaml=dump_run_config_yaml(result.run_config)) @capture_error diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py index 0f1295da81222..96abb77eaef71 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/instigation.py @@ -3,7 +3,6 @@ import graphene import pendulum -import yaml import dagster._check as check from dagster.core.definitions.schedule_definition import ScheduleExecutionData @@ -20,6 +19,7 @@ from dagster.core.storage.tags import TagType, get_tag_type from dagster.seven.compat.pendulum import to_timezone from dagster.utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info +from dagster.utils.yaml_utils import dump_run_config_yaml from ..implementation.fetch_schedules import get_schedule_next_tick from ..implementation.fetch_sensors import get_sensor_next_tick @@ -247,7 +247,7 @@ def resolve_tags(self, _graphene_info): ] def resolve_runConfigYaml(self, _graphene_info): - return yaml.dump(self._run_request.run_config, default_flow_style=False, allow_unicode=True) + return dump_run_config_yaml(self._run_request.run_config) class GrapheneFutureInstigationTicks(graphene.ObjectType): diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py index 263ec8cc31b82..177be171b64e6 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/pipelines/pipeline.py @@ -1,7 +1,6 @@ from typing import List import graphene -import yaml from dagster_graphql.implementation.events import iterate_metadata_entries from dagster_graphql.schema.metadata import GrapheneMetadataEntry @@ -12,6 +11,7 @@ from dagster.core.storage.pipeline_run import PipelineRunStatus, RunRecord, RunsFilter from dagster.core.storage.tags import TagType, get_tag_type from dagster.utils import datetime_as_float +from dagster.utils.yaml_utils import dump_run_config_yaml from ...implementation.events import from_event_record from ...implementation.fetch_assets import get_assets_for_run_id @@ -347,9 +347,7 @@ def resolve_stepKeysToExecute(self, _graphene_info): return self._pipeline_run.step_keys_to_execute def resolve_runConfigYaml(self, _graphene_info): - return yaml.dump( - self._pipeline_run.run_config, default_flow_style=False, allow_unicode=True - ) + return dump_run_config_yaml(self._pipeline_run.run_config) def resolve_runConfig(self, _graphene_info): return self._pipeline_run.run_config @@ -665,10 +663,7 @@ def resolve_solidSelection(self, _graphene_info): return self._active_preset_data.solid_selection def resolve_runConfigYaml(self, _graphene_info): - yaml_str = yaml.safe_dump( - self._active_preset_data.run_config, default_flow_style=False, allow_unicode=True - ) - return yaml_str if yaml_str else "" + return dump_run_config_yaml(self._active_preset_data.run_config) or "" def resolve_mode(self, _graphene_info): return self._active_preset_data.mode diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/runs.py b/python_modules/dagster-graphql/dagster_graphql/schema/runs.py index 96ed9a4c57840..9944db836514f 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/runs.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/runs.py @@ -6,6 +6,7 @@ import dagster._check as check from dagster.utils.error import serializable_error_info_from_exc_info +from dagster.utils.yaml_utils import load_run_config_yaml from ..implementation.fetch_runs import get_runs, get_runs_count from ..implementation.utils import UserFacingGraphQLError @@ -149,7 +150,7 @@ class Meta: def parse_run_config_input(run_config, raise_on_error: bool): if run_config and isinstance(run_config, str): try: - return yaml.safe_load(run_config) + return load_run_config_yaml(run_config) except yaml.YAMLError: if raise_on_error: raise UserFacingGraphQLError( diff --git a/python_modules/dagster/dagster/cli/pipeline.py b/python_modules/dagster/dagster/cli/pipeline.py index dc47739c3792d..fd091ecc6a331 100644 --- a/python_modules/dagster/dagster/cli/pipeline.py +++ b/python_modules/dagster/dagster/cli/pipeline.py @@ -6,7 +6,6 @@ import click import pendulum -import yaml from tabulate import tabulate import dagster._check as check @@ -50,6 +49,7 @@ from dagster.utils.hosted_user_process import recon_pipeline_from_origin from dagster.utils.indenting_printer import IndentingPrinter from dagster.utils.interrupts import capture_interrupts +from dagster.utils.yaml_utils import dump_run_config_yaml from .config_scaffolder import scaffold_pipeline_config from .utils import get_instance_for_service @@ -717,7 +717,7 @@ def do_scaffold_command( check.bool_param(skip_non_required, "skip_non_required") config_dict = scaffold_pipeline_config(pipeline_def, skip_non_required=skip_non_required) - yaml_string = yaml.dump(config_dict, default_flow_style=False) + yaml_string = dump_run_config_yaml(config_dict) printer(yaml_string) diff --git a/python_modules/dagster/dagster/cli/sensor.py b/python_modules/dagster/dagster/cli/sensor.py index 7cb2cf252e802..81720ad7d639e 100644 --- a/python_modules/dagster/dagster/cli/sensor.py +++ b/python_modules/dagster/dagster/cli/sensor.py @@ -2,7 +2,6 @@ import sys import click -import yaml import dagster._check as check from dagster import DagsterInvariantViolationError @@ -22,6 +21,7 @@ SensorInstigatorData, ) from dagster.utils.error import serializable_error_info_from_exc_info +from dagster.utils.yaml_utils import dump_run_config_yaml @click.group(name="sensor") @@ -323,7 +323,7 @@ def execute_preview_command( "Sensor returning run requests for {num} run(s):\n\n{run_requests}".format( num=len(sensor_runtime_data.run_requests), run_requests="\n".join( - yaml.safe_dump(run_request.run_config, default_flow_style=False) + dump_run_config_yaml(run_request.run_config) for run_request in sensor_runtime_data.run_requests ), ) diff --git a/python_modules/dagster/dagster/core/definitions/preset.py b/python_modules/dagster/dagster/core/definitions/preset.py index a58ee64bc4367..073dc5df62aec 100644 --- a/python_modules/dagster/dagster/core/definitions/preset.py +++ b/python_modules/dagster/dagster/core/definitions/preset.py @@ -1,12 +1,12 @@ from typing import Dict, List, NamedTuple, Optional import pkg_resources -import yaml import dagster._check as check from dagster.core.definitions.utils import config_from_files, config_from_yaml_strings from dagster.core.errors import DagsterInvariantViolationError from dagster.utils.merger import deep_merge_dicts +from dagster.utils.yaml_utils import dump_run_config_yaml from .mode import DEFAULT_MODE_NAME from .utils import check_valid_name @@ -200,7 +200,7 @@ def get_environment_yaml(self): Returns: str: The environment dict as YAML. """ - return yaml.dump(self.run_config or {}, default_flow_style=False) + return dump_run_config_yaml(self.run_config or {}) def with_additional_config(self, run_config): """Return a new PresetDefinition with additional config merged in to the existing config.""" diff --git a/python_modules/dagster/dagster/serdes/config_class.py b/python_modules/dagster/dagster/serdes/config_class.py index 0b62a9d93499f..af39a06f3c583 100644 --- a/python_modules/dagster/dagster/serdes/config_class.py +++ b/python_modules/dagster/dagster/serdes/config_class.py @@ -2,10 +2,9 @@ from abc import ABC, abstractmethod from typing import NamedTuple -import yaml - import dagster._check as check +from ..utils.yaml_utils import load_run_config_yaml from .serdes import whitelist_for_serdes @@ -39,7 +38,7 @@ def __new__(cls, module_name, class_name, config_yaml): @property def config_dict(self): - return yaml.safe_load(self.config_yaml) + return load_run_config_yaml(self.config_yaml) def info_dict(self): return { diff --git a/python_modules/dagster/dagster/utils/yaml_utils.py b/python_modules/dagster/dagster/utils/yaml_utils.py index 6f2a1ee7bc24d..e190b4d2d33ba 100644 --- a/python_modules/dagster/dagster/utils/yaml_utils.py +++ b/python_modules/dagster/dagster/utils/yaml_utils.py @@ -1,5 +1,6 @@ import functools import glob +from typing import Any, Dict import yaml @@ -7,12 +8,56 @@ from .merger import deep_merge_dicts +YAML_TIMESTAMP_TAG = "tag:yaml.org,2002:timestamp" +YAML_STR_TAG = "tag:yaml.org,2002:str" -def load_yaml_from_globs(*globs): - return load_yaml_from_glob_list(list(globs)) +class _CanRemoveImplicitResolver: + # Adds a "remove_implicit_resolver" method that can be used to selectively + # disable default PyYAML resolvers + @classmethod + def remove_implicit_resolver(cls, tag): + # See https://github.com/yaml/pyyaml/blob/master/lib/yaml/resolver.py#L26 for inspiration + if not "yaml_implicit_resolvers" in cls.__dict__: + implicit_resolvers = {} + for key in cls.yaml_implicit_resolvers: + implicit_resolvers[key] = cls.yaml_implicit_resolvers[key][:] + cls.yaml_implicit_resolvers = implicit_resolvers -def load_yaml_from_glob_list(glob_list): + for ch, mappings in cls.yaml_implicit_resolvers.items(): + cls.yaml_implicit_resolvers[ch] = [ + (existing_tag, regexp) for existing_tag, regexp in mappings if existing_tag != tag + ] + + +# Handles strings with leading 0s being unexpectedly parsed as octal ints +# See: https://github.com/yaml/pyyaml/issues/98#issuecomment-436814271 +def _octal_string_representer(dumper, value): + if value.startswith("0"): + return dumper.represent_scalar(YAML_STR_TAG, value, style="'") + return dumper.represent_scalar(YAML_STR_TAG, value) + + +class DagsterRunConfigYamlLoader(yaml.SafeLoader, _CanRemoveImplicitResolver): + pass + + +DagsterRunConfigYamlLoader.remove_implicit_resolver(YAML_TIMESTAMP_TAG) + + +class DagsterRunConfigYamlDumper(yaml.SafeDumper, _CanRemoveImplicitResolver): + pass + + +DagsterRunConfigYamlDumper.remove_implicit_resolver(YAML_TIMESTAMP_TAG) +DagsterRunConfigYamlDumper.add_representer(str, _octal_string_representer) + + +def load_yaml_from_globs(*globs, loader=DagsterRunConfigYamlLoader): + return load_yaml_from_glob_list(list(globs), loader=loader) + + +def load_yaml_from_glob_list(glob_list, loader=DagsterRunConfigYamlLoader): check.list_param(glob_list, "glob_list", of_type=str) all_files_list = [] @@ -20,10 +65,10 @@ def load_yaml_from_glob_list(glob_list): for env_file_pattern in glob_list: all_files_list.extend(glob.glob(env_file_pattern)) - return merge_yamls(all_files_list) + return merge_yamls(all_files_list, loader=loader) -def merge_yamls(file_list): +def merge_yamls(file_list, loader=DagsterRunConfigYamlLoader): """Combine a list of YAML files into a dictionary. Args: @@ -40,7 +85,7 @@ def merge_yamls(file_list): merged = {} for yaml_file in file_list: - yaml_dict = load_yaml_from_path(yaml_file) or {} + yaml_dict = load_yaml_from_path(yaml_file, loader=loader) or {} check.invariant( isinstance(yaml_dict, dict), @@ -54,7 +99,7 @@ def merge_yamls(file_list): return merged -def merge_yaml_strings(yaml_strs): +def merge_yaml_strings(yaml_strs, loader=DagsterRunConfigYamlLoader): """Combine a list of YAML strings into a dictionary. Right-most overrides left-most. Args: @@ -69,7 +114,7 @@ def merge_yaml_strings(yaml_strs): check.list_param(yaml_strs, "yaml_strs", of_type=str) # Read YAML strings. - yaml_dicts = list([yaml.safe_load(y) for y in yaml_strs]) + yaml_dicts = list([yaml.load(y, Loader=loader) for y in yaml_strs]) for yaml_dict in yaml_dicts: check.invariant( @@ -80,7 +125,17 @@ def merge_yaml_strings(yaml_strs): return functools.reduce(deep_merge_dicts, yaml_dicts, {}) -def load_yaml_from_path(path: str) -> object: +def load_yaml_from_path(path: str, loader=DagsterRunConfigYamlLoader) -> object: check.str_param(path, "path") with open(path, "r", encoding="utf8") as ff: - return yaml.safe_load(ff) + return yaml.load(ff, Loader=loader) + + +def load_run_config_yaml(yaml_str: str): + return yaml.load(yaml_str, Loader=DagsterRunConfigYamlLoader) + + +def dump_run_config_yaml(run_config: Dict[str, Any]) -> str: + return yaml.dump( + run_config, Dumper=DagsterRunConfigYamlDumper, default_flow_style=False, allow_unicode=True + ) diff --git a/python_modules/dagster/dagster_tests/general_tests/utils_tests/test_yaml.py b/python_modules/dagster/dagster_tests/general_tests/utils_tests/test_yaml.py index 50badf38dea46..7e673bc585858 100644 --- a/python_modules/dagster/dagster_tests/general_tests/utils_tests/test_yaml.py +++ b/python_modules/dagster/dagster_tests/general_tests/utils_tests/test_yaml.py @@ -1,3 +1,4 @@ +import datetime import os import pytest @@ -6,6 +7,8 @@ import dagster._check as check from dagster.utils import file_relative_path from dagster.utils.yaml_utils import ( + dump_run_config_yaml, + load_run_config_yaml, load_yaml_from_glob_list, load_yaml_from_globs, load_yaml_from_path, @@ -111,3 +114,37 @@ def test_merge_yaml_strings(): ): bad_yaml = "--- `" merge_yaml_strings([a, bad_yaml]) + + +def test_dump_octal_string(): + + octal_str_list = {"keys": ["0001823", "0001234"]} + + # normal dump parses the first string as an int + assert yaml.safe_dump(octal_str_list) == "keys:\n- 0001823\n- '0001234'\n" + + # our dump does not + assert dump_run_config_yaml(octal_str_list) == "keys:\n- '0001823'\n- '0001234'\n" + + +def test_load_datetime_string(): + date_config_yaml = """ops: + my_op: + config: + start: 2022-06-10T00:00:00.000000+00:00""" + + # normal dump parses as a datetime + assert yaml.safe_load(date_config_yaml) == { + "ops": { + "my_op": { + "config": { + "start": datetime.datetime(2022, 6, 10, 0, 0, tzinfo=datetime.timezone.utc) + } + } + } + } + + # ours does not + assert load_run_config_yaml(date_config_yaml) == { + "ops": {"my_op": {"config": {"start": "2022-06-10T00:00:00.000000+00:00"}}} + } diff --git a/python_modules/libraries/dagster-airflow/dagster_airflow/cli.py b/python_modules/libraries/dagster-airflow/dagster_airflow/cli.py index 111f6de593811..70060a3d07d40 100644 --- a/python_modules/libraries/dagster-airflow/dagster_airflow/cli.py +++ b/python_modules/libraries/dagster-airflow/dagster_airflow/cli.py @@ -2,13 +2,13 @@ from datetime import datetime, timedelta import click -import yaml import dagster._check as check from dagster.cli.load_handle import recon_repo_for_cli_args from dagster.utils import load_yaml_from_glob_list from dagster.utils.backcompat import canonicalize_backcompat_args from dagster.utils.indenting_printer import IndentingStringIoPrinter +from dagster.utils.yaml_utils import dump_run_config_yaml def construct_environment_yaml(preset_name, config, pipeline_name, module_name): @@ -59,7 +59,7 @@ def construct_scaffolded_file_contents(module_name, job_name, run_config): printer.comment("#") printer.line("#" * 80) printer.line("ENVIRONMENT = '''") - printer.line(yaml.dump(run_config, default_flow_style=False)) + printer.line(dump_run_config_yaml(run_config)) printer.line("'''") printer.blank_line() printer.blank_line()