Skip to content

Commit

Permalink
add context to kafka supervisor for the kafka indexing task (apache#3464
Browse files Browse the repository at this point in the history
)
  • Loading branch information
pjain1 authored and fundead committed Dec 7, 2016
1 parent 8d4394e commit 80b0e4e
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1272,7 +1272,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas)
spec.getDataSchema(),
taskTuningConfig,
kafkaIOConfig,
ImmutableMap.<String, Object>of(),
spec.getContext(),
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,14 @@
import io.druid.indexing.overlord.supervisor.SupervisorSpec;
import io.druid.segment.indexing.DataSchema;

import java.util.Map;

public class KafkaSupervisorSpec implements SupervisorSpec
{
private final DataSchema dataSchema;
private final KafkaSupervisorTuningConfig tuningConfig;
private final KafkaSupervisorIOConfig ioConfig;
private final Map<String, Object> context;

private final TaskStorage taskStorage;
private final TaskMaster taskMaster;
Expand All @@ -50,6 +53,7 @@ public KafkaSupervisorSpec(
@JsonProperty("dataSchema") DataSchema dataSchema,
@JsonProperty("tuningConfig") KafkaSupervisorTuningConfig tuningConfig,
@JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig,
@JsonProperty("context") Map<String, Object> context,
@JacksonInject TaskStorage taskStorage,
@JacksonInject TaskMaster taskMaster,
@JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
Expand Down Expand Up @@ -77,6 +81,7 @@ public KafkaSupervisorSpec(
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
this.context = context;

this.taskStorage = taskStorage;
this.taskMaster = taskMaster;
Expand All @@ -103,6 +108,12 @@ public KafkaSupervisorIOConfig getIoConfig()
return ioConfig;
}

@JsonProperty
public Map<String, Object> getContext()
{
return context;
}

@Override
public String getId()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1632,6 +1632,7 @@ public KafkaIndexTaskClient build(
dataSchema,
tuningConfig,
kafkaSupervisorIOConfig,
null,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
Expand Down

0 comments on commit 80b0e4e

Please sign in to comment.