Integration test for k8s spark operator support #1236

Merged 1 commit on Dec 18, 2020
10 changes: 9 additions & 1 deletion infra/scripts/codebuild_runner.py
@@ -11,6 +11,7 @@
 import sys
 import argparse
 import boto3
+from botocore.config import Config


 class LogTailer:
@@ -125,7 +126,14 @@ def source_version_from_prow_job_spec(job_spec: Dict[str, Any]) -> str:

 async def run_build(project_name: str, source_version: str, source_location: str):
     print(f"Building {project_name} at {source_version}", file=sys.stderr)
-    logs_client = boto3.client("logs", region_name="us-west-2")
+
+    config = Config(
+        retries = {
+            'max_attempts': 10,
+        }
+    )
+
+    logs_client = boto3.client("logs", region_name="us-west-2", config=config)
     codebuild_client = boto3.client("codebuild", region_name="us-west-2")

     print("Submitting the build..", file=sys.stderr)
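For reference, botocore's Config is the standard mechanism for raising a client's retry limit. A minimal standalone sketch of the configuration added above; the extra attempts presumably help the log tailer ride out transient CloudWatch Logs throttling during long builds (that rationale is an inference, not stated in the PR):

import boto3
from botocore.config import Config

# Allow up to 10 attempts (the first call plus retries) per API request.
# Config's retries dict also accepts a "mode" key; this PR leaves the mode at its default.
config = Config(retries={"max_attempts": 10})
logs_client = boto3.client("logs", region_name="us-west-2", config=config)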
15 changes: 15 additions & 0 deletions infra/scripts/setup-e2e-env-sparkop.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+
+make compile-protos-python
+
+python -m pip install --upgrade pip==20.2 setuptools wheel
+
+python -m pip install -qr sdk/python/requirements-dev.txt
+python -m pip install -qr tests/requirements.txt
+
+# Using mvn -q to make it less verbose. This step happens after the Docker containers were
+# successfully built, so it is unlikely to fail and detailed logs shouldn't be needed.
+echo "########## Building ingestion jar"
+TIMEFORMAT='########## took %R seconds'
+
+time make build-java-no-tests REVISION=develop MAVEN_EXTRA_OPTS="-q --no-transfer-progress"
20 changes: 20 additions & 0 deletions infra/scripts/test-end-to-end-sparkop.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+
+set -euo pipefail
+
+pip install "s3fs" "boto3" "urllib3>=1.25.4"
+
+export DISABLE_FEAST_SERVICE_FIXTURES=1
+export DISABLE_SERVICE_FIXTURES=1
+
+export FEAST_SPARK_K8S_NAMESPACE=sparkop
+
+PYTHONPATH=sdk/python pytest tests/e2e/ \
+  --feast-version develop \
+  --core-url sparkop-feast-core:6565 \
+  --serving-url sparkop-feast-online-serving:6566 \
+  --env k8s \
+  --staging-path $STAGING_PATH \
+  --redis-url sparkop-redis-master.sparkop.svc.cluster.local:6379 \
+  --kafka-brokers sparkop-kafka.sparkop.svc.cluster.local:9092 \
+  -m "not bq"
2 changes: 1 addition & 1 deletion sdk/python/feast/constants.py
@@ -184,7 +184,7 @@ class ConfigOptions(metaclass=ConfigMeta):
     SPARK_K8S_NAMESPACE = "default"

     # expect k8s spark operator to be running in the same cluster as Feast
-    SPARK_K8S_USE_INCLUSTER_CONFIG = True
+    SPARK_K8S_USE_INCLUSTER_CONFIG = "True"

     # SparkApplication resource template
     SPARK_K8S_JOB_TEMPLATE_PATH = None
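A note on why the default flips from the boolean True to the string "True": the change implies ConfigOptions values are meant to be strings that get parsed on read. The helper below is a minimal illustrative sketch (assuming configparser-style boolean handling, not Feast's actual parsing code):

# Hypothetical parser, for illustration: a string default survives this path,
# while a bool default would fail on .lower() with an AttributeError.
def parse_bool(raw: str) -> bool:
    value = raw.lower()
    if value in ("true", "1", "yes"):
        return True
    if value in ("false", "0", "no"):
        return False
    raise ValueError(f"not a boolean: {raw!r}")

assert parse_bool("True") is True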
3 changes: 3 additions & 0 deletions sdk/python/feast/pyspark/launchers/k8s/k8s.py
@@ -223,6 +223,7 @@ def historical_feature_retrieval(
             jars=[],
             extra_metadata={METADATA_OUTPUT_URI: job_params.get_destination_path()},
             arguments=job_params.get_arguments(),
+            namespace=self._namespace,
         )

         job_info = _submit_job(
@@ -276,6 +277,7 @@ def offline_to_online_ingestion(
             jars=[],
             extra_metadata={},
             arguments=ingestion_job_params.get_arguments(),
+            namespace=self._namespace,
         )

         job_info = _submit_job(
@@ -317,6 +319,7 @@ def start_stream_to_online_ingestion(
             jars=extra_jar_paths,
             extra_metadata={METADATA_JOBHASH: job_hash},
             arguments=ingestion_job_params.get_arguments(),
+            namespace=self._namespace,
         )

         job_info = _submit_job(
9 changes: 7 additions & 2 deletions sdk/python/feast/pyspark/launchers/k8s/k8s_utils.py
@@ -112,14 +112,19 @@ def _prepare_job_resource(
     jars: List[str],
     extra_metadata: Dict[str, str],
     arguments: List[str],
+    namespace: str,
 ) -> Dict[str, Any]:
     """ Prepare SparkApplication custom resource configs """
     job = deepcopy(job_template)

     labels = {LABEL_JOBID: job_id, LABEL_JOBTYPE: job_type}

     _add_keys(job, ("metadata", "labels"), labels)
-    _add_keys(job, ("metadata",), dict(name=_job_id_to_resource_name(job_id)))
+    _add_keys(
+        job,
+        ("metadata",),
+        dict(name=_job_id_to_resource_name(job_id), namespace=namespace),
+    )
     _add_keys(job, ("spec",), dict(mainClass=main_class))
     _add_keys(job, ("spec",), dict(mainApplicationFile=main_application_file))
     _add_keys(job, ("spec",), dict(arguments=arguments))
@@ -179,7 +184,7 @@ def _k8s_state_to_feast(k8s_state: str) -> SparkJobStatus:

 def _resource_to_job_info(resource: Dict[str, Any]) -> JobInfo:
     labels = resource["metadata"]["labels"]
-    sparkConf = resource["spec"].get("sparkConf")
+    sparkConf = resource["spec"].get("sparkConf", {})

     if "status" in resource:
         state = _k8s_state_to_feast(resource["status"]["applicationState"]["state"])
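Two things worth noting here. First, _add_keys itself is not shown in this diff; a plausible sketch of its behavior (a guess for illustration, not the actual implementation) is a merge into a nested dict at a key path, which is how the namespace lands under metadata in the generated SparkApplication resource:

from typing import Any, Dict, Tuple

def _add_keys(obj: Dict[str, Any], path: Tuple[str, ...], items: Dict[str, Any]) -> None:
    # Walk down the key path, creating intermediate dicts as needed,
    # then merge the new items at that level.
    for key in path:
        obj = obj.setdefault(key, {})
    obj.update(items)

# Example with made-up values: the namespace ends up under "metadata".
job: Dict[str, Any] = {}
_add_keys(job, ("metadata",), dict(name="feast-job-123", namespace="sparkop"))
assert job == {"metadata": {"name": "feast-job-123", "namespace": "sparkop"}}

Second, the .get("sparkConf", {}) default in _resource_to_job_info means a SparkApplication without a sparkConf section now yields an empty dict instead of None, so downstream lookups don't have to special-case the missing key.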
4 changes: 3 additions & 1 deletion tests/e2e/conftest.py
@@ -9,7 +9,9 @@ def pytest_addoption(parser):
     parser.addoption("--job-service-url", action="store", default="localhost:6568")
     parser.addoption("--kafka-brokers", action="store", default="localhost:9092")

-    parser.addoption("--env", action="store", help="local|aws|gcloud", default="local")
+    parser.addoption(
+        "--env", action="store", help="local|aws|gcloud|k8s", default="local"
+    )
     parser.addoption("--with-job-service", action="store_true")
     parser.addoption("--staging-path", action="store")
     parser.addoption("--dataproc-cluster-name", action="store")
13 changes: 13 additions & 0 deletions tests/e2e/fixtures/client.py
@@ -79,6 +79,19 @@ def feast_client(
                 local_staging_path, "historical_output"
             ),
         )
+    elif pytestconfig.getoption("env") == "k8s":
+        return Client(
+            core_url=f"{feast_core[0]}:{feast_core[1]}",
+            serving_url=f"{feast_serving[0]}:{feast_serving[1]}",
+            spark_launcher="k8s",
+            spark_staging_location=os.path.join(local_staging_path, "k8s"),
+            spark_ingestion_jar=ingestion_job_jar,
+            redis_host=pytestconfig.getoption("redis_url").split(":")[0],
+            redis_port=pytestconfig.getoption("redis_url").split(":")[1],
+            historical_feature_output_location=os.path.join(
+                local_staging_path, "historical_output"
+            ),
+        )
     else:
         raise KeyError(f"Unknown environment {pytestconfig.getoption('env')}")
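The --redis-url option is a plain host:port string, split twice in the fixture to fill redis_host and redis_port. A standalone equivalent of that parsing (values taken from the test script above):

# Same parsing as the fixture, shown on its own; a single split suffices
# because the in-cluster DNS names used here contain exactly one colon.
redis_url = "sparkop-redis-master.sparkop.svc.cluster.local:6379"
redis_host, redis_port = redis_url.split(":")
assert redis_host == "sparkop-redis-master.sparkop.svc.cluster.local"
assert redis_port == "6379"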
9 changes: 6 additions & 3 deletions tests/e2e/test_historical_features.py
@@ -1,6 +1,6 @@
 from datetime import datetime, timedelta
 from typing import Union
-from urllib.parse import urlparse
+from urllib.parse import urlparse, urlunparse

 import gcsfs
 import numpy as np
@@ -24,11 +24,14 @@ def read_parquet(uri):
         files = ["gs://" + path for path in fs.glob(uri + "/part-*")]
         ds = parquet.ParquetDataset(files, filesystem=fs)
         return ds.read().to_pandas()
-    elif parsed_uri.scheme == "s3":
+    elif parsed_uri.scheme == "s3" or parsed_uri.scheme == "s3a":
+
+        s3uri = urlunparse(parsed_uri._replace(scheme="s3"))
+
         import s3fs

         fs = s3fs.S3FileSystem()
-        files = ["s3://" + path for path in fs.glob(uri + "/part-*")]
+        files = ["s3://" + path for path in fs.glob(s3uri + "/part-*")]
         ds = parquet.ParquetDataset(files, filesystem=fs)
         return ds.read().to_pandas()
     else:
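The scheme rewrite is the crux of this change: Spark's Hadoop S3 connector writes output under s3a:// URIs, while s3fs only understands s3://, so the parsed URI is rebuilt with its scheme swapped. A quick standalone check of that round-trip (the bucket and path are made-up example values):

from urllib.parse import urlparse, urlunparse

# s3a:// (Hadoop connector scheme) -> s3:// (what s3fs expects); every other
# URI component passes through _replace/urlunparse unchanged.
uri = "s3a://example-bucket/staging/historical_output"
s3uri = urlunparse(urlparse(uri)._replace(scheme="s3"))
assert s3uri == "s3://example-bucket/staging/historical_output"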