Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-relation-tracker-restart-test-race.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@core/sync-service': patch
---

Fix a race condition in the PublicationManager relation tracker restart test that caused intermittent failures.
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,18 @@ defmodule Electric.Replication.PublicationManagerTest do
relation_tracker_name = PublicationManager.RelationTracker.name(ctx.stack_id)
GenServer.stop(relation_tracker_name)

# Wait for the supervisor to restart and re-register the RelationTracker,
# then for it to finish restoring filters from ShapeStatus.
#
# The assert_pub_tables below is NOT a sufficient barrier on its own: the
# relation is already in the publication and is not removed during the
# restart, so the assertion passes immediately - potentially before the
# restarted process has re-registered. Calling remove_shape in that window
# races with a "no process" exit, which is the flakiness this guards
# against.
assert wait_until(fn -> is_pid(GenServer.whereis(relation_tracker_name)) end, 2_000)
:ok = PublicationManager.wait_for_restore(ctx.stack_id, timeout: 2_000)

# After restart, the publication manager should repopulate from ShapeStatus.
# The publication should still have the relation.
assert_pub_tables(ctx, [ctx.relation], 2_000)
Expand Down
26 changes: 6 additions & 20 deletions packages/sync-service/test/electric/shape_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ defmodule Electric.ShapeCacheTest do
assert_shape_cleanup: 1,
patch_shape_status: 1,
patch_shape_cache: 1,
complete_txn_fragment: 3
complete_txn_fragment: 3,
wait_until: 2
]

@stub_inspector Support.StubInspector.new(
Expand Down Expand Up @@ -1338,9 +1339,10 @@ defmodule Electric.ShapeCacheTest do
index_after = SubqueryIndex.for_stack(ctx.stack_id)
assert index_after != nil

assert wait_until(200, fn ->
SubqueryIndex.has_positions?(index_after, shape_handle)
end)
assert wait_until(
fn -> SubqueryIndex.has_positions?(index_after, shape_handle) end,
200
)
end

test "restores shapes with subqueries and their materializers when backup missing", ctx do
Expand Down Expand Up @@ -1389,22 +1391,6 @@ defmodule Electric.ShapeCacheTest do
:ok = stop_supervised(name)
end
end

defp wait_until(timeout_ms, fun, started_at \\ System.monotonic_time(:millisecond))

defp wait_until(timeout_ms, fun, started_at) do
cond do
fun.() ->
true

System.monotonic_time(:millisecond) - started_at >= timeout_ms ->
false

true ->
Process.sleep(10)
wait_until(timeout_ms, fun, started_at)
end
end
end

describe "start_consumer_for_handle/2" do
Expand Down
24 changes: 24 additions & 0 deletions packages/sync-service/test/support/test_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,30 @@ defmodule Support.TestUtils do
Electric.StatusMonitor.mark_pg_lock_as_errored(stack_id, error_message)
end

@doc """
Poll `fun` every 10ms until it returns a truthy value or `timeout_ms` elapses.

Returns `true` if the condition was met within the timeout, `false` otherwise.
"""
def wait_until(fun, timeout_ms \\ 500)
when is_function(fun, 0) and is_integer(timeout_ms) do
do_wait_until(fun, timeout_ms, System.monotonic_time(:millisecond))
end

defp do_wait_until(fun, timeout_ms, started_at) do
cond do
fun.() ->
true

System.monotonic_time(:millisecond) - started_at >= timeout_ms ->
false

true ->
Process.sleep(10)
do_wait_until(fun, timeout_ms, started_at)
end
end

def generate_shape(relation, where_clause \\ nil, selected_columns \\ nil) do
all_columns = Enum.uniq(["id", "value", "foo_enum"] ++ (selected_columns || []))
selected_columns = selected_columns || all_columns
Expand Down
Loading