Skip to content

Commit 5ce8c28

Browse files
Add disconnect_and_retry (#753)
--------- Co-authored-by: José Valim <jose.valim@gmail.com>
1 parent 251f30b commit 5ce8c28

File tree

4 files changed

+127
-30
lines changed

4 files changed

+127
-30
lines changed

lib/postgrex/protocol.ex

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ defmodule Postgrex.Protocol do
350350
| {:error, %ArgumentError{} | Postgrex.Error.t(), state}
351351
| {:error, %DBConnection.TransactionError{}, state}
352352
| {:disconnect, %RuntimeError{}, state}
353-
| {:disconnect, %DBConnection.ConnectionError{}, state}
353+
| {:disconnect | :disconnect_and_retry, %DBConnection.ConnectionError{}, state}
354354
def handle_prepare(%Query{} = query, _, %{postgres: {_, _}} = s) do
355355
lock_error(s, :prepare, query)
356356
end
@@ -365,15 +365,18 @@ defmodule Postgrex.Protocol do
365365
def handle_prepare(%Query{name: ""} = query, opts, s) do
366366
prepare = Keyword.get(opts, :postgrex_prepare, false)
367367
status = new_status(opts, prepare: prepare)
368+
comment = Keyword.get(opts, :comment)
368369

369-
case prepare do
370-
true ->
371-
parse_describe_close(s, status, query)
370+
result =
371+
case prepare do
372+
true ->
373+
parse_describe_close(s, status, query)
372374

373-
false ->
374-
comment = Keyword.get(opts, :comment)
375-
parse_describe_flush(s, status, query, comment)
376-
end
375+
false ->
376+
parse_describe_flush(s, status, query, comment)
377+
end
378+
379+
handle_disconnect_retry(result)
377380
end
378381

379382
def handle_prepare(%Query{} = query, opts, %{queries: nil} = s) do
@@ -395,8 +398,9 @@ defmodule Postgrex.Protocol do
395398
false -> close_parse_describe_flush(s, status, query, comment)
396399
end
397400

398-
with {:ok, query, s} <- result do
399-
{:ok, query, %{s | messages: []}}
401+
case result do
402+
{:ok, query, s} -> {:ok, query, %{s | messages: []}}
403+
other -> handle_disconnect_retry(other)
400404
end
401405
end
402406
end
@@ -422,11 +426,14 @@ defmodule Postgrex.Protocol do
422426
| {:error, %ArgumentError{} | Postgrex.Error.t(), state}
423427
| {:error, %DBConnection.TransactionError{}, state}
424428
| {:disconnect, %RuntimeError{}, state}
425-
| {:disconnect, %DBConnection.ConnectionError{}, state}
429+
| {:disconnect | :disconnect_and_retry, %DBConnection.ConnectionError{}, state}
426430
def handle_execute(%Query{} = query, params, opts, s) do
427431
case Keyword.get(opts, :postgrex_copy, false) do
428-
true -> handle_execute_copy(query, params, opts, s)
429-
false -> handle_execute_result(query, params, opts, s)
432+
true ->
433+
handle_execute_copy(query, params, opts, s)
434+
435+
false ->
436+
handle_execute_result(query, params, opts, s)
430437
end
431438
end
432439

@@ -503,17 +510,19 @@ defmodule Postgrex.Protocol do
503510
{:ok, Postgrex.Result.t(), state}
504511
| {:error, %ArgumentError{} | Postgrex.Error.t(), state}
505512
| {:disconnect, %RuntimeError{}, state}
506-
| {:disconnect, %DBConnection.ConnectionError{}, state}
513+
| {:disconnect | :disconnect_and_retry, %DBConnection.ConnectionError{}, state}
507514
def handle_close(%Query{ref: ref} = query, opts, %{postgres: {_, ref}} = s) do
508-
flushed_close(s, new_status(opts), query)
515+
result = flushed_close(s, new_status(opts), query)
516+
handle_disconnect_retry(result)
509517
end
510518

511519
def handle_close(%Query{} = query, _, %{postgres: {_, _}} = s) do
512520
lock_error(s, :close, query)
513521
end
514522

515523
def handle_close(%Query{} = query, opts, s) do
516-
close(s, new_status(opts), query)
524+
result = close(s, new_status(opts), query)
525+
handle_disconnect_retry(result)
517526
end
518527

519528
@impl true
@@ -582,7 +591,8 @@ defmodule Postgrex.Protocol do
582591
{:ok, Postgrex.Result.t(), state}
583592
| {DBConnection.status(), state}
584593
| {:disconnect, %RuntimeError{}, state}
585-
| {:disconnect, %DBConnection.ConnectionError{} | Postgrex.Error.t(), state}
594+
| {:disconnect | :disconnect_and_retry,
595+
%DBConnection.ConnectionError{} | Postgrex.Error.t(), state}
586596
def handle_begin(_, %{postgres: {_, _}} = s) do
587597
lock_error(s, :begin)
588598
end
@@ -591,7 +601,8 @@ defmodule Postgrex.Protocol do
591601
case Keyword.get(opts, :mode, :transaction) do
592602
:transaction when postgres == :idle ->
593603
statement = "BEGIN"
594-
handle_transaction(statement, opts, s)
604+
result = handle_transaction(statement, opts, s)
605+
handle_disconnect_retry(result)
595606

596607
:savepoint when postgres == :transaction ->
597608
statement = "SAVEPOINT postgrex_savepoint"
@@ -2081,7 +2092,7 @@ defmodule Postgrex.Protocol do
20812092
bind_execute_close(s, status, query, params)
20822093

20832094
{error, _, _} = other when error in [:error, :disconnect] ->
2084-
other
2095+
handle_disconnect_retry(other)
20852096
end
20862097
end
20872098

@@ -2093,7 +2104,7 @@ defmodule Postgrex.Protocol do
20932104
bind_execute(s, status, query, params)
20942105

20952106
{error, _, _} = other when error in [:error, :disconnect] ->
2096-
other
2107+
handle_disconnect_retry(other)
20972108
end
20982109
end
20992110

@@ -2114,8 +2125,8 @@ defmodule Postgrex.Protocol do
21142125
msg_sync()
21152126
]
21162127

