Skip to content

Commit

Permalink
test(providers/microsoft): add test case to wasb triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Jan 16, 2024
1 parent a15013e commit 6790b5b
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 1 deletion.
1 change: 0 additions & 1 deletion tests/always/test_project_structure.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ def test_providers_modules_should_have_tests(self):
"tests/providers/microsoft/azure/fs/test_adls.py",
"tests/providers/microsoft/azure/operators/test_adls.py",
"tests/providers/microsoft/azure/transfers/test_azure_blob_to_gcs.py",
"tests/providers/microsoft/azure/triggers/test_wasb.py",
"tests/providers/mongo/sensors/test_mongo.py",
"tests/providers/openlineage/extractors/test_manager.py",
"tests/providers/openlineage/plugins/test_adapter.py",
Expand Down
216 changes: 216 additions & 0 deletions tests/providers/microsoft/azure/triggers/test_wasb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import asyncio
import logging
from unittest import mock

import pytest

from airflow.providers.microsoft.azure.triggers.wasb import (
WasbBlobSensorTrigger,
WasbPrefixSensorTrigger,
)
from airflow.triggers.base import TriggerEvent

TEST_DATA_STORAGE_BLOB_NAME = "test_blob_providers_team.txt"
TEST_DATA_STORAGE_CONTAINER_NAME = "test-container-providers-team"
TEST_DATA_STORAGE_BLOB_PREFIX = TEST_DATA_STORAGE_BLOB_NAME[:10]
TEST_WASB_CONN_ID = "wasb_default"
POKE_INTERVAL = 5.0


class TestWasbBlobSensorTrigger:
TRIGGER = WasbBlobSensorTrigger(
container_name=TEST_DATA_STORAGE_CONTAINER_NAME,
blob_name=TEST_DATA_STORAGE_BLOB_NAME,
wasb_conn_id=TEST_WASB_CONN_ID,
poke_interval=POKE_INTERVAL,
)

def test_serialization(self):
"""
Asserts that the WasbBlobSensorTrigger correctly serializes its arguments
and classpath.
"""

classpath, kwargs = self.TRIGGER.serialize()
assert classpath == "airflow.providers.microsoft.azure.triggers.wasb.WasbBlobSensorTrigger"
assert kwargs == {
"container_name": TEST_DATA_STORAGE_CONTAINER_NAME,
"blob_name": TEST_DATA_STORAGE_BLOB_NAME,
"wasb_conn_id": TEST_WASB_CONN_ID,
"poke_interval": POKE_INTERVAL,
"public_read": False,
}

@pytest.mark.asyncio
@pytest.mark.parametrize(
"blob_exists",
[True, False],
)
@mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbAsyncHook.check_for_blob_async")
async def test_running(self, mock_check_for_blob, blob_exists):
"""
Test if the task is run in trigger successfully.
"""
mock_check_for_blob.return_value = blob_exists

task = asyncio.create_task(self.TRIGGER.run().__anext__())

# TriggerEvent was not returned
assert task.done() is False
asyncio.get_event_loop().stop()

@pytest.mark.asyncio
@mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbAsyncHook.check_for_blob_async")
async def test_success(self, mock_check_for_blob):
"""Tests the success state for that the WasbBlobSensorTrigger."""
mock_check_for_blob.return_value = True

task = asyncio.create_task(self.TRIGGER.run().__anext__())
await asyncio.sleep(0.5)

# TriggerEvent was returned
assert task.done() is True
asyncio.get_event_loop().stop()

message = f"Blob {TEST_DATA_STORAGE_BLOB_NAME} found in container {TEST_DATA_STORAGE_CONTAINER_NAME}."
assert task.result() == TriggerEvent({"status": "success", "message": message})

@pytest.mark.asyncio
@mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbAsyncHook.check_for_blob_async")
async def test_waiting_for_blob(self, mock_check_for_blob, caplog):
"""Tests the WasbBlobSensorTrigger sleeps waiting for the blob to arrive."""
mock_check_for_blob.side_effect = [False, True]
caplog.set_level(logging.INFO)

with mock.patch.object(self.TRIGGER.log, "info"):
task = asyncio.create_task(self.TRIGGER.run().__anext__())

await asyncio.sleep(POKE_INTERVAL + 0.5)

