Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@
* [UDP](pipeline/outputs/udp.md)
* [Vivo Exporter](pipeline/outputs/vivo-exporter.md)
* [WebSocket](pipeline/outputs/websocket.md)
* [ZeroBus](pipeline/outputs/zerobus.md)

## Stream processing

Expand Down
169 changes: 169 additions & 0 deletions pipeline/outputs/zerobus.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
---
description: Send logs to Databricks through Zerobus

Check warning on line 2 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.Spelling] Spelling check: 'Databricks'? Raw Output: {"message": "[FluentBit.Spelling] Spelling check: 'Databricks'?", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 2, "column": 27}}}, "severity": "INFO"}
---

# Zerobus

Check warning on line 5 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.Spelling] Spelling check: 'Zerobus'? Raw Output: {"message": "[FluentBit.Spelling] Spelling check: 'Zerobus'?", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 5, "column": 3}}}, "severity": "INFO"}

{% hint style="info" %}
**Supported event types:** `logs`
{% endhint %}

The _Zerobus_ output plugin lets you ingest log records into a [Databricks](https://www.databricks.com/) table through the Zerobus streaming ingestion interface. Records are converted to JSON and sent by using the Zerobus SDK using gRPC.

Check warning on line 11 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.Spelling] Spelling check: 'Zerobus'? Raw Output: {"message": "[FluentBit.Spelling] Spelling check: 'Zerobus'?", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 11, "column": 215}}}, "severity": "INFO"}

Check warning on line 11 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.Spelling] Spelling check: 'Zerobus'? Raw Output: {"message": "[FluentBit.Spelling] Spelling check: 'Zerobus'?", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 11, "column": 124}}}, "severity": "INFO"}

Check warning on line 11 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.Spelling] Spelling check: 'Databricks'? Raw Output: {"message": "[FluentBit.Spelling] Spelling check: 'Databricks'?", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 11, "column": 65}}}, "severity": "INFO"}

Check warning on line 11 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.Spelling] Spelling check: 'Zerobus'? Raw Output: {"message": "[FluentBit.Spelling] Spelling check: 'Zerobus'?", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 11, "column": 6}}}, "severity": "INFO"}

Before you begin, you need a Databricks workspace with a Unity Catalog table configured for Zerobus ingestion, and an OAuth 2.0 service principal (client ID and client secret) with appropriate permissions.

Check warning on line 13 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.WordList] Use 'OAuth 2.0' instead of 'OAuth 2'. Raw Output: {"message": "[FluentBit.WordList] Use 'OAuth 2.0' instead of 'OAuth 2'.", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 13, "column": 119}}}, "severity": "INFO"}

Check warning on line 13 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.Spelling] Spelling check: 'Zerobus'? Raw Output: {"message": "[FluentBit.Spelling] Spelling check: 'Zerobus'?", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 13, "column": 93}}}, "severity": "INFO"}

Check warning on line 13 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.Spelling] Spelling check: 'Databricks'? Raw Output: {"message": "[FluentBit.Spelling] Spelling check: 'Databricks'?", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 13, "column": 30}}}, "severity": "INFO"}

## Build requirements

