From bbd73d3e6596e30999e6075613a5f5825ed9a40d Mon Sep 17 00:00:00 2001 From: Arbin Date: Fri, 12 Dec 2025 22:13:17 +0800 Subject: [PATCH 1/3] kafka: Add AWS MSK IAM auth support Signed-off-by: Arbin --- pipeline/inputs/kafka.md | 92 ++++++++++++++---------- pipeline/outputs/kafka.md | 142 +++++++++++++++++++++++++++----------- 2 files changed, 156 insertions(+), 78 deletions(-) diff --git a/pipeline/inputs/kafka.md b/pipeline/inputs/kafka.md index 54bcb444f..d8c35f261 100644 --- a/pipeline/inputs/kafka.md +++ b/pipeline/inputs/kafka.md @@ -136,73 +136,93 @@ The example can be executed locally with `make start` in the `examples/kafka_fil ## AWS MSK IAM authentication -Fluent Bit v4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM. This lets you securely connect to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control. +Starting with version 4.0.4, Fluent Bit supports AWS IAM authentication for Amazon MSK clusters. This allows you to use your AWS credentials and IAM policies to control access to Kafka topics. -### Build requirements +### Prerequisites -If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support: +- Access to an AWS MSK cluster with IAM authentication enabled +- Valid AWS credentials (IAM role, access keys, or instance profile) +- Network connectivity to your MSK brokers -- The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment. +### Configuration parameters [#config-aws] -### Runtime requirements +| Property | Description | Default | +| -------- | ----------- | ------- | +| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication | _none_ | +| `aws_region` | AWS region (optional, automatically detected from broker hostname for standard MSK endpoints) | auto-detected | -- **Network Access:** Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup). -- **AWS Credentials:** Provide these AWS credentials using any supported AWS method. These credentials are discovered by default when `aws_msk_iam` flag is enabled. - - IAM roles (recommended for EC2, ECS, or EKS) - - Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) - - AWS credentials file (`~/.aws/credentials`) - - Instance metadata service (IMDS) -- **IAM Permissions:** The credentials must allow access to the target MSK cluster, as shown in the following example policy. +### Basic configuration -### Configuration parameters [#config-aws] +For most use cases, simply set `rdkafka.sasl.mechanism` to `aws_msk_iam`: + +```yaml +pipeline: + inputs: + - name: kafka + brokers: boot-abc123.c1.kafka-serverless.us-east-1.amazonaws.com:9098 + topics: my-topic + rdkafka.sasl.mechanism: aws_msk_iam +``` -| Property | Description | Required | -| -------- | ----------- | -------- | -| `aws_msk_iam` | If `true`, enables AWS MSK IAM authentication. Possible values: `true`, `false`. | `false` | -| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction. This value is required if `aws_msk_iam` is `true`. | _none_ | +The AWS region is automatically detected from the broker hostname for standard MSK endpoints. -### Configuration example +**Note:** When using `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually. + +### Using custom DNS or PrivateLink + +If you're using custom DNS names or PrivateLink aliases, specify the `aws_region` parameter: ```yaml pipeline: inputs: - name: kafka - brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098 + brokers: my-kafka-endpoint.example.com:9098 topics: my-topic - aws_msk_iam: true - aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3 - - outputs: - - name: stdout - match: '*' + rdkafka.sasl.mechanism: aws_msk_iam + aws_region: us-east-1 ``` -### Example AWS IAM policy +### AWS credentials -{% hint style="info" %} +Fluent Bit uses the standard AWS credentials chain to authenticate: -IAM policies and permissions can be complex and might vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult your AWS administrator or an AWS expert who is familiar with MSK and IAM security. +1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) +2. AWS credentials file (`~/.aws/credentials`) +3. IAM instance profile (recommended for EC2) +4. IAM task role (recommended for ECS) +5. IAM service account (recommended for EKS) -{% endhint %} +### Required IAM permissions -The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy: +Your AWS credentials need the following permissions to consume from MSK topics: ```json { "Version": "2012-10-17", "Statement": [ { - "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ - "kafka-cluster:*", - "kafka-cluster:DescribeCluster", - "kafka-cluster:ReadData", + "kafka-cluster:Connect", "kafka-cluster:DescribeTopic", - "kafka-cluster:Connect" + "kafka-cluster:ReadData", + "kafka-cluster:DescribeGroup", + "kafka-cluster:AlterGroup" ], - "Resource": "*" + "Resource": [ + "arn:aws:kafka:REGION:ACCOUNT:cluster/CLUSTER_NAME/CLUSTER_UUID", + "arn:aws:kafka:REGION:ACCOUNT:topic/CLUSTER_NAME/CLUSTER_UUID/my-topic", + "arn:aws:kafka:REGION:ACCOUNT:group/CLUSTER_NAME/CLUSTER_UUID/fluent-bit" + ] } ] } ``` + +Replace `REGION`, `ACCOUNT`, `CLUSTER_NAME`, `CLUSTER_UUID`, and topic/group names with your actual values. + +**Note:** The `CLUSTER_UUID` segment is required in all topic and group ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI. + +{% hint style="info" %} +For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html). +{% endhint %} diff --git a/pipeline/outputs/kafka.md b/pipeline/outputs/kafka.md index af13981b9..3fe3ff64c 100644 --- a/pipeline/outputs/kafka.md +++ b/pipeline/outputs/kafka.md @@ -237,40 +237,66 @@ pipeline: ## AWS MSK IAM authentication -Fluent Bit 4.0.4 and later supports authentication to Amazon MSK (Managed Streaming for Apache Kafka) clusters using AWS IAM for the Kafka output plugin. This lets you securely send data to MSK brokers with AWS credentials, leveraging IAM roles and policies for access control. +Starting with version 4.0.4, Fluent Bit supports AWS IAM authentication for Amazon MSK clusters. This allows you to use your AWS credentials and IAM policies to control access to Kafka topics. ### Prerequisites -If you are compiling Fluent Bit from source, ensure the following requirements are met to enable AWS MSK IAM support: +- Access to an AWS MSK cluster with IAM authentication enabled +- Valid AWS credentials (IAM role, access keys, or instance profile) +- Network connectivity to your MSK brokers -- Build Requirements +### Configuration parameters - The packages `libsasl2` and `libsasl2-dev` must be installed on your build environment. +| Property | Description | Default | +| -------- | ----------- | ------- | +| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication | _none_ | +| `aws_region` | AWS region (optional, automatically detected from broker hostname for standard MSK endpoints) | auto-detected | -- Runtime Requirements: +### Basic configuration - - Network Access: Fluent Bit must be able to reach your MSK broker endpoints (AWS VPC setup). - - AWS Credentials: Provide credentials using any supported AWS method: - - IAM roles (recommended for EC2, ECS, or EKS) - - Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) - - AWS credentials file (`~/.aws/credentials`) - - Instance metadata service (IMDS) +For most use cases, simply set `rdkafka.sasl.mechanism` to `aws_msk_iam`: - These credentials are discovered by default when `aws_msk_iam` flag is enabled. +{% tabs %} +{% tab title="fluent-bit.yaml" %} -- IAM Permissions: The credentials must allow access to the target MSK cluster. +```yaml +pipeline: + inputs: + - name: cpu -### AWS MSK IAM configuration parameters + outputs: + - name: kafka + match: '*' + brokers: b-1.mycluster.kafka.us-east-1.amazonaws.com:9098 + topics: my-topic + rdkafka.sasl.mechanism: aws_msk_iam +``` -This plugin supports the following parameters: +{% endtab %} +{% tab title="fluent-bit.conf" %} -| Property | Description | Type | Default | -|---------------------------|-----------------------------------------------------|---------|-------------------------------| -| `aws_msk_iam` | Optional. Enable AWS MSK IAM authentication. | Boolean | `false` | -| `aws_msk_iam_cluster_arn` | Full ARN of the MSK cluster for region extraction. Required if `aws_msk_iam` is set. | String | _none_ | +```text +[INPUT] + Name cpu -### Configuration example +[OUTPUT] + Name kafka + Match * + Brokers b-1.mycluster.kafka.us-east-1.amazonaws.com:9098 + Topics my-topic + rdkafka.sasl.mechanism aws_msk_iam +``` +{% endtab %} +{% endtabs %} + +The AWS region is automatically detected from the broker hostname for standard MSK endpoints. + +**Note:** When using `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually. + +### Using custom DNS or PrivateLink + +If you're using custom DNS names or PrivateLink aliases, specify the `aws_region` parameter: {% tabs %} {% tab title="fluent-bit.yaml" %} @@ -278,42 +304,74 @@ This plugin supports the following parameters: ```yaml pipeline: inputs: - - name: random + - name: cpu outputs: - name: kafka match: '*' - brokers: my-cluster.abcdef.c1.kafka.us-east-1.amazonaws.com:9098 + brokers: my-kafka-endpoint.example.com:9098 topics: my-topic - aws_msk_iam: true - aws_msk_iam_cluster_arn: arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abcdef-1234-5678-9012-abcdefghijkl-s3 + rdkafka.sasl.mechanism: aws_msk_iam + aws_region: us-east-1 +``` + +{% endtab %} +{% tab title="fluent-bit.conf" %} + +```text +[INPUT] + Name cpu + +[OUTPUT] + Name kafka + Match * + Brokers my-kafka-endpoint.example.com:9098 + Topics my-topic + rdkafka.sasl.mechanism aws_msk_iam + aws_region us-east-1 ``` {% endtab %} {% endtabs %} -### AWS IAM policy +### AWS credentials -IAM policies and permissions can be complex and can vary depending on your organization's security requirements. If you are unsure about the correct permissions or best practices, consult with your AWS administrator or an AWS expert who is familiar with MSK and IAM security. +Fluent Bit uses the standard AWS credentials chain to authenticate: -The AWS credentials used by Fluent Bit must have permission to connect to your MSK cluster. Here is a minimal example policy: +1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) +2. AWS credentials file (`~/.aws/credentials`) +3. IAM instance profile (recommended for EC2) +4. IAM task role (recommended for ECS) +5. IAM service account (recommended for EKS) + +### Required IAM permissions + +Your AWS credentials need the following permissions to produce to MSK topics: ```json { - "Version": "2012-10-17", - "Statement": [ - { - "Sid": "VisualEditor0", - "Effect": "Allow", - "Action": [ - "kafka-cluster:*", - "kafka-cluster:DescribeCluster", - "kafka-cluster:ReadData", - "kafka-cluster:DescribeTopic", - "kafka-cluster:Connect" - ], - "Resource": "*" - } - ] + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Action": [ + "kafka-cluster:Connect", + "kafka-cluster:DescribeTopic", + "kafka-cluster:WriteData" + ], + "Resource": [ + "arn:aws:kafka:REGION:ACCOUNT:cluster/CLUSTER_NAME/CLUSTER_UUID", + "arn:aws:kafka:REGION:ACCOUNT:topic/CLUSTER_NAME/CLUSTER_UUID/my-topic" + ] + } + ] } ``` + +Replace `REGION`, `ACCOUNT`, `CLUSTER_NAME`, `CLUSTER_UUID`, and topic name with your actual values. + +**Note:** The `CLUSTER_UUID` segment is required in all topic ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI. + +{% hint style="info" %} +For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html). +{% endhint %} From a76799149cb85a5f1931277a4bb6220dbcbe70ed Mon Sep 17 00:00:00 2001 From: Arbin Date: Fri, 12 Dec 2025 22:25:08 +0800 Subject: [PATCH 2/3] docs: fix typo in Kafka output plugin description Change 'Kafka input plugin' to 'Kafka output plugin' in the introduction Signed-off-by: Arbin --- pipeline/outputs/kafka.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipeline/outputs/kafka.md b/pipeline/outputs/kafka.md index 3fe3ff64c..4192d9c21 100644 --- a/pipeline/outputs/kafka.md +++ b/pipeline/outputs/kafka.md @@ -2,7 +2,7 @@ The _Kafka Producer_ output plugin lets you ingest your records into an [Apache Kafka](https://kafka.apache.org/) service. This plugin uses the official [librdkafka C library](https://github.com/edenhill/librdkafka). -In Fluent Bit 4.0.4 and later, the Kafka input plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access. +In Fluent Bit 4.0.4 and later, the Kafka output plugin supports authentication with AWS MSK IAM, enabling integration with Amazon MSK (Managed Streaming for Apache Kafka) clusters that require IAM-based access. ## Configuration parameters From 21db1cd1d86a9f0a7eceaf92d5978d095e1ad33a Mon Sep 17 00:00:00 2001 From: Alexa Kreizinger Date: Mon, 15 Dec 2025 15:07:37 -0800 Subject: [PATCH 3/3] Apply suggestions from code review Signed-off-by: Alexa Kreizinger --- pipeline/inputs/kafka.md | 45 +++++++++++++++++++++++++-------------- pipeline/outputs/kafka.md | 43 ++++++++++++++++++++++++------------- 2 files changed, 57 insertions(+), 31 deletions(-) diff --git a/pipeline/inputs/kafka.md b/pipeline/inputs/kafka.md index d8c35f261..667a11fd9 100644 --- a/pipeline/inputs/kafka.md +++ b/pipeline/inputs/kafka.md @@ -136,24 +136,26 @@ The example can be executed locally with `make start` in the `examples/kafka_fil ## AWS MSK IAM authentication -Starting with version 4.0.4, Fluent Bit supports AWS IAM authentication for Amazon MSK clusters. This allows you to use your AWS credentials and IAM policies to control access to Kafka topics. +In Fluent Bit version 4.0.4 and later, you can use AWS IAM authentication for Amazon MSK clusters. This lets you use your AWS credentials and IAM policies to control access to Kafka topics. ### Prerequisites -- Access to an AWS MSK cluster with IAM authentication enabled -- Valid AWS credentials (IAM role, access keys, or instance profile) -- Network connectivity to your MSK brokers +To use AWS MSK IAM authentication, you must meet these requirements: + +- You must have access to an AWS MSK cluster with IAM authentication enabled. +- You must have valid AWS credentials (IAM role, access keys, or instance profile). +- You must have network connectivity to your MSK brokers. ### Configuration parameters [#config-aws] | Property | Description | Default | | -------- | ----------- | ------- | -| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication | _none_ | -| `aws_region` | AWS region (optional, automatically detected from broker hostname for standard MSK endpoints) | auto-detected | +| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication. | _none_ | +| `aws_region` | The name of your AWS region. This value is optional. If you don't set a value, but MSK IAM authentication is enabled, Fluent Bit detects your AWS region from the broker hostname for standard MSK endpoints. | _none_ | ### Basic configuration -For most use cases, simply set `rdkafka.sasl.mechanism` to `aws_msk_iam`: +For most use cases, the only necessary configuration step is to set `rdkafka.sasl.mechanism` to `aws_msk_iam`: ```yaml pipeline: @@ -166,7 +168,11 @@ pipeline: The AWS region is automatically detected from the broker hostname for standard MSK endpoints. -**Note:** When using `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually. +{% hint style="info" %} + +When using `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually. + +{% endhint %} ### Using custom DNS or PrivateLink @@ -187,13 +193,19 @@ pipeline: Fluent Bit uses the standard AWS credentials chain to authenticate: 1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) -2. AWS credentials file (`~/.aws/credentials`) -3. IAM instance profile (recommended for EC2) -4. IAM task role (recommended for ECS) -5. IAM service account (recommended for EKS) +1. AWS credentials file (`~/.aws/credentials`) +1. IAM instance profile (recommended for EC2) +1. IAM task role (recommended for ECS) +1. IAM service account (recommended for EKS) ### Required IAM permissions +{% hint style="info" %} + +For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html). + +{% endhint %} + Your AWS credentials need the following permissions to consume from MSK topics: ```json @@ -219,10 +231,11 @@ Your AWS credentials need the following permissions to consume from MSK topics: } ``` -Replace `REGION`, `ACCOUNT`, `CLUSTER_NAME`, `CLUSTER_UUID`, and topic/group names with your actual values. - -**Note:** The `CLUSTER_UUID` segment is required in all topic and group ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI. +Replace `REGION`, `ACCOUNT`, `CLUSTER_NAME`, `CLUSTER_UUID`, and topic and group names with your actual values. {% hint style="info" %} -For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html). + +The `CLUSTER_UUID` segment is required in all topic and group ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI. + {% endhint %} + diff --git a/pipeline/outputs/kafka.md b/pipeline/outputs/kafka.md index 4192d9c21..92fe9796e 100644 --- a/pipeline/outputs/kafka.md +++ b/pipeline/outputs/kafka.md @@ -237,24 +237,26 @@ pipeline: ## AWS MSK IAM authentication -Starting with version 4.0.4, Fluent Bit supports AWS IAM authentication for Amazon MSK clusters. This allows you to use your AWS credentials and IAM policies to control access to Kafka topics. +In Fluent Bit version 4.0.4 and later, you can use AWS IAM authentication for Amazon MSK clusters. This lets you use your AWS credentials and IAM policies to control access to Kafka topics. ### Prerequisites -- Access to an AWS MSK cluster with IAM authentication enabled -- Valid AWS credentials (IAM role, access keys, or instance profile) -- Network connectivity to your MSK brokers +To use AWS MSK IAM authentication, you must meet these requirements: + +- You must have access to an AWS MSK cluster with IAM authentication enabled. +- You must have valid AWS credentials (IAM role, access keys, or instance profile). +- You must have network connectivity to your MSK brokers. ### Configuration parameters | Property | Description | Default | | -------- | ----------- | ------- | -| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication | _none_ | -| `aws_region` | AWS region (optional, automatically detected from broker hostname for standard MSK endpoints) | auto-detected | +| `rdkafka.sasl.mechanism` | Set to `aws_msk_iam` to enable MSK IAM authentication. | _none_ | +| `aws_region` | The name of your AWS region. This value is optional. If you don't set a value, but MSK IAM authentication is enabled, Fluent Bit detects your AWS region from the broker hostname for standard MSK endpoints. | _none_ | ### Basic configuration -For most use cases, simply set `rdkafka.sasl.mechanism` to `aws_msk_iam`: +For most use cases, the only necessary configuration step is to set `rdkafka.sasl.mechanism` to `aws_msk_iam`: {% tabs %} {% tab title="fluent-bit.yaml" %} @@ -292,7 +294,11 @@ pipeline: The AWS region is automatically detected from the broker hostname for standard MSK endpoints. -**Note:** When using `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually. +{% hint style="info" %} + +When using `aws_msk_iam`, Fluent Bit automatically sets `rdkafka.security.protocol` to `SASL_SSL`. You don't need to configure it manually. + +{% endhint %} ### Using custom DNS or PrivateLink @@ -339,13 +345,19 @@ pipeline: Fluent Bit uses the standard AWS credentials chain to authenticate: 1. Environment variables (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`) -2. AWS credentials file (`~/.aws/credentials`) -3. IAM instance profile (recommended for EC2) -4. IAM task role (recommended for ECS) -5. IAM service account (recommended for EKS) +1. AWS credentials file (`~/.aws/credentials`) +1. IAM instance profile (recommended for EC2) +1. IAM task role (recommended for ECS) +1. IAM service account (recommended for EKS) ### Required IAM permissions +{% hint style="info" %} + +For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html). + +{% endhint %} + Your AWS credentials need the following permissions to produce to MSK topics: ```json @@ -370,8 +382,9 @@ Your AWS credentials need the following permissions to produce to MSK topics: Replace `REGION`, `ACCOUNT`, `CLUSTER_NAME`, `CLUSTER_UUID`, and topic name with your actual values. -**Note:** The `CLUSTER_UUID` segment is required in all topic ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI. - {% hint style="info" %} -For detailed IAM policy configuration, consult your AWS administrator or refer to the [AWS MSK documentation](https://docs.aws.amazon.com/msk/latest/developerguide/iam-access-control.html). + +The `CLUSTER_UUID` segment is required in all topic and group ARNs. You can find your cluster's UUID in the MSK console or by describing the cluster with the AWS CLI. + {% endhint %} +