Skip to content

Commit 6250640

Browse files
committed
buffer: add feature to evacuate chunk files when the retry limit is reached
Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
1 parent e5c1623 commit 6250640

File tree

6 files changed

+387
-1
lines changed

6 files changed

+387
-1
lines changed

lib/fluent/plugin/buf_file.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,22 @@ def handle_broken_files(path, mode, e)
229229
File.unlink(path, path + '.meta') rescue nil
230230
end
231231

232+
# Copy the chunk's data and metadata files into the backup area so they
# can be put back manually once the external failure is resolved.
# Evacuation is best-effort: any error is logged and swallowed so that
# queue clearing itself never breaks.
def evacuate_chunk(chunk)
  raise ArgumentError, "The chunk must be FileChunk, but it was #{chunk.class}." unless chunk.is_a?(Fluent::Plugin::Buffer::FileChunk)

  sanitized_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
  destination_root = system_config.root_dir || DEFAULT_BACKUP_DIR
  destination = File.join(destination_root, 'buffer', sanitized_owner_id)

  unless Dir.exist?(destination)
    FileUtils.mkdir_p(destination, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION)
  end

  # Both the chunk payload and its .meta file are needed for reloading.
  FileUtils.copy([chunk.path, chunk.meta_path], destination)
  log.warn "chunk files are evacuated to #{destination}.", chunk_id: dump_unique_id_hex(chunk.unique_id)
rescue => e
  log.error "unexpected error while evacuating chunk files.", error: e
end
247+
232248
private
233249

234250
def escaped_patterns(patterns)

lib/fluent/plugin/buf_file_single.rb

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,22 @@ def handle_broken_files(path, mode, e)
241241
File.unlink(path) rescue nil
242242
end
243243

244+
# Copy the chunk file into the backup area so it can be put back manually
# once the external failure is resolved. FileSingleChunk keeps its metadata
# inside the single file, so only one path is copied.
# Evacuation is best-effort: any error is logged and swallowed so that
# queue clearing itself never breaks.
def evacuate_chunk(chunk)
  raise ArgumentError, "The chunk must be FileSingleChunk, but it was #{chunk.class}." unless chunk.is_a?(Fluent::Plugin::Buffer::FileSingleChunk)

  sanitized_owner_id = owner.plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_')
  destination_root = system_config.root_dir || DEFAULT_BACKUP_DIR
  destination = File.join(destination_root, 'buffer', sanitized_owner_id)

  unless Dir.exist?(destination)
    FileUtils.mkdir_p(destination, mode: system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION)
  end

  FileUtils.copy(chunk.path, destination)
  log.warn "chunk files are evacuated to #{destination}.", chunk_id: dump_unique_id_hex(chunk.unique_id)
rescue => e
  log.error "unexpected error while evacuating chunk files.", error: e
end
259+
244260
private
245261

246262
def escaped_patterns(patterns)

lib/fluent/plugin/buffer.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,7 @@ def clear_queue!
625625
until @queue.empty?
626626
begin
627627
q = @queue.shift
628+
evacuate_chunk(q)
628629
log.trace("purging a chunk in queue"){ {id: dump_unique_id_hex(chunk.unique_id), bytesize: chunk.bytesize, size: chunk.size} }
629630
q.purge
630631
rescue => e
@@ -636,6 +637,25 @@ def clear_queue!
636637
end
637638
end
638639

640+
def evacuate_chunk(chunk)
  # No-op by default; buffer implementations overwrite this on demand.
  #
  # How this differs from the `backup` feature:
  # `backup` deals with unrecoverable errors, mainly broken chunks, whereas
  # this hook deals with normal, healthy chunks. Its motivation is to allow
  # recovery by evacuating the buffer files when the retry limit is reached
  # because of external factors such as network trouble.
  #
  # How this differs from the `secondary` feature:
  # `secondary` is not suitable for recovery: files written by
  # `out_secondary_file` are hard to recover because the metadata is lost.
  # For file buffers the simplest recovery path is to keep the chunk files
  # as-is. Once the issue is resolved, the evacuated files can be moved back
  # and Fluentd restarted to load them again. This hook enables that flow.
