plugins: add ZeroBus output plugin#11678
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds a new Fluent Bit output plugin "out_zerobus" (C implementation and public FFI header), CMake build integration and an auto-fetching ZeroBus FFI CMake module; implements plugin init/flush/exit, msgpack→JSON conversion, batch ingestion via Rust FFI, and platform-specific link rules. Changes
Sequence DiagramssequenceDiagram
participant FB as Fluent Bit
participant Plugin as out_zerobus Plugin
participant SDK as ZeroBus SDK
participant Stream as ZeroBus Stream
Note over FB,Stream: Initialization (cb_zerobus_init)
FB->>Plugin: Provide config (endpoint, workspace_url, table_name, credentials, options)
Plugin->>SDK: zerobus_sdk_new(endpoint, workspace_url, &result)
SDK-->>Plugin: CZerobusSdk handle / CResult
Plugin->>SDK: zerobus_sdk_set_use_tls(sdk, use_tls)
Plugin->>SDK: zerobus_sdk_create_stream(sdk, table_name, ..., client_id, client_secret, options, &result)
SDK-->>Stream: CZerobusStream handle / CResult
Plugin-->>FB: Store context (sdk, stream, config)
sequenceDiagram
participant FB as Fluent Bit
participant Plugin as out_zerobus Plugin
participant Conv as JSON Converter
participant Stream as ZeroBus Stream
Note over FB,Stream: Flush (cb_zerobus_flush)
FB->>Plugin: Deliver msgpack chunk
Plugin->>Conv: Decode chunk → per-record msgpack map
loop per record
Conv->>Conv: Convert to JSON (apply log_key/raw_log_key/time_key/_tag)
Conv-->>Plugin: JSON string / skip on failure
Plugin->>Plugin: Append to batch array
end
Plugin->>Stream: zerobus_stream_ingest_json_records(stream, json_batch, n, &result)
Stream-->>Plugin: offset or error (int64_t / CResult)
Plugin->>Stream: zerobus_stream_wait_for_offset(stream, offset, &result)
Stream-->>Plugin: Success / Retryable error / Failure
Plugin-->>FB: Return FLB_OK / FLB_RETRY / FLB_ERROR
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8756a20a15
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@plugins/out_zerobus/CMakeLists.txt`:
- Around line 14-23: The CMake snippet only handles APPLE and UNIX, so on
Windows the flb-plugin-out_zerobus target will be created without Windows linker
flags; update the if/elseif chain to handle WIN32: either add a WIN32 branch
that calls target_link_libraries(flb-plugin-out_zerobus ...) with the
Windows-specific dependencies required by the Rust FFI (or common Windows system
libs like ws2_32/advapi32/crypt32 as appropriate for your Rust static lib), or
explicitly stop Windows builds by emitting a message(FATAL_ERROR "...") when
WIN32 and FLB_OUT_ZEROBUS is enabled; modify the conditional surrounding
target_link_libraries accordingly (refer to target_link_libraries and
flb-plugin-out_zerobus) so Windows behavior is deterministic.
In `@plugins/out_zerobus/zerobus.c`:
- Around line 350-358: The init_error path currently frees ctx->endpoint,
ctx->workspace_url and ctx itself but misses releasing auto-populated config_map
SDS and list fields; update the init_error cleanup to check and destroy
ctx->table_name, ctx->client_id, ctx->client_secret, ctx->time_key,
ctx->raw_log_key with flb_sds_destroy and, for ctx->log_keys, call
flb_slist_destroy then flb_free on ctx->log_keys before flb_free(ctx) so all
allocated config_map resources are freed on initialization failure (referencing
the init_error label and the
ctx->table_name/client_id/client_secret/time_key/raw_log_key/log_keys symbols).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c50e0269-1a75-4958-9d15-c0de0134c98d
📒 Files selected for processing (5)
cmake/plugins_options.cmakeplugins/CMakeLists.txtplugins/out_zerobus/CMakeLists.txtplugins/out_zerobus/zerobus.cplugins/out_zerobus/zerobus.h
84261c1 to
919de56
Compare
There was a problem hiding this comment.
♻️ Duplicate comments (1)
plugins/out_zerobus/zerobus.c (1)
355-363:⚠️ Potential issue | 🟠 MajorMemory leak: free config_map fields in init_error path.
The
init_errorpath freesendpointandworkspace_urlbut not the auto-populated config_map fields (table_name,client_id,client_secret,time_key,raw_log_key,log_keys). Sinceflb_output_set_context()is only called on success (line 352), the framework cannot clean up these fields when init fails.,
Proposed fix
init_error: if (ctx->endpoint) { flb_sds_destroy(ctx->endpoint); } if (ctx->workspace_url) { flb_sds_destroy(ctx->workspace_url); } + if (ctx->table_name) { + flb_sds_destroy(ctx->table_name); + } + if (ctx->client_id) { + flb_sds_destroy(ctx->client_id); + } + if (ctx->client_secret) { + flb_sds_destroy(ctx->client_secret); + } + if (ctx->time_key) { + flb_sds_destroy(ctx->time_key); + } + if (ctx->raw_log_key) { + flb_sds_destroy(ctx->raw_log_key); + } + if (ctx->log_keys) { + flb_slist_destroy(ctx->log_keys); + flb_free(ctx->log_keys); + } flb_free(ctx); return -1;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/out_zerobus/zerobus.c` around lines 355 - 363, The init_error cleanup path currently frees ctx->endpoint and ctx->workspace_url but omits the config_map string fields populated earlier, causing leaks; in the init_error block (label init_error) destroy each of ctx->table_name, ctx->client_id, ctx->client_secret, ctx->time_key, ctx->raw_log_key, and ctx->log_keys with flb_sds_destroy if non-NULL before calling flb_free(ctx), mirroring the destruction done on success (and ensure this only runs on the error path since flb_output_set_context() is invoked on success).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@plugins/out_zerobus/zerobus.c`:
- Around line 355-363: The init_error cleanup path currently frees ctx->endpoint
and ctx->workspace_url but omits the config_map string fields populated earlier,
causing leaks; in the init_error block (label init_error) destroy each of
ctx->table_name, ctx->client_id, ctx->client_secret, ctx->time_key,
ctx->raw_log_key, and ctx->log_keys with flb_sds_destroy if non-NULL before
calling flb_free(ctx), mirroring the destruction done on success (and ensure
this only runs on the error path since flb_output_set_context() is invoked on
success).
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7b43c433-0812-413f-96c9-280aed9cd161
📒 Files selected for processing (5)
cmake/plugins_options.cmakeplugins/CMakeLists.txtplugins/out_zerobus/CMakeLists.txtplugins/out_zerobus/zerobus.cplugins/out_zerobus/zerobus.h
✅ Files skipped from review due to trivial changes (3)
- cmake/plugins_options.cmake
- plugins/CMakeLists.txt
- plugins/out_zerobus/zerobus.h
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/out_zerobus/CMakeLists.txt
1. Example configuration file2. Debug log output3.Valgrind resultsTested on Linux (Docker) with
|
There was a problem hiding this comment.
🧹 Nitpick comments (3)
plugins/out_zerobus/zerobus.c (2)
418-425: Consider FLB_ERROR instead of FLB_RETRY for decoder init failure.Decoder initialization failures typically indicate corrupted or malformed chunk data that won't be fixed by retrying. Returning
FLB_RETRYwill cause the engine to reschedule the same corrupted data repeatedly until retry limits are exhausted.🔧 Suggested fix
ret = flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data, event_chunk->size); if (ret != FLB_EVENT_DECODER_SUCCESS) { flb_plg_error(ctx->ins, "log event decoder initialization error: %d", ret); - FLB_OUTPUT_RETURN(FLB_RETRY); + FLB_OUTPUT_RETURN(FLB_ERROR); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/out_zerobus/zerobus.c` around lines 418 - 425, The decoder init error branch in plugins/out_zerobus/zerobus.c currently returns FLB_RETRY after flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data, event_chunk->size) fails; change the control flow to return FLB_ERROR instead of FLB_RETRY (keep the existing flb_plg_error call and context/variables: log_decoder, event_chunk, ret, ctx->ins) so corrupted/malformed chunk data is not rescheduled for retry.
613-623: Consider addingFLB_OUTPUT_SYNCHRONOUSfor explicit blocking behavior.With
workers = 1and a blockingwait_for_offsetcall in the flush function, adding theFLB_OUTPUT_SYNCHRONOUSflag (2048) would make the engine's task serialization intent explicit. This flag indicates "run one task at a time, no flush cycle limit" and is appropriate for blocking output operations.The current implementation works correctly without it (as other single-worker plugins like kinesis_streams demonstrate with
flags = 0), but the flag clarifies to the engine that this output requires synchronous task handling.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@plugins/out_zerobus/zerobus.c` around lines 613 - 623, Add the FLB_OUTPUT_SYNCHRONOUS flag to the out_zerobus_plugin declaration to make blocking behavior explicit: in the struct flb_output_plugin instance named out_zerobus_plugin set .flags to include FLB_OUTPUT_SYNCHRONOUS (bit 2048) alongside any existing flags; this documents that cb_zerobus_flush (which performs the blocking wait_for_offset) should be treated as synchronous and run one task at a time when workers = 1.cmake/zerobus-ffi.cmake (1)
68-82: Partial extraction could leave broken state.If extraction fails partway through, the
native/directory may exist but be incomplete. Subsequent cmake runs would skip extraction (line 70 check passes) but the library verification at line 87 would disable the plugin. Consider checking for the library file directly instead of the directory:🔧 Suggested improvement
# Extract the tarball set(_ZEROBUS_EXTRACT_DIR "${CMAKE_BINARY_DIR}") -if(NOT EXISTS "${_ZEROBUS_EXTRACT_DIR}/native") +set(_ZEROBUS_LIB_PATH "${_ZEROBUS_EXTRACT_DIR}/native/${_ZEROBUS_PLATFORM}/libzerobus_ffi.a") +if(NOT EXISTS "${_ZEROBUS_LIB_PATH}") execute_process( COMMAND ${CMAKE_COMMAND} -E tar xzf "${_ZEROBUS_TARBALL}" WORKING_DIRECTORY "${CMAKE_BINARY_DIR}"🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@cmake/zerobus-ffi.cmake` around lines 68 - 82, The current existence check uses the directory "${_ZEROBUS_EXTRACT_DIR}/native" which can be present after a partial extract; change the logic to check for the actual expected library artifact (e.g. the platform-specific file under "${_ZEROBUS_EXTRACT_DIR}/native", such as libzerobus*.so/.dylib/.dll) before skipping extraction, and if extraction fails via execute_process store/inspect _EXTRACT_RESULT and remove the partial "${_ZEROBUS_EXTRACT_DIR}/native" to avoid leftover broken state before returning and calling FLB_OPTION(FLB_OUT_ZEROBUS OFF); ensure the new check replaces the directory check and that the execute_process step runs whenever the expected library file is missing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@cmake/zerobus-ffi.cmake`:
- Around line 68-82: The current existence check uses the directory
"${_ZEROBUS_EXTRACT_DIR}/native" which can be present after a partial extract;
change the logic to check for the actual expected library artifact (e.g. the
platform-specific file under "${_ZEROBUS_EXTRACT_DIR}/native", such as
libzerobus*.so/.dylib/.dll) before skipping extraction, and if extraction fails
via execute_process store/inspect _EXTRACT_RESULT and remove the partial
"${_ZEROBUS_EXTRACT_DIR}/native" to avoid leftover broken state before returning
and calling FLB_OPTION(FLB_OUT_ZEROBUS OFF); ensure the new check replaces the
directory check and that the execute_process step runs whenever the expected
library file is missing.
In `@plugins/out_zerobus/zerobus.c`:
- Around line 418-425: The decoder init error branch in
plugins/out_zerobus/zerobus.c currently returns FLB_RETRY after
flb_log_event_decoder_init(&log_decoder, (char *) event_chunk->data,
event_chunk->size) fails; change the control flow to return FLB_ERROR instead of
FLB_RETRY (keep the existing flb_plg_error call and context/variables:
log_decoder, event_chunk, ret, ctx->ins) so corrupted/malformed chunk data is
not rescheduled for retry.
- Around line 613-623: Add the FLB_OUTPUT_SYNCHRONOUS flag to the
out_zerobus_plugin declaration to make blocking behavior explicit: in the struct
flb_output_plugin instance named out_zerobus_plugin set .flags to include
FLB_OUTPUT_SYNCHRONOUS (bit 2048) alongside any existing flags; this documents
that cb_zerobus_flush (which performs the blocking wait_for_offset) should be
treated as synchronous and run one task at a time when workers = 1.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8a58ab51-fbf3-49af-9c7c-c113e19e99a3
📒 Files selected for processing (7)
CMakeLists.txtcmake/plugins_options.cmakecmake/zerobus-ffi.cmakeplugins/CMakeLists.txtplugins/out_zerobus/CMakeLists.txtplugins/out_zerobus/zerobus.cplugins/out_zerobus/zerobus.h
✅ Files skipped from review due to trivial changes (3)
- cmake/plugins_options.cmake
- plugins/CMakeLists.txt
- plugins/out_zerobus/zerobus.h
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/out_zerobus/CMakeLists.txt
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmake/zerobus-ffi.cmake`:
- Around line 14-19: The manual override branch that sets ZEROBUS_LIB_FILE when
ZEROBUS_LIB_DIR is provided must validate the constructed path exists before
returning; update the block that references ZEROBUS_LIB_DIR and sets
ZEROBUS_LIB_FILE to test the full file path (built from ZEROBUS_LIB_DIR,
CMAKE_STATIC_LIBRARY_PREFIX, zerobus_ffi, and CMAKE_STATIC_LIBRARY_SUFFIX) using
CMake's file(EXISTS ...) or if(NOT EXISTS ...) check and emit a clear error
(e.g., message(FATAL_ERROR ...)) or fallback to auto-discovery behavior if the
file is missing, mirroring the validation logic used by the auto-discovery code
that currently checks existence later.
- Around line 88-100: The guard currently checks for
"${_ZEROBUS_EXTRACT_DIR}/native" which can be stale; change the extraction guard
to test for the actual expected library file instead (e.g. the platform-specific
artifact under "${_ZEROBUS_EXTRACT_DIR}/native", not the directory itself)
before skipping extraction, using the same variables (_ZEROBUS_EXTRACT_DIR,
_ZEROBUS_TARBALL) and keeping the extraction flow and error handling
(RESULT_VARIABLE _EXTRACT_RESULT / message / FLB_OPTION(FLB_OUT_ZEROBUS OFF) /
return()) intact so we only skip extraction when the real library file is
present and valid.
- Around line 28-31: Update the processor-match condition that sets
_ZEROBUS_PLATFORM so it also recognizes lowercase "arm64" (and matches case
consistently with the windows-setup pattern); modify the
if(CMAKE_SYSTEM_PROCESSOR MATCHES ...) check used to set _ZEROBUS_PLATFORM so
the regex includes arm64 in any case (e.g., use a case-insensitive or anchored
alternation like ^(AARCH64|ARM64|aarch64|arm64)$ or a (?i) flag) to ensure
CMAKE_SYSTEM_PROCESSOR correctly selects "linux-aarch64".
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 4c79dee6-0431-4f68-9d4a-533bbd0aebb5
📒 Files selected for processing (2)
cmake/zerobus-ffi.cmakeplugins/out_zerobus/CMakeLists.txt
🚧 Files skipped from review as they are similar to previous changes (1)
- plugins/out_zerobus/CMakeLists.txt
|
Ready for review. Let me know if anything needs clarification. |
eda21d3 to
604f6f1
Compare
|
For security reasons, we do not do direct downloads of third party libraries on configuring/building time, dependencies are either external or bundled. I think in this case the proper way would be to make the plugin optional and auto-enabled if the dependency exists in the system where its being build. |
Signed-off-by: mats <mats.kazuki@gmail.com>
Signed-off-by: mats <mats.kazuki@gmail.com>
Signed-off-by: mats <mats.kazuki@gmail.com>
* build: disable out_zerobus on Windows ARM64 The previous pointer-size only gate falsely selected the windows-x86-64 prebuilt library when cross-compiling for Windows ARM64, causing the ARM64 package job to fail at link time. Reject ARM/ARM64 by processor name first, and keep the pointer-size check to pick the x86-64 prebuilt on 64-bit x86 and disable the plugin on 32-bit Windows. * build: align ZeroBus Windows ARM regex with Linux branch The ARM rejection regex included lowercase spellings and bare ARM/arm tokens that Windows never reports for CMAKE_SYSTEM_PROCESSOR. Use the same alternation as the sibling Linux branch and quote the processor value in the status message to match the existing style. Signed-off-by: mats <mats.kazuki@gmail.com>
5ff6eab to
d63358a
Compare
Updated the ZeroBus output plugin's CMake configuration by correcting the spelling of "Zerobus" and simplifying the library search logic. The plugin now checks for the library in user-defined paths and standard system paths, improving the build process and error messaging for missing dependencies. Signed-off-by: mats <mats.kazuki@gmail.com>
|
@edsiper Thanks for the clarification. I’ve updated the PR accordingly: the build scripts no longer download the third-party library directly. Instead, the plugin is optional and is automatically enabled only when the required dependency is found in the build environment. |
|
So, the packaging requirements in the template are applicable here: will this plugin compile for all supported targets (Linux, macOS and Windows plus containers) we currently have with no additional dependencies required? That's what those tests are for. If not then some kind of additional configuration will be required, either:
I will tag it with the label to verify this but it can be all run locally via the |
| DEFINE_OPTION(FLB_OUT_UDP "Enable UDP output plugin" ON) | ||
| DEFINE_OPTION(FLB_OUT_VIVO_EXPORTER "Enable Vivo exporter output plugin" ON) | ||
| DEFINE_OPTION(FLB_OUT_WEBSOCKET "Enable Websocket output plugin" ON) | ||
| DEFINE_OPTION(FLB_OUT_ZEROBUS "Enable Databricks Zerobus output plugin" ON) |
There was a problem hiding this comment.
This means we will always build it for every supported target so we may need additional dependencies for those targets where that is possible and configuration to turn it off for those targets where it is not possible.
Add a new output plugin (
out_zerobus) that sends log records to Databricks ZeroBus for ingestion into Delta Lake tables.Each flush converts Fluent Bit log events to JSON, ingests them as a batch through the ZeroBus streaming API, and waits for server-side acknowledgement before reporting success.
Key features
libzerobus_ffi.a)client_id/client_secret)log_keyfiltering,time_keyinjection (RFC 3339 nanoseconds),raw_log_keyfor preserving the original record, and optional_taginjectionis_retryableflagBuild integration
The plugin is enabled by default (
FLB_OUT_ZEROBUS=ON). The prebuilt static library is provisioned automatically viacmake/zerobus-ffi.cmake:-DZEROBUS_LIB_DIR=/path/to/dircontaininglibzerobus_ffi.aPlatform-specific linker dependencies are handled in
plugins/out_zerobus/CMakeLists.txt(CoreFoundation/Security/iconv on macOS, dl/pthread/m/resolv on Linux).Enter
[N/A]in the box, if an item is not applicable to your change.Testing
Before we can approve your change; please submit the following in a comment:
If this is a change to packaging of containers or native binaries then please confirm it works for all targets.
ok-package-testlabel to test for all targets (requires maintainer to do).Documentation
fluent/fluent-bit-docs/pull/2537
Backporting
Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.
Summary by CodeRabbit
New Features
Chores