Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
matrix:
include:
- homeserver: Synapse
tags: synapse_blacklist,msc3083
tags: synapse_blacklist msc3083 faster_joins

- homeserver: Dendrite
tags: msc2836 dendrite_blacklist
Expand Down
316 changes: 236 additions & 80 deletions tests/federation_room_join_partial_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"strings"
"testing"
"time"
Expand All @@ -19,117 +20,272 @@ import (

"github.com/matrix-org/complement/internal/b"
"github.com/matrix-org/complement/internal/client"
"github.com/matrix-org/complement/internal/docker"
"github.com/matrix-org/complement/internal/federation"
"github.com/matrix-org/complement/internal/match"
"github.com/matrix-org/complement/internal/must"
)

// TestSyncBlocksDuringPartialStateJoin tests that a regular /sync request
// made during a partial-state /send_join request blocks until the state is
// correctly synced.
func TestSyncBlocksDuringPartialStateJoin(t *testing.T) {
// We make a room on the Complement server, then have @alice:hs1 join it,
// and make a sync request while the resync is in flight
func TestPartialStateJoin(t *testing.T) {
// test that a regular /sync request made during a partial-state /send_join
// request blocks until the state is correctly synced.
t.Run("SyncBlocksDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
psjResult := beginPartialStateJoin(t, deployment, alice)
defer psjResult.Destroy()

alice := deployment.Client(t, "hs1", "@alice:hs1")
// Alice has now joined the room, and the server is syncing the state in the background.

srv := federation.NewServer(t, deployment,
// attempts to sync should now block. Fire off a goroutine to try it.
syncResponseChan := make(chan gjson.Result)
defer close(syncResponseChan)
go func() {
response, _ := alice.MustSync(t, client.SyncReq{})
syncResponseChan <- response
}()

// wait for the state_ids request to arrive
psjResult.AwaitStateIdsRequest(t)

// the client-side requests should still be waiting
select {
case <-syncResponseChan:
t.Fatalf("Sync completed before state resync complete")
default:
}

// release the federation /state response
psjResult.FinishStateRequest()

// the /sync request should now complete, with the new room
var syncRes gjson.Result
select {
case <-time.After(1 * time.Second):
t.Fatalf("/sync request request did not complete")
case syncRes = <-syncResponseChan:
}

roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(psjResult.ServerRoom.RoomID))
if !roomRes.Exists() {
t.Fatalf("/sync completed without join to new room\n")
}

// check that the state includes both charlie and derek.
matcher := match.JSONCheckOffAllowUnwanted("state.events",
[]interface{}{
"m.room.member|" + psjResult.Server.UserID("charlie"),
"m.room.member|" + psjResult.Server.UserID("derek"),
}, func(result gjson.Result) interface{} {
return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|")
}, nil,
)
if err := matcher([]byte(roomRes.Raw)); err != nil {
t.Errorf("Did not find expected state events in /sync response: %s", err)

}
})

// when Alice does a lazy-loading sync, she should see the room immediately
t.Run("CanLazyLoadingSyncDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

psjResult := beginPartialStateJoin(t, deployment, alice)
defer psjResult.Destroy()

alice.MustSyncUntil(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(),
},
client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID),
)
t.Logf("Alice successfully synced")
})

// we should be able to send events in the room, during the resync
t.Run("CanSendEventsDuringPartialStateJoin", func(t *testing.T) {
t.Skip("Cannot yet send events during resync")
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

psjResult := beginPartialStateJoin(t, deployment, alice)
defer psjResult.Destroy()

alice.Client.Timeout = 2 * time.Second
paths := []string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "send", "m.room.message", "0"}
res := alice.MustDoFunc(t, "PUT", paths, client.WithJSONBody(t, map[string]interface{}{
"msgtype": "m.text",
"body": "Hello world!",
}))
body := gjson.ParseBytes(client.ParseJSON(t, res))
eventID := body.Get("event_id").Str
t.Logf("Alice sent event event ID %s", eventID)
})

