-
Notifications
You must be signed in to change notification settings - Fork 321
/
Copy pathinterface.py
495 lines (399 loc) · 14.7 KB
/
interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
# Copyright (c) Meta Platforms, Inc. and affiliates.
"""
Abstraction layer for fetching information needed to run OMM.
To try and separate concerns for extension, this file provides
access to all the persistent data needed to run OMM. In the
default implementation, we have a unified implementation that
implements all interfaces, but different implementations may
prefer to store different data in different places.
For implementations, see storage.mocked.MockedStore, which provides
plausible defaults for all of these interfaces, useful for testing,
or storage.default.DefaultOMMStore, which uses a combination of
static configuration and postgres.
"""
import abc
from dataclasses import dataclass
import typing as t
import time
import flask
from threatexchange.cli.storage.interfaces import ISignalTypeConfigStore
from threatexchange.utils import dataclass_json
from threatexchange.content_type.content_base import ContentType
from threatexchange.signal_type.signal_base import SignalType
from threatexchange.signal_type.index import SignalTypeIndex
from threatexchange.exchanges import auth
from threatexchange.exchanges.fetch_state import (
FetchCheckpointBase,
CollaborationConfigBase,
FetchedSignalMetadata,
TUpdateRecordKey,
)
from threatexchange.exchanges.signal_exchange_api import (
TSignalExchangeAPI,
TSignalExchangeAPICls,
)
@dataclass
class ContentTypeConfig:
    """
    Configuration record for a single ContentType.
    """

    # When False, this content type should not be used in hashing/matching
    enabled: bool
    # The ContentType class that this configuration applies to
    content_type: t.Type[ContentType]
class IContentTypeConfigStore(metaclass=abc.ABCMeta):
    """Read access to ContentType configuration."""

    @abc.abstractmethod
    def get_content_type_configs(self) -> t.Mapping[str, ContentTypeConfig]:
        """
        Return the configuration of every installed content type.

        NOTE(review): the mapping key is presumably the content type's
        name - confirm against the implementations.
        """
@dataclass
class SignalTypeIndexBuildCheckpoint:
"""
A point at which the index has been built up to.
This feature allows the index to skip a build when nothing has changed,
which is mostly useful for debugging. Once you have enough ongoing
changes to the db, the value of eliding builds goes down, and
a naive implementation could not even store this.
The key check is the DB's last added hash id, and the total hash count
"""
# When the most recent item in the bank was added
# allows for optional fast incremental build for additions
last_item_timestamp: int
# What was the id of the last added id to the DB on build
last_item_id: int
# What is the total hash db size (to account for removals)
total_hash_count: int
@classmethod
def get_empty(cls):
"""Represents a checkpoint for an empty index / no hashes."""
return cls(last_item_timestamp=-1, last_item_id=-1, total_hash_count=0)
class ISignalTypeIndexStore(metaclass=abc.ABCMeta):
    """
    Interface for storing and retrieving built index objects.

    In the SignalType interfaces, a SignalTypeIndex is one large object
    holding everything needed to match content known to the system, so
    the index size is ultimately bounded by available memory.

    Extensions of OMM that want to solve this scaling problem may need to
    redesign the bank -> indexing -> index flow, shard the index, or use
    other tricks.

    The approach here is a simple fallback that works with any properly
    implemented SignalType.
    """

    @abc.abstractmethod
    def get_signal_type_index(
        self,
        signal_type: t.Type[SignalType],
    ) -> t.Optional[SignalTypeIndex[int]]:
        """
        Return the built index for this SignalType.

        For OMM, the values stored in the index are the ids of BankedContent.
        """

    @abc.abstractmethod
    def store_signal_type_index(
        self,
        signal_type: t.Type[SignalType],
        index: SignalTypeIndex,
        checkpoint: SignalTypeIndexBuildCheckpoint,
    ) -> None:
        """
        Persist the index for a signal type, possibly replacing an earlier
        version.
        """

    @abc.abstractmethod
    def get_last_index_build_checkpoint(
        self, signal_type: t.Type[SignalType]
    ) -> t.Optional[SignalTypeIndexBuildCheckpoint]:
        """
        Return the checkpoint of the last index build, if there is one.
        """
@dataclass
class SignalExchangeAPIConfig:
    """
    Configuration for one SignalExchange API: the API class, plus the
    credentials to use with it (if any have been set).
    """

    api_cls: TSignalExchangeAPICls
    credentials: t.Optional[auth.CredentialHelper] = None

    @property
    def supports_auth(self):
        """Whether the API class is a subclass of SignalExchangeWithAuth."""
        accepts_credentials = issubclass(self.api_cls, auth.SignalExchangeWithAuth)
        return accepts_credentials

    def set_credentials_from_json_dict(self, d: dict[str, t.Any]) -> None:
        """
        Load credentials from a dict and store them on this config.

        The dict is deserialized into the credential class reported by the
        API class.

        Raises ValueError if the API class does not support auth.
        """
        if self.supports_auth:
            # The issubclass check above is what makes this cast sound
            authed_api = t.cast(auth.SignalExchangeWithAuth, self.api_cls)
            credential_cls = authed_api.get_credential_cls()
            self.credentials = dataclass_json.dataclass_load_dict(d, credential_cls)
            return
        raise ValueError(f"{self.api_cls.get_name()} does not support credentials")
