Skip to content

Commit

Permalink
use a custom yaml loader/dumper for loading and dumping run config, w…
Browse files Browse the repository at this point in the history
…ithout surprising datetime parsing or octal string behavior (#8621)

Summary:
Switching dagit to send and receive data in YAML wasn't quite sufficient to handle weird edge cases that pyyaml handles strangely:
- datetime-like strings in config are parsed as datetimes, even though that's not a valid dagster config type
- Strings with leading zeros that contain no digit 8 or higher (e.g. `0001234`) are bizarrely parsed as octal integers

This adds a custom YAML loader and dumper that we can customize to change behaviors like this. It digs a little bit into PyYAML internals to do so, but I haven't been able to find another way to customize PyYAML's default tag behaviors.
  • Loading branch information
gibsondan authored Jun 28, 2022
1 parent 54b173f commit 049cf60
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import yaml
from graphql.execution.base import ResolveInfo

import dagster._check as check
Expand All @@ -9,6 +8,7 @@
)
from dagster.core.storage.pipeline_run import RunsFilter
from dagster.core.storage.tags import PARTITION_NAME_TAG, PARTITION_SET_TAG, TagType, get_tag_type
from dagster.utils.yaml_utils import dump_run_config_yaml

from .utils import capture_error

Expand Down Expand Up @@ -95,9 +95,7 @@ def get_partition_config(graphene_info, repository_handle, partition_set_name, p
partition_name,
)

return GraphenePartitionRunConfig(
yaml=yaml.safe_dump(result.run_config, default_flow_style=False)
)
return GraphenePartitionRunConfig(yaml=dump_run_config_yaml(result.run_config))


@capture_error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

import graphene
import pendulum
import yaml

import dagster._check as check
from dagster.core.definitions.schedule_definition import ScheduleExecutionData
Expand All @@ -20,6 +19,7 @@
from dagster.core.storage.tags import TagType, get_tag_type
from dagster.seven.compat.pendulum import to_timezone
from dagster.utils.error import SerializableErrorInfo, serializable_error_info_from_exc_info
from dagster.utils.yaml_utils import dump_run_config_yaml

from ..implementation.fetch_schedules import get_schedule_next_tick
from ..implementation.fetch_sensors import get_sensor_next_tick
Expand Down Expand Up @@ -247,7 +247,7 @@ def resolve_tags(self, _graphene_info):
]

def resolve_runConfigYaml(self, _graphene_info):
return yaml.dump(self._run_request.run_config, default_flow_style=False, allow_unicode=True)
return dump_run_config_yaml(self._run_request.run_config)


class GrapheneFutureInstigationTicks(graphene.ObjectType):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import List

import graphene
import yaml
from dagster_graphql.implementation.events import iterate_metadata_entries
from dagster_graphql.schema.metadata import GrapheneMetadataEntry

Expand All @@ -12,6 +11,7 @@
from dagster.core.storage.pipeline_run import PipelineRunStatus, RunRecord, RunsFilter
from dagster.core.storage.tags import TagType, get_tag_type
from dagster.utils import datetime_as_float
from dagster.utils.yaml_utils import dump_run_config_yaml

from ...implementation.events import from_event_record
from ...implementation.fetch_assets import get_assets_for_run_id
Expand Down Expand Up @@ -347,9 +347,7 @@ def resolve_stepKeysToExecute(self, _graphene_info):
return self._pipeline_run.step_keys_to_execute

def resolve_runConfigYaml(self, _graphene_info):
return yaml.dump(
self._pipeline_run.run_config, default_flow_style=False, allow_unicode=True
)
return dump_run_config_yaml(self._pipeline_run.run_config)

def resolve_runConfig(self, _graphene_info):
return self._pipeline_run.run_config
Expand Down Expand Up @@ -665,10 +663,7 @@ def resolve_solidSelection(self, _graphene_info):
return self._active_preset_data.solid_selection

def resolve_runConfigYaml(self, _graphene_info):
yaml_str = yaml.safe_dump(
self._active_preset_data.run_config, default_flow_style=False, allow_unicode=True
)
return yaml_str if yaml_str else ""
return dump_run_config_yaml(self._active_preset_data.run_config) or ""