// a request to (client-side) /members?at= should block until the (federation) /state request completes
// TODO(faster_joins): also need to test /state, and /members without an `at`, which follow a different path
t.Run("MembersRequestBlocksDuringPartialStateJoin", func(t *testing.T) {
deployment := Deploy(t, b.BlueprintAlice)
defer deployment.Destroy(t)
alice := deployment.Client(t, "hs1", "@alice:hs1")

psjResult := beginPartialStateJoin(t, deployment, alice)
defer psjResult.Destroy()

// we need a sync token to pass to the `at` param.
syncToken := alice.MustSyncUntil(t,
client.SyncReq{
Filter: buildLazyLoadingSyncFilter(),
},
client.SyncJoinedTo(alice.UserID, psjResult.ServerRoom.RoomID),
)
t.Logf("Alice successfully synced")

// Fire off a goroutine to send the request, and write the response back to a channel.
clientMembersRequestResponseChan := make(chan *http.Response)
defer close(clientMembersRequestResponseChan)
go func() {
queryParams := url.Values{}
queryParams.Set("at", syncToken)
clientMembersRequestResponseChan <- alice.MustDoFunc(
t,
"GET",
[]string{"_matrix", "client", "r0", "rooms", psjResult.ServerRoom.RoomID, "members"},
client.WithQueries(queryParams),
)
}()

// release the federation /state response
psjResult.FinishStateRequest()

// the client-side /members request should now complete, with a response that includes charlie and derek.
select {
case <-time.After(1 * time.Second):
t.Fatalf("client-side /members request did not complete")
case res := <-clientMembersRequestResponseChan:
must.MatchResponse(t, res, match.HTTPResponse{
JSON: []match.JSON{
match.JSONCheckOff("chunk",
[]interface{}{
"m.room.member|" + alice.UserID,
"m.room.member|" + psjResult.Server.UserID("charlie"),
"m.room.member|" + psjResult.Server.UserID("derek"),
}, func(result gjson.Result) interface{} {
return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|")
}, nil),
},
})
}
})
}

// buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq
func buildLazyLoadingSyncFilter() string {
j, _ := json.Marshal(map[string]interface{}{
"room": map[string]interface{}{
"timeline": map[string]interface{}{
"lazy_load_members": true,
},
"state": map[string]interface{}{
"lazy_load_members": true,
},
},
})
return string(j)
}

// partialStateJoinResult is the result of beginPartialStateJoin
type partialStateJoinResult struct {
cancelListener func()
Server *federation.Server
ServerRoom *federation.ServerRoom
fedStateIdsRequestReceivedWaiter *Waiter
fedStateIdsSendResponseWaiter *Waiter
}

// beginPartialStateJoin spins up a room on a complement server,
// then has a test user join it. It returns a partialStateJoinResult,
// which must be Destroy'd on completion.
//
// When this method completes, the /join request will have completed, but the
// state has not yet been re-synced. To allow the re-sync to proceed, call
// partialStateJoinResult.FinishStateRequest.
func beginPartialStateJoin(t *testing.T, deployment *docker.Deployment, joiningUser *client.CSAPI) partialStateJoinResult {
result := partialStateJoinResult{}
success := false
defer func() {
if !success {
result.Destroy()
}
}()

result.Server = federation.NewServer(t, deployment,
federation.HandleKeyRequests(),
federation.HandlePartialStateMakeSendJoinRequests(),
federation.HandleEventRequests(),
)
cancel := srv.Listen()
defer cancel()
result.cancelListener = result.Server.Listen()

// some things for orchestration
fedStateIdsRequestReceivedWaiter := NewWaiter()
defer fedStateIdsRequestReceivedWaiter.Finish()
fedStateIdsSendResponseWaiter := NewWaiter()
defer fedStateIdsSendResponseWaiter.Finish()
result.fedStateIdsRequestReceivedWaiter = NewWaiter()
result.fedStateIdsSendResponseWaiter = NewWaiter()

// create the room on the complement server, with charlie and derek as members
charlie := srv.UserID("charlie")
derek := srv.UserID("derek")
serverRoom := makeTestRoom(t, srv, alice.GetDefaultRoomVersion(t), charlie, derek)
roomVer := joiningUser.GetDefaultRoomVersion(t)
result.ServerRoom = result.Server.MustMakeRoom(t, roomVer, federation.InitialRoomEvents(roomVer, result.Server.UserID("charlie")))
result.ServerRoom.AddEvent(result.Server.MustCreateEvent(t, result.ServerRoom, b.Event{
Type: "m.room.member",
StateKey: b.Ptr(result.Server.UserID("derek")),
Sender: result.Server.UserID("derek"),
Content: map[string]interface{}{
"membership": "join",
},
}))

// register a handler for /state_ids requests, which finishes fedStateIdsRequestReceivedWaiter, then
// waits for fedStateIdsSendResponseWaiter and sends a reply
handleStateIdsRequests(t, srv, serverRoom, fedStateIdsRequestReceivedWaiter, fedStateIdsSendResponseWaiter)
handleStateIdsRequests(t, result.Server, result.ServerRoom, result.fedStateIdsRequestReceivedWaiter, result.fedStateIdsSendResponseWaiter)

// a handler for /state requests, which sends a sensible response
handleStateRequests(t, srv, serverRoom, nil, nil)

// have alice join the room by room ID.
alice.JoinRoom(t, serverRoom.RoomID, []string{srv.ServerName()})
t.Logf("Join completed")
handleStateRequests(t, result.Server, result.ServerRoom, nil, nil)

// Alice has now joined the room, and the server is syncing the state in the background.
// have joiningUser join the room by room ID.
joiningUser.JoinRoom(t, result.ServerRoom.RoomID, []string{result.Server.ServerName()})
t.Logf("/join request completed")

// attempts to sync should now block. Fire off a goroutine to try it.
syncResponseChan := make(chan gjson.Result)
defer close(syncResponseChan)
go func() {
response, _ := alice.MustSync(t, client.SyncReq{})
syncResponseChan <- response
}()

// wait for the state_ids request to arrive
fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request")
success = true
return result
}

