Skip to content

Commit 4db76b7

Browse files
committed
kafka: Add AWS MSK IAM auth support
1 parent 3fea32e commit 4db76b7

File tree

2 files changed

+156
-78
lines changed

2 files changed

+156
-78
lines changed

pipeline/inputs/kafka.md

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -136,73 +136,93 @@ The example can be executed locally with `make start` in the `examples/kafka_fil
136136

137137
## AWS MSK IAM authentication
138138

139-
Fluent Bit v4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM. This lets you securely connect to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.
139+
Starting with version 4.0.4, Fluent Bit supports AWS IAM authentication for Amazon MSK clusters. This allows you to use your AWS credentials and IAM policies to control access to Kafka topics.
140140

141-
### Build requirements
141+
### Prerequisites
142142

143-
If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:
143+
- Access to an AWS MSK cluster with IAM authentication enabled
144+
- Valid AWS credentials (IAM role, access keys, or instance profile)
145+
- Network connectivity to your MSK brokers
144146

145-
- The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment.
147+
### Configuration parameters [#config-aws]
146148

147-
### Runtime requirements
149+
| Property | Description | Default |
150+
| -------- | ----------- | ------- |
151+
| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication | _none_ |
152+
| `aws_region` | AWS region (optional, automatically detected from broker hostname for standard MSK endpoints) | auto-detected |
148153

149-
- **Network Access:** Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
150-
- **AWS Credentials:** Provide these AWS credentials using any supported AWS method. These credentials are discovered by default when `aws_msk_iam` flag is enabled.
151-
- IAM roles (recommended for EC2, ECS, or EKS)
152-
- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
153-
- AWS credentials file (`~/.aws/credentials`)
154-
- Instance metadata service (IMDS)
155-
- **IAM Permissions:** The credentials must allow access to the target MSK cluster, as shown in the following example policy.
154+
### Basic configuration
156155

157-
### Configuration parameters [#config-aws]
156+
For most use cases, simply set `rdkafka.sasl.mechanism` to `aws_msk_iam`:
157+
158+
```yaml
159+
pipeline:
160+
inputs:
161+
- name: kafka
162+
brokers: boot-abc123.c1.kafka-serverless.us-east-1.amazonaws.com:9098
163+
topics: my-topic
164+
rdkafka.sasl.mechanism: aws_msk_iam
165+
```
158166
159-
| Property | Description | Required |
160-
| -------- | ----------- | -------- |
161-
| `aws_msk_iam` | If `true`, enables AWS MSK IAM authentication. Possible values: `true`, `false`. | `false` |
162-
| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction. This value is required if `aws_msk_iam` is `true`. | _none_ |
167+
The AWS region is automatically detected from the broker hostname for standard MSK endpoints.
163168
164-
### Configuration example
169+
**Note:** When using `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually.
170+
171+
### Using custom DNS or PrivateLink
172+
173+
If you're using custom DNS names or PrivateLink aliases, specify the `aws_region` parameter:
165174

166175
```yaml
167176
pipeline:
168177
inputs:
169178
- name: kafka
170-
brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098
179+
brokers: my-kafka-endpoint.example.com:9098
171180
topics: my-topic
172-
aws_msk_iam: true
173-
aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3
174-
175-
outputs:
176-
- name: stdout
177-
match: '*'
181+
rdkafka.sasl.mechanism: aws_msk_iam
182+
aws_region: us-east-1
178183
```
179184

180-
### Example AWS IAM policy
185+
### AWS credentials
181186

182-
{% hint style="info" %}
187+
Fluent Bit uses the standard AWS credentials chain to authenticate:
183188

184-
IAM policies and permissions can be complex and might vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
189+
1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
190+
2. AWS credentials file (`~/.aws/credentials`)
191+
3. IAM instance profile (recommended for EC2)
192+
4. IAM task role (recommended for ECS)
193+
5. IAM service account (recommended for EKS)
185194

186-
{% endhint %}
195+
### Required IAM permissions
187196

188-
The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy:
197+
Your AWS credentials need the following permissions to consume from MSK topics:
189198

