Emit state of replace and append for native batch tasks #12488
docs/operations/metrics.md
Outdated
`true` | `false` | `APPEND`|
`true` | `true ` | Invalid combination, exception thrown. |
`false` | `false` | `OVERWRITE` (this is the default for native batch ingestion). |
`false` | `true` | `REPLACE`|
Might want to add details here about the creation of tombstones for time-chunks within the date interval specified for ingestion that have no data, if in REPLACE mode, and the absence of this in OVERWRITE mode, if my understanding is correct?
Well, this is to document the batch ingestion mode not the semantics of the method so I am not sure that the explanation for its semantics belongs here.
Oh ok, I guess the semantics are described elsewhere? If so should we link to that? not a big deal
Yes, in the documentation for the `isDropExisting` flag. The actual names `APPEND`, `REPLACE`, and `OVERWRITE` are not formally documented, and I am introducing them here. But documenting them is out of scope for this ticket, I feel.
Added some explanation about what the modes mean.
emitter.emit(buildEvent("compact/overwrite/count", 1));
break;
default:
throw new ISE("Invalid compact ingestion mode [%s]", mode);
Should we just log error instead of possibly failing ingestion task with this thrown exception?
Integrating `BatchIngestionMode` (see comment below) made this unnecessary.
docs/operations/metrics.md
Outdated
|`isAppendToExisting` | `isdDropExisting` | mode |
|---------------------|-------------------|------|
`true` | `false` | `APPEND`|
`true` | `true ` | Invalid combination, exception thrown. |
Looking at the code, it doesn't seem that we throw an exception in this case, but rather treat it as `APPEND`, since we consider a job an append job if `isAppendToExisting` is true and short-circuit the check of `isDropExisting`.
Yes, it does, but it was confusing where. Thanks to this great comment, I decided to fully integrate the concept of `BatchIngestionMode` into the batch ingestion task internals, replacing all the confusing tests for append, drop existing, etc. There were many changes, but they should be straightforward to follow. The code is better now, IMO.
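The flag-to-mode mapping discussed in this thread can be sketched roughly as follows. This is only an illustration, not the code from the PR: the class `IngestionModeSketch` and the method `fromFlags` are hypothetical names, the enum values follow the final naming used in the PR description (`APPEND`, `REPLACE`, `REPLACE_LEGACY`), and the sketch assumes the invalid flag combination raises an exception, as the docs table states.

```java
// Hypothetical sketch: names are illustrative and are not Druid's actual classes.
final class IngestionModeSketch
{
  enum Mode { APPEND, REPLACE, REPLACE_LEGACY }

  // Derives a single mode from the two IOConfig flags.
  static Mode fromFlags(boolean appendToExisting, boolean dropExisting)
  {
    if (appendToExisting && dropExisting) {
      // Assumption: the invalid combination is rejected rather than treated as APPEND.
      throw new IllegalArgumentException(
          "Invalid combination: appendToExisting and dropExisting are both true"
      );
    }
    if (appendToExisting) {
      return Mode.APPEND;
    }
    return dropExisting ? Mode.REPLACE : Mode.REPLACE_LEGACY;
  }
}
```

Computing the mode once and branching on the enum afterwards avoids scattering separate checks of the two flags through the task code, which is the cleanup the comment above describes.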
Since there are many different config flags that might be of interest, and these can change as devs add new config settings. Have you considered extending the |
@suneet-s No I have not considered |
docs/operations/metrics.md
Outdated
of the `isAppendToExisting` and `isDropExisting` flags in the
task's `IOConfig` as follows:

|`isAppendToExisting` | `isdDropExisting` | mode |
|`isAppendToExisting` | `isdDropExisting` | mode |
|`isAppendToExisting` | `isDropExisting` | mode |
Nice catch!
fix in next commit
docs/operations/metrics.md
Outdated
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/compact/ovewrite/count`|Count of `1` every time a compaction overwrite job runs|dataSource, taskId, taskType|Always `1`.|
Won't `taskType` always be `compact`?
Yes, `taskType` is `compact` for compaction, but for other batch tasks it can be something else, so for completeness and consistency I think it is better to leave that dimension there.
Removed task type from metric.
docs/operations/metrics.md
Outdated
@@ -221,6 +221,41 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|

## Batch ingestion metrics (Native parallel task)
Why did we decide to have a separate metric name for each type vs. a single metric name with the type being a dimension?
`ingest/batch/count`, `ingest/batch/segments/count` and a dimension `type` (or even 2 dimensions for `isAppendToExisting` and `isDropExisting`; in this case, you don't need to deal with the error case described on line 243)
`true` | `true ` | Invalid combination, exception thrown. |
In a similar vein, won't these metrics be useful for streaming ingestion as well? Why not make the metric name `ingest/count`, and then the `taskType` dimension tells you whether it is streaming or batch?
Also, reading these docs, it's not clear to me whether a compaction job will also be counted as a batch job.
The goal of this feature is to gather data to let us know the utilization of the various modes of batch ingest, in particular `REPLACE`, thus the categorization.
The invalid combination is possible because the `IOConfig` part of the spec exposes both `dropExisting` and `appendToExisting` today. If we were to deprecate those and instead expose `batchProcessingMode` with its three possible values, this invalid state would no longer be possible. But I think it is better to do this later, if we agree it is a good idea.
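The idea of replacing the two flags with one three-valued setting could look something like the sketch below. Everything here is an assumption made for illustration: `BatchProcessingModeSketch` and `fromSpec` are hypothetical names, the values use the final naming from the PR description, and defaulting a missing value to `REPLACE_LEGACY` is assumed from the statement that it is the current default for native batch ingestion.

```java
import java.util.Locale;

// Hypothetical sketch: a single three-valued setting instead of two booleans.
enum BatchProcessingModeSketch
{
  APPEND(true, false),
  REPLACE(false, true),
  REPLACE_LEGACY(false, false);

  private final boolean appendToExisting;
  private final boolean dropExisting;

  BatchProcessingModeSketch(boolean appendToExisting, boolean dropExisting)
  {
    this.appendToExisting = appendToExisting;
    this.dropExisting = dropExisting;
  }

  // The legacy flags can still be derived for code that expects them.
  boolean isAppendToExisting()
  {
    return appendToExisting;
  }

  boolean isDropExisting()
  {
    return dropExisting;
  }

  // Parses a spec value; a missing value falls back to the assumed default.
  static BatchProcessingModeSketch fromSpec(String value)
  {
    if (value == null) {
      return REPLACE_LEGACY;
    }
    return valueOf(value.trim().toUpperCase(Locale.ROOT));
  }
}
```

Because no enum constant sets both flags to `true`, the invalid combination cannot be expressed at all, so no runtime check is needed for it.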
The `taskType` is already a dimension (i.e., for native batch types we have `compact`, `index`, `index_parallel`; for streaming, `index_kafka`, etc.; for non-native, `index_hadoop`...), so yeah, maybe I should just drop the `batch` (and `compact`) from the metric name and just use the `indexType` dimension to filter. Is this what you are thinking, @suneet-s?
I agree with @suneet-s suggestion to drop the batch and compact from metric name, as the task-type dimension included signifies this, and then the metric can be reused for other ingestion types, not just batch / compact.
Yes, I agree too. Working on it now.
> In a similar vein, won't these metrics be useful for streaming ingestion as well - Why not make the metric name `ingest/count` and then the `taskType` dimension tells you whether or not it is streaming or batch?
I had suggested a batch-specific name when discussing with Agustin since I think the semantics of the metric are batch specific, it's being emitted once per ingest spec posted, and I'm not sure what the equivalent for streaming would be. It's a metric indicating "number of times user did some action with a specific intent" vs. "detail of execution" (like a parallel subtask count for batch or number of streaming subtasks). Taking it out for a shared metric name seems fine to me though.
@suneet-s I have removed the batch type from the metric and added `taskIngestionMode` as a dimension. As a result, the new metrics are much cleaner, and the code is cleaner as well. It was also simple to add the metrics to streaming now; take a look.
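To illustrate why one metric name plus a mode dimension is convenient, here is a small self-contained sketch. It does not use Druid's emitter API: `IngestMetricSketch`, `Event`, `ingestCount`, and `sumByDimension` are hypothetical names, and the dimension set shown (`dataSource`, `taskType`, `taskIngestionMode`) is an assumption based on this thread.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a plain event model, not Druid's emitter classes.
final class IngestMetricSketch
{
  static final class Event
  {
    final String metric;
    final long value;
    final Map<String, String> dimensions;

    Event(String metric, long value, Map<String, String> dimensions)
    {
      this.metric = metric;
      this.value = value;
      this.dimensions = dimensions;
    }
  }

  // Builds one "ingest/count" event with value 1, tagged with its dimensions.
  static Event ingestCount(String dataSource, String taskType, String taskIngestionMode)
  {
    Map<String, String> dims = new LinkedHashMap<>();
    dims.put("dataSource", dataSource);
    dims.put("taskType", taskType);
    dims.put("taskIngestionMode", taskIngestionMode);
    return new Event("ingest/count", 1L, dims);
  }

  // Sums the values of one metric, grouped by the chosen dimension.
  static Map<String, Long> sumByDimension(List<Event> events, String metric, String dimension)
  {
    Map<String, Long> totals = new LinkedHashMap<>();
    for (Event event : events) {
      String key = event.dimensions.get(dimension);
      if (event.metric.equals(metric) && key != null) {
        totals.merge(key, event.value, Long::sum);
      }
    }
    return totals;
  }
}
```

Grouping by `taskIngestionMode` answers the "how much is `REPLACE` used" question, while grouping the same events by `taskType` separates batch, compaction, and streaming tasks without needing separate metric names.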
docs/operations/metrics.md
Outdated
|`ingest/compact/ovewrite/count`|Count of `1` every time a compaction overwrite job runs|dataSource, taskId, taskType|Always `1`.|
|`ingest/compact/replace/count`|Count of `1` every time a compaction replace job runs|dataSource, taskId, taskType|Always `1`.|

|`isDropExisting` | mode |
Should this table have a description? Or maybe I missed it
@zachjsh Simplified and cleaned up the docs; see the PR description or the `metrics.md` changes.
…nsion. Move IngestionMode to AbstractTask to facilitate having mode as a dimension. Add metrics to streaming. Add missing coverage.
This pull request introduces 2 alerts when merging 1622b74 into 3e8d7a6 - view on LGTM.com new alerts:
…ation of IngestionMode to make it more robust to null IOConfig and fix test.
This pull request introduces 1 alert when merging c5f139f into 3e8d7a6 - view on LGTM.com new alerts:
I don't understand the LGTM concern about the "new publishedSegmentsAndCommitMetadata". That variable already existed before I added the metric. Maybe it is complaining because the new code explicitly checks that the variable is not null?
This pull request introduces 1 alert when merging 8ba3f60 into 3e8d7a6 - view on LGTM.com new alerts:
isAppendToExisting = BatchIOConfig.DEFAULT_APPEND_EXISTING;
isDropExisting = BatchIOConfig.DEFAULT_DROP_EXISTING;
} else {
isAppendToExisting = BatchIOConfig.DEFAULT_APPEND_EXISTING;
How come this uses the default from `BatchIOConfig` instead of using `ioConfig.isAppendToExisting()`?
This is specific to `CompactionIOConfig`, which does not have an `isAppendToExisting()` flag/method. Added a comment to the code to make this clearer.
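The compaction case described above might be sketched like this. It is not the PR's code: `CompactionModeSketch`, its nested `IoConfig`, and `computeCompactionMode` are hypothetical stand-ins, and the default values of `false` for both flags are an assumption, chosen to be consistent with `REPLACE_LEGACY` being described as the default mode for native batch ingestion.

```java
// Hypothetical sketch: stand-in types, not Druid's CompactionIOConfig or BatchIOConfig.
final class CompactionModeSketch
{
  enum Mode { APPEND, REPLACE, REPLACE_LEGACY }

  // Assumed defaults; both false corresponds to REPLACE_LEGACY.
  static final boolean DEFAULT_APPEND_EXISTING = false;
  static final boolean DEFAULT_DROP_EXISTING = false;

  // Compaction config exposes only dropExisting; there is no append flag.
  static final class IoConfig
  {
    final boolean dropExisting;

    IoConfig(boolean dropExisting)
    {
      this.dropExisting = dropExisting;
    }
  }

  // Tolerates a null config by falling back to the defaults.
  static Mode computeCompactionMode(IoConfig ioConfig)
  {
    // The append flag always comes from the default because compaction has none.
    final boolean appendToExisting = DEFAULT_APPEND_EXISTING;
    final boolean dropExisting = ioConfig == null ? DEFAULT_DROP_EXISTING : ioConfig.dropExisting;

    if (appendToExisting) {
      return Mode.APPEND;
    }
    return dropExisting ? Mode.REPLACE : Mode.REPLACE_LEGACY;
  }
}
```

Under these assumptions a compaction task can only end up in `REPLACE` or `REPLACE_LEGACY`, which is why the append default is used in both branches of the excerpt being reviewed.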
return ingestionMode;
}

protected static IngestionMode computeIngestionMode(@Nullable CompactionIOConfig ioConfig)
Suggest distinguishing these two `computeIngestionMode` methods with different names, since they both have nullable parameters.
Good call. Done in next commit.
This pull request introduces 1 alert when merging 2945222 into c236227 - view on LGTM.com new alerts:
…or if they are passed null. Other minor cleanup.
This pull request introduces 1 alert when merging 59aa6d7 into c236227 - view on LGTM.com new alerts:
A native batch ingestion job can be in one of three modes: `REPLACE` (new since tombstones were added), `APPEND`, and `REPLACE_LEGACY` (current default for native batch ingestion). These modes are set with appropriate values for the following `IOConfig` flags: `appendToExisting` and `dropExisting`. The goal of this ticket is to emit Druid metrics to be able to assess the utilization of the different flavors of batch ingestion, in particular replace and tombstone creation.

Copying from the `metrics.md` documentation:

|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions.| |Always `1`.|
|`ingest/segments/count`| | |`1`.|
|`ingest/tombstones/count`| | | |

The `taskIngestionMode` dimension includes the following modes: `APPEND`, `REPLACE_LEGACY`, and `REPLACE`. The `APPEND` mode indicates a native ingestion job that is appending to existing segments; `REPLACE` a native ingestion job replacing existing segments using tombstones; and `REPLACE_LEGACY` the original replace before tombstones.

The mode is decided using the values of the `isAppendToExisting` and `isDropExisting` flags in the task's `IOConfig` as follows:

|`isAppendToExisting` | `isDropExisting` | mode |
|---------------------|-------------------|------|
|`true` | `false` | `APPEND`|
|`true` | `true` | Invalid combination, exception thrown. |
|`false` | `false` | `REPLACE_LEGACY` (this is the default for native batch ingestion). |
|`false` | `true` | `REPLACE`|

This PR has: