Skip to content

Commit

Permalink
tombstones: store block and call repo callback with #tombstone message
Browse files Browse the repository at this point in the history
snarfed committed May 21, 2024
1 parent 9df7f44 commit cbf0737
Showing 8 changed files with 87 additions and 30 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -30,13 +30,13 @@ from lexrpc.flask_server import init_flask

from arroba import server
from arroba.datastore_storage import DatastoreStorage
from arroba.xrpc_sync import send_new_commits
from arroba.xrpc_sync import send_events

# for Google Cloud Datastore
ndb_client = ndb.Client()

server.storage = DatastoreStorage(ndb_client=ndb_client)
server.repo.callback = lambda _: send_new_commits() # to subscribeRepos
server.repo.callback = lambda _: send_events() # to subscribeRepos

app = Flask('my-pds')
init_flask(server.server, app)
@@ -107,11 +107,14 @@ _Breaking changes:_
* `AtpRemoteBlob`: if the blob URL doesn't return the `Content-Type` header, infer type from the URL, or fall back to `application/octet-stream` ([bridgy-fed#1073](https://github.com/snarfed/bridgy-fed/issues/1073)).
* `did`:
* Cache `resolve_plc`, `resolve_web`, and `resolve_handle` for 6h, up to 5000 total results per call.
* `xrpc_sync`: rename `send_new_commits` to `send_events` due to adding account tombstone support.

_Non-breaking changes:_

* `did`:
* Add `HANDLE_RE` regexp for handle validation.
* `storage`:
* Add new `Storage.tombstone_repo` method, implement in `MemoryStorage` and `DatastoreStorage`. [Used to delete accounts.](https://github.com/bluesky-social/atproto/discussions/2503#discussioncomment-9502339) ([bridgy-fed#783](https://github.com/snarfed/bridgy-fed/issues/783))
* `util`:
* `service_jwt`: add optional `aud` kwarg.
* `xrpc_sync`:
@@ -126,8 +129,6 @@ _Non-breaking changes:_

* Bug fix: base32-encode TIDs in record keys, `at://` URIs, commit `rev`s, etc. Before, we were using the integer UNIX timestamp directly, which happened to be the same 13-character length. Oops.
* Switch from `BGS_HOST` environment variable to `RELAY_HOST`. `BGS_HOST` is still supported for backward compatibility.
* `storage`:
* Add new `Storage.tombstone_repo` method, implement in `MemoryStorage` and `DatastoreStorage`. [Used to delete accounts.](https://github.com/bluesky-social/atproto/discussions/2503#discussioncomment-9502339) ([bridgy-fed#783](https://github.com/snarfed/bridgy-fed/issues/783))
* `datastore_storage`:
* Bug fix for `DatastoreStorage.last_seq`, handle new NSID.
* Add new `AtpRemoteBlob` class for storing "remote" blobs, available at public HTTP URLs, that we don't store ourselves.
2 changes: 1 addition & 1 deletion app.py
Original file line number Diff line number Diff line change
@@ -131,7 +131,7 @@ def proxy_appview(nsid_rest=None):
rotation_key=privkey, signing_key=privkey,
handle=os.environ['REPO_HANDLE'])

server.repo.callback = lambda commit_data: xrpc_sync.send_new_commits()
server.repo.callback = lambda commit_data: xrpc_sync.send_events()
if server.repo.handle != os.environ['REPO_HANDLE']:
logger.warning(f"$REPO_HANDLE is {os.environ['REPO_HANDLE']} but loaded repo's handle is {server.repo.handle} !")

13 changes: 9 additions & 4 deletions arroba/datastore_storage.py
Original file line number Diff line number Diff line change
@@ -128,10 +128,14 @@ def rotation_key(self):


class AtpBlock(ndb.Model):
"""A data record, MST node, or commit.
"""A data record, MST node, repo commit, or other event.
Key name is the DAG-CBOR base32 CID of the data.
Events should have a fully-qualified ``$type`` field that's one of the
``message`` types in ``com.atproto.sync.subscribeRepos``, eg
``com.atproto.sync.subscribeRepos#tombstone``.
Properties:
* encoded (bytes): DAG-CBOR encoded value
* data (dict): DAG-JSON value, only used for human debugging
@@ -425,7 +429,7 @@ def load_repo(self, did_or_handle):
rotation_key=atp_repo.rotation_key)

