Merge branch 'master' into update-publishing-readme
317brian authored Nov 20, 2023
2 parents ccda41c + 6ed343c commit e1faba3
Showing 45 changed files with 2,164 additions and 370 deletions.
228 changes: 145 additions & 83 deletions docs/api-reference/automatic-compaction-api.md

Large diffs are not rendered by default.

578 changes: 545 additions & 33 deletions docs/api-reference/data-management-api.md

Large diffs are not rendered by default.

17 changes: 8 additions & 9 deletions docs/api-reference/sql-api.md
@@ -390,12 +390,10 @@ Generally, the `sql` and `sql/statements` endpoints support the same response bo

Keep the following in mind when submitting queries to the `sql/statements` endpoint:

- There are additional context parameters for `sql/statements`:
- Apart from the context parameters mentioned [here](../multi-stage-query/reference.md#context-parameters), there are additional context parameters for `sql/statements` specifically:

- `executionMode` determines how query results are fetched. Druid currently only supports `ASYNC`. You must manually retrieve your results after the query completes.
- `selectDestination` determines where final results get written. By default, results are written to task reports. Set this parameter to `durableStorage` to instruct Druid to write the results from SELECT queries to durable storage, which allows you to fetch larger result sets. Note that this requires you to have [durable storage for MSQ enabled](../operations/durable-storage.md).

- The only supported value for `resultFormat` is JSON LINES.
- `selectDestination` determines where final results get written. By default, results are written to task reports. Set this parameter to `durableStorage` to instruct Druid to write the results from SELECT queries to durable storage, which allows you to fetch larger result sets. For result sets with more than 3000 rows, using `durableStorage` is highly recommended. Note that this requires you to have [durable storage for MSQ](../operations/durable-storage.md) enabled. A sketch of such a request follows this list.
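
As a rough sketch of how these parameters fit together (the query and the `wikipedia` datasource are placeholders, not part of this reference), a submission might look like:

```shell
# Hedged sketch: submit an async SELECT to the sql/statements endpoint and write
# its results to durable storage. The query and datasource name are illustrative.
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/v2/sql/statements" \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "query": "SELECT channel, COUNT(*) AS cnt FROM wikipedia GROUP BY channel",
    "context": {
      "executionMode": "ASYNC",
      "selectDestination": "durableStorage"
    }
  }'
```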

#### Responses

@@ -812,12 +810,10 @@ Host: http://ROUTER_IP:ROUTER_PORT

Retrieves results for completed queries. Results are separated into pages, so you can use the optional `page` parameter to refine the results you get. Druid returns information about the composition of each page and its page number (`id`). For information about pages, see [Get query status](#get-query-status).

If a page number isn't passed, all results are returned sequentially in the same response. If you have large result sets, you may encounter timeouts based on the value configured for `druid.router.http.readTimeout`.

When getting query results, keep the following in mind:

- JSON Lines is the only supported result format.
- Getting the query results for an ingestion query returns an empty response.
Getting the query results for an ingestion query returns an empty response.

#### URL

@@ -826,7 +822,10 @@ When getting query results, keep the following in mind:
#### Query parameters
* `page` (optional)
* Type: Int
* Refine paginated results
* Fetch results based on page numbers. If not specified, all results are returned sequentially in the same response, starting from page 0.
* `resultFormat` (optional)
* Type: String
* Defines the format in which the results are presented. The following options are supported: `arrayLines`, `objectLines`, `array`, `object`, and `csv`. The default is `object`. A sample request follows this list.
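
As a hedged example (the query ID `query-abc123` is a placeholder; take the real ID from the submission response or query status), fetching the first page as newline-delimited objects might look like:

```shell
# Hedged sketch: retrieve page 0 of a completed query's results in objectLines format.
curl "http://ROUTER_IP:ROUTER_PORT/druid/v2/sql/statements/query-abc123/results?page=0&resultFormat=objectLines"
```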

#### Responses

26 changes: 16 additions & 10 deletions docs/api-reference/supervisor-api.md
@@ -6,9 +6,7 @@ sidebar_label: Supervisors
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';


<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
@@ -3144,17 +3142,21 @@ Use this endpoint to selectively reset offsets for partitions without resetting

#### Responses

<!--DOCUSAURUS_CODE_TABS-->
<Tabs>

<TabItem value="1" label="200 SUCCESS">

<!--200 SUCCESS-->

*Successfully reset offsets*

<!--404 NOT FOUND-->
</TabItem>
<TabItem value="2" label="404 NOT FOUND">


*Invalid supervisor ID*

<!--END_DOCUSAURUS_CODE_TABS-->
</TabItem>
</Tabs>

---
#### Reset Offsets Metadata
@@ -3181,17 +3183,20 @@ The following table defines the fields within the `partitions` object in the res
The following example shows how to reset offsets for a Kafka supervisor with the name `social_media`. Let's say the supervisor is reading
from a Kafka topic `ads_media_stream` and has the stored offsets: `{"0": 0, "1": 10, "2": 20, "3": 40}`.

<!--DOCUSAURUS_CODE_TABS-->
<Tabs>

<TabItem value="3" label="cURL">

<!--cURL-->

```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" \
--header 'Content-Type: application/json' \
--data-raw '{"type":"kafka","partitions":{"type":"end","stream":"ads_media_stream","partitionOffsetMap":{"0":100, "2": 650}}}'
```

<!--HTTP-->
</TabItem>
<TabItem value="4" label="HTTP">


```HTTP
POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1
@@ -3214,7 +3219,8 @@ Content-Type: application/json
The above operation will reset offsets only for partitions 0 and 2 to 100 and 650 respectively. After a successful reset,
when the supervisor's tasks restart, they will resume reading from `{"0": 100, "1": 10, "2": 650, "3": 40}`.

<!--END_DOCUSAURUS_CODE_TABS-->
</TabItem>
</Tabs>

#### Sample response

15 changes: 11 additions & 4 deletions docs/development/extensions-core/kafka-supervisor-operations.md
@@ -5,6 +5,9 @@ sidebar_label: "Apache Kafka operations"
description: "Reference topic for running and maintaining Apache Kafka supervisors"
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
@@ -149,17 +152,19 @@ twice, resulting in missing or duplicate data.
The following example shows how to reset offsets for a Kafka supervisor with the name `social_media`. Let's say the supervisor is reading
from two Kafka topics, `ads_media_foo` and `ads_media_bar`, and has the stored offsets: `{"ads_media_foo:0": 0, "ads_media_foo:1": 10, "ads_media_bar:0": 20, "ads_media_bar:1": 40}`.

<!--DOCUSAURUS_CODE_TABS-->
<Tabs>

<TabItem value="1" label="cURL">

<!--cURL-->

```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" \
--header 'Content-Type: application/json' \
--data-raw '{"type":"kafka","partitions":{"type":"end","stream":"ads_media_foo|ads_media_bar","partitionOffsetMap":{"ads_media_foo:0": 3, "ads_media_bar:1": 12}}}'
```

<!--HTTP-->
</TabItem>
<TabItem value="2" label="HTTP">

```HTTP
POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1
@@ -178,10 +183,12 @@ Content-Type: application/json
}
}
```

The above operation will reset offsets for `ads_media_foo` partition 0 and `ads_media_bar` partition 1 to offsets 3 and 12 respectively. After a successful reset,
when the supervisor's tasks restart, they will resume reading from `{"ads_media_foo:0": 3, "ads_media_foo:1": 10, "ads_media_bar:0": 20, "ads_media_bar:1": 12}`.

<!--END_DOCUSAURUS_CODE_TABS-->
</TabItem>
</Tabs>

#### Sample response

12 changes: 7 additions & 5 deletions docs/ingestion/input-sources.md
@@ -858,7 +858,8 @@ The following is a sample spec for a HDFS warehouse source:
},
"warehouseSource": {
"type": "hdfs"
}
},
"snapshotTime": "2023-06-01T00:00:00.000Z",
},
"inputFormat": {
"type": "parquet"
@@ -937,10 +938,11 @@ The following is a sample spec for a S3 warehouse source:
|--------|-----------|---------|
|type|Set the value to `iceberg`.|yes|
|tableName|The Iceberg table name configured in the catalog.|yes|
|namespace|The Iceberg namespace associated with the table|yes|
|icebergFilter|The JSON Object that filters data files within a snapshot|no|
|icebergCatalog|The JSON Object used to define the catalog that manages the configured Iceberg table|yes|
|warehouseSource|The JSON Object that defines the native input source for reading the data files from the warehouse|yes|
|namespace|The Iceberg namespace associated with the table.|yes|
|icebergFilter|The JSON Object that filters data files within a snapshot.|no|
|icebergCatalog|The JSON Object used to define the catalog that manages the configured Iceberg table.|yes|
|warehouseSource|The JSON Object that defines the native input source for reading the data files from the warehouse.|yes|
|snapshotTime|Timestamp in ISO8601 DateTime format used to fetch the most recent snapshot as of this time. See the example after this table.|no|
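
As a rough, hedged sketch (the table name, namespace, and timestamp are placeholders, and the catalog connection settings are left out; see the Catalog Object section below), an `inputSource` pinned to a point-in-time snapshot might look like:

```shell
# Hedged sketch: write a minimal Iceberg inputSource that reads the most recent
# snapshot as of the given snapshotTime. Table, namespace, and catalog details
# are placeholders; fill in the catalog object for your deployment.
cat > iceberg-input-source.json <<'EOF'
{
  "type": "iceberg",
  "tableName": "ads_table",
  "namespace": "ads_namespace",
  "icebergCatalog": { "type": "hive" },
  "warehouseSource": { "type": "hdfs" },
  "snapshotTime": "2023-06-01T00:00:00.000Z"
}
EOF
```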

### Catalog Object

2 changes: 1 addition & 1 deletion docs/querying/math-expr.md
@@ -235,7 +235,7 @@ JSON functions provide facilities to extract, transform, and create `COMPLEX<jso

| function | description |
|---|---|
| json_value(expr, path[, type]) | Extract a Druid literal (`STRING`, `LONG`, `DOUBLE`) value from `expr` using JSONPath syntax of `path`. The optional `type` argument can be set to `'LONG'`,`'DOUBLE'` or `'STRING'` to cast values to that type. |
| json_value(expr, path[, type]) | Extract a Druid literal (`STRING`, `LONG`, `DOUBLE`, `ARRAY<STRING>`, `ARRAY<LONG>`, or `ARRAY<DOUBLE>`) value from `expr` using JSONPath syntax of `path`. The optional `type` argument can be set to `'LONG'`, `'DOUBLE'`, `'STRING'`, `'ARRAY<LONG>'`, `'ARRAY<DOUBLE>'`, or `'ARRAY<STRING>'` to cast values to that type. |
| json_query(expr, path) | Extract a `COMPLEX<json>` value from `expr` using JSONPath syntax of `path` |
| json_object(expr1, expr2[, expr3, expr4 ...]) | Construct a `COMPLEX<json>` with alternating 'key' and 'value' arguments|
| parse_json(expr) | Deserialize a JSON `STRING` into a `COMPLEX<json>`. If the input is not a `STRING` or it is invalid JSON, this function will result in an error.|
10 changes: 6 additions & 4 deletions docs/querying/query-from-deep-storage.md
@@ -89,10 +89,10 @@ Submitting a query from deep storage uses the same syntax as any other Druid SQL

Generally, the request body fields are the same between the `sql` and `sql/statements` endpoints.

There are additional context parameters for `sql/statements` specifically:
Apart from the context parameters mentioned [here](../multi-stage-query/reference.md#context-parameters), there are additional context parameters for `sql/statements`:

- `executionMode` (required) determines how query results are fetched. Set this to `ASYNC`.
- `selectDestination` (optional) set to `durableStorage` instructs Druid to write the results from SELECT queries to durable storage. Note that this requires you to have [durable storage for MSQ enabled](../operations/durable-storage.md).
- `selectDestination` (optional) determines where the results of SELECT queries get written. Set this to `durableStorage` to write results to durable storage, which allows you to fetch larger result sets. For result sets with more than 3000 rows, using `durableStorage` is highly recommended. Note that this requires you to have [durable storage for MSQ enabled](../operations/durable-storage.md).

The following sample query includes the two additional context parameters that querying from deep storage supports:

@@ -182,12 +182,14 @@ Only the user who submitted a query can retrieve the results for the query.
Use the following endpoint to retrieve results:

```
GET https://ROUTER:8888/druid/v2/sql/statements/QUERYID/results?page=PAGENUMBER&size=RESULT_SIZE&timeout=TIMEOUT_MS
GET https://ROUTER:8888/druid/v2/sql/statements/QUERYID/results?page=PAGENUMBER&resultFormat=FORMAT
```

Results are returned in JSON format.

You can use the optional `page`, `size`, and `timeout` parameters to refine your results. You can retrieve the `page` information for your results by fetching the status of the completed query.
You can use the optional `page` parameter to refine your results and the optional `resultFormat` parameter to define the format in which the results are presented.
* You can retrieve the `page` information for your results by fetching the status of the completed query.
* For `resultFormat`, the following options are supported: `arrayLines`, `objectLines`, `array`, `object`, and `csv`. The default value is `object`. For more detail, see the [SQL API documentation](../api-reference/sql-api.md#request-body). A sketch of such a request follows this list.
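
As a hedged sketch (the query ID is a placeholder), you might first check the query status for page information and then pull a page of results as CSV:

```shell
# Hedged sketch: check status (which lists result pages), then fetch page 0 as CSV.
curl "https://ROUTER:8888/druid/v2/sql/statements/query-abc123"
curl "https://ROUTER:8888/druid/v2/sql/statements/query-abc123/results?page=0&resultFormat=csv"
```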

When you try to get results for a query from deep storage, you may receive an error that states the query is still running. Wait until the query completes before you try again.

2 changes: 1 addition & 1 deletion docs/querying/sql-data-types.md
@@ -79,7 +79,7 @@ When `druid.generic.useDefaultValueForNull = true` (legacy mode), Druid instead

Druid supports [`ARRAY` types](arrays.md), which behave as standard SQL arrays, where results are grouped by matching entire arrays. The [`UNNEST` operator](./sql-array-functions.md#unn) can be used to perform operations on individual array elements, translating each element into a separate row.

`ARRAY` typed columns can be stored in segments with class JSON based ingestion using the 'auto' typed dimension schema shared with [schema auto-discovery](../ingestion/schema-design.md#schema-auto-discovery-for-dimensions) to detect and ingest arrays as ARRAY typed columns. For [SQL based ingestion](../multi-stage-query/index.md), the query context parameter `arrayIngestMode` must be specified as `"array"` to ingest ARRAY types. In Druid 28, the default mode for this parameter is `"mvd"` for backwards compatibility, which instead can only handle `ARRAY<STRING>` which it stores in [multi-value string columns](#multi-value-strings).
`ARRAY` typed columns can be stored in segments with JSON-based ingestion using the 'auto' typed dimension schema shared with [schema auto-discovery](../ingestion/schema-design.md#schema-auto-discovery-for-dimensions) to detect and ingest arrays as ARRAY typed columns. For [SQL based ingestion](../multi-stage-query/index.md), the query context parameter `arrayIngestMode` must be specified as `"array"` to ingest ARRAY types. In Druid 28, the default mode for this parameter is `"mvd"` for backwards compatibility, which instead can only handle `ARRAY<STRING>` which it stores in [multi-value string columns](#multi-value-strings).
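
As a hedged illustration of the SQL-based ingestion case (the datasource, source table, and column names are placeholders), the context parameter can be supplied alongside the query:

```shell
# Hedged sketch: an MSQ ingestion query that sets arrayIngestMode to "array" so
# array values are stored as ARRAY columns rather than multi-value strings.
# Datasource and column names are placeholders.
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/v2/sql/task" \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "query": "INSERT INTO target_datasource SELECT __time, tags FROM source_datasource PARTITIONED BY DAY",
    "context": { "arrayIngestMode": "array" }
  }'
```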

You can convert multi-value dimensions to standard SQL arrays explicitly with `MV_TO_ARRAY` or implicitly using [array functions](./sql-array-functions.md). You can also use the array functions to construct arrays from multiple columns.

2 changes: 1 addition & 1 deletion docs/querying/sql-functions.md
@@ -114,7 +114,7 @@ Computes approximate quantiles on fixed buckets histogram column or a regular nu

`ARRAY[expr1, expr2, ...]`

**Function type:** [Multi-value string](sql-multivalue-string-functions.md)
**Function type:** [Array](sql-array-functions.md)

Constructs a SQL ARRAY literal from the expression arguments. The arguments must be of the same type.
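
As a hedged example against the SQL API (the literal values and alias are arbitrary):

```shell
# Hedged sketch: construct an ARRAY literal in a simple SQL query.
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/v2/sql" \
  --header 'Content-Type: application/json' \
  --data-raw '{"query": "SELECT ARRAY[1, 2, 3] AS number_array"}'
```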

2 changes: 1 addition & 1 deletion extensions-contrib/druid-iceberg-extensions/pom.xml
@@ -35,7 +35,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<iceberg.core.version>1.4.0</iceberg.core.version>
<iceberg.core.version>1.4.1</iceberg.core.version>
<hive.version>3.1.3</hive.version>
</properties>
<dependencies>
@@ -32,6 +32,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.joda.time.DateTime;

import java.util.ArrayList;
import java.util.List;
@@ -54,12 +55,15 @@ public abstract class IcebergCatalog
*
* @param tableNamespace The catalog namespace under which the table is defined
* @param tableName The iceberg table name
* @param icebergFilter The iceberg filter that needs to be applied before reading the files
* @param snapshotTime Datetime that will be used to fetch the most recent snapshot as of this time
* @return a list of data file paths
*/
public List<String> extractSnapshotDataFiles(
String tableNamespace,
String tableName,
IcebergFilter icebergFilter
IcebergFilter icebergFilter,
DateTime snapshotTime
)
{
Catalog catalog = retrieveCatalog();
@@ -85,7 +89,9 @@ public List<String> extractSnapshotDataFiles(
if (icebergFilter != null) {
tableScan = icebergFilter.filter(tableScan);
}

if (snapshotTime != null) {
tableScan = tableScan.asOfTime(snapshotTime.getMillis());
}
CloseableIterable<FileScanTask> tasks = tableScan.planFiles();
CloseableIterable.transform(tasks, FileScanTask::file)
.forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));
@@ -36,6 +36,7 @@
import org.apache.druid.iceberg.filter.IcebergFilter;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.File;
@@ -68,6 +69,9 @@ public class IcebergInputSource implements SplittableInputSource<List<String>>
@JsonProperty
private InputSourceFactory warehouseSource;

@JsonProperty
private final DateTime snapshotTime;

private boolean isLoaded = false;

private SplittableInputSource delegateInputSource;
@@ -78,14 +82,16 @@ public IcebergInputSource(
@JsonProperty("namespace") String namespace,
@JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
@JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
@JsonProperty("warehouseSource") InputSourceFactory warehouseSource
@JsonProperty("warehouseSource") InputSourceFactory warehouseSource,
@JsonProperty("snapshotTime") @Nullable DateTime snapshotTime
)
{
this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null");
this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null");
this.icebergFilter = icebergFilter;
this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null");
this.snapshotTime = snapshotTime;
}

@Override
@@ -164,6 +170,13 @@ public IcebergFilter getIcebergFilter()
return icebergFilter;
}

@Nullable
@JsonProperty
public DateTime getSnapshotTime()
{
return snapshotTime;
}

public SplittableInputSource getDelegateInputSource()
{
return delegateInputSource;
@@ -174,7 +187,8 @@ protected void retrieveIcebergDatafiles()
List<String> snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles(
getNamespace(),
getTableName(),
getIcebergFilter()
getIcebergFilter(),
getSnapshotTime()
);
if (snapshotDataFiles.isEmpty()) {
delegateInputSource = new EmptyInputSource();