Skip to content

Commit

Permalink
Merge redis and rabbitmq integration into celery (apache#28205)
Browse files Browse the repository at this point in the history
Previously we had separate integrations per service, but since we
are moving the integration tests into separate job, it will be
easier if the  celery integration is a single one - this way we
will have 1-1 relationship between tests to run and enabled
integrations.

The checks for various integrations were not really working
recently and this change is introducing them back.
  • Loading branch information
potiuk authored Dec 8, 2022
1 parent b002215 commit 68217f5
Show file tree
Hide file tree
Showing 16 changed files with 275 additions and 330 deletions.
6 changes: 2 additions & 4 deletions dev/breeze/src/airflow_breeze/global_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@
"mongo",
"openldap",
"pinot",
"rabbitmq",
"redis",
"celery",
"statsd",
"trino",
]
Expand Down Expand Up @@ -199,8 +198,7 @@ def get_airflow_extras():
"mongo",
"openldap",
"pinot",
"rabbitmq",
"redis",
"celery",
"statsd",
"trino",
]
Expand Down
10 changes: 5 additions & 5 deletions images/breeze/output-commands-hash.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is automatically generated by pre-commit. If you have a conflict with this file
# Please do not solve it but run `breeze setup regenerate-command-images`.
# This command should fix the conflict and regenerate help images that you have conflict with.
main:11fedd2637e0e3b31936c4f015ab0d19
main:0e7677cd7035e01b5260b9ac4889fd90
build-docs:80555245ea1142991ce1d63c3bf8ce74
ci:find-newer-dependencies:8fa2b57f5f0523c928743b235ee3ab5a
ci:fix-ownership:fee2c9ec9ef19686792002ae054fecdd
Expand Down Expand Up @@ -48,11 +48,11 @@ setup:regenerate-command-images:20016a5ea492f214692c4b57c4fa9c06
setup:self-upgrade:d02f70c7a230eae3463ceec2056b63fa
setup:version:123b462a421884dc2320ffc5e54b2478
setup:a3bd246c3a425f3e586d11bbdc8937cb
shell:51149096ed3fb28d2a01e0a5ad0146ef
start-airflow:4bec5d26386dc268ec2de2f637c85272
shell:6c4dc12f3ff82a1e16aa3818d4181966
start-airflow:d9ccad39c8551d32aa6d2dd9eee8d573
static-checks:f45ad432bdc47a2256fdb0277b19d816
stop:8969537ccdd799f692ccb8600a7bbed6
testing:docker-compose-tests:b86c044b24138af0659a05ed6331576c
testing:helm-tests:94a442e7f3f63b34c4831a84d165690a
testing:tests:6f6b7f18cde20255fe988f3a26f340cf
testing:2a93cd42229622d0eaa445c96fd1246d
testing:tests:d2961459b8b39377dd2bc6a06a1bacf0
testing:9d83bc79d12d10f15402cb848bebcc01
98 changes: 47 additions & 51 deletions images/breeze/output-commands.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
110 changes: 53 additions & 57 deletions images/breeze/output_shell.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
120 changes: 58 additions & 62 deletions images/breeze/output_start-airflow.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
22 changes: 11 additions & 11 deletions images/breeze/output_testing.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
90 changes: 45 additions & 45 deletions images/breeze/output_testing_tests.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@
---
version: "3.7"
services:
rabbitmq:
image: rabbitmq:3.7
volumes:
- /dev/urandom:/dev/random # Required to get non-blocking entropy source
- rabbitmq-db-volume:/var/lib/rabbitmq
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 5s
timeout: 30s
retries: 50
restart: "on-failure"
redis:
image: redis:5.0.1
volumes:
Expand All @@ -30,13 +41,15 @@ services:
timeout: 30s
retries: 50
restart: "on-failure"

airflow:
environment:
- INTEGRATION_REDIS=true
- INTEGRATION_CELERY=true
depends_on:
redis:
condition: service_healthy
rabbitmq:
condition: service_healthy