@dataclass(kw_only=True)
class FetchStatus:
checkpoint_ts: t.Optional[int]
running_fetch_start_ts: t.Optional[int]
last_fetch_complete_ts: t.Optional[int]
last_fetch_succeeded: t.Optional[bool]
up_to_date: bool
fetched_items: int
@property
def fetch_in_progress(self) -> bool:
return self.running_fetch_start_ts is not None
@classmethod
def get_default(cls) -> t.Self:
return cls(
checkpoint_ts=None,
running_fetch_start_ts=None,
last_fetch_complete_ts=None,
last_fetch_succeeded=None,
up_to_date=False,
fetched_items=0,
)
class ISignalExchangeStore(metaclass=abc.ABCMeta):
    """Interface for accessing SignalExchange configuration"""

    def exchange_apis_get_installed(self) -> t.Mapping[str, TSignalExchangeAPICls]:
        """
        Return all installed SignalExchange types.

        Derived from exchange_apis_get_configs(): same keys, with each
        config replaced by its api_cls.
        """
        configs = self.exchange_apis_get_configs()
        return {name: config.api_cls for name, config in configs.items()}

    @abc.abstractmethod
    def exchange_apis_get_configs(self) -> t.Mapping[str, SignalExchangeAPIConfig]:
        """
        Return the configuration of every installed exchange type.
        """

    @abc.abstractmethod
    def exchange_api_config_update(self, cfg: SignalExchangeAPIConfig) -> None:
        """
        Update the config of an installed exchange API.
        """

    @abc.abstractmethod
    def exchange_update(
        self, cfg: CollaborationConfigBase, *, create: bool = False
    ) -> None:
        """
        Create or update a collaboration/exchange.

        If create is false and the name doesn't exist, it will throw.
        If create is true and the name already exists, it will throw.
        """

    @abc.abstractmethod
    def exchange_delete(self, name: str) -> None:
        """
        Delete a collaboration/exchange.

        No exception is thrown if no config with that name exists.
        """

    @abc.abstractmethod
    def exchanges_get(self) -> t.Mapping[str, CollaborationConfigBase]:
        """
        Get all collaboration configs.

        Collaboration configs control the syncing of data from external
        sources to banks of labeled content locally.
        """

    def exchange_get(self, name: str) -> t.Optional[CollaborationConfigBase]:
        """Get a single collaboration config by name, or None if absent."""
        all_exchanges = self.exchanges_get()
        return all_exchanges.get(name)

    @abc.abstractmethod
    def exchange_get_fetch_status(self, name: str) -> FetchStatus:
        """
        Get the last fetch status.
        """

    @abc.abstractmethod
    def exchange_get_fetch_checkpoint(
        self, name: str
    ) -> t.Optional[FetchCheckpointBase]:
        """
        Get the last fetch checkpoint.

        Returns None if there has been no previous fetch.
        """

    @abc.abstractmethod
    def exchange_get_client(
        self, collab_config: CollaborationConfigBase
    ) -> TSignalExchangeAPI:
        """Return an auth'd and initialized client for a collab"""

    @abc.abstractmethod
    def exchange_start_fetch(self, collab_name: str) -> None:
        """Record the start of a fetch attempt for this collab"""

    @abc.abstractmethod
    def exchange_complete_fetch(
        self, collab_name: str, *, is_up_to_date: bool, exception: bool
    ) -> None:
        """
        Record that the fetch has completed, as well as how the fetch went.
        """

    @abc.abstractmethod
    def exchange_commit_fetch(
        self,
        collab: CollaborationConfigBase,
        old_checkpoint: t.Optional[FetchCheckpointBase],
        # The merged data from sequential fetches of the API
        dat: t.Dict[str, t.Any],
        # The last checkpoint received from the API
        checkpoint: FetchCheckpointBase,
    ) -> None:
        """
        Commit a sequentially fetched set of data from a fetch().

        The old checkpoint can be used in two ways:
        1. As a very weak attempt to prevent stomping old data when two
           processes try to commit at the same time.
        2. If is_stale() is true, the storage can attempt something smarter
           than dropping all data and reloading, which can prevent the
           index from "flapping" if all the data is the same.
        """

    @abc.abstractmethod
    def exchange_get_data(
        self,
        collab_name: str,
        key: TUpdateRecordKey,
    ) -> FetchedSignalMetadata:
        """
        Get API-specific collaboration data by key.

        This is only stored if the configuration for the exchange enables
        it; otherwise an exception should be thrown.
        """
@dataclass
class BankConfig:
    """Configuration for a single bank."""

    # UPPER_WITH_UNDER syntax
    name: str
    # 0.0-1.0 - what percentage of contents should be
    # considered a match? Seeded by target content
    matching_enabled_ratio: float

    @property
    def enabled(self) -> bool:
        """True when the matching ratio is strictly greater than zero."""
        return 0.0 < self.matching_enabled_ratio
