Foundation for the Druid metadata catalog #12647
Conversation
This pull request introduces 1 alert when merging 44ad179 into afaea25 - view on LGTM.com.
Provides the DB and REST layer, but not yet the integration with the Calcite SQL layer.
(Force-pushed from 6374b80 to 27614b9.)
Will continue this in a private branch.
Initial review focused on CatalogResource, TableSpec, and its subclasses DatasourceSpec + InputTableSpec. Those seem like the central classes, so I wanted to focus on them.
I haven't looked at most of the classes in detail yet beyond the ones I mentioned above, but the general structure of things does look good to me.
@JsonProperty("rollupGranularity") String rollupGranularity, | ||
@JsonProperty("targetSegmentRows") int targetSegmentRows, | ||
@JsonProperty("enableAutoCompaction") boolean enableAutoCompaction, | ||
@JsonProperty("autoCompactionDelay") String autoCompactionDelay, |
Is this meant to be a replacement or alternative for (certain parts of) DataSourceCompactionConfig?
How will we reconcile what's here with any existing DataSourceCompactionConfig?
This is still preliminary: just working out the storage and REST layers at the moment. For this one, the thought is that if there is a compaction spec, it takes precedence. If the spec exists but leaves out this property, this is the value we use. If the spec doesn't exist, this info takes over (for the simple case of direct compaction). Details TBD when we get that far.
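To make that precedence concrete, here is a minimal sketch in Java; the helper and its parameters are hypothetical, not part of this PR:

// Hypothetical helper illustrating the fallback order described above:
// an explicit compaction-spec value wins, then the catalog value, then a default.
static String resolveAutoCompactionDelay(
    String compactionSpecDelay,   // from the compaction config, if any
    String catalogDelay,          // from the catalog's DatasourceSpec
    String defaultDelay)
{
  if (compactionSpecDelay != null) {
    return compactionSpecDelay;   // compaction spec exists and sets the property
  }
  if (catalogDelay != null) {
    return catalogDelay;          // spec missing or silent on this property
  }
  return defaultDelay;            // neither defined: fall back to the system default
}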
public DatasourceSpec(
    @JsonProperty("segmentGranularity") String segmentGranularity,
    @JsonProperty("rollupGranularity") String rollupGranularity,
Generally we call this queryGranularity. But! What do you think about getting rid of it? I've started to think that it makes more sense to represent this as an explicit TIME_FLOOR function call rather than a table property.
Good points. I was always confused by the "query" granularity: it has little to do with a query; it is the ingest/compaction-time granularity. Hence my attempt to sneak in a different name. In fact, I could imagine having a "true" query granularity as a separate field: say we decided to apply rollup of 1m, but used to have 1s. To get consistent results, use a query-time granularity of 1m, even for the older 1s segments. But that's just a whim, not implemented here (because it would take additional query work).
As to the usage: I'm aware of the discussion. The other half of the argument is that other dimensions might be similarly trimmed. Geo data might be rounded to a city level. Sales data rounded from (store, cashier, lane) to just (store). In this case, time is similarly rounded.
Since this is a prototype, the thought was that time is special: it is what enables other forms of compression. If the rollup granularity is missing, Druid stores data at the detail level, even if I do the other dimension rounding. Only by making the rollup grain some actual time period (even 1ms) do I get rollup. So, time is special.
The other approach, when we work out the details of dimensions and measures, is to drop this field, add a "rollupEnabled" field, and require the user to specify the grain via a TIME_FLOOR attached to the __time column, in parallel with the SUM(LONG) attached to a measure.
@JsonProperty("targetSegmentRows") int targetSegmentRows, | ||
@JsonProperty("enableAutoCompaction") boolean enableAutoCompaction, | ||
@JsonProperty("autoCompactionDelay") String autoCompactionDelay, | ||
@JsonProperty("properties") Map<String, Object> properties, |
When is something a property vs. a top-level, named thing? Do you have any properties in mind?
Properties are "extension" items provided by something other than Druid itself. For example, I might want to track whether the column contains PII. Or I might want to track the input source that defined the column. Or I might want to add info about the kind of UI widget to use to display it. Rather than creating my own parallel schema for such use cases, I just add a custom property. We might define a naming convention such as "com.foo.is-pii" or "org.whiz.lineage.input-source". Druid doesn't understand them, but the external tool (or user) does.
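For illustration only (the keys come from the examples above; the values and the ImmutableMap usage are just a sketch), such a properties map might look like:

import java.util.Map;
import com.google.common.collect.ImmutableMap;

// Free-form, user- or tool-defined properties; Druid stores them but does not interpret them.
Map<String, Object> properties = ImmutableMap.of(
    "com.foo.is-pii", true,                              // a governance tool marks the column as PII
    "org.whiz.lineage.input-source", "s3://bucket/raw"   // a lineage tool records where the data came from
);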
public MeasureSpec(
    @JsonProperty("name") String name,
    @JsonProperty("sqlType") String sqlType,
    @JsonProperty("aggregateFn") String aggregateFn
What kind of string is meant to be in here? How are we going to interpret it?
I'm especially interested in this because I've been contemplating recently what's the best way to write an INSERT or REPLACE into a rollup table.
/**
 * Helper functions for the catalog REST API actions.
 */
public class Actions |
Are you thinking this class would be useful for other server APIs in the future? It seems written in such a way that it would be.
Yes. I've already written variations of these several times here and there in Druid. Making it more general is left as a later exercise to minimize the size of this PR.
public InputTableSpec(
    @JsonProperty("inputSource") InputSource inputSource,
    @JsonProperty("format") InputFormat format,
I'd go with inputSource + inputFormat, for rhyming purposes, & because it's what indexing tasks and ExternalDataSource do.
OK. I was kind of leaning toward the Go pattern: terseness. The only "format" it could possibly be is the "input format". Same is true for "input source", but I didn't spend time to simplify it yet. Same reason the other fields are not tableColumns and userDefinedProperties. Changed it to inputFormat for now; we can clean it up in a renaming pass later.
@JsonProperty("inputSource")
public InputSource inputSource()
Will there be a way to parameterize the source somehow at runtime? I feel that a prime use case for input tables is going to be incremental ingestion, meaning the input source will change from statement to statement.
Absolutely; that is the next bit of work to be done. Having fun with the old "what SQL syntax can we use for this non-standard concept" game. Current thought is a Calcite macro, something like:
SELECT *
FROM TABLE(INPUT(myInputTable, files = "foo.csv, bar.csv"))
With the bits and pieces adjusted to fit Calcite's existing constraints. Would love to mimic Snowflake:
SELECT *
FROM myInputTable(files = "foo.csv, bar.csv")
But Calcite has already grabbed that syntax to specify columns:
FROM myInputTable(a VARCHAR, b INT, c DOUBLE)
Anyway, this is work in progress; details to be discussed separately.
    table.dbSchema(),
    table.name()));
} else {
  return Actions.okWithVersion(0);
Should this return the current version of the already-existing table?
It could, but that would take an extra DB read. This feature mimics the SQL CREATE TABLE myTempTable IF NOT EXISTS use case where all we want is to not fail if we've already done this step; we typically don't then change the definition.
If we do want to change anything, we've got to read the existing values, which would provide the version. By not providing the version here, we save a DB read internally, since the failed SQL INSERT didn't fetch it.
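A rough sketch of that flow, assuming a hypothetical DB-layer call and duplicate-key signal (only Actions.okWithVersion appears in the code quoted above):

try {
  // The INSERT succeeds only when no row exists yet for (dbSchema, name);
  // on success it yields the new row's version.
  long version = insertTableSpec(table);      // hypothetical DB-layer method
  return Actions.okWithVersion(version);
}
catch (TableAlreadyExistsException e) {       // hypothetical "duplicate key" signal
  // ifNotExists semantics: treat "already there" as success. The failed INSERT
  // never read the existing row, so there is no current version to return.
  return Actions.okWithVersion(0);
}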
@PathParam("dbSchema") String dbSchema, | ||
@PathParam("name") String name, | ||
TableSpec spec, | ||
@QueryParam("version") long version, |
The version thing is cool. I'm a fan of this sort of thing in CRUD APIs.
Killed three birds with one stone: an update time, a free way to do optimistic locking for those who are into such things, and a way to help keep the remote cache in sync. It will be foiled by those who make more than one change per ms, but I suspect that will happen rarely. If it does, the cheat is to sleep for 1 ms to bump the number. To be bullet-proof, there needs to be some prevention of moving backwards if the auto clock sync decides our system clock is moving fast and sets it back. We'll fine-tune that later, once the basics are seen to work.
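A minimal sketch of that scheme, assuming the version is the update's millisecond timestamp and the write is guarded by a compare-and-swap style UPDATE (table and column names are illustrative):

// Next version: the current clock, but never less than oldVersion + 1. This is one
// possible guard against two edits in the same millisecond or a clock stepped
// backwards; the final approach is deferred in the PR.
long newVersion = Math.max(System.currentTimeMillis(), oldVersion + 1);

// Optimistic lock, expressed as the SQL the DB layer might run:
//   UPDATE tableDefs
//      SET payload = :payload, version = :newVersion
//    WHERE dbSchema = :dbSchema AND name = :name AND version = :oldVersion
// Zero rows updated means someone else changed the entry first; the caller
// re-reads and retries, or reports a conflict.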
@JsonProperty("enableAutoCompaction") boolean enableAutoCompaction, | ||
@JsonProperty("autoCompactionDelay") String autoCompactionDelay, | ||
@JsonProperty("properties") Map<String, Object> properties, | ||
@JsonProperty("columns") List<DatasourceColumnSpec> columns |
Is this column list meant to be authoritative, or partial?
Partial: only those for which the user wants to provide info beyond what Druid already knows. Examples:
- Add a new column, not yet in any datasource, to use in ingestion.
- Column is ingested with multiple types, pick one as the preferred type.
- Column exists, but no longer needed. Mark it as hidden.
- Add a comment to explain the column.
@gianm, thanks for the review. Your many questions illustrate why I pulled this PR back. On the one hand, I want early feedback before I build on this foundation (so thanks for the comments!), but on the other hand, until later work is done, objects and properties are preliminary placeholders and subject to change. So it might be a bit early to promote the work to prime time.
The Druid catalog provides a collection of metadata "hints" about tables (datasources, input sources, views, etc.) within Druid. This PR provides the foundation: the DB and REST layer, but not yet the integration with the Calcite SQL layer.
The DB layer extends what is done for other Druid metadata tables. The semantic ("business logic") layer provides the usual CRUD operations on tables, as well as operations to sync metadata between the Coordinator and Broker. A synchronization layer handles the Coordinator/Broker sync: the Broker polls for the information it does not yet have, and the Coordinator pushes updates to known Brokers.
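As a conceptual sketch only (these interface and type names are illustrative, not the PR's actual API), the two sync directions might look like:

import java.util.List;

// TableMetadata stands in for whatever record type the catalog ends up exposing.
// Broker side: poll the Coordinator for entries newer than what is cached locally.
interface CatalogPullClient
{
  List<TableMetadata> pullSince(long lastKnownVersion);
}

// Coordinator side: push deltas to each registered Broker as catalog entries change.
interface CatalogPushListener
{
  void onUpdate(TableMetadata updatedEntry);
  void onDelete(String dbSchema, String name);
}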
The entire design is pretty standard and follows Druid patterns. The key difference is the rather extreme lengths taken by the implementation to ensure each bit is easily testable without mocks. That means many interfaces which can be implemented in multiple ways.
While the entire catalog mechanism is present in this PR, the Guice configuration is not yet enabled, so the catalog itself is not yet active. This project has created, or depends on, multiple in-flight PRs, and it is becoming a bit complex to combine them all in a private branch. This is one of several PRs that provide slices of the catalog work.
We'll want to create integration tests when we enable the feature, and that work is waiting for the new IT PR to be merged.
The next step in the catalog work is to integrate the catalog with the Druid planner. For that, we'll need the planner test framework to be merged.
This code will likely evolve as we work on the SQL layer. Some of that work has already been done in a private branch and suggests that the present code is pretty much on the right track: we'll just expand the table and column definitions as needed.
This is a great opportunity for reviewers to provide guidance on the basic catalog mechanism before we start building SQL integration on top.
This PR has: