Skip to content

Commit

Permalink
Marker events as state - MSC2716 (#371)
Browse files Browse the repository at this point in the history
Synapse changes: matrix-org/synapse#12718

Part of MSC2716: matrix-org/matrix-spec-proposals#2716

We're testing to make sure historical messages show up for a remote federated homeserver even when the homeserver is missing the part of the timeline where the marker events were sent and it paginates before they occurred to see if the history is available. Making sure the homeserver processes all of the markers from the current state instead of just when it sees them in the timeline.

Sending marker events as state now so they are always able to be seen by homeservers (not lost in some timeline gap).

Marker events should be sent with a unique `state_key` so that they can all resolve in the current state to easily be discovered.

 - If we re-use the same `state_key` (like `""`), then we would have to fetch previous snapshots of state up through time to find all of the marker events. This way we can avoid all of that.
 - Also avoids state resolution conflicts where only one of the marker events win

As a homeserver, when we see new marker state, we know there is new history imported somewhere back in time and should process it to fetch the insertion event where the historical messages are and set it as an insertion extremity. This way we know where to backfill more messages when someone asks for scrollback.

Also includes `paginateUntilMessageCheckOff` from
#214
  • Loading branch information
MadLittleMods authored May 24, 2022
1 parent 9013049 commit 933c9be
Showing 1 changed file with 209 additions and 3 deletions.
212 changes: 209 additions & 3 deletions tests/msc2716_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,114 @@ func TestImportHistoricalMessages(t *testing.T) {
},
})
})

// We're testing to make sure historical messages show up for a remote
// federated homeserver even when the homeserver is missing the part of
// the timeline where the marker events were sent and it paginates before
// they occured to see if the history is available. Making sure the
// homeserver processes all of the markers from the current state instead
// of just when it sees them in the timeline.
testHistoricalMessagesAppearForRemoteHomeserverWhenMissingPartOfTimelineWithMarker := func(t *testing.T, numBatches int) {
t.Helper()

roomID := as.CreateRoom(t, createPublicRoomOpts)
alice.JoinRoom(t, roomID, nil)

eventIDsBefore := createMessagesInRoom(t, alice, roomID, numBatches, "eventIDsBefore")
timeAfterEventBefore := time.Now()

eventIDsAfter := createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter")
eventIDAfter := eventIDsAfter[0]

// Join the room from a remote homeserver before the historical messages were sent
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})

// Make sure all of the events have been backfilled for the remote user
// before we leave the room
fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool {
if ev.Get("event_id").Str == eventIDsBefore[0] {
return true
}

return false
})

// Leave before the historical messages are imported
remoteCharlie.LeaveRoom(t, roomID)

var expectedEventIDs []string
for i := 0; i < numBatches; i++ {
// Create separate disconnected batches
batchSendRes := batchSendHistoricalMessages(
t,
as,
roomID,
eventIDsBefore[i],
"",
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore),
createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2),
// Status
200,
)
batchSendResBody := client.ParseJSON(t, batchSendRes)
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids")
baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id")

// Store the historical events we will expect to see later
expectedEventIDs = append(expectedEventIDs, historicalEventIDs...)

// Send the marker event which lets remote homeservers know there are
// some historical messages back at the given insertion event. We
// purposely use the local user Alice here as remoteCharlie isn't even
// in the room at this point in time and even if they were, the purpose
// of this test is to make sure the remote-join will pick up the state,
// not our backfill here.
sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID)
}

// Add some events after the marker so that remoteCharlie doesn't see the marker
createMessagesInRoom(t, alice, roomID, 3, "eventIDFiller")

// Join the room from a remote homeserver after the historical messages were sent
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"})

// From the remote user, make a /context request for eventIDAfter to get
// pagination token before the marker event
contextRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "context", eventIDAfter}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"limit": []string{"0"},
}))
contextResResBody := client.ParseJSON(t, contextRes)
paginationTokenBeforeMarker := client.GetJSONFieldStr(t, contextResResBody, "end")

// Start the /messages request from that pagination token which
// jumps/skips over the marker event in the timeline. This is the key
// part of the test. We want to make sure that new marker state can be
// injested and processed to reveal the imported history after a
// remote-join without paginating and backfilling over the spot in the
// timeline with the marker event.
//
// We don't want to use `validateBatchSendRes(t, remoteCharlie, roomID,
// batchSendRes, false)` here because it tests against the full message
// response and we need to skip past the marker in the timeline.
paginateUntilMessageCheckOff(t, remoteCharlie, roomID, paginationTokenBeforeMarker, expectedEventIDs, []string{})
}

