Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(controller): Auto-mount empty-dir for k8sapi/kubelet executor outputs. Closes #2679 #4766

Closed
wants to merge 32 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
85655d2
feat(controller): Auto-mount empty-dir for k8sapi/kubelet executor ou…
alexec Dec 17, 2020
014e5ac
auto-emptydir: M test/e2e/smoke_test.go
alexec Dec 17, 2020
8264c95
compile
alexec Dec 17, 2020
0aeb80b
Merge branch 'master' into auto-emptydir
alexec Jan 8, 2021
d131660
:auto-emptydir: M test/e2e/functional_test.go
alexec Jan 9, 2021
08004ae
Merge branch 'master' into auto-emptydir
alexec Jan 9, 2021
4edcd50
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 10, 2021
b52c929
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 10, 2021
88caf7e
add missing brace
alexec Jan 10, 2021
935a96e
add missing sq brace
alexec Jan 10, 2021
1e3b70c
docker != pns
alexec Jan 10, 2021
0e54aea
pipefail
alexec Jan 11, 2021
ecbff08
Merge branch 'master' into auto-emptydir
alexec Jan 17, 2021
f221cfa
Merge branch 'master' into auto-emptydir
alexec Jan 18, 2021
af75e70
Merge branch 'master' into auto-emptydir
alexec Jan 18, 2021
4a5ddd9
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 19, 2021
7aa9e63
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 19, 2021
58af926
:auto-emptydir: M Makefile
alexec Jan 19, 2021
099db87
fix test for optional output artifacts
alexec Jan 19, 2021
4a8c2a4
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 19, 2021
7033caa
comment
alexec Jan 19, 2021
016bb21
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 19, 2021
1810d23
sudo chmod
alexec Jan 19, 2021
713307f
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 19, 2021
62cb2ca
Merge branch 'master' into auto-emptydir
alexec Jan 19, 2021
9079811
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 19, 2021
7eb925f
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 20, 2021
dd9d967
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 20, 2021
2546384
Merge branch 'master' into auto-emptydir
alexec Jan 20, 2021
9aeb192
:auto-emptydir: M Makefile
alexec Jan 20, 2021
0e06310
Merge branch 'master' into auto-emptydir
alexec Jan 21, 2021
c0881ae
:auto-emptydir: M .github/workflows/ci-build.yaml
alexec Jan 21, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci-build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ jobs:
GOPATH: /home/runner/go
run: make ${{ matrix.test }} GOTEST='gotestsum --format testname --'
- name: Upload logs
if: ${{ failure() }}
# failure() does not include timeouts, so is useless
if: ${{ always() }}
uses: actions/upload-artifact@v1
with:
name: ${{ matrix.test }}-${{matrix.containerRuntimeExecutor}}-${{matrix.alwaysOffloadNodeStatus}}-${{ github.run_id }}-argo.log
Expand Down
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
FROM golang:1.13.4 as builder

ARG IMAGE_OS=linux
ARG IMAGE_ARCH=amd64

RUN apt-get update && apt-get --no-install-recommends install -y \
git \
Expand Down Expand Up @@ -129,7 +130,7 @@ FROM scratch as workflow-controller
USER 8737
ARG IMAGE_OS=linux
# Add timezone data
COPY --from=argo-build /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=argoexec-base /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=argo-build /go/src/github.com/argoproj/argo/dist/workflow-controller-${IMAGE_OS}-* /bin/workflow-controller
ENTRYPOINT [ "workflow-controller" ]

Expand Down
26 changes: 17 additions & 9 deletions Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ WORKDIR /tmp
ENV DOCKER_CHANNEL stable
ENV DOCKER_VERSION 18.09.1