@dataclass
class BankContentConfig:
    """
    Represents all the signals (hashes) for one piece of content.

    This is used when signals come from external sources, or the original
    content has been lost.
    """

    # Magic values for disable_until_ts
    ENABLED: t.ClassVar[int] = 1
    DISABLED: t.ClassVar[int] = 0

    # This is what is indexed in the index and directly returned by lookup
    id: int
    # Disable matching for just one seed content.
    # Has some magic values as well:
    #  0 - disabled
    #  1 - enabled
    # Any other value is compared against the current time (see enabled).
    disable_until_ts: int
    # If this content is originally from a collaboration, includes
    # the name of the collaboration as well as the keys for use with
    # ICollaborationStore.get_collab_data
    # NOTE(review): no ICollaborationStore is defined in this file;
    # presumably this now means ISignalExchangeStore.exchange_get_data - confirm
    collab_metadata: t.Mapping[str, t.Sequence[str]]
    original_media_uri: t.Optional[str]
    bank: BankConfig

    @property
    def enabled(self) -> bool:
        """
        Whether matching is currently enabled for this content.

        0 means disabled and 1 means enabled; for any other value the
        content is enabled once that timestamp is no longer in the future.
        """
        until = self.disable_until_ts
        if until == 0:
            return False
        # Short-circuits on the magic "enabled" value before reading the clock
        return until == 1 or until <= time.time()
@dataclass
class BankContentIterationItem:
    """
    One record streamed out of the datastore while building the index.
    """

    # Name of the signal type of signal_val
    signal_type_name: str
    # The signal value, as a string
    signal_val: str
    # Id of the bank content this signal belongs to
    bank_content_id: int
    bank_content_timestamp: int
class IBankStore(metaclass=abc.ABCMeta):
    """
    Interface for maintaining collections of labeled content (aka banks).

    The lifecycle of content is

           Content
              |
            (Hash)
              |
              v
           Signals
              |
         (Add to Bank)
              |
              v
      +-> id: BankContent
      |       |
      |  (Build Index)
      |       |
      |       v
      +--<-- Index <-- Query
    """

    @abc.abstractmethod
    def get_banks(self) -> t.Mapping[str, BankConfig]:
        """Return all bank configs"""

    def get_bank(self, name: str) -> t.Optional[BankConfig]:
        """Return the config of one bank by name, or None if absent."""
        all_banks = self.get_banks()
        return all_banks.get(name)

    @abc.abstractmethod
    def bank_update(
        self,
        bank: BankConfig,
        *,
        create: bool = False,
        rename_from: t.Optional[str] = None,
    ) -> None:
        """
        Update a bank config in the backing store.

        If create is false, throws an exception if the bank does not exist.
        If create is true, throws an exception if the bank already exists.
        If create is false and the name is being changed, rename_from must
        be provided.
        """

    @abc.abstractmethod
    def bank_delete(self, name: str) -> None:
        """
        Delete a bank entirely.

        No exception is thrown if no such bank exists.
        """

    # Bank content
    @abc.abstractmethod
    def bank_content_get(self, id: t.Iterable[int]) -> t.Sequence[BankContentConfig]:
        """Get the content config for a bank"""

    @abc.abstractmethod
    def bank_content_update(self, val: BankContentConfig) -> None:
        """Update the content config for a bank"""

    @abc.abstractmethod
    def bank_add_content(
        self,
        bank_name: str,
        content_signals: t.Dict[t.Type[SignalType], str],
        config: t.Optional[BankContentConfig] = None,
    ) -> int:
        """
        Add content (Photo, Video, etc) to a bank, where it can match content.

        Indexing is not instant: there may be a delay before the match
        APIs can hit it.
        """

    @abc.abstractmethod
    def bank_remove_content(self, bank_name: str, content_id: int) -> int:
        """Remove content from bank by id"""

    @abc.abstractmethod
    def get_current_index_build_target(
        self, signal_type: t.Type[SignalType]
    ) -> SignalTypeIndexBuildCheckpoint:
        """Get information about the total bank size for skipping an index build"""

    @abc.abstractmethod
    def bank_yield_content(
        self, signal_type: t.Optional[t.Type[SignalType]] = None, batch_size: int = 100
    ) -> t.Iterator[BankContentIterationItem]:
        """
        Yield the entire content of the bank in batches.

        If a signal type is provided, yields signals of that type where
        they are available for the content.
        """
class IUnifiedStore(
    IContentTypeConfigStore,
    ISignalTypeConfigStore,
    ISignalExchangeStore,
    ISignalTypeIndexStore,
    IBankStore,
    metaclass=abc.ABCMeta,
):
    """
    All the store interfaces combined into one.

    This is probably the most common way to use this, especially early on
    in development - the option to pass them more narrowly is helpful
    mostly for typing.
    """

    # This is an instance method (it is not decorated with @classmethod),
    # so the receiver is named `self` rather than `cls`.
    def init_flask(self, app: flask.Flask) -> None:
        """
        Make any flask-specific initialization for this storage implementation.

        This serves as the normal constructor when used with OMM, which allows
        you to write __init__ however is most useful to your implementation
        for testing.

        The default implementation does nothing.
        """
        return