190199
```json
191200
{
192201
"Version": "2012-10-17",
193202
"Statement": [
194203
{
195-
"Sid": "VisualEditor0",
196204
"Effect": "Allow",
197205
"Action": [
198-
"kafka-cluster:*",
199-
"kafka-cluster:DescribeCluster",
200-
"kafka-cluster:ReadData",
206+
"kafka-cluster:Connect",
201207
"kafka-cluster:DescribeTopic",
202-
"kafka-cluster:Connect"
208+
"kafka-cluster:ReadData",
209+
"kafka-cluster:DescribeGroup",
210+
"kafka-cluster:AlterGroup"
203211
],
204-
"Resource": "*"
212+
"Resource": [
213+
"arn:aws:kafka:REGION:ACCOUNT:cluster/CLUSTER_NAME/CLUSTER_UUID",
214+
"arn:aws:kafka:REGION:ACCOUNT:topic/CLUSTER_NAME/CLUSTER_UUID/my-topic",
215+
"arn:aws:kafka:REGION:ACCOUNT:group/CLUSTER_NAME/CLUSTER_UUID/fluent-bit"
216+
]
205217
}
206218
]
207219
}
208220
```
221+
222+
Replace `REGION`, `ACCOUNT`, `CLUSTER_NAME`, `CLUSTER_UUID`, and topic/group names with your actual values.
223+
224+
**Note:** The `CLUSTER_UUID` segment is required in all topic and group ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI.
225+
226+
{% hint style="info" %}
227+
For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html).
228+
{% endhint %}

pipeline/outputs/kafka.md

Lines changed: 100 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -237,83 +237,141 @@ pipeline:
237237

238238
## AWS MSK IAM authentication
239239

240-
Fluent Bit 4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM for the Kafka output plugin. This lets you securely send data to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control.
240+
Starting with version 4.0.4, Fluent Bit supports AWS IAM authentication for Amazon MSK clusters. This allows you to use your AWS credentials and IAM policies to control access to Kafka topics.
241241

242242
### Prerequisites
243243

244-
If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support:
244+
- Access to an AWS MSK cluster with IAM authentication enabled
245+
- Valid AWS credentials (IAM role, access keys, or instance profile)
246+
- Network connectivity to your MSK brokers
245247

246-
- Build Requirements
248+
### Configuration parameters
247249

248-
The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment.
250+
| Property | Description | Default |
251+
| -------- | ----------- | ------- |
252+
| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication | _none_ |
253+
| `aws_region` | AWS region (optional, automatically detected from broker hostname for standard MSK endpoints) | auto-detected |
249254

250-
- Runtime Requirements:
255+
### Basic configuration
251256

252-
- Network Access: Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup).
253-
- AWS Credentials: Provide credentials using any supported AWS method:
254-
- IAM roles (recommended for EC2, ECS, or EKS)
255-
- Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
256-
- AWS credentials file (`~/.aws/credentials`)
257-
- Instance metadata service (IMDS)
257+
For most use cases, simply set `rdkafka.sasl.mechanism` to `aws_msk_iam`:
258258

259-
These credentials are discovered by default when `aws_msk_iam` flag is enabled.
259+
{% tabs %}
260+
{% tab title="fluent-bit.yaml" %}
260261

261-
- IAM Permissions: The credentials must allow access to the target MSK cluster.
262+
```yaml
263+
pipeline:
264+
inputs:
265+
- name: cpu
262266

263-
### AWS MSK IAM configuration parameters
267+
outputs:
268+
- name: kafka
269+
match: '*'
270+
brokers: b-1.mycluster.kafka.us-east-1.amazonaws.com:9098
271+
topics: my-topic
272+
rdkafka.sasl.mechanism: aws_msk_iam
273+
```
264274
265-
This plugin supports the following parameters:
275+
{% endtab %}
276+
{% tab title="fluent-bit.conf" %}
266277
267-
| Property | Description | Type | Default |
268-
|---------------------------|-----------------------------------------------------|---------|-------------------------------|
269-
| `aws_msk_iam` | Optional. Enable AWS MSK IAM authentication. | Boolean | `false` |
270-
| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction. Required if `aws_msk_iam` is set. | String | _none_ |
278+
```text
279+
[INPUT]
280+
Name cpu
271281

272-
### Configuration example
282+
[OUTPUT]
283+
Name kafka
284+
Match *
285+
Brokers b-1.mycluster.kafka.us-east-1.amazonaws.com:9098
286+
Topics my-topic
287+
rdkafka.sasl.mechanism aws_msk_iam
288+
```
273289