2117-
with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
2118-
{:ok, s, buffer} <- recv_bind(s, status, buffer),
2128+
with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer) |> handle_disconnect_retry(),
2129+
{:ok, s, buffer} <- recv_bind(s, status, buffer) |> handle_disconnect_retry(),
21192130
{:ok, result, s, buffer} <- recv_execute(s, status, query, buffer),
21202131
{:ok, s, buffer} <- recv_close(s, status, buffer),
21212132
{:ok, s} <- recv_ready(s, status, buffer) do
@@ -2125,7 +2136,7 @@ defmodule Postgrex.Protocol do
21252136
error_ready(s, status, err, buffer)
21262137
|> maybe_disconnect()
21272138

2128-
{:disconnect, _err, _s} = disconnect ->
2139+
{_disconnect_or_retry, _err, _s} = disconnect ->
21292140
disconnect
21302141
end
21312142
end
@@ -2151,8 +2162,8 @@ defmodule Postgrex.Protocol do
21512162
msg_sync()
21522163
]
21532164

2154-
with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer),
2155-
{:ok, s, buffer} <- recv_bind(s, status, buffer),
2165+
with :ok <- msg_send(%{s | buffer: nil}, msgs, buffer) |> handle_disconnect_retry(),
2166+
{:ok, s, buffer} <- recv_bind(s, status, buffer) |> handle_disconnect_retry(),
21562167
{:ok, result, s, buffer} <- recv_execute(s, status, query, buffer),
21572168
{:ok, s} <- recv_ready(s, status, buffer) do
21582169
{:ok, query, result, s}
@@ -2163,7 +2174,7 @@ defmodule Postgrex.Protocol do
21632174
error_ready(s, status, err, buffer)
21642175
|> maybe_disconnect()
21652176

