-
Notifications
You must be signed in to change notification settings - Fork 61
Marker events as state - MSC2716 #371
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 6 commits
056ebc0
17e265a
1171ed9
b53ec21
3b3d383
77559bf
3ac5a05
1ff578b
73c78cf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -918,6 +918,175 @@ func TestImportHistoricalMessages(t *testing.T) { | |
}, | ||
}) | ||
}) | ||
|
||
t.Run("Historical messages show up for remote federated homeserver even when the homeserver is missing the part of the timeline where the marker was sent and it paginates before it occured", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
roomID := as.CreateRoom(t, createPublicRoomOpts) | ||
alice.JoinRoom(t, roomID, nil) | ||
|
||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, 1, "eventIDsBefore") | ||
eventIdBefore := eventIDsBefore[0] | ||
timeAfterEventBefore := time.Now() | ||
|
||
eventIDsAfter := createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter") | ||
eventIDAfter := eventIDsAfter[0] | ||
|
||
// Join the room from a remote homeserver before the historical messages were sent | ||
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) | ||
|
||
// Make sure all of the events have been backfilled for the remote user | ||
// before we leave the room | ||
fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { | ||
if ev.Get("event_id").Str == eventIdBefore { | ||
return true | ||
} | ||
|
||
return false | ||
}) | ||
|
||
// Leave before the historical messages are imported | ||
remoteCharlie.LeaveRoom(t, roomID) | ||
|
||
batchSendRes := batchSendHistoricalMessages( | ||
t, | ||
as, | ||
roomID, | ||
eventIdBefore, | ||
"", | ||
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), | ||
createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), | ||
// Status | ||
200, | ||
) | ||
batchSendResBody := client.ParseJSON(t, batchSendRes) | ||
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") | ||
baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") | ||
|
||
// Send the marker event which lets remote homeservers know there are | ||
// some historical messages back at the given insertion event. We | ||
// purposely use the local user Alice here as remoteCharlie isn't even | ||
// in the room at this point in time and even if they were, the purpose | ||
// of this test is to make sure the remote-join will pick up the state, | ||
// not our backfill here. | ||
sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID) | ||
|
||
// Add some events after the marker so that remoteCharlie doesn't see the marker | ||
createMessagesInRoom(t, alice, roomID, 3, "eventIDFiller") | ||
|
||
// Join the room from a remote homeserver after the historical messages were sent | ||
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) | ||
|
||
// From the remote user, make a /context request for eventIDAfter to get | ||
// pagination token before the marker event | ||
contextRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "context", eventIDAfter}, client.WithContentType("application/json"), client.WithQueries(url.Values{ | ||
"limit": []string{"0"}, | ||
})) | ||
contextResResBody := client.ParseJSON(t, contextRes) | ||
paginationTokenBeforeMarker := client.GetJSONFieldStr(t, contextResResBody, "end") | ||
|
||
// Start the /messages request from that pagination token which | ||
// jumps/skips over the marker event in the timeline. This is the key | ||
// part of the test. We want to make sure that new marker state can be | ||
// ingested and processed to reveal the imported history after a | ||
// remote-join without paginating and backfilling over the spot in the | ||
// timeline with the marker event. | ||
// | ||
// We don't want to use `validateBatchSendRes(t, remoteCharlie, roomID, | ||
// batchSendRes, false)` here because it tests against the full message | ||
// response and we need to skip past the marker in the timeline. | ||
paginateUntilMessageCheckOff(t, remoteCharlie, roomID, paginationTokenBeforeMarker, historicalEventIDs, []string{}) | ||
}) | ||
|
||
t.Run("Historical messages show up for remote federated homeserver even when the homeserver is missing the part of the timeline where multiple marker events were sent and it paginates before they occured", func(t *testing.T) { | ||
t.Parallel() | ||
|
||
roomID := as.CreateRoom(t, createPublicRoomOpts) | ||
alice.JoinRoom(t, roomID, nil) | ||
|
||
// Anything above 1 here should be sufficient to test whether we can | ||
// follow the state and previous state all the way up to ingest all of | ||
// the marker events along the way | ||
numBatches := 2 | ||
|
||
eventIDsBefore := createMessagesInRoom(t, alice, roomID, numBatches, "eventIDsBefore") | ||
timeAfterEventBefore := time.Now() | ||
|
||
eventIDsAfter := createMessagesInRoom(t, alice, roomID, 3, "eventIDsAfter") | ||
eventIDAfter := eventIDsAfter[0] | ||
|
||
// Join the room from a remote homeserver before the historical messages were sent | ||
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) | ||
|
||
// Make sure all of the events have been backfilled for the remote user | ||
// before we leave the room | ||
fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool { | ||
if ev.Get("event_id").Str == eventIDsBefore[0] { | ||
return true | ||
} | ||
|
||
return false | ||
}) | ||
|
||
// Leave before the historical messages are imported | ||
remoteCharlie.LeaveRoom(t, roomID) | ||
|
||
var expectedEventIDs []string | ||
for i := 0; i < numBatches; i++ { | ||
// Create separate disconnected batches | ||
batchSendRes := batchSendHistoricalMessages( | ||
t, | ||
as, | ||
roomID, | ||
eventIDsBefore[i], | ||
"", | ||
createJoinStateEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore), | ||
createMessageEventsForBatchSendRequest([]string{virtualUserID}, timeAfterEventBefore, 2), | ||
// Status | ||
200, | ||
) | ||
batchSendResBody := client.ParseJSON(t, batchSendRes) | ||
historicalEventIDs := client.GetJSONFieldStringArray(t, batchSendResBody, "event_ids") | ||
baseInsertionEventID := client.GetJSONFieldStr(t, batchSendResBody, "base_insertion_event_id") | ||
|
||
// Store the historical events we will expect to see later | ||
expectedEventIDs = append(expectedEventIDs, historicalEventIDs...) | ||
|
||
// Send the marker event which lets remote homeservers know there are | ||
// some historical messages back at the given insertion event. We | ||
// purposely use the local user Alice here as remoteCharlie isn't even | ||
// in the room at this point in time and even if they were, the purpose | ||
// of this test is to make sure the remote-join will pick up the state, | ||
// not our backfill here. | ||
sendMarkerAndEnsureBackfilled(t, as, alice, roomID, baseInsertionEventID) | ||
} | ||
|
||
// Add some events after the marker so that remoteCharlie doesn't see the marker | ||
createMessagesInRoom(t, alice, roomID, 3, "eventIDFiller") | ||
|
||
// Join the room from a remote homeserver after the historical messages were sent | ||
remoteCharlie.JoinRoom(t, roomID, []string{"hs1"}) | ||
|
||
// From the remote user, make a /context request for eventIDAfter to get | ||
// pagination token before the marker event | ||
contextRes := remoteCharlie.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "context", eventIDAfter}, client.WithContentType("application/json"), client.WithQueries(url.Values{ | ||
"limit": []string{"0"}, | ||
})) | ||
contextResResBody := client.ParseJSON(t, contextRes) | ||
paginationTokenBeforeMarker := client.GetJSONFieldStr(t, contextResResBody, "end") | ||
|
||
// Start the /messages request from that pagination token which | ||
// jumps/skips over the marker event in the timeline. This is the key | ||
// part of the test. We want to make sure that new marker state can be | ||
// ingested and processed to reveal the imported history after a | ||
// remote-join without paginating and backfilling over the spot in the | ||
// timeline with the marker event. | ||
// | ||
// We don't want to use `validateBatchSendRes(t, remoteCharlie, roomID, | ||
// batchSendRes, false)` here because it tests against the full message | ||
// response and we need to skip past the marker in the timeline. | ||
paginateUntilMessageCheckOff(t, remoteCharlie, roomID, paginationTokenBeforeMarker, expectedEventIDs, []string{}) | ||
}) | ||
}) | ||
|
||
t.Run("Existing room versions", func(t *testing.T) { | ||
|
@@ -1092,6 +1261,103 @@ func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, | |
} | ||
} | ||
|
||
// Paginate the /messages endpoint until we find all of the expectedEventIds | ||
// (order does not matter). If any event in denyListEventIDs is found, an error | ||
// will be thrown. | ||
func paginateUntilMessageCheckOff(t *testing.T, c *client.CSAPI, roomID string, fromPaginationToken string, expectedEventIDs []string, denyListEventIDs []string) { | ||
t.Helper() | ||
start := time.Now() | ||
|
||
workingExpectedEventIDMap := make(map[string]string) | ||
for _, expectedEventID := range expectedEventIDs { | ||
workingExpectedEventIDMap[expectedEventID] = expectedEventID | ||
} | ||
|
||
denyEventIDMap := make(map[string]string) | ||
for _, denyEventID := range denyListEventIDs { | ||
denyEventIDMap[denyEventID] = denyEventID | ||
} | ||
|
||
var actualEventIDList []string | ||
callCounter := 0 | ||
messageResEnd := fromPaginationToken | ||
generateErrorMesssageInfo := func() string { | ||
i := 0 | ||
leftoverEventIDs := make([]string, len(workingExpectedEventIDMap)) | ||
for eventID := range workingExpectedEventIDMap { | ||
leftoverEventIDs[i] = eventID | ||
i++ | ||
} | ||
|
||
return fmt.Sprintf("Called /messages %d times but only found %d/%d expected messages. Leftover messages we expected (%d): %s. We saw %d events over all of the API calls: %s", | ||
callCounter, | ||
len(expectedEventIDs)-len(leftoverEventIDs), | ||
len(expectedEventIDs), | ||
len(leftoverEventIDs), | ||
leftoverEventIDs, | ||
len(actualEventIDList), | ||
actualEventIDList, | ||
) | ||
} | ||
|
||
for { | ||
if time.Since(start) > 200*c.SyncUntilTimeout { | ||
MadLittleMods marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
t.Fatalf( | ||
"paginateUntilMessageCheckOff timed out. %s", | ||
generateErrorMesssageInfo(), | ||
) | ||
} | ||
|
||
messagesRes := c.MustDoFunc(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{ | ||
"dir": []string{"b"}, | ||
"limit": []string{"100"}, | ||
"from": []string{messageResEnd}, | ||
})) | ||
callCounter++ | ||
messsageResBody := client.ParseJSON(t, messagesRes) | ||
messageResEnd = client.GetJSONFieldStr(t, messsageResBody, "end") | ||
// Since the original body can only be read once, create a new one from the body bytes we just read | ||
messagesRes.Body = ioutil.NopCloser(bytes.NewBuffer(messsageResBody)) | ||
|
||
foundEventInMessageResponse := false | ||
must.MatchResponse(t, messagesRes, match.HTTPResponse{ | ||
JSON: []match.JSON{ | ||
match.JSONArrayEach("chunk", func(ev gjson.Result) error { | ||
foundEventInMessageResponse = true | ||
eventID := ev.Get("event_id").Str | ||
actualEventIDList = append(actualEventIDList, eventID) | ||
|
||
if _, keyExists := denyEventIDMap[eventID]; keyExists { | ||
return fmt.Errorf( | ||
"paginateUntilMessageCheckOff found unexpected message=%s in deny list while paginating. %s", | ||
eventID, | ||
generateErrorMesssageInfo(), | ||
) | ||
} | ||
|
||
if _, keyExists := workingExpectedEventIDMap[eventID]; keyExists { | ||
delete(workingExpectedEventIDMap, eventID) | ||
} | ||
|
||
return nil | ||
}), | ||
}, | ||
}) | ||
|
||
if !foundEventInMessageResponse { | ||
t.Fatalf( | ||
"paginateUntilMessageCheckOff reached the end of the messages without finding all expected events. %s", | ||
generateErrorMesssageInfo(), | ||
) | ||
} | ||
|
||
// We were able to find all of the expected events! | ||
if len(workingExpectedEventIDMap) == 0 { | ||
return | ||
} | ||
} | ||
} | ||
|
||
func historicalEventFilter(r gjson.Result) bool { | ||
// This includes messages, insertion, batch, and marker events because we | ||
// include the historical field on all of them. | ||
|
@@ -1169,9 +1435,10 @@ func sendMarkerAndEnsureBackfilled(t *testing.T, as *client.CSAPI, c *client.CSA | |
markerInsertionContentField: insertionEventID, | ||
}, | ||
} | ||
// We can't use as.SendEventSynced(...) because application services can't use the /sync API | ||
txnId := getTxnID("sendMarkerAndEnsureBackfilled-txn") | ||
markerSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "send", markerEvent.Type, txnId}, client.WithJSONBody(t, markerEvent.Content)) | ||
// Marker events should have unique state_key so they all show up in the current state to process. | ||
unique_state_key := getTxnID("marker_state_key") | ||
// We can't use as.SendEventSynced(...) because application services can't use the /sync API. | ||
markerSendRes := as.MustDoFunc(t, "PUT", []string{"_matrix", "client", "r0", "rooms", roomID, "state", markerEvent.Type, unique_state_key}, client.WithJSONBody(t, markerEvent.Content)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sending marker events as state now so they are always able to be seen by homeservers (not lost in some timeline gap). Marker events should be sent with a unique `state_key` so that they all show up in the current state to process.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. From a design/protocol perspective, this feels sub-optimal. What you really wanted was something like MSC2836 which would allow you to walk back up marker events over federation without needing to put things into state, but oh well. |
||
markerSendBody := client.ParseJSON(t, markerSendRes) | ||
markerEventID = client.GetJSONFieldStr(t, markerSendBody, "event_id") | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function is originally from #214 but saw that we could use it here before that merges