if not task.done():
message = (
f"Blob {TEST_DATA_STORAGE_BLOB_NAME} not available yet in container {TEST_DATA_STORAGE_CONTAINER_NAME}."
f" Sleeping for {POKE_INTERVAL} seconds"
)
assert message in caplog.text
asyncio.get_event_loop().stop()

@pytest.mark.asyncio
@mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbAsyncHook.check_for_blob_async")
async def test_trigger_exception(self, mock_check_for_blob):
"""Tests the WasbBlobSensorTrigger yields an error event if there is an exception."""
mock_check_for_blob.side_effect = Exception("Test exception")

task = [i async for i in self.TRIGGER.run()]
assert len(task) == 1
assert TriggerEvent({"status": "error", "message": "Test exception"}) in task


class TestWasbPrefixSensorTrigger:
TRIGGER = WasbPrefixSensorTrigger(
container_name=TEST_DATA_STORAGE_CONTAINER_NAME,
prefix=TEST_DATA_STORAGE_BLOB_PREFIX,
wasb_conn_id=TEST_WASB_CONN_ID,
poke_interval=POKE_INTERVAL,
check_options={"delimiter": "/", "include": None},
)

def test_serialization(self):
"""
Asserts that the WasbPrefixSensorTrigger correctly serializes its arguments and classpath."""

classpath, kwargs = self.TRIGGER.serialize()
assert classpath == "airflow.providers.microsoft.azure.triggers.wasb.WasbPrefixSensorTrigger"
assert kwargs == {
"container_name": TEST_DATA_STORAGE_CONTAINER_NAME,
"prefix": TEST_DATA_STORAGE_BLOB_PREFIX,
"wasb_conn_id": TEST_WASB_CONN_ID,
"public_read": False,
"check_options": {
"delimiter": "/",
"include": None,
},
"poke_interval": POKE_INTERVAL,
}

@pytest.mark.asyncio
@pytest.mark.parametrize(
"prefix_exists",
[True, False],
)
@mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbAsyncHook.check_for_prefix_async")
async def test_running(self, mock_check_for_prefix, prefix_exists):
"""Test if the task is run in trigger successfully."""
mock_check_for_prefix.return_value = prefix_exists

task = asyncio.create_task(self.TRIGGER.run().__anext__())

# TriggerEvent was not returned
assert task.done() is False
asyncio.get_event_loop().stop()

@pytest.mark.asyncio
@mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbAsyncHook.check_for_prefix_async")
async def test_success(self, mock_check_for_prefix):
"""Tests the success state for that the WasbPrefixSensorTrigger."""
mock_check_for_prefix.return_value = True

task = asyncio.create_task(self.TRIGGER.run().__anext__())
await asyncio.sleep(0.5)

# TriggerEvent was returned
assert task.done() is True
asyncio.get_event_loop().stop()

message = (
f"Prefix {TEST_DATA_STORAGE_BLOB_PREFIX} found in container {TEST_DATA_STORAGE_CONTAINER_NAME}."
)
assert task.result() == TriggerEvent({"status": "success", "message": message})

@pytest.mark.asyncio
@mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbAsyncHook.check_for_prefix_async")
async def test_waiting_for_blob(self, mock_check_for_prefix):
"""Tests the WasbPrefixSensorTrigger sleeps waiting for the blob to arrive."""
mock_check_for_prefix.side_effect = [False, True]

with mock.patch.object(self.TRIGGER.log, "info") as mock_log_info:
task = asyncio.create_task(self.TRIGGER.run().__anext__())

await asyncio.sleep(POKE_INTERVAL + 0.5)

if not task.done():
message = (
f"Prefix {TEST_DATA_STORAGE_BLOB_PREFIX} not available yet in container "
f"{TEST_DATA_STORAGE_CONTAINER_NAME}. Sleeping for {POKE_INTERVAL} seconds"
)
mock_log_info.assert_called_once_with(message)
asyncio.get_event_loop().stop()

@pytest.mark.asyncio
@mock.patch("airflow.providers.microsoft.azure.hooks.wasb.WasbAsyncHook.check_for_prefix_async")
async def test_trigger_exception(self, mock_check_for_prefix):
"""Tests the WasbPrefixSensorTrigger yields an error event if there is an exception."""
mock_check_for_prefix.side_effect = Exception("Test exception")

task = [i async for i in self.TRIGGER.run()]
assert len(task) == 1
assert TriggerEvent({"status": "error", "message": "Test exception"}) in task

0 comments on commit 6790b5b

Please sign in to comment.