Skip to content

Commit

Permalink
Remove unnecessary and add missing single-dispatch functions for message
Browse files Browse the repository at this point in the history
  • Loading branch information
decaz committed Feb 10, 2024
1 parent a1a970b commit f0c7a81
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 33 deletions.
2 changes: 0 additions & 2 deletions aio_pika/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
ConsumerTag = str

MILLISECONDS = 1000
ZERO_TIME = datetime(1970, 1, 1)


class SSLOptions(TypedDict, total=False):
Expand Down Expand Up @@ -958,6 +957,5 @@ def _get_exchange_name_from_str(value: str) -> str:
"TransactionState",
"UnderlayChannel",
"UnderlayConnection",
"ZERO_TIME",
"get_exchange_name",
)
39 changes: 8 additions & 31 deletions aio_pika/message.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
import warnings
from datetime import datetime, timedelta, timezone
from functools import singledispatch
Expand All @@ -11,7 +10,7 @@
from pamqp.common import FieldValue

from .abc import (
MILLISECONDS, ZERO_TIME, AbstractChannel, AbstractIncomingMessage,
MILLISECONDS, AbstractChannel, AbstractIncomingMessage,
AbstractMessage, AbstractProcessContext, DateType, DeliveryMode,
HeadersType, MessageInfo, NoneType,
)
Expand Down Expand Up @@ -58,26 +57,21 @@ def decode_expiration(t: Any) -> Optional[float]:
raise ValueError("Invalid expiration type: %r" % type(t), t)


@decode_expiration.register(time.struct_time)
def decode_expiration_struct_time(t: time.struct_time) -> float:
return (datetime(*t[:7]) - ZERO_TIME).total_seconds()


@decode_expiration.register(str)
def decode_expiration_str(t: str) -> float:
return float(t)
return float(t) / MILLISECONDS


@decode_expiration.register(NoneType) # type: ignore
def decode_expiration_none(_: Any) -> None:
return None


@singledispatch
def encode_timestamp(value: Any) -> Optional[datetime]:
raise ValueError("Invalid timestamp type: %r" % type(value), value)


@encode_timestamp.register(time.struct_time)
def encode_timestamp_struct_time(value: time.struct_time) -> datetime:
return datetime(*value[:6])


@encode_timestamp.register(datetime)
def encode_timestamp_datetime(value: datetime) -> datetime:
return value
Expand Down Expand Up @@ -109,17 +103,6 @@ def decode_timestamp_datetime(value: datetime) -> datetime:
return value


@decode_timestamp.register(float)
@decode_timestamp.register(int)
def decode_timestamp_number(value: Union[float, int]) -> datetime:
return datetime.fromtimestamp(value, tz=timezone.utc)


@decode_timestamp.register(time.struct_time)
def decode_timestamp_struct_time(value: time.struct_time) -> datetime:
return datetime(*value[:6])


@decode_timestamp.register(NoneType) # type: ignore
def decode_timestamp_none(_: Any) -> None:
return None
Expand Down Expand Up @@ -373,12 +356,6 @@ def __init__(self, message: DeliveredMessage, no_ack: bool = False):
self.__no_ack = no_ack
self.__processed = False

expiration = None
if message.header.properties.expiration:
expiration = decode_expiration(
message.header.properties.expiration,
)

super().__init__(
body=message.body,
content_type=message.header.properties.content_type,
Expand All @@ -388,7 +365,7 @@ def __init__(self, message: DeliveredMessage, no_ack: bool = False):
priority=message.header.properties.priority,
correlation_id=message.header.properties.correlation_id,
reply_to=message.header.properties.reply_to,
expiration=expiration / 1000.0 if expiration else None,
expiration=decode_expiration(message.header.properties.expiration),
message_id=message.header.properties.message_id,
timestamp=decode_timestamp(message.header.properties.timestamp),
type=message.header.properties.message_type,
Expand Down

0 comments on commit f0c7a81

Please sign in to comment.