t.Run("Historical messages show up for remote federated homeserver even when the homeserver is missing the part of the timeline where the marker was sent and it paginates before it occured", func(t *testing.T) {
t.Parallel()

testHistoricalMessagesAppearForRemoteHomeserverWhenMissingPartOfTimelineWithMarker(t, 1)
})

t.Run("Historical messages show up for remote federated homeserver even when the homeserver is missing the part of the timeline where multiple marker events were sent and it paginates before they occured", func(t *testing.T) {
t.Parallel()

testHistoricalMessagesAppearForRemoteHomeserverWhenMissingPartOfTimelineWithMarker(
t,
// Anything above 1 here should be sufficient to test whether we can
// process all of the current state to injest all of the marker events
2,
)
})
})

t.Run("Existing room versions", func(t *testing.T) {
Expand Down Expand Up @@ -1092,6 +1200,103 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string,
}
}

// Paginate the /messages endpoint until we find all of the expectedEventIds
// (order does not matter). If any event in denyListEventIDs is found, an error
// will be thrown.
func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, fromPaginationToken string, expectedEventIDs []string, denyListEventIDs []string) {
t.Helper()
start := time.Now()

workingExpectedEventIDMap := make(map[string]string)
for _, expectedEventID := range expectedEventIDs {
workingExpectedEventIDMap[expectedEventID] = expectedEventID
}

denyEventIDMap := make(map[string]string)
for _, denyEventID := range denyListEventIDs {
denyEventIDMap[denyEventID] = denyEventID
}

var actualEventIDList []string
callCounter := 0
messageResEnd := fromPaginationToken
generateErrorMesssageInfo := func() string {
i := 0
leftoverEventIDs := make([]string, len(workingExpectedEventIDMap))
for eventID := range workingExpectedEventIDMap {
leftoverEventIDs[i] = eventID
i++
}

return fmt.Sprintf("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s",
callCounter,
len(expectedEventIDs)-len(leftoverEventIDs),
len(expectedEventIDs),
len(leftoverEventIDs),
leftoverEventIDs,
len(actualEventIDList),
actualEventIDList,
)
}

for {
if time.Since(start) > c.SyncUntilTimeout {
t.Fatalf(
"paginateUntilMessageCheckOff timed out. %s",
generateErrorMesssageInfo(),
)
}

messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"dir": []string{"b"},
"limit": []string{"100"},
"from": []string{messageResEnd},
}))
callCounter++
messsageResBody := client.ParseJSON(t, messagesRes)
messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end")
// Since the original body can only be read once, create a new one from the body bytes we just read
messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messsageResBody))

foundEventInMessageResponse := false
must.MatchResponse(t, messagesRes, match.HTTPResponse{
JSON: []match.JSON{
match.JSONArrayEach("chunk", func(ev gjson.Result) error {
foundEventInMessageResponse = true
eventID := ev.Get("event_id").Str
actualEventIDList = append(actualEventIDList, eventID)

if _, keyExists := denyEventIDMap[eventID]; keyExists {
return fmt.Errorf(
"paginateUntilMessageCheckOff found unexpected message=%s in deny list while paginating. %s",
eventID,
generateErrorMesssageInfo(),
)
}

if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists {
delete(workingExpectedEventIDMap, eventID)
}

return nil
}),
},
})

if !foundEventInMessageResponse {
t.Fatalf(
"paginateUntilMessageCheckOff reached the end of the messages without finding all expected events. %s",
generateErrorMesssageInfo(),
)
}

// We were able to find all of the expected events!
if len(workingExpectedEventIDMap) == 0 {
return
}
}
}

func historicalEventFilter(r gjson.Result) bool {
// This includes messages, insertion, batch, and marker events because we
// include the historical field on all of them.
Expand Down Expand Up @@ -1169,9 +1374,10 @@ func sendMarkerAndEnsureBackfilled(t *testing.T, as *client.CSAPI, c *client.CSA
markerInsertionContentField: insertionEventID,
},
}
// We can't use as.SendEventSynced(...) because application services can't use the /sync API
txnId := getTxnID("sendMarkerAndEnsureBackfilled-txn")
markerSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", markerEvent.Type, txnId}, client.WithJSONBody(t, markerEvent.Content))
// Marker events should have unique state_key so they all show up in the current state to process.
unique_state_key := getTxnID("marker_state_key")
// We can't use as.SendEventSynced(...) because application services can't use the /sync API.
markerSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "state", markerEvent.Type, unique_state_key}, client.WithJSONBody(t, markerEvent.Content))
markerSendBody := client.ParseJSON(t, markerSendRes)
markerEventID = client.GetJSONFieldStr(t, markerSendBody, "event_id")

Expand Down

0 comments on commit 933c9be

Please sign in to comment.