druid nested data column type #12753
Conversation
I've reviewed this in private, seen it active and working in the wild, so am throwing my +1 on it. Tests must pass, the description should be fleshed out, and all of that before merge.
This pull request introduces 5 alerts when merging 3e7b1c6 into 059aba7 - view on LGTM.com new alerts:
Nice work! Very cool. I just noticed this when you posted it on Slack. The first thing that comes to mind is questions about the intended maturity of this feature after initial contribution:
I was planning to add documentation in a follow-up PR since I thought this one was already big enough 😅
I modeled the column after existing Druid columns so most things are decorated with a version byte which should allow us to make changes in the future while still being able to continue reading the existing data. For the specific list of what is versioned:
In the "Future work" section of #12695 I mention storage format as an area that we can iterate on in the future. The biggest things I have in mind right now are storing arrays of literal values as array typed columns instead of broken out as they currently are, as well as customization such as allowing skipping building indexes on certain columns, or skipping storing them altogether, which probably also falls into this area. Nothing about the current code should block this afaik, nor should those future enhancements interfere with our ability to read data that is stored with the current versions of stuff, so long as we practice good version hygiene whenever we make changes.
The surface area here is huge since it essentially provides all of the normal Druid column functionality within these nested columns. The use cases I would feel most comfortable with are replacements for what can currently be done via flattening, meaning not heavily centered on nested arrays. I do have ideas of how to better support nested arrays, and my goal is to allow arrays extracted from nested columns to be exposed as druid ARRAY types.

There is also the matter of different performance characteristics at both ingest and query time for these columns. Ingestion time segment merge is pretty heavy right now because the global value dictionary is stored in heap. Query performance can vary a fair bit with nested columns compared to flat columns, especially with numbers, due to the existence of indexes on these numeric columns, which currently results in sometimes dramatically faster but also sometimes slower query performance. I'm still exploring this quite a bit; besides documentation follow-up I have also been working on doing some benchmarking to see where things currently stand, and plan on sharing those results relatively soon.

So, long story short, due to the unknowns I think the answer for right now is that operators should experiment with nested columns.
I created some query benchmarks to see how various nested column queries perform against traditional columns and see where things currently stand to help guide which areas I will be investigating in the future. I'll add the benchmark code in a follow-up PR. The nested columns are identical to the traditional columns, created using a transform expression:
For the most part things are pretty similar, as expected, and the ones that aren't I will be digging deeper into. For numeric columns, which have indexes in nested columns, the indexes in most cases make the queries for nested columns faster, though in a few cases they are currently slower. One of the double columns is slower with the bound filter; part of the reason for that is that currently there is no native numeric range index for the bound filter to use, so bound filters on numbers have to fall back to a predicate based index.
Thanks for the details. It's helpful in understanding the maturity of the feature. It sounds relatively mature by the standards of a new feature.
To me this sounds like the basics of the feature are production-ready. There may be various callouts about performance, but it seems that the compatibility story is tight enough that the feature doesn't need the experimental markings at this time. You mentioned being somewhat less certain about the behavior of nested arrays. We should figure out if that part is going to be included in the production-ready feature set, or if we'll call that particular scenario out as an evolving area. What is your intent & recommendation in this area?
👍 from a design perspective. I mainly considered the behavior of the functions being introduced, how this is all plugged into the existing query engine concepts, and how we're handling evolution and compatibility. It all sounds good to me.
I think that is fair 👍
It is definitely going to be an evolving area (which I think could be said of our array support in general), though there are probably a narrow range of use cases that could be served today, mainly where array lengths and element positions are known and have some meaning, and query time operations are primarily extracting and operating on individual elements. This is more or less the current limitation of druid ARRAY types in general as well.

There are some lower hanging fruit that would improve stuff in the near term, some of which might be possible to get in before the next release. The first is supporting wildcards in the subset of the path syntax that we support, which would allow extracting array elements without knowing their exact positions. For nested arrays of JSON objects extracted by JSON_QUERY there is more to do as well. At some point after that, I intend to introduce the option to begin storing literal arrays in nested columns as array typed columns.

I guess I'm getting a bit into the weeds, but my point I guess is that I think this feature will evolve alongside, and should help us improve, array support in general, so am hyped to get it there.
This is amazing stuff. I am excited to follow this up with a web console PR to support some of this stuff there.
I had some review comments 😄 Sorry for being late.
@Override
public Expr apply(List<Expr> args)
{
Preconditions.checkArgument(args.size() % 2 == 0);
Please add the size of arguments in the exception error message.
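For example, something along these lines (a sketch only; the exact message wording is illustrative):

// sketch: include the actual argument count in the exception message
Preconditions.checkArgument(
    args.size() % 2 == 0,
    "must have an even number of arguments, got [%s]",
    args.size()
);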
ExprEval field = args.get(i).eval(bindings);
ExprEval value = args.get(i + 1).eval(bindings);

Preconditions.checkArgument(field.type().is(ExprType.STRING), "field name must be a STRING");
Please add the actual type of field in the error message.
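For example (sketch; message text illustrative):

// sketch: report the type that was actually received
Preconditions.checkArgument(
    field.type().is(ExprType.STRING),
    "field name must be a STRING, got [%s]",
    field.type()
);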
}
}

static List<NestedPathFinder.NestedPathPart> getArg1PathPartsFromLiteral(String fnName, List<Expr> args)
nit. you could just pass the args.get(1) itself as an argument.
return input.value();
}

static void checkArg0NestedType(String fnName, List<Expr> args, ExprEval input)
nit - you could just pass args.get(0) as an argument. since the intention to use 0th argument is explicit in the function name.
import java.util.SortedMap;
import java.util.TreeMap;

public class NestedDataColumnIndexer implements DimensionIndexer<StructuredData, StructuredData, StructuredData>
can you add some javadocs here?
boolean allNulls = allNull(dimValues.getSortedStrings()) &&
                   allNull(dimValues.getSortedLongs()) &&
                   allNull(dimValues.getSortedDoubles());
sortedLookup = dimValues;
shouldn't this be inside the if block below?
nah, this is a strange flow that i picked up from this code being based on DictionaryEncodedColumnMerger. Every iteration sets sortedLookup, which is only used if there is a single thing being "merged", which really means that an incremental index is being persisted and so just passes through the sorted dictionary it built. If there is more than one thing being merged, then it uses a DictionaryMergingIterator to split apart each segment's sorted global dictionary and build arrays for each locally sorted set of values to merge them, with null values just skipped.

It's probably worth adjusting this code; will try to re-arrange it in the future so it's a bit easier to follow, especially since unlike DictionaryEncodedColumnMerger the "indexable adapter" abstraction sort of falls apart and i have to know when i'm dealing with incremental indexes vs queryable indexes.
final BaseColumn col = columnHolder.getColumn();

closer.register(col);
we are doing this twice; another occurrence is inside the function.
* Reader for a virtual contiguous address range backed by compressed blocks of data.
*
* Format:
* | version (byte) | compression (byte) | num blocks (int) | block size (int) | end offsets | compressed data |
block size comes before number of blocks.
hah oops, good catch
/**
 * Get size in bytes of virtual contiguous buffer
 */
public long getSize()
this is compressedSize, correct?
no, this is the end position of the last uncompressed value, so the uncompressed size. The endOffsetsBuffer contains the uncompressed end position of every value that is compressed in the blocks. Using the uncompressed start and end position of a value and knowing the size of the blocks, we can then locate which block(s) the value is contained in to extract it.

I'll try to add some more comments and javadocs to this area so it's a bit clearer what is going on
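As a rough illustration of the lookup described above (names like endOffsets and blockSize are illustrative, not necessarily the actual fields):

// sketch: find the block(s) containing value i from per-value uncompressed end offsets
long start = (i == 0) ? 0 : endOffsets.get(i - 1);  // uncompressed start of value i
long end = endOffsets.get(i);                       // uncompressed end of value i
int firstBlock = (int) (start / blockSize);
int lastBlock = (int) ((end - 1) / blockSize);
// decompress blocks firstBlock..lastBlock and copy the byte range [start, end)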
that will be of help, since at line #75 it says: final int compressedSize = offsetView.get(numBlocks - 1);
public class CompressedBlockSerializer implements Serializer
{
private static final MetaSerdeHelper<CompressedBlockSerializer> META_SERDE_HELPER = MetaSerdeHelper
.firstWriteByte((CompressedBlockSerializer x) -> (byte) 0x01)
nit - can you directly point to the VERSION variable in CompressedBlockReader?
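That is, something along these lines, assuming the version constant in CompressedBlockReader is an accessible byte named VERSION:

// sketch: reference the reader's version constant instead of a magic 0x01
.firstWriteByte((CompressedBlockSerializer x) -> CompressedBlockReader.VERSION)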
final StructuredDataProcessor processor = new StructuredDataProcessor()
{
@Override
public int processLiteralField(String fieldName, Object fieldValue)
this method needs javadoc. what is the significance of return value here?
ExprEval input = args.get(0).eval(bindings);
// maybe in the future ProcessResults should deal in PathFinder.PathPart instead of strings for fields
StructuredDataProcessor.ProcessResults info = processor.processFields(maybeUnwrapStructuredData(input));
List<String> transformed = info.getLiteralFields()
have you thought about caching some results here? For example, many literal fields are going to be repeated across input records and parseJqPath doesn't seem cheap.
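A rough sketch of the kind of caching being suggested (the cache field and helper here are hypothetical, and assume parseJqPath takes the path string and returns the parsed parts):

// hypothetical memoization of parsed paths, keyed on the raw path string
// (assumes java.util.Map / java.util.HashMap are imported)
private final Map<String, List<NestedPathFinder.NestedPathPart>> parsedPaths = new HashMap<>();

private List<NestedPathFinder.NestedPathPart> parsePathCached(String path)
{
  // each distinct path string is parsed once and the result is reused
  return parsedPaths.computeIfAbsent(path, NestedPathFinder::parseJqPath);
}

Depending on how the expression objects are shared across threads, a ConcurrentHashMap or a size-bounded cache might be more appropriate.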
Channels.writeFully(valuesOut, compressed);
uncompressedDataBuffer.clear();
numBlocks++;
if (numBlocks < 0) {
how does this happen?
Description
This PR implements the functionality described in #12695. This design is the result of collaboration between myself and @cheddar over the past several months and we've iterated on it a few times (you might notice the column version is 3, for example, despite there only being 1 implementation). We are now confident enough about the viability of this approach to release it into the wild. There is still a lot of work to do, but I think this is now at a pretty good jumping off point, and it has built some pieces that might have some wider use cases, which I'll try to get into later.
Since #12695 describes the high level design, here I will just dig straight into implementation details.
Ingestion
NestedDataDimensionSchema, NestedDataDimensionHandler
NestedDataDimensionSchema is the native ingest DimensionSchema that is used to specify nested data columns, specified in JSON with type json. NestedDataDimensionHandler is registered as the DimensionHandler for the json complex type so that the proper indexer and mergers can be created to process the values during ingestion.
NestedDataColumnIndexer, StructuredData
The NestedDataColumnIndexer is the DimensionIndexer for nested data columns, which deals in a type called StructuredData, which is basically a wrapper around any Java object that provides some niceness for operating on it and persisting to columns and the like. NestedDataColumnIndexer's job is to process any type of input, traverse its structure to locate any 'literal' fields to construct a superset of all nested columns, and to construct a 'global' value dictionary of every literal value that is encountered while processing input.
NestedDataColumnMerger, NestedDataColumnSerializer
NestedDataColumnMerger is the DimensionMergerV9 implementation, which is responsible for merging and sorting the global dimension dictionary and constructing NestedDataColumnSerializer, which is a Serializer implementation that then does the bulk of the work creating the segment. NestedDataColumnSerializer, with the set of fields and their types along with the sorted global dictionary, can now write out the actual column. It maintains a GlobalDictionaryEncodedFieldColumnWriter for each nested field, which are responsible for writing the nested columns themselves. While NestedDataColumnSerializer is serializing the 'raw' complex column, it also processes the data to feed values to GlobalDictionaryEncodedFieldColumnWriter, which build 'local' dictionaries that map the values contained within the nested column to the global ids and write out an intermediary column of these unsorted local ids. After the 'raw' column is fully serialized, the nested columns created by GlobalDictionaryEncodedFieldColumnWriter can now also be finalized; they go through a similar process of sorting their local dictionaries, then build bitmap indexes for the local values and write out compressed value columns. For columns which are a single numeric type, the writer will also persist a long or double value column as appropriate.
Segment
NestedDataComplexTypeSerde, NestedDataColumnSupplier, NestedDataComplexColumn, NestedFieldLiteralDictionaryEncodedColumn
After serialization, the segment is left with a NestedDataComplexColumn, which is the ComplexColumn implementation that is used to read the 'raw' data out of the column when queried directly. NestedDataComplexTypeSerde is the ComplexMetricsSerde that ties NestedDataColumnSupplier to COMPLEX<json>. NestedDataComplexColumn also provides mechanisms to read the nested literal columns, and maintains a column cache of these columns so that they can be closed when the complex column itself is closed. These nested columns are all of the type NestedFieldLiteralDictionaryEncodedColumn, which are dictionary encoded with the local to global value dictionaries, and if single typed numeric, also contain a numeric value column.
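As a rough illustration of how a value is resolved through the two dictionary levels (the names here are illustrative, not the actual API):

// sketch: resolving a row of a nested field column to its string value
int localId = nestedFieldColumn.getRowValue(rowNum);  // id in the field's local dictionary
int globalId = localToGlobalIds.get(localId);         // FixedIndexed mapping of local -> global id
String value = globalStringDictionary.get(globalId);  // shared global value dictionary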
Querying
NestedFieldVirtualColumn, NestedFieldLiteralColumnIndexSupplier
NestedFieldVirtualColumn is a specialized VirtualColumn that can be used to create column selectors for the NestedFieldLiteralDictionaryEncodedColumn that are associated with the NestedDataComplexColumn, and is used by the JSON_VALUE SQL function to provide fast queries that avoid resorting to processing the 'raw' nested data, and by JSON_QUERY to have direct access to the raw data column. NestedFieldLiteralColumnIndexSupplier is the ColumnIndexSupplier which allows NestedFieldVirtualColumn to utilize the bitmap indexes of the NestedFieldLiteralDictionaryEncodedColumn used by JSON_VALUE for fast filtering.
Native Expressions
To fill in additional functionality, a number of additional native Druid expressions have been added:

json_value(expr, path) extracts a literal (STRING, LONG, DOUBLE) value from a COMPLEX<json> column or input expr using JSONPath syntax of path

json_query(expr, path) extracts a COMPLEX<json> value from a COMPLEX<json> column or input expr using JSONPath syntax of path

json_object(expr1, expr2[, expr3, expr4 ...]) constructs a COMPLEX<json> with alternating 'key' and 'value' arguments

parse_json(expr) deserializes a STRING into a COMPLEX<json> to be used with expressions which operate on COMPLEX<json> inputs

to_json(expr) converts its input into a COMPLEX<json> to be used with expressions which operate on COMPLEX<json> inputs, like a CAST operation (rather than deserializing STRING values like PARSE_JSON)

to_json_string(expr) serializes a COMPLEX<json> input into a JSON STRING value

json_keys(expr, path) returns the keys of expr at the specified JSONPath path, or null if the data does not exist or have any fields

json_paths(expr) returns the paths of expr
SQL Functions
The NestedFieldVirtualColumn and the expressions are wired up as SQL operators. JSON_VALUE and JSON_QUERY plan to NestedFieldVirtualColumn whenever the input is a COMPLEX<json> column; anything else plans into an ExpressionVirtualColumn for the associated native expressions.

JSON_VALUE(expr, path [RETURNING sqltype]) extracts a literal (STRING, LONG, DOUBLE) value from a COMPLEX<json> column or input expr using JSONPath syntax of path. The optional RETURNING sqltype syntax allows suggesting the type of the nested literal to be extracted from a COMPLEX<json> column; if not present, the type will be determined from context where possible before falling back to considering the output as VARCHAR.

JSON_QUERY(expr, path) extracts a COMPLEX<json> value from a COMPLEX<json> column or input expr using JSONPath syntax of path

JSON_OBJECT(KEY expr1 VALUE expr2[, KEY expr3 VALUE expr4 ...]) constructs a COMPLEX<json> storing the results of VALUE expressions at KEY expressions

PARSE_JSON(expr) deserializes a STRING into a COMPLEX<json> to be used with expressions which operate on COMPLEX<json> inputs

TO_JSON(expr) converts its input into a COMPLEX<json> to be used with expressions which operate on COMPLEX<json> inputs, like a CAST operation (rather than deserializing STRING values like PARSE_JSON)

TO_JSON_STRING(expr) serializes a COMPLEX<json> input into a JSON STRING value

JSON_KEYS(expr, path) returns the keys of expr at the specified JSONPath path, or null if the data does not exist or have any fields

JSON_PATHS(expr) returns the paths of expr
Other notable parts
NestedPathFinder, StructuredDataProcessor
Out of worry that using an off the shelf library for parsing path expressions would be too slow, I ended up writing my own JSONPath parser inside NestedPathFinder that supports the small subset of functionality that we needed and minimizes overhead as much as possible. There is also code in there to parse 'jq' style paths, which is what my first prototype functions used, and is still accepted by the virtual column. Both are parsed into a List<NestedPathPart>, where NestedPathPart is either a MapField or an ArrayElement, and the sequence is processed in order to extract values from some object.

Processing of nested structure is done with StructuredDataProcessor, which provides mechanisms to dig through objects to do stuff. This is used at ingest time to build the global dictionary, estimate sizes, and collect the superset of paths, and also by expressions, which use it to perform their function, such as extracting a value, collecting the list of keys at some path, etc.
Building blocks
FixedIndexed
Like GenericIndexed but for fixed width values, so no 'offsets' are required to be stored since value offsets can be computed by index. This is used for both the long and double components of the global value dictionary, and each nested field column has an int FixedIndexed to store the mapping of local dictionary ids to global dictionary ids.
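For example, since every entry is the same width, the position of an entry can be computed directly instead of being read from a stored offset table; conceptually (illustrative only, not the actual implementation):

// sketch: fixed width entries make offsets computable from the index
long position = valuesStartOffset + (long) index * valueWidth;
buffer.position((int) position);
long value = buffer.getLong();  // e.g. reading the long portion of the global dictionary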
ComparatorDimensionDictionary, SortedComparatorDimensionDictionary
Comparator based versions of DimensionDictionary and SortedDimensionDictionary. I might swap out these new ones for the current ones at some point in the future to standardize on using Comparator for comparison instead of Comparable.
CompressedBlockReader, CompressedBlockSerializer
Models a contiguous memory address space which is compressed into 64kb blocks (sized to the compression buffer pool). This is a sort of replacement for using GenericIndexed to store compressed blocks, without the baggage of GenericIndexed. It currently only supports being contained in a single smoosh file and so is limited to at most 2gb of compressed data, but future work I would like to do would allow for splitting across multiple files.
CompressedVariableSizedBlobColumn, CompressedVariableSizedBlobColumnSupplier, CompressedVariableSizedBlobColumnSerializer
Generic structure to store columns of variably sized compressed blobs; basically it stores a compressed set of longs corresponding to the 'end' offset of all of the values, and then the compressed blocks of data itself. This is built on top of a CompressedBlockSerializer/CompressedBlockReader, and another new thing, CompressedLongsReader/CompressedLongsWriter, that is itself also built with CompressedBlockSerializer/CompressedBlockReader (instead of using the existing ColumnarLongs implementations). This is used to store the 'raw' json value of nested columns, and I could also see it being used to support compressed complex aggregators as an alternative to GenericIndexedBasedComplexColumn in the future.

This PR has: