Skip to content

Commit 1a773b0

Browse files
authored
in_forward: enable skip_invalid_event by default (#5003)
**Which issue(s) this PR fixes**: Fixes # **What this PR does / why we need it**: This PR will enable `skip_invalid_event` in `in_forward` by default. If broken data is received for any reason, it may cause the following error. ``` 2025-06-12 18:27:30 +0900 [error]: unexpected error on reading data host="127.0.0.1" port=55970 error_class=TypeError error="can't convert String into an exact number" 2025-06-12 18:27:30 +0900 [error]: <internal:timev>:325:in 'Time.at' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/formatter_stdout.rb:42:in 'Fluent::Plugin::StdoutFormatter#format' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/out_stdout.rb:67:in 'Fluent::Plugin::StdoutOutput#format' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/out_stdout.rb:60:in 'block in Fluent::Plugin::StdoutOutput#process' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/event.rb:259:in 'block in Fluent::MessagePackEventStream#each' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/event.rb:258:in 'Array#each' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/event.rb:258:in 'Enumerable#each_with_index' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/event.rb:258:in 'Fluent::MessagePackEventStream#each' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/out_stdout.rb:59:in 'Fluent::Plugin::StdoutOutput#process' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/output.rb:865:in 'emit_sync' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/event_router.rb:115:in 'Fluent::EventRouter#emit_stream' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/in_forward.rb:329:in 'Fluent::Plugin::ForwardInput#on_message' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/in_forward.rb:226:in 'block in Fluent::Plugin::ForwardInput#handle_connection' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/in_forward.rb:263:in 'block (3 levels) in Fluent::Plugin::ForwardInput#read_messages' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/in_forward.rb:262:in 'MessagePack::Unpacker#feed_each' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/in_forward.rb:262:in 'block (2 levels) in Fluent::Plugin::ForwardInput#read_messages' 2025-06-12 18:27:30 +0900 [error]: /Users/watson/src/fluentd/lib/fluent/plugin/in_forward.rb:273:in 'block in Fluent::Plugin::ForwardInput#read_messages' ... ``` When enable `skip_invalid_event`, the broken data are just skipped. ``` 2025-06-18 11:16:40 +0900 [info]: #0 fluentd worker is now running worker=0 2025-06-18 11:16:49 +0900 [warn]: #0 skip invalid event: host="127.0.0.1" tag="incoming" time=-17 record=nil 2025-06-18 11:16:49 +0900 [warn]: #0 skip invalid event: host="127.0.0.1" tag="incoming" time="\xBD\uFFFD\u0000hR \uFFFD:`uN\uFFFD\uFFFDmessage\uFFFD" record=nil 2025-06-18 11:16:49 +0900 [warn]: #0 skip invalid event: host="127.0.0.1" tag="incoming" time=50 record=nil 2025-06-18 11:16:49 +0900 [warn]: #0 skip invalid event: host="127.0.0.1" tag="incoming" time=97 record=nil ... ``` And, when `require_ack_response` was configured to `true` in out_forward, 1. in_forward causes `error_class=TypeError error="can't convert String into an exact number"` error by broken data. 2. in_forward can't sent the ack response by above error. 3. out_forward causes `error_class=Fluent::Plugin::ForwardOutput::NoNodesAvailable error="no nodes are available"` error because in_forward doesn't send the ack response. 4. out_forward will retry to send broken data. 5. back to `1.` in_forward and out_forward will repeat errors and retries indefinitely. If `skip_invalid_event` was configured to `true` in in_forward, in_forward doesn't cause the error by broken data, in_forward and out_forward don't repeat errors and retries. So, Fluentd will be more stable by enabled `skip_invalid_event`. ### About performance degradation It shows the processing time of `in_forward` when receiving 1 GB of data. * with `skip_invalid_event false` * 15.178013304 sec * with `skip_invalid_event true` * 15.859435514 sec Even when skip_invalid_event was enabled, I think the performance degradation was minimal. **Docs Changes**: fluent/fluentd-docs-gitbook#587 **Release Note**: Same as the title. Signed-off-by: Shizuo Fujita <fujita@clear-code.com>
1 parent 619e6b0 commit 1a773b0

File tree

3 files changed

+4
-10
lines changed

3 files changed

+4
-10
lines changed

lib/fluent/plugin/in_forward.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class ForwardInput < Input
5555
desc 'Received chunk is dropped if it is larger than this value.'
5656
config_param :chunk_size_limit, :size, default: nil
5757
desc 'Skip an event if incoming event is invalid.'
58-
config_param :skip_invalid_event, :bool, default: false
58+
config_param :skip_invalid_event, :bool, default: true
5959

6060
desc "The field name of the client's source address."
6161
config_param :source_address_key, :string, default: nil

test/plugin/test_in_forward.rb

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -580,9 +580,6 @@ def create_driver(conf=base_config)
580580
end
581581
chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack
582582

583-
# check CompressedMessagePackEventStream is created
584-
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :gzip)
585-
586583
d.run do
587584
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
588585
option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)
@@ -604,13 +601,10 @@ def create_driver(conf=base_config)
604601
entries = ''
605602
events.each do |_tag, _time, record|
606603
v = [_time, record].to_msgpack
607-
entries << compress(v)
604+
entries << compress(v, type: :zstd)
608605
end
609606
chunk = ["tag1", entries, { 'compressed' => 'zstd' }].to_msgpack
610607

611-
# check CompressedMessagePackEventStream is created
612-
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :zstd)
613-
614608
d.run do
615609
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|
616610
option = d.instance.send(:on_message, obj, chunk.size, DUMMY_SOCK)

test/plugin/test_out_forward.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -541,7 +541,7 @@ def try_write(chunk)
541541
end
542542

543543
test 'send_compressed_message_pack_stream_if_compress_is_gzip' do
544-
target_input_driver = create_target_input_driver
544+
target_input_driver = create_target_input_driver(conf: target_config + "skip_invalid_event false")
545545

546546
@d = d = create_driver(config + %[
547547
flush_interval 1s
@@ -571,7 +571,7 @@ def try_write(chunk)
571571
end
572572

573573
test 'send_compressed_message_pack_stream_if_compress_is_zstd' do
574-
target_input_driver = create_target_input_driver
574+
target_input_driver = create_target_input_driver(conf: target_config + "skip_invalid_event false")
575575

576576
@d = d = create_driver(config + %[
577577
flush_interval 1s

0 commit comments

Comments
 (0)