Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ jobs:
fail-fast: false
matrix:
version:
- '1.10'
- '1'
- 'nightly'
os:
Expand Down
6 changes: 3 additions & 3 deletions Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "GraphQLClient"
uuid = "09d831e3-9c21-47a9-bfd8-076871817219"
version = "0.7.6"
version = "0.8.0"

[deps]
GraphQLParser = "0ae10fbf-af58-4883-b66b-ff0ac82d20dd"
Expand All @@ -10,7 +10,7 @@ StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4"

[compat]
GraphQLParser = "0.1.1"
HTTP = "1"
HTTP = "2"
JSON3 = "1.1.2"
StructTypes = "1.5"
julia = "1.6"
julia = "1.10"
2 changes: 1 addition & 1 deletion docs/src/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ As well as the `subscription_name`, `output_type`, `sub_args`, `output_fields` a

A function can be passed to the `initfn` to be run once the subscription is open. This means that if subscribing to the result of a mutation, for example, it can be guaranteed that no responses will be missed between the mutation being executed and the subscription being opened.

If the `retry` keyword argument is `true`, GraphQLClient will retry the opening of the subscription if it fails. This keyword argument is passed directly to `HTTP.WebSockets.open`.
The `retry` keyword argument is deprecated and has no effect. HTTP.jl 2 no longer accepts a `retry` keyword on `HTTP.WebSockets.open`, so it is retained only for backwards compatibility.

### Stopping

Expand Down
103 changes: 62 additions & 41 deletions src/subscriptions.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ This function is designed to be used with the `do` keyword.
- `output_fields=String[]`: output fields to be returned. Can be a string, or
composed of dictionaries and vectors.
- `initfn=nothing`: optional function to be run once subscription is itialised.
- `retry=true`: retry if subscription fails to open.
- `retry=true`: retry establishing the subscription's WebSocket connection if the
initial open fails (up to a few attempts with backoff). HTTP.jl 2 no longer
accepts a `retry` keyword on `WebSockets.open`, so this is reimplemented here.
Only connection-establishment failures are retried; errors raised once the
subscription is live are not (the handler is never re-run).
- `subtimeout=0`: if `stopfn` supplied, this is the period that it is called at.
If `stopfn` is not supplied, this is the timeout for waiting for data. The timer
is reset after every subscription result is received.
Expand Down Expand Up @@ -94,49 +98,66 @@ function open_subscription(fn::Function,
)
message_str = JSON3.write(message)
throw_if_assigned = Ref{GraphQLError}()
HTTP.WebSockets.open(client.ws_endpoint; retry=retry, headers=client.headers) do ws
# Start sub
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
HTTP.send(ws, message_str)
subscription_tracker[][sub_id] = "open"

# Init function
if !isnothing(initfn)
output_debug(verbose) && println("Running subscription initialisation function")
initfn()
end

# Get listening
output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...")

# Run function
finish = false
while !finish
data = readfromwebsocket(ws, stopfn, subtimeout)
if data === :timeout
output_info(verbose) && println("Subscription $sub_id timed out")
break
elseif data === :stopfn
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
break
end
response = JSON3.read(data, GQLSubscriptionResponse{output_type})
payload = response.payload
if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error
subscription_tracker[][sub_id] = "errored"
throw_if_assigned[] = GraphQLError("Error during subscription.", payload)
break
# HTTP.jl 2 dropped the `retry` keyword on `WebSockets.open`, so connection-open
# retries are reimplemented here. We retry only failures that happen before the
# handler starts (establishing the WebSocket); once `established[]` is set the
# subscription is live, so any later error propagates and the handler is never
# re-run (which would otherwise re-send `start` / re-run `initfn`).
established = Ref(false)
max_attempts = retry ? 4 : 1
for attempt in 1:max_attempts
try
HTTP.WebSockets.open(client.ws_endpoint; headers=client.headers) do ws
established[] = true
# Start sub
output_info(verbose) && println("Starting $(get_name(subscription_name)) subscription with ID $sub_id")
HTTP.WebSockets.send(ws, message_str)
subscription_tracker[][sub_id] = "open"

# Init function
if !isnothing(initfn)
output_debug(verbose) && println("Running subscription initialisation function")
initfn()
end
# Handle multiple subs, do we need this?
if response.id == string(sub_id)
output_debug(verbose) && println("Result recieved on subscription with ID $sub_id")
finish = fn(payload)
if !isa(finish, Bool)

# Get listening
output_debug(verbose) && println("Listening to $(get_name(subscription_name)) with ID $sub_id...")

