diff --git a/README.md b/README.md index 2a7c7d1..ef9d168 100644 --- a/README.md +++ b/README.md @@ -86,7 +86,7 @@ Configure arroba with these environment variables: Optional, only used in [com.atproto.repo](https://arroba.readthedocs.io/en/stable/source/arroba.html#module-arroba.xrpc_repo), [.server](https://arroba.readthedocs.io/en/stable/source/arroba.html#module-arroba.xrpc_server), and [.sync](https://arroba.readthedocs.io/en/stable/source/arroba.html#module-arroba.xrpc_sync) XRPC handlers: * `REPO_TOKEN`, static token to use as both `accessJwt` and `refreshJwt`, defaults to contents of `repo_token` file. Not required to be an actual JWT. If not set, XRPC methods that require auth will return HTTP 501 Not Implemented. -* `ROLLBACK_WINDOW`, number of commits to serve in the [`subscribeRepos` rollback window](https://atproto.com/specs/event-stream#sequence-numbers). Defaults to no limit. +* `ROLLBACK_WINDOW`, number of events to serve in the [`subscribeRepos` rollback window](https://atproto.com/specs/event-stream#sequence-numbers). Defaults to no limit. <!-- Only used in app.py: * `REPO_DID`, repo user's DID, defaults to contents of `repo_did` file @@ -107,7 +107,8 @@ _Breaking changes:_ * `AtpRemoteBlob`: if the blob URL doesn't return the `Content-Type` header, infer type from the URL, or fall back to `application/octet-stream` ([bridgy-fed#1073](https://github.com/snarfed/bridgy-fed/issues/1073)). * `did`: * Cache `resolve_plc`, `resolve_web`, and `resolve_handle` for 6h, up to 5000 total results per call. -* `xrpc_sync`: rename `send_new_commits` to `send_events` due to adding account tombstone support. +* `storage`: rename `Storage.read_commits_by_seq` to `read_events_by_seq` for new account tombstone support. +* `xrpc_sync`: rename `send_new_commits` to `send_events`, ditto. _Non-breaking changes:_ @@ -119,6 +120,7 @@ _Non-breaking changes:_ * `service_jwt`: add optional `aud` kwarg. 
* `xrpc_sync`: * `subscribeRepos`: + * Add support for non-commit events, starting with account tombstones. * Add `ROLLBACK_WINDOW` environment variable to limit size of [rollback window](https://atproto.com/specs/event-stream#sequence-numbers). Defaults to no limit. * For commits with create or update operations, always include the record block, even if it already existed in the repo beforehand ([snarfed/bridgy-fed#1016](https://github.com/snarfed/bridgy-fed/issues/1016)). * Bug fix, populate the time each commit was created in `time` instead of the current time ([snarfed/bridgy-fed#1015](https://github.com/snarfed/bridgy-fed/issues/1015)). diff --git a/app.yaml b/app.yaml index 59f60af..606f6ae 100644 --- a/app.yaml +++ b/app.yaml @@ -10,7 +10,7 @@ runtime_config: operating_system: ubuntu18 runtime_version: 3.9 -# need only one instance so that new commits can be delivered to subscribeRepos +# need only one instance so that new events can be delivered to subscribeRepos # subscribers in memory manual_scaling: instances: 1 diff --git a/arroba/storage.py b/arroba/storage.py index e7f1779..87e02a0 100644 --- a/arroba/storage.py +++ b/arroba/storage.py @@ -25,6 +25,7 @@ class Action(Enum): DELETE = auto() # TODO: Should this be a subclass of Block? +# TODO: generalize to handle other events CommitData = namedtuple('CommitData', [ 'commit', # Block 'blocks', # dict of CID to Block @@ -119,7 +120,7 @@ def __hash__(self): class Storage: - """Abstract base class for storing nodes: records, MST entries, and commits. + """Abstract base class for storing nodes: records, MST entries, commits, etc. Concrete subclasses should implement this on top of physical storage, eg database, filesystem, in memory. @@ -234,16 +235,17 @@ def read_blocks_by_seq(self, start=0): """ raise NotImplementedError() - def read_commits_by_seq(self, start=0): - """Batch read commits from storage by ``subscribeRepos`` sequence number. 
+ def read_events_by_seq(self, start=0): + """Batch read commits and other events by ``subscribeRepos`` sequence number. Args: seq (int): optional ``subscribeRepos`` sequence number to start from, inclusive. Defaults to 0. Returns: - generator: generator of :class:`CommitData`, starting from ``seq``, - inclusive, in ascending ``seq`` order + generator: generator of :class:`CommitData` for commits and dict + messages for other events, starting from ``seq``, inclusive, in + ascending ``seq`` order """ assert start >= 0 @@ -263,7 +265,10 @@ def make_commit(): for block in self.read_blocks_by_seq(start=start): assert block.seq if block.seq != seq: # switching to a new commit's blocks - if commit_block: + if block.decoded.get('$type', '').startswith( + 'com.atproto.sync.subscribeRepos#'): + yield block.decoded # non-commit message + elif commit_block: yield make_commit() else: assert blocks is None # only the first commit diff --git a/arroba/tests/test_storage.py b/arroba/tests/test_storage.py index 99bd257..6c80a64 100644 --- a/arroba/tests/test_storage.py +++ b/arroba/tests/test_storage.py @@ -32,7 +32,7 @@ def test_block_eq(self): def test_block_hash(self): self.assertEqual(id(Block(decoded=DECODED)), id(Block(encoded=ENCODED))) - def test_read_commits_by_seq(self): + def test_read_events_by_seq(self): commit_cids = [] storage = MemoryStorage() @@ -49,11 +49,11 @@ def test_read_commits_by_seq(self): commit_cids.append(repo.head.cid) self.assertEqual(commit_cids, [cd.commit.cid for cd in - storage.read_commits_by_seq()]) + storage.read_events_by_seq()]) self.assertEqual(commit_cids[1:], [cd.commit.cid for cd in - storage.read_commits_by_seq(start=2)]) + storage.read_events_by_seq(start=2)]) - def test_read_commits_by_seq_include_record_block_even_if_preexisting(self): + def test_read_events_by_seq_include_record_block_even_if_preexisting(self): # https://github.com/snarfed/bridgy-fed/issues/1016#issuecomment-2109276344 commit_cids = [] @@ -69,7 +69,7 @@ def 
test_read_commits_by_seq_include_record_block_even_if_preexisting(self): second = Write(Action.CREATE, 'co.ll', next_tid(), {'foo': 'bar'}) commit_cid = repo.apply_writes([second]) - commits = list(storage.read_commits_by_seq(start=3)) + commits = list(storage.read_events_by_seq(start=3)) self.assertEqual(1, len(commits)) self.assertEqual(repo.head.cid, commits[0].commit.cid) self.assertEqual(prev, commits[0].prev) diff --git a/arroba/tests/test_xrpc_sync.py b/arroba/tests/test_xrpc_sync.py index b2c0c81..fc8d1ea 100644 --- a/arroba/tests/test_xrpc_sync.py +++ b/arroba/tests/test_xrpc_sync.py @@ -445,6 +445,7 @@ def subscribe(self, received, delivered=None, limit=None, cursor=None): xrpc_sync.subscribe_repos(cursor=cursor)): self.assertIn(header, [ {'op': 1, 't': '#commit'}, + {'op': 1, 't': '#tombstone'}, {'op': -1}, ]) received.append(payload) @@ -648,6 +649,18 @@ def test_include_preexisting_record_block(self, *_): subscriber.join() + def test_tombstone(self, *_): + server.storage.tombstone_repo(self.repo) + + seq = server.storage.last_seq(SUBSCRIBE_REPOS_NSID) + header, payload = next(iter(xrpc_sync.subscribe_repos(cursor=seq))) + self.assertEqual({'op': 1, 't': '#tombstone'}, header) + self.assertEqual({ + 'seq': seq, + 'did': self.repo.did, + 'time': testutil.NOW.isoformat(), + }, payload) + class DatastoreXrpcSyncTest(XrpcSyncTest, testutil.DatastoreTest): STORAGE_CLS = DatastoreStorage diff --git a/arroba/xrpc_sync.py b/arroba/xrpc_sync.py index 28860f8..6f1cae8 100644 --- a/arroba/xrpc_sync.py +++ b/arroba/xrpc_sync.py @@ -151,18 +151,36 @@ def header_payload(commit_data): yield ({'op': 1, 't': '#info'}, {'name': 'OutdatedCursor'}) cursor = rollback_start - logger.info(f'fetching existing commits from seq {cursor}') - for commit_data in server.storage.read_commits_by_seq(start=cursor): - yield header_payload(commit_data) - last_seq = commit_data.commit.seq - - # serve new commits as they happen - logger.info(f'serving new commits') + logger.info(f'fetching 
existing events from seq {cursor}') + for event in server.storage.read_events_by_seq(start=cursor): + if isinstance(event, CommitData): + yield header_payload(event) + last_seq = event.commit.seq + elif isinstance(event, dict): + type = event.pop('$type') + type_fragment = type.removeprefix('com.atproto.sync.subscribeRepos') + assert type_fragment != type, type + yield {'op': 1, 't': type_fragment}, event + last_seq = event['seq'] + else: + raise RuntimeError(f'unexpected event type {event.__class__} {event}') + + # serve new events as they happen + logger.info(f'serving new events') while True: with new_commits: new_commits.wait(NEW_COMMITS_TIMEOUT.total_seconds()) - for commit_data in server.storage.read_commits_by_seq(start=last_seq + 1): - yield header_payload(commit_data) - last_seq = commit_data.commit.seq + for event in server.storage.read_events_by_seq(start=last_seq + 1): + if isinstance(event, CommitData): + yield header_payload(event) + last_seq = event.commit.seq + elif isinstance(event, dict): + type = event.pop('$type') + type_fragment = type.removeprefix('com.atproto.sync.subscribeRepos') + assert type_fragment != type, type + yield {'op': 1, 't': type_fragment}, event + last_seq = event['seq'] + else: + raise RuntimeError(f'unexpected event type {event.__class__} {event}')