diff --git a/README.md b/README.md index 0b84a3c..2a7c7d1 100644 --- a/README.md +++ b/README.md @@ -30,13 +30,13 @@ from lexrpc.flask_server import init_flask from arroba import server from arroba.datastore_storage import DatastoreStorage -from arroba.xrpc_sync import send_new_commits +from arroba.xrpc_sync import send_events # for Google Cloud Datastore ndb_client = ndb.Client() server.storage = DatastoreStorage(ndb_client=ndb_client) -server.repo.callback = lambda _: send_new_commits() # to subscribeRepos +server.repo.callback = lambda _: send_events() # to subscribeRepos app = Flask('my-pds') init_flask(server.server, app) @@ -107,11 +107,14 @@ _Breaking changes:_ * `AtpRemoteBlob`: if the blob URL doesn't return the `Content-Type` header, infer type from the URL, or fall back to `application/octet-stream` ([bridgy-fed#1073](https://github.com/snarfed/bridgy-fed/issues/1073)). * `did`: * Cache `resolve_plc`, `resolve_web`, and `resolve_handle` for 6h, up to 5000 total results per call. +* `xrpc_sync`: rename `send_new_commits` to `send_events` due to adding account tombstone support. _Non-breaking changes:_ * `did`: * Add `HANDLE_RE` regexp for handle validation. +* `storage`: + * Add new `Storage.tombstone_repo` method, implement in `MemoryStorage` and `DatastoreStorage`. [Used to delete accounts.](https://github.com/bluesky-social/atproto/discussions/2503#discussioncomment-9502339) ([bridgy-fed#783](https://github.com/snarfed/bridgy-fed/issues/783)) * `util`: * `service_jwt`: add optional `aud` kwarg. * `xrpc_sync`: @@ -126,8 +129,6 @@ _Non-breaking changes:_ * Bug fix: base32-encode TIDs in record keys, `at://` URIs, commit `rev`s, etc. Before, we were using the integer UNIX timestamp directly, which happened to be the same 13 character length. Oops. * Switch from `BGS_HOST` environment variable to `RELAY_HOST`. `BGS_HOST` is still supported for backward compatibility. 
-* `storage`: - * Add new `Storage.tombstone_repo` method, implement in `MemoryStorage` and `DatastoreStorage`. [Used to delete accounts.](https://github.com/bluesky-social/atproto/discussions/2503#discussioncomment-9502339) ([bridgy-fed#783](https://github.com/snarfed/bridgy-fed/issues/783)) * `datastore_storage`: * Bug fix for `DatastoreStorage.last_seq`, handle new NSID. * Add new `AtpRemoteBlob` class for storing "remote" blobs, available at public HTTP URLs, that we don't store ourselves. diff --git a/app.py b/app.py index b7bb282..7b926d0 100644 --- a/app.py +++ b/app.py @@ -131,7 +131,7 @@ def proxy_appview(nsid_rest=None): rotation_key=privkey, signing_key=privkey, handle=os.environ['REPO_HANDLE']) -server.repo.callback = lambda commit_data: xrpc_sync.send_new_commits() +server.repo.callback = lambda commit_data: xrpc_sync.send_events() if server.repo.handle != os.environ['REPO_HANDLE']: logger.warning(f"$REPO_HANDLE is {os.environ['REPO_HANDLE']} but loaded repo's handle is {server.repo.handle} !") diff --git a/arroba/datastore_storage.py b/arroba/datastore_storage.py index c2a847f..2fda118 100644 --- a/arroba/datastore_storage.py +++ b/arroba/datastore_storage.py @@ -128,10 +128,14 @@ def rotation_key(self): class AtpBlock(ndb.Model): - """A data record, MST node, or commit. + """A data record, MST node, repo commit, or other event. Key name is the DAG-CBOR base32 CID of the data. + Events should have a fully-qualified ``$type`` field that's one of the + ``message`` types in ``com.atproto.sync.subscribeRepos``, eg + ``com.atproto.sync.subscribeRepos#tombstone``. 
+ Properties: * encoded (bytes): DAG-CBOR encoded value * data (dict): DAG-JSON value, only used for human debugging @@ -425,7 +429,7 @@ def load_repo(self, did_or_handle): rotation_key=atp_repo.rotation_key) @ndb_context - def tombstone_repo(self, repo): + def _tombstone_repo(self, repo): @ndb.transactional() def update(): atp_repo = AtpRepo.get_by_id(repo.did) @@ -469,8 +473,9 @@ def has(self, cid): return self.read(cid) is not None @ndb_context - def write(self, repo_did, obj): - seq = self.allocate_seq(SUBSCRIBE_REPOS_NSID) + def write(self, repo_did, obj, seq=None): + if seq is None: + seq = self.allocate_seq(SUBSCRIBE_REPOS_NSID) return AtpBlock.create(repo_did=repo_did, data=obj, seq=seq).cid @ndb_context diff --git a/arroba/repo.py b/arroba/repo.py index 03e8e0e..2d48fc6 100644 --- a/arroba/repo.py +++ b/arroba/repo.py @@ -66,8 +66,11 @@ class Repo: mst (MST) head (Block): head commit handle (str) - callback (callable: (CommitData) => None): called on new commits. - May be set directly by clients. None means no callback. + callback (callable: (CommitData | dict) => None): called on new commits + and other repo events. May be set directly by clients. None means no + callback. The parameter will be a :class:`CommitData` for commits, dict + record with ``$type`` for other ``com.atproto.sync.subscribeRepos`` + messages. """ storage = None mst = None @@ -87,7 +90,7 @@ def __init__(self, *, storage=None, mst=None, head=None, handle=None, mst (MST) commit (dict): head commit cid (CID): head CID - callback (callable, CommitData => None) + callback (callable, (CommitData | dict) => None) signing_key (ec.EllipticCurvePrivateKey): required rotation_key (ec.EllipticCurvePrivateKey) """ diff --git a/arroba/storage.py b/arroba/storage.py index 771c12d..e7f1779 100644 --- a/arroba/storage.py +++ b/arroba/storage.py @@ -51,12 +51,16 @@ class Action(Enum): class Block: - r"""An ATProto block: a record, :class:`MST` entry, or commit. 
+ r"""An ATProto block: a record, :class:`MST` entry, commit, or other event. Can start from either encoded bytes or decoded object, with or without :class:`CID`. Decodes, encodes, and generates :class:`CID` lazily, on demand, on attribute access. + Events should have a fully-qualified ``$type`` field that's one of the + ``message`` types in ``com.atproto.sync.subscribeRepos``, eg + ``com.atproto.sync.subscribeRepos#tombstone``. + Based on :class:`carbox.car.Block`. Attributes: @@ -160,9 +164,35 @@ def load_repo(self, did_or_handle): def tombstone_repo(self, repo): """Marks a repo as tombstoned. + * Stores a ``com.atproto.sync.subscribeRepos#tombstone`` block with its + own sequence number. + * If :attr:`Repo.callback` is populated, calls it with the + ``com.atproto.sync.subscribeRepos#tombstone`` message. + * Calls :meth:`_tombstone_repo` to mark the repo as tombstoned in storage. + After this, :meth:`load_repo` will raise :class:`TombstonedRepo` for this repo. + Args: + repo (Repo) + """ + self._tombstone_repo(repo) + + seq = self.allocate_seq(SUBSCRIBE_REPOS_NSID) + message = { + '$type': 'com.atproto.sync.subscribeRepos#tombstone', + 'seq': seq, + 'did': repo.did, + 'time': util.now().isoformat(), + } + self.write(repo.did, message, seq=seq) + + if repo.callback: + repo.callback(message) + + def _tombstone_repo(self, repo): + """Marks a repo as tombstoned in storage. + Args: repo (Repo) """ @@ -262,16 +292,17 @@ def has(self, cid): """ raise NotImplementedError() - def write(self, repo_did, obj): + def write(self, repo_did, obj, seq=None): """Writes a node to storage. - Generates new sequence number(s) as necessary for newly stored blocks. - TODO: remove? This seems unused. Args: repo_did (str): - obj (dict): a record, commit, or serialized :class:`MST` node + obj (dict): a record, commit, serialized :class:`MST` node, or + ``subscribeRepos`` event/message + seq (int or None): sequence number. If not provided, a new one will be + allocated. 
Returns: CID: @@ -347,7 +378,7 @@ def load_repo(self, did_or_handle): raise TombstonedRepo(f'{repo.did} is tombstoned') return repo - def tombstone_repo(self, repo): + def _tombstone_repo(self, repo): repo.status = TOMBSTONED def read(self, cid): @@ -368,10 +399,13 @@ def read_blocks_by_seq(self, start=0): def has(self, cid): return cid in self.blocks - def write(self, repo_did, obj): - block = Block(decoded=obj, seq=self.allocate_seq(SUBSCRIBE_REPOS_NSID)) + def write(self, repo_did, obj, seq=None): + if seq is None: + seq = self.allocate_seq(SUBSCRIBE_REPOS_NSID) + + block = Block(decoded=obj, seq=seq) if block not in self.blocks: - self.blocks.add(block) + self.blocks[block.cid] = block return block.cid def apply_commit(self, commit_data): diff --git a/arroba/tests/test_storage.py b/arroba/tests/test_storage.py index 03bc967..99bd257 100644 --- a/arroba/tests/test_storage.py +++ b/arroba/tests/test_storage.py @@ -5,10 +5,10 @@ from multiformats import CID from ..repo import Repo, Write -from ..storage import Action, Block, MemoryStorage -from ..util import next_tid, TOMBSTONED, TombstonedRepo +from ..storage import Action, Block, MemoryStorage, SUBSCRIBE_REPOS_NSID +from ..util import dag_cbor_cid, next_tid, TOMBSTONED, TombstonedRepo -from .testutil import TestCase +from .testutil import NOW, TestCase DECODED = {'foo': 'bar'} ENCODED = b'\xa1cfoocbar' @@ -78,11 +78,25 @@ def test_read_commits_by_seq_include_record_block_even_if_preexisting(self): self.assertEqual(record, commits[0].blocks[record.cid]) def test_tombstone_repo(self): + seen = [] storage = MemoryStorage() repo = Repo.create(storage, 'did:user', signing_key=self.key) + self.assertEqual(1, storage.last_seq(SUBSCRIBE_REPOS_NSID)) + + repo.callback = lambda event: seen.append(event) storage.tombstone_repo(repo) self.assertEqual(TOMBSTONED, repo.status) + self.assertEqual(2, storage.last_seq(SUBSCRIBE_REPOS_NSID)) + expected = { + '$type': 'com.atproto.sync.subscribeRepos#tombstone', + 'seq': 2, + 
'did': 'did:user', + 'time': NOW.isoformat(), + } + self.assertEqual([expected], seen) + self.assertEqual(expected, storage.read(dag_cbor_cid(expected)).decoded) + with self.assertRaises(TombstonedRepo): storage.load_repo('did:user') diff --git a/arroba/tests/test_xrpc_sync.py b/arroba/tests/test_xrpc_sync.py index 8a033e0..b2c0c81 100644 --- a/arroba/tests/test_xrpc_sync.py +++ b/arroba/tests/test_xrpc_sync.py @@ -428,7 +428,7 @@ def test_get_record_not_found(self): class SubscribeReposTest(testutil.XrpcTestCase): def setUp(self): super().setUp() - self.repo.callback = lambda commit_data: xrpc_sync.send_new_commits() + self.repo.callback = lambda commit_data: xrpc_sync.send_events() def subscribe(self, received, delivered=None, limit=None, cursor=None): """subscribeRepos websocket client. May be run in a thread. diff --git a/arroba/xrpc_sync.py b/arroba/xrpc_sync.py index f11ee2d..28860f8 100644 --- a/arroba/xrpc_sync.py +++ b/arroba/xrpc_sync.py @@ -23,7 +23,7 @@ logger = logging.getLogger(__name__) -# used by subscribe_repos and send_new_commits +# used by subscribe_repos and send_events NEW_COMMITS_TIMEOUT = timedelta(seconds=60) new_commits = Condition() @@ -72,7 +72,7 @@ def get_repo(input, did=None, since=None): # }] -def send_new_commits(): +def send_events(): """Triggers ``subscribeRepos`` to deliver new commits from storage to subscribers. """ logger.debug(f'Triggering subscribeRepos to look for new commits') @@ -91,12 +91,12 @@ def subscribe_repos(cursor=None): choose how to register and serve it themselves, eg asyncio vs threads vs WSGI workers. - See :func:`send_new_commits` for an example thread-based callback to + See :func:`send_events` for an example thread-based callback to register with :class:`Repo` to deliver all new commits to subscribers. 
Here's how to register that callback and this XRPC method in a threaded context:: - server.repo.callback = lambda commit_data: xrpc_sync.send_new_commits() + server.repo.callback = lambda commit_data: xrpc_sync.send_events() server.server.register('com.atproto.sync.subscribeRepos', xrpc_sync.subscribe_repos)