Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Raw commit from debugging "Current state for room {room_id} is empty"…
Browse files Browse the repository at this point in the history
… error
  • Loading branch information
MadLittleMods committed May 12, 2022
1 parent d80a7ab commit f60236b
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 6 deletions.
2 changes: 1 addition & 1 deletion scripts-dev/complement.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,4 @@ docker build -t $COMPLEMENT_BASE_IMAGE -f "docker/complement/$COMPLEMENT_DOCKERF
# Run the tests!
echo "Images built; running complement"
cd "$COMPLEMENT_DIR"
go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/...
go test -v -tags synapse_blacklist,msc2716,msc3030,faster_joins -count=1 "${extra_test_args[@]}" "$@" ./tests/
2 changes: 2 additions & 0 deletions synapse/handlers/federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ async def do_invite_join(
# room stuff after join currently doesn't work on workers.
assert self.config.worker.worker_app is None

logger.info("traceFrom(do_invite_join) for _update_current_state_txn")

logger.debug("Joining %s to %s", joinee, room_id)

origin, event, room_version_obj = await self._make_and_verify_event(
Expand Down
41 changes: 37 additions & 4 deletions synapse/handlers/federation_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,10 @@ async def process_remote_join(
room_id, itertools.chain(auth_events, state)
)

logger.info(
"process_remote_join state=%s partial_state=%s", state, partial_state
)

# and now persist the join event itself.
logger.info(
"Peristing join-via-remote %s (partial_state: %s)", event, partial_state
Expand All @@ -475,7 +479,19 @@ async def process_remote_join(
# and discover that we do not have it.
event.internal_metadata.proactively_send = False

return await self.persist_events_and_notify(room_id, [(event, context)])
stream_id_after_persist = await self.persist_events_and_notify(
room_id, [(event, context)]
)

# Do this after the state from the remote join was persisted (via
# `persist_events_and_notify`). Otherwise we can run into a
# situation where the create event doesn't exist yet in the
# `current_state_events`
for e in state:
await self._handle_marker_event(origin, e)
# TODO: Loop through previous state to find other markers

return stream_id_after_persist

async def update_state_for_partial_state_event(
self, destination: str, event: EventBase
Expand Down Expand Up @@ -1200,25 +1216,40 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No
"""

if marker_event.type != EventTypes.MSC2716_MARKER:
# logger.info(
# "_handle_marker_event not a marker event marker_event.type=%s",
# marker_event.type,
# )
# Not a marker event
return

logger.info("_handle_marker_event next 0000000000000000000000000000000000")

if marker_event.rejected_reason is not None:
logger.info(
"_handle_marker_event rejected %s", marker_event.rejected_reason
)
# Rejected event
return

logger.info("_handle_marker_event next 1111111111111111111111111111111111")

# Skip processing a marker event if the room version doesn't
# support it or the event is not from the room creator.
room_version = await self._store.get_room_version(marker_event.room_id)
logger.info("_handle_marker_event next 2222222222222222222222222222222222")
create_event = await self._store.get_create_event_for_room(marker_event.room_id)
logger.info("_handle_marker_event next 3333333333333333333333333333333333")
room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
logger.info("_handle_marker_event next 4444444444444444444444444444444444")
if not room_version.msc2716_historical and (
not self._config.experimental.msc2716_enabled
or marker_event.sender != room_creator
):
logger.info("_handle_marker_event skipping room_version=%s", room_version)
return

logger.debug("_handle_marker_event: received %s", marker_event)
logger.info("_handle_marker_event: received %s", marker_event)

insertion_event_id = marker_event.content.get(
EventContentFields.MSC2716_MARKER_INSERTION
Expand All @@ -1228,7 +1259,7 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No
# Nothing to retrieve then (invalid marker)
return

logger.debug(
logger.info(
"_handle_marker_event: backfilling insertion event %s", insertion_event_id
)

Expand Down Expand Up @@ -1260,7 +1291,7 @@ async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> No
insertion_event_id, marker_event.room_id
)

logger.debug(
logger.info(
"_handle_marker_event: insertion extremity added for %s from marker event %s",
insertion_event,
marker_event,
Expand Down Expand Up @@ -1947,6 +1978,8 @@ async def persist_events_and_notify(
Returns:
The stream ID after which all events have been persisted.
"""
# logger.info("persist_events_and_notify event_and_contexts(%d)=%s", len(event_and_contexts), event_and_contexts)

if not event_and_contexts:
return self._store.get_room_max_stream_ordering()

Expand Down
16 changes: 16 additions & 0 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,14 @@ async def on_POST(
remote_room_hosts,
)

logger.info("/join/:aliasOrId target_user=%s", requester.user)
logger.info(
"++++---------------------------------------------------------------++++"
)
logger.info(
"++++---------------------------------------------------------------++++"
)

await self.room_member_handler.update_membership(
requester=requester,
target=requester.user,
Expand Down Expand Up @@ -840,6 +848,14 @@ async def on_POST(
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)

logger.info("/%s target_user=%s", membership_action, requester.user)
logger.info(
"-----------------------------------------------------------------------"
)
logger.info(
"-----------------------------------------------------------------------"
)

if requester.is_guest and membership_action not in {
Membership.JOIN,
Membership.LEAVE,
Expand Down
33 changes: 33 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ async def _persist_events_and_state_updates(
Resolves when the events have been persisted
"""

logger.info(
"_persist_events_and_state_updates events_and_contexts(%d) current_state_for_room(%d) state_delta_for_room(%d)",
len(events_and_contexts),
len(current_state_for_room),
len(state_delta_for_room),
)

# We want to calculate the stream orderings as late as possible, as
# we only notify after all events with a lesser stream ordering have
# been persisted. I.e. if we spend 10s inside the with block then
Expand Down Expand Up @@ -999,7 +1006,17 @@ def _update_current_state_txn(
state_delta_by_room: Dict[str, DeltaState],
stream_id: int,
):
logger.info(
"_update_current_state_txn state_delta_by_room=%s", state_delta_by_room
)
for room_id, delta_state in state_delta_by_room.items():
logger.info(
"_update_current_state_txn room_id=%s delta_state=%s",
room_id,
delta_state,
exc_info=True,
)

to_delete = delta_state.to_delete
to_insert = delta_state.to_insert

Expand Down Expand Up @@ -1037,11 +1054,21 @@ def _update_current_state_txn(
users_in_room = self.store.get_users_in_room_txn(txn, room_id)
members_changed.update(users_in_room)

logger.info(
"_update_current_state_txn no_longer_in_room deleting all state for room_id=%s (before)",
room_id,
)

self.db_pool.simple_delete_txn(
txn,
table="current_state_events",
keyvalues={"room_id": room_id},
)

logger.info(
"_update_current_state_txn no_longer_in_room deleting all state for room_id=%s (after)",
room_id,
)
else:
# We're still in the room, so we update the current state as normal.

Expand Down Expand Up @@ -1092,6 +1119,12 @@ def _update_current_state_txn(
),
)

logger.info(
"_update_current_state_txn inserting current_state_events to_insert=%s to_delete=%s",
to_insert,
to_delete,
)

# We include the membership in the current state table, hence we do
# a lookup when we insert. This assumes that all events have already
# been inserted into room_memberships.
Expand Down
4 changes: 4 additions & 0 deletions synapse/storage/databases/main/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ async def get_create_event_for_room(self, room_id: str) -> EventBase:
"""
state_ids = await self.get_current_state_ids(room_id)

logger.info(
"get_create_event_for_room room_id=%s state_ids=%s", room_id, state_ids
)

if not state_ids:
raise NotFoundError(f"Current state for room {room_id} is empty")

Expand Down
22 changes: 21 additions & 1 deletion synapse/storage/persist_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,15 @@ async def persist_events(
matched the transcation ID; the existing event is returned in such
a case.
"""

# logger.info("persist_events events_and_contexts(%d)", len(events_and_contexts))

partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
for event, ctx in events_and_contexts:
partitioned.setdefault(event.room_id, []).append((event, ctx))

async def enqueue(item):
# logger.info("persist_events enqueue=%s", item)
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
room_id, evs_ctxs, backfilled=backfilled
Expand Down Expand Up @@ -450,6 +454,12 @@ async def _persist_event_batch(
if not events_and_contexts:
return replaced_events

logger.info(
"traceFrom(_persist_event_batch) for _update_current_state_txn events_and_contexts(%d) backfilled=%s",
len(events_and_contexts),
backfilled,
)

# Check if any of the events have a transaction ID that has already been
# persisted, and if so we don't persist it again.
#
Expand Down Expand Up @@ -515,14 +525,18 @@ async def _persist_event_batch(
(event, context)
)

for room_id, ev_ctx_rm in events_by_room.items():
events_by_room_items = events_by_room.items()

for room_id, ev_ctx_rm in events_by_room_items:
latest_event_ids = set(
await self.main_store.get_latest_event_ids_in_room(room_id)
)
new_latest_event_ids = await self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)

logger.info("persist_event_batch new_latest_event_ids=%s latest_event_ids=%s", new_latest_event_ids, latest_event_ids)

if new_latest_event_ids == latest_event_ids:
# No change in extremities, so no change in state
continue
Expand Down Expand Up @@ -590,6 +604,12 @@ async def _persist_event_batch(

new_forward_extremities[room_id] = new_latest_event_ids

# TODO: Left off here. need to see why
# `state_delta_for_room` is empty in the case where it's
# not working. Need to check how the delta is being
# calculated
logger.info("persist_event_batch delta_ids=%s", delta_ids)

# If either are not None then there has been a change,
# and we need to work out the delta (or use that
# given)
Expand Down

0 comments on commit f60236b

Please sign in to comment.