Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change tests to accomodate non blocking regular sync #584

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 104 additions & 74 deletions tests/federation_room_join_partial_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ func TestPartialStateJoin(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)

// test that a regular /sync request made during a partial-state /send_join
// request blocks until the state is correctly synced.
t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) {
// test that a non lazy /sync request made during a partial-state /send_join
// request does not return the room until the state is correctly synced.
t.Run("NonLazySyncDuringPartialStateJoin", func(t *testing.T) {
alice := deployment.RegisterUser(t, "hs1", "t1alice", "secret", false)

server := createTestServer(t, deployment)
Expand All @@ -132,38 +132,26 @@ func TestPartialStateJoin(t *testing.T) {

// Alice has now joined the room, and the server is syncing the state in the background.

// attempts to sync should now block. Fire off a goroutine to try it.
syncResponseChan := make(chan gjson.Result)
go func() {
response, _ := alice.MustSync(t, client.SyncReq{})
syncResponseChan <- response
close(syncResponseChan)
}()
// sync shouldn't include the room yet
response, nextBatch := alice.MustSync(t, client.SyncReq{})

syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID)
if response.Get(syncJoinedRoomPath).Exists() {
t.Fatal("Sync shouldn't include the joined room until resync is over")
}

// wait for the state_ids request to arrive
psjResult.AwaitStateIdsRequest(t)

// the client-side requests should still be waiting
select {
case <-syncResponseChan:
t.Fatalf("Sync completed before state resync complete")
default:
}

// release the federation /state response
psjResult.FinishStateRequest()

// the /sync request should now complete, with the new room
var syncRes gjson.Result
select {
case <-time.After(1 * time.Second):
t.Fatalf("/sync request request did not complete")
case syncRes = <-syncResponseChan:
}
response, _ = alice.MustSync(t, client.SyncReq{Since: nextBatch})

roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID))
roomRes := response.Get(syncJoinedRoomPath)
if !roomRes.Exists() {
t.Fatalf("/sync completed without join to new room\n")
t.Fatal("Sync should now include the joined room since resync is over")
}

// check that the state includes both charlie and derek.
Expand All @@ -181,6 +169,70 @@ func TestPartialStateJoin(t *testing.T) {
}
})

t.Run("NonLazyLongPollingSyncDuringPartialStateJoin", func(t *testing.T) {
alice := deployment.RegisterUser(t, "hs1", "t99alice", "secret", false)

server := createTestServer(t, deployment)
cancel := server.Listen()
defer cancel()
serverRoom := createTestRoom(t, server, alice.GetDefaultRoomVersion(t))
psjResult := beginPartialStateJoin(t, server, serverRoom, alice)
defer psjResult.Destroy(t)

// Alice has now joined the room, and the server is syncing the state in the background.

// initial sync shouldn't include the room yet, but still return immediatly
response, nextBatch := alice.MustSync(t, client.SyncReq{
TimeoutMillis: "10000",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this adds anything since MustSync uses a timeout of 1000 by default:

query := url.Values{
"timeout": []string{"1000"},
}

I also think I got myself confused when writing up the comment last night.

I've had a go at this in #588.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MV: this isn't addressing the comment of dmr above, this is about the notifier changes (see synapse PR)

})

syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID)
if response.Get(syncJoinedRoomPath).Exists() {
t.Fatal("Sync shouldn't include the joined room until resync is over")
}

// Begin a long polling sync that shouldn't return yet since no change happened
responseChan := make(chan gjson.Result, 1)
syncStarted := make(chan struct{})
go func() {
defer close(responseChan)
defer close(syncStarted)

syncStarted <- struct{}{}
response, _ := alice.MustSync(t, client.SyncReq{
TimeoutMillis: "10000",
Since: nextBatch,
})
responseChan <- response
}()

// Try to wait for the sync to actually start, then un-partial-state the room
select {
case <-syncStarted:
// wait for the state_ids request to arrive
psjResult.AwaitStateIdsRequest(t)
// release the federation /state response
psjResult.FinishStateRequest()
case <-time.After(time.Second * 5):
// even though this should mostly be impossible, make sure we have a timeout
t.Fatalf("goroutine didn't start")
}

// Try to wait for the sync to return or timeout after 15 seconds,
// as the above tests are using a timeout of 10 seconds
select {
case response = <-responseChan:
case <-time.After(time.Second * 5):
t.Errorf("sync should have returned before the timeout")
}

// the /sync request should now complete, with the new room
roomRes := response.Get(syncJoinedRoomPath)
if !roomRes.Exists() {
t.Fatal("Sync should now include the joined room since resync is over")
}
})