end
658+
639659
def chunk_size_over?(chunk)
640660
chunk.bytesize > @chunk_limit_size || (@chunk_limit_records && chunk.size > @chunk_limit_records)
641661
end

lib/fluent/plugin/buffer/file_chunk.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class FileChunkError < StandardError; end
3737
# path_prefix: path prefix string, ended with '.'
3838
# path_suffix: path suffix string, like '.log' (or any other user specified)
3939

40-
attr_reader :path, :permission
40+
attr_reader :path, :meta_path, :permission
4141

4242
def initialize(metadata, path, mode, perm: nil, compress: :text)
4343
super(metadata, compress: compress)

test/plugin/test_buf_file.rb

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,34 @@ def write(chunk)
2020
# drop
2121
end
2222
end
23+
24+
# Output plugin stub that fails every write until #recover is called.
# Used to drive a buffer into its retry limit and then verify that
# evacuated chunk files can be loaded back and flushed successfully.
class DummyErrorOutputPlugin < DummyOutputPlugin
  # Install a callback that receives each chunk once writes succeed.
  def register_write(&block)
    # Direct assignment instead of instance_variable_set — same effect, idiomatic.
    @write = block
  end

  def initialize
    super
    @should_fail_writing = true # start in the failing state
    @write = nil
  end

  # Stop failing; subsequent #write calls delegate to the registered callback.
  def recover
    @should_fail_writing = false
  end

  def write(chunk)
    raise "failed writing chunk" if @should_fail_writing

    @write&.call(chunk)
  end

  # One JSON line per record: [tag, unix_time, record]
  def format(tag, time, record)
    [tag, time.to_i, record].to_json + "\n"
  end
end
2351
end
2452

