Support for middle manager less druid, tasks launch as k8s jobs
Rahul Gidwani committed Sep 29, 2022
1 parent 306f612 commit 190f8dd
Showing 113 changed files with 5,194 additions and 229 deletions.
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/druid/indexer/TaskLocation.java
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.net.HostAndPort;
import org.apache.druid.java.util.common.IAE;

import javax.annotation.Nullable;
@@ -42,6 +43,11 @@ public static TaskLocation create(String host, int port, int tlsPort)
return new TaskLocation(host, port, tlsPort);
}

public static TaskLocation create(String host, int port, int tlsPort, boolean isTls)
{
return isTls ? new TaskLocation(host, -1, tlsPort) : new TaskLocation(host, port, -1);
}

public static TaskLocation unknown()
{
return TaskLocation.UNKNOWN;
@@ -127,4 +133,12 @@ public int hashCode()
{
return Objects.hash(host, port, tlsPort);
}

public HostAndPort toHostAndPort()
{
if (tlsPort >= 0) {
return HostAndPort.fromParts(host, tlsPort);
}
return HostAndPort.fromParts(host, port);
}
}
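
As a quick illustration of the new TLS-aware factory and `toHostAndPort()` added above, here is a minimal usage sketch; the host and port values are made-up examples, not taken from the commit:

```java
import com.google.common.net.HostAndPort;
import org.apache.druid.indexer.TaskLocation;

public class TaskLocationExample
{
  public static void main(String[] args)
  {
    // isTls == true drops the plaintext port (-1) and keeps only tlsPort,
    // so toHostAndPort() resolves to the TLS endpoint.
    TaskLocation tls = TaskLocation.create("task-pod.example", 8100, 8291, true);
    HostAndPort tlsEndpoint = tls.toHostAndPort();      // task-pod.example:8291

    // isTls == false drops tlsPort (-1), so the plaintext port is used instead.
    TaskLocation plain = TaskLocation.create("task-pod.example", 8100, 8291, false);
    HostAndPort plainEndpoint = plain.toHostAndPort();  // task-pod.example:8100

    System.out.println(tlsEndpoint + " " + plainEndpoint);
  }
}
```
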
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/druid/indexer/TaskStatus.java
@@ -66,6 +66,11 @@ public static TaskStatus failure(String taskId, String errorMsg)
return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
}

public static TaskStatus success(String taskId, TaskLocation location)
{
return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, location);
}

/**
* This method is deprecated for production because it does not handle the error message of failed task status properly.
* Use {@link #success(String)} or {@link #failure(String, String)} instead.
@@ -58,5 +58,10 @@ public void testSerde() throws IOException
null
);
Assert.assertEquals(statusNoLocation, mapper.readValue(jsonNoLocation, TaskStatus.class));

TaskStatus success = TaskStatus.success("forkTaskID", new TaskLocation("localhost", 0, 1));
Assert.assertEquals(success.getLocation().getHost(), "localhost");
Assert.assertEquals(success.getLocation().getPort(), 0);
Assert.assertEquals(success.getLocation().getTlsPort(), 1);
}
}
1 change: 1 addition & 0 deletions distribution/docker/Dockerfile
@@ -81,6 +81,7 @@ RUN addgroup -S -g 1000 druid \

COPY --chown=druid:druid --from=builder /opt /opt
COPY distribution/docker/druid.sh /druid.sh
COPY distribution/docker/peon.sh /peon.sh

# create necessary directories which could be mounted as volume
# /opt/druid/var is used to keep individual files(e.g. log) of each Druid service
153 changes: 153 additions & 0 deletions distribution/docker/peon.sh
@@ -0,0 +1,153 @@
#!/bin/sh

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# NOTE: this is the 'run' script used inside the task (peon) pods that the
# Kubernetes overlord extension launches as K8s jobs. The service config
# directory is fixed to the overlord's, and any arguments are passed
# through to the peon process.
#
# It accepts 'JAVA_OPTS' as an environment variable
#
# Additional env vars:
# - DRUID_LOG4J -- set the entire log4j.xml verbatim
# - DRUID_LOG_LEVEL -- override the default log level in default log4j
# - DRUID_XMX -- set Java Xmx
# - DRUID_XMS -- set Java Xms
# - DRUID_MAXNEWSIZE -- set Java max new size
# - DRUID_NEWSIZE -- set Java new size
# - DRUID_MAXDIRECTMEMORYSIZE -- set Java max direct memory size
#
# - DRUID_CONFIG_COMMON -- full path to a file for druid 'common' properties
# - DRUID_CONFIG_${service} -- full path to a file for druid 'service' properties

set -e
SERVICE="overlord"

echo "$(date -Is) startup service $SERVICE"

# We put all the config in /tmp/conf to allow for a
# read-only root filesystem
mkdir -p /tmp/conf/
test -d /tmp/conf/druid && rm -r /tmp/conf/druid
cp -r /opt/druid/conf/druid /tmp/conf/druid

getConfPath() {
cluster_conf_base=/tmp/conf/druid/cluster
case "$1" in
_common) echo $cluster_conf_base/_common ;;
historical) echo $cluster_conf_base/data/historical ;;
middleManager) echo $cluster_conf_base/data/middleManager ;;
indexer) echo $cluster_conf_base/data/indexer ;;
coordinator | overlord) echo $cluster_conf_base/master/coordinator-overlord ;;
broker) echo $cluster_conf_base/query/broker ;;
router) echo $cluster_conf_base/query/router ;;
*) echo $cluster_conf_base/misc/$1 ;;
esac
}
COMMON_CONF_DIR=$(getConfPath _common)
SERVICE_CONF_DIR=$(getConfPath ${SERVICE})

# Delete the old key (if existing) and append new key=value
setKey() {
service="$1"
key="$2"
value="$3"
service_conf=$(getConfPath $service)/runtime.properties
# Delete from all
sed -ri "/$key=/d" $COMMON_CONF_DIR/common.runtime.properties
[ -f $service_conf ] && sed -ri "/$key=/d" $service_conf
[ -f $service_conf ] && echo -e "\n$key=$value" >>$service_conf
[ -f $service_conf ] || echo -e "\n$key=$value" >>$COMMON_CONF_DIR/common.runtime.properties

echo "Setting $key=$value in $service_conf"
}

setJavaKey() {
service="$1"
key=$2
value=$3
file=$(getConfPath $service)/jvm.config
sed -ri "/$key/d" $file
echo $value >> $file
}

## Setup host names
if [ -n "${ZOOKEEPER}" ];
then
setKey _common druid.zk.service.host "${ZOOKEEPER}"
fi

DRUID_SET_HOST=${DRUID_SET_HOST:-1}
if [ "${DRUID_SET_HOST}" = "1" ]
then
setKey $SERVICE druid.host $(ip r get 1 | awk '{print $7;exit}')
fi

env | grep ^druid_ | while read evar;
do
# Can't use IFS='=' to parse since var might have = in it (e.g. password)
val=$(echo "$evar" | sed -e 's?[^=]*=??')
var=$(echo "$evar" | sed -e 's?^\([^=]*\)=.*?\1?g' -e 's?_?.?g')
setKey $SERVICE "$var" "$val"
done

env |grep ^s3service | while read evar
do
val=$(echo "$evar" | sed -e 's?[^=]*=??')
var=$(echo "$evar" | sed -e 's?^\([^=]*\)=.*?\1?g' -e 's?_?.?' -e 's?_?-?g')
echo "$var=$val" >>$COMMON_CONF_DIR/jets3t.properties
done

# This is to allow configuration via a Kubernetes configMap without
# e.g. using subPath (you can also mount the configMap on /tmp/conf/druid)
if [ -n "$DRUID_CONFIG_COMMON" ]
then
cp -f "$DRUID_CONFIG_COMMON" $COMMON_CONF_DIR/common.runtime.properties
fi

SCONFIG=$(printf "%s_%s" DRUID_CONFIG ${SERVICE})
SCONFIG=$(eval echo \$$(echo $SCONFIG))

if [ -n "${SCONFIG}" ]
then
cp -f "${SCONFIG}" $SERVICE_CONF_DIR/runtime.properties
fi

if [ -n "$DRUID_LOG_LEVEL" ]
then
sed -ri 's/"info"/"'$DRUID_LOG_LEVEL'"/g' $COMMON_CONF_DIR/log4j2.xml
fi

if [ -n "$DRUID_LOG4J" ]
then
echo "$DRUID_LOG4J" >$COMMON_CONF_DIR/log4j2.xml
fi

DRUID_DIRS_TO_CREATE=${DRUID_DIRS_TO_CREATE-'var/tmp var/druid/segments var/druid/indexing-logs var/druid/task var/druid/hadoop-tmp var/druid/segment-cache'}
if [ -n "${DRUID_DIRS_TO_CREATE}" ]
then
mkdir -p ${DRUID_DIRS_TO_CREATE}
fi

# Take the ${TASK_JSON} environment variable, base64-decode and gunzip it, and write it to ${TASK_DIR}/task.json
mkdir -p ${TASK_DIR}; echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;

exec java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@
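
The script above expects `${TASK_JSON}` to hold a gzip-compressed, base64-encoded task spec. Below is a minimal sketch of how a launcher could produce such a value to match the `base64 -d | gzip -d` pipeline; this is a hypothetical helper, not code from this commit:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.zip.GZIPOutputStream;

public class TaskJsonEncoder
{
  // Gzip the serialized task spec, then base64-encode it so it can be passed
  // as the TASK_JSON environment variable of the K8s job.
  public static String encode(String taskJson) throws IOException
  {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
      gzip.write(taskJson.getBytes(StandardCharsets.UTF_8));
    }
    return Base64.getEncoder().encodeToString(bytes.toByteArray());
  }

  public static void main(String[] args) throws IOException
  {
    System.out.println(encode("{\"type\":\"noop\",\"id\":\"example-task\"}"));
  }
}
```
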
2 changes: 2 additions & 0 deletions distribution/pom.xml
@@ -595,6 +595,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:kafka-emitter</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-kubernetes-overlord-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:materialized-view-maintenance</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:materialized-view-selection</argument>
96 changes: 96 additions & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
@@ -0,0 +1,96 @@
---
id: k8s-jobs
title: "MM-less Druid in K8s"
---

<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

Consider this an [EXPERIMENTAL](../experimental.md) feature, mostly because it has not yet been tested on a wide variety of long-running Druid clusters.

Apache Druid extension to enable using Kubernetes for launching and managing tasks instead of Middle Managers. This extension allows you to launch tasks as Kubernetes Jobs, removing the need for your Middle Manager.

## How it works

The extension takes the podSpec of your `Overlord` pod and creates a Kubernetes Job from it. Because the Job reuses the Overlord's podSpec, sidecars such as Splunk, Hubble, or Istio can optionally be launched alongside the task. All jobs are natively restorable: they are decoupled from the Druid deployment, so restarting pods or performing upgrades has no effect on tasks in flight. Tasks continue to run, and when the Overlord comes back up it starts tracking them again.

## Configuration

To use this extension, make sure to [include](../extensions.md#loading-extensions) `druid-kubernetes-overlord-extensions` in the extensions load list for your Overlord process.

The extension uses the task queue to limit how many concurrent tasks (K8s jobs) are in flight, so make sure you have a reasonable value for `druid.indexer.queue.maxSize`. Additionally, set `druid.indexer.runner.namespace` to the namespace in which you are running Druid.

Other required configurations are:
`druid.indexer.runner.type: k8s` and `druid.indexer.task.enableTaskLevelLogPush: true`

You can add optional labels to your K8s jobs/pods if you need them by using the following configuration:
`druid.indexer.runner.labels: '{"key":"value"}'`

All other configurations you had for the Middle Manager tasks must be moved under the Overlord, with one caveat: you must specify Java options as an array via `druid.indexer.runner.javaOptsArray`; `druid.indexer.runner.javaOpts` is no longer supported.

If you are running without a Middle Manager, you also need to set `druid.processing.intermediaryData.storage.type=deepstore`. A consolidated example is shown below.
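
Putting the required settings together, an Overlord `runtime.properties` might contain something like the following. This is a sketch only; the namespace, queue size, labels, and Java options are placeholder values, not defaults from the extension:

```properties
# Load the extension on the Overlord.
druid.extensions.loadList=["druid-kubernetes-overlord-extensions"]

# Run tasks as K8s jobs instead of on Middle Managers.
druid.indexer.runner.type=k8s
# Namespace your Druid pods run in (placeholder value).
druid.indexer.runner.namespace=druid
# Caps how many K8s jobs are in flight at once (placeholder value).
druid.indexer.queue.maxSize=16
druid.indexer.task.enableTaskLevelLogPush=true

# Optional labels applied to the launched jobs/pods (placeholder value).
druid.indexer.runner.labels={"cluster": "druid"}

# Java options must be specified as an array (placeholder values).
druid.indexer.runner.javaOptsArray=["-Xmx1g", "-XX:MaxDirectMemorySize=1g"]

# Required when running without Middle Managers.
druid.processing.intermediaryData.storage.type=deepstore
```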

## Additional configuration

### Properties
|Property|Type|Description|Default|Required|
|--------|----|-----------|-------|--------|
|`druid.indexer.runner.debugJobs`|`boolean`|Clean up K8s jobs after tasks complete.|False|No|
|`druid.indexer.runner.sidecarSupport`|`boolean`|If your Overlord pod has sidecars, this will attempt to start the task with the same sidecars as the Overlord pod.|False|No|
|`druid.indexer.runner.kubexitImage`|`String`|Image for the kubexit project, used to help shut down sidecars when the main container completes; otherwise jobs with sidecars never terminate.|karlkfi/kubexit:latest|No|
|`druid.indexer.runner.maxTaskDuration`|`Duration`|Maximum time a task is allowed to run before it is killed.|4H|No|
|`druid.indexer.runner.taskCleanupDelay`|`Duration`|How long jobs stay around before being reaped from K8s.|2D|No|
|`druid.indexer.runner.taskCleanupInterval`|`Duration`|How often to check for jobs to reap.|10m|No|
|`druid.indexer.runner.k8sjobLaunchTimeout`|`Duration`|How long to wait for a K8s task to launch before marking it as failed; on a resource-constrained cluster this may take some time.|1H|No|
|`druid.indexer.runner.javaOptsArray`|`JsonArray`|Java options for the task.|-Xmx1g|No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds to wait after a SIGTERM for container lifecycle hooks to complete. Keep this at a smaller value if you want tasks to hold locks for shorter periods.|30s (K8s default)|No|

### Gotchas

- Your role must have the ability to launch jobs, as in the example Role and RoleBinding below.
- All Druid pods belonging to one Druid cluster must be inside the same Kubernetes namespace.

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: druid-cluster
rules:
- apiGroups:
- ""
resources:
- pods
- configmaps
- jobs
verbs:
- '*'
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: druid-cluster
subjects:
- kind: ServiceAccount
name: default
roleRef:
kind: Role
name: druid-cluster
apiGroup: rbac.authorization.k8s.io
```
1 change: 1 addition & 0 deletions docs/development/extensions.md
Original file line number Diff line number Diff line change
@@ -97,6 +97,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati
|druid-tdigestsketch|Support for approximate sketch aggregators based on [T-Digest](https://github.com/tdunning/t-digest)|[link](../development/extensions-contrib/tdigestsketch-quantiles.md)|
|gce-extensions|GCE Extensions|[link](../development/extensions-contrib/gce-extensions.md)|
|prometheus-emitter|Exposes [Druid metrics](../operations/metrics.md) for Prometheus server collection (https://prometheus.io/)|[link](./extensions-contrib/prometheus.md)|
|kubernetes-overlord-extensions|Support for launching tasks as K8s jobs, removing the need for Middle Managers|[link](../development/extensions-contrib/k8s-jobs.md)|

## Promoting community extensions to core extensions