# Run function
finish = false
while !finish
data = readfromwebsocket(ws, stopfn, subtimeout)
if data === :timeout
output_info(verbose) && println("Subscription $sub_id timed out")
break
elseif data === :stopfn
output_info(verbose) && println("Subscription $sub_id stopped by the stop function supplied")
break
end
response = JSON3.read(data, GQLSubscriptionResponse{output_type})
payload = response.payload
if !isnothing(payload.errors) && !isempty(payload.errors) && throw_on_execution_error
subscription_tracker[][sub_id] = "errored"
error("Subscription function must return a boolean")
throw_if_assigned[] = GraphQLError("Error during subscription.", payload)
break
end
# Handle multiple subs, do we need this?
if response.id == string(sub_id)
output_debug(verbose) && println("Result recieved on subscription with ID $sub_id")
finish = fn(payload)
if !isa(finish, Bool)
subscription_tracker[][sub_id] = "errored"
error("Subscription function must return a boolean")
end
end
end
end
break
catch ex
(established[] || attempt == max_attempts) && rethrow()
output_info(verbose) && println("WebSocket open failed (attempt $attempt/$max_attempts), retrying: ", ex)
sleep(0.5 * attempt)
end
end
# We can't throw errors from the ws handle function in HTTP.jl 1.0, as they get digested.
isassigned(throw_if_assigned) && throw(throw_if_assigned[])
Expand Down Expand Up @@ -167,7 +188,7 @@ function async_reader_with_timeout(ws::HTTP.WebSockets.WebSocket, subtimeout)::C
Base.throwto(reader_task, InterruptException())
end
timeout = Timer(timeout_cb, subtimeout)
data = HTTP.receive(ws)
data = HTTP.WebSockets.receive(ws)
subtimeout > 0 && close(timeout) # Cancel the timeout
put!(ch, data)
end
Expand Down Expand Up @@ -221,7 +242,7 @@ function readfromwebsocket(ws::HTTP.WebSockets.WebSocket, stopfn, subtimeout)
ch_out = async_reader_with_stopfn(ws, stopfn, checktime)
data = take!(ch_out)
else
data = HTTP.receive(ws)
data = HTTP.WebSockets.receive(ws)
end
return data
end
22 changes: 11 additions & 11 deletions test/http_execution.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ end

# handle_error
@test_throws ArgumentError test_error_handler(GraphQLClient.handle_error, ArgumentError("msg"))
@test_throws HTTP.StatusError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(404, "POST", "", HTTP.Response(404;request=HTTP.Request(), body="{}")))
@test_throws GraphQLClient.GraphQLError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(400, "POST", "", HTTP.Response(400;request=HTTP.Request(), body="{}")))
@test_throws HTTP.StatusError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(404, HTTP.Response(404;request=HTTP.Request(), body=Vector{UInt8}("{}"))))
@test_throws GraphQLClient.GraphQLError test_error_handler(GraphQLClient.handle_error, HTTP.StatusError(400, HTTP.Response(400;request=HTTP.Request(), body=Vector{UInt8}("{}"))))

