
Commit
add more iteration and file reading metrics (#10)
<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have access to it, we will find a reviewer shortly and
assign them to your PR. -->

## Why are these changes needed?

<!-- Please give a short summary of the change and the problem this
solves. -->

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit (by using the `-s` flag, i.e., `git
commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a
method in Tune, I've added it in `doc/source/tune/api/` under the
corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few
flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(
votrou authored Jul 10, 2024
1 parent f12e5b4 commit 6e2f3cd
Showing 2 changed files with 119 additions and 2 deletions.
84 changes: 83 additions & 1 deletion in python/ray/data/_internal/stats.py
@@ -263,6 +263,72 @@ def __init__(self, max_stats=1000):
            tag_keys=iter_tag_keys,
        )

        self.iter_total_s = Gauge(
            "data_iter_total_seconds",
            description="Total time spent in iteration",
            tag_keys=iter_tag_keys,
        )
        self.iter_wait_s = Gauge(
            "data_iter_wait_seconds",
            description="Seconds spent in ray.wait()",
            tag_keys=iter_tag_keys,
        )
        self.iter_get_s = Gauge(
            "data_iter_get_seconds",
            description="Seconds spent in ray.get()",
            tag_keys=iter_tag_keys,
        )
        self.iter_next_batch_s = Gauge(
            "data_iter_next_batch_seconds",
            description="Seconds spent getting the next batch of blocks",
            tag_keys=iter_tag_keys,
        )
        self.iter_format_batch_s = Gauge(
            "data_iter_format_batch_seconds",
            description="Seconds spent formatting batches",
            tag_keys=iter_tag_keys,
        )
        self.iter_collate_batch_s = Gauge(
            "data_iter_collate_batch_seconds",
            description="Seconds spent applying the collate function to batches",
            tag_keys=iter_tag_keys,
        )
        self.iter_finalize_batch_s = Gauge(
            "data_iter_finalize_batch_seconds",
            description="Seconds spent applying the finalize function to batches",
            tag_keys=iter_tag_keys,
        )

        self.iter_blocks_local = Gauge(
            "data_iter_blocks_local",
            description="Number of blocks on the local node",
            tag_keys=iter_tag_keys,
        )
        self.iter_blocks_remote = Gauge(
            "data_iter_blocks_remote",
            description="Number of blocks on remote nodes",
            tag_keys=iter_tag_keys,
        )

        self.iter_blocks_unknown = Gauge(
            "data_iter_blocks_unknown",
            description="Number of blocks with unknown location",
            tag_keys=iter_tag_keys,
        )

        self.streaming_split_coordinator_s = Gauge(
            "data_iter_streaming_split_coordinator_seconds",
            description="Seconds spent in the coordinator actor distributing blocks",
            tag_keys=iter_tag_keys,
        )

        self.streaming_exec_schedule_s = Gauge(
            "data_streaming_exec_schedule_seconds",
            description="Seconds spent in streaming executor scheduling",
            tag_keys=iter_tag_keys,
        )

    def _create_prometheus_metrics_for_execution_metrics(
        self, metrics_group: str, tag_keys: Tuple[str, ...]
    ) -> Dict[str, Gauge]:
@@ -374,6 +440,14 @@ def update_iteration_metrics(
        self.iter_total_blocked_s.set(stats.iter_total_blocked_s.get(), tags)
        self.iter_user_s.set(stats.iter_user_s.get(), tags)
        self.iter_initialize_s.set(stats.iter_initialize_s.get(), tags)
        self.iter_total_s.set(stats.iter_total_s.get(), tags)
        self.iter_wait_s.set(stats.iter_wait_s.get(), tags)
        self.iter_get_s.set(stats.iter_get_s.get(), tags)
        self.iter_next_batch_s.set(stats.iter_next_batch_s.get(), tags)
        self.iter_format_batch_s.set(stats.iter_format_batch_s.get(), tags)
        self.iter_collate_batch_s.set(stats.iter_collate_batch_s.get(), tags)
        self.iter_finalize_batch_s.set(stats.iter_finalize_batch_s.get(), tags)
        self.streaming_split_coordinator_s.set(
            stats.streaming_split_coordinator_s.get(), tags
        )

    def clear_execution_metrics(self, dataset_tag: str, operator_tags: List[str]):
        for operator_tag in operator_tags:
@@ -407,6 +481,14 @@ def clear_iteration_metrics(self, dataset_tag: str):
        self.iter_total_blocked_s.set(0, tags)
        self.iter_user_s.set(0, tags)
        self.iter_initialize_s.set(0, tags)
        self.iter_total_s.set(0, tags)
        self.iter_wait_s.set(0, tags)
        self.iter_get_s.set(0, tags)
        self.iter_next_batch_s.set(0, tags)
        self.iter_format_batch_s.set(0, tags)
        self.iter_collate_batch_s.set(0, tags)
        self.iter_finalize_batch_s.set(0, tags)
        self.streaming_split_coordinator_s.set(0, tags)

    def register_dataset(self, dataset_tag: str, operator_tags: List[str]):
        self.datasets[dataset_tag] = {
@@ -1460,4 +1542,4 @@ def __repr__(self, level=0) -> str:
            f"{indent} user_time={fmt(self.user_time.get()) or None},\n"
            f"{indent} total_time={fmt(self.total_time.get()) or None},\n"
            f"{indent})"
        )
        )
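For context, here is a minimal standalone sketch (not part of the diff) of the `ray.util.metrics.Gauge` pattern the new iteration metrics follow: construct the gauge once with `tag_keys`, then `set()` it per dataset tag. The dataset tag value below is illustrative.

```python
import ray
from ray.util.metrics import Gauge

ray.init()

# Same construction pattern as the new iteration gauges above.
iter_wait_s = Gauge(
    "data_iter_wait_seconds",
    description="Seconds spent in ray.wait()",
    tag_keys=("dataset",),
)

tags = {"dataset": "my_dataset"}  # illustrative tag value
iter_wait_s.set(1.25, tags)  # as in update_iteration_metrics()
iter_wait_s.set(0, tags)     # as in clear_iteration_metrics()
```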
37 changes: 36 additions & 1 deletion in python/ray/data/datasource/parquet_base_datasource.py
@@ -7,6 +7,9 @@
if TYPE_CHECKING:
    import pyarrow

from ray.util.metrics import Histogram
from ray.serve._private.constants import DEFAULT_LATENCY_BUCKET_MS
import time

logger = logging.getLogger(__name__)

@@ -29,6 +32,21 @@ def __init__(
            read_table_args = {}

        self.read_table_args = read_table_args
        self.null_count_metric = Histogram(
            "data_read_null_percentage",
            boundaries=[0.00001, 1],
            description=(
                "Coverage of input data: fraction of non-null values across columns"
            ),
        )
        self.pyarrow_table_size_metric = Histogram(
            "data_pyarrow_decompressed_size",
            # Latency buckets (ms) reused here as size buckets (MB).
            boundaries=DEFAULT_LATENCY_BUCKET_MS,
            description=(
                "Decompressed size (MB) of the PyArrow table read from the input file"
            ),
        )
        self.read_file_latency = Histogram(
            "data_pyarrow_read_table_latency",
            boundaries=DEFAULT_LATENCY_BUCKET_MS,
            description=(
                "Latency (ms) to read an input file and convert it to PyArrow format"
            ),
        )

    def get_name(self):
        """Return a human-readable name for this datasource.
@@ -41,7 +59,14 @@ def _read_stream(self, f: "pyarrow.NativeFile", path: str):
        import pyarrow.parquet as pq

        use_threads = self.read_table_args.pop("use_threads", False)
        yield pq.read_table(f, use_threads=use_threads, **self.read_table_args)
        start = time.time()
        table = pq.read_table(f, use_threads=use_threads, **self.read_table_args)
        stop = time.time()
        self.read_file_latency.observe((stop - start) * 1000)
        if table and table.nbytes:
            # Table size measured in MB.
            self.pyarrow_table_size_metric.observe(table.nbytes / 1024 / 1024)
            self.calculate_nulls(table)
        yield table

    def _open_input_source(
        self,
@@ -51,3 +76,13 @@ def _open_input_source(
    ) -> "pyarrow.NativeFile":
        # Parquet requires `open_input_file` due to random access reads
        return filesystem.open_input_file(path, **open_args)

    def calculate_nulls(self, table):
        """Record the fraction of non-null values across all columns of `table`."""
        total = 0
        null = 0
        for column in table.columns:
            null += column.null_count
            total += len(column)

        if total == 0:
            # Guard against empty tables to avoid division by zero.
            return
        percent = (total - null) / total
        self.null_count_metric.observe(percent)
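As a sanity check on the `calculate_nulls` logic, a small standalone PyArrow example (toy data, not from the PR) showing the non-null fraction that the histogram would observe:

```python
import pyarrow as pa

# Toy table: 6 values total, 2 of them null.
table = pa.table({"a": [1, None, 3], "b": ["x", "y", None]})

total = sum(len(col) for col in table.columns)
nulls = sum(col.null_count for col in table.columns)
non_null_fraction = (total - nulls) / total if total else 1.0
print(non_null_fraction)  # 0.666..., i.e. about 67% of values are filled
```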
