Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for middle manager less druid, tasks launch as k8s jobs #13156

Merged
merged 22 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/druid/indexer/TaskLocation.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.net.HostAndPort;
import org.apache.druid.java.util.common.IAE;

import javax.annotation.Nullable;
Expand All @@ -42,6 +43,11 @@ public static TaskLocation create(String host, int port, int tlsPort)
return new TaskLocation(host, port, tlsPort);
}

public static TaskLocation create(String host, int port, int tlsPort, boolean isTls)
{
return isTls ? new TaskLocation(host, -1, tlsPort) : new TaskLocation(host, port, -1);
}

public static TaskLocation unknown()
{
return TaskLocation.UNKNOWN;
Expand Down Expand Up @@ -127,4 +133,12 @@ public int hashCode()
{
return Objects.hash(host, port, tlsPort);
}

public HostAndPort toHostAndPort()
{
if (tlsPort >= 0) {
return HostAndPort.fromParts(host, tlsPort);
}
return HostAndPort.fromParts(host, port);
}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/druid/indexer/TaskStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public static TaskStatus failure(String taskId, String errorMsg)
return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg, null);
}

public static TaskStatus success(String taskId, TaskLocation location)
{
return new TaskStatus(taskId, TaskState.SUCCESS, -1, null, location);
}