@ndb_context
def tombstone_repo(self, repo):
def _tombstone_repo(self, repo):
@ndb.transactional()
def update():
atp_repo = AtpRepo.get_by_id(repo.did)
@@ -469,8 +473,9 @@ def has(self, cid):
return self.read(cid) is not None

@ndb_context
def write(self, repo_did, obj):
seq = self.allocate_seq(SUBSCRIBE_REPOS_NSID)
def write(self, repo_did, obj, seq=None):
if seq is None:
seq = self.allocate_seq(SUBSCRIBE_REPOS_NSID)
return AtpBlock.create(repo_did=repo_did, data=obj, seq=seq).cid

@ndb_context
9 changes: 6 additions & 3 deletions arroba/repo.py
Original file line number Diff line number Diff line change
@@ -66,8 +66,11 @@ class Repo:
mst (MST)
head (Block): head commit
handle (str)
callback (callable: (CommitData) => None): called on new commits.
May be set directly by clients. None means no callback.
callback (callable: (CommitData | dict) => None): called on new commits
and other repo events. May be set directly by clients. None means no
callback. The parameter will be a :class:`CommitData` for commits, or a
dict record with ``$type`` for other ``com.atproto.sync.subscribeRepos``
messages.
"""
storage = None
mst = None
@@ -87,7 +90,7 @@ def __init__(self, *, storage=None, mst=None, head=None, handle=None,
mst (MST)
commit (dict): head commit
cid (CID): head CID
callback (callable, CommitData => None)
callback (callable, (CommitData | dict) => None)
signing_key (ec.EllipticCurvePrivateKey): required
rotation_key (ec.EllipticCurvePrivateKey)
"""
52 changes: 43 additions & 9 deletions arroba/storage.py
Original file line number Diff line number Diff line change
@@ -51,12 +51,16 @@ class Action(Enum):


class Block:
r"""An ATProto block: a record, :class:`MST` entry, or commit.
r"""An ATProto block: a record, :class:`MST` entry, commit, or other event.
Can start from either encoded bytes or decoded object, with or without
:class:`CID`. Decodes, encodes, and generates :class:`CID` lazily, on
demand, on attribute access.
Events should have a fully-qualified ``$type`` field that's one of the
``message`` types in ``com.atproto.sync.subscribeRepos``, eg
``com.atproto.sync.subscribeRepos#tombstone``.
Based on :class:`carbox.car.Block`.
Attributes:
@@ -160,9 +164,35 @@ def load_repo(self, did_or_handle):
def tombstone_repo(self, repo):
"""Marks a repo as tombstoned.
* Stores a ``com.atproto.sync.subscribeRepos#tombstone`` block with its
own sequence number.
* If :attr:`Repo.callback` is populated, calls it with the
``com.atproto.sync.subscribeRepos#tombstone`` message.
* Calls :meth:`_tombstone_repo` to mark the repo as tombstoned in storage.
After this, :meth:`load_repo` will raise :class:`TombstonedRepo` for
this repo.
Args:
repo (Repo)
"""
self._tombstone_repo(repo)

seq = self.allocate_seq(SUBSCRIBE_REPOS_NSID)
message = {
'$type': 'com.atproto.sync.subscribeRepos#tombstone',
'seq': seq,
'did': repo.did,
'time': util.now().isoformat(),
}
self.write(repo.did, message, seq=seq)

if repo.callback:
repo.callback(message)

def _tombstone_repo(self, repo):
"""Marks a repo as tombstoned in storage.
Args:
repo (Repo)
"""
@@ -262,16 +292,17 @@ def has(self, cid):
"""
raise NotImplementedError()

def write(self, repo_did, obj):
def write(self, repo_did, obj, seq=None):
"""Writes a node to storage.
Generates new sequence number(s) as necessary for newly stored blocks.
TODO: remove? This seems unused.
Args:
repo_did (str):
obj (dict): a record, commit, or serialized :class:`MST` node
obj (dict): a record, commit, serialized :class:`MST` node, or
``subscribeRepos`` event/message
seq (int or None): sequence number. If not provided, a new one will be
allocated.
Returns:
CID:
@@ -347,7 +378,7 @@ def load_repo(self, did_or_handle):
raise TombstonedRepo(f'{repo.did} is tombstoned')
return repo