# handle_deserialisation_error
@test_throws MethodError test_error_handler(GraphQLClient.handle_deserialisation_error, MethodError(""), "", "")
Expand All @@ -27,7 +27,7 @@ end
# Argument error with "invalid JSON" but default type
@test_throws ArgumentError test_error_handler(GraphQLClient.handle_deserialisation_error, ArgumentError(""), "", Any)
# Actual deserialisation error
resp = HTTP.Response(200;body="{\"data\": {\"query\": 1}}")
resp = HTTP.Response(200;body=Vector{UInt8}("{\"data\": {\"query\": 1}}"))
@test_throws ArgumentError test_error_handler(
GraphQLClient.handle_deserialisation_error,
ArgumentError("invalid JSON at byte"),
Expand All @@ -44,7 +44,7 @@ end
end

# Actual error that resulted in a deserialisation error
resp = HTTP.Response(400;body="{\"errors\": [{\"message\": \"I stopped deserialisation!\"}]}")
resp = HTTP.Response(400;body=Vector{UInt8}("{\"errors\": [{\"message\": \"I stopped deserialisation!\"}]}"))
@test_throws GraphQLClient.GraphQLError test_error_handler(
GraphQLClient.handle_deserialisation_error,
ArgumentError("invalid JSON at byte"),
Expand All @@ -58,7 +58,7 @@ end
end

function local_server_success(port)
@async HTTP.serve(HTTP.Sockets.localhost, port) do req
HTTP.serve!("127.0.0.1", port) do req
execution_string = String(req.body)
return HTTP.Response("""
{
Expand All @@ -73,12 +73,12 @@ function local_server_success(port)
end

function local_server_success_json(port)
@async HTTP.serve(HTTP.Sockets.localhost, port) do req
HTTP.serve!("127.0.0.1", port) do req
return HTTP.Response(JSON3.write(
Dict(
"data" => Dict(
"queryName" => Dict(
"field" => JSON3.read(req.body)
"field" => JSON3.read(String(req.body))
)
)
)
Expand All @@ -87,7 +87,7 @@ function local_server_success_json(port)
end

function local_server_error(port)
@async HTTP.serve(HTTP.Sockets.localhost, port) do req
HTTP.serve!("127.0.0.1", port) do req
str = """
{
"data": {
Expand All @@ -110,7 +110,7 @@ end
# Successful query
port = 7999
local_server_success(7999)
client = Client("http://$(HTTP.Sockets.localhost):$port";introspect=false)
client = Client("http://127.0.0.1:$port";introspect=false)

execution_string = "execute this"
response = GraphQLClient._execute(client.endpoint, execution_string, Dict())
Expand All @@ -131,7 +131,7 @@ end
# Test error in response
port = 7996
local_server_error(7996)
client = Client("http://$(HTTP.Sockets.localhost):$port";introspect=false)
client = Client("http://127.0.0.1:$port";introspect=false)
execution_string = "execute this"
response = GraphQLClient._execute(client.endpoint, execution_string, Dict())
@test !isnothing(response.errors)
Expand All @@ -150,7 +150,7 @@ end
struct S2; field::S1; end
StructTypes.StructType(::Type{S1}) = StructTypes.Struct()
StructTypes.StructType(::Type{S2}) = StructTypes.Struct()
client = Client("http://$(HTTP.Sockets.localhost):$port";introspect=false)
client = Client("http://127.0.0.1:$port";introspect=false)
@inferred GraphQLClient.execute(client.endpoint, Dict("query" => "val"))
@inferred GraphQLClient.execute(client.endpoint, Dict("query" => "val"), Dict(), S2)
response = GraphQLClient.execute(client.endpoint, Dict("query" => "val"))
Expand Down
20 changes: 10 additions & 10 deletions test/subscriptions.jl
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
function listen_localhost()
@async HTTP.listen(HTTP.Sockets.localhost, 8080) do http
HTTP.listen!("127.0.0.1", 8080) do http
if HTTP.WebSockets.isupgrade(http.message)
HTTP.WebSockets.upgrade(http) do ws
for data in ws
HTTP.send(ws, data)
HTTP.WebSockets.send(ws, data)
end
end
end
end
end

function do_nothing_localhost()
@async HTTP.listen(HTTP.Sockets.localhost, 8081) do http
HTTP.listen!("127.0.0.1", 8081) do http
if HTTP.WebSockets.isupgrade(http.message)
HTTP.WebSockets.upgrade(http) do ws
for data in ws
Expand All @@ -30,7 +30,7 @@ end
@test take!(ch) == :timeout

ch = GraphQLClient.async_reader_with_timeout(ws, 5)
HTTP.send(ws, "Data")
HTTP.WebSockets.send(ws, "Data")
@test String(take!(ch)) == "Data"

# stopfn
Expand All @@ -43,11 +43,11 @@ end
@test take!(ch) == :stopfn
stop[] = false
ch = GraphQLClient.async_reader_with_stopfn(ws, stopfn, 0.5)
HTTP.send(ws, "Data")
HTTP.WebSockets.send(ws, "Data")
@test String(take!(ch)) == "Data"

# readfromwebsocket - no timeout or stopfn
HTTP.send(ws, "Data")
HTTP.WebSockets.send(ws, "Data")
@test String(GraphQLClient.readfromwebsocket(ws, nothing, 0)) == "Data"

# readfromwebsocket - timeout
Expand All @@ -68,7 +68,7 @@ end
end

function send_error_localhost(message, port)
@async HTTP.listen(HTTP.Sockets.localhost, port) do http
HTTP.listen!("127.0.0.1", port) do http
if HTTP.WebSockets.isupgrade(http.message)
HTTP.WebSockets.upgrade(http) do ws
for data in ws
Expand All @@ -90,15 +90,15 @@ function send_error_localhost(message, port)
}
}
"""
HTTP.send(ws, error_payload)
HTTP.WebSockets.send(ws, error_payload)
end
end
end
end
end

function send_data_localhost(sub_name, port)
@async HTTP.listen(HTTP.Sockets.localhost, port) do http
HTTP.listen!("127.0.0.1", port) do http
if HTTP.WebSockets.isupgrade(http.message)
HTTP.WebSockets.upgrade(http) do ws
for data in ws
Expand All @@ -116,7 +116,7 @@ function send_data_localhost(sub_name, port)
}
}
"""
HTTP.send(ws, data_payload)
HTTP.WebSockets.send(ws, data_payload)
end
end
end
Expand Down
Loading