Skip to content

Commit 3a6826a

Browse files
authored
out_forward: add warning for zstd as experimental (#4896)
**Which issue(s) this PR fixes**: * Related to #4758 **What this PR does / why we need it**: When using `compress zstd` for `out_forward`, output the following warning log. ``` zstd compression feature is an experimental new feature supported since v1.19.0. Please make sure that the destination server also supports this feature before using it. in_forward plugin for Fluentd supports it since v1.19.0. ``` In #4758, it was agreed that we would refrain from updating [the forward protocol v1.5](https://github.com/fluent/fluentd/wiki/Forward-Protocol-Specification-v1.5) for the time being and instead output a log message indicating that this feature is experimental. > for now, we will make zstd available as an exprimenatal feature on the Fluentd side, and note in the documentation that it is an experimental feature that cannot be used unless the server supports it. > Just displaying as a warning log to tell: "This is experimental feature and not standardized yet" or similar message should be enough. * #4758 (comment) * #4758 (comment) * #4758 (comment) **Docs Changes**: Not needed. (It should be done as #4657) **Release Note**: Not needed. (A note for #4657 would be enough.) Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
1 parent bf8f5f2 commit 3a6826a

File tree

2 files changed

+61
-61
lines changed

2 files changed

+61
-61
lines changed

lib/fluent/plugin/out_forward.rb

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,9 +271,15 @@ def configure(conf)
271271
end
272272

273273
raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
274+
275+
if @compress == :zstd
276+
log.warn "zstd compression feature is an experimental new feature supported since v1.19.0." +
277+
" Please make sure that the destination server also supports this feature before using it." +
278+
" in_forward plugin for Fluentd supports it since v1.19.0."
279+
end
280+
274281
@healthy_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "healthy_nodes_count", help_text: "Number of count healthy nodes", prefer_gauge: true)
275282
@registered_nodes_count_metrics = metrics_create(namespace: "fluentd", subsystem: "output", name: "registered_nodes_count", help_text: "Number of count registered nodes", prefer_gauge: true)
276-
277283
end
278284

279285
def multi_workers_ready?

test/plugin/test_out_forward.rb

Lines changed: 54 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,60 @@ def try_write(chunk)
178178
assert{ logs.any?{|log| log.include?(expected_log) && log.include?(expected_detail) } }
179179
end
180180

181+
sub_test_case 'configure compress' do
182+
data('default', ['', :text])
183+
data('gzip', ['compress gzip', :gzip])
184+
data('zstd', ['compress zstd', :zstd])
185+
test 'should be applied' do |(option, expected)|
186+
@d = d = create_driver(config + option)
187+
node = d.instance.nodes.first
188+
189+
assert_equal(
190+
[expected, expected],
191+
[d.instance.compress, node.instance_variable_get(:@compress)]
192+
)
193+
end
194+
195+
data('default' => '')
196+
data('gzip' => 'compress gzip')
197+
data('zstd' => 'compress zstd')
198+
test 'should log as experimental only for zstd' do |option|
199+
@d = d = create_driver(config + option)
200+
201+
log_message = "zstd compression feature is an experimental new feature"
202+
assert do
203+
if d.instance.compress == :zstd
204+
d.logs.any? { |log| log.include?(log_message) }
205+
else
206+
d.logs.none? { |log| log.include?(log_message) }
207+
end
208+
end
209+
end
210+
211+
# TODO add tests that we cannot configure the different compress type between owner and buffer except for :text
212+
data('gzip', ['compress gzip', :text, :gzip])
213+
data('zstd', ['compress zstd', :text, :zstd])
214+
test 'can configure buffer compress separately when owner uses :text' do |(buffer_option, expected_owner_compress, expected_buffer_compress)|
215+
@d = d = create_driver(config + %[
216+
<buffer>
217+
type memory
218+
#{buffer_option}
219+
</buffer>
220+
])
221+
node = d.instance.nodes.first
222+
223+
assert_equal(
224+
[expected_owner_compress, expected_owner_compress, expected_buffer_compress],
225+
[d.instance.compress, node.instance_variable_get(:@compress), d.instance.buffer.compress],
226+
)
227+
228+
log_message = "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
229+
assert do
230+
d.logs.any? { |log| log.include?(log_message) }
231+
end
232+
end
233+
end
234+
181235
data('CA cert' => 'tls_ca_cert_path',
182236
'non CA cert' => 'tls_cert_path')
183237
test 'configure tls_cert_path/tls_ca_cert_path' do |param|
@@ -326,66 +380,6 @@ def try_write(chunk)
326380
assert_equal 1234, d.instance.discovery_manager.services[0].port
327381
end
328382

329-
test 'compress_default_value' do
330-
@d = d = create_driver
331-
assert_equal :text, d.instance.compress
332-
333-
node = d.instance.nodes.first
334-
assert_equal :text, node.instance_variable_get(:@compress)
335-
end
336-
337-
test 'set_compress_is_gzip' do
338-
@d = d = create_driver(config + %[compress gzip])
339-
assert_equal :gzip, d.instance.compress
340-
assert_equal :gzip, d.instance.buffer.compress
341-
342-
node = d.instance.nodes.first
343-
assert_equal :gzip, node.instance_variable_get(:@compress)
344-
end
345-
346-
test 'set_compress_is_zstd' do
347-
@d = d = create_driver(config + %[compress zstd])
348-
assert_equal :zstd, d.instance.compress
349-
assert_equal :zstd, d.instance.buffer.compress
350-
351-
node = d.instance.nodes.first
352-
assert_equal :zstd, node.instance_variable_get(:@compress)
353-
end
354-
355-
test 'set_compress_is_gzip_in_buffer_section' do
356-
mock = flexmock($log)
357-
mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>")
358-
359-
@d = d = create_driver(config + %[
360-
<buffer>
361-
type memory
362-
compress gzip
363-
</buffer>
364-
])
365-
assert_equal :text, d.instance.compress
366-
assert_equal :gzip, d.instance.buffer.compress
367-
368-
node = d.instance.nodes.first
369-
assert_equal :text, node.instance_variable_get(:@compress)
370-
end
371-
372-
test 'set_compress_is_zstd_in_buffer_section' do
373-
mock = flexmock($log)
374-
mock.should_receive(:log).with("buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>")
375-
376-
@d = d = create_driver(config + %[
377-
<buffer>
378-
type memory
379-
compress zstd
380-
</buffer>
381-
])
382-
assert_equal :text, d.instance.compress
383-
assert_equal :zstd, d.instance.buffer.compress
384-
385-
node = d.instance.nodes.first
386-
assert_equal :text, node.instance_variable_get(:@compress)
387-
end
388-
389383
test 'phi_failure_detector disabled' do
390384
@d = d = create_driver(config + %[phi_failure_detector false \n phi_threshold 0])
391385
node = d.instance.nodes.first

0 commit comments

Comments
 (0)