From a63f8252f51ce19735d6aed46c2e7479c59ca275 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 24 Mar 2023 23:40:33 -0700 Subject: [PATCH 1/4] fix cache Signed-off-by: Eric Liang --- python/ray/data/_internal/plan.py | 2 +- .../data/tests/test_dataset_consumption.py | 27 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/python/ray/data/_internal/plan.py b/python/ray/data/_internal/plan.py index 62dd255cc30c..68251d0caaed 100644 --- a/python/ray/data/_internal/plan.py +++ b/python/ray/data/_internal/plan.py @@ -497,7 +497,7 @@ def execute_to_iterator( """ ctx = DatasetContext.get_current() - if not ctx.use_streaming_executor: + if not ctx.use_streaming_executor or self.has_computed_output(): return ( self.execute(allow_clear_input_blocks, force_read).iter_blocks(), self._snapshot_stats, diff --git a/python/ray/data/tests/test_dataset_consumption.py b/python/ray/data/tests/test_dataset_consumption.py index f4882dc4e743..208368cc56c5 100644 --- a/python/ray/data/tests/test_dataset_consumption.py +++ b/python/ray/data/tests/test_dataset_consumption.py @@ -180,6 +180,33 @@ def test_empty_dataset(ray_start_regular_shared): assert ds.count() == 0 +def test_cache_dataset(ray_start_regular_shared): + @ray.remote + class Counter: + def __init__(self): + self.i = 0 + + def inc(self): + print("INC") + self.i += 1 + return self.i + + c = Counter.remote() + + def inc(x): + ray.get(c.inc.remote()) + return x + + ds = ray.data.range(1) + ds = ds.map(inc) + ds = ds.cache() + + for _ in range(10): + ds.take_all() + + assert ray.get(c.inc.remote()) == 2 + + def test_schema(ray_start_regular_shared): ds = ray.data.range(10, parallelism=10) ds2 = ray.data.range_table(10, parallelism=10) From 8f4f2ef1f9f1e4633877295758800970ae450cd5 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 24 Mar 2023 23:40:40 -0700 Subject: [PATCH 2/4] lint Signed-off-by: Eric Liang --- python/ray/data/tests/test_dataset_consumption.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_dataset_consumption.py b/python/ray/data/tests/test_dataset_consumption.py index 208368cc56c5..affa3f2ceae0 100644 --- a/python/ray/data/tests/test_dataset_consumption.py +++ b/python/ray/data/tests/test_dataset_consumption.py @@ -190,7 +190,7 @@ def inc(self): print("INC") self.i += 1 return self.i - + c = Counter.remote() def inc(x): From 1ec81fa2a80fa159d119cc37cfa001a988ae92c2 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sat, 25 Mar 2023 13:40:06 -0700 Subject: [PATCH 3/4] remove unneeded test specializations for streaming Signed-off-by: Eric Liang --- python/ray/data/tests/test_dataset_parquet.py | 67 +++++++++---------- python/ray/data/tests/test_stats.py | 23 ++----- 2 files changed, 34 insertions(+), 56 deletions(-) diff --git a/python/ray/data/tests/test_dataset_parquet.py b/python/ray/data/tests/test_dataset_parquet.py index 15345b5942c3..d1a1335d3953 100644 --- a/python/ray/data/tests/test_dataset_parquet.py +++ b/python/ray/data/tests/test_dataset_parquet.py @@ -33,15 +33,8 @@ from pytest_lazyfixture import lazy_fixture -def check_num_computed(ds, expected, streaming_expected) -> None: - # When streaming executor is on, the _num_computed() is affected only - # by the ds.schema() which will still partial read the blocks, but will - # not affected by operations like take() as it's executed via streaming - # executor. 
- if not ray.data.context.DatasetContext.get_current().use_streaming_executor: - assert ds._plan.execute()._num_computed() == expected - else: - assert ds._plan.execute()._num_computed() == streaming_expected +def check_num_computed(ds, expected) -> None: + assert ds._plan.execute()._num_computed() == expected @pytest.mark.parametrize( @@ -135,11 +128,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path): ds = ray.data.read_parquet(data_path, filesystem=fs) # Test metadata-only parquet ops. - check_num_computed(ds, 0, 0) + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None - check_num_computed(ds, 1, 1) + check_num_computed(ds, 1) input_files = ds.input_files() assert len(input_files) == 2, input_files assert "test1.parquet" in str(input_files) @@ -152,11 +145,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - check_num_computed(ds, 1, 1) + check_num_computed(ds, 1) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take_all()] - check_num_computed(ds, 2, 1) + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -214,7 +207,7 @@ def prefetch_file_metadata(self, pieces): ) # Expect to lazily compute all metadata correctly. - check_num_computed(ds, 0, 0) + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -230,11 +223,11 @@ def prefetch_file_metadata(self, pieces): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - check_num_computed(ds, 2, 2) + check_num_computed(ds, 2) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2, 2) + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -291,7 +284,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): assert ds._meta_count() is None # Expect to lazily compute all metadata correctly. - check_num_computed(ds, 0, 0) + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -307,11 +300,11 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - check_num_computed(ds, 2, 2) + check_num_computed(ds, 2) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2, 2) + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -332,7 +325,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2, 0) + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -381,7 +374,7 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path assert ds._meta_count() is None # Expect to lazily compute all metadata correctly. - check_num_computed(ds, 0, 0) + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -397,11 +390,11 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - check_num_computed(ds, 2, 2) + check_num_computed(ds, 2) # Forces a data read. 
values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2, 2) + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -440,7 +433,7 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path): ds = ray.data.read_parquet(data_path, filesystem=fs) # Test metadata-only parquet ops. - check_num_computed(ds, 0, 0) + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -462,11 +455,11 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path): "one: dictionary}\n" ")" ), ds - check_num_computed(ds, 1, 1) + check_num_computed(ds, 1) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2, 1) + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [1, "b"], @@ -498,7 +491,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path ) values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 1, 0) + check_num_computed(ds, 1) assert sorted(values) == [[1, "a"], [1, "a"]] # 2 partitions, 1 empty partition, 2 block/read tasks, 1 empty block @@ -508,7 +501,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path ) values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2, 0) + check_num_computed(ds, 2) assert sorted(values) == [[1, "a"], [1, "a"]] @@ -532,7 +525,7 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path): ) # Test metadata-only parquet ops. - check_num_computed(ds, 0, 0) + check_num_computed(ds, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -546,11 +539,11 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={two: string, one: int32})" ), ds - check_num_computed(ds, 1, 1) + check_num_computed(ds, 1) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2, 1) + check_num_computed(ds, 2) assert sorted(values) == [ [1, "a"], [1, "b"], @@ -579,7 +572,7 @@ def _block_udf(block: pa.Table): ds = ray.data.read_parquet(str(tmp_path), parallelism=1, _block_udf=_block_udf) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) - check_num_computed(ds, 1, 0) + check_num_computed(ds, 1) np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1) # 2 blocks/read tasks @@ -587,7 +580,7 @@ def _block_udf(block: pa.Table): ds = ray.data.read_parquet(str(tmp_path), parallelism=2, _block_udf=_block_udf) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) - check_num_computed(ds, 2, 0) + check_num_computed(ds, 2) np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1) # 2 blocks/read tasks, 1 empty block @@ -600,7 +593,7 @@ def _block_udf(block: pa.Table): ) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) - check_num_computed(ds, 2, 0) + check_num_computed(ds, 2) np.testing.assert_array_equal(sorted(ones), np.array(one_data[:2]) + 1) @@ -630,17 +623,17 @@ def test_parquet_read_parallel_meta_fetch(ray_start_regular_shared, fs, data_pat ds = ray.data.read_parquet(data_path, filesystem=fs, parallelism=parallelism) # Test metadata-only parquet ops. 
- check_num_computed(ds, 0, 0) + check_num_computed(ds, 0) assert ds.count() == num_dfs * 3 assert ds.size_bytes() > 0 assert ds.schema() is not None input_files = ds.input_files() assert len(input_files) == num_dfs, input_files - check_num_computed(ds, 1, 1) + check_num_computed(ds, 1) # Forces a data read. values = [s["one"] for s in ds.take(limit=3 * num_dfs)] - check_num_computed(ds, parallelism, 1) + check_num_computed(ds, parallelism) assert sorted(values) == list(range(3 * num_dfs)) diff --git a/python/ray/data/tests/test_stats.py b/python/ray/data/tests/test_stats.py index 0c4dc3c84367..2eeab5edb43a 100644 --- a/python/ray/data/tests/test_stats.py +++ b/python/ray/data/tests/test_stats.py @@ -121,24 +121,9 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): stats = canonicalize(ds.cache().stats()) if context.new_execution_backend: - if context.use_streaming_executor: - assert ( - stats - == """Stage N ReadRange->MapBatches(dummy_map_batches)->Map: N/N blocks executed in T -* Remote wall time: T min, T max, T mean, T total -* Remote cpu time: T min, T max, T mean, T total -* Peak heap memory usage (MiB): N min, N max, N mean -* Output num rows: N min, N max, N mean, N total -* Output size bytes: N min, N max, N mean, N total -* Tasks per node: N min, N max, N mean; N nodes used -* Extra metrics: {'obj_store_mem_alloc': N, 'obj_store_mem_freed': N, \ -'obj_store_mem_peak': N} -""" - ) - else: - assert ( - stats - == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T + assert ( + stats + == """Stage N ReadRange->MapBatches(dummy_map_batches): N/N blocks executed in T * Remote wall time: T min, T max, T mean, T total * Remote cpu time: T min, T max, T mean, T total * Peak heap memory usage (MiB): N min, N max, N mean @@ -169,7 +154,7 @@ def test_dataset_stats_basic(ray_start_regular_shared, enable_auto_log_stats): * In user code: T * Total time: T """ - ) + ) else: assert ( stats From 7e3b1aa9a0248cde78ee3f1180e5cc17904437f9 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Sat, 25 Mar 2023 19:03:36 -0700 Subject: [PATCH 4/4] fix tests Signed-off-by: Eric Liang --- python/ray/data/tests/test_dataset_parquet.py | 67 ++++++++++--------- 1 file changed, 37 insertions(+), 30 deletions(-) diff --git a/python/ray/data/tests/test_dataset_parquet.py b/python/ray/data/tests/test_dataset_parquet.py index d1a1335d3953..0d98d792ca67 100644 --- a/python/ray/data/tests/test_dataset_parquet.py +++ b/python/ray/data/tests/test_dataset_parquet.py @@ -33,8 +33,15 @@ from pytest_lazyfixture import lazy_fixture -def check_num_computed(ds, expected) -> None: - assert ds._plan.execute()._num_computed() == expected +def check_num_computed(ds, expected, streaming_expected) -> None: + # When streaming executor is on, the _num_computed() is affected only + # by the ds.schema() which will still partial read the blocks, but will + # not affected by operations like take() as it's executed via streaming + # executor. + if not ray.data.context.DatasetContext.get_current().use_streaming_executor: + assert ds._plan.execute()._num_computed() == expected + else: + assert ds._plan.execute()._num_computed() == streaming_expected @pytest.mark.parametrize( @@ -128,11 +135,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path): ds = ray.data.read_parquet(data_path, filesystem=fs) # Test metadata-only parquet ops. 
- check_num_computed(ds, 0) + check_num_computed(ds, 0, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None - check_num_computed(ds, 1) + check_num_computed(ds, 1, 1) input_files = ds.input_files() assert len(input_files) == 2, input_files assert "test1.parquet" in str(input_files) @@ -145,11 +152,11 @@ def test_parquet_read_basic(ray_start_regular_shared, fs, data_path): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - check_num_computed(ds, 1) + check_num_computed(ds, 1, 1) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take_all()] - check_num_computed(ds, 2) + check_num_computed(ds, 2, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -207,7 +214,7 @@ def prefetch_file_metadata(self, pieces): ) # Expect to lazily compute all metadata correctly. - check_num_computed(ds, 0) + check_num_computed(ds, 0, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -223,11 +230,11 @@ def prefetch_file_metadata(self, pieces): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - check_num_computed(ds, 2) + check_num_computed(ds, 2, 2) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2) + check_num_computed(ds, 2, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -284,7 +291,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): assert ds._meta_count() is None # Expect to lazily compute all metadata correctly. - check_num_computed(ds, 0) + check_num_computed(ds, 0, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -300,11 +307,11 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - check_num_computed(ds, 2) + check_num_computed(ds, 2, 2) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2) + check_num_computed(ds, 2, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -325,7 +332,7 @@ def test_parquet_read_bulk(ray_start_regular_shared, fs, data_path): # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2) + check_num_computed(ds, 2, 0) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -374,7 +381,7 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path assert ds._meta_count() is None # Expect to lazily compute all metadata correctly. - check_num_computed(ds, 0) + check_num_computed(ds, 0, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -390,11 +397,11 @@ def test_parquet_read_bulk_meta_provider(ray_start_regular_shared, fs, data_path repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={one: int64, two: string})" ), ds - check_num_computed(ds, 2) + check_num_computed(ds, 2, 2) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2) + check_num_computed(ds, 2, 2) assert sorted(values) == [ [1, "a"], [2, "b"], @@ -433,7 +440,7 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path): ds = ray.data.read_parquet(data_path, filesystem=fs) # Test metadata-only parquet ops. 
- check_num_computed(ds, 0) + check_num_computed(ds, 0, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -455,11 +462,11 @@ def test_parquet_read_partitioned(ray_start_regular_shared, fs, data_path): "one: dictionary}\n" ")" ), ds - check_num_computed(ds, 1) + check_num_computed(ds, 1, 1) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2) + check_num_computed(ds, 2, 2) assert sorted(values) == [ [1, "a"], [1, "b"], @@ -491,7 +498,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path ) values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 1) + check_num_computed(ds, 1, 0) assert sorted(values) == [[1, "a"], [1, "a"]] # 2 partitions, 1 empty partition, 2 block/read tasks, 1 empty block @@ -501,7 +508,7 @@ def test_parquet_read_partitioned_with_filter(ray_start_regular_shared, tmp_path ) values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2) + check_num_computed(ds, 2, 0) assert sorted(values) == [[1, "a"], [1, "a"]] @@ -525,7 +532,7 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path): ) # Test metadata-only parquet ops. - check_num_computed(ds, 0) + check_num_computed(ds, 0, 0) assert ds.count() == 6 assert ds.size_bytes() > 0 assert ds.schema() is not None @@ -539,11 +546,11 @@ def test_parquet_read_partitioned_explicit(ray_start_regular_shared, tmp_path): repr(ds) == "Dataset(num_blocks=2, num_rows=6, " "schema={two: string, one: int32})" ), ds - check_num_computed(ds, 1) + check_num_computed(ds, 1, 1) # Forces a data read. values = [[s["one"], s["two"]] for s in ds.take()] - check_num_computed(ds, 2) + check_num_computed(ds, 2, 2) assert sorted(values) == [ [1, "a"], [1, "b"], @@ -572,7 +579,7 @@ def _block_udf(block: pa.Table): ds = ray.data.read_parquet(str(tmp_path), parallelism=1, _block_udf=_block_udf) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) - check_num_computed(ds, 1) + check_num_computed(ds, 1, 0) np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1) # 2 blocks/read tasks @@ -580,7 +587,7 @@ def _block_udf(block: pa.Table): ds = ray.data.read_parquet(str(tmp_path), parallelism=2, _block_udf=_block_udf) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) - check_num_computed(ds, 2) + check_num_computed(ds, 2, 0) np.testing.assert_array_equal(sorted(ones), np.array(one_data) + 1) # 2 blocks/read tasks, 1 empty block @@ -593,7 +600,7 @@ def _block_udf(block: pa.Table): ) ones, twos = zip(*[[s["one"], s["two"]] for s in ds.take()]) - check_num_computed(ds, 2) + check_num_computed(ds, 2, 0) np.testing.assert_array_equal(sorted(ones), np.array(one_data[:2]) + 1) @@ -623,17 +630,17 @@ def test_parquet_read_parallel_meta_fetch(ray_start_regular_shared, fs, data_pat ds = ray.data.read_parquet(data_path, filesystem=fs, parallelism=parallelism) # Test metadata-only parquet ops. - check_num_computed(ds, 0) + check_num_computed(ds, 0, 0) assert ds.count() == num_dfs * 3 assert ds.size_bytes() > 0 assert ds.schema() is not None input_files = ds.input_files() assert len(input_files) == num_dfs, input_files - check_num_computed(ds, 1) + check_num_computed(ds, 1, 1) # Forces a data read. values = [s["one"] for s in ds.take(limit=3 * num_dfs)] - check_num_computed(ds, parallelism) + check_num_computed(ds, parallelism, parallelism) assert sorted(values) == list(range(3 * num_dfs))
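
Notes (not part of the patches above): the behavioral fix in this series is the one-line change to ExecutionPlan.execute_to_iterator in patch 1/4 — when has_computed_output() reports that the plan already holds a materialized snapshot, iteration now falls back to the bulk execute() path instead of re-running the plan through the streaming executor, so a dataset that has been cached is not recomputed on every consumption. The sketch below mirrors the new test_cache_dataset test and shows the intended user-facing effect. It is a minimal, self-contained illustration, not code from the patch: the ExecutionCounter actor and the record helper are illustrative names, and it assumes a Ray version where Dataset.cache() and the streaming executor are available (as in the branch these patches target).

import ray

ray.init(ignore_reinit_error=True)


@ray.remote
class ExecutionCounter:
    """Counts how many times the map UDF actually runs."""

    def __init__(self):
        self.count = 0

    def inc(self):
        self.count += 1
        return self.count

    def get(self):
        return self.count


counter = ExecutionCounter.remote()


def record(row):
    # Ping the counter once per processed row so any re-execution is observable.
    ray.get(counter.inc.remote())
    return row


# range(1) yields exactly one row, so there is one UDF call per (re-)execution.
ds = ray.data.range(1).map(record).cache()

for _ in range(10):
    ds.take_all()  # with the fix, these iterations reuse the cached blocks

# One row, one execution: the UDF ran exactly once despite 10 consumptions.
# (The in-tree test asserts 2 because its final check call also increments.)
assert ray.get(counter.get.remote()) == 1

Routing cached datasets through the bulk path is the simplest way to reuse the snapshot blocks: without the check, the streaming executor would rebuild an iterator from the logical plan and re-run the map stage on every take_all().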