Skip to content

Commit

Permalink
more queue opt cosmos
Browse files Browse the repository at this point in the history
  • Loading branch information
deanm0000 committed Jul 30, 2024
1 parent f612c8f commit 9d6c712
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 29 deletions.
9 changes: 6 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "dean_utils"
version="0.0.20"
version="0.0.21"
authors=[
{ name="Dean MacGregor", email="[email protected]"}
]
Expand All @@ -19,7 +19,10 @@ dependencies=[
"fsspec",
"azure-storage-queue",
"python-multipart",
"azure-cosmos",
"azure-communication-email"

]

[project.optional-dependencies]
cosmos = [
"azure-cosmos",
]
114 changes: 93 additions & 21 deletions src/dean_utils/utils/az_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from azure.cosmos.aio import CosmosClient
import re
import os
from azure.storage.queue.aio import QueueServiceClient as QSC
Expand All @@ -14,23 +13,32 @@
from datetime import datetime, timedelta, timezone
from typing import TypeAlias, Literal
from uuid import uuid4
import json

HTTPX_METHODS: TypeAlias = Literal["GET", "POST"]
AIO_SERVE = QSC.from_connection_string(conn_str=os.environ["AzureWebJobsStorage"])


def def_cos(db_name, client_name):
return (
CosmosClient(
re.search("(?<=AccountEndpoint=).+?(?=;)", os.environ["cosmos"]).group(),
{
"masterKey": re.search(
"(?<=AccountKey=).+?(?=$)", os.environ["cosmos"]
).group()
},
try:
from azure.cosmos.aio import CosmosClient

return (
CosmosClient(
re.search(
"(?<=AccountEndpoint=).+?(?=;)", os.environ["cosmos"]
).group(),
{
"masterKey": re.search(
"(?<=AccountKey=).+?(?=$)", os.environ["cosmos"]
).group()
},
)
.get_database_client(db_name)
.get_container_client(client_name)
)
.get_database_client(db_name)
.get_container_client(client_name)
)
except ImportError:
raise ImportError("MS's cosmos sdk not installed")


async def cos_query_all(cosdb, QRY):
Expand All @@ -51,16 +59,80 @@ async def cos_query_all(cosdb, QRY):
return returns


async def send_to_queue(queue: str, messages: List):
aioserv = QSC.from_connection_string(conn_str=os.environ["Synblob"])
async with aioserv.get_queue_client(
async def send_message(
queue: str,
messages: List[str] | str | dict,
*,
visibility_timeout: int | None = None,
time_to_live: int | None = None,
timeout: int | None = None,
**kwargs,
):
async with AIO_SERVE.get_queue_client(
queue, message_encode_policy=TextBase64EncodePolicy()
) as aioclient:
tasks = [
asyncio.create_task(aioclient.send_message(_file)) for _file in messages
]
await asyncio.wait(tasks)
return tasks
) as aio_client:
if isinstance(messages, list):
tasks = []
for message in messages:
if not isinstance(message, str):
message = json.dumps(message)
asyncio.create_task(
aio_client.send_message(
message,
visibility_timeout=visibility_timeout,
time_to_live=time_to_live,
timeout=timeout,
**kwargs,
)
)

await asyncio.wait(tasks)
return tasks
else:
if not isinstance(messages, str):
messages = json.dumps(messages)
return await aio_client.send_message(messages)


async def update_queue(
queue,
id,
pop_receipt,
message,
*,
visibility_timeout: int | None = None,
timeout: int | None = None,
**kwargs,
):
async with AIO_SERVE.get_queue_client(
queue, message_encode_policy=TextBase64EncodePolicy()
) as aio_client:
task = await aio_client.update_message(
id,
pop_receipt,
content=message,
visibility_timeout=visibility_timeout,
timeout=timeout,
**kwargs,
)
return task


async def delete_message(
queue,
id,
pop_receipt,
**kwargs,
):
async with AIO_SERVE.get_queue_client(
queue, message_encode_policy=TextBase64EncodePolicy()
) as aio_client:
task = await aio_client.delete_message(
id,
pop_receipt,
**kwargs,
)
return task


class async_abfs:
Expand Down
10 changes: 5 additions & 5 deletions src/dean_utils/utils/email_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ def send_email(from_email: str, to_email: str, subject: str, msg: str) -> None:


def az_send(
subject: str = None,
msg: str = None,
html: str = None,
from_email: str = None,
to_email: str = None,
subject: str | None = None,
msg: str | None = None,
html: str | None = None,
from_email: str | None = None,
to_email: str | None = None,
) -> None:
if email_client is None:
raise MissingEnvVars("missing azuremail var")
Expand Down

0 comments on commit 9d6c712

Please sign in to comment.