From daf3a4db887a8b7f805fc50ac41c68dcfd93f53e Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Fri, 19 Jun 2026 11:42:22 +0200 Subject: [PATCH 1/2] Fix flaky PublicationManager relation tracker restart test Fixes #4606. The "handles relation tracker restart" test stopped the RelationTracker and relied on assert_pub_tables as a "wait for restart" barrier. It isn't one: the relation is already in the publication and isn't removed during the restart, so the assertion passes immediately - potentially before the supervisor has restarted and re-registered the process. The subsequent remove_shape call then raced with a "no process" exit. Wait for the RelationTracker to be re-registered and to finish restoring its filters before issuing further calls. Reuse the existing wait-on-condition idiom rather than adding a bespoke helper: promote the generic poller to Support.TestUtils as wait_until/2. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../replication/publication_manager_test.exs | 12 +++++++++ .../test/electric/shape_cache_test.exs | 26 +++++-------------- .../sync-service/test/support/test_utils.ex | 24 +++++++++++++++++ 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/packages/sync-service/test/electric/replication/publication_manager_test.exs b/packages/sync-service/test/electric/replication/publication_manager_test.exs index 097604f609..e48fe68448 100644 --- a/packages/sync-service/test/electric/replication/publication_manager_test.exs +++ b/packages/sync-service/test/electric/replication/publication_manager_test.exs @@ -652,6 +652,18 @@ defmodule Electric.Replication.PublicationManagerTest do relation_tracker_name = PublicationManager.RelationTracker.name(ctx.stack_id) GenServer.stop(relation_tracker_name) + # Wait for the supervisor to restart and re-register the RelationTracker, + # then for it to finish restoring filters from ShapeStatus. + # + # The assert_pub_tables below is NOT a sufficient barrier on its own: the + # relation is already in the publication and is not removed during the + # restart, so the assertion passes immediately - potentially before the + # restarted process has re-registered. Calling remove_shape in that window + # races with a "no process" exit, which is the flakiness this guards + # against. + assert wait_until(fn -> is_pid(GenServer.whereis(relation_tracker_name)) end, 2_000) + :ok = PublicationManager.wait_for_restore(ctx.stack_id, timeout: 2_000) + # After restart, the publication manager should repopulate from ShapeStatus. # The publication should still have the relation. assert_pub_tables(ctx, [ctx.relation], 2_000) diff --git a/packages/sync-service/test/electric/shape_cache_test.exs b/packages/sync-service/test/electric/shape_cache_test.exs index 1407853478..0b14f47103 100644 --- a/packages/sync-service/test/electric/shape_cache_test.exs +++ b/packages/sync-service/test/electric/shape_cache_test.exs @@ -22,7 +22,8 @@ defmodule Electric.ShapeCacheTest do assert_shape_cleanup: 1, patch_shape_status: 1, patch_shape_cache: 1, - complete_txn_fragment: 3 + complete_txn_fragment: 3, + wait_until: 2 ] @stub_inspector Support.StubInspector.new( @@ -1338,9 +1339,10 @@ defmodule Electric.ShapeCacheTest do index_after = SubqueryIndex.for_stack(ctx.stack_id) assert index_after != nil - assert wait_until(200, fn -> - SubqueryIndex.has_positions?(index_after, shape_handle) - end) + assert wait_until( + fn -> SubqueryIndex.has_positions?(index_after, shape_handle) end, + 200 + ) end test "restores shapes with subqueries and their materializers when backup missing", ctx do @@ -1389,22 +1391,6 @@ defmodule Electric.ShapeCacheTest do :ok = stop_supervised(name) end end - - defp wait_until(timeout_ms, fun, started_at \\ System.monotonic_time(:millisecond)) - - defp wait_until(timeout_ms, fun, started_at) do - cond do - fun.() -> - true - - System.monotonic_time(:millisecond) - started_at >= timeout_ms -> - false - - true -> - Process.sleep(10) - wait_until(timeout_ms, fun, started_at) - end - end end describe "start_consumer_for_handle/2" do diff --git a/packages/sync-service/test/support/test_utils.ex b/packages/sync-service/test/support/test_utils.ex index f2cc374828..87d3415985 100644 --- a/packages/sync-service/test/support/test_utils.ex +++ b/packages/sync-service/test/support/test_utils.ex @@ -138,6 +138,30 @@ defmodule Support.TestUtils do Electric.StatusMonitor.mark_pg_lock_as_errored(stack_id, error_message) end + @doc """ + Poll `fun` every 10ms until it returns a truthy value or `timeout_ms` elapses. + + Returns `true` if the condition was met within the timeout, `false` otherwise. + """ + def wait_until(fun, timeout_ms \\ 500) + when is_function(fun, 0) and is_integer(timeout_ms) do + do_wait_until(fun, timeout_ms, System.monotonic_time(:millisecond)) + end + + defp do_wait_until(fun, timeout_ms, started_at) do + cond do + fun.() -> + true + + System.monotonic_time(:millisecond) - started_at >= timeout_ms -> + false + + true -> + Process.sleep(10) + do_wait_until(fun, timeout_ms, started_at) + end + end + def generate_shape(relation, where_clause \\ nil, selected_columns \\ nil) do all_columns = Enum.uniq(["id", "value", "foo_enum"] ++ (selected_columns || [])) selected_columns = selected_columns || all_columns From 2f90aab2e4c6f5c8fc3f344a7b212786b2ace363 Mon Sep 17 00:00:00 2001 From: Oleksii Sholik Date: Fri, 19 Jun 2026 11:54:56 +0200 Subject: [PATCH 2/2] Add changeset for relation tracker restart test fix Co-Authored-By: Claude Opus 4.8 (1M context) --- .changeset/fix-relation-tracker-restart-test-race.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fix-relation-tracker-restart-test-race.md diff --git a/.changeset/fix-relation-tracker-restart-test-race.md b/.changeset/fix-relation-tracker-restart-test-race.md new file mode 100644 index 0000000000..e32939d6b7 --- /dev/null +++ b/.changeset/fix-relation-tracker-restart-test-race.md @@ -0,0 +1,5 @@ +--- +'@core/sync-service': patch +--- + +Fix a race condition in the PublicationManager relation tracker restart test that caused intermittent failures.