2166-
{:disconnect, _err, _s} = disconnect ->
2177+
{_disconnect_or_retry, _err, _s} = disconnect ->
21672178
disconnect
21682179
end
21692180
end
@@ -3391,7 +3402,13 @@ defmodule Postgrex.Protocol do
33913402
end
33923403

33933404
defp conn_error(mod, action, reason) when reason in @nonposix_errors do
3394-
conn_error("#{mod} #{action}: #{reason}")
3405+
msg = "#{mod} #{action}: #{reason}"
3406+
3407+
if reason == :closed do
3408+
conn_error(msg, :closed)
3409+
else
3410+
conn_error(msg)
3411+
end
33953412
end
33963413

33973414
defp conn_error(:tcp, action, reason) do
@@ -3404,6 +3421,10 @@ defmodule Postgrex.Protocol do
34043421
conn_error("ssl #{action}: #{formatted_reason} - #{inspect(reason)}")
34053422
end
34063423

3424+
defp conn_error(message, reason) do
3425+
DBConnection.ConnectionError.exception(message: message, reason: reason)
3426+
end
3427+
34073428
defp conn_error(message) do
34083429
DBConnection.ConnectionError.exception(message)
34093430
end
@@ -3416,6 +3437,16 @@ defmodule Postgrex.Protocol do
34163437
{:disconnect, err, %{s | buffer: buffer}}
34173438
end
34183439

3440+
# This function is used in two ways:
3441+
#
3442+
# * When we know the operation is fully retriable, we invoke it at the top
3443+
# * When only part is retriable (such as bind in execute or begin in a transaction),
3444+
# we invoke it at the specific instructions
3445+
defp handle_disconnect_retry({:disconnect, %{reason: :closed} = err, s}),
3446+
do: {:disconnect_and_retry, err, s}
3447+
3448+
defp handle_disconnect_retry(other), do: other
3449+
34193450
defp sync_recv(s, status, buffer) do
34203451
%{postgres: postgres, transactions: transactions} = s
34213452

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ defmodule Postgrex.Mixfile do
3333
{:jason, "~> 1.0", optional: true},
3434
{:table, "~> 0.1.0", optional: true},
3535
{:decimal, "~> 1.5 or ~> 2.0"},
36-
{:db_connection, "~> 2.1"}
36+
{:db_connection, github: "elixir-ecto/db_connection", branch: "master"}
3737
]
3838
end
3939