RUN if [ "${IMAGE_OS}" = "linux" -a "${IMAGE_ARCH}" = "amd64" ]; then \
wget -O docker.tgz https://download.docker.com/linux/static/${DOCKER_CHANNEL}/x86_64/docker-${DOCKER_VERSION}.tgz; \
elif [ "${IMAGE_OS}" = "linux" -a "${IMAGE_ARCH}" = "arm64" ]; then \
wget -O docker.tgz https://download.docker.com/linux/static/${DOCKER_CHANNEL}/aarch64/docker-${DOCKER_VERSION}.tgz; \
RUN if [ "${IMAGE_OS}" = "linux" ]; then \
export IMAGE_ARCH=`uname -m`; \
if [ "${IMAGE_ARCH}" = "ppc64le" ] ||[ "${IMAGE_ARCH}" = "s390x" ]; then \
wget -O docker.tgz https://download.docker.com/${IMAGE_OS}/static/${DOCKER_CHANNEL}/${IMAGE_ARCH}/docker-18.06.3-ce.tgz; \
else \
wget -O docker.tgz https://download.docker.com/${IMAGE_OS}/static/${DOCKER_CHANNEL}/${IMAGE_ARCH}/docker-${DOCKER_VERSION}.tgz; \
fi \
fi && \
tar --extract --file docker.tgz --strip-components 1 --directory /usr/local/bin/ && \
rm docker.tgz
Expand All @@ -44,12 +47,11 @@ RUN if [ "${IMAGE_OS}" = "linux" -a "${IMAGE_ARCH}" = "amd64" ]; then \
# argoexec-base
# Used as the base for both the release and development version of argoexec
####################################################################################################
FROM debian:10.3-slim as argoexec-base
FROM debian:10.6-slim as argoexec-base

ARG IMAGE_OS=linux
ARG IMAGE_ARCH=amd64

# NOTE: keep the version synced with https://storage.googleapis.com/kubernetes-release/release/stable.txt
# NOTE: kubectl version should be one minor version less than https://storage.googleapis.com/kubernetes-release/release/stable.txt
ENV KUBECTL_VERSION=1.18.8
ENV JQ_VERSION=1.6
RUN apt-get update && \
Expand All @@ -63,16 +65,19 @@ RUN apt-get update && \
/usr/share/doc \
/usr/share/doc-base
ADD hack/recurl.sh .
RUN ./recurl.sh /usr/local/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/v${KUBECTL_VERSION}/bin/linux/${IMAGE_ARCH}/kubectl
ADD hack/image_arch.sh .
RUN . ./image_arch.sh && ./recurl.sh /usr/local/bin/kubectl https://storage.googleapis.com/kubernetes-release/release/v${KUBECTL_VERSION}/bin/${IMAGE_OS}/${IMAGE_ARCH}/kubectl
RUN ./recurl.sh /usr/local/bin/jq https://github.com/stedolan/jq/releases/download/jq-${JQ_VERSION}/jq-linux64
RUN rm recurl.sh
COPY hack/ssh_known_hosts /etc/ssh/ssh_known_hosts
COPY hack/nsswitch.conf /etc/nsswitch.conf
COPY --from=builder /usr/local/bin/docker /usr/local/bin/

####################################################################################################
# argoexec
####################################################################################################
FROM argoexec-base as argoexec
ARG IMAGE_OS=linux
COPY argoexec /usr/local/bin/
ENTRYPOINT [ "argoexec" ]

Expand All @@ -81,8 +86,9 @@ ENTRYPOINT [ "argoexec" ]
####################################################################################################
FROM scratch as workflow-controller
USER 8737
ARG IMAGE_OS=linux
# Add timezone data
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=argoexec-base /usr/share/zoneinfo /usr/share/zoneinfo
COPY workflow-controller /bin/
ENTRYPOINT [ "workflow-controller" ]

Expand All @@ -91,7 +97,9 @@ ENTRYPOINT [ "workflow-controller" ]
####################################################################################################
FROM scratch as argocli
USER 8737
ARG IMAGE_OS=linux
COPY --from=argoexec-base /etc/ssh/ssh_known_hosts /etc/ssh/ssh_known_hosts
COPY --from=argoexec-base /etc/nsswitch.conf /etc/nsswitch.conf
COPY --from=argoexec-base /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --chown=8737 argo-server.crt argo-server.crt
COPY --chown=8737 argo-server.key argo-server.key
Expand Down
3 changes: 2 additions & 1 deletion test/e2e/manifests/mixins/workflow-controller-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ data:
memory: 64Mi
limits:
cpu: 0.5
memory: 128Mi
memory: 128Mi
kubeletInsecure: "true"
11 changes: 0 additions & 11 deletions test/e2e/smoke_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package e2e