def resolve_mode(self, _graphene_info):
return self._active_preset_data.mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import dagster._check as check
from dagster.utils.error import serializable_error_info_from_exc_info
from dagster.utils.yaml_utils import load_run_config_yaml

from ..implementation.fetch_runs import get_runs, get_runs_count
from ..implementation.utils import UserFacingGraphQLError
Expand Down Expand Up @@ -149,7 +150,7 @@ class Meta:
def parse_run_config_input(run_config, raise_on_error: bool):
if run_config and isinstance(run_config, str):
try:
return yaml.safe_load(run_config)
return load_run_config_yaml(run_config)
except yaml.YAMLError:
if raise_on_error:
raise UserFacingGraphQLError(
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/cli/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import click
import pendulum
import yaml
from tabulate import tabulate

import dagster._check as check
Expand Down Expand Up @@ -50,6 +49,7 @@
from dagster.utils.hosted_user_process import recon_pipeline_from_origin
from dagster.utils.indenting_printer import IndentingPrinter
from dagster.utils.interrupts import capture_interrupts
from dagster.utils.yaml_utils import dump_run_config_yaml

from .config_scaffolder import scaffold_pipeline_config
from .utils import get_instance_for_service
Expand Down Expand Up @@ -717,7 +717,7 @@ def do_scaffold_command(
check.bool_param(skip_non_required, "skip_non_required")

config_dict = scaffold_pipeline_config(pipeline_def, skip_non_required=skip_non_required)
yaml_string = yaml.dump(config_dict, default_flow_style=False)
yaml_string = dump_run_config_yaml(config_dict)
printer(yaml_string)


Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/cli/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import sys

import click
import yaml

import dagster._check as check
from dagster import DagsterInvariantViolationError
Expand All @@ -22,6 +21,7 @@
SensorInstigatorData,
)
from dagster.utils.error import serializable_error_info_from_exc_info
from dagster.utils.yaml_utils import dump_run_config_yaml


@click.group(name="sensor")
Expand Down Expand Up @@ -323,7 +323,7 @@ def execute_preview_command(
"Sensor returning run requests for {num} run(s):\n\n{run_requests}".format(
num=len(sensor_runtime_data.run_requests),
run_requests="\n".join(
yaml.safe_dump(run_request.run_config, default_flow_style=False)
dump_run_config_yaml(run_request.run_config)
for run_request in sensor_runtime_data.run_requests
),
)
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/core/definitions/preset.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from typing import Dict, List, NamedTuple, Optional

import pkg_resources
import yaml

import dagster._check as check
from dagster.core.definitions.utils import config_from_files, config_from_yaml_strings
from dagster.core.errors import DagsterInvariantViolationError
from dagster.utils.merger import deep_merge_dicts
from dagster.utils.yaml_utils import dump_run_config_yaml

from .mode import DEFAULT_MODE_NAME
from .utils import check_valid_name
Expand Down Expand Up @@ -200,7 +200,7 @@ def get_environment_yaml(self):
Returns:
str: The environment dict as YAML.
"""
return yaml.dump(self.run_config or {}, default_flow_style=False)
return dump_run_config_yaml(self.run_config or {})

def with_additional_config(self, run_config):
"""Return a new PresetDefinition with additional config merged in to the existing config."""
Expand Down
5 changes: 2 additions & 3 deletions python_modules/dagster/dagster/serdes/config_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
from abc import ABC, abstractmethod
from typing import NamedTuple

import yaml

import dagster._check as check

from ..utils.yaml_utils import load_run_config_yaml
from .serdes import whitelist_for_serdes


Expand Down Expand Up @@ -39,7 +38,7 @@ def __new__(cls, module_name, class_name, config_yaml):

@property
def config_dict(self):
return yaml.safe_load(self.config_yaml)
return load_run_config_yaml(self.config_yaml)

def info_dict(self):
return {
Expand Down
75 changes: 65 additions & 10 deletions python_modules/dagster/dagster/utils/yaml_utils.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,74 @@
import functools
import glob
from typing import Any, Dict

import yaml

import dagster._check as check

from .merger import deep_merge_dicts

YAML_TIMESTAMP_TAG = "tag:yaml.org,2002:timestamp"
YAML_STR_TAG = "tag:yaml.org,2002:str"

def load_yaml_from_globs(*globs):
return load_yaml_from_glob_list(list(globs))

class _CanRemoveImplicitResolver:
# Adds a "remove_implicit_resolver" method that can be used to selectively
# disable default PyYAML resolvers
@classmethod
def remove_implicit_resolver(cls, tag):
# See https://github.com/yaml/pyyaml/blob/master/lib/yaml/resolver.py#L26 for inspiration
if not "yaml_implicit_resolvers" in cls.__dict__:
implicit_resolvers = {}
for key in cls.yaml_implicit_resolvers:
implicit_resolvers[key] = cls.yaml_implicit_resolvers[key][:]
cls.yaml_implicit_resolvers = implicit_resolvers

def load_yaml_from_glob_list(glob_list):
for ch, mappings in cls.yaml_implicit_resolvers.items():
cls.yaml_implicit_resolvers[ch] = [
(existing_tag, regexp) for existing_tag, regexp in mappings if existing_tag != tag
]


def _octal_string_representer(dumper, value):
    """Represent ``value`` as a YAML string, single-quoting it if it starts with "0".

    Handles strings with leading 0s being unexpectedly parsed as octal ints.
    See: https://github.com/yaml/pyyaml/issues/98#issuecomment-436814271
    """
    # ``style=None`` is what ``represent_scalar`` uses when no style is passed.
    quote_style = "'" if value.startswith("0") else None
    return dumper.represent_scalar(YAML_STR_TAG, value, style=quote_style)


class DagsterRunConfigYamlLoader(yaml.SafeLoader, _CanRemoveImplicitResolver):
    """``yaml.SafeLoader`` variant used for loading run config."""


# Drop the implicit timestamp resolver from this loader class.
DagsterRunConfigYamlLoader.remove_implicit_resolver(YAML_TIMESTAMP_TAG)


class DagsterRunConfigYamlDumper(yaml.SafeDumper, _CanRemoveImplicitResolver):
    """``yaml.SafeDumper`` variant used for dumping run config."""


# Drop the implicit timestamp resolver from this dumper class.
DagsterRunConfigYamlDumper.remove_implicit_resolver(YAML_TIMESTAMP_TAG)
# Route every ``str`` through the representer that single-quotes strings
# starting with "0".
DagsterRunConfigYamlDumper.add_representer(str, _octal_string_representer)


def load_yaml_from_globs(*globs, loader=DagsterRunConfigYamlLoader):
    """Variadic convenience wrapper around ``load_yaml_from_glob_list``.

    Args:
        *globs: Glob patterns, forwarded as a single list.
        loader: YAML loader class passed through unchanged.

    Returns:
        Whatever ``load_yaml_from_glob_list`` returns for these patterns.
    """
    glob_patterns = list(globs)
    return load_yaml_from_glob_list(glob_patterns, loader=loader)


def load_yaml_from_glob_list(glob_list, loader=DagsterRunConfigYamlLoader):
check.list_param(glob_list, "glob_list", of_type=str)

all_files_list = []

for env_file_pattern in glob_list:
all_files_list.extend(glob.glob(env_file_pattern))

return merge_yamls(all_files_list)
return merge_yamls(all_files_list, loader=loader)


def merge_yamls(file_list):
def merge_yamls(file_list, loader=DagsterRunConfigYamlLoader):
"""Combine a list of YAML files into a dictionary.
Args:
Expand All @@ -40,7 +85,7 @@ def merge_yamls(file_list):
merged = {}

for yaml_file in file_list:
yaml_dict = load_yaml_from_path(yaml_file) or {}
yaml_dict = load_yaml_from_path(yaml_file, loader=loader) or {}

check.invariant(
isinstance(yaml_dict, dict),
Expand All @@ -54,7 +99,7 @@ def merge_yamls(file_list):
return merged


def merge_yaml_strings(yaml_strs):
def merge_yaml_strings(yaml_strs, loader=DagsterRunConfigYamlLoader):
"""Combine a list of YAML strings into a dictionary. Right-most overrides left-most.
Args:
Expand All @@ -69,7 +114,7 @@ def merge_yaml_strings(yaml_strs):
check.list_param(yaml_strs, "yaml_strs", of_type=str)

# Read YAML strings.
yaml_dicts = list([yaml.safe_load(y) for y in yaml_strs])
yaml_dicts = list([yaml.load(y, Loader=loader) for y in yaml_strs])

for yaml_dict in yaml_dicts:
check.invariant(
Expand All @@ -80,7 +125,17 @@ def merge_yaml_strings(yaml_strs):
return functools.reduce(deep_merge_dicts, yaml_dicts, {})


def load_yaml_from_path(path: str) -> object:
def load_yaml_from_path(path: str, loader=DagsterRunConfigYamlLoader) -> object:
check.str_param(path, "path")
with open(path, "r", encoding="utf8") as ff:
return yaml.safe_load(ff)
return yaml.load(ff, Loader=loader)


def load_run_config_yaml(yaml_str: str):
    """Parse a run config YAML string using ``DagsterRunConfigYamlLoader``.

    Args:
        yaml_str: The YAML document to parse.

    Returns:
        The Python object produced by the loader.
    """
    run_config = yaml.load(yaml_str, Loader=DagsterRunConfigYamlLoader)
    return run_config


def dump_run_config_yaml(run_config: Dict[str, Any]) -> str:
    """Serialize run config to YAML using ``DagsterRunConfigYamlDumper``.

    Args:
        run_config: The run config dictionary to serialize.

    Returns:
        The YAML text, emitted in block style with unicode allowed.
    """
    dump_options = {"default_flow_style": False, "allow_unicode": True}
    return yaml.dump(run_config, Dumper=DagsterRunConfigYamlDumper, **dump_options)
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import os

import pytest
Expand All @@ -6,6 +7,8 @@
import dagster._check as check
from dagster.utils import file_relative_path
from dagster.utils.yaml_utils import (
dump_run_config_yaml,
load_run_config_yaml,
load_yaml_from_glob_list,
load_yaml_from_globs,
load_yaml_from_path,
Expand Down Expand Up @@ -111,3 +114,37 @@ def test_merge_yaml_strings():
):
bad_yaml = "--- `"
merge_yaml_strings([a, bad_yaml])


def test_dump_octal_string():
    """Strings with leading zeros are always quoted by the run config dumper."""
    config = {"keys": ["0001823", "0001234"]}

    # the stock dumper leaves the first string unquoted, so a reader may take it for an int
    stock_output = yaml.safe_dump(config)
    assert stock_output == "keys:\n- 0001823\n- '0001234'\n"

    # our dumper quotes both strings
    run_config_output = dump_run_config_yaml(config)
    assert run_config_output == "keys:\n- '0001823'\n- '0001234'\n"


def test_load_datetime_string():
    """Datetime-like strings stay strings when loaded with the run config loader."""
    # The nesting indentation is significant: both assertions below expect
    # ``start`` to sit under ops -> my_op -> config.
    date_config_yaml = """ops:
  my_op:
    config:
      start: 2022-06-10T00:00:00.000000+00:00"""

    # normal load parses the value as a datetime
    expected_start = datetime.datetime(2022, 6, 10, 0, 0, tzinfo=datetime.timezone.utc)
    assert yaml.safe_load(date_config_yaml) == {
        "ops": {"my_op": {"config": {"start": expected_start}}}
    }

    # ours keeps it as the original string
    assert load_run_config_yaml(date_config_yaml) == {
        "ops": {"my_op": {"config": {"start": "2022-06-10T00:00:00.000000+00:00"}}}
    }
Loading

0 comments on commit 049cf60

Please sign in to comment.