If you are compiling Fluent Bit from source, the `zerobus-ffi` shared library and header must be installed on your build environment before building. Pre-built binaries are available from the [zerobus-sdk releases](https://github.com/databricks/zerobus-sdk/releases).

Download `zerobus-ffi-1.1.0.tar.gz` and place the files for your platform (`linux-x86_64` or `linux-aarch64`) into the following directories:

| File | Destination |
| :--- | :--- |
| `libzerobus_ffi.so` | `/usr/local/lib/` |
| `libzerobus_ffi.a` | `/usr/local/lib/` |
| `zerobus.h` | `/usr/local/include/` |

After placing the files, run `sudo ldconfig` to update the shared library cache.

## Configuration parameters

| Key | Description | Default |
| :--- | :--- | :--- |
| `endpoint` | Zerobus gRPC endpoint URL. If no scheme is provided, `https://` is automatically prepended. | _none_ |

Check warning on line 33 in pipeline/outputs/zerobus.md

View workflow job for this annotation

GitHub Actions / runner / vale

[vale] reported by reviewdog 🐶 [FluentBit.Spelling] Spelling check: 'Zerobus'? Raw Output: {"message": "[FluentBit.Spelling] Spelling check: 'Zerobus'?", "location": {"path": "pipeline/outputs/zerobus.md", "range": {"start": {"line": 33, "column": 16}}}, "severity": "INFO"}
| `workspace_url` | Databricks workspace URL. If no scheme is provided, `https://` is automatically prepended. | _none_ |
| `table_name` | Fully qualified Unity Catalog table name in `catalog.schema.table` format. | _none_ |
| `client_id` | OAuth 2.0 client ID for authentication. | _none_ |
| `client_secret` | OAuth 2.0 client secret for authentication. | _none_ |
| `add_tag` | If enabled, the Fluent Bit tag is added as a `_tag` field in each record. | `true` |
| `time_key` | Key name for the injected timestamp. The timestamp is formatted as RFC 3339 with nanosecond precision. Set to an empty string to disable timestamp injection. | `_time` |
| `log_key` | Comma-separated list of record keys to include in the output. When unset, all keys are included. | _none_ |
| `raw_log_key` | If set, the full original record (before filtering by `log_key`) is stored as a JSON string under this key name. | _none_ |

## Get started

To send log records to Databricks through Zerobus, configure the plugin with your Zerobus endpoint, workspace URL, table name, and OAuth 2.0 credentials.

### Configuration file

{% tabs %}
{% tab title="fluent-bit.yaml" %}

```yaml
pipeline:
inputs:
- name: tail
tag: app.logs
path: /var/log/app/*.log

outputs:
- name: zerobus
match: '*'
endpoint: https://<workspace-id>.zerobus.<region>.cloud.databricks.com
workspace_url: https://<instance-name>.cloud.databricks.com
table_name: catalog.schema.logs
client_id: <your-client-id>
client_secret: <your-client-secret>
```

{% endtab %}
{% tab title="fluent-bit.conf" %}

```text
[INPUT]
Name tail
Tag app.logs
Path /var/log/app/*.log

[OUTPUT]
Name zerobus
Match *
Endpoint https://<workspace-id>.zerobus.<region>.cloud.databricks.com
Workspace_Url https://<instance-name>.cloud.databricks.com
Table_Name catalog.schema.logs
Client_Id <your-client-id>
Client_Secret <your-client-secret>
```

{% endtab %}
{% endtabs %}

### Record format

Each log record is converted to a JSON object before ingestion. The plugin applies the following transformations in order:

1. If `raw_log_key` is set, the full original record is captured as a JSON string before any filtering.
2. If `log_key` is set, only the specified keys are included in the output record.
3. If `raw_log_key` is set, the captured JSON string is injected under the configured key (unless a key with that name already exists).
4. If `time_key` is set, a timestamp in RFC 3339 format with nanosecond precision (for example, `2024-01-15T10:30:00.123456789Z`) is injected (unless a key with that name already exists).
5. If `add_tag` is enabled, the Fluent Bit tag is injected as `_tag` (unless a key with that name already exists).
Comment on lines +95 to +99
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix the repeated sentence starts to resolve the lint warning.

The transformation steps repeatedly start with "If...", which triggers a style warning. This issue was flagged in a previous review and marked as addressed, but the current code still contains the repeated pattern. The PR objectives mention that linting errors need to be fixed.

✍️ Suggested rewording to eliminate repetition
-1. If `raw_log_key` is set, the full original record is captured as a JSON string before any filtering.
-2. If `log_key` is set, only the specified keys are included in the output record.
-3. If `raw_log_key` is set, the captured JSON string is injected under the configured key (unless a key with that name already exists).
-4. If `time_key` is set, a timestamp in RFC 3339 format with nanosecond precision (for example, `2024-01-15T10:30:00.123456789Z`) is injected (unless a key with that name already exists).
-5. If `add_tag` is enabled, the Fluent Bit tag is injected as `_tag` (unless a key with that name already exists).
+1. When `raw_log_key` is set, the full original record is captured as a JSON string before any filtering.
+2. When `log_key` is set, only the specified keys are included in the output record.
+3. The captured JSON string (from step 1) is then injected under the configured `raw_log_key` (unless a key with that name already exists).
+4. Next, when `time_key` is set, a timestamp in RFC 3339 format with nanosecond precision (for example, `2024-01-15T10:30:00.123456789Z`) is injected (unless a key with that name already exists).
+5. Finally, when `add_tag` is enabled, the Fluent Bit tag is injected as `_tag` (unless a key with that name already exists).
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
1. If `raw_log_key` is set, the full original record is captured as a JSON string before any filtering.
2. If `log_key` is set, only the specified keys are included in the output record.
3. If `raw_log_key` is set, the captured JSON string is injected under the configured key (unless a key with that name already exists).
4. If `time_key` is set, a timestamp in RFC 3339 format with nanosecond precision (for example, `2024-01-15T10:30:00.123456789Z`) is injected (unless a key with that name already exists).
5. If `add_tag` is enabled, the Fluent Bit tag is injected as `_tag` (unless a key with that name already exists).
1. When `raw_log_key` is set, the full original record is captured as a JSON string before any filtering.
2. When `log_key` is set, only the specified keys are included in the output record.
3. The captured JSON string (from step 1) is then injected under the configured `raw_log_key` (unless a key with that name already exists).
4. Next, when `time_key` is set, a timestamp in RFC 3339 format with nanosecond precision (for example, `2024-01-15T10:30:00.123456789Z`) is injected (unless a key with that name already exists).
5. Finally, when `add_tag` is enabled, the Fluent Bit tag is injected as `_tag` (unless a key with that name already exists).
🧰 Tools
🪛 LanguageTool

[style] ~97-~97: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...s are included in the output record. 3. If raw_log_key is set, the captured JSON...

(ENGLISH_WORD_REPEAT_BEGINNING_RULE)


[style] ~98-~98: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ... key with that name already exists). 4. If time_key is set, a timestamp in RFC 3...

