Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Fix background updates to handle redactions/rejections (#5352)
Browse files Browse the repository at this point in the history
* Fix background updates to handle redactions/rejections

In background updates based on current state delta stream we need to
handle that we may not have all the events (or at least that
`get_events` may raise an exception).
  • Loading branch information
erikjohnston authored and hawkowl committed Jun 5, 2019
1 parent 95ab2eb commit 7553881
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 12 deletions.
1 change: 1 addition & 0 deletions changelog.d/5352.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix room stats and presence background updates to correctly handle missing events.
11 changes: 7 additions & 4 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -828,14 +828,17 @@ def _handle_state_delta(self, deltas):
# joins.
continue

event = yield self.store.get_event(event_id)
if event.content.get("membership") != Membership.JOIN:
event = yield self.store.get_event(event_id, allow_none=True)
if not event or event.content.get("membership") != Membership.JOIN:
# We only care about joins
continue

if prev_event_id:
prev_event = yield self.store.get_event(prev_event_id)
if prev_event.content.get("membership") == Membership.JOIN:
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if (
prev_event
and prev_event.content.get("membership") == Membership.JOIN
):
# Ignore changes to join events.
continue

Expand Down
18 changes: 13 additions & 5 deletions synapse/handlers/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def _handle_deltas(self, deltas):
event_id = delta["event_id"]
stream_id = delta["stream_id"]
prev_event_id = delta["prev_event_id"]
stream_pos = delta["stream_id"]

logger.debug("Handling: %r %r, %s", typ, state_key, event_id)

Expand All @@ -136,10 +137,15 @@ def _handle_deltas(self, deltas):
event_content = {}

if event_id is not None:
event_content = (yield self.store.get_event(event_id)).content or {}
event = yield self.store.get_event(event_id, allow_none=True)
if event:
event_content = event.content or {}

# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)

# quantise time to the nearest bucket
now = yield self.store.get_received_ts(event_id)
now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size

if typ == EventTypes.Member:
Expand All @@ -149,9 +155,11 @@ def _handle_deltas(self, deltas):
# compare them.
prev_event_content = {}
if prev_event_id is not None:
prev_event_content = (
yield self.store.get_event(prev_event_id)
).content
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True,
)
if prev_event:
prev_event_content = prev_event.content

membership = event_content.get("membership", Membership.LEAVE)
prev_membership = prev_event_content.get("membership", Membership.LEAVE)
Expand Down
37 changes: 37 additions & 0 deletions synapse/storage/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,43 @@ def get_received_ts(self, event_id):
desc="get_received_ts",
)

def get_received_ts_by_stream_pos(self, stream_ordering):
"""Given a stream ordering get an approximate timestamp of when it
happened.
This is done by simply taking the received ts of the first event that
has a stream ordering greater than or equal to the given stream pos.
If none exists returns the current time, on the assumption that it must
have happened recently.
Args:
stream_ordering (int)
Returns:
Deferred[int]
"""

def _get_approximate_received_ts_txn(txn):
sql = """
SELECT received_ts FROM events
WHERE stream_ordering >= ?
LIMIT 1
"""

txn.execute(sql, (stream_ordering,))
row = txn.fetchone()
if row and row[0]:
ts = row[0]
else:
ts = self.clock.time_msec()

return ts

return self.runInteraction(
"get_approximate_received_ts",
_get_approximate_received_ts_txn,
)

@defer.inlineCallbacks
def get_event(
self,
Expand Down
62 changes: 59 additions & 3 deletions tests/handlers/test_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def test_incorrect_state_transition(self):
"a2": {"membership": "not a real thing"},
}

def get_event(event_id):
def get_event(event_id, allow_none=True):
m = Mock()
m.content = events[event_id]
d = defer.Deferred()
Expand All @@ -224,7 +224,7 @@ def get_received_ts(event_id):
"room_id": "room",
"event_id": "a1",
"prev_event_id": "a2",
"stream_id": "bleb",
"stream_id": 60,
}
]

Expand All @@ -241,11 +241,67 @@ def get_received_ts(event_id):
"room_id": "room",
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": "bleb",
"stream_id": 100,
}
]

f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
self.assertEqual(
f.value.args[0], "'not a real thing' is not a valid membership"
)

def test_redacted_prev_event(self):
"""
If the prev_event does not exist, then it is assumed to be a LEAVE.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")

room_1 = self.helper.create_room_as(u1, tok=u1_token)

# Do the initial population of the user directory via the background update
self._add_background_updates()

while not self.get_success(self.store.has_completed_background_updates()):
self.get_success(self.store.do_next_background_update(100), by=0.1)

events = {
"a1": None,
"a2": {"membership": Membership.JOIN},
}

def get_event(event_id, allow_none=True):
if events.get(event_id):
m = Mock()
m.content = events[event_id]
else:
m = None
d = defer.Deferred()
self.reactor.callLater(0.0, d.callback, m)
return d

def get_received_ts(event_id):
return defer.succeed(1)

self.store.get_received_ts = get_received_ts
self.store.get_event = get_event

deltas = [
{
"type": EventTypes.Member,
"state_key": "some_user:test",
"room_id": room_1,
"event_id": "a2",
"prev_event_id": "a1",
"stream_id": 100,
}
]

# Handle our fake deltas, which has a user going from LEAVE -> JOIN.
self.get_success(self.handler._handle_deltas(deltas))

# One delta, with two joined members -- the room creator, and our fake
# user.
r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)

0 comments on commit 7553881

Please sign in to comment.