From 83e1fc7ea3e02ed2482fe9919ea5bcce68030020 Mon Sep 17 00:00:00 2001 From: Ryan Abernathey Date: Sat, 16 Jan 2021 10:48:30 -0500 Subject: [PATCH] fix type hints --- rechunker/executors/dask.py | 15 ++++++++++++--- rechunker/executors/pipeline.py | 7 ++++--- setup.cfg | 2 ++ 3 files changed, 18 insertions(+), 6 deletions(-) diff --git a/rechunker/executors/dask.py b/rechunker/executors/dask.py index 1a26c61..36fe3e5 100644 --- a/rechunker/executors/dask.py +++ b/rechunker/executors/dask.py @@ -52,7 +52,12 @@ def _make_stage(stage: Stage) -> Delayed: dsk = {(name, i): (stage.func, arg) for i, arg in enumerate(stage.map_args)} # create a barrier top_key = "stage-" + dask.base.tokenize(stage.func, stage.map_args) - dsk[top_key] = (lambda *args: None, *list(dsk)) + + def merge_all(*args): + # this function is dependent on its arguments but doesn't actually do anything + return None + + dsk.update({top_key: (merge_all, *list(dsk))}) return Delayed(top_key, dsk) @@ -62,9 +67,13 @@ def _merge_task(*args): def _merge(*args: Iterable[Delayed]) -> Delayed: name = "merge-" + dask.base.tokenize(*args) - keys = [arg.key for arg in args] + # mypy doesn't like arg.key + keys = [getattr(arg, "key") for arg in args] new_task = (_merge_task, *keys) - graph = dask.base.merge(*[dask.utils.ensure_dict(d.dask) for d in args]) + # mypy doesn't like arg.dask + graph = dask.base.merge( + *[dask.utils.ensure_dict(getattr(arg, "dask")) for arg in args] + ) graph[name] = new_task d = Delayed(name, graph) return d diff --git a/rechunker/executors/pipeline.py b/rechunker/executors/pipeline.py index 5a416b5..b121d70 100644 --- a/rechunker/executors/pipeline.py +++ b/rechunker/executors/pipeline.py @@ -1,12 +1,13 @@ import itertools import math -from typing import Iterable, Iterator, Tuple, TypeVar +from typing import Iterable, Iterator, Tuple, TypeVar, Any import dask import numpy as np from rechunker.types import ( CopySpec, + CopySpecExecutor, MultiStagePipeline, ParallelPipelines, ReadableArray, @@ -71,12 +72,12 @@ def specs_to_pipelines(specs: Iterable[CopySpec]) -> ParallelPipelines: T = TypeVar("T") -class CopySpecToPipelinesMixin: +class CopySpecToPipelinesMixin(CopySpecExecutor): # This signature doesn't work as a mixin because we don't know what type T is def prepare_plan(self, specs: Iterable[CopySpec]) -> T: pipelines = specs_to_pipelines(specs) return self.pipelines_to_plan(pipelines) - def pipelines_to_plan(self, pipelines: ParallelPipelines) -> T: + def pipelines_to_plan(self, pipelines: ParallelPipelines) -> Any: """Transform ParallelPiplines to an execution plan""" raise NotImplementedError diff --git a/setup.cfg b/setup.cfg index 9a326ca..d514d49 100644 --- a/setup.cfg +++ b/setup.cfg @@ -26,5 +26,7 @@ ignore_missing_imports = True ignore_missing_imports = True [mypy-pywren_ibm_cloud.*] ignore_missing_imports = True +[mypy-numpy.*] +ignore_missing_imports = True [mypy-zarr.*] ignore_missing_imports = True