From a07b0415f3a3f9c2c65d36098400ddf530ce95d1 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 21 Jul 2022 16:26:34 +0100 Subject: [PATCH 01/21] Factor out CreateMessageEvent() function --- ...federation_room_join_partial_state_test.go | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index a532e9ca..f8ffb8cf 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -142,15 +142,7 @@ func TestPartialStateJoin(t *testing.T) { federation.HandleEventAuthRequests()(psjResult.Server) // derek sends an event in the room - event := psjResult.Server.MustCreateEvent(t, psjResult.ServerRoom, b.Event{ - Type: "m.room.message", - Sender: psjResult.Server.UserID("derek"), - Content: map[string]interface{}{ - "msgtype": "m.text", - "body": "Message", - }, - }) - psjResult.ServerRoom.AddEvent(event) + event := psjResult.CreateMessageEvent(t, "derek", nil) psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) t.Logf("Derek sent event event ID %s", event.EventID()) @@ -620,6 +612,28 @@ func (psj *partialStateJoinResult) Destroy() { } } +// send a message into the room without letting the homeserver under test know about it. +func (psj *partialStateJoinResult) CreateMessageEvent(t *testing.T, senderLocalpart string, prevEventIDs []string) *gomatrixserverlib.Event { + var prevEvents interface{} + if prevEventIDs == nil { + prevEvents = nil + } else { + prevEvents = prevEventIDs + } + + event := psj.Server.MustCreateEvent(t, psj.ServerRoom, b.Event{ + Type: "m.room.message", + Sender: psj.Server.UserID(senderLocalpart), + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": "Message", + }, + PrevEvents: prevEvents, + }) + psj.ServerRoom.AddEvent(event) + return event +} + // wait for a /state_ids request for the test room to arrive func (psj *partialStateJoinResult) AwaitStateIdsRequest(t *testing.T) { psj.fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") From 7b483be2268ba534bb4882caa6ae40d7ebf1f4d4 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 21 Jul 2022 19:53:46 +0100 Subject: [PATCH 02/21] Factor out `testReceiveEventDuringPartialStateJoin()` function `testReceiveEventDuringPartialStateJoin()` sends an event over federation, checks that a client can see it and checks the state at the event. --- ...federation_room_join_partial_state_test.go | 128 ++++++++++-------- 1 file changed, 69 insertions(+), 59 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index f8ffb8cf..cf4b21c7 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -143,66 +143,8 @@ func TestPartialStateJoin(t *testing.T) { // derek sends an event in the room event := psjResult.CreateMessageEvent(t, "derek", nil) - psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) t.Logf("Derek sent event event ID %s", event.EventID()) - - /* TODO: check that a lazy-loading sync can see the event. Currently this doesn't work, because /sync blocks. - * https://github.com/matrix-org/synapse/issues/13146 - alice.MustSyncUntil(t, - client.SyncReq{ - Filter: buildLazyLoadingSyncFilter(nil), - }, - client.SyncTimelineHasEventID(psjResult.ServerRoom.RoomID, event.EventID()), - ) - */ - - // still, Alice should be able to see the event with an /event request. We might have to try it a few times. - start := time.Now() - for { - if time.Since(start) > time.Second { - t.Fatalf("timeout waiting for received event to be visible") - } - res := alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "event", event.EventID()}) - eventResBody := client.ParseJSON(t, res) - if res.StatusCode == 200 { - t.Logf("Successfully fetched received event %s", event.EventID()) - break - } - if res.StatusCode == 404 && gjson.GetBytes(eventResBody, "errcode").String() == "M_NOT_FOUND" { - t.Logf("Fetching received event failed with M_NOT_FOUND; will retry") - time.Sleep(100 * time.Millisecond) - continue - } - t.Fatalf("GET /event failed with %d: %s", res.StatusCode, string(eventResBody)) - } - - // allow the partial join to complete - psjResult.FinishStateRequest() - alice.MustSyncUntil(t, - client.SyncReq{}, - client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), - ) - - // check the server's idea of the state at the event. We do this by making a `state_ids` request over federation - stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", - fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", - url.PathEscape(psjResult.ServerRoom.RoomID), - url.QueryEscape(event.EventID()), - ), - ) - var respStateIDs gomatrixserverlib.RespStateIDs - if err := psjResult.Server.SendFederationRequest(deployment, stateReq, &respStateIDs); err != nil { - t.Errorf("/state_ids request returned non-200: %s", err) - return - } - var gotState, expectedState []interface{} - for _, ev := range respStateIDs.StateEventIDs { - gotState = append(gotState, ev) - } - for _, ev := range psjResult.ServerRoom.AllCurrentState() { - expectedState = append(expectedState, ev.EventID()) - } - must.CheckOffAll(t, gotState, expectedState) + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event) }) // a request to (client-side) /members?at= should block until the (federation) /state request completes @@ -505,6 +447,74 @@ func TestPartialStateJoin(t *testing.T) { }) } +// test reception of an event over federation during a resync +// sends the given event to the homeserver under test, checks that a client can see it and checks +// the state at the event +func testReceiveEventDuringPartialStateJoin( + t *testing.T, deployment *docker.Deployment, alice *client.CSAPI, psjResult partialStateJoinResult, event *gomatrixserverlib.Event, +) { + // send the event to the homeserver + psjResult.Server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil) + + /* TODO: check that a lazy-loading sync can see the event. Currently this doesn't work, because /sync blocks. + * https://github.com/matrix-org/synapse/issues/13146 + alice.MustSyncUntil(t, + client.SyncReq{ + Filter: buildLazyLoadingSyncFilter(nil), + }, + client.SyncTimelineHasEventID(psjResult.ServerRoom.RoomID, event.EventID()), + ) + */ + + // still, Alice should be able to see the event with an /event request. We might have to try it a few times. + start := time.Now() + for { + if time.Since(start) > time.Second { + t.Fatalf("timeout waiting for received event to be visible") + } + res := alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "event", event.EventID()}) + eventResBody := client.ParseJSON(t, res) + if res.StatusCode == 200 { + t.Logf("Successfully fetched received event %s", event.EventID()) + break + } + if res.StatusCode == 404 && gjson.GetBytes(eventResBody, "errcode").String() == "M_NOT_FOUND" { + t.Logf("Fetching received event failed with M_NOT_FOUND; will retry") + time.Sleep(100 * time.Millisecond) + continue + } + t.Fatalf("GET /event failed with %d: %s", res.StatusCode, string(eventResBody)) + } + + // allow the partial join to complete + psjResult.FinishStateRequest() + alice.MustSyncUntil(t, + client.SyncReq{}, + client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID), + ) + + // check the server's idea of the state at the event. We do this by making a `state_ids` request over federation + stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", + fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", + url.PathEscape(psjResult.ServerRoom.RoomID), + url.QueryEscape(event.EventID()), + ), + ) + var respStateIDs gomatrixserverlib.RespStateIDs + if err := psjResult.Server.SendFederationRequest(deployment, stateReq, &respStateIDs); err != nil { + t.Errorf("/state_ids request returned non-200: %s", err) + return + } + var gotState, expectedState []interface{} + for _, ev := range respStateIDs.StateEventIDs { + gotState = append(gotState, ev) + } + for _, ev := range psjResult.ServerRoom.AllCurrentState() { + expectedState = append(expectedState, ev.EventID()) + } + must.CheckOffAll(t, gotState, expectedState) +} + // buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq func buildLazyLoadingSyncFilter(timelineOptions map[string]interface{}) string { timelineFilter := map[string]interface{}{ From bd2b68828752393bf07e41d8a3fdac89211dae98 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 21 Jul 2022 19:57:37 +0100 Subject: [PATCH 03/21] Add `handleGetMissingEventsRequests()` function to respond to `/get_misisng_events` requests --- ...federation_room_join_partial_state_test.go | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index cf4b21c7..72693e05 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -9,6 +9,7 @@ package tests import ( "encoding/json" "fmt" + "io/ioutil" "net/http" "net/url" "strconv" @@ -16,9 +17,11 @@ import ( "testing" "time" + "github.com/gorilla/mux" "github.com/tidwall/gjson" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" "github.com/matrix-org/complement/internal/b" "github.com/matrix-org/complement/internal/client" @@ -722,6 +725,50 @@ func handleStateRequests( ).Methods("GET") } +// register a handler for `/get_missing_events` requests +func handleGetMissingEventsRequests( + t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, + eventsToReturn []*gomatrixserverlib.Event, +) { + srv.Mux().HandleFunc("/_matrix/federation/v1/get_missing_events/{roomID}", func(w http.ResponseWriter, req *http.Request) { + roomID := mux.Vars(req)["roomID"] + if roomID != serverRoom.RoomID { + t.Fatalf("Received unexpected /get_missing_events request for room: %s", roomID) + } + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + t.Fatalf("unable to read request body: %v", err) + } + var getMissingEventsRequest struct { + EarliestEvents []string `json:"earliest_events"` + LatestEvents []string `json:"latest_events"` + Limit int `json:"int"` + MinDepth int `json:"min_depth"` + } + err = json.Unmarshal(body, &getMissingEventsRequest) + if err != nil { + errResp := util.MessageResponse(400, err.Error()) + w.WriteHeader(errResp.Code) + b, _ := json.Marshal(errResp.JSON) + w.Write(b) + return + } + + t.Logf("Incoming get_missing_events request for prev events of %s in room %s", getMissingEventsRequest.LatestEvents, roomID) + + // TODO: return events based on those requested + w.WriteHeader(200) + res := struct { + Events []*gomatrixserverlib.Event `json:"events"` + }{ + Events: eventsToReturn, + } + responseBytes, _ := json.Marshal(&res) + w.Write(responseBytes) + }).Methods("POST") +} + func eventIDsFromEvents(he []*gomatrixserverlib.Event) []string { eventIDs := make([]string, len(he)) for i := range he { From d085f116370e5e144162a6d5ab671f508c2dd257 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 21 Jul 2022 20:00:13 +0100 Subject: [PATCH 04/21] Test whether the homeserver thinks it has full state at a received event --- ...federation_room_join_partial_state_test.go | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 72693e05..3210c820 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -20,6 +20,7 @@ import ( "github.com/gorilla/mux" "github.com/tidwall/gjson" + "github.com/matrix-org/gomatrix" "github.com/matrix-org/gomatrixserverlib" "github.com/matrix-org/util" @@ -489,6 +490,51 @@ func testReceiveEventDuringPartialStateJoin( t.Fatalf("GET /event failed with %d: %s", res.StatusCode, string(eventResBody)) } + // fire off a /state_ids request for the last event. + // it must either: + // * block because the homeserver does not have full state at the last event + // * or 403 because the homeserver does not have full state yet and does not consider the + // Complement homeserver to be in the room + stateIdsResponseChan := make(chan *gomatrixserverlib.RespStateIDs) + defer close(stateIdsResponseChan) + go func() { + stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", + fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", + url.PathEscape(psjResult.ServerRoom.RoomID), + url.QueryEscape(event.EventID()), + ), + ) + var respStateIDs gomatrixserverlib.RespStateIDs + if err := psjResult.Server.SendFederationRequest(deployment, stateReq, &respStateIDs); err != nil { + httpErr, ok := err.(gomatrix.HTTPError) + t.Logf("%v", httpErr) + if ok && httpErr.Code == 403 { + stateIdsResponseChan <- nil + return + } + t.Errorf("/state_ids request returned non-200: %s", err) + return + } + stateIdsResponseChan <- &respStateIDs + }() + + select { + case <-time.After(1 * time.Second): + t.Logf("/state_ids request for event %s blocked as expected", event.EventID()) + defer func() { <-stateIdsResponseChan }() + break + case respStateIDs := <-stateIdsResponseChan: + if respStateIDs == nil { + t.Logf("/state_ids request for event %s returned 403 as expected", event.EventID()) + } else { + // since we have not yet given the homeserver the full state at the join event and allowed + // the partial join to complete, it can't possibly know the full state at the last event. + // While it may be possible for the response to be correct by some accident of state res, + // the homeserver is still wrong in spirit. + t.Fatalf("/state_ids request for event %s did not block when it should have", event.EventID()) + } + } + // allow the partial join to complete psjResult.FinishStateRequest() alice.MustSyncUntil(t, From 2cbeaae959adea9bc532bfeafcb75736fa9499fe Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 21 Jul 2022 20:01:27 +0100 Subject: [PATCH 05/21] Allow explicitly specified `/state` and `/state_ids` requests to complete --- ...federation_room_join_partial_state_test.go | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 3210c820..5b3ae3b6 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -592,6 +592,8 @@ type partialStateJoinResult struct { ServerRoom *federation.ServerRoom fedStateIdsRequestReceivedWaiter *Waiter fedStateIdsSendResponseWaiter *Waiter + // the set of events for which we will not block `/state` or `/state_ids` requests. + fedStateIdsAllowedEvents map[string]bool } // beginPartialStateJoin spins up a room on a complement server, @@ -627,6 +629,7 @@ func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningU // some things for orchestration result.fedStateIdsRequestReceivedWaiter = NewWaiter() result.fedStateIdsSendResponseWaiter = NewWaiter() + result.fedStateIdsAllowedEvents = make(map[string]bool) // create the room on the complement server, with charlie and derek as members roomVer := joiningUser.GetDefaultRoomVersion(t) @@ -642,10 +645,17 @@ func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningU // register a handler for /state_ids requests, which finishes fedStateIdsRequestReceivedWaiter, then // waits for fedStateIdsSendResponseWaiter and sends a reply - handleStateIdsRequests(t, result.Server, result.ServerRoom, result.fedStateIdsRequestReceivedWaiter, result.fedStateIdsSendResponseWaiter) + handleStateIdsRequests( + t, + result.Server, + result.ServerRoom, + result.fedStateIdsRequestReceivedWaiter, + result.fedStateIdsSendResponseWaiter, + result.fedStateIdsAllowedEvents, + ) // a handler for /state requests, which sends a sensible response - handleStateRequests(t, result.Server, result.ServerRoom, nil, nil) + handleStateRequests(t, result.Server, result.ServerRoom, nil, nil, nil) // have joiningUser join the room by room ID. joiningUser.JoinRoom(t, result.ServerRoom.RoomID, []string{result.Server.ServerName()}) @@ -693,6 +703,12 @@ func (psj *partialStateJoinResult) CreateMessageEvent(t *testing.T, senderLocalp return event } +// allow a /state_ids request for a given event to complete before FinishStateRequest has been called. +// only applies to new incoming requests, and not any currently blocked ones. +func (psj *partialStateJoinResult) AllowStateRequestForEvent(eventID string) { + psj.fedStateIdsAllowedEvents[eventID] = true +} + // wait for a /state_ids request for the test room to arrive func (psj *partialStateJoinResult) AwaitStateIdsRequest(t *testing.T) { psj.fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") @@ -709,7 +725,7 @@ func (psj *partialStateJoinResult) FinishStateRequest() { // if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response. func handleStateIdsRequests( t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, - requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, + requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, allowedEvents map[string]bool, ) { srv.Mux().Handle( fmt.Sprintf("/_matrix/federation/v1/state_ids/%s", serverRoom.RoomID), @@ -719,7 +735,8 @@ func handleStateIdsRequests( if requestReceivedWaiter != nil { requestReceivedWaiter.Finish() } - if sendResponseWaiter != nil { + if !allowedEvents[queryParams["event_id"][0]] && + sendResponseWaiter != nil { sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state_ids request") } t.Logf("Replying to /state_ids request") @@ -744,7 +761,7 @@ func handleStateIdsRequests( // if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response. func handleStateRequests( t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, - requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, + requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, allowedEvents map[string]bool, ) { srv.Mux().Handle( fmt.Sprintf("/_matrix/federation/v1/state/%s", serverRoom.RoomID), @@ -754,7 +771,8 @@ func handleStateRequests( if requestReceivedWaiter != nil { requestReceivedWaiter.Finish() } - if sendResponseWaiter != nil { + if !allowedEvents[queryParams["event_id"][0]] && + sendResponseWaiter != nil { sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state request") } res := gomatrixserverlib.RespState{ From 894d755a6cc5f251719f91fd353884230202510f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 21 Jul 2022 20:02:28 +0100 Subject: [PATCH 06/21] Add simple test for receiving an event with a missing prev event --- ...federation_room_join_partial_state_test.go | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 5b3ae3b6..c15730cd 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -151,6 +151,40 @@ func TestPartialStateJoin(t *testing.T) { testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event) }) + // we should be able to receive events with missing prevs over federation during the resync + t.Run("CanReceiveEventsWithMissingPrevDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + // we construct the following event graph: + // ... <-- M <-- A <-- B + // + // M is @alice:hs1's join event. + // A and B are regular m.room.messsage events sent by @derek from Complement. + // + // initially, hs1 only knows about event M. + // we send only event B to hs1. + eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID) + eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) + eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID()}) + t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) + t.Logf("Derek sent event A with ID %s", eventA.EventID()) + t.Logf("Derek sent event B with ID %s", eventB.EventID()) + + // the HS will make an /event_auth request for event A + federation.HandleEventAuthRequests()(psjResult.Server) + + // the HS will make a /get_missing_events request for the missing prev events of event B + handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, []*gomatrixserverlib.Event{eventA}) + + // send event B to hs1 + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) + }) + // a request to (client-side) /members?at= should block until the (federation) /state request completes // TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path t.Run("MembersRequestBlocksDuringPartialStateJoin", func(t *testing.T) { From 06439794e269cd40c7cbd9057d0120dde905e887 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 21 Jul 2022 20:02:58 +0100 Subject: [PATCH 07/21] Add a test for receiving an event with one missing and one known prev event --- ...federation_room_join_partial_state_test.go | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index c15730cd..b522248f 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -185,6 +185,42 @@ func TestPartialStateJoin(t *testing.T) { testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) }) + // we should be able to receive events with partially missing prevs over federation during the resync + t.Run("CanReceiveEventsWithHalfMissingPrevsDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + // we construct the following event graph: + // +---------+ + // v \ + // ... <-- M <-- A <-- B + // + // M is @alice:hs1's join event. + // A and B are regular m.room.messsage events sent by @derek from Complement. + // + // initially, hs1 only knows about event M. + // we send only event B to hs1. + eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID) + eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) + eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()}) + t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) + t.Logf("Derek sent event A with ID %s", eventA.EventID()) + t.Logf("Derek sent event B with ID %s", eventB.EventID()) + + // the HS will make an /event_auth request for event A + federation.HandleEventAuthRequests()(psjResult.Server) + + // the HS will make a /get_missing_events request for the missing prev event of event B + handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, []*gomatrixserverlib.Event{eventA}) + + // send event B to hs1 + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) + }) + // a request to (client-side) /members?at= should block until the (federation) /state request completes // TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path t.Run("MembersRequestBlocksDuringPartialStateJoin", func(t *testing.T) { From 5e385285a4e2a8f650863288f56179b62bfa2baf Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 21 Jul 2022 20:03:22 +0100 Subject: [PATCH 08/21] Add a test for receiving an event with a missing prev event, with one missing and one known prev event --- ...federation_room_join_partial_state_test.go | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index b522248f..9f1ea3cd 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -221,6 +221,43 @@ func TestPartialStateJoin(t *testing.T) { testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) }) + // we should be able to receive events with missing prevs over federation during the resync + t.Run("CanReceiveEventsWithMissingPrevWithHalfMissingPrevsDuringPartialStateJoin", func(t *testing.T) { + deployment := Deploy(t, b.BlueprintAlice) + defer deployment.Destroy(t) + alice := deployment.Client(t, "hs1", "@alice:hs1") + + psjResult := beginPartialStateJoin(t, deployment, alice) + defer psjResult.Destroy() + + // we construct the following event graph: + // +---------+ + // v \ + // ... <-- M <-- A <-- B <-- C + // + // M is @alice:hs1's join event. + // A, B and C are regular m.room.messsage events sent by @derek from Complement. + // + // initially, hs1 only knows about event M. + // we send only event C to hs1. + eventM := psjResult.ServerRoom.CurrentState("m.room.member", alice.UserID) + eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) + eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()}) + eventC := psjResult.CreateMessageEvent(t, "derek", []string{eventB.EventID()}) + t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) + t.Logf("Derek sent event A with ID %s", eventA.EventID()) + t.Logf("Derek sent event B with ID %s", eventB.EventID()) + t.Logf("Derek sent event C with ID %s", eventC.EventID()) + psjResult.AllowStateRequestForEvent(eventA.EventID()) + + // the HS will make a /get_missing_events request for the missing prev event of event C, + // to which we respond with event B only. + handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, []*gomatrixserverlib.Event{eventB}) + + // send event C to hs1 + testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC) + }) + // a request to (client-side) /members?at= should block until the (federation) /state request completes // TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path t.Run("MembersRequestBlocksDuringPartialStateJoin", func(t *testing.T) { From 2979d2485fdb62eeca463cdcd46c5598520dda55 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 22 Jul 2022 13:32:22 +0100 Subject: [PATCH 09/21] fixup: s/sent/created/ --- ...federation_room_join_partial_state_test.go | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 9f1ea3cd..c1868558 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -145,9 +145,10 @@ func TestPartialStateJoin(t *testing.T) { // the HS will make an /event_auth request for the event federation.HandleEventAuthRequests()(psjResult.Server) - // derek sends an event in the room event := psjResult.CreateMessageEvent(t, "derek", nil) - t.Logf("Derek sent event event ID %s", event.EventID()) + t.Logf("Derek created event with ID %s", event.EventID()) + + // derek sends an event in the room testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event) }) @@ -164,7 +165,7 @@ func TestPartialStateJoin(t *testing.T) { // ... <-- M <-- A <-- B // // M is @alice:hs1's join event. - // A and B are regular m.room.messsage events sent by @derek from Complement. + // A and B are regular m.room.messsage events created by @derek on the Complement homeserver. // // initially, hs1 only knows about event M. // we send only event B to hs1. @@ -172,8 +173,8 @@ func TestPartialStateJoin(t *testing.T) { eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID()}) t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) - t.Logf("Derek sent event A with ID %s", eventA.EventID()) - t.Logf("Derek sent event B with ID %s", eventB.EventID()) + t.Logf("Derek created event A with ID %s", eventA.EventID()) + t.Logf("Derek created event B with ID %s", eventB.EventID()) // the HS will make an /event_auth request for event A federation.HandleEventAuthRequests()(psjResult.Server) @@ -200,7 +201,7 @@ func TestPartialStateJoin(t *testing.T) { // ... <-- M <-- A <-- B // // M is @alice:hs1's join event. - // A and B are regular m.room.messsage events sent by @derek from Complement. + // A and B are regular m.room.messsage events created by @derek on the Complement homeserver. // // initially, hs1 only knows about event M. // we send only event B to hs1. @@ -208,8 +209,8 @@ func TestPartialStateJoin(t *testing.T) { eventA := psjResult.CreateMessageEvent(t, "derek", []string{eventM.EventID()}) eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()}) t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) - t.Logf("Derek sent event A with ID %s", eventA.EventID()) - t.Logf("Derek sent event B with ID %s", eventB.EventID()) + t.Logf("Derek created event A with ID %s", eventA.EventID()) + t.Logf("Derek created event B with ID %s", eventB.EventID()) // the HS will make an /event_auth request for event A federation.HandleEventAuthRequests()(psjResult.Server) @@ -236,7 +237,7 @@ func TestPartialStateJoin(t *testing.T) { // ... <-- M <-- A <-- B <-- C // // M is @alice:hs1's join event. - // A, B and C are regular m.room.messsage events sent by @derek from Complement. + // A, B and C are regular m.room.messsage events created by @derek on the Complement homeserver. // // initially, hs1 only knows about event M. // we send only event C to hs1. @@ -245,9 +246,9 @@ func TestPartialStateJoin(t *testing.T) { eventB := psjResult.CreateMessageEvent(t, "derek", []string{eventA.EventID(), eventM.EventID()}) eventC := psjResult.CreateMessageEvent(t, "derek", []string{eventB.EventID()}) t.Logf("%s's m.room.member event is %s", *eventM.StateKey(), eventM.EventID()) - t.Logf("Derek sent event A with ID %s", eventA.EventID()) - t.Logf("Derek sent event B with ID %s", eventB.EventID()) - t.Logf("Derek sent event C with ID %s", eventC.EventID()) + t.Logf("Derek created event A with ID %s", eventA.EventID()) + t.Logf("Derek created event B with ID %s", eventB.EventID()) + t.Logf("Derek created event C with ID %s", eventC.EventID()) psjResult.AllowStateRequestForEvent(eventA.EventID()) // the HS will make a /get_missing_events request for the missing prev event of event C, From a5b3c71af8d96e64366271bfcd5249e2e3c08855 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 22 Jul 2022 13:42:57 +0100 Subject: [PATCH 10/21] fixup: define StateIDsResult struct and send /state_id failures down the channel too --- ...federation_room_join_partial_state_test.go | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index c1868558..be00931d 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -603,8 +603,13 @@ func testReceiveEventDuringPartialStateJoin( // * block because the homeserver does not have full state at the last event // * or 403 because the homeserver does not have full state yet and does not consider the // Complement homeserver to be in the room - stateIdsResponseChan := make(chan *gomatrixserverlib.RespStateIDs) - defer close(stateIdsResponseChan) + + type StateIDsResult struct { + RespStateIDs gomatrixserverlib.RespStateIDs + Error error + } + stateIdsResultChan := make(chan StateIDsResult) + defer close(stateIdsResultChan) go func() { stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", @@ -614,26 +619,26 @@ func testReceiveEventDuringPartialStateJoin( ) var respStateIDs gomatrixserverlib.RespStateIDs if err := psjResult.Server.SendFederationRequest(deployment, stateReq, &respStateIDs); err != nil { - httpErr, ok := err.(gomatrix.HTTPError) - t.Logf("%v", httpErr) - if ok && httpErr.Code == 403 { - stateIdsResponseChan <- nil - return - } - t.Errorf("/state_ids request returned non-200: %s", err) - return + stateIdsResultChan <- StateIDsResult{Error: err} + } else { + stateIdsResultChan <- StateIDsResult{RespStateIDs: respStateIDs} } - stateIdsResponseChan <- &respStateIDs }() select { case <-time.After(1 * time.Second): t.Logf("/state_ids request for event %s blocked as expected", event.EventID()) - defer func() { <-stateIdsResponseChan }() + defer func() { <-stateIdsResultChan }() break - case respStateIDs := <-stateIdsResponseChan: - if respStateIDs == nil { - t.Logf("/state_ids request for event %s returned 403 as expected", event.EventID()) + case stateIDsResult := <-stateIdsResultChan: + if stateIDsResult.Error != nil { + httpErr, ok := stateIDsResult.Error.(gomatrix.HTTPError) + t.Logf("%v", httpErr) + if ok && httpErr.Code == 403 { + t.Logf("/state_ids request for event %s returned 403 as expected", event.EventID()) + } else { + t.Errorf("/state_ids request returned non-200: %s", stateIDsResult.Error) + } } else { // since we have not yet given the homeserver the full state at the join event and allowed // the partial join to complete, it can't possibly know the full state at the last event. From 8f5b9300d8a6c12adc33fee321b6db3c95a3ddaa Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 22 Jul 2022 13:45:46 +0100 Subject: [PATCH 11/21] fixup: rename new tests to use parents/grandparents terminology --- tests/federation_room_join_partial_state_test.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index be00931d..2652207f 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -152,8 +152,8 @@ func TestPartialStateJoin(t *testing.T) { testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, event) }) - // we should be able to receive events with missing prevs over federation during the resync - t.Run("CanReceiveEventsWithMissingPrevDuringPartialStateJoin", func(t *testing.T) { + // we should be able to receive events with a missing prev event over federation during the resync + t.Run("CanReceiveEventsWithMissingParentsDuringPartialStateJoin", func(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") @@ -186,8 +186,8 @@ func TestPartialStateJoin(t *testing.T) { testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) }) - // we should be able to receive events with partially missing prevs over federation during the resync - t.Run("CanReceiveEventsWithHalfMissingPrevsDuringPartialStateJoin", func(t *testing.T) { + // we should be able to receive events with partially missing prev events over federation during the resync + t.Run("CanReceiveEventsWithHalfMissingParentsDuringPartialStateJoin", func(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") @@ -222,8 +222,9 @@ func TestPartialStateJoin(t *testing.T) { testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventB) }) - // we should be able to receive events with missing prevs over federation during the resync - t.Run("CanReceiveEventsWithMissingPrevWithHalfMissingPrevsDuringPartialStateJoin", func(t *testing.T) { + // we should be able to receive events with a missing prev event, with half missing prev events, + // over federation during the resync + t.Run("CanReceiveEventsWithHalfMissingGrandparentsDuringPartialStateJoin", func(t *testing.T) { deployment := Deploy(t, b.BlueprintAlice) defer deployment.Destroy(t) alice := deployment.Client(t, "hs1", "@alice:hs1") From 49e35b44e6357005cc400aab5417c3982b018d8d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 22 Jul 2022 13:51:22 +0100 Subject: [PATCH 12/21] Revert "Allow explicitly specified `/state` and `/state_ids` requests to complete" This reverts commit 2cbeaae959adea9bc532bfeafcb75736fa9499fe. --- ...federation_room_join_partial_state_test.go | 30 ++++--------------- 1 file changed, 6 insertions(+), 24 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 2652207f..4072ac19 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -706,8 +706,6 @@ type partialStateJoinResult struct { ServerRoom *federation.ServerRoom fedStateIdsRequestReceivedWaiter *Waiter fedStateIdsSendResponseWaiter *Waiter - // the set of events for which we will not block `/state` or `/state_ids` requests. - fedStateIdsAllowedEvents map[string]bool } // beginPartialStateJoin spins up a room on a complement server, @@ -743,7 +741,6 @@ func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningU // some things for orchestration result.fedStateIdsRequestReceivedWaiter = NewWaiter() result.fedStateIdsSendResponseWaiter = NewWaiter() - result.fedStateIdsAllowedEvents = make(map[string]bool) // create the room on the complement server, with charlie and derek as members roomVer := joiningUser.GetDefaultRoomVersion(t) @@ -759,17 +756,10 @@ func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningU // register a handler for /state_ids requests, which finishes fedStateIdsRequestReceivedWaiter, then // waits for fedStateIdsSendResponseWaiter and sends a reply - handleStateIdsRequests( - t, - result.Server, - result.ServerRoom, - result.fedStateIdsRequestReceivedWaiter, - result.fedStateIdsSendResponseWaiter, - result.fedStateIdsAllowedEvents, - ) + handleStateIdsRequests(t, result.Server, result.ServerRoom, result.fedStateIdsRequestReceivedWaiter, result.fedStateIdsSendResponseWaiter) // a handler for /state requests, which sends a sensible response - handleStateRequests(t, result.Server, result.ServerRoom, nil, nil, nil) + handleStateRequests(t, result.Server, result.ServerRoom, nil, nil) // have joiningUser join the room by room ID. joiningUser.JoinRoom(t, result.ServerRoom.RoomID, []string{result.Server.ServerName()}) @@ -817,12 +807,6 @@ func (psj *partialStateJoinResult) CreateMessageEvent(t *testing.T, senderLocalp return event } -// allow a /state_ids request for a given event to complete before FinishStateRequest has been called. -// only applies to new incoming requests, and not any currently blocked ones. -func (psj *partialStateJoinResult) AllowStateRequestForEvent(eventID string) { - psj.fedStateIdsAllowedEvents[eventID] = true -} - // wait for a /state_ids request for the test room to arrive func (psj *partialStateJoinResult) AwaitStateIdsRequest(t *testing.T) { psj.fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request") @@ -839,7 +823,7 @@ func (psj *partialStateJoinResult) FinishStateRequest() { // if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response. func handleStateIdsRequests( t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, - requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, allowedEvents map[string]bool, + requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, ) { srv.Mux().Handle( fmt.Sprintf("/_matrix/federation/v1/state_ids/%s", serverRoom.RoomID), @@ -849,8 +833,7 @@ func handleStateIdsRequests( if requestReceivedWaiter != nil { requestReceivedWaiter.Finish() } - if !allowedEvents[queryParams["event_id"][0]] && - sendResponseWaiter != nil { + if sendResponseWaiter != nil { sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state_ids request") } t.Logf("Replying to /state_ids request") @@ -875,7 +858,7 @@ func handleStateIdsRequests( // if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response. func handleStateRequests( t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, - requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, allowedEvents map[string]bool, + requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, ) { srv.Mux().Handle( fmt.Sprintf("/_matrix/federation/v1/state/%s", serverRoom.RoomID), @@ -885,8 +868,7 @@ func handleStateRequests( if requestReceivedWaiter != nil { requestReceivedWaiter.Finish() } - if !allowedEvents[queryParams["event_id"][0]] && - sendResponseWaiter != nil { + if sendResponseWaiter != nil { sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state request") } res := gomatrixserverlib.RespState{ From 1adf5aae17dcfb1a0e4221278c33fdee90e71e36 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 21 Jul 2022 18:48:21 +0100 Subject: [PATCH 13/21] Faster joins tests: make request handlers more specific I'm going to add a test which will involve *multiple* /state and /state_ids requests, so we need to make the registered handlers more selective. --- ...federation_room_join_partial_state_test.go | 46 +++++++++++++------ 1 file changed, 33 insertions(+), 13 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 4072ac19..c3f48297 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -754,12 +754,23 @@ func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningU }, })) - // register a handler for /state_ids requests, which finishes fedStateIdsRequestReceivedWaiter, then + // register a handler for /state_ids requests for the most recent event, + // which finishes fedStateIdsRequestReceivedWaiter, then // waits for fedStateIdsSendResponseWaiter and sends a reply - handleStateIdsRequests(t, result.Server, result.ServerRoom, result.fedStateIdsRequestReceivedWaiter, result.fedStateIdsSendResponseWaiter) + lastEvent := result.ServerRoom.Timeline[len(result.ServerRoom.Timeline)-1] + currentState := result.ServerRoom.AllCurrentState() + handleStateIdsRequests( + t, result.Server, result.ServerRoom, + lastEvent.EventID(), currentState, + result.fedStateIdsRequestReceivedWaiter, result.fedStateIdsSendResponseWaiter, + ) // a handler for /state requests, which sends a sensible response - handleStateRequests(t, result.Server, result.ServerRoom, nil, nil) + handleStateRequests( + t, result.Server, result.ServerRoom, + lastEvent.EventID(), currentState, + nil, nil, + ) // have joiningUser join the room by room ID. joiningUser.JoinRoom(t, result.ServerRoom.RoomID, []string{result.Server.ServerName()}) @@ -817,16 +828,20 @@ func (psj *partialStateJoinResult) FinishStateRequest() { psj.fedStateIdsSendResponseWaiter.Finish() } -// handleStateIdsRequests registers a handler for /state_ids requests for serverRoom. +// handleStateIdsRequests registers a handler for /state_ids requests for 'eventID' +// +// the returned state is as passed in 'roomState' // // if requestReceivedWaiter is not nil, it will be Finish()ed when the request arrives. // if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response. func handleStateIdsRequests( t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, + eventID string, roomState []*gomatrixserverlib.Event, requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, ) { - srv.Mux().Handle( + srv.Mux().NewRoute().Methods("GET").Path( fmt.Sprintf("/_matrix/federation/v1/state_ids/%s", serverRoom.RoomID), + ).Queries("event_id", eventID).Handler( http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { queryParams := req.URL.Query() t.Logf("Incoming state_ids request for event %s in room %s", queryParams["event_id"], serverRoom.RoomID) @@ -839,8 +854,8 @@ func handleStateIdsRequests( t.Logf("Replying to /state_ids request") res := gomatrixserverlib.RespStateIDs{ - AuthEventIDs: eventIDsFromEvents(serverRoom.AuthChain()), - StateEventIDs: eventIDsFromEvents(serverRoom.AllCurrentState()), + AuthEventIDs: eventIDsFromEvents(serverRoom.AuthChainForEvents(roomState)), + StateEventIDs: eventIDsFromEvents(roomState), } w.WriteHeader(200) jsonb, _ := json.Marshal(res) @@ -849,19 +864,24 @@ func handleStateIdsRequests( t.Errorf("Error writing to request: %v", err) } }), - ).Methods("GET") + ) + t.Logf("Registered state_ids handler for event %s", eventID) } -// makeStateHandler returns a handler for /state requests for serverRoom. +// makeStateHandler returns a handler for /state requests for 'eventID' +// +// the returned state is as passed in 'roomState' // // if requestReceivedWaiter is not nil, it will be Finish()ed when the request arrives. // if sendResponseWaiter is not nil, we will Wait() for it to finish before sending the response. func handleStateRequests( t *testing.T, srv *federation.Server, serverRoom *federation.ServerRoom, + eventID string, roomState []*gomatrixserverlib.Event, requestReceivedWaiter *Waiter, sendResponseWaiter *Waiter, ) { - srv.Mux().Handle( + srv.Mux().NewRoute().Methods("GET").Path( fmt.Sprintf("/_matrix/federation/v1/state/%s", serverRoom.RoomID), + ).Queries("event_id", eventID).Handler( http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { queryParams := req.URL.Query() t.Logf("Incoming state request for event %s in room %s", queryParams["event_id"], serverRoom.RoomID) @@ -872,8 +892,8 @@ func handleStateRequests( sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state request") } res := gomatrixserverlib.RespState{ - AuthEvents: gomatrixserverlib.NewEventJSONsFromEvents(serverRoom.AuthChain()), - StateEvents: gomatrixserverlib.NewEventJSONsFromEvents(serverRoom.AllCurrentState()), + AuthEvents: gomatrixserverlib.NewEventJSONsFromEvents(serverRoom.AuthChainForEvents(roomState)), + StateEvents: gomatrixserverlib.NewEventJSONsFromEvents(roomState), } w.WriteHeader(200) jsonb, _ := json.Marshal(res) @@ -882,7 +902,7 @@ func handleStateRequests( t.Errorf("Error writing to request: %v", err) } }), - ).Methods("GET") + ) } // register a handler for `/get_missing_events` requests From 74dc0571464ccae1da1d091dddd44b0b0be85fa6 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 22 Jul 2022 14:09:28 +0100 Subject: [PATCH 14/21] fixup: Add explicit /state_id and /state handlers instead --- tests/federation_room_join_partial_state_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index c3f48297..d78e8e2d 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -250,12 +250,15 @@ func TestPartialStateJoin(t *testing.T) { t.Logf("Derek created event A with ID %s", eventA.EventID()) t.Logf("Derek created event B with ID %s", eventB.EventID()) t.Logf("Derek created event C with ID %s", eventC.EventID()) - psjResult.AllowStateRequestForEvent(eventA.EventID()) // the HS will make a /get_missing_events request for the missing prev event of event C, // to which we respond with event B only. handleGetMissingEventsRequests(t, psjResult.Server, psjResult.ServerRoom, []*gomatrixserverlib.Event{eventB}) + // dedicated state_ids and state handlers for event A + handleStateIdsRequests(t, psjResult.Server, psjResult.ServerRoom, eventA.EventID(), psjResult.ServerRoom.AllCurrentState(), nil, nil) + handleStateRequests(t, psjResult.Server, psjResult.ServerRoom, eventA.EventID(), psjResult.ServerRoom.AllCurrentState(), nil, nil) + // send event C to hs1 testReceiveEventDuringPartialStateJoin(t, deployment, alice, psjResult, eventC) }) From 5a9832ff257a6e3dea2ae1b7ddfcc80650c1d5fa Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 22 Jul 2022 14:10:07 +0100 Subject: [PATCH 15/21] Log which /state_ids request is being replied to --- tests/federation_room_join_partial_state_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index d78e8e2d..78cc8317 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -854,7 +854,7 @@ func handleStateIdsRequests( if sendResponseWaiter != nil { sendResponseWaiter.Waitf(t, 60*time.Second, "Waiting for /state_ids request") } - t.Logf("Replying to /state_ids request") + t.Logf("Replying to /state_ids request for event %s", queryParams["event_id"]) res := gomatrixserverlib.RespStateIDs{ AuthEventIDs: eventIDsFromEvents(serverRoom.AuthChainForEvents(roomState)), From d437bfb860c61d056bfe1dfed751471d019c269d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 22 Jul 2022 14:59:30 +0100 Subject: [PATCH 16/21] fixup: explain purpose of deferred channel read --- tests/federation_room_join_partial_state_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 78cc8317..657ad5ee 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -632,6 +632,8 @@ func testReceiveEventDuringPartialStateJoin( select { case <-time.After(1 * time.Second): t.Logf("/state_ids request for event %s blocked as expected", event.EventID()) + // read from the channel and discard the result, so that the goroutine above doesn't block + // indefinitely on the send defer func() { <-stateIdsResultChan }() break case stateIDsResult := <-stateIdsResultChan: From 8abc0e8cbd9bb8293a49e1e88e9f48b06ada4b80 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 22 Jul 2022 16:51:20 +0100 Subject: [PATCH 17/21] fixup: link to synapse#13288 --- tests/federation_room_join_partial_state_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 657ad5ee..3429d982 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -607,6 +607,8 @@ func testReceiveEventDuringPartialStateJoin( // * block because the homeserver does not have full state at the last event // * or 403 because the homeserver does not have full state yet and does not consider the // Complement homeserver to be in the room + // Synapse's behaviour will likely change once https://github.com/matrix-org/synapse/issues/13288 + // is resolved. type StateIDsResult struct { RespStateIDs gomatrixserverlib.RespStateIDs From 98907c0ff3b154a275913a869d506b9387182e3c Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 25 Jul 2022 20:17:46 +0100 Subject: [PATCH 18/21] fixup: Use WithRetryUntil to retry /event --- ...federation_room_join_partial_state_test.go | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 3429d982..d4099a41 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -583,24 +583,20 @@ func testReceiveEventDuringPartialStateJoin( */ // still, Alice should be able to see the event with an /event request. We might have to try it a few times. - start := time.Now() - for { - if time.Since(start) > time.Second { - t.Fatalf("timeout waiting for received event to be visible") - } - res := alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "event", event.EventID()}) - eventResBody := client.ParseJSON(t, res) - if res.StatusCode == 200 { - t.Logf("Successfully fetched received event %s", event.EventID()) - break - } - if res.StatusCode == 404 && gjson.GetBytes(eventResBody, "errcode").String() == "M_NOT_FOUND" { - t.Logf("Fetching received event failed with M_NOT_FOUND; will retry") - time.Sleep(100 * time.Millisecond) - continue - } - t.Fatalf("GET /event failed with %d: %s", res.StatusCode, string(eventResBody)) - } + alice.DoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "event", event.EventID()}, + client.WithRetryUntil(time.Second, func(res *http.Response) bool { + if res.StatusCode == 200 { + return true + } + eventResBody := client.ParseJSON(t, res) + if res.StatusCode == 404 && gjson.GetBytes(eventResBody, "errcode").String() == "M_NOT_FOUND" { + return false + } + t.Fatalf("GET /event failed with %d: %s", res.StatusCode, string(eventResBody)) + return false + }), + ) + t.Logf("Successfully fetched received event %s", event.EventID()) // fire off a /state_ids request for the last event. // it must either: From 47ce7bae002f87aee5a4aeee12a2391d21caa297 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 25 Jul 2022 22:43:25 +0100 Subject: [PATCH 19/21] Use Context.WithTimeout instead of a SendFederationRequest goroutine --- internal/federation/server.go | 4 +- ...federation_room_join_partial_state_test.go | 71 +++++++------------ tests/federation_room_join_test.go | 2 +- 3 files changed, 29 insertions(+), 48 deletions(-) diff --git a/internal/federation/server.go b/internal/federation/server.go index 02fb7de1..f20629e3 100644 --- a/internal/federation/server.go +++ b/internal/federation/server.go @@ -213,7 +213,7 @@ func (s *Server) MustSendTransaction(t *testing.T, deployment *docker.Deployment // SendFederationRequest signs and sends an arbitrary federation request from this server. // // The requests will be routed according to the deployment map in `deployment`. -func (s *Server) SendFederationRequest(deployment *docker.Deployment, req gomatrixserverlib.FederationRequest, resBody interface{}) error { +func (s *Server) SendFederationRequest(ctx context.Context, deployment *docker.Deployment, req gomatrixserverlib.FederationRequest, resBody interface{}) error { if err := req.Sign(gomatrixserverlib.ServerName(s.serverName), s.KeyID, s.Priv); err != nil { return err } @@ -224,7 +224,7 @@ func (s *Server) SendFederationRequest(deployment *docker.Deployment, req gomatr } httpClient := gomatrixserverlib.NewClient(gomatrixserverlib.WithTransport(&docker.RoundTripper{Deployment: deployment})) - return httpClient.DoRequestAndParseResponse(context.Background(), httpReq, resBody) + return httpClient.DoRequestAndParseResponse(ctx, httpReq, resBody) } // MustCreateEvent will create and sign a new latest event for the given room. diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index d4099a41..1bc4e649 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -7,9 +7,11 @@ package tests import ( + "context" "encoding/json" "fmt" "io/ioutil" + "net" "net/http" "net/url" "strconv" @@ -606,50 +608,30 @@ func testReceiveEventDuringPartialStateJoin( // Synapse's behaviour will likely change once https://github.com/matrix-org/synapse/issues/13288 // is resolved. - type StateIDsResult struct { - RespStateIDs gomatrixserverlib.RespStateIDs - Error error - } - stateIdsResultChan := make(chan StateIDsResult) - defer close(stateIdsResultChan) - go func() { - stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", - fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", - url.PathEscape(psjResult.ServerRoom.RoomID), - url.QueryEscape(event.EventID()), - ), - ) - var respStateIDs gomatrixserverlib.RespStateIDs - if err := psjResult.Server.SendFederationRequest(deployment, stateReq, &respStateIDs); err != nil { - stateIdsResultChan <- StateIDsResult{Error: err} - } else { - stateIdsResultChan <- StateIDsResult{RespStateIDs: respStateIDs} - } - }() - - select { - case <-time.After(1 * time.Second): - t.Logf("/state_ids request for event %s blocked as expected", event.EventID()) - // read from the channel and discard the result, so that the goroutine above doesn't block - // indefinitely on the send - defer func() { <-stateIdsResultChan }() - break - case stateIDsResult := <-stateIdsResultChan: - if stateIDsResult.Error != nil { - httpErr, ok := stateIDsResult.Error.(gomatrix.HTTPError) - t.Logf("%v", httpErr) - if ok && httpErr.Code == 403 { - t.Logf("/state_ids request for event %s returned 403 as expected", event.EventID()) - } else { - t.Errorf("/state_ids request returned non-200: %s", stateIDsResult.Error) - } + stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", + fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", + url.PathEscape(psjResult.ServerRoom.RoomID), + url.QueryEscape(event.EventID()), + ), + ) + var respStateIDs gomatrixserverlib.RespStateIDs + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + err := psjResult.Server.SendFederationRequest(ctx, deployment, stateReq, &respStateIDs) + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + t.Logf("/state_ids request for event %s blocked as expected", event.EventID()) + } else if httpErr, ok := err.(gomatrix.HTTPError); ok && httpErr.Code == 403 { + t.Logf("/state_ids request for event %s returned 403 as expected", event.EventID()) } else { - // since we have not yet given the homeserver the full state at the join event and allowed - // the partial join to complete, it can't possibly know the full state at the last event. - // While it may be possible for the response to be correct by some accident of state res, - // the homeserver is still wrong in spirit. - t.Fatalf("/state_ids request for event %s did not block when it should have", event.EventID()) + t.Errorf("/state_ids request returned non-200: %s", err) } + } else { + // since we have not yet given the homeserver the full state at the join event and allowed + // the partial join to complete, it can't possibly know the full state at the last event. + // While it may be possible for the response to be correct by some accident of state res, + // the homeserver is still wrong in spirit. + t.Fatalf("/state_ids request for event %s did not block when it should have", event.EventID()) } // allow the partial join to complete @@ -660,14 +642,13 @@ func testReceiveEventDuringPartialStateJoin( ) // check the server's idea of the state at the event. We do this by making a `state_ids` request over federation - stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", + stateReq = gomatrixserverlib.NewFederationRequest("GET", "hs1", fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", url.PathEscape(psjResult.ServerRoom.RoomID), url.QueryEscape(event.EventID()), ), ) - var respStateIDs gomatrixserverlib.RespStateIDs - if err := psjResult.Server.SendFederationRequest(deployment, stateReq, &respStateIDs); err != nil { + if err := psjResult.Server.SendFederationRequest(context.Background(), deployment, stateReq, &respStateIDs); err != nil { t.Errorf("/state_ids request returned non-200: %s", err) return } diff --git a/tests/federation_room_join_test.go b/tests/federation_room_join_test.go index ce6af1dc..50849f2c 100644 --- a/tests/federation_room_join_test.go +++ b/tests/federation_room_join_test.go @@ -385,7 +385,7 @@ func testValidationForSendMembershipEndpoint(t *testing.T, baseApiPath, expected } var res interface{} - err := srv.SendFederationRequest(deployment, req, &res) + err := srv.SendFederationRequest(context.Background(), deployment, req, &res) if err == nil { t.Errorf("send request returned 200") return From 1a0fa9f4952b9278400560c1b4d1468939980536 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Mon, 25 Jul 2022 22:45:00 +0100 Subject: [PATCH 20/21] Add comment explaining purpose of early /state_ids request --- tests/federation_room_join_partial_state_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/federation_room_join_partial_state_test.go b/tests/federation_room_join_partial_state_test.go index 1bc4e649..5fa56c48 100644 --- a/tests/federation_room_join_partial_state_test.go +++ b/tests/federation_room_join_partial_state_test.go @@ -606,7 +606,8 @@ func testReceiveEventDuringPartialStateJoin( // * or 403 because the homeserver does not have full state yet and does not consider the // Complement homeserver to be in the room // Synapse's behaviour will likely change once https://github.com/matrix-org/synapse/issues/13288 - // is resolved. + // is resolved. For now, we use this to check whether Synapse has calculated the partial state + // flag for the last event correctly. stateReq := gomatrixserverlib.NewFederationRequest("GET", "hs1", fmt.Sprintf("/_matrix/federation/v1/state_ids/%s?event_id=%s", From e3d8197f22fb16f62566240f952920f055a9e2bb Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 26 Jul 2022 12:27:00 +0100 Subject: [PATCH 21/21] fixup: Remove .vscode/settings.json --- .vscode/settings.json | 5 ----- 1 file changed, 5 deletions(-) delete mode 100644 .vscode/settings.json diff --git a/.vscode/settings.json b/.vscode/settings.json deleted file mode 100644 index 55924f1e..00000000 --- a/.vscode/settings.json +++ /dev/null @@ -1,5 +0,0 @@ -{ - "go.testEnvVars": { - "COMPLEMENT_BASE_IMAGE": "complement-synapse" - } -}