
Add ingest/input/bytes metric and Kafka consumer metrics. #14582

Merged · 6 commits into apache:master · Jul 20, 2023

Conversation

@gianm gianm (Contributor) commented Jul 13, 2023

New metrics:

  1. ingest/input/bytes. Equivalent to processedBytes in the task reports.

  2. kafka/consumer/bytesConsumed: Equivalent to the Kafka consumer
    metric "bytes-consumed-total". Only emitted for Kafka tasks.

  3. kafka/consumer/recordsConsumed: Equivalent to the Kafka consumer
    metric "records-consumed-total". Only emitted for Kafka tasks.

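The Kafka consumer exposes `bytes-consumed-total` and `records-consumed-total` as cumulative counters, so a monitor that reports per-period values has to remember the last total it saw and emit only the increase. A minimal, self-contained sketch of that bookkeeping (the class and method names here are hypothetical stand-ins, not the Druid API):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: tracks the previously seen total per counter so each
// poll reports only the delta, which is how a monitor can turn cumulative
// Kafka counters into per-emission-period metric values.
class DeltaTracker
{
  private final Map<String, Long> previousTotals = new HashMap<>();

  /** Returns the increase since the last call for this counter name. */
  long delta(final String counterName, final long currentTotal)
  {
    final long previous = previousTotals.getOrDefault(counterName, 0L);
    previousTotals.put(counterName, currentTotal);
    return currentTotal - previous;
  }
}
```

Emitting deltas keeps the new metrics consistent with the other `ingest/*` metrics, which are per emission period rather than lifetime totals.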
@clintropolis clintropolis (Member) left a comment

🤘

private static boolean isTopicMetric(final MetricName metricName)
{
  // Certain metrics are emitted both as grand totals and broken down by topic; we want to ignore the grand total and
  // only look at the per-topic metrics. See https://kafka.apache.org/documentation/#new_consumer_fetch_monitoring.
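The method body isn't shown in this hunk. Per the linked Kafka documentation, the per-topic variants of the fetch metrics carry a `topic` entry in the metric's tags while the grand totals do not, so a plausible check looks like the following sketch (using a plain `Map` in place of Kafka's `MetricName.tags()`, as an illustration only):

```java
import java.util.Map;

class TopicMetricCheck
{
  // Sketch: Kafka's MetricName exposes tags() as a Map<String, String>; the
  // per-topic variants of fetch metrics carry a "topic" tag, while the
  // grand-total variants of the same metrics do not.
  static boolean isTopicMetric(final Map<String, String> tags)
  {
    return tags.containsKey("topic");
  }
}
```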
Member

nit: this doc link didn't take me to any particular section, is this supposed to point to https://kafka.apache.org/documentation/#consumer_fetch_monitoring

since we only talk to one topic i assume there shouldn't be any difference between the grand total and per topic metrics?

gianm (Contributor Author)

Ah, yes, I'm not sure where I got the new_ from. Fixed.

since we only talk to one topic i assume there shouldn't be any difference between the grand total and per topic metrics?

I assume so too, although I did it this way to future-proof against a possible future scenario where we support reading from multiple topics. It's also nice to have topic as an attribute for the metric.

Comment on lines 422 to 428
if (toolbox.getMonitorScheduler() != null) {
  final Monitor monitor = recordSupplier.monitor();
  if (monitor != null) {
    toolbox.getMonitorScheduler().addMonitor(monitor);
  }
}

@imply-cheddar imply-cheddar (Contributor) commented Jul 14, 2023

Instead of registering it here, this should really be the job of the newTaskRecordSupplier() call. How about we change that method to take TaskToolbox and add a default implementation of that which delegates to the no-argument call.

gianm (Contributor Author)

Sounds fine. I'm assuming the goal of the default implementation would be to generally switch to newTaskRecordSupplier(TaskToolbox), but maintain compatibility with existing third-party extensions. So to do that, I updated the patch with these changes:

  • Deprecate newTaskRecordSupplier(), but keep it around to avoid breaking extensions.
  • Add newTaskRecordSupplier(TaskToolbox) and switch callers and builtin extensions to use that.

LMK if this matches what you had in mind.
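The compatibility pattern described above (a new overload whose default implementation delegates to the deprecated no-argument method, so third-party overrides keep working) can be sketched with simplified, hypothetical types — `Toolbox` and the `String` return type stand in for `TaskToolbox` and `RecordSupplier`:

```java
// Simplified sketch of the deprecation pattern: the new overload delegates to
// the old method by default, so existing extensions that override the old
// method still take effect while callers migrate to the new signature.
class TaskSketch
{
  /** Hypothetical stand-in for TaskToolbox. */
  static class Toolbox {}

  /** @deprecated kept so existing third-party overrides keep working. */
  @Deprecated
  protected String newTaskRecordSupplier()
  {
    return "legacy-supplier";
  }

  /** New callers pass the toolbox; the default implementation delegates to the old method. */
  protected String newTaskRecordSupplier(final Toolbox toolbox)
  {
    return newTaskRecordSupplier();
  }
}
```

An extension that only overrides the deprecated method still produces its supplier when core code calls the new overload, which is the point of delegating rather than leaving the new method abstract.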

@cheddar cheddar (Contributor) left a comment

requesting change with this account to see if it shows up?

for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
  final MetricName metricName = entry.getKey();

  if (METRICS.containsKey(metricName.name()) && isTopicMetric(metricName)) {
Contributor

It will be useful to have task id as a dimension as well.

Contributor

taskId should already be on the serviceEmitter

gianm (Contributor Author)

It is indeed on the emitter itself.

*/
protected RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier(final TaskToolbox toolbox)
{
  return newTaskRecordSupplier();

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [SeekableStreamIndexTask.newTaskRecordSupplier](1) should be avoided because it has been deprecated.
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
@ektravel ektravel (Contributor) commented Jul 18, 2023

Suggested change
|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/thrownAway`|Number of events rejected because they are either null, filtered by the transform spec, or outside the window period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|

gianm (Contributor Author)

Replaced with the specific names of configuration parameters.

@ektravel ektravel (Contributor) commented Jul 19, 2023

If you prefer to keep the original style, windowPeriod should be in code font. For example:

Number of events rejected because they are either null, filtered by the transform spec, or outside the `windowPeriod` parameter.

|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on amount of data read.|
Contributor

Suggested change
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on amount of data read.|
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.|

gianm (Contributor Author)

We don't seem consistent about these: other entries use both "depends on X" and "depends on the X". Anyway, I'll change it, since "depends on the X" seems slightly more common.

Contributor

Once this PR is merged, I can go through metrics.md and update the rest of the document to make it consistent.

@ektravel ektravel (Contributor) left a comment

Reviewed the documentation portion of the PR. A couple of minor nits. Otherwise, looks good.

@asdf2014 asdf2014 (Member) left a comment

Just one minor suggestion, overall LGTM 👍

…blestream/SeekableStreamIndexTask.java

Co-authored-by: Benedict Jin <[email protected]>
@asdf2014 asdf2014 merged commit bac5ef3 into apache:master Jul 20, 2023
sergioferragut pushed a commit to sergioferragut/druid that referenced this pull request Jul 21, 2023
* Add ingest/input/bytes metric and Kafka consumer metrics.

New metrics:

1) ingest/input/bytes. Equivalent to processedBytes in the task reports.

2) kafka/consumer/bytesConsumed: Equivalent to the Kafka consumer
   metric "bytes-consumed-total". Only emitted for Kafka tasks.

3) kafka/consumer/recordsConsumed: Equivalent to the Kafka consumer
   metric "records-consumed-total". Only emitted for Kafka tasks.

* Fix anchor.

* Fix KafkaConsumerMonitor.

* Interface updates.

* Doc changes.

* Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java

Co-authored-by: Benedict Jin <[email protected]>

---------

Co-authored-by: Benedict Jin <[email protected]>
@LakshSingla LakshSingla added this to the 28.0 milestone Oct 12, 2023
@kfaraz kfaraz mentioned this pull request Jan 15, 2025