-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinterfaces.py
204 lines (162 loc) · 6.56 KB
/
interfaces.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
from __future__ import annotations
from packify import SerializableType
from typing import Any, Callable, Hashable, Protocol, Type, runtime_checkable
@runtime_checkable
class ClockProtocol(Protocol):
"""Duck typed Protocol showing what a clock must do."""
uuid: bytes
default_ts: SerializableType
def read(self, /, *, inject: dict = {}) -> SerializableType:
"""Return the current timestamp."""
...
def update(self, data: SerializableType = None) -> SerializableType:
"""Update the clock and return the current time stamp."""
...
@staticmethod
def is_later(ts1: SerializableType, ts2: SerializableType) -> bool:
"""Return True iff ts1 > ts2."""
...
@staticmethod
def are_concurrent(ts1: SerializableType, ts2: SerializableType) -> bool:
"""Return True if not ts1 > ts2 and not ts2 > ts1."""
...
@staticmethod
def compare(ts1: SerializableType, ts2: SerializableType) -> int:
"""Return 1 if ts1 is later than ts2; -1 if ts2 is later than
ts1; and 0 if they are concurrent/incomparable.
"""
...
def pack(self) -> bytes:
"""Pack the clock into bytes."""
...
@classmethod
def unpack(cls, data: bytes, /, *, inject: dict = {}) -> ClockProtocol:
"""Unpack a clock from bytes."""
...
@runtime_checkable
class CRDTProtocol(Protocol):
"""Duck typed Protocol showing what CRDTs must do."""
clock: ClockProtocol
def pack(self) -> bytes:
"""Pack the data and metadata into a bytes string."""
...
@classmethod
def unpack(cls, data: bytes, /, *, inject: dict = {}) -> CRDTProtocol:
"""Unpack the data bytes string into an instance."""
...
def read(self, /, *, inject: dict = {}) -> Any:
"""Return the eventually consistent data view."""
...
def update(self, state_update: StateUpdateProtocol, /, *,
inject: dict = {}) -> CRDTProtocol:
"""Apply an update and return self (monad pattern). Should call
self.invoke_listeners after validating the state_update.
"""
...
def checksums(self, /, *, from_ts: Any = None, until_ts: Any = None) -> tuple[Any]:
"""Returns any checksums for the underlying data to detect
desynchronization due to message failure.
"""
...
def history(self, /, *, from_ts: Any = None, until_ts: Any = None,
update_class: Type[StateUpdateProtocol] = None) -> tuple[StateUpdateProtocol]:
"""Returns a concise history of StateUpdates that will converge
to the underlying data. Useful for resynchronization by
replaying all updates from divergent nodes.
"""
...
def get_merkle_history(self, /, *, update_class: Type[StateUpdateProtocol]
) -> list[bytes, list[bytes], dict[bytes, bytes]]:
"""Get a Merklized history for the StateUpdates of the form
[root, [content_id for update in self.history()], {
content_id: packed for update in self.history()}] where
packed is the result of update.pack() and content_id is the
sha256 of the packed update.
"""
...
def resolve_merkle_histories(self, history: list[bytes, list[bytes]]) -> list[bytes]:
"""Accept a history of form [root, leaves] from another node.
Return the leaves that need to be resolved and merged for
synchronization.
"""
...
def add_listener(self, listener: Callable[[StateUpdateProtocol], None]) -> None:
"""Adds a listener that is called on each update."""
...
def remove_listener(self, listener: Callable[[StateUpdateProtocol], None]) -> None:
"""Removes a listener if it was previously added."""
...
def invoke_listeners(self, state_update: StateUpdateProtocol) -> None:
"""Invokes all event listeners, passing them the state_update."""
...
@runtime_checkable
class ListProtocol(Protocol):
def index(self, item: SerializableType, _start: int = 0, _stop: int = -1) -> int:
"""Returns the int index of the item in the list returned by
read(). Should raise a ValueError if the item is not
present.
"""
...
def append(self, item: SerializableType, writer: SerializableType, /, *,
update_class: Type[StateUpdateProtocol]) -> StateUpdateProtocol:
"""Creates, applies, and returns an update_class that appends
the item to the end of the list returned by read().
"""
...
def remove(self, index: int, writer: SerializableType, /, *,
update_class: Type[StateUpdateProtocol]) -> StateUpdateProtocol:
"""Creates, applies, and returns an update_class that removes
the item at the index in the list returned by read(). Should
raise ValueError if the index is out of bounds.
"""
...
@runtime_checkable
class DataWrapperProtocol(Protocol):
"""Duck type protocol for values that can be written to a LWWRegister,
included in a GSet or ORSet, or be used as the key for a LWWMap.
Can also be packed, unpacked, and compared.
"""
value: Any
def __hash__(self) -> int:
"""Data type must be hashable."""
...
def __eq__(self, other) -> bool:
"""Data type must be comparable."""
...
def __ne__(self, other) -> bool:
"""Data type must be comparable."""
...
def __gt__(self, other) -> bool:
"""Data type must be comparable."""
...
def __ge__(self, other) -> bool:
"""Data type must be comparable."""
...
def __lt__(self, other) -> bool:
"""Data type must be comparable."""
...
def __le__(self, other) -> bool:
"""Data type must be comparable."""
...
def pack(self) -> bytes:
"""Package value into bytes."""
...
@classmethod
def unpack(cls, data: bytes, /, *, inject: dict = {}) -> DataWrapperProtocol:
"""Unpack value from bytes."""
...
@runtime_checkable
class StateUpdateProtocol(Protocol):
clock_uuid: bytes
ts: Any
data: Hashable
def __init__(self, /, *, clock_uuid: bytes, ts: Any, data: Hashable) -> None:
"""Initialize the instance."""
...
def pack(self) -> bytes:
"""Pack the instance into bytes."""
...
@classmethod
def unpack(cls, data: bytes, /, *, inject: dict = {}) -> StateUpdateProtocol:
"""Unpack an instance from bytes."""
...