volumes:
rabbitmq-db-volume:
redis-db-volume:
40 changes: 0 additions & 40 deletions scripts/ci/docker-compose/integration-rabbitmq.yml

This file was deleted.

75 changes: 32 additions & 43 deletions scripts/in_container/check_environment.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
# shellcheck source=scripts/in_container/_in_container_script_init.sh
EXIT_CODE=0

DISABLED_INTEGRATIONS=""

# We want to avoid misleading messages and perform only forward lookup of the service IP address.
# Netcat when run without -n performs both forward and reverse lookup and fails if the reverse
# lookup name does not match the original name even if the host is reachable via IP. This happens
Expand Down Expand Up @@ -76,23 +74,6 @@ function check_service {
fi
}

function check_integration {
local integration_label=$1
local integration_name=$2
local call=$3
local max_check=${4:=1}

local env_var_name
env_var_name=INTEGRATION_${integration_name^^}
if [[ ${!env_var_name:=} != "true" || ${!env_var_name} != "True" ]]; then
if [[ ! ${DISABLED_INTEGRATIONS} == *" ${integration_name}"* ]]; then
DISABLED_INTEGRATIONS="${DISABLED_INTEGRATIONS} ${integration_name}"
fi
return
fi
check_service "${integration_label}" "${call}" "${max_check}"
}

