Skip to content

Commit f36883f

Browse files
authored
add serializer and deserializer as client arguments (#12)
1 parent 5567d2e commit f36883f

File tree

4 files changed

+58
-15
lines changed

4 files changed

+58
-15
lines changed

Project.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
name = "ElasticsearchClient"
22
uuid = "e586a49d-aa29-4ce5-8f91-fa4f824367bd"
33
authors = ["Egor Shmorgun <egor.shmorgun@opensesame.com>"]
4-
version = "0.2.9"
4+
version = "0.2.10"
55

66
[deps]
77
CodecZlib = "944b1d66-785c-5afd-91f1-9de20f533193"
@@ -24,6 +24,7 @@ julia = "1"
2424

2525
[extras]
2626
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
27+
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
2728

2829
[targets]
29-
test = ["Test"]
30+
test = ["Test", "JSON3"]

src/elastic_transport/client.jl

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using HTTP
22
using URIs
33
using Mocking
4+
using JSON
45

56
const DEFAULT_PORT = 9200
67
const DEFAULT_PROTOCOL = "http"
@@ -19,8 +20,10 @@ Create a client connected to an Elastic cluster.
1920
2021
# Possible arguments
2122
22-
- `http_client::Module`: A module that implement request method. Maybe useful if you need custom http layers. HTTP.jl used bu default.
23+
- `http_client::Module`: A module that implement request method. Maybe useful if you need custom http layers. HTTP.jl used bu default.
2324
- `hosts`: Single host passed as a String, Dict or NamedTuple, or multiple hosts passed as an Array; `host`, `url`, `urls` keys are also valid
25+
- `serializer`: Function to serialise the body to JSON. JSON.json by default
26+
- `deserializer`: Function to deserialise response body to Dict. JSON.parse by default
2427
- `resurrect_after::Integer`: After how many seconds a dead connection should be tried again
2528
- `reload_connection::Bool`: Reload connections after X requests (false by default)
2629
- `randomize_hosts::Bool`: Shuffle connections on initialization and reload (false by default)
@@ -45,7 +48,12 @@ mutable struct Client
4548
transport::Transport
4649
end
4750

48-
function Client(;http_client::Module=HTTP, kwargs...)
51+
function Client(;
52+
http_client::Module=HTTP,
53+
serializer::Function=JSON.json,
54+
deserializer::Function=JSON.parse,
55+
kwargs...
56+
)
4957
arguments = Dict{Symbol, Any}(kwargs)
5058
options = deepcopy(arguments)
5159

@@ -71,7 +79,13 @@ function Client(;http_client::Module=HTTP, kwargs...)
7179
options[:transport_options][:request] = Dict(:timeout => arguments[:request_timeout])
7280
end
7381

74-
transport = Transport(; hosts=hosts, options=options, http_client=http_client)
82+
transport = Transport(;
83+
hosts=hosts,
84+
options=options,
85+
http_client=http_client,
86+
serializer=serializer,
87+
deserializer=deserializer
88+
)
7589

7690
Client(
7791
arguments,

src/elastic_transport/transport/transport.jl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ mutable struct Transport
2727
retry_on_status::Vector{Integer}
2828
verbose::Integer
2929
http_client::Module
30+
serializer::Function
31+
deserializer::Function
3032
end
3133

32-
function Transport(; hosts::Vector=[], options=Dict(), http_client::Module=HTTP)
34+
function Transport(; hosts::Vector=[], options=Dict(), http_client::Module, serializer::Function, deserializer::Function)
3335
!haskey(options, :http) && (options[:http] = Dict())
3436
!haskey(options, :retry_on_status) && (options[:retry_on_status] = Integer[])
3537
!haskey(options, :delay_on_retry) && (options[:delay_on_retry] = 0)
@@ -49,7 +51,9 @@ function Transport(; hosts::Vector=[], options=Dict(), http_client::Module=HTTP)
4951
get(options, :resurrect_after, DEFAULT_RESURRECT_AFTER),
5052
options[:retry_on_status],
5153
options[:verbose],
52-
http_client
54+
http_client,
55+
serializer,
56+
deserializer
5357
)
5458
end
5559

@@ -173,7 +177,7 @@ function perform_request(
173177

174178
headers = Connections.parse_headers(connection, headers)
175179
if !isnothing(body) && !isa(body, String)
176-
body = JSON.json(body)
180+
body = transport.serializer(body)
177181
end
178182
body, headers = compress_request(transport, body, headers)
179183

@@ -247,7 +251,7 @@ function perform_request(
247251
end
248252

249253
if !isempty(response_body) && !isnothing(response_content_type) && !isnothing(match(r"json"i, response_content_type))
250-
json = JSON.parse(response_body)
254+
json = transport.deserializer(response_body)
251255
took = if json isa Dict
252256
get(json, "took", "n/a")
253257
end
@@ -302,6 +306,6 @@ function log_response(method, body, url, response_status, response_body, took, d
302306
message_level,
303307
verbose
304308
)
305-
!isnothing(body) && @debug "> $(JSON.json(body))"
309+
!isnothing(body) && log_message("> $body", Logging.Debug, verbose)
306310
log_message("< $(String(response_body))", Logging.Debug, verbose)
307311
end

test/elastic_transport_test/transport_test.jl

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@ using Test
33
using Mocking
44
using HTTP
55
using JSON
6+
using JSON3
67

78
Mocking.activate()
89

10+
serializer = JSON.json
11+
deserializer = JSON.parse
12+
913
hosts = [
1014
Dict{Symbol, Any}(:host => "localhost", :schema => "https"),
1115
Dict{Symbol, Any}(:host => "127.0.0.1", :schema => "http", :port => 9250),
@@ -102,15 +106,15 @@ nodes_response_mock = HTTP.Response(
102106

103107
@testset "Transport test" begin
104108
@testset "Transport initialization" begin
105-
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options)
109+
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options, http_client=HTTP, serializer=serializer, deserializer=deserializer)
106110

107111
@test length(transport.connections.connections) == length(hosts)
108112
@test transport.use_compression == options[:compression]
109113
@test transport.retry_on_status == options[:retry_on_status]
110114
end
111115

112116
@testset "Performing request" begin
113-
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options)
117+
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer)
114118

115119
@testset "Testing with successful response" begin
116120
@testset "Testing GET request with params" begin
@@ -223,14 +227,34 @@ nodes_response_mock = HTTP.Response(
223227
@test length(ElasticsearchClient.ElasticTransport.Connections.dead(transport.connections)) == 1
224228
end
225229
end
230+
231+
@testset "Testing GET request with custom serializer/deserializer" begin
232+
http_patch = @patch HTTP.request(args...;kwargs...) = successful_health_response_mock
233+
custom_transport = ElasticsearchClient.ElasticTransport.Transport(;
234+
hosts,
235+
options=options,
236+
http_client=HTTP,
237+
serializer=JSON3.write,
238+
deserializer=JSON3.read
239+
)
240+
241+
242+
apply(http_patch) do
243+
response = ElasticsearchClient.ElasticTransport.perform_request(custom_transport, "GET", "/_cluster/health"; params = Dict("pretty" => true))
244+
245+
@test response isa HTTP.Response
246+
@test response.status == 200
247+
@test haskey(response.body, :cluster_name)
248+
end
249+
end
226250
end
227251
end
228252

229253
@testset "Testing sniffing" begin
230254
@testset "Testing successful sniffing" begin
231255
http_patch = @patch HTTP.request(args...;kwargs...) = nodes_response_mock
232256

233-
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options)
257+
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer)
234258

235259
apply(http_patch) do
236260
hosts = ElasticsearchClient.ElasticTransport.sniff_hosts(transport) |>
@@ -256,7 +280,7 @@ nodes_response_mock = HTTP.Response(
256280
@testset "Testing sniffing timeout" begin
257281
http_patch = @patch HTTP.request(args...;kwargs...) = sleep(ElasticsearchClient.ElasticTransport.DEFAULT_SNIFFING_TIMEOUT + 0.5)
258282

259-
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options)
283+
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer)
260284

261285
apply(http_patch) do
262286
@test_throws ElasticsearchClient.ElasticTransport.SniffingTimetoutError ElasticsearchClient.ElasticTransport.sniff_hosts(transport)
@@ -267,7 +291,7 @@ nodes_response_mock = HTTP.Response(
267291
@testset "Testing reload connections" begin
268292
http_patch = @patch HTTP.request(args...;kwargs...) = nodes_response_mock
269293

270-
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options)
294+
transport = ElasticsearchClient.ElasticTransport.Transport(;hosts, options=options, http_client=HTTP, serializer=serializer, deserializer=deserializer)
271295

272296
apply(http_patch) do
273297
ElasticsearchClient.ElasticTransport.reload_connections!(transport)

0 commit comments

Comments
 (0)