
druid nested data column type #12753

Merged: 9 commits, Jul 14, 2022

Conversation


@clintropolis clintropolis commented Jul 7, 2022

Description

This PR implements the functionality described in #12695. This design is the result of collaboration between myself and @cheddar over the past several months and we've iterated on it a few times (you might notice the column version is 3 for example, despite there only being 1 implementation). We are now confident enough about the viability of this approach to release it into the wild. There is still a lot of work to do, but I think this is now at a pretty good jumping-off point, and it has built some pieces that might have some wider use cases, which I'll try to get into later.

Since #12695 describes the high level design, here I will just dig straight into implementation details.

Ingestion

NestedDataDimensionSchema, NestedDataDimensionHandler

NestedDataDimensionSchema is the native ingestion DimensionSchema used to declare nested data columns, specified in JSON as

{ "type": "json", "name": "nestedColumnName" }

NestedDataDimensionHandler is registered as the DimensionHandler for the json complex type so that the proper indexer and mergers can be created to process the values during ingestion.

NestedDataColumnIndexer, StructuredData

The NestedDataColumnIndexer is the DimensionIndexer for nested data columns. It deals in a type called StructuredData, which is basically a wrapper around an arbitrary Java object that provides some conveniences for operating on it and persisting it to columns and the like. NestedDataColumnIndexer's job is to process any type of input, traverse its structure to locate any 'literal' fields in order to construct a superset of all nested columns, and to construct a 'global' value dictionary of every literal value encountered while processing input.
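
To make the traversal concrete, here is a minimal illustrative sketch (not the actual Druid implementation, and simplified to a single value set rather than the separate string/long/double dictionaries the indexer really keeps) of walking an arbitrary nested object to collect literal field paths and the set of literal values:

import java.util.*;

class NestedTraversalSketch
{
  final SortedSet<String> literalFieldPaths = new TreeSet<>();
  final Set<Object> globalDictionary = new HashSet<>();

  void process(Object node, String path)
  {
    if (node instanceof Map) {
      for (Map.Entry<?, ?> entry : ((Map<?, ?>) node).entrySet()) {
        process(entry.getValue(), path + "." + entry.getKey());
      }
    } else if (node instanceof List) {
      List<?> list = (List<?>) node;
      for (int i = 0; i < list.size(); i++) {
        process(list.get(i), path + "[" + i + "]");
      }
    } else if (node != null) {
      // a 'literal' leaf: remember its path and add its value to the global dictionary
      literalFieldPaths.add(path);
      globalDictionary.add(node);
    }
  }
}

For example, processing {"x": {"y": 1}, "z": ["a"]} with this sketch would record the paths .x.y and .z[0] and the literal values 1 and "a".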

NestedDataColumnMerger, NestedDataColumnSerializer

NestedDataColumnMerger is the DimensionMergerV9 implementation, responsible for merging and sorting the global dimension dictionary and constructing NestedDataColumnSerializer, the Serializer implementation which then does the bulk of the work creating the segment. With the set of fields and their types, along with the sorted global dictionary, NestedDataColumnSerializer can now write out the actual column. It maintains a GlobalDictionaryEncodedFieldColumnWriter for each nested field, and these writers are responsible for writing the nested columns themselves. While NestedDataColumnSerializer is serializing the 'raw' complex column, it also processes the data to feed values to each GlobalDictionaryEncodedFieldColumnWriter, which builds a 'local' dictionary mapping the values contained within its nested column to the global ids and writes out an intermediary column of these unsorted local ids. After the 'raw' column is fully serialized, the nested columns created by the GlobalDictionaryEncodedFieldColumnWriter instances can also be finalized: they go through a similar process of sorting their local dictionaries, then build bitmap indexes for the local values and write out compressed value columns. For columns which are a single numeric type, the writer will also persist a long or double value column as appropriate.
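
As an illustration of the local dictionary step (hypothetical names, not the actual writer code): each field's writer sees values as ids in the merged global dictionary, assigns a compact local id to each distinct global id it encounters, writes the unsorted local ids as an intermediary column, and only at finalize time sorts the local dictionary and remaps the rows:

import java.util.*;

class LocalDictionarySketch
{
  private final Map<Integer, Integer> globalToLocal = new HashMap<>();
  private final List<Integer> localToGlobal = new ArrayList<>();
  private final List<Integer> rowLocalIds = new ArrayList<>(); // intermediary unsorted column

  void writeRow(int globalId)
  {
    int localId = globalToLocal.computeIfAbsent(globalId, g -> {
      localToGlobal.add(g);
      return localToGlobal.size() - 1;
    });
    rowLocalIds.add(localId);
  }

  // At finalize time the local dictionary is sorted (the global dictionary is already
  // value-sorted, so sorting global ids sorts the values), row ids are remapped,
  // bitmap indexes are built per local id, and the value column is compressed.
  int[] sortedLocalToGlobal()
  {
    int[] sorted = localToGlobal.stream().mapToInt(Integer::intValue).toArray();
    Arrays.sort(sorted);
    return sorted;
  }
}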

Segment

NestedDataComplexTypeSerde, NestedDataColumnSupplier, NestedDataComplexColumn, NestedFieldLiteralDictionaryEncodedColumn

After serialization, the segment is left with a NestedDataComplexColumn, which is the ComplexColumn implementation used to read the 'raw' data out of the column when it is queried directly. NestedDataComplexTypeSerde is the ComplexMetricsSerde that ties NestedDataColumnSupplier to COMPLEX<json>. NestedDataComplexColumn also provides mechanisms to read the nested literal columns, and maintains a column cache of these columns so that they can be closed when the complex column itself is closed. These nested columns are all of type NestedFieldLiteralDictionaryEncodedColumn, which are dictionary encoded with the local-to-global value dictionaries and, if single typed numeric, also contain a numeric value column.

Querying

NestedFieldVirtualColumn, NestedFieldLiteralColumnIndexSupplier

NestedFieldVirtualColumn is a specialized VirtualColumn that can be used to create column selectors for the NestedFieldLiteralDictionaryEncodedColumn associated with a NestedDataComplexColumn. It is used by the JSON_VALUE SQL function to provide fast queries that avoid processing the 'raw' nested data, and by JSON_QUERY for direct access to the raw data column. NestedFieldLiteralColumnIndexSupplier is the ColumnIndexSupplier that allows the bitmap indexes of the NestedFieldLiteralDictionaryEncodedColumn to be used by JSON_VALUE for fast filtering.

Native Expressions

To fill in additional functionality, a number of additional native Druid expressions have been added:

  • json_value(expr, path): Extract a Druid literal (STRING, LONG, DOUBLE) value from a COMPLEX<json> column or input expr using JSONPath syntax of path.
  • json_query(expr, path): Extract a COMPLEX<json> value from a COMPLEX<json> column or input expr using JSONPath syntax of path.
  • json_object(expr1, expr2[, expr3, expr4 ...]): Construct a COMPLEX<json> from alternating 'key' and 'value' arguments.
  • parse_json(expr): Deserialize a JSON STRING into a COMPLEX<json> to be used with expressions which operate on COMPLEX<json> inputs.
  • to_json(expr): Convert any input type to COMPLEX<json> to be used with expressions which operate on COMPLEX<json> inputs, like a CAST operation (rather than deserializing STRING values like PARSE_JSON).
  • to_json_string(expr): Convert a COMPLEX<json> input into a JSON STRING value.
  • json_keys(expr, path): Get an array of field names in expr at the specified JSONPath path, or null if the data does not exist or has no fields.
  • json_paths(expr): Get an array of all JSONPath paths available in expr.

SQL Functions

The NestedFieldVirtualColumn and the expressions are wired up as SQL operators. JSON_VALUE and JSON_QUERY plan to a NestedFieldVirtualColumn whenever the input is a COMPLEX<json> column; anything else plans into an ExpressionVirtualColumn for the associated native expression.

  • JSON_VALUE(expr, path [RETURNING sqltype]): Extract a Druid literal (STRING, LONG, DOUBLE) value from a COMPLEX<json> column or input expr using JSONPath syntax of path. The optional RETURNING sqltype syntax suggests the type of the nested literal to be extracted from a COMPLEX<json> column; if not present, the type is determined from context where possible, falling back to treating the output as VARCHAR.
  • JSON_QUERY(expr, path): Extract a COMPLEX<json> value from a COMPLEX<json> column or input expr using JSONPath syntax of path.
  • JSON_OBJECT(KEY expr1 VALUE expr2[, KEY expr3 VALUE expr4 ...]): Construct a COMPLEX<json> storing the results of VALUE expressions at KEY expressions.
  • PARSE_JSON(expr): Deserialize a JSON STRING into a COMPLEX<json> to be used with expressions which operate on COMPLEX<json> inputs.
  • TO_JSON(expr): Convert any input type to COMPLEX<json> to be used with expressions which operate on COMPLEX<json> inputs, like a CAST operation (rather than deserializing STRING values like PARSE_JSON).
  • TO_JSON_STRING(expr): Convert a COMPLEX<json> input into a JSON STRING value.
  • JSON_KEYS(expr, path): Get an array of field names in expr at the specified JSONPath path, or null if the data does not exist or has no fields.
  • JSON_PATHS(expr): Get an array of all JSONPath paths available in expr.

Other notable parts

NestedPathFinder, StructuredDataProcessor

Out of worry that using an off-the-shelf library for parsing path expressions would be too slow, I ended up writing my own JSONPath parser inside NestedPathFinder that supports the small subset of functionality we needed while minimizing overhead as much as possible. There is also code in there to parse 'jq' style paths, which is what my first prototype functions used, and which is still accepted by the virtual column. Both are parsed into a List<NestedPathPart>, where NestedPathPart is either a MapField or an ArrayElement, and the sequence is processed in order to extract values from some object.
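
A rough sketch of the idea, assuming the supported subset is limited to field access and numeric array indexes (this is illustrative, not the actual NestedPathFinder code): parse something like $.foo.bar[2] into a list of parts that are either a map field or an array element, then walk an object part by part:

import java.util.*;

final class PathSketch
{
  static List<Object> parse(String jsonPath)
  {
    List<Object> parts = new ArrayList<>();
    // assume the path starts with '$'; split on '.' and handle trailing "[n]" indexes
    for (String token : jsonPath.substring(1).split("\\.")) {
      if (token.isEmpty()) {
        continue;
      }
      int bracket = token.indexOf('[');
      String field = bracket < 0 ? token : token.substring(0, bracket);
      if (!field.isEmpty()) {
        parts.add(field);                                                 // map field
      }
      while (bracket >= 0) {
        int close = token.indexOf(']', bracket);
        parts.add(Integer.parseInt(token.substring(bracket + 1, close))); // array element
        bracket = token.indexOf('[', close);
      }
    }
    return parts;
  }

  static Object find(Object current, List<Object> parts)
  {
    for (Object part : parts) {
      if (current instanceof Map && part instanceof String) {
        current = ((Map<?, ?>) current).get(part);
      } else if (current instanceof List && part instanceof Integer) {
        List<?> list = (List<?>) current;
        int i = (Integer) part;
        current = i < list.size() ? list.get(i) : null;
      } else {
        return null;
      }
    }
    return current;
  }
}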

Processing of nested structure is done with StructuredDataProcessor, which provides mechanisms for traversing nested objects. It is used at ingest time to build the global dictionary, estimate sizes, and collect the superset of paths, and also by the expressions, which use it to do work such as extracting a value, collecting the list of keys at some path, and so on.

Building blocks

FixedIndexed

Like GenericIndexed, but for fixed-width values, so no 'offsets' need to be stored since a value's offset can be computed from its index. This is used for both the long and double components of the global value dictionary, and each nested field column has an int FixedIndexed to store the mapping of local dictionary ids to global dictionary ids.
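
A sketch of the core idea (illustrative, assuming values are tightly packed after a header): the byte position of the i-th value is plain arithmetic on the index and the fixed width, so no offsets section is required, unlike GenericIndexed which must store a per-value offset:

import java.nio.ByteBuffer;

final class FixedWidthReaderSketch
{
  private final ByteBuffer buffer;  // positioned at the first value
  private final int valueWidth;     // e.g. Integer.BYTES or Long.BYTES

  FixedWidthReaderSketch(ByteBuffer buffer, int valueWidth)
  {
    this.buffer = buffer;
    this.valueWidth = valueWidth;
  }

  long getLong(int index)
  {
    // offset computed directly from the index; no stored offsets needed
    return buffer.getLong(buffer.position() + index * valueWidth);
  }
}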

ComparatorDimensionDictionary, SortedComparatorDimensionDictionary

Comparator-based versions of DimensionDictionary and SortedDimensionDictionary. I might swap out the current ones for these new ones at some point in the future to standardize on using Comparator for comparison instead of Comparable.

CompressedBlockReader, CompressedBlockSerializer

Models a contiguous memory address space which is compressed into 64KB blocks (sized to the compression buffer pool). This is a sort of replacement for using GenericIndexed to store compressed blocks, without the baggage of GenericIndexed. It currently only supports being contained in a single smoosh file and so is limited to at most 2GB of compressed data, but future work I would like to do would allow splitting across multiple files.
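
The address-space idea can be illustrated in a few lines (assuming 64KB blocks of uncompressed data, as described above): a global offset maps to a block number and an offset within that block, so a read only needs to decompress the block(s) it actually touches:

final class BlockAddressSketch
{
  static final int BLOCK_SIZE = 1 << 16; // 64KB blocks, sized to the compression buffer pool

  static int blockNumber(long globalOffset)
  {
    return (int) (globalOffset / BLOCK_SIZE);
  }

  static int offsetInBlock(long globalOffset)
  {
    return (int) (globalOffset % BLOCK_SIZE);
  }
}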

CompressedVariableSizedBlobColumn, CompressedVariableSizedBlobColumnSupplier, CompressedVariableSizedBlobColumnSerializer

Generic structure to store columns of variably sized compressed blobs. Basically, it stores a compressed set of longs corresponding to the 'end' offsets of all of the values, followed by the compressed blocks of data itself. This is built on top of CompressedBlockSerializer/CompressedBlockReader and another new thing, CompressedLongsReader/CompressedLongsWriter, which is itself also built with CompressedBlockSerializer/CompressedBlockReader (instead of using existing ColumnarLongs implementations).
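
A small sketch of how a blob can be located from just the 'end' offsets described above (illustrative names; the real column reads these from the compressed longs): the start of row i is the end of row i - 1, and the resulting byte range is then read out of the compressed blocks:

final class BlobLookupSketch
{
  private final long[] endOffsets; // stand-in for the compressed list of end offsets

  BlobLookupSketch(long[] endOffsets)
  {
    this.endOffsets = endOffsets;
  }

  long startOf(int row)
  {
    return row == 0 ? 0 : endOffsets[row - 1];
  }

  int sizeOf(int row)
  {
    return (int) (endOffsets[row] - startOf(row));
  }
}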

This is used to store the 'raw' json value of nested columns, and I could also see it being used to support compressed complex aggregators as an alternative to GenericIndexedBasedComplexColumn in the future.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@cheddar cheddar left a comment

I've reviewed this in private and seen it active and working in the wild, so am throwing my +1 on it. Tests must pass, the description should be fleshed out, and all of that before merge.


lgtm-com bot commented Jul 7, 2022

This pull request introduces 5 alerts when merging 3e7b1c6 into 059aba7 - view on LGTM.com

new alerts:

  • 2 for Missing format argument
  • 2 for Boxed variable is never null
  • 1 for Dereferenced variable may be null


gianm commented Jul 12, 2022

Nice work! Very cool. I just noticed this when you posted it on Slack. The first thing that comes to mind is questions about the intended maturity of this feature after initial contribution:

  • Is it intentionally undocumented in this PR? Do you plan to add documentation?
  • Are there any impediments to maintaining forwards compatibility of the storage format, such that new versions of Druid will always be able to read JSON columns written by older versions? Do you foresee any reason we might want to break compatibility?
  • Are there any impediments to maintaining compatibility of the behavior of the JSON functions, such that queries written today will work the same way in future versions? Do you foresee any reason we might want to break compatibility?
  • Would you recommend we present this feature in its current state as experimental or production-ready, & why?

@clintropolis
Member Author

Is it intentionally undocumented in this PR? Do you plan to add documentation?

I was planning to add documentation in a follow-up PR since I thought this one was already big enough 😅

Are there any impediments to maintaining forwards compatibility of the storage format, such that new versions of Druid will always be able to read JSON columns written by older versions? Do you foresee any reason we might want to break compatibility?

I modeled the column after existing Druid columns so most things are decorated with a version byte which should allow us to make changes in the future while still being able to continue reading the existing data. For the specific list of what is versioned:

  • NestedDataColumnSerializer for the complex column itself (currently on v3 actually; I removed the reader code for older versions from prototyping to get rid of dead code)
  • GlobalDictionaryEncodedFieldColumnWriter which writes the nested columns and is currently re-using DictionaryEncodedColumnPartSerde.VERSION (I should probably decouple this at some point in the future...)
  • FixedIndexed (building block used to store local to global dictionary mapping and long and double value dictionaries)
  • CompressedVariableSizedBlobColumnSerializer (used to compress raw data)
  • CompressedBlockSerializer (used internally by CompressedVariableSizedBlobColumnSerializer)

In the "Future work" section of #12695 I mention storage format as an area that we can iterate on in the future, the biggest things I have in mind right now are storing arrays of literal values as array typed columns instead of broken out as they currently are, as well as customization such as allowing skipping building indexes on certain columns or storing them all-together also probably falls into this. Nothing about the current code should block this afaik, nor should those future enhancements interfere with our ability to read data that is stored with the current versions of stuff, so long as we practice good version hygiene whenever we make changes.

Would you recommend we present this feature in its current state as experimental or production-ready, & why?

This is a hard one to answer. Though I am hesitant to call it production-ready right from the start, I think the answer might vary a bit per use case.

The surface area here is huge since it essentially provides all of the normal Druid column functionality within these COMPLEX<json> columns, and I definitely won't claim this to be bug free. That said, quite a lot of internal testing has been done at this point, even at scale and with complicated nested schemas, which has allowed this codebase to be iterated on to get it to the place it currently is. There are some rough spots which I'm looking to improve in the near future, such as ingest time memory footprint, better array handling, etc, but I think if we get the documentation in a good enough state and can list out the limitations it could be used today.

The use cases I would feel most comfortable with are replacements for what can currently be done via flattening, meaning not heavily centered on nested arrays. I do have ideas of how to better support nested arrays, and my goal is to allow arrays extracted from nested columns to be exposed as Druid ARRAY types, but I am not there yet. So I'm not sure I would recommend most array use cases unless they are more like vectors, which have expected lengths and whose array positions are known/meaningful (and such that most queries would be extracting specific array positions, not entire arrays).

There is also the matter of different performance characteristics at both ingest and query time for these columns. Ingestion-time segment merge is pretty heavy right now because the global value dictionary is stored on heap. Query performance can vary a fair bit for nested columns compared to flat columns, especially with numbers, due to the existence of indexes on these numeric columns, which currently results in sometimes dramatically faster but also sometimes slower query performance. I'm still exploring this quite a bit; besides the documentation follow-up, I have also been working on some benchmarking to see where things currently stand and plan on sharing those results relatively soon.

So, long story short, due to the unknowns I think the answer for right now is that operators should experiment with COMPLEX<json> columns to see if they work well for their use case, and use them in production if so, otherwise provide feedback so that we can continue to make improvements and expand the use cases this is good for?

@clintropolis
Member Author

I created some query benchmarks to see how various nested column queries perform against traditional columns and see where things currently stand to help guide which areas I will be investigating in the future. I'll add the benchmark code in a follow-up PR. The nested columns are identical to the traditional columns, created using a transform expression:

json_object('long1', long1, 'nesteder', json_object('string1', string1, 'long2', long2, 'double3', double3))

For the most part things are pretty similar, as expected, and the ones that aren't I will be digging deeper into. For numeric columns, which have indexes in nested columns, the indexes in most cases make the queries for nested columns faster, though in a few cases they are currently slower. One exception is the bound filter on one of the double columns; part of the reason for that is that there is currently no native numeric range index for the bound filter to use, so bound filters on numbers have to use a DruidPredicateIndex, which has to match against every value in the dictionary instead of being able to take short-cuts like LexicographicalRangeIndex does for strings. I'll be adding this in a follow-up PR as well.
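
To illustrate the difference (a rough analogy, not Druid's actual index classes): a predicate-style index has to evaluate the predicate against every value in the sorted dictionary, while a range-style index can binary search for the bounds and take every dictionary id in between, which is effectively what LexicographicalRangeIndex does for strings:

import java.util.function.DoublePredicate;

final class RangeVsPredicateSketch
{
  // predicate index: O(dictionary size) predicate evaluations
  static int countMatchesByPredicate(double[] sortedDictionary, DoublePredicate predicate)
  {
    int matches = 0;
    for (double value : sortedDictionary) {
      if (predicate.test(value)) {
        matches++;
      }
    }
    return matches;
  }

  // range index: two binary searches find the matching id range [startId, endId)
  static int countMatchesByRange(double[] sortedDictionary, double lowerExclusive, double upperExclusive)
  {
    int startId = firstIndexGreaterThan(sortedDictionary, lowerExclusive);
    int endId = firstIndexAtOrAbove(sortedDictionary, upperExclusive);
    return Math.max(0, endId - startId);
  }

  private static int firstIndexAtOrAbove(double[] sorted, double key)
  {
    int lo = 0, hi = sorted.length;
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (sorted[mid] < key) lo = mid + 1; else hi = mid;
    }
    return lo;
  }

  private static int firstIndexGreaterThan(double[] sorted, double key)
  {
    int lo = 0, hi = sorted.length;
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (sorted[mid] <= key) lo = mid + 1; else hi = mid;
    }
    return lo;
  }
}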

SELECT SUM(long1) FROM foo
SELECT SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)) FROM foo
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql        0           5000000        false  avgt    5   36.711 ±  0.917  ms/op
SqlNestedDataBenchmark.querySql        0           5000000        force  avgt    5   15.587 ±  0.276  ms/op
SqlNestedDataBenchmark.querySql        1           5000000        false  avgt    5   39.224 ±  0.870  ms/op
SqlNestedDataBenchmark.querySql        1           5000000        force  avgt    5   15.877 ±  0.440  ms/op


SELECT SUM(long1), SUM(long2) FROM foo
SELECT SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)), SUM(JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT)) FROM foo
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql        2           5000000        false  avgt    5   63.805 ±  1.036  ms/op
SqlNestedDataBenchmark.querySql        2           5000000        force  avgt    5   30.381 ±  1.201  ms/op
SqlNestedDataBenchmark.querySql        3           5000000        false  avgt    5   66.660 ±  0.806  ms/op
SqlNestedDataBenchmark.querySql        3           5000000        force  avgt    5   30.341 ±  1.124  ms/op


SELECT SUM(long1), SUM(long2), SUM(double3) FROM foo
SELECT SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)), SUM(JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT)), SUM(JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE)) FROM foo
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql        4           5000000        false  avgt    5   78.570 ±  1.657  ms/op
SqlNestedDataBenchmark.querySql        4           5000000        force  avgt    5   37.777 ±  1.295  ms/op
SqlNestedDataBenchmark.querySql        5           5000000        false  avgt    5   82.672 ±  1.010  ms/op
SqlNestedDataBenchmark.querySql        5           5000000        force  avgt    5   37.887 ±  0.802  ms/op


SELECT string1, SUM(long1) FROM foo GROUP BY 1 ORDER BY 2
SELECT JSON_VALUE(nested, '$.nesteder.string1'), SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)) FROM foo GROUP BY 1 ORDER BY 2
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql        6           5000000        false  avgt    5  269.560 ±  1.454  ms/op
SqlNestedDataBenchmark.querySql        6           5000000        force  avgt    5  157.090 ±  4.058  ms/op
SqlNestedDataBenchmark.querySql        7           5000000        false  avgt    5  373.162 ±  2.871  ms/op
SqlNestedDataBenchmark.querySql        7           5000000        force  avgt    5  195.213 ±  1.993  ms/op


SELECT string1, SUM(long1), SUM(double3) FROM foo GROUP BY 1 ORDER BY 2
SELECT JSON_VALUE(nested, '$.nesteder.string1'), SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)), SUM(JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE)) FROM foo GROUP BY 1 ORDER BY 2
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql        8           5000000        false  avgt    5  251.743 ±  6.438  ms/op
SqlNestedDataBenchmark.querySql        8           5000000        force  avgt    5  172.322 ± 14.814  ms/op
SqlNestedDataBenchmark.querySql        9           5000000        false  avgt    5  417.454 ± 21.276  ms/op
SqlNestedDataBenchmark.querySql        9           5000000        force  avgt    5  215.228 ±  9.304  ms/op


SELECT SUM(long1) FROM foo WHERE string1 = '10000' OR string1 = '1000'
SELECT SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.string1') = '10000' OR JSON_VALUE(nested, '$.nesteder.string1') = '1000'
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql       10           5000000        false  avgt    5   11.482 ±  0.495  ms/op
SqlNestedDataBenchmark.querySql       10           5000000        force  avgt    5   11.549 ±  0.303  ms/op
SqlNestedDataBenchmark.querySql       11           5000000        false  avgt    5   11.695 ±  0.293  ms/op
SqlNestedDataBenchmark.querySql       11           5000000        force  avgt    5   11.931 ±  0.338  ms/op


SELECT SUM(long1) FROM foo WHERE long2 = 10000 OR long2 = 1000
SELECT SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT) = 10000 OR JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT) = 1000
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql       12           5000000        false  avgt    5   78.895 ±  2.158  ms/op
SqlNestedDataBenchmark.querySql       12           5000000        force  avgt    5   48.814 ±  0.874  ms/op
SqlNestedDataBenchmark.querySql       13           5000000        false  avgt    5    1.297 ±  0.008  ms/op
SqlNestedDataBenchmark.querySql       13           5000000        force  avgt    5    1.277 ±  0.011  ms/op


SELECT SUM(long1) FROM foo WHERE double3 < 10000.0 AND double3 > 1000.0
SELECT SUM(JSON_VALUE(nested, '$.long1' RETURNING BIGINT)) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE) < 10000.0 AND JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE) > 1000.0
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql       14           5000000        false  avgt    5   92.982 ±  1.473  ms/op
SqlNestedDataBenchmark.querySql       14           5000000        force  avgt    5   54.729 ±  0.429  ms/op
SqlNestedDataBenchmark.querySql       15           5000000        false  avgt    5  580.472 ± 28.064  ms/op
SqlNestedDataBenchmark.querySql       15           5000000        force  avgt    5  561.494 ± 54.096  ms/op


SELECT long1, SUM(double3) FROM foo WHERE string1 = '10000' OR string1 = '1000' GROUP BY 1 ORDER BY 2
SELECT JSON_VALUE(nested, '$.long1' RETURNING BIGINT), SUM(JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE)) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.string1') = '10000' OR JSON_VALUE(nested, '$.nesteder.string1') = '1000' GROUP BY 1 ORDER BY 2
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql       16           5000000        false  avgt    5  129.760 ±  9.953  ms/op
SqlNestedDataBenchmark.querySql       16           5000000        force  avgt    5  133.015 ± 20.961  ms/op
SqlNestedDataBenchmark.querySql       17           5000000        false  avgt    5  142.197 ±  8.773  ms/op
SqlNestedDataBenchmark.querySql       17           5000000        force  avgt    5  132.048 ± 15.546  ms/op


SELECT string1, SUM(double3) FROM foo WHERE long2 < 10000 AND long2 > 1000 GROUP BY 1 ORDER BY 2
SELECT JSON_VALUE(nested, '$.nesteder.string1'), SUM(JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE)) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT) < 10000 AND JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT) > 1000 GROUP BY 1 ORDER BY 2
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql       18           5000000        false  avgt    5  161.445 ±  7.024  ms/op
SqlNestedDataBenchmark.querySql       18           5000000        force  avgt    5  138.212 ± 19.673  ms/op
SqlNestedDataBenchmark.querySql       19           5000000        false  avgt    5  123.486 ±  5.029  ms/op
SqlNestedDataBenchmark.querySql       19           5000000        force  avgt    5  120.079 ±  6.822  ms/op


SELECT string1, SUM(double3) FROM foo WHERE double3 < 10000.0 AND double3 > 1000.0 GROUP BY 1 ORDER BY 2
SELECT JSON_VALUE(nested, '$.nesteder.string1'), SUM(JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE)) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE) < 10000.0 AND JSON_VALUE(nested, '$.nesteder.double3' RETURNING DOUBLE) > 1000.0 GROUP BY 1 ORDER BY 2
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql       20           5000000        false  avgt    5  280.393 ± 15.369  ms/op
SqlNestedDataBenchmark.querySql       20           5000000        force  avgt    5  174.545 ±  2.702  ms/op
SqlNestedDataBenchmark.querySql       21           5000000        false  avgt    5  802.647 ± 32.078  ms/op
SqlNestedDataBenchmark.querySql       21           5000000        force  avgt    5  591.274 ± 16.460  ms/op


SELECT long2 FROM foo WHERE long2 IN (1, 19, 21, 23, 25, 26, 46)
SELECT JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT) IN (1, 19, 21, 23, 25, 26, 46)
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql       22           5000000        false  avgt    5  273.464 ± 15.731  ms/op
SqlNestedDataBenchmark.querySql       22           5000000        force  avgt    5  272.270 ± 20.511  ms/op
SqlNestedDataBenchmark.querySql       23           5000000        false  avgt    5  174.960 ±  1.923  ms/op
SqlNestedDataBenchmark.querySql       23           5000000        force  avgt    5  177.920 ±  4.095  ms/op


SELECT long2 FROM foo WHERE long2 IN (1, 19, 21, 23, 25, 26, 46) GROUP BY 1",
SELECT JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT) FROM foo WHERE JSON_VALUE(nested, '$.nesteder.long2' RETURNING BIGINT) IN (1, 19, 21, 23, 25, 26, 46) GROUP BY 1
Benchmark                        (query)  (rowsPerSegment)  (vectorize)  Mode  Cnt    Score    Error  Units
SqlNestedDataBenchmark.querySql       24           5000000        false  avgt    5  318.280 ±  7.544  ms/op
SqlNestedDataBenchmark.querySql       24           5000000        force  avgt    5  210.866 ± 14.684  ms/op
SqlNestedDataBenchmark.querySql       25           5000000        false  avgt    5  215.200 ±  2.366  ms/op
SqlNestedDataBenchmark.querySql       25           5000000        force  avgt    5  152.399 ± 22.695  ms/op


gianm commented Jul 13, 2022

Thanks for the details. It's helpful in understanding the maturity of the feature. It sounds relatively mature by the standards of a new feature.

So, long story short, due to the unknowns I think the answer for right now is that operators should experiment with COMPLEX<json> columns to see if they work well for their use case, and use them in production if so, otherwise provide feedback so that we can continue to make improvements and expand the use cases this is good for?

To me this sounds like the basics of the feature are production-ready. There may be various callouts about performance, but it seems that the compatibility story is tight enough that the feature doesn't need the experimental markings at this time.

You mentioned being somewhat less certain about the behavior of nested arrays. We should figure out if that part is going to be included in the production-ready feature set, or if we'll call that particular scenario out as an evolving area. What is your intent & recommendation in this area?

@gianm gianm left a comment

👍 from a design perspective. I mainly considered the behavior of the functions being introduced, how this is all plugged into the existing query engine concepts, and how we're handling evolution and compatibility. It all sounds good to me.

@clintropolis
Member Author

To me this sounds like the basics of the feature are production-ready. There may be various callouts about performance, but it seems that the compatibility story is tight enough that the feature doesn't need the experimental markings at this time.

I think that is fair 👍

You mentioned being somewhat less certain about the behavior of nested arrays. We should figure out if that part is going to be included in the production-ready feature set, or if we'll call that particular scenario out as an evolving area. What is your intent & recommendation in this area?

It is definitely going to be an evolving area (which I think could be said of our array support in general), though there is probably a narrow range of use cases that could be served today, mainly where array lengths and element positions are known and have some meaning, and query-time operations are primarily extracting and operating on individual elements. These are more or less the current limitations of flattenSpec with nested arrays, I think.

There is some lower-hanging fruit that would improve things in the near term, some of which might be possible to get in before the next release. The first is supporting wildcards in the subset of the path syntax that we support, which would allow JSON_QUERY and JSON_VALUE (or something like it... I'm not entirely sure how the RETURNING syntax would work with array types in SQL, so I need to do some tinkering there) to extract complete arrays. For JSON_QUERY these results would still be COMPLEX<json> typed, but JSON_VALUE* would spit out Druid literal array types (ARRAY<LONG>, ARRAY<STRING>, etc.).

For nested arrays of JSON objects extracted by JSON_QUERY, I think we will want a way to convert a COMPLEX<json> into an ARRAY<COMPLEX<json>> so that they too can take part in array operations, especially once we add a native UNNEST function to transform arrays into tables, which would be the path to exploding out these nested objects and performing operations on their contents.

At some point after that, I intend to introduce the option to store literal arrays in nested ARRAY-typed columns instead of breaking them out into separate columns for individual elements as they currently are (so that array operations don't have to decompress a bunch of separate columns to do stuff).

I guess I'm getting a bit into the weeds, but my point is that I think this feature will evolve alongside, and should help us improve, array support in general, so I am hyped to get it there.

@vogievetsky
Contributor

This is amazing stuff. I am excited to follow this up with a web console PR to support some of this stuff there.

@vogievetsky vogievetsky merged commit 05b2e96 into apache:master Jul 14, 2022
@clintropolis clintropolis deleted the nested-columns branch July 14, 2022 19:12
@abhishekagarwal87 abhishekagarwal87 left a comment

I had some review comments 😄 Sorry for being late.

@Override
public Expr apply(List<Expr> args)
{
Preconditions.checkArgument(args.size() % 2 == 0);
Contributor

Please add the size of arguments in the exception error message.

ExprEval field = args.get(i).eval(bindings);
ExprEval value = args.get(i + 1).eval(bindings);

Preconditions.checkArgument(field.type().is(ExprType.STRING), "field name must be a STRING");
Contributor

Please add the actual type of field in the error message.

}
}

static List<NestedPathFinder.NestedPathPart> getArg1PathPartsFromLiteral(String fnName, List<Expr> args)
Contributor

nit. you could just pass the args.get(1) itself as an argument.

return input.value();
}

static void checkArg0NestedType(String fnName, List<Expr> args, ExprEval input)
Contributor

nit - you could just pass args.get(0) as an argument. since the intention to use 0th argument is explicit in the function name.

import java.util.SortedMap;
import java.util.TreeMap;

public class NestedDataColumnIndexer implements DimensionIndexer<StructuredData, StructuredData, StructuredData>
Contributor

can you add some javadocs here?

boolean allNulls = allNull(dimValues.getSortedStrings()) &&
allNull(dimValues.getSortedLongs()) &&
allNull(dimValues.getSortedDoubles());
sortedLookup = dimValues;
Contributor

shouldn't this be inside the if block below?

Member Author
@clintropolis clintropolis Jul 29, 2022

nah, this is a strange flow that I picked up from this code being based on DictionaryEncodedColumnMerger. Every iteration sets sortedLookup, which is only used if there is a single thing being "merged", which really means that an incremental index is being persisted and so just passes through the sorted dictionary it built. If there is more than one set of things being merged, then it uses a DictionaryMergingIterator to split apart each segment's sorted global dictionary and build arrays for each locally sorted set of values to merge them, in which case null values are just skipped.

It's probably worth adjusting this code; I will try to re-arrange it in the future so it's a bit easier to follow, especially since, unlike DictionaryEncodedColumnMerger, the "indexable adapter" abstraction sort of falls apart here and I have to know when I'm dealing with incremental indexes vs. queryable indexes.


final BaseColumn col = columnHolder.getColumn();

closer.register(col);
Contributor

We are doing this twice; another occurrence is inside the function.

* Reader for a virtual contiguous address range backed by compressed blocks of data.
*
* Format:
* | version (byte) | compression (byte) | num blocks (int) | block size (int) | end offsets | compressed data |
Contributor

block size comes before number of blocks.

Member Author

hah oops, good catch

/**
* Get size in bytes of virtual contiguous buffer
*/
public long getSize()
Contributor

this is compressedSize, correct?

Member Author

no, this is the end position of the last uncompressed value, so it is the uncompressed size. The endOffsetsBuffer contains the uncompressed end position of every value that is compressed in the blocks. Using the uncompressed start and end position of a value, and knowing the size of the blocks, we can locate which block(s) the value is contained in to extract it.

I'll try to add some more comments and javadocs to this area so it's a bit clearer what is going on.

Contributor

That will be of help, since at line #75 it says final int compressedSize = offsetView.get(numBlocks - 1);

public class CompressedBlockSerializer implements Serializer
{
private static final MetaSerdeHelper<CompressedBlockSerializer> META_SERDE_HELPER = MetaSerdeHelper
.firstWriteByte((CompressedBlockSerializer x) -> (byte) 0x01)
Contributor

nit - can you directly point to the VERSION variable in CompressedBlockReader?

final StructuredDataProcessor processor = new StructuredDataProcessor()
{
@Override
public int processLiteralField(String fieldName, Object fieldValue)
Contributor

this method needs javadoc. what is the significance of return value here?

ExprEval input = args.get(0).eval(bindings);
// maybe in the future ProcessResults should deal in PathFinder.PathPart instead of strings for fields
StructuredDataProcessor.ProcessResults info = processor.processFields(maybeUnwrapStructuredData(input));
List<String> transformed = info.getLiteralFields()
Contributor

have you thought about caching some results here? For example, many literal fields are going to be repeated across input records and parseJqPath doesn't seem cheap.


Channels.writeFully(valuesOut, compressed);
uncompressedDataBuffer.clear();
numBlocks++;
if (numBlocks < 0) {
Contributor

how does this happen?
