Reset offsets supervisor API (apache#14772)
* Add supervisor /resetOffsets API.

- Add a new endpoint /druid/indexer/v1/supervisor/<supervisorId>/resetOffsets
which accepts DataSourceMetadata as a body parameter.
- Update logs, unit tests and docs.

* Add a new interface method for backwards compatibility (sketched below, after these notes).

* Rename

* Adjust tests and javadocs.

* Use CoreInjectorBuilder instead of deprecated makeInjectorWithModules

* UT fix

* Doc updates.

* Remove extraneous debugging logs.

* Remove the boolean setting; only ResetHandle() and resetInternal()

* Relax constraints and add a new ResetOffsetsNotice; clean up old logic.

* A separate ResetOffsetsNotice and some cleanup.

* Minor cleanup

* Add a check & test to verify that sequence numbers are only of type SeekableStreamEndSequenceNumbers

* Add unit tests for the no-op implementations for test coverage

* CodeQL fix

* Checkstyle fix from merge conflict

* Doc changes

* DOCUSAURUS code tabs fix. Thanks, Brian!
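
As a sketch of the backwards-compatibility note above: adding the new method to the supervisor interface with a throwing default implementation lets existing implementations compile unchanged. This is a minimal illustration only, assuming the method lives on the `Supervisor` interface; the actual interface in the commit may be shaped differently:

```java
import org.apache.druid.indexing.overlord.DataSourceMetadata;

public interface Supervisor
{
  // Existing contract: a full reset, optionally scoped by metadata.
  void reset(DataSourceMetadata dataSourceMetadata);

  // New method, sketched with a default body so that pre-existing
  // implementations keep compiling; stream supervisors override it to
  // reset specific partition offsets.
  default void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
  {
    throw new UnsupportedOperationException("Reset offsets not supported");
  }
}
```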
abhishekrb19 authored and pagrawal10 committed Jun 12, 2024
1 parent 72ff8ed commit c63113b
Showing 19 changed files with 4,536 additions and 11 deletions.
3,359 changes: 3,359 additions & 0 deletions docs/api-reference/supervisor-api.md

Large diffs are not rendered by default.

64 changes: 64 additions & 0 deletions docs/development/extensions-core/kafka-supervisor-operations.md
@@ -131,6 +131,70 @@ to start and in flight tasks will fail. This operation enables you to recover from this condition.

Note that the supervisor must be running for this endpoint to be available.

## Resetting Offsets for a Supervisor

The supervisor must be running for this endpoint to be available.

The `POST /druid/indexer/v1/supervisor/<supervisorId>/resetOffsets` operation clears the stored
offsets for the specified partitions, causing the supervisor to read from the offsets you provide.
After resetting the stored offsets, the supervisor kills and recreates any active tasks pertaining
to the specified partitions, so that those tasks begin reading from the specified offsets. For
partitions not specified in this operation, the supervisor resumes from the last stored offset.

Use this operation with care: resetting offsets for a supervisor can cause Kafka messages to be skipped or read
twice, resulting in missing or duplicate data.

#### Sample request

The following example shows how to reset offsets for a Kafka supervisor with the name `social_media`. Suppose the supervisor reads
from two Kafka topics, `ads_media_foo` and `ads_media_bar`, and has the stored offsets `{"ads_media_foo:0": 0, "ads_media_foo:1": 10, "ads_media_bar:0": 20, "ads_media_bar:1": 40}`.

<!--DOCUSAURUS_CODE_TABS-->

<!--cURL-->

```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" \
--header 'Content-Type: application/json' \
--data-raw '{"type":"kafka","partitions":{"type":"end","stream":"ads_media_foo|ads_media_bar","partitionOffsetMap":{"ads_media_foo:0": 3, "ads_media_bar:1": 12}}}'
```

<!--HTTP-->

```HTTP
POST /druid/indexer/v1/supervisor/social_media/resetOffsets HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
Content-Type: application/json

{
  "type": "kafka",
  "partitions": {
    "type": "end",
    "stream": "ads_media_foo|ads_media_bar",
    "partitionOffsetMap": {
      "ads_media_foo:0": 3,
      "ads_media_bar:1": 12
    }
  }
}
```

The above operation resets offsets for `ads_media_foo` partition 0 and `ads_media_bar` partition 1 to offsets 3 and 12, respectively. After a successful reset,
when the supervisor's tasks restart, they resume reading from `{"ads_media_foo:0": 3, "ads_media_foo:1": 10, "ads_media_bar:0": 20, "ads_media_bar:1": 12}`.

<!--END_DOCUSAURUS_CODE_TABS-->

#### Sample response

<details>
<summary>Click to show sample response</summary>

```json
{
"id": "social_media"
}
```
</details>
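
If no supervisor exists with the given ID, the endpoint returns a 404 response.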

## Terminating Supervisors

The `POST /druid/indexer/v1/supervisor/<supervisorId>/terminate` operation terminates a supervisor and causes all
19 changes: 18 additions & 1 deletion docs/development/extensions-core/kinesis-ingestion.md
@@ -38,6 +38,13 @@

To use the Kinesis indexing service, load the `druid-kinesis-indexing-service` extension on both the Overlord and the MiddleManagers. Druid starts a supervisor for a dataSource when you submit a supervisor spec. Submit your supervisor spec to the following endpoint:

|Property|Type|Description|Required|
|--------|----|-----------|--------|
|`type`|String|The supervisor type; this should always be `kinesis`.|Yes|
|`spec`|Object|The container object for the supervisor configuration.|Yes|
|`ioConfig`|Object|The [I/O configuration](#supervisor-io-configuration) object for configuring Kinesis connection and I/O-related settings for the supervisor and indexing task.|Yes|
|`dataSchema`|Object|The schema used by the Kinesis indexing task during ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema) for more information.|Yes|
|`tuningConfig`|Object|The [tuning configuration](#supervisor-tuning-configuration) object for configuring performance-related settings for the supervisor and indexing tasks.|No|

`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`
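
For illustration, a supervisor spec saved to a local JSON file might be submitted like this; the host, port, and file name here are placeholders:

```shell
curl --request POST "http://OVERLORD_IP:OVERLORD_PORT/druid/indexer/v1/supervisor" \
--header 'Content-Type: application/json' \
--data @kinesis-supervisor-spec.json
```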

@@ -502,7 +509,17 @@ no longer available in Kinesis (typically because the message retention period has elapsed or the stream was
removed and re-created) the supervisor will refuse to start and in-flight tasks will fail. This operation
enables you to recover from this condition.

Note that the supervisor must be running for this endpoint to be available.
### Resetting Offsets for a Supervisor

To reset partition offsets for a supervisor, send a `POST` request to the `/druid/indexer/v1/supervisor/:supervisorId/resetOffsets` endpoint. This endpoint clears the stored
sequence numbers for the specified partitions, prompting the supervisor to read from the sequence numbers you provide.
After resetting the stored sequence numbers, the supervisor kills and recreates any active tasks pertaining to the specified partitions,
so that those tasks begin reading from the specified offsets. For partitions not specified in this operation, the supervisor resumes from the last
stored sequence number.

Use this endpoint with caution: it may cause messages to be skipped or read twice, resulting in missing or duplicate data.
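
For example, the following hypothetical request resets the supervisor `social_media` so that partition `0` of the stream `social_media_stream` restarts from a given sequence number. The stream name and sequence number are illustrative placeholders; the payload shape follows the Kinesis `DataSourceMetadata` serialization exercised by the tests in this commit:

```shell
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/indexer/v1/supervisor/social_media/resetOffsets" \
--header 'Content-Type: application/json' \
--data-raw '{"type":"kinesis","partitions":{"type":"end","stream":"social_media_stream","partitionSequenceNumberMap":{"0":"49600000000000000000000000000000000000000001"}}}'
```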

### Terminating Supervisors

@@ -277,6 +277,12 @@ public void reset(DataSourceMetadata dataSourceMetadata)
}
}

@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{
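// Materialized-view supervisors do not consume from a stream, so there are no offsets to reset.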
throw new UnsupportedOperationException("Reset offsets not supported in MaterializedViewSupervisor");
}

@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{
@@ -387,4 +387,37 @@ public void testSuspendedDoesntRun()
EasyMock.replay(mock);
supervisor.run();
}

@Test
public void testResetOffsetsNotSupported()
{
MaterializedViewSupervisorSpec suspended = new MaterializedViewSupervisorSpec(
"base",
new DimensionsSpec(Collections.singletonList(new StringDimensionSchema("dim"))),
new AggregatorFactory[]{new LongSumAggregatorFactory("m1", "m1")},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
true,
objectMapper,
taskMaster,
taskStorage,
metadataSupervisorManager,
sqlSegmentsMetadataManager,
indexerMetadataStorageCoordinator,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
EasyMock.createMock(ChatHandlerProvider.class),
new SupervisorStateManagerConfig()
);
MaterializedViewSupervisor supervisor = (MaterializedViewSupervisor) suspended.createSupervisor();
Assert.assertThrows(
"Reset offsets not supported in MaterializedViewSupervisor",
UnsupportedOperationException.class,
() -> supervisor.resetOffsets(null)
);
}
}
@@ -19,10 +19,20 @@

package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.name.Names;
import org.apache.druid.data.input.kafka.KafkaTopicPartition;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.utils.CollectionUtils;
import org.junit.Assert;
import org.junit.Test;

@@ -161,6 +171,37 @@ public void testMinus()
);
}

@Test
public void testKafkaDataSourceMetadataSerdeRoundTrip() throws JsonProcessingException
{
ObjectMapper jsonMapper = createObjectMapper();

KafkaDataSourceMetadata kdm1 = endMetadata(ImmutableMap.of());
String kdmStr1 = jsonMapper.writeValueAsString(kdm1);
DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class);
Assert.assertEquals(kdm1, dsMeta1);

KafkaDataSourceMetadata kdm2 = endMetadata(ImmutableMap.of(1, 3L));
String kdmStr2 = jsonMapper.writeValueAsString(kdm2);
DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class);
Assert.assertEquals(kdm2, dsMeta2);
}

@Test
public void testKafkaDataSourceMetadataSerde() throws JsonProcessingException
{
ObjectMapper jsonMapper = createObjectMapper();
KafkaDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of(1, 3L));
String kdmStr1 = "{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":3},\"partitionOffsetMap\":{\"1\":3},\"exclusivePartitions\":[]}}\n";
DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class);
Assert.assertEquals(dsMeta1, expectedKdm1);

KafkaDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of(1, 3L, 2, 1900L));
String kdmStr2 = "{\"type\":\"kafka\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":3, \"2\":1900},\"partitionOffsetMap\":{\"1\":3, \"2\":1900},\"exclusivePartitions\":[]}}\n";
DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class);
Assert.assertEquals(dsMeta2, expectedKdm2);
}

private static KafkaDataSourceMetadata startMetadata(Map<Integer, Long> offsets)
{
return new KafkaDataSourceMetadata(new SeekableStreamStartSequenceNumbers<>("foo", offsets, ImmutableSet.of()));
@@ -170,4 +211,21 @@ private static KafkaDataSourceMetadata endMetadata(Map<Integer, Long> offsets)
{
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", offsets));
}

private static ObjectMapper createObjectMapper()
{
DruidModule module = new KafkaIndexTaskModule();
final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
.addModule(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
}
).build();

ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
module.getJacksonModules().forEach(objectMapper::registerModule);
return objectMapper;
}
}
@@ -20,10 +20,18 @@

package org.apache.druid.indexing.kinesis;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Injector;
import com.google.inject.name.Names;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.initialization.DruidModule;
import org.junit.Assert;
import org.junit.Test;

@@ -217,6 +225,37 @@ public void testMinus()
);
}

@Test
public void testKinesisDataSourceMetadataSerdeRoundTrip() throws JsonProcessingException
{
ObjectMapper jsonMapper = createObjectMapper();

KinesisDataSourceMetadata kdm1 = startMetadata(ImmutableMap.of(), ImmutableSet.of());
String kdmStr1 = jsonMapper.writeValueAsString(kdm1);
DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class);
Assert.assertEquals(kdm1, dsMeta1);

KinesisDataSourceMetadata kdm2 = startMetadata(ImmutableMap.of("1", "3"), ImmutableSet.of());
String kdmStr2 = jsonMapper.writeValueAsString(kdm2);
DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class);
Assert.assertEquals(kdm2, dsMeta2);
}

@Test
public void testKinesisDataSourceMetadataSerde() throws JsonProcessingException
{
ObjectMapper jsonMapper = createObjectMapper();
KinesisDataSourceMetadata expectedKdm1 = endMetadata(ImmutableMap.of("1", "5"));
String kdmStr1 = "{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"foo\",\"partitionSequenceNumberMap\":{\"1\":5},\"partitionOffsetMap\":{\"1\":5},\"exclusivePartitions\":[]}}\n";
DataSourceMetadata dsMeta1 = jsonMapper.readValue(kdmStr1, DataSourceMetadata.class);
Assert.assertEquals(dsMeta1, expectedKdm1);

KinesisDataSourceMetadata expectedKdm2 = endMetadata(ImmutableMap.of("1", "10", "2", "19"));
String kdmStr2 = "{\"type\":\"kinesis\",\"partitions\":{\"type\":\"end\",\"stream\":\"foo\",\"topic\":\"food\",\"partitionSequenceNumberMap\":{\"1\":10, \"2\":19},\"partitionOffsetMap\":{\"1\":10, \"2\":19},\"exclusivePartitions\":[]}}\n";
DataSourceMetadata dsMeta2 = jsonMapper.readValue(kdmStr2, DataSourceMetadata.class);
Assert.assertEquals(dsMeta2, expectedKdm2);
}

private static KinesisDataSourceMetadata simpleStartMetadata(Map<String, String> sequences)
{
return startMetadata(sequences, sequences.keySet());
@@ -233,4 +272,20 @@ private static KinesisDataSourceMetadata endMetadata(Map<String, String> sequences)
{
return new KinesisDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>("foo", sequences));
}

private static ObjectMapper createObjectMapper()
{
DruidModule module = new KinesisIndexingServiceModule();
final Injector injector = new CoreInjectorBuilder(new StartupInjectorBuilder().build())
.addModule(
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("test");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8000);
binder.bindConstant().annotatedWith(Names.named("tlsServicePort")).to(9000);
}
).build();
ObjectMapper objectMapper = injector.getInstance(ObjectMapper.class);
module.getJacksonModules().forEach(objectMapper::registerModule);
return objectMapper;
}
}
@@ -2634,7 +2634,6 @@ public void testResetNoTasks()

supervisor.resetInternal(null);
verifyAll();

}

@Test
@@ -201,7 +201,7 @@ public Optional<Boolean> isSupervisorHealthy(String id)
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.isHealthy());
}

public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourceMetadata)
public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetDataSourceMetadata)
{
Preconditions.checkState(started, "SupervisorManager not started");
Preconditions.checkNotNull(id, "id");
@@ -212,7 +212,11 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourceMetadata)
return false;
}

supervisor.lhs.reset(dataSourceMetadata);
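// A null payload preserves the legacy full-reset behavior; a non-null payload performs a targeted reset of the supplied partition offsets.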
if (resetDataSourceMetadata == null) {
supervisor.lhs.reset(null);
} else {
supervisor.lhs.resetOffsets(resetDataSourceMetadata);
}
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.reset();
@@ -30,6 +30,7 @@
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.http.security.SupervisorResourceFilter;
import org.apache.druid.java.util.common.StringUtils;
@@ -40,6 +41,7 @@
import org.apache.druid.server.security.ForbiddenException;
import org.apache.druid.server.security.ResourceAction;

import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
@@ -455,10 +457,31 @@ public Response specGetHistory(
@Produces(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response reset(@PathParam("id") final String id)
{
return handleResetRequest(id, null);
}

@POST
@Path("/{id}/resetOffsets")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@ResourceFilters(SupervisorResourceFilter.class)
public Response resetOffsets(
@PathParam("id") final String id,
final DataSourceMetadata resetDataSourceMetadata
)
{
return handleResetRequest(id, resetDataSourceMetadata);
}

private Response handleResetRequest(
final String id,
@Nullable final DataSourceMetadata resetDataSourceMetadata
)
{
return asLeaderWithSupervisorManager(
manager -> {
if (manager.resetSupervisor(id, null)) {
if (manager.resetSupervisor(id, resetDataSourceMetadata)) {
return Response.ok(ImmutableMap.of("id", id)).build();
} else {
return Response.status(Response.Status.NOT_FOUND)