def tombstone_repo(self, repo):
def _tombstone_repo(self, repo):
repo.status = TOMBSTONED

def read(self, cid):
@@ -368,10 +399,13 @@ def read_blocks_by_seq(self, start=0):
def has(self, cid):
return cid in self.blocks

def write(self, repo_did, obj):
block = Block(decoded=obj, seq=self.allocate_seq(SUBSCRIBE_REPOS_NSID))
def write(self, repo_did, obj, seq=None):
if seq is None:
seq = self.allocate_seq(SUBSCRIBE_REPOS_NSID)

block = Block(decoded=obj, seq=seq)
if block not in self.blocks:
self.blocks.add(block)
self.blocks[block.cid] = block
return block.cid

def apply_commit(self, commit_data):
20 changes: 17 additions & 3 deletions arroba/tests/test_storage.py
Original file line number Diff line number Diff line change
@@ -5,10 +5,10 @@
from multiformats import CID

from ..repo import Repo, Write
from ..storage import Action, Block, MemoryStorage
from ..util import next_tid, TOMBSTONED, TombstonedRepo
from ..storage import Action, Block, MemoryStorage, SUBSCRIBE_REPOS_NSID
from ..util import dag_cbor_cid, next_tid, TOMBSTONED, TombstonedRepo

from .testutil import TestCase
from .testutil import NOW, TestCase

DECODED = {'foo': 'bar'}
ENCODED = b'\xa1cfoocbar'
@@ -78,11 +78,25 @@ def test_read_commits_by_seq_include_record_block_even_if_preexisting(self):
self.assertEqual(record, commits[0].blocks[record.cid])

def test_tombstone_repo(self):
seen = []
storage = MemoryStorage()
repo = Repo.create(storage, 'did:user', signing_key=self.key)
self.assertEqual(1, storage.last_seq(SUBSCRIBE_REPOS_NSID))

repo.callback = lambda event: seen.append(event)
storage.tombstone_repo(repo)

self.assertEqual(TOMBSTONED, repo.status)

self.assertEqual(2, storage.last_seq(SUBSCRIBE_REPOS_NSID))
expected = {
'$type': 'com.atproto.sync.subscribeRepos#tombstone',
'seq': 2,
'did': 'did:user',
'time': NOW.isoformat(),
}
self.assertEqual([expected], seen)
self.assertEqual(expected, storage.read(dag_cbor_cid(expected)).decoded)

with self.assertRaises(TombstonedRepo):
storage.load_repo('did:user')
2 changes: 1 addition & 1 deletion arroba/tests/test_xrpc_sync.py
Original file line number Diff line number Diff line change
@@ -428,7 +428,7 @@ def test_get_record_not_found(self):
class SubscribeReposTest(testutil.XrpcTestCase):
def setUp(self):
super().setUp()
self.repo.callback = lambda commit_data: xrpc_sync.send_new_commits()
self.repo.callback = lambda commit_data: xrpc_sync.send_events()

def subscribe(self, received, delivered=None, limit=None, cursor=None):
"""subscribeRepos websocket client. May be run in a thread.
10 changes: 5 additions & 5 deletions arroba/xrpc_sync.py
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@

logger = logging.getLogger(__name__)

# used by subscribe_repos and send_new_commits
# used by subscribe_repos and send_events
NEW_COMMITS_TIMEOUT = timedelta(seconds=60)
new_commits = Condition()

@@ -72,7 +72,7 @@ def get_repo(input, did=None, since=None):
# }]


def send_new_commits():
def send_events():
"""Triggers ``subscribeRepos`` to deliver new commits from storage to subscribers.
"""
logger.debug(f'Triggering subscribeRepos to look for new commits')
@@ -91,12 +91,12 @@ def subscribe_repos(cursor=None):
choose how to register and serve it themselves, eg asyncio vs threads vs
WSGI workers.
See :func:`send_new_commits` for an example thread-based callback to
See :func:`send_events` for an example thread-based callback to
register with :class:`Repo` to deliver all new commits to subscribers.
Here's how to register that callback and this XRPC method in a threaded
context::
context:
server.repo.callback = lambda commit_data: xrpc_sync.send_new_commits()
server.repo.callback = lambda commit_data: xrpc_sync.send_events()
server.server.register('com.atproto.sync.subscribeRepos',
xrpc_sync.subscribe_repos)

0 comments on commit cbf0737

Please sign in to comment.