function check_db_backend {
local max_check=${1:=1}

Expand Down Expand Up @@ -144,30 +125,44 @@ function startairflow_if_requested() {
}

echo
echo "${COLOR_BLUE}Checking integrations and backends.${COLOR_RESET}"
echo "${COLOR_BLUE}Checking backend and integrations.${COLOR_RESET}"
echo

if [[ -n ${BACKEND=} ]]; then
check_db_backend 50
fi
echo
check_integration "Kerberos" "kerberos" "run_nc kdc-server-example-com 88" 50
check_integration "MongoDB" "mongo" "run_nc mongo 27017" 50
check_integration "Redis" "redis" "run_nc redis 6379" 50
check_integration "Cassandra" "cassandra" "run_nc cassandra 9042" 50
check_integration "OpenLDAP" "openldap" "run_nc openldap 389" 50
check_integration "Trino (HTTP)" "trino" "run_nc trino 8080" 50
check_integration "Trino (HTTPS)" "trino" "run_nc trino 7778" 50
check_integration "Trino (API)" "trino" \
"curl --max-time 1 http://trino:8080/v1/info/ | grep '\"starting\":false'" 50
check_integration "Pinot (HTTP)" "pinot" "run_nc pinot 9000" 50
CMD="curl --max-time 1 -X GET 'http://pinot:9000/health' -H 'accept: text/plain' | grep OK"
check_integration "Pinot (Controller API)" "pinot" "${CMD}" 50
CMD="curl --max-time 1 -X GET 'http://pinot:9000/pinot-controller/admin' -H 'accept: text/plain' | grep GOOD"
check_integration "Pinot (Controller API)" "pinot" "${CMD}" 50
CMD="curl --max-time 1 -X GET 'http://pinot:8000/health' -H 'accept: text/plain' | grep OK"
check_integration "Pinot (Broker API)" "pinot" "${CMD}" 50
check_integration "RabbitMQ" "rabbitmq" "run_nc rabbitmq 5672" 50

if [[ ${INTEGRATION_KERBEROS} == "true" ]]; then
check_service "Kerberos" "run_nc kdc-server-example-com 88" 50
fi
if [[ ${INTEGRATION_MONGO} == "true" ]]; then
check_service "MongoDB" "run_nc mongo 27017" 50
fi
if [[ ${INTEGRATION_CELERY} == "true" ]]; then
check_service "Redis" "run_nc redis 6379" 50
check_service "RabbitMQ" "run_nc rabbitmq 5672" 50
fi
if [[ ${INTEGRATION_CASSANDRA} == "true" ]]; then
check_service "Cassandra" "run_nc cassandra 9042" 50
fi
if [[ ${INTEGRATION_OPENLDAP} == "true" ]]; then
check_service "OpenLDAP" "run_nc openldap 389" 50
fi
if [[ ${INTEGRATION_TRINO} == "true" ]]; then
check_service "Trino (HTTP)" "run_nc trino 8080" 50
check_service "Trino (HTTPS)" "run_nc trino 7778" 50
check_service "Trino (API)" "curl --max-time 1 http://trino:8080/v1/info/ | grep '\"starting\":false'" 50
fi
if [[ ${INTEGRATION_PINOT} == "true" ]]; then
check_service "Pinot (HTTP)" "run_nc pinot 9000" 50
CMD="curl --max-time 1 -X GET 'http://pinot:9000/health' -H 'accept: text/plain' | grep OK"
check_service "Pinot (Controller API)" "${CMD}" 50
CMD="curl --max-time 1 -X GET 'http://pinot:9000/pinot-controller/admin' -H 'accept: text/plain' | grep GOOD"
check_service "Pinot (Controller API)" "${CMD}" 50
CMD="curl --max-time 1 -X GET 'http://pinot:8000/health' -H 'accept: text/plain' | grep OK"
check_service "Pinot (Broker API)" "${CMD}" 50
fi

if [[ ${EXIT_CODE} != 0 ]]; then
echo
Expand All @@ -180,9 +175,3 @@ fi

resetdb_if_requested
startairflow_if_requested

if [[ -n ${DISABLED_INTEGRATIONS=} && (${VERBOSE=} == "true" || ${VERBOSE} == "True") ]]; then
echo
echo "${COLOR_BLUE}Those integrations are disabled: ${DISABLED_INTEGRATIONS}${COLOR_RESET}"
echo
fi
3 changes: 1 addition & 2 deletions tests/integration/cli/commands/test_celery_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
from tests.test_utils.config import conf_vars


@pytest.mark.integration("redis")
@pytest.mark.integration("rabbitmq")
@pytest.mark.integration("celery")
@pytest.mark.backend("mysql", "postgres")
class TestWorkerServeLogs:
@classmethod
Expand Down
6 changes: 2 additions & 4 deletions tests/integration/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ def _prepare_app(broker_url=None, execute=None):
set_event_loop(None)


@pytest.mark.integration("redis")
@pytest.mark.integration("rabbitmq")
@pytest.mark.integration("celery")
@pytest.mark.backend("mysql", "postgres")
class TestCeleryExecutor:
def setup_method(self) -> None:
Expand Down Expand Up @@ -260,8 +259,7 @@ def __ne__(self, other):
return not self.__eq__(other)


@pytest.mark.integration("redis")
@pytest.mark.integration("rabbitmq")
@pytest.mark.integration("celery")
@pytest.mark.backend("mysql", "postgres")
class TestBulkStateFetcher(unittest.TestCase):
@mock.patch(
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/providers/redis/hooks/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from airflow.providers.redis.hooks.redis import RedisHook


@pytest.mark.integration("redis")
@pytest.mark.integration("celery")
class TestRedisHook:
def test_real_ping(self):
hook = RedisHook(redis_conn_id="redis_default")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
DEFAULT_DATE = timezone.datetime(2017, 1, 1)


@pytest.mark.integration("redis")
@pytest.mark.integration("celery")
class TestRedisPublishOperator:
def setup_method(self):
args = {"owner": "airflow", "start_date": DEFAULT_DATE}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
DEFAULT_DATE = timezone.datetime(2017, 1, 1)


@pytest.mark.integration("redis")
@pytest.mark.integration("celery")
class TestRedisSensor:
def setup_method(self):
args = {"owner": "airflow", "start_date": DEFAULT_DATE}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
DEFAULT_DATE = timezone.datetime(2017, 1, 1)


@pytest.mark.integration("redis")
@pytest.mark.integration("celery")
class TestRedisPubSubSensor:
def setup_method(self):
args = {"owner": "airflow", "start_date": DEFAULT_DATE}
Expand Down

0 comments on commit 68217f5

Please sign in to comment.