// when Alice does a lazy-loading sync, she should see the room immediately
t.Run("CanLazyLoadingSyncDuringPartialStateJoin", func(t *testing.T) {
alice := deployment.RegisterUser(t, "hs1", "t2alice", "secret", false)
Expand Down Expand Up @@ -917,7 +969,7 @@ func TestPartialStateJoin(t *testing.T) {
})

// test that a partial-state join continues syncing state after a restart
// the same as SyncBlocksDuringPartialStateJoin, with a restart in the middle
// the same as NonLazySyncDuringPartialStateJoin, with a restart in the middle
t.Run("PartialStateJoinContinuesAfterRestart", func(t *testing.T) {
alice := deployment.RegisterUser(t, "hs1", "t12alice", "secret", false)

Expand All @@ -933,44 +985,35 @@ func TestPartialStateJoin(t *testing.T) {
// wait for the state_ids request to arrive
psjResult.AwaitStateIdsRequest(t)

// Non lazy sync shouldn't include the room yet
response, nextBatch := alice.MustSync(t, client.SyncReq{})

syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(serverRoom.RoomID)
if response.Get(syncJoinedRoomPath).Exists() {
t.Fatal("Sync shouldn't include the joined room until resync is over")
}

// restart the homeserver
err := deployment.Restart(t)
if err != nil {
t.Errorf("Failed to restart homeserver: %s", err)
}

// attempts to sync should block. Fire off a goroutine to try it.
syncResponseChan := make(chan gjson.Result)
go func() {
response, _ := alice.MustSync(t, client.SyncReq{})
syncResponseChan <- response
close(syncResponseChan)
}()
// Sync still shouldn't include the room
response, nextBatch = alice.MustSync(t, client.SyncReq{Since: nextBatch})

// we expect another state_ids request to arrive.
// we'd do another AwaitStateIdsRequest, except it's single-use.

// the client-side requests should still be waiting
select {
case <-syncResponseChan:
t.Fatalf("Sync completed before state resync complete")
default:
if response.Get(syncJoinedRoomPath).Exists() {
t.Fatal("Sync shouldn't include the joined room until resync is over")
}

// release the federation /state response
psjResult.FinishStateRequest()

// the /sync request should now complete, with the new room
var syncRes gjson.Result
select {
case <-time.After(1 * time.Second):
t.Fatalf("/sync request request did not complete")
case syncRes = <-syncResponseChan:
}
response, _ = alice.MustSync(t, client.SyncReq{Since: nextBatch})

roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID))
if !roomRes.Exists() {
t.Fatalf("/sync completed without join to new room\n")
if !response.Get(syncJoinedRoomPath).Exists() {
t.Fatal("Sync should now include the joined room since resync is over")
}
})

Expand Down Expand Up @@ -1029,34 +1072,21 @@ func TestPartialStateJoin(t *testing.T) {
// wait until hs2 starts syncing state
fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request")

syncResponseChan := make(chan gjson.Result)
go func() {
response, _ := charlie.MustSync(t, client.SyncReq{})
syncResponseChan <- response
close(syncResponseChan)
}()
response, nextBatch := charlie.MustSync(t, client.SyncReq{})

// the client-side requests should still be waiting
select {
case <-syncResponseChan:
t.Fatalf("hs2 sync completed before state resync complete")
default:
// the client-side requests shouldn't report the join yet
syncJoinedRoomPath := "rooms.join." + client.GjsonEscape(roomID)
if response.Get(syncJoinedRoomPath).Exists() {
t.Fatal("Sync shouldn't include the joined room yet")
}

// reply to hs2 with a bogus /state_ids response
fedStateIdsSendResponseWaiter.Finish()

// charlie's /sync request should now complete, with the new room
var syncRes gjson.Result
select {
case <-time.After(1 * time.Second):
t.Fatalf("hs2 /sync request request did not complete")
case syncRes = <-syncResponseChan:
}
response, _ = charlie.MustSync(t, client.SyncReq{Since: nextBatch})

roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(roomID))
if !roomRes.Exists() {
t.Fatalf("hs2 /sync completed without join to new room\n")
if !response.Get(syncJoinedRoomPath).Exists() {
t.Fatal("hs2 /sync completed without join to new room")
}
})

Expand All @@ -1083,7 +1113,7 @@ func TestPartialStateJoin(t *testing.T) {
t.Logf("Alice successfully synced")

// wait for partial state to finish syncing,
// by waiting for the room to show up in a regular /sync.
// by waiting for the room to show up in /sync.
psjResult.AwaitStateIdsRequest(t)
psjResult.FinishStateRequest()
alice.MustSyncUntil(t,
Expand All @@ -1108,7 +1138,7 @@ func TestPartialStateJoin(t *testing.T) {
server.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{event.JSON()}, nil)
}

// wait for the events to come down a regular /sync.
// wait for the events to come down a /sync.
alice.MustSyncUntil(t,
client.SyncReq{},
client.SyncTimelineHasEventID(serverRoom.RoomID, lastEventID),
Expand Down Expand Up @@ -1658,7 +1688,7 @@ func TestPartialStateJoin(t *testing.T) {

// Alice has now joined the room, and the server is syncing the state in the background.

// attempts to sync should now block. Fire off a goroutine to try it.
// attempts to joined_members should now block. Fire off a goroutine to try it.
jmResponseChan := make(chan *http.Response)
go func() {
response := alice.MustDoFunc(t, "GET", []string{"_matrix", "client", "v3", "rooms", serverRoom.RoomID, "joined_members"})
Expand Down