mix.lock

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
%{
2-
"db_connection": {:hex, :db_connection, "2.7.0", "b99faa9291bb09892c7da373bb82cba59aefa9b36300f6145c5f201c7adf48ec", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "dcf08f31b2701f857dfc787fbad78223d61a32204f217f15e881dd93e4bdd3ff"},
2+
"db_connection": {:git, "https://github.com/elixir-ecto/db_connection.git", "ce227e06605b77540c6b27bc94acddaf4e7ae027", [branch: "master"]},
33
"decimal": {:hex, :decimal, "1.9.0", "83e8daf59631d632b171faabafb4a9f4242c514b0a06ba3df493951c08f64d07", [:mix], [], "hexpm", "b1f2343568eed6928f3e751cf2dffde95bfaa19dd95d09e8a9ea92ccfd6f7d85"},
44
"earmark_parser": {:hex, :earmark_parser, "1.4.44", "f20830dd6b5c77afe2b063777ddbbff09f9759396500cdbe7523efd58d7a339c", [:mix], [], "hexpm", "4778ac752b4701a5599215f7030989c989ffdc4f6df457c5f36938cc2d2a2750"},
55
"ex_doc": {:hex, :ex_doc, "0.38.2", "504d25eef296b4dec3b8e33e810bc8b5344d565998cd83914ffe1b8503737c02", [:mix], [{:earmark_parser, "~> 1.4.44", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "732f2d972e42c116a70802f9898c51b54916e542cc50968ac6980512ec90f42b"},
@@ -9,5 +9,5 @@
99
"makeup_erlang": {:hex, :makeup_erlang, "1.0.2", "03e1804074b3aa64d5fad7aa64601ed0fb395337b982d9bcf04029d68d51b6a7", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "af33ff7ef368d5893e4a267933e7744e46ce3cf1f61e2dccf53a111ed3aa3727"},
1010
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
1111
"table": {:hex, :table, "0.1.0", "f16104d717f960a623afb134a91339d40d8e11e0c96cfce54fee086b333e43f0", [:mix], [], "hexpm", "bf533d3606823ad8a7ee16f41941e5e6e0e42a20c4504cdf4cfabaaed1c8acb9"},
12-
"telemetry": {:hex, :telemetry, "1.0.0", "0f453a102cdf13d506b7c0ab158324c337c41f1cc7548f0bc0e130bbf0ae9452", [:rebar3], [], "hexpm", "73bc09fa59b4a0284efb4624335583c528e07ec9ae76aca96ea0673850aec57a"},
12+
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
1313
}

test/query_test.exs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1965,4 +1965,70 @@ defmodule QueryTest do
19651965
Postgrex.execute!(context[:pid], "name", "postgrex")
19661966
end
19671967
end
1968+
1969+
test "disconnect_and_retry with prepare" do
1970+
# Start new connection so we can retry on disconnect
1971+
opts = [database: "postgrex_test", backoff_min: 1, backoff_max: 1]
1972+
{:ok, pid} = P.start_link(opts)
1973+
1974+
# Drop socket
1975+
disconnect(pid)
1976+
1977+
# Assert preparation happens instead of returning error
1978+
assert {:ok, _} = P.prepare(pid, "42", "SELECT 42")
1979+
end
1980+
1981+
test "disconnect_and_retry with transaction" do
1982+
# Start new connection so we can retry on disconnect
1983+
opts = [database: "postgrex_test", backoff_min: 1, backoff_max: 1]
1984+
{:ok, pid} = P.start_link(opts)
1985+
1986+
# Drop socket
1987+
disconnect(pid)
1988+
1989+
# Assert transaction happens instead of returning error
1990+
assert {:ok, _} = P.transaction(pid, fn conn -> P.query(conn, "SELECT 1", []) end)
1991+
end
1992+
1993+
test "disconnect_and_retry with closing prepared statement" do
1994+
# Start new connection so we can retry on disconnect
1995+
opts = [database: "postgrex_test", backoff_min: 1, backoff_max: 1]
1996+
{:ok, pid} = P.start_link(opts)
1997+
1998+
# Prepare query that we wil try to close after disconnecting
1999+
{:ok, query} = P.prepare(pid, "42", "SELECT 42")
2000+
2001+
# Drop socket
2002+
disconnect(pid)
2003+
2004+
# Assert close happens instead of returning error
2005+
assert :ok = P.close(pid, query)
2006+
end
2007+
2008+
test "disconnect_and_retry on attempting execution of prepared statement" do
2009+
# Start new connection so we can retry on disconnect
2010+
opts = [database: "postgrex_test", backoff_min: 1, backoff_max: 1]
2011+
{:ok, pid} = P.start_link(opts)
2012+
2013+
# Prepare query that we wil try to execute after disconnecting
2014+
{:ok, query} = P.prepare(pid, "42", "SELECT 42")
2015+
2016+
# Drop socket
2017+
disconnect(pid)
2018+
2019+
# Assert execute happens instead of returning error
2020+
assert {:ok, _, _} = P.execute(pid, query, [])
2021+
end
2022+
2023+
defp disconnect(pid) do
2024+
sock = DBConnection.run(pid, &get_socket/1)
2025+
:gen_tcp.shutdown(sock, :read_write)
2026+
end
2027+
2028+
defp get_socket(conn) do
2029+
{:pool_ref, _, _, _, holder, _} = conn.pool_ref
2030+
[{:conn, _, _, state, _, _, _, _}] = :ets.lookup(holder, :conn)
2031+
{:gen_tcp, sock} = state.sock
2032+
sock
2033+
end
19682034
end

0 commit comments

Comments
 (0)