From e854861a35eb903d611bae0fad50942dc103ee2a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Nov 2025 16:28:10 -0600 Subject: [PATCH 01/14] Add test to ensure history is visible after joining new room over federation --- tests/csapi/room_messages_test.go | 283 ++++++++++++++++++++++++++++++ 1 file changed, 283 insertions(+) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 373f3363..5f1377b4 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -1,9 +1,13 @@ package csapi_tests import ( + "encoding/json" "fmt" "net/http" "net/url" + "slices" + "strconv" + "strings" "testing" "github.com/tidwall/gjson" @@ -15,6 +19,7 @@ import ( "github.com/matrix-org/complement/match" "github.com/matrix-org/complement/must" "github.com/matrix-org/complement/runtime" + "github.com/matrix-org/gomatrixserverlib/spec" ) // sytest: POST /rooms/:room_id/send/:event_type sends a message @@ -220,3 +225,281 @@ func TestRoomMessagesLazyLoadingLocalUser(t *testing.T) { }, }) } + +type MessageDraft struct { + Sender *client.CSAPI + Message string +} + +type EventInfo struct { + MessageDraft MessageDraft + EventID string +} + +func TestMessagesOverFederation(t *testing.T) { + deployment := complement.Deploy(t, 2) + defer deployment.Destroy(t) + + alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{ + LocalpartSuffix: "alice", + }) + bob := deployment.Register(t, "hs2", helpers.RegistrationOpts{ + LocalpartSuffix: "bob", + }) + + t.Run("Visible history after joining new room (backfill)", func(t *testing.T) { + // Some homeservers have different hard-limits for /messages requests (Synapse's + // `MAX_LIMIT` is 1000) so we test a few different variations. + for _, testCase := range []struct { + name string + numberOfMessagesToSend int + messagesRequestLimit int + }{ + { + name: "`messagesRequestLimit` is lower than the number of messages backfilled (assumed)", + // We send more messages than fit in one request + numberOfMessagesToSend: 20, + // This is the default limit in the Matrix spec so it's bound to be lower than + // the number of messages that are backfilled. + messagesRequestLimit: 10, + }, + // { + // name: "`messagesRequestLimit` is greater than the number of messages backfilled (in Synapse, 100)", + // // We send more messages than fit in one request + // numberOfMessagesToSend: 300, + // // We request more messages than Synapse tries to backfill at once (which is 100) + // messagesRequestLimit: 200, + // }, + } { + t.Run(testCase.name, func(t *testing.T) { + // Alice creates the room + roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + + // Keep track of the order + eventIDs := make([]string, 0) + // Map from event_id to event info + eventMap := make(map[string]EventInfo) + + messageDrafts := make([]MessageDraft, 0, testCase.numberOfMessagesToSend) + for i := 0; i < testCase.numberOfMessagesToSend; i++ { + messageDrafts = append(messageDrafts, MessageDraft{alice, fmt.Sprintf("Filler message %d to increase history size.", i+1)}) + } + sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) + + // Bob joins the room + bob.MustJoinRoom(t, roomID, []spec.ServerName{ + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + }) + awaitPartialStateJoinCompletion(t, roomID, bob) + + // Make it easy to cross-reference the events being talked about in the logs + for eventIndex, eventID := range eventIDs { + // messageDraft := eventMap[eventID].MessageDraft + t.Logf("Message %d -> event_id=%s", eventIndex, eventID) + } + + // Keep paginating backwards until we reach the start of the room + actualEventIDs := make( + []string, + 0, + // This is a minimum capacity (there will be more events) + testCase.numberOfMessagesToSend, + ) + fromToken := "" + for { + messageQueryParams := url.Values{ + "dir": []string{"b"}, + "limit": []string{strconv.Itoa(testCase.messagesRequestLimit)}, + } + if fromToken != "" { + messageQueryParams.Set("from", fromToken) + } + + messagesRes := bob.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, + client.WithContentType("application/json"), + client.WithQueries(messageQueryParams), + ) + messagesResBody := client.ParseJSON(t, messagesRes) + actualEventIDsFromRequest := extractEventIDsFromMessagesResponse(t, messagesResBody) + actualEventIDs = append(actualEventIDs, actualEventIDsFromRequest...) + + endTokenRes := gjson.GetBytes(messagesResBody, "end") + // "`end`: If no further events are available (either because we have reached the + // start of the timeline, or because the user does not have permission to see + // any more events), this property is omitted from the response." (Matrix spec) + if !endTokenRes.Exists() { + break + } + fromToken = endTokenRes.Str + + // Or if we don't see any more events, we will assume that we reached the + // start of the room. No more to paginate. + if len(actualEventIDsFromRequest) == 0 { + break + } + } + + // Assert timeline order + assertMessagesInTimelineInOrder(t, actualEventIDs, eventIDs) + }) + } + }) +} + +func sendMessageDrafts( + t *testing.T, + roomID string, + messageDrafts []MessageDraft, +) []string { + t.Helper() + + eventIDs := make([]string, len(messageDrafts)) + for messageDraftIndex, messageDraft := range messageDrafts { + eventID := messageDraft.Sender.SendEventSynced(t, roomID, b.Event{ + Type: "m.room.message", + Content: map[string]interface{}{ + "msgtype": "m.text", + "body": messageDraft.Message, + }, + }) + eventIDs[messageDraftIndex] = eventID + } + + return eventIDs +} + +// sendAndTrackMessages sends the given message drafts to the room, keeping track of the +// new events in the list of `eventIDs` and `eventMap`. Returns the list of new event +// IDs that were sent. +func sendAndTrackMessages( + t *testing.T, + roomID string, + messageDrafts []MessageDraft, + eventIDs *[]string, + eventMap *map[string]EventInfo, +) []string { + t.Helper() + + newEventIDs := sendMessageDrafts(t, roomID, messageDrafts) + + *eventIDs = append(*eventIDs, newEventIDs...) + for i, eventID := range newEventIDs { + (*eventMap)[eventID] = EventInfo{ + MessageDraft: messageDrafts[i], + EventID: eventID, + } + } + + return newEventIDs +} + +// extractEventIDsFromMessagesResponse extracts the event IDs from the given +// `/messages` response body. +func extractEventIDsFromMessagesResponse( + t *testing.T, + messagesResBody json.RawMessage, +) []string { + t.Helper() + + wantKey := "chunk" + keyRes := gjson.GetBytes(messagesResBody, wantKey) + if !keyRes.Exists() { + t.Fatalf("extractEventIDsFromMessagesResponse: missing key '%s'", wantKey) + } + if !keyRes.IsArray() { + t.Fatalf("extractEventIDsFromMessagesResponse: key '%s' is not an array (was %s)", wantKey, keyRes.Type) + } + + var eventIDs []string + actualEvents := keyRes.Array() + for _, event := range actualEvents { + eventIDs = append(eventIDs, event.Get("event_id").Str) + } + + return eventIDs +} + +// assertMessagesTimeline asserts all events are in the `/messages` response in the +// given order. Other unrelated events can be in between. +// +// messagesResBody: from a `/messages?dir=b` request (these will be in reverse-chronological order) +// eventIDs: the list of event IDs in chronological order that we expect to see in the response +func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expectedEventIDs []string) { + t.Helper() + + relevantActualEventIDs := make([]string, 0, len(expectedEventIDs)) + for _, eventID := range actualEventIDs { + if slices.Contains(expectedEventIDs, eventID) { + relevantActualEventIDs = append(relevantActualEventIDs, eventID) + } + } + // Put them in chronological order to match the expected list + // slices.Reverse(relevantActualEvents) + slices.Reverse(relevantActualEventIDs) + + expectedLines := make([]string, len(expectedEventIDs)) + for i, expectedEventID := range expectedEventIDs { + isExpectedInActual := slices.Contains(relevantActualEventIDs, expectedEventID) + isMissingIndicatorString := " " + if !isExpectedInActual { + isMissingIndicatorString = "?" + } + + expectedLines[i] = fmt.Sprintf("%2d: %s %s", i, isMissingIndicatorString, expectedEventID) + } + expectedDiffString := strings.Join(expectedLines, "\n") + + actualLines := make([]string, len(relevantActualEventIDs)) + for actualEventIndex, actualEventID := range relevantActualEventIDs { + isActualInExpected := slices.Contains(expectedEventIDs, actualEventID) + isActualInExpectedIndicatorString := " " + if isActualInExpected { + isActualInExpectedIndicatorString = "+" + } + + expectedIndex := slices.Index(expectedEventIDs, actualEventID) + expectedIndexString := "" + if actualEventIndex != expectedIndex { + expectedDirectionString := "⬆️" + if expectedIndex > actualEventIndex { + expectedDirectionString = "⬇️" + } + + expectedIndexString = fmt.Sprintf(" (expected index %d %s)", expectedIndex, expectedDirectionString) + } + + actualLines[actualEventIndex] = fmt.Sprintf("%2d: %s %s%s", actualEventIndex, isActualInExpectedIndicatorString, actualEventID, expectedIndexString) + } + actualDiffString := strings.Join(actualLines, "\n") + + if len(relevantActualEventIDs) != len(expectedEventIDs) { + t.Fatalf("expected %d events in timeline (got %d)\nActual events ('+' = found expected items):\n%s\nExpected events ('?' = missing expected items):\n%s", + len(expectedEventIDs), len(relevantActualEventIDs), actualDiffString, expectedDiffString, + ) + } + + for i, eventID := range relevantActualEventIDs { + if eventID != expectedEventIDs[i] { + t.Fatalf("expected event ID %s (got %s) at index %d\nActual events ('+' = found expected items):\n%s\nExpected events ('?' = missing expected items):\n%s", + expectedEventIDs[i], eventID, i, actualDiffString, expectedDiffString, + ) + } + } +} + +// awaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated +func awaitPartialStateJoinCompletion( + t *testing.T, room_id string, user *client.CSAPI, +) { + t.Helper() + + // Use a `/members` request to wait for the room to be un-partial stated. + // We avoid using `/sync`, as it only waits (or used to wait) for full state at + // particular events, rather than the whole room. + user.MustDo( + t, + "GET", + []string{"_matrix", "client", "v3", "rooms", room_id, "members"}, + ) + t.Logf("%s's partial state join to %s completed.", user.UserID, room_id) +} From adc26a9d45f0c97d875aaebff840fa872700fc14 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Nov 2025 16:30:21 -0600 Subject: [PATCH 02/14] Better clarify history visibility --- tests/csapi/room_messages_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 5f1377b4..7473dbee 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -247,7 +247,7 @@ func TestMessagesOverFederation(t *testing.T) { LocalpartSuffix: "bob", }) - t.Run("Visible history after joining new room (backfill)", func(t *testing.T) { + t.Run("Visible shared history after joining new room (backfill)", func(t *testing.T) { // Some homeservers have different hard-limits for /messages requests (Synapse's // `MAX_LIMIT` is 1000) so we test a few different variations. for _, testCase := range []struct { @@ -273,7 +273,13 @@ func TestMessagesOverFederation(t *testing.T) { } { t.Run(testCase.name, func(t *testing.T) { // Alice creates the room - roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"}) + roomID := alice.MustCreateRoom(t, map[string]interface{}{ + // The `public_chat` preset includes `history_visibility: "shared"` ("Previous + // events are always accessible to newly joined members. All events in the + // room are accessible, even those sent when the member was not a part of the + // room."), which is what we want to test. + "preset": "public_chat", + }) // Keep track of the order eventIDs := make([]string, 0) From 93c33c157e0b9c5f9f75e37ce407d96b8bf5edfe Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Nov 2025 16:55:34 -0600 Subject: [PATCH 03/14] Add larger test --- tests/csapi/room_messages_test.go | 60 +++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 15 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 7473dbee..357894bb 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -247,14 +247,21 @@ func TestMessagesOverFederation(t *testing.T) { LocalpartSuffix: "bob", }) + // Test to make sure all of the messages sent in the room are visible to someone else + // who joins the room later on. t.Run("Visible shared history after joining new room (backfill)", func(t *testing.T) { - // Some homeservers have different hard-limits for /messages requests (Synapse's - // `MAX_LIMIT` is 1000) so we test a few different variations. + // Some homeservers have different hard-limits for `/messages?limit=xxx` requests + // (Synapse's `MAX_LIMIT` is 1000) so we test a few different variations. for _, testCase := range []struct { name string numberOfMessagesToSend int messagesRequestLimit int }{ + // Test where the `/messages?limit=xxx` is <= than the number of messages the + // homeserver tries to backfill before responding to the `/messages` request. + // Because the Matrix spec default `limit` is 10, we can assume that this is lower + // than the number of messages that *any* homeserver will try to backfill before + // responding. { name: "`messagesRequestLimit` is lower than the number of messages backfilled (assumed)", // We send more messages than fit in one request @@ -263,13 +270,15 @@ func TestMessagesOverFederation(t *testing.T) { // the number of messages that are backfilled. messagesRequestLimit: 10, }, - // { - // name: "`messagesRequestLimit` is greater than the number of messages backfilled (in Synapse, 100)", - // // We send more messages than fit in one request - // numberOfMessagesToSend: 300, - // // We request more messages than Synapse tries to backfill at once (which is 100) - // messagesRequestLimit: 200, - // }, + // Test where the `/messages?limit=xxx` is greater than the number of messages + // Synapse tries to backfill (100) before responding to the `/messages` request. + { + name: "`messagesRequestLimit` is greater than the number of messages backfilled (in Synapse, 100)", + // We send more messages than fit in one request + numberOfMessagesToSend: 300, + // We request more messages than Synapse tries to backfill at once (which is 100) + messagesRequestLimit: 200, + }, } { t.Run(testCase.name, func(t *testing.T) { // Alice creates the room @@ -329,6 +338,19 @@ func TestMessagesOverFederation(t *testing.T) { actualEventIDsFromRequest := extractEventIDsFromMessagesResponse(t, messagesResBody) actualEventIDs = append(actualEventIDs, actualEventIDsFromRequest...) + // Make it easy to understand what each `/messages` request returned + relevantActualEventIDsFromRequest := filterEventIDs(t, actualEventIDsFromRequest, eventIDs) + firstEventIndex := -1 + lastEventIndex := -1 + if len(relevantActualEventIDsFromRequest) > 0 { + firstEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[0]) + lastEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[len(relevantActualEventIDsFromRequest)-1]) + } + t.Logf("Fetched %d events from the `/messages` endpoint that included events %d to %d", + len(actualEventIDsFromRequest), + firstEventIndex, lastEventIndex, + ) + endTokenRes := gjson.GetBytes(messagesResBody, "end") // "`end`: If no further events are available (either because we have reached the // start of the timeline, or because the user does not have permission to see @@ -425,12 +447,7 @@ func extractEventIDsFromMessagesResponse( return eventIDs } -// assertMessagesTimeline asserts all events are in the `/messages` response in the -// given order. Other unrelated events can be in between. -// -// messagesResBody: from a `/messages?dir=b` request (these will be in reverse-chronological order) -// eventIDs: the list of event IDs in chronological order that we expect to see in the response -func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expectedEventIDs []string) { +func filterEventIDs(t *testing.T, actualEventIDs []string, expectedEventIDs []string) []string { t.Helper() relevantActualEventIDs := make([]string, 0, len(expectedEventIDs)) @@ -439,6 +456,19 @@ func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expe relevantActualEventIDs = append(relevantActualEventIDs, eventID) } } + + return relevantActualEventIDs +} + +// assertMessagesTimeline asserts all events are in the `/messages` response in the +// given order. Other unrelated events can be in between. +// +// messagesResBody: from a `/messages?dir=b` request (these will be in reverse-chronological order) +// eventIDs: the list of event IDs in chronological order that we expect to see in the response +func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expectedEventIDs []string) { + t.Helper() + + relevantActualEventIDs := filterEventIDs(t, actualEventIDs, expectedEventIDs) // Put them in chronological order to match the expected list // slices.Reverse(relevantActualEvents) slices.Reverse(relevantActualEventIDs) From aae930188bf6318a0c07f5da9e5318cdf1aa94de Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Nov 2025 17:35:12 -0600 Subject: [PATCH 04/14] Add re-join tests --- tests/csapi/room_messages_test.go | 151 ++++++++++++++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 357894bb..998d910c 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -372,6 +372,157 @@ func TestMessagesOverFederation(t *testing.T) { }) } }) + + // Test to make sure all of the messages sent in the room are visible to someone else + // who *re-joins* the room. + t.Run("Visible shared history after re-joining room (backfill)", func(t *testing.T) { + // Some homeservers have different hard-limits for `/messages?limit=xxx` requests + // (Synapse's `MAX_LIMIT` is 1000) so we test a few different variations. + for _, testCase := range []struct { + name string + numberOfMessagesToSend int + messagesRequestLimit int + }{ + // Test where the `/messages?limit=xxx` is <= than the number of messages the + // homeserver tries to backfill before responding to the `/messages` request. + // Because the Matrix spec default `limit` is 10, we can assume that this is lower + // than the number of messages that *any* homeserver will try to backfill before + // responding. + { + name: "`messagesRequestLimit` is lower than the number of messages backfilled (assumed)", + // We send more messages than fit in one request + numberOfMessagesToSend: 20, + // This is the default limit in the Matrix spec so it's bound to be lower than + // the number of messages that are backfilled. + messagesRequestLimit: 10, + }, + // Test where the `/messages?limit=xxx` is greater than the number of messages + // Synapse tries to backfill (100) before responding to the `/messages` request. + // + // FIXME: This test currently doesn't work because the homeserver will backfill + // the `limit=100` and return those 100 new events + all of the old history + // leaving an invisible gap in between. So the events in the response includes the + // 100 new events, [gap], the old history from when you were previously joined. + // This is the type of scenario that MSC3871 (Gappy timelines) is trying to + // address. This is a hole in the spec as there is no way for a homeserver + // indicate gaps to the client so they can paginate the gap and cause the + // homeserver to backfill more. + // + // { + // name: "`messagesRequestLimit` is greater than the number of messages backfilled (in Synapse, 100)", + // // We send more messages than fit in one request + // numberOfMessagesToSend: 300, + // // We request more messages than Synapse tries to backfill at once (which is 100) + // messagesRequestLimit: 200, + // }, + } { + t.Run(testCase.name, func(t *testing.T) { + // Start a sync loop + _, aliceSince := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"}) + + // Alice creates the room + roomID := alice.MustCreateRoom(t, map[string]interface{}{ + // The `public_chat` preset includes `history_visibility: "shared"` ("Previous + // events are always accessible to newly joined members. All events in the + // room are accessible, even those sent when the member was not a part of the + // room."), which is what we want to test. + "preset": "public_chat", + }) + + // Bob joins the room + bob.MustJoinRoom(t, roomID, []spec.ServerName{ + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + }) + awaitPartialStateJoinCompletion(t, roomID, bob) + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(bob.UserID, roomID)) + + // Bob leaves the room + bob.MustLeaveRoom(t, roomID) + // Make sure the leave has federated + aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncLeftFrom(bob.UserID, roomID)) + + // Keep track of the order + eventIDs := make([]string, 0) + // Map from event_id to event info + eventMap := make(map[string]EventInfo) + + messageDrafts := make([]MessageDraft, 0, testCase.numberOfMessagesToSend) + for i := 0; i < testCase.numberOfMessagesToSend; i++ { + messageDrafts = append(messageDrafts, MessageDraft{alice, fmt.Sprintf("Filler message %d to increase history size.", i+1)}) + } + sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) + + // Bob joins the room + bob.MustJoinRoom(t, roomID, []spec.ServerName{ + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + }) + awaitPartialStateJoinCompletion(t, roomID, bob) + + // Make it easy to cross-reference the events being talked about in the logs + for eventIndex, eventID := range eventIDs { + // messageDraft := eventMap[eventID].MessageDraft + t.Logf("Message %d -> event_id=%s", eventIndex, eventID) + } + + // Keep paginating backwards until we reach the start of the room + actualEventIDs := make( + []string, + 0, + // This is a minimum capacity (there will be more events) + testCase.numberOfMessagesToSend, + ) + fromToken := "" + for { + messageQueryParams := url.Values{ + "dir": []string{"b"}, + "limit": []string{strconv.Itoa(testCase.messagesRequestLimit)}, + } + if fromToken != "" { + messageQueryParams.Set("from", fromToken) + } + + messagesRes := bob.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, + client.WithContentType("application/json"), + client.WithQueries(messageQueryParams), + ) + messagesResBody := client.ParseJSON(t, messagesRes) + actualEventIDsFromRequest := extractEventIDsFromMessagesResponse(t, messagesResBody) + actualEventIDs = append(actualEventIDs, actualEventIDsFromRequest...) + + // Make it easy to understand what each `/messages` request returned + relevantActualEventIDsFromRequest := filterEventIDs(t, actualEventIDsFromRequest, eventIDs) + firstEventIndex := -1 + lastEventIndex := -1 + if len(relevantActualEventIDsFromRequest) > 0 { + firstEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[0]) + lastEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[len(relevantActualEventIDsFromRequest)-1]) + } + t.Logf("Fetched %d events from the `/messages` endpoint that included events %d to %d", + len(actualEventIDsFromRequest), + firstEventIndex, lastEventIndex, + ) + + endTokenRes := gjson.GetBytes(messagesResBody, "end") + // "`end`: If no further events are available (either because we have reached the + // start of the timeline, or because the user does not have permission to see + // any more events), this property is omitted from the response." (Matrix spec) + if !endTokenRes.Exists() { + break + } + fromToken = endTokenRes.Str + + // Or if we don't see any more events, we will assume that we reached the + // start of the room. No more to paginate. + if len(actualEventIDsFromRequest) == 0 { + break + } + } + + // Assert timeline order + assertMessagesInTimelineInOrder(t, actualEventIDs, eventIDs) + }) + } + }) } func sendMessageDrafts( From 9fef8dc82b1b40fe9e2ab059db8f7e9dc7091eea Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Nov 2025 17:43:37 -0600 Subject: [PATCH 05/14] Share logic between tests --- tests/csapi/room_messages_test.go | 276 ++++++++++++------------------ 1 file changed, 113 insertions(+), 163 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 998d910c..ec478c3b 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -236,6 +236,12 @@ type EventInfo struct { EventID string } +type MessagesTestCase struct { + name string + numberOfMessagesToSend int + messagesRequestLimit int +} + func TestMessagesOverFederation(t *testing.T) { deployment := complement.Deploy(t, 2) defer deployment.Destroy(t) @@ -252,11 +258,7 @@ func TestMessagesOverFederation(t *testing.T) { t.Run("Visible shared history after joining new room (backfill)", func(t *testing.T) { // Some homeservers have different hard-limits for `/messages?limit=xxx` requests // (Synapse's `MAX_LIMIT` is 1000) so we test a few different variations. - for _, testCase := range []struct { - name string - numberOfMessagesToSend int - messagesRequestLimit int - }{ + for _, testCase := range []MessagesTestCase{ // Test where the `/messages?limit=xxx` is <= than the number of messages the // homeserver tries to backfill before responding to the `/messages` request. // Because the Matrix spec default `limit` is 10, we can assume that this is lower @@ -290,85 +292,15 @@ func TestMessagesOverFederation(t *testing.T) { "preset": "public_chat", }) - // Keep track of the order - eventIDs := make([]string, 0) - // Map from event_id to event info - eventMap := make(map[string]EventInfo) - - messageDrafts := make([]MessageDraft, 0, testCase.numberOfMessagesToSend) - for i := 0; i < testCase.numberOfMessagesToSend; i++ { - messageDrafts = append(messageDrafts, MessageDraft{alice, fmt.Sprintf("Filler message %d to increase history size.", i+1)}) - } - sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) - - // Bob joins the room - bob.MustJoinRoom(t, roomID, []spec.ServerName{ + // Send messages and make sure we can see them in `/messages` + _sendAndTestMessageHistory( + t, + roomID, deployment.GetFullyQualifiedHomeserverName(t, "hs1"), - }) - awaitPartialStateJoinCompletion(t, roomID, bob) - - // Make it easy to cross-reference the events being talked about in the logs - for eventIndex, eventID := range eventIDs { - // messageDraft := eventMap[eventID].MessageDraft - t.Logf("Message %d -> event_id=%s", eventIndex, eventID) - } - - // Keep paginating backwards until we reach the start of the room - actualEventIDs := make( - []string, - 0, - // This is a minimum capacity (there will be more events) - testCase.numberOfMessagesToSend, + alice, + bob, + testCase, ) - fromToken := "" - for { - messageQueryParams := url.Values{ - "dir": []string{"b"}, - "limit": []string{strconv.Itoa(testCase.messagesRequestLimit)}, - } - if fromToken != "" { - messageQueryParams.Set("from", fromToken) - } - - messagesRes := bob.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, - client.WithContentType("application/json"), - client.WithQueries(messageQueryParams), - ) - messagesResBody := client.ParseJSON(t, messagesRes) - actualEventIDsFromRequest := extractEventIDsFromMessagesResponse(t, messagesResBody) - actualEventIDs = append(actualEventIDs, actualEventIDsFromRequest...) - - // Make it easy to understand what each `/messages` request returned - relevantActualEventIDsFromRequest := filterEventIDs(t, actualEventIDsFromRequest, eventIDs) - firstEventIndex := -1 - lastEventIndex := -1 - if len(relevantActualEventIDsFromRequest) > 0 { - firstEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[0]) - lastEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[len(relevantActualEventIDsFromRequest)-1]) - } - t.Logf("Fetched %d events from the `/messages` endpoint that included events %d to %d", - len(actualEventIDsFromRequest), - firstEventIndex, lastEventIndex, - ) - - endTokenRes := gjson.GetBytes(messagesResBody, "end") - // "`end`: If no further events are available (either because we have reached the - // start of the timeline, or because the user does not have permission to see - // any more events), this property is omitted from the response." (Matrix spec) - if !endTokenRes.Exists() { - break - } - fromToken = endTokenRes.Str - - // Or if we don't see any more events, we will assume that we reached the - // start of the room. No more to paginate. - if len(actualEventIDsFromRequest) == 0 { - break - } - } - - // Assert timeline order - assertMessagesInTimelineInOrder(t, actualEventIDs, eventIDs) }) } }) @@ -378,11 +310,7 @@ func TestMessagesOverFederation(t *testing.T) { t.Run("Visible shared history after re-joining room (backfill)", func(t *testing.T) { // Some homeservers have different hard-limits for `/messages?limit=xxx` requests // (Synapse's `MAX_LIMIT` is 1000) so we test a few different variations. - for _, testCase := range []struct { - name string - numberOfMessagesToSend int - messagesRequestLimit int - }{ + for _, testCase := range []MessagesTestCase{ // Test where the `/messages?limit=xxx` is <= than the number of messages the // homeserver tries to backfill before responding to the `/messages` request. // Because the Matrix spec default `limit` is 10, we can assume that this is lower @@ -441,88 +369,110 @@ func TestMessagesOverFederation(t *testing.T) { // Make sure the leave has federated aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncLeftFrom(bob.UserID, roomID)) - // Keep track of the order - eventIDs := make([]string, 0) - // Map from event_id to event info - eventMap := make(map[string]EventInfo) + // Send messages and make sure we can see them in `/messages` + _sendAndTestMessageHistory( + t, + roomID, + deployment.GetFullyQualifiedHomeserverName(t, "hs1"), + alice, + bob, + testCase, + ) + }) + } + }) +} - messageDrafts := make([]MessageDraft, 0, testCase.numberOfMessagesToSend) - for i := 0; i < testCase.numberOfMessagesToSend; i++ { - messageDrafts = append(messageDrafts, MessageDraft{alice, fmt.Sprintf("Filler message %d to increase history size.", i+1)}) - } - sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) +// 1. Alice sends a bunch of messages into the room +// 2. Bob joins the room +// 3. Bob paginates backwards through the room history until he reaches the start of the room +// 4. Assert that Bob sees all of the messages that Alice sent in the correct order +func _sendAndTestMessageHistory( + t *testing.T, + roomID string, + serverToJoinVia spec.ServerName, + alice, bob *client.CSAPI, + testCase MessagesTestCase, +) { + // Keep track of the order + eventIDs := make([]string, 0) + // Map from event_id to event info + eventMap := make(map[string]EventInfo) + + messageDrafts := make([]MessageDraft, 0, testCase.numberOfMessagesToSend) + for i := 0; i < testCase.numberOfMessagesToSend; i++ { + messageDrafts = append(messageDrafts, MessageDraft{alice, fmt.Sprintf("Filler message %d to increase history size.", i+1)}) + } + sendAndTrackMessages(t, roomID, messageDrafts, &eventIDs, &eventMap) - // Bob joins the room - bob.MustJoinRoom(t, roomID, []spec.ServerName{ - deployment.GetFullyQualifiedHomeserverName(t, "hs1"), - }) - awaitPartialStateJoinCompletion(t, roomID, bob) + // Bob joins the room + bob.MustJoinRoom(t, roomID, []spec.ServerName{ + serverToJoinVia, + }) + awaitPartialStateJoinCompletion(t, roomID, bob) - // Make it easy to cross-reference the events being talked about in the logs - for eventIndex, eventID := range eventIDs { - // messageDraft := eventMap[eventID].MessageDraft - t.Logf("Message %d -> event_id=%s", eventIndex, eventID) - } + // Make it easy to cross-reference the events being talked about in the logs + for eventIndex, eventID := range eventIDs { + // messageDraft := eventMap[eventID].MessageDraft + t.Logf("Message %d -> event_id=%s", eventIndex, eventID) + } - // Keep paginating backwards until we reach the start of the room - actualEventIDs := make( - []string, - 0, - // This is a minimum capacity (there will be more events) - testCase.numberOfMessagesToSend, - ) - fromToken := "" - for { - messageQueryParams := url.Values{ - "dir": []string{"b"}, - "limit": []string{strconv.Itoa(testCase.messagesRequestLimit)}, - } - if fromToken != "" { - messageQueryParams.Set("from", fromToken) - } - - messagesRes := bob.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, - client.WithContentType("application/json"), - client.WithQueries(messageQueryParams), - ) - messagesResBody := client.ParseJSON(t, messagesRes) - actualEventIDsFromRequest := extractEventIDsFromMessagesResponse(t, messagesResBody) - actualEventIDs = append(actualEventIDs, actualEventIDsFromRequest...) - - // Make it easy to understand what each `/messages` request returned - relevantActualEventIDsFromRequest := filterEventIDs(t, actualEventIDsFromRequest, eventIDs) - firstEventIndex := -1 - lastEventIndex := -1 - if len(relevantActualEventIDsFromRequest) > 0 { - firstEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[0]) - lastEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[len(relevantActualEventIDsFromRequest)-1]) - } - t.Logf("Fetched %d events from the `/messages` endpoint that included events %d to %d", - len(actualEventIDsFromRequest), - firstEventIndex, lastEventIndex, - ) - - endTokenRes := gjson.GetBytes(messagesResBody, "end") - // "`end`: If no further events are available (either because we have reached the - // start of the timeline, or because the user does not have permission to see - // any more events), this property is omitted from the response." (Matrix spec) - if !endTokenRes.Exists() { - break - } - fromToken = endTokenRes.Str - - // Or if we don't see any more events, we will assume that we reached the - // start of the room. No more to paginate. - if len(actualEventIDsFromRequest) == 0 { - break - } - } + // Keep paginating backwards until we reach the start of the room + actualEventIDs := make( + []string, + 0, + // This is a minimum capacity (there will be more events) + testCase.numberOfMessagesToSend, + ) + fromToken := "" + for { + messageQueryParams := url.Values{ + "dir": []string{"b"}, + "limit": []string{strconv.Itoa(testCase.messagesRequestLimit)}, + } + if fromToken != "" { + messageQueryParams.Set("from", fromToken) + } - // Assert timeline order - assertMessagesInTimelineInOrder(t, actualEventIDs, eventIDs) - }) + messagesRes := bob.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, + client.WithContentType("application/json"), + client.WithQueries(messageQueryParams), + ) + messagesResBody := client.ParseJSON(t, messagesRes) + actualEventIDsFromRequest := extractEventIDsFromMessagesResponse(t, messagesResBody) + actualEventIDs = append(actualEventIDs, actualEventIDsFromRequest...) + + // Make it easy to understand what each `/messages` request returned + relevantActualEventIDsFromRequest := filterEventIDs(t, actualEventIDsFromRequest, eventIDs) + firstEventIndex := -1 + lastEventIndex := -1 + if len(relevantActualEventIDsFromRequest) > 0 { + firstEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[0]) + lastEventIndex = slices.Index(eventIDs, relevantActualEventIDsFromRequest[len(relevantActualEventIDsFromRequest)-1]) } - }) + t.Logf("Fetched %d events from the `/messages` endpoint that included events %d to %d", + len(actualEventIDsFromRequest), + firstEventIndex, lastEventIndex, + ) + + endTokenRes := gjson.GetBytes(messagesResBody, "end") + // "`end`: If no further events are available (either because we have reached the + // start of the timeline, or because the user does not have permission to see + // any more events), this property is omitted from the response." (Matrix spec) + if !endTokenRes.Exists() { + break + } + fromToken = endTokenRes.Str + + // Or if we don't see any more events, we will assume that we reached the + // start of the room. No more to paginate. + if len(actualEventIDsFromRequest) == 0 { + break + } + } + + // Assert timeline order + assertMessagesInTimelineInOrder(t, actualEventIDs, eventIDs) } func sendMessageDrafts( From afc9c9aa285f28554d774f8e83261d830bbe06bd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Nov 2025 18:05:01 -0600 Subject: [PATCH 06/14] Skip re-join tests for Dendrite See https://github.com/matrix-org/complement/pull/816#discussion_r2512366312 --- tests/csapi/room_messages_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index ec478c3b..2f52ede0 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -308,6 +308,9 @@ func TestMessagesOverFederation(t *testing.T) { // Test to make sure all of the messages sent in the room are visible to someone else // who *re-joins* the room. t.Run("Visible shared history after re-joining room (backfill)", func(t *testing.T) { + // FIXME: Dendrite doesn't handle backfill well on re-join yet + runtime.SkipIf(t, runtime.Dendrite) + // Some homeservers have different hard-limits for `/messages?limit=xxx` requests // (Synapse's `MAX_LIMIT` is 1000) so we test a few different variations. for _, testCase := range []MessagesTestCase{ From bb3a7a168aad67f39c437fb3bef38ba23fb60fa7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Nov 2025 18:16:04 -0600 Subject: [PATCH 07/14] Skip Dendrite for all cases --- tests/csapi/room_messages_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 2f52ede0..6ed6f801 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -256,6 +256,9 @@ func TestMessagesOverFederation(t *testing.T) { // Test to make sure all of the messages sent in the room are visible to someone else // who joins the room later on. t.Run("Visible shared history after joining new room (backfill)", func(t *testing.T) { + // FIXME: Dendrite doesn't handle backfill here for whatever reason + runtime.SkipIf(t, runtime.Dendrite) + // Some homeservers have different hard-limits for `/messages?limit=xxx` requests // (Synapse's `MAX_LIMIT` is 1000) so we test a few different variations. for _, testCase := range []MessagesTestCase{ From 139b1895361118a6fadfa6ed59a60152eccf720d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 14:38:29 -0600 Subject: [PATCH 08/14] Shared `c.MustAwaitPartialStateJoinCompletion` --- client/client.go | 15 ++++++ tests/csapi/room_messages_test.go | 21 +-------- ...federation_room_join_partial_state_test.go | 47 ++++++------------- 3 files changed, 32 insertions(+), 51 deletions(-) diff --git a/client/client.go b/client/client.go index e43ebce4..137a9735 100644 --- a/client/client.go +++ b/client/client.go @@ -214,6 +214,21 @@ func (c *CSAPI) JoinRoom(t ct.TestLike, roomIDOrAlias string, serverNames []spec ) } +// MustAwaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated +func (c *CSAPI) MustAwaitPartialStateJoinCompletion(t ct.TestLike, room_id string) { + t.Helper() + + // Use a `/members` request to wait for the room to be un-partial stated. + // We avoid using `/sync`, as it only waits (or used to wait) for full state at + // particular events, rather than the whole room. + c.MustDo( + t, + "GET", + []string{"_matrix", "client", "v3", "rooms", room_id, "members"}, + ) + t.Logf("%s's partial state join to %s completed.", c.UserID, room_id) +} + // MustLeaveRoom leaves the room ID, else fails the test. func (c *CSAPI) MustLeaveRoom(t ct.TestLike, roomID string) { res := c.LeaveRoom(t, roomID) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 6ed6f801..409f863a 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -367,7 +367,7 @@ func TestMessagesOverFederation(t *testing.T) { bob.MustJoinRoom(t, roomID, []spec.ServerName{ deployment.GetFullyQualifiedHomeserverName(t, "hs1"), }) - awaitPartialStateJoinCompletion(t, roomID, bob) + bob.MustAwaitPartialStateJoinCompletion(t, roomID) aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(bob.UserID, roomID)) // Bob leaves the room @@ -415,7 +415,7 @@ func _sendAndTestMessageHistory( bob.MustJoinRoom(t, roomID, []spec.ServerName{ serverToJoinVia, }) - awaitPartialStateJoinCompletion(t, roomID, bob) + bob.MustAwaitPartialStateJoinCompletion(t, roomID) // Make it easy to cross-reference the events being talked about in the logs for eventIndex, eventID := range eventIDs { @@ -629,20 +629,3 @@ func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expe } } } - -// awaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated -func awaitPartialStateJoinCompletion( - t *testing.T, room_id string, user *client.CSAPI, -) { - t.Helper() - - // Use a `/members` request to wait for the room to be un-partial stated. - // We avoid using `/sync`, as it only waits (or used to wait) for full state at - // particular events, rather than the whole room. - user.MustDo( - t, - "GET", - []string{"_matrix", "client", "v3", "rooms", room_id, "members"}, - ) - t.Logf("%s's partial state join to %s completed.", user.UserID, room_id) -} diff --git a/tests/msc3902/federation_room_join_partial_state_test.go b/tests/msc3902/federation_room_join_partial_state_test.go index 9e11b163..8b229b59 100644 --- a/tests/msc3902/federation_room_join_partial_state_test.go +++ b/tests/msc3902/federation_room_join_partial_state_test.go @@ -372,7 +372,7 @@ func TestPartialStateJoin(t *testing.T) { // release the federation /state response psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, serverRoom, alice) + alice.MustAwaitPartialStateJoinCompletion(t, serverRoom.RoomID) t.Log("8. Have Alice eager-sync. She should see the remote room.") response, eagerSyncToken = alice.MustSync(t, getEagerSyncReq()) @@ -2259,7 +2259,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // Both homeservers should still receive device list updates. renameDevice(t, alice, "A new device name 2") @@ -2309,7 +2309,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // Both homeservers should still receive device list updates. renameDevice(t, alice, "A new device name 3") @@ -2358,7 +2358,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // @elsie:server2 should no longer receive device list updates. renameDevice(t, alice, "A new device name 2") @@ -2510,7 +2510,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // @elsie:server2 must receive missed device list updates. mustReceiveDeviceListUpdate(t, deviceListUpdateChannel2, "@elsie did not receive missed device list update.") @@ -2946,7 +2946,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // @charlie's device list update ought to have arrived by now. mustSyncUntilDeviceListsHas(t, alice, syncToken, "changed", server.UserID("charlie")) @@ -3045,7 +3045,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // @elsie's device list ought to still be cached. mustQueryKeysWithoutFederationRequest(t, alice, userDevicesChannel, server.UserID("elsie")) @@ -3172,7 +3172,7 @@ func TestPartialStateJoin(t *testing.T) { t.Fatalf("TODO: fail the partial state join") psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // hs1 should no longer be tracking elsie's device list; subsequent // key requests from alice require a federation request. @@ -3298,7 +3298,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. // The homeserver under test will discover that @elsie was actually not in the room. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // @elsie's device list ought to no longer be cached. // `device_lists.left` is not working yet: https://github.com/matrix-org/synapse/issues/13886 @@ -3342,7 +3342,7 @@ func TestPartialStateJoin(t *testing.T) { // The homeserver under test will discover that there was a period where @elsie was // actually not in the room. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // @elsie's device list ought to have been flushed from the cache. mustQueryKeysWithFederationRequest(t, alice, userDevicesChannel, server.UserID("elsie")) @@ -3374,7 +3374,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. // The homeserver under test will discover that @elsie was actually not in the room. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // `device_lists.left` is not working yet: https://github.com/matrix-org/synapse/issues/13886 // mustSyncUntilDeviceListsHas(t, alice, syncToken, "left", server.UserID("elsie")) @@ -3432,7 +3432,7 @@ func TestPartialStateJoin(t *testing.T) { // The homeserver under test will discover that @elsie was actually not in the room, and // so did not share a room the whole time. psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, room, alice) + alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) // @elsie's device list ought to be evicted from the cache. mustSyncUntilDeviceListsHas(t, alice, syncToken, "changed", server.UserID("elsie")) @@ -4027,7 +4027,7 @@ func TestPartialStateJoin(t *testing.T) { // finish syncing the state psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, psjResult.ServerRoom, terry) + terry.MustAwaitPartialStateJoinCompletion(t, psjResult.ServerRoom.RoomID) // The number of joined users should now be 3: one local user (terry) and two remote (charlie and derek) assertPublicRoomDirectoryMemberCountEquals(t, 3) @@ -4089,7 +4089,7 @@ func TestPartialStateJoin(t *testing.T) { // finish syncing the state psjResult.FinishStateRequest() - awaitPartialStateJoinCompletion(t, psjResult.ServerRoom, rocky) + rocky.MustAwaitPartialStateJoinCompletion(t, psjResult.ServerRoom.RoomID) assertUserInDirectory(t, "rod", server.UserID("rod")) assertUserInDirectory(t, "todd", server.UserID("todd")) @@ -4298,23 +4298,6 @@ func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, t.Logf("Alice successfully observed event %s via /event", eventID) } -// awaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated -func awaitPartialStateJoinCompletion( - t *testing.T, room *federation.ServerRoom, user *client.CSAPI, -) { - t.Helper() - - // Use a `/members` request to wait for the room to be un-partial stated. - // We avoid using `/sync`, as it only waits (or used to wait) for full state at - // particular events, rather than the whole room. - user.MustDo( - t, - "GET", - []string{"_matrix", "client", "v3", "rooms", room.RoomID, "members"}, - ) - t.Logf("%s's partial state join to %s completed.", user.UserID, room.RoomID) -} - // buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq func buildLazyLoadingSyncFilter(timelineOptions map[string]interface{}) string { timelineFilter := map[string]interface{}{ @@ -4415,7 +4398,7 @@ func (psj *partialStateJoinResult) Destroy(t *testing.T) { // considered offline and interfere with subsequent tests. t.Log("Cleaning up after test...") - awaitPartialStateJoinCompletion(t, psj.ServerRoom, psj.User) + psj.User.MustAwaitPartialStateJoinCompletion(t, psj.ServerRoom.RoomID) // The caller is about to tear down the Complement homeserver. Leave the room, so // that the homeserver under test stops sending it presence updates. From c6ce4466f446f7c8603c753bc6dfcecf970e0252 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 14:40:14 -0600 Subject: [PATCH 09/14] Revert "Shared `c.MustAwaitPartialStateJoinCompletion`" This reverts commit 139b1895361118a6fadfa6ed59a60152eccf720d. --- client/client.go | 15 ------ tests/csapi/room_messages_test.go | 21 ++++++++- ...federation_room_join_partial_state_test.go | 47 +++++++++++++------ 3 files changed, 51 insertions(+), 32 deletions(-) diff --git a/client/client.go b/client/client.go index 137a9735..e43ebce4 100644 --- a/client/client.go +++ b/client/client.go @@ -214,21 +214,6 @@ func (c *CSAPI) JoinRoom(t ct.TestLike, roomIDOrAlias string, serverNames []spec ) } -// MustAwaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated -func (c *CSAPI) MustAwaitPartialStateJoinCompletion(t ct.TestLike, room_id string) { - t.Helper() - - // Use a `/members` request to wait for the room to be un-partial stated. - // We avoid using `/sync`, as it only waits (or used to wait) for full state at - // particular events, rather than the whole room. - c.MustDo( - t, - "GET", - []string{"_matrix", "client", "v3", "rooms", room_id, "members"}, - ) - t.Logf("%s's partial state join to %s completed.", c.UserID, room_id) -} - // MustLeaveRoom leaves the room ID, else fails the test. func (c *CSAPI) MustLeaveRoom(t ct.TestLike, roomID string) { res := c.LeaveRoom(t, roomID) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 409f863a..6ed6f801 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -367,7 +367,7 @@ func TestMessagesOverFederation(t *testing.T) { bob.MustJoinRoom(t, roomID, []spec.ServerName{ deployment.GetFullyQualifiedHomeserverName(t, "hs1"), }) - bob.MustAwaitPartialStateJoinCompletion(t, roomID) + awaitPartialStateJoinCompletion(t, roomID, bob) aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(bob.UserID, roomID)) // Bob leaves the room @@ -415,7 +415,7 @@ func _sendAndTestMessageHistory( bob.MustJoinRoom(t, roomID, []spec.ServerName{ serverToJoinVia, }) - bob.MustAwaitPartialStateJoinCompletion(t, roomID) + awaitPartialStateJoinCompletion(t, roomID, bob) // Make it easy to cross-reference the events being talked about in the logs for eventIndex, eventID := range eventIDs { @@ -629,3 +629,20 @@ func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expe } } } + +// awaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated +func awaitPartialStateJoinCompletion( + t *testing.T, room_id string, user *client.CSAPI, +) { + t.Helper() + + // Use a `/members` request to wait for the room to be un-partial stated. + // We avoid using `/sync`, as it only waits (or used to wait) for full state at + // particular events, rather than the whole room. + user.MustDo( + t, + "GET", + []string{"_matrix", "client", "v3", "rooms", room_id, "members"}, + ) + t.Logf("%s's partial state join to %s completed.", user.UserID, room_id) +} diff --git a/tests/msc3902/federation_room_join_partial_state_test.go b/tests/msc3902/federation_room_join_partial_state_test.go index 8b229b59..9e11b163 100644 --- a/tests/msc3902/federation_room_join_partial_state_test.go +++ b/tests/msc3902/federation_room_join_partial_state_test.go @@ -372,7 +372,7 @@ func TestPartialStateJoin(t *testing.T) { // release the federation /state response psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, serverRoom.RoomID) + awaitPartialStateJoinCompletion(t, serverRoom, alice) t.Log("8. Have Alice eager-sync. She should see the remote room.") response, eagerSyncToken = alice.MustSync(t, getEagerSyncReq()) @@ -2259,7 +2259,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // Both homeservers should still receive device list updates. renameDevice(t, alice, "A new device name 2") @@ -2309,7 +2309,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // Both homeservers should still receive device list updates. renameDevice(t, alice, "A new device name 3") @@ -2358,7 +2358,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // @elsie:server2 should no longer receive device list updates. renameDevice(t, alice, "A new device name 2") @@ -2510,7 +2510,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // @elsie:server2 must receive missed device list updates. mustReceiveDeviceListUpdate(t, deviceListUpdateChannel2, "@elsie did not receive missed device list update.") @@ -2946,7 +2946,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // @charlie's device list update ought to have arrived by now. mustSyncUntilDeviceListsHas(t, alice, syncToken, "changed", server.UserID("charlie")) @@ -3045,7 +3045,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // @elsie's device list ought to still be cached. mustQueryKeysWithoutFederationRequest(t, alice, userDevicesChannel, server.UserID("elsie")) @@ -3172,7 +3172,7 @@ func TestPartialStateJoin(t *testing.T) { t.Fatalf("TODO: fail the partial state join") psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // hs1 should no longer be tracking elsie's device list; subsequent // key requests from alice require a federation request. @@ -3298,7 +3298,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. // The homeserver under test will discover that @elsie was actually not in the room. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // @elsie's device list ought to no longer be cached. // `device_lists.left` is not working yet: https://github.com/matrix-org/synapse/issues/13886 @@ -3342,7 +3342,7 @@ func TestPartialStateJoin(t *testing.T) { // The homeserver under test will discover that there was a period where @elsie was // actually not in the room. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // @elsie's device list ought to have been flushed from the cache. mustQueryKeysWithFederationRequest(t, alice, userDevicesChannel, server.UserID("elsie")) @@ -3374,7 +3374,7 @@ func TestPartialStateJoin(t *testing.T) { // Finish the partial state join. // The homeserver under test will discover that @elsie was actually not in the room. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // `device_lists.left` is not working yet: https://github.com/matrix-org/synapse/issues/13886 // mustSyncUntilDeviceListsHas(t, alice, syncToken, "left", server.UserID("elsie")) @@ -3432,7 +3432,7 @@ func TestPartialStateJoin(t *testing.T) { // The homeserver under test will discover that @elsie was actually not in the room, and // so did not share a room the whole time. psjResult.FinishStateRequest() - alice.MustAwaitPartialStateJoinCompletion(t, room.RoomID) + awaitPartialStateJoinCompletion(t, room, alice) // @elsie's device list ought to be evicted from the cache. mustSyncUntilDeviceListsHas(t, alice, syncToken, "changed", server.UserID("elsie")) @@ -4027,7 +4027,7 @@ func TestPartialStateJoin(t *testing.T) { // finish syncing the state psjResult.FinishStateRequest() - terry.MustAwaitPartialStateJoinCompletion(t, psjResult.ServerRoom.RoomID) + awaitPartialStateJoinCompletion(t, psjResult.ServerRoom, terry) // The number of joined users should now be 3: one local user (terry) and two remote (charlie and derek) assertPublicRoomDirectoryMemberCountEquals(t, 3) @@ -4089,7 +4089,7 @@ func TestPartialStateJoin(t *testing.T) { // finish syncing the state psjResult.FinishStateRequest() - rocky.MustAwaitPartialStateJoinCompletion(t, psjResult.ServerRoom.RoomID) + awaitPartialStateJoinCompletion(t, psjResult.ServerRoom, rocky) assertUserInDirectory(t, "rod", server.UserID("rod")) assertUserInDirectory(t, "todd", server.UserID("todd")) @@ -4298,6 +4298,23 @@ func awaitEventArrival(t *testing.T, timeout time.Duration, alice *client.CSAPI, t.Logf("Alice successfully observed event %s via /event", eventID) } +// awaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated +func awaitPartialStateJoinCompletion( + t *testing.T, room *federation.ServerRoom, user *client.CSAPI, +) { + t.Helper() + + // Use a `/members` request to wait for the room to be un-partial stated. + // We avoid using `/sync`, as it only waits (or used to wait) for full state at + // particular events, rather than the whole room. + user.MustDo( + t, + "GET", + []string{"_matrix", "client", "v3", "rooms", room.RoomID, "members"}, + ) + t.Logf("%s's partial state join to %s completed.", user.UserID, room.RoomID) +} + // buildLazyLoadingSyncFilter constructs a json-marshalled filter suitable the 'Filter' field of a client.SyncReq func buildLazyLoadingSyncFilter(timelineOptions map[string]interface{}) string { timelineFilter := map[string]interface{}{ @@ -4398,7 +4415,7 @@ func (psj *partialStateJoinResult) Destroy(t *testing.T) { // considered offline and interfere with subsequent tests. t.Log("Cleaning up after test...") - psj.User.MustAwaitPartialStateJoinCompletion(t, psj.ServerRoom.RoomID) + awaitPartialStateJoinCompletion(t, psj.ServerRoom, psj.User) // The caller is about to tear down the Complement homeserver. Leave the room, so // that the homeserver under test stops sending it presence updates. From 85791c3653660602e1e81fc566717cd128468a25 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Nov 2025 14:46:19 -0600 Subject: [PATCH 10/14] Remove `awaitPartialStateJoinCompletion` See https://github.com/matrix-org/complement/pull/816#discussion_r2520215649 --- tests/csapi/room_messages_test.go | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 6ed6f801..713d9ff2 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -367,7 +367,6 @@ func TestMessagesOverFederation(t *testing.T) { bob.MustJoinRoom(t, roomID, []spec.ServerName{ deployment.GetFullyQualifiedHomeserverName(t, "hs1"), }) - awaitPartialStateJoinCompletion(t, roomID, bob) aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(bob.UserID, roomID)) // Bob leaves the room @@ -415,7 +414,6 @@ func _sendAndTestMessageHistory( bob.MustJoinRoom(t, roomID, []spec.ServerName{ serverToJoinVia, }) - awaitPartialStateJoinCompletion(t, roomID, bob) // Make it easy to cross-reference the events being talked about in the logs for eventIndex, eventID := range eventIDs { @@ -629,20 +627,3 @@ func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expe } } } - -// awaitPartialStateJoinCompletion waits until the joined room is no longer partial-stated -func awaitPartialStateJoinCompletion( - t *testing.T, room_id string, user *client.CSAPI, -) { - t.Helper() - - // Use a `/members` request to wait for the room to be un-partial stated. - // We avoid using `/sync`, as it only waits (or used to wait) for full state at - // particular events, rather than the whole room. - user.MustDo( - t, - "GET", - []string{"_matrix", "client", "v3", "rooms", room_id, "members"}, - ) - t.Logf("%s's partial state join to %s completed.", user.UserID, room_id) -} From 3a1e0cb6097a43b6334b37e08e7c58e704b1ef6d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Nov 2025 13:50:37 -0600 Subject: [PATCH 11/14] Remove commented out code See https://github.com/matrix-org/complement/pull/816#discussion_r2539053398 --- tests/csapi/room_messages_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 713d9ff2..73813e37 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -417,7 +417,6 @@ func _sendAndTestMessageHistory( // Make it easy to cross-reference the events being talked about in the logs for eventIndex, eventID := range eventIDs { - // messageDraft := eventMap[eventID].MessageDraft t.Logf("Message %d -> event_id=%s", eventIndex, eventID) } From 0303f845c3d012444e7e149a68a7f2e2ca35b04f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Nov 2025 13:51:09 -0600 Subject: [PATCH 12/14] Remove outdated commented code See https://github.com/matrix-org/complement/pull/816#discussion_r2539072192 --- tests/csapi/room_messages_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 73813e37..6e22eb1e 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -574,7 +574,6 @@ func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expe relevantActualEventIDs := filterEventIDs(t, actualEventIDs, expectedEventIDs) // Put them in chronological order to match the expected list - // slices.Reverse(relevantActualEvents) slices.Reverse(relevantActualEventIDs) expectedLines := make([]string, len(expectedEventIDs)) From 903ae0fd6200f4a580fa70fb6098e9a448b15619 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Nov 2025 14:05:44 -0600 Subject: [PATCH 13/14] Simplify order complexity of `assertMessagesInTimelineInOrder` See https://github.com/matrix-org/complement/pull/816#discussion_r2539072826 --- tests/csapi/room_messages_test.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 6e22eb1e..39375bd9 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -421,7 +421,7 @@ func _sendAndTestMessageHistory( } // Keep paginating backwards until we reach the start of the room - actualEventIDs := make( + reverseChronologicalActualEventIDs := make( []string, 0, // This is a minimum capacity (there will be more events) @@ -443,7 +443,7 @@ func _sendAndTestMessageHistory( ) messagesResBody := client.ParseJSON(t, messagesRes) actualEventIDsFromRequest := extractEventIDsFromMessagesResponse(t, messagesResBody) - actualEventIDs = append(actualEventIDs, actualEventIDsFromRequest...) + reverseChronologicalActualEventIDs = append(reverseChronologicalActualEventIDs, actualEventIDsFromRequest...) // Make it easy to understand what each `/messages` request returned relevantActualEventIDsFromRequest := filterEventIDs(t, actualEventIDsFromRequest, eventIDs) @@ -474,8 +474,12 @@ func _sendAndTestMessageHistory( } } + // Put them in chronological order to match the expected list + chronologicalActualEventIds := slices.Clone(reverseChronologicalActualEventIDs) + slices.Reverse(chronologicalActualEventIds) + // Assert timeline order - assertMessagesInTimelineInOrder(t, actualEventIDs, eventIDs) + assertMessagesInTimelineInOrder(t, chronologicalActualEventIds, eventIDs) } func sendMessageDrafts( @@ -566,15 +570,10 @@ func filterEventIDs(t *testing.T, actualEventIDs []string, expectedEventIDs []st // assertMessagesTimeline asserts all events are in the `/messages` response in the // given order. Other unrelated events can be in between. -// -// messagesResBody: from a `/messages?dir=b` request (these will be in reverse-chronological order) -// eventIDs: the list of event IDs in chronological order that we expect to see in the response func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expectedEventIDs []string) { t.Helper() relevantActualEventIDs := filterEventIDs(t, actualEventIDs, expectedEventIDs) - // Put them in chronological order to match the expected list - slices.Reverse(relevantActualEventIDs) expectedLines := make([]string, len(expectedEventIDs)) for i, expectedEventID := range expectedEventIDs { From 2772ebe9f53bd8da091d4ab568d6e5f8ed859948 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 18 Nov 2025 14:16:12 -0600 Subject: [PATCH 14/14] Separate debug string generation from assertion logic See https://github.com/matrix-org/complement/pull/816#discussion_r2539098175 --- tests/csapi/room_messages_test.go | 53 +++++++++++++++++++------------ 1 file changed, 32 insertions(+), 21 deletions(-) diff --git a/tests/csapi/room_messages_test.go b/tests/csapi/room_messages_test.go index 39375bd9..13c0e32d 100644 --- a/tests/csapi/room_messages_test.go +++ b/tests/csapi/room_messages_test.go @@ -479,7 +479,7 @@ func _sendAndTestMessageHistory( slices.Reverse(chronologicalActualEventIds) // Assert timeline order - assertMessagesInTimelineInOrder(t, chronologicalActualEventIds, eventIDs) + assertEventsInOrder(t, chronologicalActualEventIds, eventIDs) } func sendMessageDrafts( @@ -568,16 +568,33 @@ func filterEventIDs(t *testing.T, actualEventIDs []string, expectedEventIDs []st return relevantActualEventIDs } -// assertMessagesTimeline asserts all events are in the `/messages` response in the -// given order. Other unrelated events can be in between. -func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expectedEventIDs []string) { +// assertEventsInOrder asserts all `actualEventIDs` are present and in order according +// to `expectedEventIDs`. Other unrelated events can be in between. +func assertEventsInOrder(t *testing.T, actualEventIDs []string, expectedEventIDs []string) { t.Helper() relevantActualEventIDs := filterEventIDs(t, actualEventIDs, expectedEventIDs) + if len(relevantActualEventIDs) != len(expectedEventIDs) { + t.Fatalf("expected %d events in timeline (got %d relevant events filtered down from %d events)\n%s", + len(expectedEventIDs), len(relevantActualEventIDs), len(actualEventIDs), + generateEventOrderDiffString(relevantActualEventIDs, expectedEventIDs), + ) + } + + for i, eventID := range relevantActualEventIDs { + if eventID != expectedEventIDs[i] { + t.Fatalf("expected event ID %s (got %s) at index %d\n%s", + expectedEventIDs[i], eventID, i, generateEventOrderDiffString(relevantActualEventIDs, expectedEventIDs), + ) + } + } +} + +func generateEventOrderDiffString(actualEventIDs []string, expectedEventIDs []string) string { expectedLines := make([]string, len(expectedEventIDs)) for i, expectedEventID := range expectedEventIDs { - isExpectedInActual := slices.Contains(relevantActualEventIDs, expectedEventID) + isExpectedInActual := slices.Contains(actualEventIDs, expectedEventID) isMissingIndicatorString := " " if !isExpectedInActual { isMissingIndicatorString = "?" @@ -587,8 +604,8 @@ func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expe } expectedDiffString := strings.Join(expectedLines, "\n") - actualLines := make([]string, len(relevantActualEventIDs)) - for actualEventIndex, actualEventID := range relevantActualEventIDs { + actualLines := make([]string, len(actualEventIDs)) + for actualEventIndex, actualEventID := range actualEventIDs { isActualInExpected := slices.Contains(expectedEventIDs, actualEventID) isActualInExpectedIndicatorString := " " if isActualInExpected { @@ -606,21 +623,15 @@ func assertMessagesInTimelineInOrder(t *testing.T, actualEventIDs []string, expe expectedIndexString = fmt.Sprintf(" (expected index %d %s)", expectedIndex, expectedDirectionString) } - actualLines[actualEventIndex] = fmt.Sprintf("%2d: %s %s%s", actualEventIndex, isActualInExpectedIndicatorString, actualEventID, expectedIndexString) - } - actualDiffString := strings.Join(actualLines, "\n") - - if len(relevantActualEventIDs) != len(expectedEventIDs) { - t.Fatalf("expected %d events in timeline (got %d)\nActual events ('+' = found expected items):\n%s\nExpected events ('?' = missing expected items):\n%s", - len(expectedEventIDs), len(relevantActualEventIDs), actualDiffString, expectedDiffString, + actualLines[actualEventIndex] = fmt.Sprintf("%2d: %s %s%s", + actualEventIndex, isActualInExpectedIndicatorString, actualEventID, expectedIndexString, ) } + actualDiffString := strings.Join(actualLines, "\n") - for i, eventID := range relevantActualEventIDs { - if eventID != expectedEventIDs[i] { - t.Fatalf("expected event ID %s (got %s) at index %d\nActual events ('+' = found expected items):\n%s\nExpected events ('?' = missing expected items):\n%s", - expectedEventIDs[i], eventID, i, actualDiffString, expectedDiffString, - ) - } - } + return fmt.Sprintf( + "Actual events ('+' = found expected items):\n%s\nExpected events ('?' = missing expected items):\n%s", + actualDiffString, + expectedDiffString, + ) }