2553
class FileBufferTest < Test::Unit::TestCase
@@ -1311,4 +1339,143 @@ def compare_log(plugin, msg)
13111339
assert { not File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") }
13121340
end
13131341
end
1342+
1343+
sub_test_case 'evacuate_chunk' do
1344+
def setup
1345+
Fluent::Test.setup
1346+
1347+
@now = Time.local(2025, 5, 30, 17, 0, 0)
1348+
@base_dir = File.expand_path("../../tmp/evacuate_chunk", __FILE__)
1349+
@buf_dir = File.join(@base_dir, "buffer")
1350+
@root_dir = File.join(@base_dir, "root")
1351+
FileUtils.mkdir_p(@root_dir)
1352+
1353+
Fluent::SystemConfig.overwrite_system_config("root_dir" => @root_dir) do
1354+
Timecop.freeze(@now)
1355+
yield
1356+
end
1357+
ensure
1358+
Timecop.return
1359+
FileUtils.rm_rf(@base_dir)
1360+
end
1361+
1362+
def start_plugin(plugin)
1363+
plugin.start
1364+
plugin.after_start
1365+
end
1366+
1367+
def stop_plugin(plugin)
1368+
plugin.stop unless plugin.stopped?
1369+
plugin.before_shutdown unless plugin.before_shutdown?
1370+
plugin.shutdown unless plugin.shutdown?
1371+
plugin.after_shutdown unless plugin.after_shutdown?
1372+
plugin.close unless plugin.closed?
1373+
plugin.terminate unless plugin.terminated?
1374+
end
1375+
1376+
def configure_output(id, chunk_key, buffer_conf)
1377+
output = FluentPluginFileBufferTest::DummyErrorOutputPlugin.new
1378+
output.configure(
1379+
config_element('ROOT', '', {'@id' => id}, [config_element('buffer', chunk_key, buffer_conf)])
1380+
)
1381+
yield output
1382+
ensure
1383+
stop_plugin(output)
1384+
end
1385+
1386+
def wait(sec: 4)
1387+
waiting(sec) do
1388+
Thread.pass until yield
1389+
end
1390+
end
1391+
1392+
# Emit the event stream +es+ under +tag+ through +output+, then advance the
# frozen clock by one second and wake the enqueue/flush threads so the chunk
# gets enqueued and a flush is attempted.
#
# Fix: the original ignored its +tag+ and +es+ parameters and always emitted
# "test.1" with a fresh dummy_event_stream; use the arguments instead.
def emit_events(output, tag, es)
  output.interrupt_flushes
  output.emit_events(tag, es)
  @now += 1
  Timecop.freeze(@now)
  output.enqueue_thread_wait
  output.flush_thread_wakeup
end
1400+
1401+
def proceed_to_next_retry(output)
1402+
@now += 1
1403+
Timecop.freeze(@now)
1404+
output.flush_thread_wakeup
1405+
end
1406+
1407+
def dummy_event_stream
1408+
Fluent::ArrayEventStream.new([
1409+
[ event_time("2025-05-30 10:00:00"), {"message" => "data1"} ],
1410+
[ event_time("2025-05-30 10:10:00"), {"message" => "data2"} ],
1411+
[ event_time("2025-05-30 10:20:00"), {"message" => "data3"} ],
1412+
])
1413+
end
1414+
1415+
def evacuate_dir(plugin_id)
1416+
File.join(@root_dir, "buffer", plugin_id)
1417+
end
1418+
1419+
test 'can recover by putting back evacuated chunk files' do
1420+
plugin_id = "test_output"
1421+
tag = "test.1"
1422+
buffer_conf = {
1423+
"path" => @buf_dir,
1424+
"flush_mode" => "interval",
1425+
"flush_interval" => "1s",
1426+
"retry_type" => "periodic",
1427+
"retry_max_times" => 1,
1428+
"retry_randomize" => false,
1429+
}
1430+
1431+
# Fail flushing and reach retry limit
1432+
configure_output(plugin_id, "tag", buffer_conf) do |output|
1433+
start_plugin(output)
1434+
1435+
emit_events(output, tag, dummy_event_stream)
1436+
wait { output.write_count == 1 and output.num_errors == 1 }
1437+
1438+
proceed_to_next_retry(output)
1439+
wait { output.write_count == 2 and output.num_errors == 2 }
1440+
wait { Dir.empty?(@buf_dir) }
1441+
1442+
# Assert evacuated files
1443+
evacuated_files = Dir.children(evacuate_dir(plugin_id)).map do |child_name|
1444+
File.join(evacuate_dir(plugin_id), child_name)
1445+
end
1446+
assert { evacuated_files.size == 2 } # .log and .log.meta
1447+
1448+
# Put back evacuated chunk files for recovery
1449+
FileUtils.move(evacuated_files, @buf_dir)
1450+
end
1451+
1452+
# Restart plugin to load the chunk files that were put back
1453+
written_data = []
1454+
configure_output(plugin_id, "tag", buffer_conf) do |output|
1455+
output.recover
1456+
output.register_write do |chunk|
1457+
written_data << chunk.read
1458+
end
1459+
start_plugin(output)
1460+
1461+
wait { not written_data.empty? }
1462+
end
1463+
1464+
# Assert the recovery success
1465+
assert { written_data.length == 1 }
1466+
1467+
expected_records = []
1468+
dummy_event_stream.each do |(time, record)|
1469+
expected_records << [tag, time.to_i, record]
1470+
end
1471+
1472+
actual_records = StringIO.open(written_data.first) do |io|
1473+
io.each_line.map do |line|
1474+
JSON.parse(line)
1475+
end
1476+
end
1477+
1478+
assert_equal(expected_records, actual_records)
1479+
end
1480+
end
13141481
end

0 commit comments

Comments
 (0)