diff --git a/docs/api-reference/supervisor-api.md b/docs/api-reference/supervisor-api.md new file mode 100644 index 000000000000..b315971ec207 --- /dev/null +++ b/docs/api-reference/supervisor-api.md @@ -0,0 +1,3359 @@ +--- +id: supervisor-api +title: Supervisor API +sidebar_label: Supervisors +--- +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + + + + +This topic describes the API endpoints to manage and monitor supervisors for Apache Druid. + +In this topic, `http://ROUTER_IP:ROUTER_PORT` is a placeholder for your Router service address and port. Replace it with the information for your deployment. For example, use `http://localhost:8888` for quickstart deployments. + +## Supervisor information + +The following table lists the properties of a supervisor object: + +|Property|Type|Description| +|---|---|---| +|`id`|String|Unique identifier.| +|`state`|String|Generic state of the supervisor. Available states:`UNHEALTHY_SUPERVISOR`, `UNHEALTHY_TASKS`, `PENDING`, `RUNNING`, `SUSPENDED`, `STOPPING`. See [Apache Kafka operations](../development/extensions-core/kafka-supervisor-operations.md) for details.| +|`detailedState`|String|Detailed state of the supervisor. This property contains a more descriptive, implementation-specific state that may provide more insight into the supervisor's activities than the `state` property. See [Apache Kafka ingestion](../development/extensions-core/kafka-ingestion.md) and [Amazon Kinesis ingestion](../development/extensions-core/kinesis-ingestion.md) for supervisor-specific states.| +|`healthy`|Boolean|Supervisor health indicator.| +|`spec`|Object|Container object for the supervisor configuration.| +|`suspended`|Boolean|Indicates whether the supervisor is in a suspended state.| + +### Get an array of active supervisor IDs + +Returns an array of strings representing the names of active supervisors. If there are no active supervisors, it returns an empty array. + +#### URL + +GET /druid/indexer/v1/supervisor + +#### Responses + + + + + + +*Successfully retrieved array of active supervisor IDs* + + + + +--- + +#### Sample request + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor" +``` + + + + + +```HTTP +GET /druid/indexer/v1/supervisor HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json + [ + "wikipedia_stream", + "social_media" + ] + ``` +
+ +### Get an array of active supervisor objects + +Retrieves an array of active supervisor objects. If there are no active supervisors, it returns an empty array. For reference on the supervisor object properties, see the preceding [table](#supervisor-information). + +#### URL + +GET /druid/indexer/v1/supervisor?full + +#### Responses + + + + + + +*Successfully retrieved supervisor objects* + + + + +--- + +#### Sample request + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor?full=null" +``` + + + + + +```HTTP +GET /druid/indexer/v1/supervisor?full=null HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json + [ + { + "id": "wikipedia_stream", + "state": "RUNNING", + "detailedState": "CONNECTING_TO_STREAM", + "healthy": true, + "spec": { + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "wikipedia_stream", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9042" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + } + }, + "dataSchema": { + "dataSource": "wikipedia_stream", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9042" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "context": null, + "suspended": false + }, + "suspended": false + }, + { + "id": "social_media", + "state": "RUNNING", + "detailedState": "RUNNING", + "healthy": true, + "spec": { + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + } + }, + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "context": null, + "suspended": false + }, + "suspended": false + } + ] + ``` +
+ +### Get an array of supervisor states + +Retrieves an array of objects representing active supervisors and their current state. If there are no active supervisors, it returns an empty array. For reference on the supervisor object properties, see the preceding [table](#supervisor-information). + +#### URL + +GET /druid/indexer/v1/supervisor?state=true + +#### Responses + + + + + + +*Successfully retrieved supervisor state objects* + + + + +--- + +#### Sample request + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor?state=true" +``` + + + + + +```HTTP +GET /druid/indexer/v1/supervisor?state=true HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json + [ + { + "id": "wikipedia_stream", + "state": "UNHEALTHY_SUPERVISOR", + "detailedState": "UNABLE_TO_CONNECT_TO_STREAM", + "healthy": false, + "suspended": false + }, + { + "id": "social_media", + "state": "RUNNING", + "detailedState": "RUNNING", + "healthy": true, + "suspended": false + } + ] + ``` + +
+ +### Get supervisor specification + +Retrieves the specification for a single supervisor. The returned specification includes the `dataSchema`, `ioConfig`, and `tuningConfig` objects. + +#### URL + +GET /druid/indexer/v1/supervisor/:supervisorId + +#### Responses + + + + + + +*Successfully retrieved supervisor spec* + + + + + +*Invalid supervisor ID* + + + + +--- + +#### Sample request + +The following example shows how to retrieve the specification of a supervisor with the name `wikipedia_stream`. + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/wikipedia_stream" +``` + + + + + +```HTTP +GET /druid/indexer/v1/supervisor/wikipedia_stream HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + } + }, + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "context": null, + "suspended": false +} + ``` +
+ +### Get supervisor status + +Retrieves the current status report for a single supervisor. The report contains the state of the supervisor tasks and an array of recently thrown exceptions. + +For additional information about the status report, see the topic for each streaming ingestion methods: +* [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#get-supervisor-status-report) +* [Apache Kafka](../development/extensions-core/kafka-supervisor-operations.md#getting-supervisor-status-report) + +#### URL + +GET /druid/indexer/v1/supervisor/:supervisorId/status + +#### Responses + + + + + + +*Successfully retrieved supervisor status* + + + + + +*Invalid supervisor ID* + + + + +--- + +#### Sample request + +The following example shows how to retrieve the status of a supervisor with the name `social_media`. + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/status" +``` + + + + + +```HTTP +GET /druid/indexer/v1/supervisor/social_media/status HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json + { + "id": "social_media", + "generationTime": "2023-07-05T23:24:43.934Z", + "payload": { + "dataSource": "social_media", + "stream": "social_media", + "partitions": 1, + "replicas": 1, + "durationSeconds": 3600, + "activeTasks": [ + { + "id": "index_kafka_social_media_ab72ae4127c591c_flcbhdlh", + "startingOffsets": { + "0": 3176381 + }, + "startTime": "2023-07-05T23:21:39.321Z", + "remainingSeconds": 3415, + "type": "ACTIVE", + "currentOffsets": { + "0": 3296632 + }, + "lag": { + "0": 3 + } + } + ], + "publishingTasks": [], + "latestOffsets": { + "0": 3296635 + }, + "minimumLag": { + "0": 3 + }, + "aggregateLag": 3, + "offsetsLastUpdated": "2023-07-05T23:24:30.212Z", + "suspended": false, + "healthy": true, + "state": "RUNNING", + "detailedState": "RUNNING", + "recentErrors": [] + } + } + ``` +
+ +## Audit history + +An audit history provides a comprehensive log of events, including supervisor configuration, creation, suspension, and modification history. + +### Get audit history for all supervisors + +Retrieve an audit history of specs for all supervisors. + +#### URL + +GET /druid/indexer/v1/supervisor/history + +#### Responses + + + + + + +*Successfully retrieved audit history* + + + + +--- + +#### Sample request + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/history" +``` + + + + + +```HTTP +GET /druid/indexer/v1/supervisor/history HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "social_media": [ + { + "spec": { + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + } + }, + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "context": null, + "suspended": false + }, + "version": "2023-07-03T18:51:02.970Z" + } + ] +} + ``` +
+ +### Get audit history for a specific supervisor + +Retrieves an audit history of specs for a single supervisor. + +#### URL + +GET /druid/indexer/v1/supervisor/:supervisorId/history + + +#### Responses + + + + + + +*Successfully retrieved supervisor audit history* + + + + + +*Invalid supervisor ID* + + + + +--- + +#### Sample request + +The following example shows how to retrieve the audit history of a supervisor with the name `wikipedia_stream`. + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/wikipedia_stream/history" +``` + + + + + +```HTTP +GET /druid/indexer/v1/supervisor/wikipedia_stream/history HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +[ + { + "spec": { + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "wikipedia_stream", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9042" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + } + }, + "dataSchema": { + "dataSource": "wikipedia_stream", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9042" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "context": null, + "suspended": false + }, + "version": "2023-07-05T20:59:16.872Z" + } +] + ``` +
+ +## Manage supervisors + +### Create or update a supervisor + +Creates a new supervisor or updates an existing one for the same datasource with a new schema and configuration. + +You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. + +When you call this endpoint on an existing supervisor for the same datasource, the running supervisor signals its tasks to stop reading and begin publishing, exiting itself. Druid then uses the provided configuration from the request body to create a new supervisor. Druid submits a new schema while retaining existing publishing tasks and starts new tasks at the previous task offsets. + +#### URL + +POST /druid/indexer/v1/supervisor + +#### Responses + + + + + + +*Successfully created a new supervisor or updated an existing supervisor* + + + + + +*Request body content type is not in JSON format* + + + + +--- + +#### Sample request + +The following example uses JSON input format to create a supervisor spec for Kafka with a `social_media` datasource and `social_media` topic. + + + + + + +```shell +curl "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor" \ +--header 'Content-Type: application/json' \ +--data '{ + "type": "kafka", + "spec": { + "ioConfig": { + "type": "kafka", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "topic": "social_media", + "inputFormat": { + "type": "json" + }, + "useEarliestOffset": true + }, + "tuningConfig": { + "type": "kafka" + }, + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso" + }, + "dimensionsSpec": { + "dimensions": [ + "username", + "post_title", + { + "type": "long", + "name": "views" + }, + { + "type": "long", + "name": "upvotes" + }, + { + "type": "long", + "name": "comments" + }, + "edited" + ] + }, + "granularitySpec": { + "queryGranularity": "none", + "rollup": false, + "segmentGranularity": "hour" + } + } + } +}' +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +Content-Type: application/json +Content-Length: 1359 + +{ + "type": "kafka", + "spec": { + "ioConfig": { + "type": "kafka", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "topic": "social_media", + "inputFormat": { + "type": "json" + }, + "useEarliestOffset": true + }, + "tuningConfig": { + "type": "kafka" + }, + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso" + }, + "dimensionsSpec": { + "dimensions": [ + "username", + "post_title", + { + "type": "long", + "name": "views" + }, + { + "type": "long", + "name": "upvotes" + }, + { + "type": "long", + "name": "comments" + }, + "edited" + ] + }, + "granularitySpec": { + "queryGranularity": "none", + "rollup": false, + "segmentGranularity": "hour" + } + } + } +} +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "id": "social_media" +} + ``` +
+ +### Suspend a running supervisor + +Suspends a single running supervisor. Returns the updated supervisor spec, where the `suspended` property is set to `true`. The suspended supervisor continues to emit logs and metrics. + +#### URL +POST /druid/indexer/v1/supervisor/:supervisorId/suspend + +#### Responses + + + + + + +*Successfully shut down supervisor* + + + + + +*Supervisor already suspended* + + + + + +*Invalid supervisor ID* + + + + +--- + +#### Sample request + +The following example shows how to suspend a running supervisor with the name `social_media`. + + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/suspend" +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor/social_media/suspend HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + } + }, + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "context": null, + "suspended": true +} + ``` +
+ +### Suspend all supervisors + +Suspends all supervisors. Note that this endpoint returns an HTTP `200 Success` code message even if there are no supervisors or running supervisors to suspend. + +#### URL +POST /druid/indexer/v1/supervisor/suspendAll + +#### Responses + + + + + + +*Successfully suspended all supervisors* + + + + +--- + +#### Sample request + + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/suspendAll" +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor/suspendAll HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "status": "success" +} + ``` +
+ +### Resume a supervisor + +Resumes indexing tasks for a supervisor. Returns an updated supervisor spec with the `suspended` property set to `false`. + +#### URL + +POST /druid/indexer/v1/supervisor/:supervisorId/resume + +#### Responses + + + + + + +*Successfully resumed supervisor* + + + + + +*Supervisor already running* + + + + + +*Invalid supervisor ID* + + + + +--- + +#### Sample request + +The following example resumes a previously suspended supervisor with name `social_media`. + + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resume" +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor/social_media/resume HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + } + }, + "dataSchema": { + "dataSource": "social_media", + "timestampSpec": { + "column": "__time", + "format": "iso", + "missingValue": null + }, + "dimensionsSpec": { + "dimensions": [ + { + "type": "string", + "name": "username", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "string", + "name": "post_title", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + }, + { + "type": "long", + "name": "views", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "upvotes", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "long", + "name": "comments", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": false + }, + { + "type": "string", + "name": "edited", + "multiValueHandling": "SORTED_ARRAY", + "createBitmapIndex": true + } + ], + "dimensionExclusions": [ + "__time" + ], + "includeAllDimensions": false, + "useSchemaDiscovery": false + }, + "metricsSpec": [], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": { + "type": "none" + }, + "rollup": false, + "intervals": [] + }, + "transformSpec": { + "filter": null, + "transforms": [] + } + }, + "tuningConfig": { + "type": "kafka", + "appendableIndexSpec": { + "type": "onheap", + "preserveExistingMetrics": false + }, + "maxRowsInMemory": 150000, + "maxBytesInMemory": 0, + "skipBytesInMemoryOverheadCheck": false, + "maxRowsPerSegment": 5000000, + "maxTotalRows": null, + "intermediatePersistPeriod": "PT10M", + "maxPendingPersists": 0, + "indexSpec": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "roaring" + }, + "dimensionCompression": "lz4", + "stringDictionaryEncoding": { + "type": "utf8" + }, + "metricCompression": "lz4", + "longEncoding": "longs" + }, + "reportParseExceptions": false, + "handoffConditionTimeout": 0, + "resetOffsetAutomatically": false, + "segmentWriteOutMediumFactory": null, + "workerThreads": null, + "chatRetries": 8, + "httpTimeout": "PT10S", + "shutdownTimeout": "PT80S", + "offsetFetchPeriod": "PT30S", + "intermediateHandoffPeriod": "P2147483647D", + "logParseExceptions": false, + "maxParseExceptions": 2147483647, + "maxSavedParseExceptions": 0, + "skipSequenceNumberAvailabilityCheck": false, + "repartitionTransitionDuration": "PT120S" + }, + "ioConfig": { + "topic": "social_media", + "inputFormat": { + "type": "json", + "keepNullColumns": false, + "assumeNewlineDelimited": false, + "useJsonNodeReader": false + }, + "replicas": 1, + "taskCount": 1, + "taskDuration": "PT3600S", + "consumerProperties": { + "bootstrap.servers": "localhost:9094" + }, + "autoScalerConfig": null, + "pollTimeout": 100, + "startDelay": "PT5S", + "period": "PT30S", + "useEarliestOffset": true, + "completionTimeout": "PT1800S", + "lateMessageRejectionPeriod": null, + "earlyMessageRejectionPeriod": null, + "lateMessageRejectionStartDateTime": null, + "configOverrides": null, + "idleConfig": null, + "stream": "social_media", + "useEarliestSequenceNumber": true + }, + "context": null, + "suspended": false +} + ``` +
+ +### Resume all supervisors + +Resumes all supervisors. Note that this endpoint returns an HTTP `200 Success` code even if there are no supervisors or suspended supervisors to resume. + +#### URL + +POST /druid/indexer/v1/supervisor/resumeAll + +#### Responses + + + + + + +*Successfully resumed all supervisors* + + + + +--- + +#### Sample request + + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/resumeAll" +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor/resumeAll HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "status": "success" +} + ``` +
+ +### Reset a supervisor + +Resets the specified supervisor. This endpoint clears _all_ stored offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. The supervisor will start from the earliest or latest available position, depending on the platform (offsets in Kafka or sequence numbers in Kinesis). It kills and recreates active tasks to read from valid positions. + +Use this endpoint to recover from a stopped state due to missing offsets in Kafka or sequence numbers in Kinesis. Use this endpoint with caution as it may result in skipped messages and lead to data loss or duplicate data. + +#### URL + +POST /druid/indexer/v1/supervisor/:supervisorId/reset + +#### Responses + + + + + + +*Successfully reset supervisor* + + + + + +*Invalid supervisor ID* + + + + +--- + +#### Sample request + +The following example shows how to reset a supervisor with the name `social_media`. + + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/reset" +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor/social_media/reset HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "id": "social_media" +} + ``` +
+ +### Reset Offsets for a supervisor + +Resets the specified offsets for a supervisor. This endpoint clears _only_ the specified offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. +If there are no stored offsets, the specified offsets will be set in the metadata store. The supervisor will start from the reset offsets for the partitions specified and for the other partitions from the stored offset. +It kills and recreates active tasks pertaining to the partitions specified to read from valid offsets. + +Use this endpoint to selectively reset offsets for partitions without resetting the entire set. + +#### URL + +POST /druid/indexer/v1/supervisor/:supervisorId/resetOffsets + +#### Responses + + + + + +*Successfully reset offsets* + + + +*Invalid supervisor ID* + + + +--- +#### Reset Offsets Metadata + +This section presents the structure and details of the reset offsets metadata payload. + +| Field | Type | Description | Required | +|---------|---------|---------|---------| +| `type` | String | The type of reset offsets metadata payload. It must match the supervisor's `type`. Possible values: `kafka` or `kinesis`. | Yes | +| `partitions` | Object | An object representing the reset metadata. See below for details. | Yes | + +#### Partitions + +The following table defines the fields within the `partitions` object in the reset offsets metadata payload. + +| Field | Type | Description | Required | +|---------|---------|---------|---------| +| `type` | String | Must be set as `end`. Indicates the end sequence numbers for the reset offsets. | Yes | +| `stream` | String | The stream to be reset. It must be a valid stream consumed by the supervisor. | Yes | +| `partitionOffsetMap` | Object | A map of partitions to corresponding offsets for the stream to be reset.| Yes | + +#### Sample request + +The following example shows how to reset offsets for a kafka supervisor with the name `social_media`. Let's say the supervisor is reading +from a kafka topic `ads_media_stream` and has the stored offsets: `{"0": 0, "1": 10, "2": 20, "3": 40}`. + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" +--header 'Content-Type: application/json' +--data-raw '{"type":"kafka","partitions":{"type":"end","stream":"ads_media_stream","partitionOffsetMap":{"0":100, "2": 650}}}' +``` + + + +```HTTP +POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +Content-Type: application/json + +{ + "type": "kafka", + "partitions": { + "type": "end", + "stream": "ads_media_stream", + "partitionOffsetMap": { + "0": 100, + "2": 650 + } + } +} +``` + +The above operation will reset offsets only for partitions 0 and 2 to 100 and 650 respectively. After a successful reset, +when the supervisor's tasks restart, they will resume reading from `{"0": 100, "1": 10, "2": 650, "3": 40}`. + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "id": "social_media" +} + ``` +
+ +### Terminate a supervisor + +Terminates a supervisor and its associated indexing tasks, triggering the publishing of their segments. When terminated, a tombstone marker is placed in the database to prevent reloading on restart. + +The terminated supervisor still exists in the metadata store and its history can be retrieved. + +#### URL + +POST /druid/indexer/v1/supervisor/:supervisorId/terminate + +#### Responses + + + + + + +*Successfully terminated a supervisor* + + + + + +*Invalid supervisor ID or supervisor not running* + + + + +--- + +#### Sample request + + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/terminate" +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor/social_media/terminate HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "id": "social_media" +} + ``` +
+ +### Terminate all supervisors + +Terminates all supervisors. Terminated supervisors still exist in the metadata store and their history can be retrieved. Note that this endpoint returns an HTTP `200 Success` code even if there are no supervisors or running supervisors to terminate. + +#### URL + +POST /druid/indexer/v1/supervisor/terminateAll + +#### Responses + + + + + + +*Successfully terminated all supervisors* + + + + +--- + +#### Sample request + + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/terminateAll" +``` + + + + + +```HTTP +POST /druid/indexer/v1/supervisor/terminateAll HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +``` + + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "status": "success" +} + ``` +
+ +### Shut down a supervisor + +Shuts down a supervisor. This endpoint is deprecated and will be removed in future releases. Use the equivalent [terminate](#terminate-a-supervisor) endpoint instead. + +#### URL + +POST /druid/indexer/v1/supervisor/:supervisorId/shutdown diff --git a/docs/development/extensions-core/kafka-supervisor-operations.md b/docs/development/extensions-core/kafka-supervisor-operations.md index fe8d1f562b66..ace9c7d677fd 100644 --- a/docs/development/extensions-core/kafka-supervisor-operations.md +++ b/docs/development/extensions-core/kafka-supervisor-operations.md @@ -131,6 +131,70 @@ to start and in flight tasks will fail. This operation enables you to recover fr Note that the supervisor must be running for this endpoint to be available. +## Resetting Offsets for a Supervisor + +The supervisor must be running for this endpoint to be available. + +The `POST /druid/indexer/v1/supervisor//resetOffsets` operation clears stored +offsets, causing the supervisor to start reading from the specified offsets. After resetting stored +offsets, the supervisor kills and recreates any active tasks pertaining to the specified partitions, +so that tasks begin reading from specified offsets. For partitions that are not specified in this operation, the supervisor +will resume from the last stored offset. + +Use care when using this operation! Resetting offsets for a supervisor may cause Kafka messages to be skipped or read +twice, resulting in missing or duplicate data. + +#### Sample request + +The following example shows how to reset offsets for a kafka supervisor with the name `social_media`. Let's say the supervisor is reading +from two kafka topics `ads_media_foo` and `ads_media_bar` and has the stored offsets: `{"ads_media_foo:0": 0, "ads_media_foo:1": 10, "ads_media_bar:0": 20, "ads_media_bar:1": 40}`. + + + + + +```shell +curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" +--header 'Content-Type: application/json' +--data-raw '{"type":"kafka","partitions":{"type":"end","stream":"ads_media_foo|ads_media_bar","partitionOffsetMap":{"ads_media_foo:0": 3, "ads_media_bar:1": 12}}}' +``` + + + +```HTTP +POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1 +Host: http://ROUTER_IP:ROUTER_PORT +Content-Type: application/json + +{ + "type": "kafka", + "partitions": { + "type": "end", + "stream": "ads_media_foo|ads_media_bar", + "partitionOffsetMap": { + "ads_media_foo:0": 3, + "ads_media_bar:1": 12 + } + } +} +``` +The above operation will reset offsets for `ads_media_foo` partition 0 and `ads_media_bar` partition 1 to offsets 3 and 12 respectively. After a successful reset, +when the supervisor's tasks restart, they will resume reading from `{"ads_media_foo:0": 3, "ads_media_foo:1": 10, "ads_media_bar:0": 20, "ads_media_bar:1": 12}`. + + + +#### Sample response + +
+ Click to show sample response + + ```json +{ + "id": "social_media" +} + ``` +
+ ## Terminating Supervisors The `POST /druid/indexer/v1/supervisor//terminate` operation terminates a supervisor and causes all diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 31682b54a291..74e18e39b76e 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -38,6 +38,13 @@ To use the Kinesis indexing service, load the `druid-kinesis-indexing-service` c To use the Kinesis indexing service, load the `druid-kinesis-indexing-service` extension on both the Overlord and the MiddleManagers. Druid starts a supervisor for a dataSource when you submit a supervisor spec. Submit your supervisor spec to the following endpoint: +|Property|Type|Description|Required| +|--------|----|-----------|--------| +|`type`|String|The supervisor type; this should always be `kinesis`.|Yes| +|`spec`|Object|The container object for the supervisor configuration.|Yes| +|`ioConfig`|Object|The [I/O configuration](#supervisor-io-configuration) object for configuring Kinesis connection and I/O-related settings for the supervisor and indexing task.|Yes| +|`dataSchema`|Object|The schema used by the Kinesis indexing task during ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema) for more information.|Yes| +|`tuningConfig`|Object|The [tuning configuration](#supervisor-tuning-configuration) object for configuring performance-related settings for the supervisor and indexing tasks.|No| `http://:/druid/indexer/v1/supervisor` @@ -502,7 +509,17 @@ no longer available in Kinesis (typically because the message retention period h removed and re-created) the supervisor will refuse to start and in-flight tasks will fail. This operation enables you to recover from this condition. -Note that the supervisor must be running for this endpoint to be available. +### Resetting Offsets for a supervisor + +To reset partition offsets for a supervisor, send a `POST` request to the `/druid/indexer/v1/supervisor/:supervisorId/resetOffsets` endpoint. This endpoint clears stored +sequence numbers, prompting the supervisor to start reading from the specified offsets. +After resetting stored offsets, the supervisor kills and recreates any active tasks pertaining to the specified partitions, +so that tasks begin reading specified offsets. For partitions that are not specified in this operation, the supervisor will resume from the last +stored offset. + +Use this endpoint with caution as it may result in skipped messages, leading to data loss or duplicate data. + +### Terminate a supervisor ### Terminating Supervisors diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index fd9303c17bc4..ac2738534da9 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -277,6 +277,12 @@ public void reset(DataSourceMetadata dataSourceMetadata) } } + @Override + public void resetOffsets(DataSourceMetadata resetDataSourceMetadata) + { + throw new UnsupportedOperationException("Reset offsets not supported in MaterializedViewSupervisor"); + } + @Override public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) { diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index d822948881b3..e49b160c1e99 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -387,4 +387,37 @@ public void testSuspendedDoesntRun() EasyMock.replay(mock); supervisor.run(); } + + @Test + public void testResetOffsetsNotSupported() + { + MaterializedViewSupervisorSpec suspended = new MaterializedViewSupervisorSpec( + "base", + new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim"))), + new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")}, + HadoopTuningConfig.makeDefaultTuningConfig(), + null, + null, + null, + null, + null, + true, + objectMapper, + taskMaster, + taskStorage, + metadataSupervisorManager, + sqlSegmentsMetadataManager, + indexerMetadataStorageCoordinator, + new MaterializedViewTaskConfig(), + EasyMock.createMock(AuthorizerMapper.class), + EasyMock.createMock(ChatHandlerProvider.class), + new SupervisorStateManagerConfig() + ); + MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) suspended.createSupervisor(); + Assert.assertThrows( + "Reset offsets not supported in MaterializedViewSupervisor", + UnsupportedOperationException.class, + () -> supervisor.resetOffsets(null) + ); + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java index 19336184ac08..54e3e10c43dd 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaDataSourceMetadataTest.java @@ -19,10 +19,20 @@ package org.apache.druid.indexing.kafka; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.inject.Injector; +import com.google.inject.name.Names; +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.initialization.CoreInjectorBuilder; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.utils.CollectionUtils; import org.junit.Assert; import org.junit.Test; @@ -161,6 +171,37 @@ public void testMinus() ); } + @Test + public void testKafkaDataSourceMetadataSerdeRoundTrip() throws JsonProcessingException + { + ObjectMapper jsonMapper = createObjectMapper(); + + KafkaDataSourceMetadata kdm1 = endMetadata(ImmutableMap.of()); + String kdmStr1 = jsonMapper.writeValueAsString(kdm1); + DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class); + Assert.assertEquals(kdm1, dsMeta1); + + KafkaDataSourceMetadata kdm2 = endMetadata(ImmutableMap.of(1, 3L)); + String kdmStr2 = jsonMapper.writeValueAsString(kdm2); + DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class); + Assert.assertEquals(kdm2, dsMeta2); + } + + @Test + public void testKafkaDataSourceMetadataSerde() throws JsonProcessingException + { + ObjectMapper jsonMapper = createObjectMapper(); + KafkaDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of(1, 3L)); + String kdmStr1 = "{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":3},\"partitionOffsetMap\":{\"1\":3},\"exclusivePartitions\":[]}}\n"; + DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class); + Assert.assertEquals(dsMeta1, expectedKdm1); + + KafkaDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of(1, 3L, 2, 1900L)); + String kdmStr2 = "{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":3, \"2\":1900},\"partitionOffsetMap\":{\"1\":3, \"2\":1900},\"exclusivePartitions\":[]}}\n"; + DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class); + Assert.assertEquals(dsMeta2, expectedKdm2); + } + private static KafkaDataSourceMetadata startMetadata(Map offsets) { return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>("foo", offsets, ImmutableSet.of())); @@ -170,4 +211,21 @@ private static KafkaDataSourceMetadata endMetadata(Map offsets) { return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", offsets)); } + + private static ObjectMapper createObjectMapper() + { + DruidModule module = new KafkaIndexTaskModule(); + final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build()) + .addModule( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000); + } + ).build(); + + ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class); + module.getJacksonModules().forEach(objectMapper::registerModule); + return objectMapper; + } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java index fbb3d6405ff6..87981531700e 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisDataSourceMetadataTest.java @@ -20,10 +20,18 @@ package org.apache.druid.indexing.kinesis; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.inject.Injector; +import com.google.inject.name.Names; +import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.initialization.CoreInjectorBuilder; +import org.apache.druid.initialization.DruidModule; import org.junit.Assert; import org.junit.Test; @@ -217,6 +225,37 @@ public void testMinus() ); } + @Test + public void testKinesisDataSourceMetadataSerdeRoundTrip() throws JsonProcessingException + { + ObjectMapper jsonMapper = createObjectMapper(); + + KinesisDataSourceMetadata kdm1 = startMetadata(ImmutableMap.of(), ImmutableSet.of()); + String kdmStr1 = jsonMapper.writeValueAsString(kdm1); + DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class); + Assert.assertEquals(kdm1, dsMeta1); + + KinesisDataSourceMetadata kdm2 = startMetadata(ImmutableMap.of("1", "3"), ImmutableSet.of()); + String kdmStr2 = jsonMapper.writeValueAsString(kdm2); + DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class); + Assert.assertEquals(kdm2, dsMeta2); + } + + @Test + public void testKinesisDataSourceMetadataSerde() throws JsonProcessingException + { + ObjectMapper jsonMapper = createObjectMapper(); + KinesisDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of("1", "5")); + String kdmStr1 = "{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":5},\"partitionOffsetMap\":{\"1\":5},\"exclusivePartitions\":[]}}\n"; + DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class); + Assert.assertEquals(dsMeta1, expectedKdm1); + + KinesisDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of("1", "10", "2", "19")); + String kdmStr2 = "{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":10, \"2\":19},\"partitionOffsetMap\":{\"1\":10, \"2\":19},\"exclusivePartitions\":[]}}\n"; + DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class); + Assert.assertEquals(dsMeta2, expectedKdm2); + } + private static KinesisDataSourceMetadata simpleStartMetadata(Map sequences) { return startMetadata(sequences, sequences.keySet()); @@ -233,4 +272,20 @@ private static KinesisDataSourceMetadata endMetadata(Map sequenc { return new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", sequences)); } + + private static ObjectMapper createObjectMapper() + { + DruidModule module = new KinesisIndexingServiceModule(); + final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build()) + .addModule( + binder -> { + binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test"); + binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000); + binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000); + } + ).build(); + ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class); + module.getJacksonModules().forEach(objectMapper::registerModule); + return objectMapper; + } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 50fbe2f721db..a0328cf94287 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -2634,7 +2634,6 @@ public void testResetNoTasks() supervisor.resetInternal(null); verifyAll(); - } @Test diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 1258a1a6c582..05905159bfe4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -201,7 +201,7 @@ public Optional isSupervisorHealthy(String id) return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.isHealthy()); } - public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourceMetadata) + public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetDataSourceMetadata) { Preconditions.checkState(started, "SupervisorManager not started"); Preconditions.checkNotNull(id, "id"); @@ -212,7 +212,11 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourc return false; } - supervisor.lhs.reset(dataSourceMetadata); + if (resetDataSourceMetadata == null) { + supervisor.lhs.reset(null); + } else { + supervisor.lhs.resetOffsets(resetDataSourceMetadata); + } SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); if (autoscaler != null) { autoscaler.reset(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java index a444d06bebc5..51c201df0521 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -30,6 +30,7 @@ import com.google.common.collect.Sets; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; +import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.http.security.SupervisorResourceFilter; import org.apache.druid.java.util.common.StringUtils; @@ -40,6 +41,7 @@ import org.apache.druid.server.security.ForbiddenException; import org.apache.druid.server.security.ResourceAction; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.GET; @@ -455,10 +457,31 @@ public Response specGetHistory( @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(SupervisorResourceFilter.class) public Response reset(@PathParam("id") final String id) + { + return handleResetRequest(id, null); + } + + @POST + @Path("/{id}/resetOffsets") + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.APPLICATION_JSON) + @ResourceFilters(SupervisorResourceFilter.class) + public Response resetOffsets( + @PathParam("id") final String id, + final DataSourceMetadata resetDataSourceMetadata + ) + { + return handleResetRequest(id, resetDataSourceMetadata); + } + + private Response handleResetRequest( + final String id, + @Nullable final DataSourceMetadata resetDataSourceMetadata + ) { return asLeaderWithSupervisorManager( manager -> { - if (manager.resetSupervisor(id, null)) { + if (manager.resetSupervisor(id, resetDataSourceMetadata)) { return Response.ok(ImmutableMap.of("id", id)).build(); } else { return Response.status(Response.Status.NOT_FOUND) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 53769f84985a..5ad715c86fe4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -42,6 +42,8 @@ import org.apache.commons.codec.digest.DigestUtils; import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatus; @@ -62,6 +64,7 @@ import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; @@ -95,6 +98,7 @@ import org.apache.druid.segment.indexing.DataSchema; import org.joda.time.DateTime; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.validation.constraints.NotNull; import java.io.IOException; @@ -601,6 +605,31 @@ public String getType() } } + private class ResetOffsetsNotice implements Notice + { + final DataSourceMetadata dataSourceMetadata; + private static final String TYPE = "reset_offsets_notice"; + + ResetOffsetsNotice( + final DataSourceMetadata dataSourceMetadata + ) + { + this.dataSourceMetadata = dataSourceMetadata; + } + + @Override + public void handle() + { + resetOffsetsInternal(dataSourceMetadata); + } + + @Override + public String getType() + { + return TYPE; + } + } + protected class CheckpointNotice implements Notice { private final int taskGroupId; @@ -991,12 +1020,59 @@ public void stop(boolean stopGracefully) } @Override - public void reset(DataSourceMetadata dataSourceMetadata) + public void reset(@Nullable final DataSourceMetadata dataSourceMetadata) { - log.info("Posting ResetNotice"); + log.info("Posting ResetNotice with datasource metadata [%s]", dataSourceMetadata); addNotice(new ResetNotice(dataSourceMetadata)); } + /** + * Reset offsets with provided dataSource metadata. Validates {@code resetDataSourceMetadata}, + * creates a {@code ResetOffsetsNotice} with the metadata and adds it to the notice queue. The resulting stored offsets + * is a union of existing checkpointed offsets and provided offsets. + * @param resetDataSourceMetadata required datasource metadata with offsets to reset. + * @throws DruidException if any metadata attribute doesn't match the supervisor's. + */ + @Override + public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata) + { + if (resetDataSourceMetadata == null) { + throw InvalidInput.exception("Reset dataSourceMetadata is required for resetOffsets."); + } + + if (!checkSourceMetadataMatch(resetDataSourceMetadata)) { + throw InvalidInput.exception( + "Datasource metadata instance does not match required, found instance of [%s].", + resetDataSourceMetadata.getClass() + ); + } + @SuppressWarnings("unchecked") + final SeekableStreamDataSourceMetadata resetMetadata = + (SeekableStreamDataSourceMetadata) resetDataSourceMetadata; + + final SeekableStreamSequenceNumbers streamSequenceNumbers = resetMetadata.getSeekableStreamSequenceNumbers(); + if (!(streamSequenceNumbers instanceof SeekableStreamEndSequenceNumbers)) { + throw InvalidInput.exception( + "Provided datasourceMetadata[%s] is invalid. Sequence numbers can only be of type[%s], but found[%s].", + resetMetadata, + SeekableStreamEndSequenceNumbers.class.getSimpleName(), + streamSequenceNumbers.getClass().getSimpleName() + ); + } + + final String resetStream = streamSequenceNumbers.getStream(); + if (!ioConfig.getStream().equals(resetStream)) { + throw InvalidInput.exception( + "Stream[%s] doesn't exist in the supervisor[%s]. Supervisor is consuming stream[%s].", + resetStream, + supervisorId, + ioConfig.getStream() + ); + } + log.info("Posting ResetOffsetsNotice with reset dataSource metadata[%s]", resetDataSourceMetadata); + addNotice(new ResetOffsetsNotice(resetDataSourceMetadata)); + } + public ReentrantLock getRecordSupplierLock() { return recordSupplierLock; @@ -1689,6 +1765,70 @@ public void resetInternal(DataSourceMetadata dataSourceMetadata) } } + /** + * Reset offsets with the data source metadata. If checkpoints exist, the resulting stored offsets will be a union of + * existing checkpointed offsets and provided offsets; any checkpointed offsets not specified in the metadata will be + * preserved as-is. If checkpoints don't exist, the provided reset datasource metdadata will be inserted into + * the metadata storage. Once the offsets are reset, any active tasks serving the partition offsets will be restarted. + * @param dataSourceMetadata Required reset data source metdata. Assumed that the metadata is validated. + */ + public void resetOffsetsInternal(@Nonnull final DataSourceMetadata dataSourceMetadata) + { + log.info("Reset offsets for dataSource[%s] with metadata[%s]", dataSource, dataSourceMetadata); + + @SuppressWarnings("unchecked") + final SeekableStreamDataSourceMetadata resetMetadata = + (SeekableStreamDataSourceMetadata) dataSourceMetadata; + + final boolean metadataUpdateSuccess; + final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(dataSource); + if (metadata == null) { + log.info("Checkpointed metadata in null for dataSource[%s] - inserting metadata[%s]", dataSource, resetMetadata); + metadataUpdateSuccess = indexerMetadataStorageCoordinator.insertDataSourceMetadata(dataSource, resetMetadata); + } else { + if (!checkSourceMetadataMatch(metadata)) { + throw InvalidInput.exception( + "Datasource metadata instance does not match required, found instance of [%s]", + metadata.getClass() + ); + } + @SuppressWarnings("unchecked") + final SeekableStreamDataSourceMetadata currentMetadata = + (SeekableStreamDataSourceMetadata) metadata; + final DataSourceMetadata newMetadata = currentMetadata.plus(resetMetadata); + log.info("Current checkpointed metadata[%s], new metadata[%s] for dataSource[%s]", currentMetadata, newMetadata, dataSource); + try { + metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, newMetadata); + } + catch (IOException e) { + log.error("Reset offsets for dataSource[%s] with metadata[%s] failed [%s]", dataSource, newMetadata, e.getMessage()); + throw new RuntimeException(e); + } + } + + if (!metadataUpdateSuccess) { + throw new ISE("Unable to reset metadata[%s] for datasource[%s]", dataSource, dataSourceMetadata); + } + + resetMetadata.getSeekableStreamSequenceNumbers() + .getPartitionSequenceNumberMap() + .keySet() + .forEach(partition -> { + final int groupId = getTaskGroupIdForPartition(partition); + killTaskGroupForPartitions( + ImmutableSet.of(partition), + "DataSourceMetadata is updated while reset offsets is called" + ); + activelyReadingTaskGroups.remove(groupId); + // killTaskGroupForPartitions() cleans up partitionGroups. + // Add the removed groups back. + partitionGroups.computeIfAbsent(groupId, k -> new HashSet<>()); + partitionOffsets.put(partition, getNotSetMarker()); + }); + + } + + private void killTask(final String id, String reasonFormat, Object... args) { Optional taskQueue = taskMaster.getTaskQueue(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 498b6cfa4c18..d10f07292560 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -22,7 +22,10 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.metadata.MetadataSupervisorManager; import org.easymock.Capture; @@ -297,6 +300,33 @@ public void testResetSupervisor() verifyAll(); } + @Test + public void testResetSupervisorWithSpecificOffsets() + { + Map existingSpecs = ImmutableMap.of( + "id1", new TestSupervisorSpec("id1", supervisor1) + ); + + DataSourceMetadata datasourceMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of("0", "10", "1", "20", "2", "30"), + ImmutableSet.of() + ) + ); + + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); + supervisor1.start(); + supervisor1.resetOffsets(datasourceMetadata); + replayAll(); + + manager.start(); + Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1", datasourceMetadata)); + Assert.assertFalse("resetInvalidSupervisor", manager.resetSupervisor("nobody_home", datasourceMetadata)); + + verifyAll(); + } + @Test public void testCreateSuspendResumeAndStopSupervisor() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index e15c43cc8180..f8b6a6f7ea19 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -27,6 +27,8 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.security.Access; @@ -1108,6 +1110,53 @@ public void testReset() verifyAll(); } + @Test + public void testResetOffsets() + { + Capture id1 = Capture.newInstance(); + Capture id2 = Capture.newInstance(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2); + EasyMock.expect(supervisorManager.resetSupervisor( + EasyMock.capture(id1), + EasyMock.anyObject(DataSourceMetadata.class) + )).andReturn(true); + EasyMock.expect(supervisorManager.resetSupervisor( + EasyMock.capture(id2), + EasyMock.anyObject(DataSourceMetadata.class) + )).andReturn(false); + replayAll(); + + DataSourceMetadata datasourceMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + "topic", + ImmutableMap.of("0", "10", "1", "20", "2", "30"), + ImmutableSet.of() + ) + ); + + Response response = supervisorResource.resetOffsets("my-id", datasourceMetadata); + + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("id", "my-id"), response.getEntity()); + + response = supervisorResource.resetOffsets("my-id-2", datasourceMetadata); + + Assert.assertEquals(404, response.getStatus()); + Assert.assertEquals("my-id", id1.getValue()); + Assert.assertEquals("my-id-2", id2.getValue()); + verifyAll(); + + resetAll(); + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()); + replayAll(); + + response = supervisorResource.terminate("my-id"); + + Assert.assertEquals(503, response.getStatus()); + verifyAll(); + } + @Test public void testNoopSupervisorSpecSerde() throws Exception { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java new file mode 100644 index 000000000000..7d87427e45c7 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/TestSeekableStreamDataSourceMetadata.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.overlord.DataSourceMetadata; + +public class TestSeekableStreamDataSourceMetadata extends SeekableStreamDataSourceMetadata +{ + @JsonCreator + public TestSeekableStreamDataSourceMetadata( + @JsonProperty("partitions") SeekableStreamSequenceNumbers seekableStreamSequenceNumbers) + { + super(seekableStreamSequenceNumbers); + } + + @Override + protected SeekableStreamDataSourceMetadata createConcreteDataSourceMetaData( + SeekableStreamSequenceNumbers seekableStreamSequenceNumbers + ) + { + return new TestSeekableStreamDataSourceMetadata(seekableStreamSequenceNumbers); + } + + @Override + public DataSourceMetadata asStartMetadata() + { + return null; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index ab3f9c5f9f1f..55400eb835fb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -32,6 +32,9 @@ import org.apache.druid.data.input.impl.JsonInputFormat; import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; @@ -55,6 +58,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; +import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; @@ -78,6 +82,7 @@ import org.apache.druid.server.metrics.NoopServiceEmitter; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; +import org.hamcrest.MatcherAssert; import org.joda.time.DateTime; import org.joda.time.Duration; import org.joda.time.Period; @@ -87,6 +92,7 @@ import javax.annotation.Nullable; import java.io.File; +import java.io.IOException; import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; @@ -165,8 +171,7 @@ public void setupTest() taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); EasyMock.expectLastCall().times(0, 1); - EasyMock - .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes(); EasyMock.expect(recordSupplier.getAssignment()).andReturn(ImmutableSet.of(SHARD0_PARTITION)).anyTimes(); EasyMock.expect(recordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("10").anyTimes(); } @@ -816,7 +821,6 @@ public void testEmitRecordLag() throws Exception null ); - supervisor.start(); Assert.assertTrue(supervisor.stateManager.isHealthy()); @@ -1019,6 +1023,590 @@ public void testGetStats() ); } + @Test + public void testSupervisorResetAllWithCheckpoints() throws InterruptedException + { + EasyMock.expect(spec.isSuspended()).andReturn(false); + EasyMock.expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn( + true + ); + taskQueue.shutdown("task1", "DataSourceMetadata is not found while reset"); + EasyMock.expectLastCall(); + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "5"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToPendingCompletionTaskGroup( + supervisor.getTaskGroupIdForPartition("1"), + ImmutableMap.of("1", "6"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task2"), + ImmutableSet.of() + ); + + Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount()); + Assert.assertEquals(0, supervisor.getNoticesQueueSize()); + Assert.assertEquals(0, supervisor.getPartitionOffsets().size()); + + supervisor.reset(null); + validateSupervisorStateAfterResetOffsets(supervisor, ImmutableMap.of(), 0); + } + + @Test + public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws InterruptedException, IOException + { + final ImmutableMap checkpointOffsets = ImmutableMap.of("0", "0", "1", "10", "2", "20", "3", "30"); + final ImmutableMap resetOffsets = ImmutableMap.of("0", "1000", "2", "2500"); + final ImmutableMap expectedOffsets = ImmutableMap.of("0", "1000", "1", "10", "2", "2500", "3", "30"); + + EasyMock.expect(spec.isSuspended()).andReturn(false); + EasyMock.reset(indexerMetadataStorageCoordinator); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + checkpointOffsets + ) + ) + ); + EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + expectedOffsets + )) + )).andReturn( + true + ); + + taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called"); + EasyMock.expectLastCall(); + + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + checkpointOffsets, + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + resetOffsets + ) + ); + + Assert.assertEquals(1, supervisor.getActiveTaskGroupsCount()); + Assert.assertEquals(0, supervisor.getNoticesQueueSize()); + Assert.assertEquals(0, supervisor.getPartitionOffsets().size()); + + supervisor.resetOffsets(resetMetadata); + + validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0); + } + + @Test + public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException + { + final ImmutableMap checkpointOffsets = ImmutableMap.of("0", "5", "1", "6", "2", "100"); + final ImmutableMap resetOffsets = ImmutableMap.of("0", "10", "1", "8"); + final ImmutableMap expectedOffsets = ImmutableMap.of("0", "10", "1", "8", "2", "100"); + + EasyMock.expect(spec.isSuspended()).andReturn(false); + EasyMock.reset(indexerMetadataStorageCoordinator); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + checkpointOffsets + ) + ) + ); + EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + "stream", + expectedOffsets + ) + ))).andReturn(true); + taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called"); + EasyMock.expectLastCall(); + + taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset offsets is called"); + EasyMock.expectLastCall(); + + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + // Spin off two active tasks with each task serving one partition. + supervisor.getIoConfig().setTaskCount(3); + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "5"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("1"), + ImmutableMap.of("1", "6"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task2"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("2"), + ImmutableMap.of("2", "100"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task3"), + ImmutableSet.of() + ); + + final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + resetOffsets + ) + ); + + Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount()); + Assert.assertEquals(0, supervisor.getNoticesQueueSize()); + Assert.assertEquals(0, supervisor.getPartitionOffsets().size()); + + supervisor.resetOffsets(resetMetadata); + + validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1); + } + + @Test + public void testSupervisorResetOffsetsWithNoCheckpoints() throws InterruptedException + { + final ImmutableMap resetOffsets = ImmutableMap.of("0", "10", "1", "8"); + final ImmutableMap expectedOffsets = ImmutableMap.copyOf(resetOffsets); + + EasyMock.expect(spec.isSuspended()).andReturn(false); + EasyMock.reset(indexerMetadataStorageCoordinator); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null); + EasyMock.expect(indexerMetadataStorageCoordinator.insertDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + "stream", + expectedOffsets + ) + ))).andReturn(true); + taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called"); + EasyMock.expectLastCall(); + + taskQueue.shutdown("task2", "DataSourceMetadata is updated while reset offsets is called"); + EasyMock.expectLastCall(); + + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + // Spin off three active tasks with each task serving one partition. + supervisor.getIoConfig().setTaskCount(3); + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "5"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("1"), + ImmutableMap.of("1", "6"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task2"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("2"), + ImmutableMap.of("2", "100"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task3"), + ImmutableSet.of() + ); + + final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + resetOffsets + ) + ); + + Assert.assertEquals(3, supervisor.getActiveTaskGroupsCount()); + Assert.assertEquals(0, supervisor.getNoticesQueueSize()); + Assert.assertEquals(0, supervisor.getPartitionOffsets().size()); + + supervisor.resetOffsets(resetMetadata); + + validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1); + } + + + @Test + public void testSupervisorResetWithNoPartitions() throws IOException, InterruptedException + { + final ImmutableMap checkpointOffsets = ImmutableMap.of("0", "5", "1", "6"); + final ImmutableMap resetOffsets = ImmutableMap.of(); + final ImmutableMap expectedOffsets = ImmutableMap.of("0", "5", "1", "6"); + + EasyMock.expect(spec.isSuspended()).andReturn(false); + EasyMock.reset(indexerMetadataStorageCoordinator); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + checkpointOffsets + ) + ) + ); + EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + "stream", + expectedOffsets + ) + ))).andReturn(true); + + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + // Spin off two active tasks with each task serving one partition. + supervisor.getIoConfig().setTaskCount(2); + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "5"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("1"), + ImmutableMap.of("1", "6"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task2"), + ImmutableSet.of() + ); + + final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + resetOffsets + ) + ); + + Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount()); + Assert.assertEquals(0, supervisor.getNoticesQueueSize()); + Assert.assertEquals(0, supervisor.getPartitionOffsets().size()); + + supervisor.resetOffsets(resetMetadata); + + validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 2); + } + + @Test + public void testSupervisorResetWithNewPartition() throws IOException, InterruptedException + { + final ImmutableMap checkpointOffsets = ImmutableMap.of("0", "5", "1", "6"); + final ImmutableMap resetOffsets = ImmutableMap.of("2", "20"); + final ImmutableMap expectedOffsets = ImmutableMap.of("0", "5", "1", "6", "2", "20"); + + EasyMock.expect(spec.isSuspended()).andReturn(false); + EasyMock.reset(indexerMetadataStorageCoordinator); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + checkpointOffsets + ) + ) + ); + EasyMock.expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata(DATASOURCE, new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + "stream", + expectedOffsets + ) + ))).andReturn(true); + taskQueue.shutdown("task1", "DataSourceMetadata is updated while reset offsets is called"); + EasyMock.expectLastCall(); + + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + // Spin off two active tasks with each task serving one partition. + supervisor.getIoConfig().setTaskCount(2); + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "5"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("1"), + ImmutableMap.of("1", "6"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task2"), + ImmutableSet.of() + ); + + final DataSourceMetadata resetMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + STREAM, + resetOffsets + ) + ); + + Assert.assertEquals(2, supervisor.getActiveTaskGroupsCount()); + Assert.assertEquals(0, supervisor.getNoticesQueueSize()); + Assert.assertEquals(0, supervisor.getPartitionOffsets().size()); + + supervisor.resetOffsets(resetMetadata); + + validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 1); + } + + @Test + public void testSupervisorNoResetDataSourceMetadata() + { + EasyMock.expect(spec.isSuspended()).andReturn(false); + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "0"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToPendingCompletionTaskGroup( + supervisor.getTaskGroupIdForPartition("1"), + ImmutableMap.of("1", "0"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task2"), + ImmutableSet.of() + ); + + verifyAll(); + + MatcherAssert.assertThat( + Assert.assertThrows(DruidException.class, () -> + supervisor.resetOffsets(null) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Reset dataSourceMetadata is required for resetOffsets." + ) + ); + } + + @Test + public void testSupervisorResetWithInvalidStartSequenceMetadata() + { + EasyMock.expect(spec.isSuspended()).andReturn(false); + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "0"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToPendingCompletionTaskGroup( + supervisor.getTaskGroupIdForPartition("1"), + ImmutableMap.of("1", "0"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task2"), + ImmutableSet.of() + ); + + verifyAll(); + + final DataSourceMetadata dataSourceMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamStartSequenceNumbers<>( + "i-am-not-real", + ImmutableMap.of("0", "10", "1", "20", "2", "30"), + ImmutableSet.of() + ) + ); + + MatcherAssert.assertThat( + Assert.assertThrows(DruidException.class, () -> + supervisor.resetOffsets(dataSourceMetadata) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + StringUtils.format( + "Provided datasourceMetadata[%s] is invalid. Sequence numbers can only be of type[SeekableStreamEndSequenceNumbers], but found[SeekableStreamStartSequenceNumbers].", + dataSourceMetadata + ) + ) + ); + } + + @Test + public void testSupervisorResetInvalidStream() + { + EasyMock.expect(spec.isSuspended()).andReturn(false); + replayAll(); + + final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + supervisor.addTaskGroupToActivelyReadingTaskGroup( + supervisor.getTaskGroupIdForPartition("0"), + ImmutableMap.of("0", "0"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task1"), + ImmutableSet.of() + ); + + supervisor.addTaskGroupToPendingCompletionTaskGroup( + supervisor.getTaskGroupIdForPartition("1"), + ImmutableMap.of("1", "0"), + Optional.absent(), + Optional.absent(), + ImmutableSet.of("task2"), + ImmutableSet.of() + ); + + verifyAll(); + + final DataSourceMetadata dataSourceMetadata = new TestSeekableStreamDataSourceMetadata( + new SeekableStreamEndSequenceNumbers<>( + "i-am-not-real", + ImmutableMap.of("0", "10", "1", "20", "2", "30") + ) + ); + + MatcherAssert.assertThat( + Assert.assertThrows(DruidException.class, () -> + supervisor.resetOffsets(dataSourceMetadata) + ), + DruidExceptionMatcher.invalidInput().expectMessageIs( + "Stream[i-am-not-real] doesn't exist in the supervisor[testSupervisorId]. Supervisor is consuming stream[stream]." + ) + ); + } + + @Test + public void testStaleOffsetsNegativeLagNotEmitted() throws Exception + { + expectEmitterSupervisor(false); + + CountDownLatch latch = new CountDownLatch(1); + + final TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( + latch, + TestEmittingTestSeekableStreamSupervisor.LAG, + // Record lag must not be emitted + ImmutableMap.of("0", 10L, "1", -100L), + null + ); + supervisor.start(); + // Forcibly set the offsets to be stale + supervisor.sequenceLastUpdated = DateTimes.nowUtc().minus(Integer.MAX_VALUE); + + latch.await(); + + supervisor.emitLag(); + Assert.assertEquals(0, emitter.getEvents().size()); + } + + private void validateSupervisorStateAfterResetOffsets( + final TestSeekableStreamSupervisor supervisor, + final ImmutableMap expectedResetOffsets, + final int expectedActiveTaskCount + ) throws InterruptedException + { + // Wait for the notice queue to be drained asynchronously before we validate the supervisor's final state. + while (supervisor.getNoticesQueueSize() > 0) { + Thread.sleep(100); + } + Thread.sleep(1000); + Assert.assertEquals(expectedActiveTaskCount, supervisor.getActiveTaskGroupsCount()); + Assert.assertEquals(expectedResetOffsets.size(), supervisor.getPartitionOffsets().size()); + for (Map.Entry entry : expectedResetOffsets.entrySet()) { + Assert.assertEquals(supervisor.getNotSetMarker(), supervisor.getPartitionOffsets().get(entry.getKey())); + } + verifyAll(); + } + + @Test + public void testScheduleReporting() + { + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig(); + EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(config).times(2); + ScheduledExecutorService executorService = EasyMock.createMock(ScheduledExecutorService.class); + EasyMock.expect(executorService.scheduleWithFixedDelay(EasyMock.anyObject(), EasyMock.eq(86415000L), EasyMock.eq(300000L), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).once(); + EasyMock.expect(executorService.scheduleAtFixedRate(EasyMock.anyObject(), EasyMock.eq(86425000L), EasyMock.eq(config.getEmissionDuration().getMillis()), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(2); + + EasyMock.replay(executorService, spec); + final BaseTestSeekableStreamSupervisor supervisor = new BaseTestSeekableStreamSupervisor() + { + @Override + public LagStats computeLagStats() + { + return new LagStats(0, 0, 0); + } + }; + supervisor.scheduleReporting(executorService); + EasyMock.verify(executorService, spec); + } + + private List filterMetrics(List events, List whitelist) { List result = events.stream() @@ -1361,7 +1949,12 @@ protected List> createIndexT @Override protected int getTaskGroupIdForPartition(String partition) { - return 0; + try { + return Integer.parseInt(partition) % spec.getIoConfig().getTaskCount(); + } + catch (NumberFormatException e) { + return 0; + } } @Override diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index b19aeaa28812..a137e1831aa3 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -150,6 +150,11 @@ public void reset(DataSourceMetadata dataSourceMetadata) { } + @Override + public void resetOffsets(DataSourceMetadata resetDataSourceMetadata) + { + } + @Override public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 10c48578a638..bcfc5ebe8196 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.segment.incremental.ParseExceptionReport; @@ -61,8 +62,20 @@ default Boolean isHealthy() return null; // default implementation for interface compatability; returning null since true or false is misleading } + /** + * Resets all offsets for a dataSource. + * @param dataSourceMetadata optional dataSource metadata. + */ void reset(DataSourceMetadata dataSourceMetadata); + /** + * Reset offsets with provided dataSource metadata. The resulting stored offsets should be a union of existing checkpointed + * offsets with provided offsets. + * @param resetDataSourceMetadata required datasource metadata with offsets to reset. + * @throws DruidException if any metadata attribute doesn't match the supervisor's state. + */ + void resetOffsets(DataSourceMetadata resetDataSourceMetadata); + /** * The definition of checkpoint is not very strict as currently it does not affect data or control path. * On this call Supervisor can potentially checkpoint data processed so far to some durable storage diff --git a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java index fd5fac09e51c..8e68e76ed518 100644 --- a/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java +++ b/server/src/test/java/org/apache/druid/indexing/NoopSupervisorSpecTest.java @@ -19,8 +19,10 @@ package org.apache.druid.indexing; +import org.apache.druid.indexing.overlord.ObjectMetadata; import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.junit.Assert; @@ -64,4 +66,30 @@ public Integer call() } Assert.assertNull(e); } +<<<<<<< HEAD +======= + + @Test + public void testInputSourceResources() + { + NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1")); + Assert.assertTrue(noopSupervisorSpec.getInputSourceResources().isEmpty()); + } + + @Test + public void testNoppSupervisorResetOffsetsDoNothing() + { + NoopSupervisorSpec expectedSpec = new NoopSupervisorSpec(null, null); + Supervisor noOpSupervisor = expectedSpec.createSupervisor(); + Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount()); + noOpSupervisor.resetOffsets(null); + Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount()); + Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, noOpSupervisor.getState()); + + Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount()); + noOpSupervisor.resetOffsets(new ObjectMetadata("someObject")); + Assert.assertEquals(-1, noOpSupervisor.getActiveTaskGroupsCount()); + Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, noOpSupervisor.getState()); + } +>>>>>>> 37db5d9b81 (Reset offsets supervisor API (#14772)) } diff --git a/website/.spelling b/website/.spelling index 36a104a93009..d08d144f0272 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1104,6 +1104,7 @@ numProcessors q.size repartitionTransitionDuration replicastaskCounttaskCount +resetOffsets resetuseEarliestSequenceNumberPOST resumePOST statusrecentErrorsdruid.supervisor.maxStoredExceptionEventsstatedetailedStatestatedetailedStatestatestatePENDINGRUNNINGSUSPENDEDSTOPPINGUNHEALTHY_SUPERVISORUNHEALTHY_TASKSdetailedStatestatedruid.supervisor.unhealthinessThresholddruid.supervisor.taskUnhealthinessThresholdtaskDurationtaskCountreplicasdetailedStatedetailedStateRUNNINGPOST