(ENGLISH_WORD_REPEAT_BEGINNING_RULE)


[style] ~99-~99: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ... key with that name already exists). 5. If add_tag is enabled, the Fluent Bit ta...

(ENGLISH_WORD_REPEAT_BEGINNING_RULE)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pipeline/outputs/zerobus.md` around lines 95 - 99, The listed transformation
steps in pipeline/outputs/zerobus.md repeat the same sentence start ("If..."),
causing a lint/style warning; rephrase the five bullet points to vary sentence
structure while preserving meaning: reference raw_log_key and log_key behavior
(full original record captured as JSON string vs. inclusion of specified keys),
explain that the captured JSON is injected under raw_log_key unless that key
exists, state that time_key injects an RFC3339 timestamp with nanosecond
precision unless the key exists, and note that enabling add_tag injects the
Fluent Bit tag as _tag unless that key exists; update the wording for each
bullet so they don't all begin with "If" (e.g., use active phrases like "When
set," "When configured," "The captured JSON is injected," "A timestamp is
added," "Enabling add_tag injects") to remove the repeated pattern.


For example, given the following input record:

```json
{"level": "info", "message": "request completed", "status": 200}
```

The default configuration produces:

```json
{
"level": "info",
"message": "request completed",
"status": 200,
"_time": "2024-01-15T10:30:00.123456789Z",
"_tag": "app.logs"
}
```

### Filtering keys

Use `log_key` to select specific fields from the record. Combined with `raw_log_key`, you can send a filtered record while preserving the original data:

{% tabs %}
{% tab title="fluent-bit.yaml" %}

```yaml
pipeline:
outputs:
- name: zerobus
match: '*'
endpoint: https://<workspace-id>.zerobus.<region>.cloud.databricks.com
workspace_url: https://<instance-name>.cloud.databricks.com
table_name: catalog.schema.logs
client_id: <your-client-id>
client_secret: <your-client-secret>
log_key: level,message
raw_log_key: _raw
```

{% endtab %}
{% tab title="fluent-bit.conf" %}

```text
[OUTPUT]
Name zerobus
Match *
Endpoint https://<workspace-id>.zerobus.<region>.cloud.databricks.com
Workspace_Url https://<instance-name>.cloud.databricks.com
Table_Name catalog.schema.logs
Client_Id <your-client-id>
Client_Secret <your-client-secret>
Log_Key level,message
Raw_Log_Key _raw
```

{% endtab %}
{% endtabs %}

This produces:

```json
{
"level": "info",
"message": "request completed",
"_raw": "{\"level\":\"info\",\"message\":\"request completed\",\"status\":200}",
"_time": "2024-01-15T10:30:00.123456789Z",
"_tag": "app.logs"
}
```
Loading