Add ingest/input/bytes metric and Kafka consumer metrics. #14582
Conversation
New metrics:
1) `ingest/input/bytes`. Equivalent to processedBytes in the task reports.
2) `kafka/consumer/bytesConsumed`: Equivalent to the Kafka consumer metric "bytes-consumed-total". Only emitted for Kafka tasks.
3) `kafka/consumer/recordsConsumed`: Equivalent to the Kafka consumer metric "records-consumed-total". Only emitted for Kafka tasks.
🤘
private static boolean isTopicMetric(final MetricName metricName)
{
  // Certain metrics are emitted both as grand totals and broken down by topic; we want to ignore the grand total and
  // only look at the per-topic metrics. See https://kafka.apache.org/documentation/#new_consumer_fetch_monitoring.
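As a self-contained illustration of the per-topic filter being discussed (not the actual patch code): Kafka emits the per-topic variants of its fetch metrics with a `topic` tag, while the grand totals carry no such tag, so checking the tags is enough to tell them apart. Here a plain `Map<String, String>` stands in for the real `MetricName.tags()`.

```java
import java.util.Map;

public class TopicMetricCheck
{
  // Sketch of the per-topic filter. "tags" is a hypothetical stand-in for
  // metricName.tags(); per-topic fetch metrics carry a "topic" tag, grand
  // totals do not, so the presence of the tag is the signal.
  static boolean isTopicMetric(final Map<String, String> tags)
  {
    return tags.containsKey("topic");
  }

  public static void main(String[] args)
  {
    if (!isTopicMetric(Map.of("topic", "wiki", "client-id", "c1"))) {
      throw new AssertionError("per-topic metric should match");
    }
    if (isTopicMetric(Map.of("client-id", "c1"))) {
      throw new AssertionError("grand-total metric should not match");
    }
  }
}
```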
nit: this doc link didn't take me to any particular section, is this supposed to point to https://kafka.apache.org/documentation/#consumer_fetch_monitoring
since we only talk to one topic i assume there shouldn't be any difference between the grand total and per topic metrics?
Ah, yes, I'm not sure where I got the `new_` from. Fixed.

> since we only talk to one topic i assume there shouldn't be any difference between the grand total and per topic metrics?

I assume so too, although I did it this way to future-proof against a possible future scenario where we support reading from multiple topics. It's also nice to have `topic` as an attribute for the metric.
if (toolbox.getMonitorScheduler() != null) {
  final Monitor monitor = recordSupplier.monitor();
  if (monitor != null) {
    toolbox.getMonitorScheduler().addMonitor(monitor);
  }
}
Instead of registering it here, this should really be the job of the `newTaskRecordSupplier()` call. How about we change that method to take `TaskToolbox`, and add a default implementation that delegates to the no-argument call.
Sounds fine. I'm assuming the goal of the default implementation would be to generally switch to `newTaskRecordSupplier(TaskToolbox)`, but maintain compatibility with existing third-party extensions. So to do that, I updated the patch with these changes:
- Deprecate `newTaskRecordSupplier()`, but keep it around to avoid breaking extensions.
- Add `newTaskRecordSupplier(TaskToolbox)` and switch callers and builtin extensions to use that.
LMK if this matches what you had in mind.
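The evolution pattern agreed on above can be sketched in isolation, assuming simplified stand-in types for `TaskToolbox` and `RecordSupplier` (the real ones are Druid classes with more structure):

```java
public class SeekableStreamIndexTaskSketch
{
  // Hypothetical stand-ins, just so the deprecation pattern compiles on its own.
  static class TaskToolbox {}
  static class RecordSupplier {}

  /**
   * Kept for compatibility with third-party extensions that still override it.
   *
   * @deprecated use {@link #newTaskRecordSupplier(TaskToolbox)} instead
   */
  @Deprecated
  protected RecordSupplier newTaskRecordSupplier()
  {
    throw new UnsupportedOperationException("override one of the newTaskRecordSupplier methods");
  }

  // New entry point: callers pass the toolbox so the supplier can be wired up
  // with access to it (e.g. for monitor registration). The default delegates to
  // the deprecated no-arg method, so old extensions that only override
  // newTaskRecordSupplier() keep working unchanged.
  protected RecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox)
  {
    return newTaskRecordSupplier();
  }

  public static void main(String[] args)
  {
    // An "old-style" extension that only overrides the deprecated method
    // still works through the new entry point.
    SeekableStreamIndexTaskSketch task = new SeekableStreamIndexTaskSketch() {
      @Override
      protected RecordSupplier newTaskRecordSupplier()
      {
        return new RecordSupplier();
      }
    };
    if (task.newTaskRecordSupplier(new TaskToolbox()) == null) {
      throw new AssertionError("delegation to the deprecated method failed");
    }
  }
}
```

The point of the default implementation is exactly the compatibility story from the comment above: new callers always go through the `TaskToolbox` overload, and only extensions written before the change fall through to the deprecated method.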
requesting change with this account to see if it shows up?
for (final Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
  final MetricName metricName = entry.getKey();

  if (METRICS.containsKey(metricName.name()) && isTopicMetric(metricName)) {
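To illustrate the shape of the monitoring loop above in a runnable form (a sketch, not the patch itself): Kafka's `*-total` metrics are cumulative counters, so a monitor that runs periodically has to remember the last value it saw and emit the delta. Here the consumer's metrics map is modeled as a plain `Map<String, Double>`, and the name mapping mirrors the metric names from the PR description.

```java
import java.util.HashMap;
import java.util.Map;

public class ConsumerMetricsSketch
{
  // Kafka metric name -> Druid metric name, mirroring the METRICS map in the
  // patch (Druid names are the ones from the PR description).
  static final Map<String, String> METRICS = Map.of(
      "bytes-consumed-total", "kafka/consumer/bytesConsumed",
      "records-consumed-total", "kafka/consumer/recordsConsumed"
  );

  // Last-seen cumulative value per Kafka metric, so each run emits a delta.
  private final Map<String, Double> previousValues = new HashMap<>();

  // Returns what a monitoring run would emit, given the consumer's current
  // cumulative totals. Metrics outside the METRICS map are skipped, matching
  // the containsKey filter in the loop above.
  Map<String, Double> deltas(final Map<String, Double> currentTotals)
  {
    final Map<String, Double> emitted = new HashMap<>();
    for (final Map.Entry<String, Double> entry : currentTotals.entrySet()) {
      final String druidName = METRICS.get(entry.getKey());
      if (druidName == null) {
        continue; // not a metric we track
      }
      final double previous = previousValues.getOrDefault(entry.getKey(), 0.0);
      emitted.put(druidName, entry.getValue() - previous);
      previousValues.put(entry.getKey(), entry.getValue());
    }
    return emitted;
  }
}
```

For example, if the first run sees `bytes-consumed-total = 100` and the second sees `150`, the sketch would emit `kafka/consumer/bytesConsumed` values of 100 and then 50.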
It will be useful to have task id as a dimension as well.
taskId should already be on the serviceEmitter
It is indeed on the emitter itself.
 */
protected RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> newTaskRecordSupplier(final TaskToolbox toolbox)
{
  return newTaskRecordSupplier();
Check notice — Code scanning / CodeQL: Deprecated method or constructor invocation
docs/operations/metrics.md
Outdated
|`ingest/events/unparseable`|Number of events rejected because the events are unparseable.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
Suggested change:
|`ingest/events/thrownAway`|Number of events rejected because they are either null, or filtered by the transform spec, or outside the windowPeriod.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/thrownAway`|Number of events rejected because they are either null, filtered by the transform spec, or outside the window period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
Replaced with the specific names of configuration parameters.
If you prefer to keep the original style, windowPeriod should be in code font. For example:
Number of events rejected because they are either null, filtered by the transform spec, or outside the `windowPeriod` parameter.
docs/operations/metrics.md
Outdated
|`ingest/events/duplicate`|Number of events rejected because the events are duplicated.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|0|
|`ingest/events/processed`|Number of events successfully processed per emission period.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Equal to the number of events per emission period.|
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on amount of data read.|
Suggested change:
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on amount of data read.|
|`ingest/input/bytes`|Number of bytes read from input sources, after decompression but prior to parsing. This covers all data read, including data that does not end up being fully processed and ingested. For example, this includes data that ends up being rejected for being unparseable or filtered out.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the amount of data read.|
We don't seem consistent about these: some other metrics use "depends on X" and some use "depends on the X". Anyway, I'll change it, since "depends on the X" seems slightly more common.
Once this PR is merged, I can go through `metrics.md` and update the rest of the document to make it consistent.
Reviewed the documentation portion of the PR. A couple of minor nits. Otherwise, looks good.
Just one minor suggestion, overall LGTM 👍
…blestream/SeekableStreamIndexTask.java Co-authored-by: Benedict Jin <[email protected]>
* Add ingest/input/bytes metric and Kafka consumer metrics. New metrics: 1) ingest/input/bytes. Equivalent to processedBytes in the task reports. 2) kafka/consumer/bytesConsumed: Equivalent to the Kafka consumer metric "bytes-consumed-total". Only emitted for Kafka tasks. 3) kafka/consumer/recordsConsumed: Equivalent to the Kafka consumer metric "records-consumed-total". Only emitted for Kafka tasks.
* Fix anchor.
* Fix KafkaConsumerMonitor.
* Interface updates.
* Doc changes.
* Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java Co-authored-by: Benedict Jin <[email protected]>
---------
Co-authored-by: Benedict Jin <[email protected]>