// the client-side requests should still be waiting
select {
case <-syncResponseChan:
t.Fatalf("Sync completed before state resync complete")
default:
// Destroy cleans up the resources associated with the join attempt. It must
// be called once the test is finished
func (psj *partialStateJoinResult) Destroy() {
if psj.fedStateIdsSendResponseWaiter != nil {
psj.fedStateIdsSendResponseWaiter.Finish()
}

// release the federation /state response
fedStateIdsSendResponseWaiter.Finish()

// the /sync request should now complete, with the new room
var syncRes gjson.Result
select {
case <-time.After(1 * time.Second):
t.Fatalf("/sync request request did not complete")
case syncRes = <-syncResponseChan:
if psj.fedStateIdsRequestReceivedWaiter != nil {
psj.fedStateIdsRequestReceivedWaiter.Finish()
}

roomRes := syncRes.Get("rooms.join." + client.GjsonEscape(serverRoom.RoomID))
if !roomRes.Exists() {
t.Fatalf("/sync completed without join to new room\n")
if psj.cancelListener != nil {
psj.cancelListener()
}
}

// check that the state includes both charlie and derek.
matcher := match.JSONCheckOffAllowUnwanted("state.events",
[]interface{}{
"m.room.member|" + charlie,
"m.room.member|" + derek,
}, func(result gjson.Result) interface{} {
return strings.Join([]string{result.Map()["type"].Str, result.Map()["state_key"].Str}, "|")
}, nil,
)
if err := matcher([]byte(roomRes.Raw)); err != nil {
t.Errorf("Did not find expected state events in /sync response: %s", err)
}
// wait for a /state_ids request for the test room to arrive
func (psj *partialStateJoinResult) AwaitStateIdsRequest(t *testing.T) {
psj.fedStateIdsRequestReceivedWaiter.Waitf(t, 5*time.Second, "Waiting for /state_ids request")
}

// makeTestRoom constructs a test room on the Complement server, and adds the given extra members
func makeTestRoom(t *testing.T, srv *federation.Server, roomVer gomatrixserverlib.RoomVersion, creator string, members ...string) *federation.ServerRoom {
serverRoom := srv.MustMakeRoom(t, roomVer, federation.InitialRoomEvents(roomVer, creator))
for _, m := range members {
serverRoom.AddEvent(srv.MustCreateEvent(t, serverRoom, b.Event{
Type: "m.room.member",
StateKey: b.Ptr(m),
Sender: m,
Content: map[string]interface{}{
"membership": "join",
},
}),
)
}
return serverRoom
// allow the /state_ids request to complete, thus allowing the state re-sync to complete
func (psj *partialStateJoinResult) FinishStateRequest() {
psj.fedStateIdsSendResponseWaiter.Finish()
}

// handleStateIdsRequests registers a handler for /state_ids requests for serverRoom.
Expand Down