-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserializers.py
78 lines (63 loc) · 2.17 KB
/
serializers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""Serialization format module."""
from typing import Any, Optional, Type, TypeVar, final

import pandas
from adapta.storage.models.format import (
    DataFrameParquetSerializationFormat,
    DictJsonSerializationFormat,
    SerializationFormat,
)
# Type variable used in annotations to tie a data type to the SerializationFormat parameterized with that same type.
T = TypeVar("T") # pylint: disable=C0103
class Serializer:
    """
    Serializer that dynamically infers serialization format. The format to use is determined at runtime
    by the type of the data.

    Each instance keeps its own registry mapping a data type to the serialization format class that
    handles it. The registry is seeded through the constructor and extended with ``with_format``.
    """

    def __init__(
        self,
        default_serialization_formats: Optional[
            dict[Type[T], Type[SerializationFormat[T]]]
        ] = None,
    ):
        """
        Create a serializer.

        :param default_serialization_formats: Initial mapping of data type to serialization format class.
            ``None`` (the default) starts with an empty registry.
        """
        # Copy the mapping: with_format() mutates the registry, and those changes must not leak into a
        # dict owned by the caller or shared between several serializer instances.
        self._serialization_formats = (
            {}
            if default_serialization_formats is None
            else dict(default_serialization_formats)
        )

    def get_serialization_format(self, data: Any) -> Type[SerializationFormat]:
        """
        Get the serializer for the data.

        The exact type of ``data`` is tried first, then its base classes in method resolution order, so
        an instance of a subclass of a registered type resolves to the format of that registered type.

        :param data: Object to find a serialization format for.
        :return: The registered serialization format class (not an instance of it).
        :raises KeyError: If neither the type of ``data`` nor any of its base classes is registered.
        """
        data_type = type(data)
        # __mro__ starts with data_type itself, so an exact registration always wins over a base class.
        for candidate in data_type.__mro__:
            if candidate in self._serialization_formats:
                return self._serialization_formats[candidate]
        raise KeyError(
            f"No serialization format registered for type {data_type.__qualname__}"
        )

    def with_format(
        self, serialization_format: Type[SerializationFormat]
    ) -> "Serializer":
        """Add a serialization format to the supported formats. Note that only 1 serialization format is allowed per
        type.

        The target data type is read from the first type argument of the first parameterized generic base
        of ``serialization_format``. Registering a second format for the same type replaces the earlier one.

        :param serialization_format: Format class deriving from a parameterized generic base.
        :return: This serializer, to allow call chaining.
        :raises AttributeError: If ``serialization_format`` has no parameterized generic base to infer the
            target type from.
        """
        # Scan every original base rather than only the first one, so a format class that lists a plain
        # mixin before its parameterized base is still accepted.
        for generic_base in getattr(serialization_format, "__orig_bases__", ()):
            type_arguments = getattr(generic_base, "__args__", ())
            if type_arguments:
                self._serialization_formats[type_arguments[0]] = serialization_format
                return self
        raise AttributeError(
            f"Cannot infer the target type of {serialization_format!r}: "
            "it has no parameterized generic base"
        )

    def serialize(self, data: Any) -> bytes:
        """
        Serialize data.

        :param data: Object to serialize; its type selects the serialization format.
        :return: The result of the selected format's ``serialize`` call.
        :raises KeyError: If no serialization format is registered for the type of ``data``.
        """
        serialization_format = self.get_serialization_format(data)
        # The registry stores format classes, so a fresh format instance is created for every call.
        return serialization_format().serialize(data)
@final
class TelemetrySerializer(Serializer):
    """
    Serializer for telemetry.

    Registers DataFrameParquetSerializationFormat for ``pandas.DataFrame`` and
    DictJsonSerializationFormat for ``dict``.
    """

    def __init__(self):
        telemetry_formats = {
            pandas.DataFrame: DataFrameParquetSerializationFormat,
            dict: DictJsonSerializationFormat,
        }
        super().__init__(default_serialization_formats=telemetry_formats)
@final
class ResultSerializer(Serializer):
    """
    Serializer for results.

    Registers DataFrameParquetSerializationFormat for ``pandas.DataFrame`` and
    DictJsonSerializationFormat for ``dict``.
    """

    def __init__(self):
        result_formats = {
            pandas.DataFrame: DataFrameParquetSerializationFormat,
            dict: DictJsonSerializationFormat,
        }
        super().__init__(default_serialization_formats=result_formats)