import (
"os"
"testing"
"time"

Expand Down Expand Up @@ -50,16 +49,6 @@ func (s *SmokeSuite) TestRunAsNonRootWorkflow() {
}

func (s *SmokeSuite) TestArtifactPassing() {

switch s.Config.ContainerRuntimeExecutor {
case common.ContainerRuntimeExecutorKubelet, common.ContainerRuntimeExecutorK8sAPI:
s.T().Skip("non-docker not supported")
case common.ContainerRuntimeExecutorPNS:
if os.Getenv("CI") == "true" {
s.T().Skip("non-docker not supported")
}
}

s.Given().
Workflow("@smoke/artifact-passing.yaml").
When().
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.markWorkflowFailed(ctx, msg)
return
}
validateOpts := validate.ValidateOpts{ContainerRuntimeExecutor: woc.controller.GetContainerRuntimeExecutor()}
validateOpts := validate.ValidateOpts{}
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(woc.controller.wfclientset.ArgoprojV1alpha1().WorkflowTemplates(woc.wf.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(woc.controller.wfclientset.ArgoprojV1alpha1().ClusterWorkflowTemplates())

Expand Down
48 changes: 46 additions & 2 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"path/filepath"
"strconv"
"strings"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -282,7 +284,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
if err != nil {
return nil, err
}
addOutputArtifactsVolumes(pod, tmpl)
woc.addOutputArtifactsVolumes(pod, tmpl)

// Set the container template JSON in pod annotations, which executor examines for things like
// artifact location/path.
Expand Down Expand Up @@ -899,7 +901,7 @@ func (woc *wfOperationCtx) addInputArtifactsVolumes(pod *apiv1.Pod, tmpl *wfv1.T
// wait container will collect the artifacts directly from volumeMount instead of `docker cp`-ing
// them to the wait sidecar. In order for this to work, we mirror all volume mounts in the main
// container under a well-known path.
func addOutputArtifactsVolumes(pod *apiv1.Pod, tmpl *wfv1.Template) {
func (woc *wfOperationCtx) addOutputArtifactsVolumes(pod *apiv1.Pod, tmpl *wfv1.Template) {
if tmpl.GetType() == wfv1.TemplateTypeResource {
return
}
Expand All @@ -920,15 +922,57 @@ func addOutputArtifactsVolumes(pod *apiv1.Pod, tmpl *wfv1.Template) {
mainCtr = &pod.Spec.Containers[mainCtrIndex]
waitCtr := &pod.Spec.Containers[waitCtrIndex]

if woc.needAutoMountOutputVolume(pod) {
z := func(path string) {
// the main container may create a file or a dir at path, so we must mount the parent dir
mountPath := filepath.Dir(path)
// we don't need to mount this if it is already mounted
for _, x := range mainCtr.VolumeMounts {
if strings.HasPrefix(mountPath, x.MountPath) {
return
}
}
h := fnv.New32()
_, _ = h.Write([]byte(mountPath))
name := fmt.Sprintf("output-%v", h.Sum32())
log.WithFields(log.Fields{"mountPath": mountPath, "name": name}).Debugln("auto-mounting output volume")
mainCtr.VolumeMounts = append(mainCtr.VolumeMounts, apiv1.VolumeMount{Name: name, MountPath: mountPath})
pod.Spec.Volumes = append(pod.Spec.Volumes, apiv1.Volume{Name: name, VolumeSource: apiv1.VolumeSource{EmptyDir: &apiv1.EmptyDirVolumeSource{}}})
}
for _, x := range tmpl.Outputs.Artifacts {
z(x.Path)
}
for _, x := range tmpl.Outputs.Parameters {
if x.ValueFrom != nil && x.ValueFrom.Path != "" {
z(x.ValueFrom.Path)
}
}
}

for _, mnt := range mainCtr.VolumeMounts {
mnt.MountPath = filepath.Join(common.ExecutorMainFilesystemDir, mnt.MountPath)
// ReadOnly is needed to be false for overlapping volume mounts
mnt.ReadOnly = false
waitCtr.VolumeMounts = append(waitCtr.VolumeMounts, mnt)
}

pod.Spec.Containers[mainCtrIndex] = *mainCtr
pod.Spec.Containers[waitCtrIndex] = *waitCtr
}

// needAutoMountOutputVolume reports whether output paths of the given pod should be
// backed by automatically added volumes, based on the controller's configured
// container runtime executor:
//   - k8sapi and kubelet: always true
//   - pns: true only when the pod-level security context sets runAsNonRoot to true
//   - anything else (including docker): always false
func (woc *wfOperationCtx) needAutoMountOutputVolume(pod *apiv1.Pod) bool {
	executor := woc.controller.GetContainerRuntimeExecutor()

	// must always be mounted - these executors do not support CopyFile/GetFileContents
	if executor == common.ContainerRuntimeExecutorK8sAPI || executor == common.ContainerRuntimeExecutorKubelet {
		return true
	}

	// "docker" cannot support 'runAsNonRoot', so this must always be false; the same
	// answer is given for any executor other than pns.
	if executor != common.ContainerRuntimeExecutorPNS {
		return false
	}

	// pns: only the pod-level security context is consulted here.
	podSecurity := pod.Spec.SecurityContext
	if podSecurity == nil || podSecurity.RunAsNonRoot == nil {
		return false
	}
	return *podSecurity.RunAsNonRoot
}

// addArchiveLocation conditionally updates the template with the default artifact repository
// information configured in the controller, for the purposes of archiving outputs. This is skipped
// for templates which do not need to archive anything, or have explicitly set an archive location
Expand Down
42 changes: 4 additions & 38 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,18 +370,15 @@ func (we *WorkflowExecutor) stageArchiveFile(mainCtrID string, art *wfv1.Artifac
}
}

if !we.isBaseImagePath(art.Path) {
mountedArtPath := filepath.Join(common.ExecutorMainFilesystemDir, art.Path)
if argofile.Exists(mountedArtPath) {
// If we get here, we are uploading an artifact from a mirrored volume mount which the wait
// sidecar has direct access to. We can upload directly from the shared volume mount,
// instead of copying it from the container.
mountedArtPath := filepath.Join(common.ExecutorMainFilesystemDir, art.Path)
log.Infof("Staging %s from mirrored volume mount %s", art.Path, mountedArtPath)
if strategy.None != nil {
fileName := filepath.Base(art.Path)
log.Infof("No compression strategy needed. Staging skipped")
if !argofile.Exists(mountedArtPath) {
return "", "", errors.Errorf(errors.CodeNotFound, "%s no such file or directory", art.Path)
}
return fileName, mountedArtPath, nil
}
fileName := fmt.Sprintf("%s.tgz", art.Name)
Expand Down Expand Up @@ -445,31 +442,6 @@ func (we *WorkflowExecutor) stageArchiveFile(mainCtrID string, art *wfv1.Artifac
return fileName, localArtPath, nil
}

// isBaseImagePath reports whether the given artifact path lives in the base image layer of
// the main container (true) rather than on a volume shared between the wait and main
// containers (false).
func (we *WorkflowExecutor) isBaseImagePath(path string) bool {
	// A path covered by a user-specified volumeMount is never a base image path.
	if overlapping := common.FindOverlappingVolume(&we.Template, path); overlapping != nil {
		return false
	}

	// Otherwise, see whether the path falls on an input-artifact emptyDir mounted by argo.
	for _, input := range we.Template.Inputs.Artifacts {
		switch {
		case path == input.Path:
			// An optional input artifact that was never supplied leaves no file on the
			// input artifact volume. Because the caller wants this path as an output,
			// it has to be looked up in the base image layer in that case only.
			return input.Optional && !input.HasLocationOrKey()
		case strings.HasPrefix(path, input.Path+"/"):
			// The path is nested below an input artifact directory.
			return false
		}
	}

	// No volume or input artifact claims the path.
	return true
}

// SaveParameters will save the content in the specified file path as output parameter value
func (we *WorkflowExecutor) SaveParameters(ctx context.Context) error {
if len(we.Template.Outputs.Parameters) == 0 {
Expand All @@ -490,13 +462,8 @@ func (we *WorkflowExecutor) SaveParameters(ctx context.Context) error {
}

var output *wfv1.AnyString
if we.isBaseImagePath(param.ValueFrom.Path) {
executorType := os.Getenv(common.EnvVarContainerRuntimeExecutor)
if executorType == common.ContainerRuntimeExecutorK8sAPI || executorType == common.ContainerRuntimeExecutorKubelet {
log.Infof("Copying output parameter %s from base image layer %s is not supported for k8sapi and kubelet executors. "+
"Consider using an emptyDir volume: https://argoproj.github.io/argo/empty-dir/.", param.Name, param.ValueFrom.Path)
continue
}
mountedPath := filepath.Join(common.ExecutorMainFilesystemDir, param.ValueFrom.Path)
if !argofile.Exists(mountedPath) {
log.Infof("Copying %s from base image layer", param.ValueFrom.Path)
fileContents, err := we.RuntimeExecutor.GetFileContents(mainCtrID, param.ValueFrom.Path)
if err != nil {
Expand All @@ -511,7 +478,6 @@ func (we *WorkflowExecutor) SaveParameters(ctx context.Context) error {
}
} else {
log.Infof("Copying %s from volume mount", param.ValueFrom.Path)
mountedPath := filepath.Join(common.ExecutorMainFilesystemDir, param.ValueFrom.Path)
data, err := ioutil.ReadFile(mountedPath)
if err != nil {
// We have a default value to use instead of returning an error
Expand Down
Loading