290+
{% endtab %}
291+
{% endtabs %}
292+
293+
The AWS region is automatically detected from the broker hostname for standard MSK endpoints.
294+
295+
**Note:** When using `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually.
296+
297+
### Using custom DNS or PrivateLink
298+
299+
If you're using custom DNS names or PrivateLink aliases, specify the `aws_region` parameter:
274300

275301
{% tabs %}
276302
{% tab title="fluent-bit.yaml" %}
277303

278304
```yaml
279305
pipeline:
280306
inputs:
281-
- name: random
307+
- name: cpu
282308

283309
outputs:
284310
- name: kafka
285311
match: '*'
286-
brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098
312+
brokers: my-kafka-endpoint.example.com:9098
287313
topics: my-topic
288-
aws_msk_iam: true
289-
aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3
314+
rdkafka.sasl.mechanism: aws_msk_iam
315+
aws_region: us-east-1
316+
```
317+
318+
{% endtab %}
319+
{% tab title="fluent-bit.conf" %}
320+
321+
```text
322+
[INPUT]
323+
Name cpu
324+
325+
[OUTPUT]
326+
Name kafka
327+
Match *
328+
Brokers my-kafka-endpoint.example.com:9098
329+
Topics my-topic
330+
rdkafka.sasl.mechanism aws_msk_iam
331+
aws_region us-east-1
290332
```
291333

292334
{% endtab %}
293335
{% endtabs %}
294336

295-
### AWS IAM policy
337+
### AWS credentials
296338

297-
IAM policies and permissions can be complex and can vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security.
339+
Fluent Bit uses the standard AWS credentials chain to authenticate:
298340

299-
The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy:
341+
1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`)
342+
2. AWS credentials file (`~/.aws/credentials`)
343+
3. IAM instance profile (recommended for EC2)
344+
4. IAM task role (recommended for ECS)
345+
5. IAM service account (recommended for EKS)
346+
347+
### Required IAM permissions
348+
349+
Your AWS credentials need the following permissions to produce to MSK topics:
300350

301351
```json
302352
{
303-
"Version": "2012-10-17",
304-
"Statement": [
305-
{
306-
"Sid": "VisualEditor0",
307-
"Effect": "Allow",
308-
"Action": [
309-
"kafka-cluster:*",
310-
"kafka-cluster:DescribeCluster",
311-
"kafka-cluster:ReadData",
312-
"kafka-cluster:DescribeTopic",
313-
"kafka-cluster:Connect"
314-
],
315-
"Resource": "*"
316-
}
317-
]
353+
"Version": "2012-10-17",
354+
"Statement": [
355+
{
356+
"Effect": "Allow",
357+
"Action": [
358+
"kafka-cluster:Connect",
359+
"kafka-cluster:DescribeTopic",
360+
"kafka-cluster:WriteData"
361+
],
362+
"Resource": [
363+
"arn:aws:kafka:REGION:ACCOUNT:cluster/CLUSTER_NAME/CLUSTER_UUID",
364+
"arn:aws:kafka:REGION:ACCOUNT:topic/CLUSTER_NAME/CLUSTER_UUID/my-topic"
365+
]
366+
}
367+
]
318368
}
319369
```
370+
371+
Replace `REGION`, `ACCOUNT`, `CLUSTER_NAME`, `CLUSTER_UUID`, and topic name with your actual values.
372+
373+
**Note:** The `CLUSTER_UUID` segment is required in all topic ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI.
374+
375+
{% hint style="info" %}
376+
For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html).
377+
{% endhint %}

0 commit comments

Comments
 (0)