input: add storage.total_limit_size to cap filesystem buffer bytes #11734
ashinde-ai wants to merge 3 commits into fluent:master
Conversation
When an input plugin uses filesystem-backed storage, chunks can accumulate on disk without any byte-level limit. The existing controls are either memory-only (mem_buf_limit, which is skipped for filesystem storage in flb_input_chunk_protect) or count-based (storage.pause_on_chunks_overlimit + max_chunks_up).

Add a new storage.total_limit_size field to flb_input_instance, mirroring the same property already present on output instances. Parse the property in flb_input_set_property, accepting human-readable sizes (e.g. "2G") and defaulting to unlimited (-1).

Signed-off-by: Amit Shinde <ashinde@databahn.ai>
Made-with: Cursor
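As a rough illustration of parsing human-readable sizes like "2G" into bytes, here is a self-contained sketch; the suffix set, overflow handling, and function name are assumptions for illustration, not the actual `flb_utils_size_to_bytes` implementation:

```c
#include <ctype.h>
#include <stdint.h>
#include <stdlib.h>

/* Parse sizes like "2G", "500M", "1024" into bytes.
 * Returns -1 on an empty string, a negative value, or an
 * unrecognized suffix (hypothetical helper, not the real API). */
static int64_t parse_size_to_bytes(const char *s)
{
    char *end = NULL;
    int64_t value;

    if (s == NULL || *s == '\0') {
        return -1;
    }

    value = strtoll(s, &end, 10);
    if (end == s || value < 0) {
        return -1;
    }

    switch (toupper((unsigned char) *end)) {
    case '\0':
        return value;                       /* plain byte count */
    case 'K':
        return value * 1024;
    case 'M':
        return value * 1024 * 1024;
    case 'G':
        return value * 1024 * 1024 * 1024;
    default:
        return -1;                          /* e.g. a typo like "10XB" */
    }
}
```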
Add flb_input_chunk_fs_total_size() to compute the total bytes used by all chunks (both up and down) in an input's chunkio stream. Extend flb_input_chunk_is_storage_overlimit() to check the total filesystem size against the input's storage_total_limit_size when set. When the limit is reached the input is paused via the existing backpressure mechanism, and resumes once chunks are flushed below the threshold.

Update flb_input_chunk_protect() to log a distinct warning when the pause is triggered by the total size limit versus the existing chunk-count overlimit.

Signed-off-by: Amit Shinde <ashinde@databahn.ai>
Made-with: Cursor
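The byte-level check this commit describes can be sketched in isolation; the struct layout and function names below are simplified stand-ins for Fluent Bit's cio_chunk/flb_input_chunk machinery, not the actual API:

```c
#include <assert.h>
#include <stddef.h>

/* Simplified chunk list (assumed shape, not fluent-bit's real structs). */
struct chunk {
    long size;            /* bytes on disk; may be 0 if not yet synced */
    struct chunk *next;
};

/* Sum the on-disk size of every chunk (up + down) in the stream. */
static size_t fs_total_size(struct chunk *head)
{
    size_t total = 0;
    struct chunk *ch;

    for (ch = head; ch != NULL; ch = ch->next) {
        if (ch->size > 0) {
            total += (size_t) ch->size;
        }
    }
    return total;
}

/* Pause decision: a limit of (size_t) -1 means "unlimited". */
static int is_storage_overlimit(struct chunk *head, size_t limit)
{
    if (limit == (size_t) -1) {
        return 0;
    }
    return fs_total_size(head) >= limit;
}
```

Once the total drops back below the limit (chunks flushed and deleted), the same predicate returns false and the existing backpressure path resumes the input.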
Add emitter_storage.total_limit_size to the rewrite_tag filter
config map. When set, the value is forwarded to the internal
emitter input instance as storage.total_limit_size, capping the
total filesystem buffer the emitter can consume.
This allows users to prevent unbounded disk growth when using
emitter_storage.type = filesystem under sustained backpressure.
Example configuration:
[FILTER]
Name rewrite_tag
Emitter_Storage.type filesystem
Emitter_Mem_Buf_Limit 500M
Emitter_Storage.total_limit_size 2G
Signed-off-by: Amit Shinde <ashinde@databahn.ai>
Made-with: Cursor
📝 Walkthrough
This change introduces a total filesystem storage size limit feature for input instances, adding a new storage.total_limit_size property.
Sequence Diagram

```mermaid
sequenceDiagram
    participant User
    participant Config as Configuration<br/>Parser
    participant Input as Input<br/>Instance
    participant ChunkMgr as Chunk<br/>Manager
    participant Storage as Storage<br/>System
    User->>Config: Set storage.total_limit_size = X
    Config->>Input: Parse & store limit value
    Input->>Input: storage_total_limit_size = X
    loop On incoming data
        User->>Input: Write data chunk
        Input->>Storage: Store chunk
        Storage->>ChunkMgr: Register chunk
        ChunkMgr->>ChunkMgr: Calculate total FS size of all chunks
        alt Total FS size >= limit
            ChunkMgr->>Input: Pause input
            Input->>Input: Set status = FLB_INPUT_PAUSED
            ChunkMgr->>User: Log "storage total size overlimit"
        else Total FS size < limit
            ChunkMgr->>ChunkMgr: Continue processing
        end
    end
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (warning)
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/filter_rewrite_tag/rewrite_tag.c`:
- Around line 76-83: The code currently logs a failure when
flb_input_set_property(ins, "storage.total_limit_size",
ctx->emitter_storage_total_limit_size) returns -1 but continues emitter
initialization; change this to treat the failure as fatal: after detecting ret
== -1, call flb_plg_error with the existing message (including ctx->ins context)
and return an error status to abort emitter creation (e.g., return -1 or
propagate the error from the enclosing init function) so an invalid or
unapplyable ctx->emitter_storage_total_limit_size no longer results in an
unintended unlimited storage configuration.
In `@src/flb_input_chunk.c`:
- Around line 2400-2420: The function flb_input_chunk_fs_total_size currently
calls cio_chunk_get_real_size and can undercount active chunks; change the loop
to use the input-chunk wrapper flb_input_chunk_get_real_size instead: replace
mk_list_entry(..., struct cio_chunk, _head) with mk_list_entry(..., struct
flb_input_chunk, _head) (e.g., struct flb_input_chunk *chunk), call
flb_input_chunk_get_real_size(chunk) into bytes, and keep the existing if (bytes
> 0) total += bytes logic so the wrapper’s fallback for unsynced sizes is used.
- Around line 2517-2531: When deciding which pause reason to log in
flb_input_chunk handling, preserve the chunk-count overlimit message unless the
storage total-size cap (i->storage_total_limit_size) is actually exceeded:
compute fs_total via flb_input_chunk_fs_total_size(i) and if fs_total >=
i->storage_total_limit_size log the size-overlimit message (using flb_warn as
shown), otherwise emit the existing chunk-count warning using
storage->cio->total_chunks_up and storage->cio->max_chunks_up; also move the
declaration of size_t fs_total to the start of the function (not mid-block) to
comply with the variable-declaration guideline.
In `@src/flb_input.c`:
- Around line 922-940: The parsed size handling in the prop_key_check branch
uses flb_utils_size_to_bytes (assigned to int64_t limit) but only rejects == -1;
any other negative value will wrap when cast to size_t for
ins->storage_total_limit_size. Change the negative check to if (limit < 0) to
reject all negative returns from flb_utils_size_to_bytes before casting, keep
limit declared as int64_t at the top of the function (per guidelines) rather
than mid-block, preserve the existing zero-to-off mapping (if limit == 0 set to
-1), ensure flb_sds_destroy(tmp) is still called on error paths, and finally
assign ins->storage_total_limit_size only after handling negatives so the cast
is safe.
📒 Files selected for processing (5)
- include/fluent-bit/flb_input.h
- plugins/filter_rewrite_tag/rewrite_tag.c
- plugins/filter_rewrite_tag/rewrite_tag.h
- src/flb_input.c
- src/flb_input_chunk.c
```c
/* Set the storage total limit size (filesystem cap) */
if (ctx->emitter_storage_total_limit_size) {
    ret = flb_input_set_property(ins, "storage.total_limit_size",
                                 ctx->emitter_storage_total_limit_size);
    if (ret == -1) {
        flb_plg_error(ctx->ins, "cannot set storage.total_limit_size");
    }
}
```
Fail emitter creation when the storage cap cannot be applied.
Line 80 logs the flb_input_set_property() failure but continues initializing the emitter, so an invalid or env-expanded emitter_storage.total_limit_size silently becomes unlimited.
🐛 Proposed fix

```diff
 if (ctx->emitter_storage_total_limit_size) {
     ret = flb_input_set_property(ins, "storage.total_limit_size",
                                  ctx->emitter_storage_total_limit_size);
     if (ret == -1) {
         flb_plg_error(ctx->ins, "cannot set storage.total_limit_size");
+        flb_input_instance_destroy(ins);
+        return -1;
     }
 }
```
```c
/*
 * Return the total bytes used by all chunks (up + down) belonging to
 * an input instance's stream. Used to enforce storage.total_limit_size.
 */
static size_t flb_input_chunk_fs_total_size(struct flb_input_instance *i)
{
    ssize_t bytes;
    size_t total = 0;
    struct cio_chunk *ch;
    struct mk_list *head;
    struct flb_storage_input *storage = (struct flb_storage_input *) i->storage;

    mk_list_foreach(head, &storage->stream->chunks) {
        ch = mk_list_entry(head, struct cio_chunk, _head);
        bytes = cio_chunk_get_real_size(ch);
        if (bytes > 0) {
            total += bytes;
        }
    }
    return total;
}
```
Use the input-chunk size wrapper to avoid undercounting active chunks.
Line 2414 bypasses flb_input_chunk_get_real_size(), which has the fallback for chunks whose real size is not synced yet. A non-empty active chunk can be counted as 0, delaying or skipping the new total-size cap.
🐛 Proposed fix

```diff
 static size_t flb_input_chunk_fs_total_size(struct flb_input_instance *i)
 {
     ssize_t bytes;
     size_t total = 0;
-    struct cio_chunk *ch;
     struct mk_list *head;
-    struct flb_storage_input *storage = (struct flb_storage_input *) i->storage;
+    struct flb_input_chunk *ic;

-    mk_list_foreach(head, &storage->stream->chunks) {
-        ch = mk_list_entry(head, struct cio_chunk, _head);
-        bytes = cio_chunk_get_real_size(ch);
+    mk_list_foreach(head, &i->chunks) {
+        ic = mk_list_entry(head, struct flb_input_chunk, _head);
+        bytes = flb_input_chunk_get_real_size(ic);
         if (bytes > 0) {
             total += bytes;
         }
     }
```
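The undercount this review describes comes from skipping chunks whose on-disk size is not yet synced. The fallback idea behind the wrapper can be sketched in plain C; the "content size" fallback and the struct fields here are assumptions about the wrapper's behavior, simplified for illustration:

```c
#include <assert.h>

/* Simplified chunk model (assumed fields, not fluent-bit's real structs). */
struct fake_chunk {
    long real_size;    /* on-disk size; 0 until the chunk is synced */
    long content_size; /* in-memory payload size, always known */
};

/* Naive accounting: an unsynced chunk counts as 0 bytes. */
static long naive_size(struct fake_chunk *ch)
{
    return ch->real_size;
}

/* Wrapper-style accounting: fall back to the content size when the
 * real size has not been synced yet, so active chunks still count. */
static long wrapper_size(struct fake_chunk *ch)
{
    if (ch->real_size <= 0) {
        return ch->content_size;
    }
    return ch->real_size;
}
```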
```c
if (i->storage_total_limit_size != (size_t) -1) {
    size_t fs_total = flb_input_chunk_fs_total_size(i);
    if (fs_total >= i->storage_total_limit_size) {
        flb_warn("[input] %s paused (storage total size overlimit "
                 "%zuB/%zuB)",
                 flb_input_name(i), fs_total,
                 i->storage_total_limit_size);
    }
}
else {
    flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)",
             flb_input_name(i),
             storage->cio->total_chunks_up,
             storage->cio->max_chunks_up);
}
```
Preserve the chunk-count overlimit log when both limits are configured.
If storage.pause_on_chunks_overlimit trips first and storage_total_limit_size is set but not exceeded, this branch emits no pause reason. Fall back to the existing chunk-count warning unless the total-size cap is the actual cause.
🛠️ Proposed fix

```diff
 static inline int flb_input_chunk_protect(struct flb_input_instance *i, size_t just_written_size)
 {
+    int storage_total_limit_reached = FLB_FALSE;
+    size_t fs_total = 0;
     struct flb_storage_input *storage = i->storage;

     if (flb_input_chunk_is_storage_overlimit(i) == FLB_TRUE) {
         if (i->storage_total_limit_size != (size_t) -1) {
-            size_t fs_total = flb_input_chunk_fs_total_size(i);
+            fs_total = flb_input_chunk_fs_total_size(i);
             if (fs_total >= i->storage_total_limit_size) {
                 flb_warn("[input] %s paused (storage total size overlimit "
                          "%zuB/%zuB)",
                          flb_input_name(i), fs_total,
                          i->storage_total_limit_size);
+                storage_total_limit_reached = FLB_TRUE;
             }
         }
-        else {
+        if (storage_total_limit_reached == FLB_FALSE) {
             flb_warn("[input] %s paused (storage buf overlimit %zu/%zu)",
                      flb_input_name(i),
                      storage->cio->total_chunks_up,
```

As per coding guidelines, declare variables at the start of functions, not mid-block.
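Extracted into a standalone function, the reason-selection logic this review proposes behaves as below; the enum and function are illustrative only (the real code logs via flb_warn rather than returning a value):

```c
#include <assert.h>
#include <stddef.h>

enum pause_reason { REASON_TOTAL_SIZE, REASON_CHUNK_COUNT };

/* limit == (size_t) -1 means the byte cap is unset. */
static enum pause_reason pick_pause_reason(size_t fs_total, size_t limit)
{
    if (limit != (size_t) -1 && fs_total >= limit) {
        return REASON_TOTAL_SIZE;
    }
    /* Byte cap unset or not exceeded: the chunk-count limit tripped. */
    return REASON_CHUNK_COUNT;
}
```

This keeps the chunk-count warning as the fallback whenever the byte cap is configured but was not the actual trigger.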
```c
else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
    int64_t limit;

    if (strcasecmp(tmp, "off") == 0 ||
        flb_utils_bool(tmp) == FLB_FALSE) {
        limit = -1;
    }
    else {
        limit = flb_utils_size_to_bytes(tmp);
        if (limit == -1) {
            flb_sds_destroy(tmp);
            return -1;
        }
        if (limit == 0) {
            limit = -1;
        }
    }
    flb_sds_destroy(tmp);
    ins->storage_total_limit_size = (size_t) limit;
```
Reject all negative parsed sizes before casting to size_t.
Line 931 only rejects -1; other negative parsed values would wrap at Line 940 into a huge limit, effectively disabling the cap. Use < 0 and keep the storage-specific variable with the function-scope declarations.
🐛 Proposed fix

```diff
@@
-    ssize_t limit;
+    ssize_t limit;
+    int64_t storage_limit;
@@
     else if (prop_key_check("storage.total_limit_size", k, len) == 0 && tmp) {
-        int64_t limit;
-
         if (strcasecmp(tmp, "off") == 0 ||
             flb_utils_bool(tmp) == FLB_FALSE) {
-            limit = -1;
+            storage_limit = -1;
         }
         else {
-            limit = flb_utils_size_to_bytes(tmp);
-            if (limit == -1) {
+            storage_limit = flb_utils_size_to_bytes(tmp);
+            if (storage_limit < 0) {
                 flb_sds_destroy(tmp);
                 return -1;
             }
-            if (limit == 0) {
-                limit = -1;
+            if (storage_limit == 0) {
+                storage_limit = -1;
             }
         }
         flb_sds_destroy(tmp);
-        ins->storage_total_limit_size = (size_t) limit;
+        ins->storage_total_limit_size = (size_t) storage_limit;
     }
```

As per coding guidelines, declare variables at the start of functions, not mid-block.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8586351d5e
```c
bytes = cio_chunk_get_real_size(ch);
if (bytes > 0) {
    total += bytes;
```
Count unsynced filesystem chunks when enforcing storage cap
flb_input_chunk_fs_total_size() sums chunk bytes via cio_chunk_get_real_size(), but this API can return 0 before the real size is synced (the same file already has flb_input_chunk_get_real_size() fallback logic for that case). Because 0-sized entries are skipped here, active FS chunks can be undercounted and storage.total_limit_size may not pause ingestion when it should, especially while chunks are still up/in-flight under backpressure.
```c
if (ret == -1) {
    flb_plg_error(ctx->ins, "cannot set storage.total_limit_size");
}
```
Reject invalid emitter storage limit instead of continuing
When emitter_storage.total_limit_size is invalid (e.g., typo like 10XB), flb_input_set_property() returns -1 but emitter_create() only logs and continues startup. That leaves the emitter running with the default unlimited storage cap, so the configuration appears accepted while the safety limit is silently disabled.
Summary

Add `storage.total_limit_size` support for input plugins, capping the total filesystem storage an input can consume. This mirrors the same property that already exists on output instances. When the limit is reached, the input is paused via the existing backpressure mechanism and resumes once chunks are flushed below the threshold.

Also expose this through the `rewrite_tag` filter as `emitter_storage.total_limit_size`, allowing users to cap the internal emitter's disk cache when using `emitter_storage.type = filesystem`.

Problem

When an input plugin (particularly the emitter created by `rewrite_tag`) uses filesystem-backed storage, chunks can accumulate on disk without any byte-level limit:

- `mem_buf_limit` is skipped for filesystem storage (`flb_input_chunk_protect` returns early when `storage->type == FLB_STORAGE_FS`)
- `storage.pause_on_chunks_overlimit` + `storage.max_chunks_up` limits chunk count, not total bytes

Under sustained backpressure (e.g. slow outputs, retry storms), the emitter's disk cache can grow to multiple GB with no way to cap it.

Changes

- input: `flb_input.h`, `flb_input.c` — add `storage_total_limit_size` field to `flb_input_instance`, initialize to unlimited (-1), parse the `storage.total_limit_size` property
- input_chunk: `flb_input_chunk.c` — add `flb_input_chunk_fs_total_size()` helper to sum all chunks (up + down); extend `is_storage_overlimit()` with a byte-level check; improve pause log messages
- filter_rewrite_tag: `rewrite_tag.h`, `rewrite_tag.c` — add `emitter_storage.total_limit_size` config map entry; pass the value to the emitter input instance

Example Configuration
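The example configuration itself was cut off in this capture; a plausible classic-mode sketch, based only on the property names this PR introduces (the tail input and its Path are illustrative, not from the PR):

```
[INPUT]
    Name                     tail
    Path                     /var/log/app/*.log
    storage.type             filesystem
    storage.total_limit_size 2G

[FILTER]
    Name                             rewrite_tag
    Match                            *
    Emitter_Storage.type             filesystem
    Emitter_Mem_Buf_Limit            500M
    Emitter_Storage.total_limit_size 2G
```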