/**
* This method is deprecated for production because it does not handle the error message of failed task status properly.
* Use {@link #success(String)} or {@link #failure(String, String)} instead.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,10 @@ public void testSerde() throws IOException
null
);
Assert.assertEquals(statusNoLocation, mapper.readValue(jsonNoLocation, TaskStatus.class));

TaskStatus success = TaskStatus.success("forkTaskID", new TaskLocation("localhost", 0, 1));
Assert.assertEquals(success.getLocation().getHost(), "localhost");
Assert.assertEquals(success.getLocation().getPort(), 0);
Assert.assertEquals(success.getLocation().getTlsPort(), 1);
}
}
1 change: 1 addition & 0 deletions distribution/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ RUN addgroup -S -g 1000 druid \

COPY --chown=druid:druid --from=builder /opt /opt
COPY distribution/docker/druid.sh /druid.sh
COPY distribution/docker/peon.sh /peon.sh

# create necessary directories which could be mounted as volume
# /opt/druid/var is used to keep individual files(e.g. log) of each Druid service
Expand Down
155 changes: 155 additions & 0 deletions distribution/docker/peon.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#!/bin/sh
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meta commentary: it would be cool (and I don't think too crazy) if there was a Cli target that is like "task" which essentially does a remote bootstrap.

I.e. it could start up and the first thing it does is read some config for the overlord process, "phone home" to ask for the task spec and runtime properties, and then use those to bootstrap.

This might open up security holes, so it would probably have to be done with some sort of shared secret or something, maybe? But anyway, might simplify this script into something that is relatively generic and not even k8s-dependent.

Could also be done as an evolution of this change post-merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when creating this patch, I was hoping to minimize core changes to increase the odds of this patch getting accepted. In all honesty, I believe if this is a first class feature, we can fix some other hacky stuff that was done. I definitely think having a Cli command for the task is a good idea, I think this is something that I could put up a PR if /when this gets merged.


#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# NOTE: this is a 'run' script for the stock tarball
# It takes 1 required argument (the name of the service,
# e.g. 'broker', 'historical' etc). Any additional arguments
# are passed to that service.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is similar to the standard script. Is it a copy? If a copy, then perhaps we can avoid an actual copy: add a rule to the distribution project (likely in assembly.xml) to copy the existing script to peon.sh where needed.

If this version has changes, can they be applied to the standard script somehow to avoid the copy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the peon.sh is copied into the dockerifle, do we need to add a rule here? I just do it in the dockerfile itself, user wont have to worry, if anything changes with the script it gets automatically updated. I believe that is the same thing that happens with the druid.sh script.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here specifying that this is similar to druid.sh and is used exclusively for the kubernetes-overlord-extension?

#
# It accepts 'JAVA_OPTS' as an environment variable
#
# Additional env vars:
# - DRUID_LOG4J -- set the entire log4j.xml verbatim
# - DRUID_LOG_LEVEL -- override the default log level in default log4j
# - DRUID_XMX -- set Java Xmx
# - DRUID_XMS -- set Java Xms
# - DRUID_MAXNEWSIZE -- set Java max new size
# - DRUID_NEWSIZE -- set Java new size
# - DRUID_MAXDIRECTMEMORYSIZE -- set Java max direct memory size
#
# - DRUID_CONFIG_COMMON -- full path to a file for druid 'common' properties
# - DRUID_CONFIG_${service} -- full path to a file for druid 'service' properties

# This script is very similar to druid.sh, used exclusively for the kubernetes-overlord-extension.

set -e
SERVICE="overlord"

echo "$(date -Is) startup service $SERVICE"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is there a separate script for peon?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a good question, the service needs to be overlord, and the command is different. I also didn't want to modify the druid.sh script because this is an extension. I want to keep things totally different here.


# We put all the config in /tmp/conf to allow for a
# read-only root filesystem
mkdir -p /tmp/conf/
test -d /tmp/conf/druid && rm -r /tmp/conf/druid
cp -r /opt/druid/conf/druid /tmp/conf/druid

getConfPath() {
cluster_conf_base=/tmp/conf/druid/cluster
case "$1" in
_common) echo $cluster_conf_base/_common ;;
historical) echo $cluster_conf_base/data/historical ;;
middleManager) echo $cluster_conf_base/data/middleManager ;;
indexer) echo $cluster_conf_base/data/indexer ;;
coordinator | overlord) echo $cluster_conf_base/master/coordinator-overlord ;;
broker) echo $cluster_conf_base/query/broker ;;
router) echo $cluster_conf_base/query/router ;;
Comment on lines +58 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need all this for a script that is meant to launch a peon?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i can combine it in the druid.sh if you want...I'm totally happy to do that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because this was an extension (that is experimental), i did not want to change the druid.sh, something that everyone uses. That would mean changing the script for an experimental extension. I hope one day this work goes into core, if that was the case, I would've just modified the druid.sh to handle this.

*) echo $cluster_conf_base/misc/$1 ;;
esac
}
COMMON_CONF_DIR=$(getConfPath _common)
SERVICE_CONF_DIR=$(getConfPath ${SERVICE})

# Delete the old key (if existing) and append new key=value
setKey() {
service="$1"
key="$2"
value="$3"
service_conf=$(getConfPath $service)/runtime.properties
# Delete from all
sed -ri "/$key=/d" $COMMON_CONF_DIR/common.runtime.properties
[ -f $service_conf ] && sed -ri "/$key=/d" $service_conf
[ -f $service_conf ] && echo -e "\n$key=$value" >>$service_conf
[ -f $service_conf ] || echo -e "\n$key=$value" >>$COMMON_CONF_DIR/common.runtime.properties

echo "Setting $key=$value in $service_conf"
}

setJavaKey() {
service="$1"
key=$2
value=$3
file=$(getConfPath $service)/jvm.config
sed -ri "/$key/d" $file
echo $value >> $file
}

## Setup host names
if [ -n "${ZOOKEEPER}" ];
then
setKey _common druid.zk.service.host "${ZOOKEEPER}"
fi

DRUID_SET_HOST=${DRUID_SET_HOST:-1}
if [ "${DRUID_SET_HOST}" = "1" ]
then
setKey $SERVICE druid.host $(ip r get 1 | awk '{print $7;exit}')
fi

env | grep ^druid_ | while read evar;
do
# Can't use IFS='=' to parse since var might have = in it (e.g. password)
val=$(echo "$evar" | sed -e 's?[^=]*=??')
var=$(echo "$evar" | sed -e 's?^\([^=]*\)=.*?\1?g' -e 's?_?.?g')
setKey $SERVICE "$var" "$val"
done

env |grep ^s3service | while read evar
do
val=$(echo "$evar" | sed -e 's?[^=]*=??')
var=$(echo "$evar" | sed -e 's?^\([^=]*\)=.*?\1?g' -e 's?_?.?' -e 's?_?-?g')
echo "$var=$val" >>$COMMON_CONF_DIR/jets3t.properties
done

# This is to allow configuration via a Kubernetes configMap without
# e.g. using subPath (you can also mount the configMap on /tmp/conf/druid)
if [ -n "$DRUID_CONFIG_COMMON" ]
then
cp -f "$DRUID_CONFIG_COMMON" $COMMON_CONF_DIR/common.runtime.properties
fi

SCONFIG=$(printf "%s_%s" DRUID_CONFIG ${SERVICE})
SCONFIG=$(eval echo \$$(echo $SCONFIG))

if [ -n "${SCONFIG}" ]
then
cp -f "${SCONFIG}" $SERVICE_CONF_DIR/runtime.properties
fi

if [ -n "$DRUID_LOG_LEVEL" ]
then
sed -ri 's/"info"/"'$DRUID_LOG_LEVEL'"/g' $COMMON_CONF_DIR/log4j2.xml
fi

if [ -n "$DRUID_LOG4J" ]
then
echo "$DRUID_LOG4J" >$COMMON_CONF_DIR/log4j2.xml
fi

DRUID_DIRS_TO_CREATE=${DRUID_DIRS_TO_CREATE-'var/tmp var/druid/segments var/druid/indexing-logs var/druid/task var/druid/hadoop-tmp var/druid/segment-cache'}
if [ -n "${DRUID_DIRS_TO_CREATE}" ]
then
mkdir -p ${DRUID_DIRS_TO_CREATE}
fi

# take the ${TASK_JSON} environment variable and base64 decode, unzip and throw it in ${TASK_DIR}/task.json
mkdir -p ${TASK_DIR}; echo ${TASK_JSON} | base64 -d | gzip -d > ${TASK_DIR}/task.json;

exec java ${JAVA_OPTS} -cp $COMMON_CONF_DIR:$SERVICE_CONF_DIR:lib/*: org.apache.druid.cli.Main internal peon $@
2 changes: 2 additions & 0 deletions distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,8 @@
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:kafka-emitter</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:druid-kubernetes-overlord-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:materialized-view-maintenance</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions.contrib:materialized-view-selection</argument>
Expand Down
129 changes: 129 additions & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
---
id: k8s-jobs
title: "MM-less Druid in K8s"
---

<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

Apache Druid Extension to enable using Kubernetes for launching and managing tasks instead of the Middle Managers. This extension allows you to launch tasks as kubernetes jobs removing the need for your middle manager.

Consider this an [EXPERIMENTAL](../experimental.md) feature mostly because it has not been tested yet on a wide variety of long-running Druid clusters.

## How it works

The K8s extension takes the podSpec of your `Overlord` pod and creates a kubernetes job from this podSpec. Thus if you have sidecars such as Splunk or Istio it can optionally launch a task as a K8s job. All jobs are natively restorable, they are decoupled from the druid deployment, thus restarting pods or doing upgrades has no affect on tasks in flight. They will continue to run and when the overlord comes back up it will start tracking them again.

## Configuration

To use this extension please make sure to [include](../extensions.md#loading-extensions)`druid-kubernetes-overlord-extensions` in the extensions load list for your overlord process.

The extension uses the task queue to limit how many concurrent tasks (K8s jobs) are in flight so it is required you have a reasonable value for `druid.indexer.queue.maxSize`. Additionally set the variable `druid.indexer.runner.namespace` to the namespace in which you are running druid.

Other configurations required are:
`druid.indexer.runner.type: k8s` and `druid.indexer.task.encapsulatedTask: true`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

druid.indexer.task.encapsulatedTask doesn't seem documented anywhere (or in javadocs), do we need to elaborate on what it does?


You can add optional labels to your K8s jobs / pods if you need them by using the following configuration:
`druid.indexer.runner.labels: '{"key":"value"}'`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO the k8s indexer config should all be prefixed with druid.indexer.runner.k8s.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so i thought about that, I would have to duplicate the IndexingServiceModuleHelper just for this extension. How about I make you a deal, we can do all the cleanup in a following PR. Honestly there are a few hacks because this is an extension, if it gets into core, I can fix up a lot of the code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. I just checked and other runner types are not namespacing their configuration as well. So I think it's ok.

Annotations are the same with:
`druid.indexer.runner.annotations: '{"key":"value"}'`

All other configurations you had for the middle manager tasks must be moved under the overlord with one caveat, you must specify javaOpts as an array:
`druid.indexer.runner.javaOptsArray`, `druid.indexer.runner.javaOpts` is no longer supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we say "no longer support", do we mean "not supported by the K8s extension"?

What does it mean to "be moved under the overlord"? Where we had druid.indexer.foo we now have druid.overlord.foo?

If this is so, and those properties only apply when K8s is in effect, should the config root perhaps be druid.k8s or some such?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No what I meant is that before you could pass javaOpts string and a javaOptsArray for your peon tasks, it was recommended to pass the array, but the string was still supported. In this patch, we only support the array.


If you are running without a middle manager you need to also use `druid.processing.intermediaryData.storage.type=deepstore`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Explain a bit? With K8s, there is no MM, right? So, under K8s, this setting is mandatory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes you are absolutely correct.


Additional Configuration

### Properties
|Property|Possible Values|Description|Default|required|
|--------|---------------|-----------|-------|--------|
|`druid.indexer.runner.debugJobs`|`boolean`|Clean up K8s jobs after tasks complete.|False|No|
|`druid.indexer.runner.sidecarSupport`|`boolean`|If your overlord pod has sidecars, this will attempt to start the task with the same sidecars as the overlord pod.|False|No|
|`druid.indexer.runner.kubexitImage`|`String`|Used kubexit project to help shutdown sidecars when the main pod completes. Otherwise jobs with sidecars never terminate.|karlkfi/kubexit:latest|No|
|`druid.indexer.runner.disableClientProxy`|`boolean`|Use this if you have a global http(s) proxy and you wish to bypass it.|false|No|
|`druid.indexer.runner.maxTaskDuration`|`Duration`|Max time a task is allowed to run for before getting killed|`PT4H`|No|
|`druid.indexer.runner.taskCleanupDelay`|`Duration`|How long do jobs stay around before getting reaped from K8s|`P2D`|No|
|`druid.indexer.runner.taskCleanupInterval`|`Duration`|How often to check for jobs to be reaped|`PT10M`|No|
|`druid.indexer.runner.K8sjobLaunchTimeout`|`Duration`|How long to wait to launch a K8s task before marking it as failed, on a resource constrained cluster it may take some time.|`PT1H`|No|
|`druid.indexer.runner.javaOptsArray`|`JsonArray`|java opts for the task.|`-Xmx1g`|No|
|`druid.indexer.runner.labels`|`JsonObject`|Additional labels you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.annotations`|`JsonObject`|Additional annotations you want to add to peon pod|`{}`|No|
|`druid.indexer.runner.graceTerminationPeriodSeconds`|`Long`|Number of seconds you want to wait after a sigterm for container lifecycle hooks to complete. Keep at a smaller value if you want tasks to hold locks for shorter periods.|`PT30S` (K8s default)|No|

churromorales marked this conversation as resolved.
Show resolved Hide resolved
### Gotchas

- You must have in your role the ability to launch jobs.
- All Druid Pods belonging to one Druid cluster must be inside same kubernetes namespace.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this name space specified? I didn't see an option for it above.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be the same namespace under which the overlord is running.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally anything that communicates with the peon pod has to be in the same namespace.

- For the sidecar support to work, your entry point / command in docker must be explicitly defined your spec.

You can't have something like this:
Dockerfile:
``` ENTRYPOINT: ["foo.sh"] ```

and in your sidecar specs:
``` container:
name: foo
args:
- arg1
- arg2
```

That will not work, because we cannot decipher what your command is, the extension needs to know it explicitly.
**Even for sidecars like Istio which are dynamically created by the service mesh, this needs to happen.*

Instead do the following:
You can keep your Dockerfile the same but you must have a sidecar spec like so:
``` container:
name: foo
command: foo.sh
args:
- arg1
- arg2
```

The following roles must also be accessible. An example spec could be:

```
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: druid-cluster
rules:
- apiGroups:
- ""
resources:
- pods
- configmaps
- jobs
verbs:
- '*'
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: druid-cluster
subjects:
- kind: ServiceAccount
name: default
roleRef:
kind: Role
name: druid-cluster
apiGroup: rbac.authorization.k8s.io
```
1 change: 1 addition & 0 deletions docs/development/extensions.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ All of these community extensions can be downloaded using [pull-deps](../operati
|druid-tdigestsketch|Support for approximate sketch aggregators based on [T-Digest](https://github.com/tdunning/t-digest)|[link](../development/extensions-contrib/tdigestsketch-quantiles.md)|
|gce-extensions|GCE Extensions|[link](../development/extensions-contrib/gce-extensions.md)|
|prometheus-emitter|Exposes [Druid metrics](../operations/metrics.md) for Prometheus server collection (https://prometheus.io/)|[link](./extensions-contrib/prometheus.md)|
|kubernetes-overlord-extensions|Support for launching tasks in k8s without Middle Managers|[link](../development/extensions-contrib/k8s-jobs.md)|

## Promoting community extensions to core extensions

Expand Down
Loading