Kafka Connect is an integration framework that is part of the Apache Kafka project. On Kubernetes and Red Hat OpenShift, you can deploy it using Strimzi or Red Hat AMQ Streams. Kafka Connect has two types of connectors: source and sink. Source connectors import data from external systems into Kafka; sink connectors export data from Kafka to external systems. Connecting to external systems usually requires authentication, so you have to provide credentials when configuring a connector. In this post, we show how to use Kubernetes secrets to store credentials when configuring connectors.

Here we will use the S3 source connector, which is part of the Apache Camel Kafka Connector project (for details see here), and use it as an example to show how to configure a connector so that it uses a secret.

The setup procedure described here is universal and suitable for any type of connector. We will use the S3 connector to connect to Amazon AWS S3 storage and load files from an S3 bucket into an Apache Kafka topic. To connect to the S3 storage, we need the following AWS credentials: an access key and a secret key. So, let's start by preparing a secret with the credentials.

Create a secret with credentials


First of all, we create a simple properties file aws-credentials.properties and write the credentials into it, like this:

aws_access_key_id=AKIAIOSFODNN7EXAMPLE
aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

The credentials specified here should provide access to the S3 bucket from which the data will be read. Now we need to generate a secret from this file, which is done with the following command:

$ kubectl create secret generic aws-credentials --from-file=./aws-credentials.properties 
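
If you want to double-check the result, kubectl can show the keys stored in the secret (but not their values); a quick sanity check could look like this:

$ kubectl describe secret aws-credentials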

Build a new container image with a connector


Next, you need to prepare a new Docker image with our connector. If using Strimzi, the Dockerfile for adding our connector looks like this:

FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

If Red Hat AMQ Streams is used, then the Dockerfile looks like this:

FROM registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER jboss:jboss

Then, using the Dockerfile, you need to build a container image containing the connectors we need and push it to a registry. If you do not have your own private registry, you can use a public one such as Quay or Docker Hub.
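
For example, assuming the connector JARs were copied into a local my-plugins/ directory next to the Dockerfile, building and pushing the image could look roughly like this (the image name matches the one used later in this post; substitute your own repository):

$ docker build -t docker.io/scholzj/kafka:camel-kafka-2.4.0 .
$ docker push docker.io/scholzj/kafka:camel-kafka-2.4.0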

Deploy Apache Kafka Connect


So, with the container image ready, we can deploy Apache Kafka Connect by creating the following resource in Kubernetes:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: docker.io/scholzj/kafka:camel-kafka-2.4.0
  replicas: 3
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
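
Assuming the manifest above is saved to a file such as kafka-connect.yaml (the filename is arbitrary), it is deployed like any other Kubernetes resource; a minimal sketch:

$ kubectl apply -f kafka-connect.yaml
$ kubectl get pods    # wait until the Connect pods are Running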

What should you look for in this description? Firstly, the image field, which tells the operator what image to use for the Apache Kafka Connect deployment - that is, the image into which we included the connectors we need. In our example, the container image built in the previous step was pushed to the Docker Hub registry as scholzj/kafka:camel-kafka-2.4.0, so our image field looks like this:

image: docker.io/scholzj/kafka:camel-kafka-2.4.0 

Secondly, the externalConfiguration section:

externalConfiguration:
  volumes:
    - name: aws-credentials
      secret:
        secretName: aws-credentials

Here we tell the operator to mount the Kubernetes secret aws-credentials (which we created above) in the Apache Kafka Connect pods. The secrets listed here are mounted under the path /opt/kafka/external-configuration/<volume-name>, where <volume-name> is the name of the volume given in the externalConfiguration section (in our case, aws-credentials).
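
To check that the secret was actually mounted, you can exec into one of the Connect pods and look at the file; the pod name below is just a placeholder, use the real name reported by kubectl get pods:

$ kubectl exec -it <connect-pod-name> -- cat /opt/kafka/external-configuration/aws-credentials/aws-credentials.properties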

Finally, pay attention to the config section, where we say that Apache Kafka Connect uses FileConfigProvider as the configuration provider:

config:
  config.providers: file
  config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider

A configuration provider is a way to avoid writing parameters directly into the configuration and instead take them from another source. In our case, we create a configuration provider named file that uses the FileConfigProvider class. This provider is part of Apache Kafka. FileConfigProvider can read properties files and extract values from them, and we will use it to load the API keys for our Amazon AWS account.

Create a connector using the Apache Kafka Connect REST API


Usually, you have to wait a minute or two for the Apache Kafka Connect deployment to come up before creating a connector instance. In previous versions of Strimzi and Red Hat AMQ Streams, you have to use the REST API for this: the connector is created by POSTing the following JSON:

{ "name": "s3-connector", "config": { "connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector", "tasks.max": "1", "camel.source.kafka.topic": "s3-topic", "camel.source.maxPollDuration": "10000", "camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false", "key.converter": "org.apache.kafka.connect.storage.StringConverter", "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter", "camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}", "camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}", "camel.component.aws-s3.configuration.region": "US_EAST_1" } } 

The connector configuration contains AWS API keys in the fields camel.component.aws-s3.configuration.access-key and camel.component.aws-s3.configuration.secret-key. Instead of writing the values directly, we simply reference the file configuration provider to load the aws_access_key_id and aws_secret_access_key fields from our aws-credentials.properties file.

Pay attention to exactly how we refer to the configuration provider: we specify the path to the file to use, followed by a colon and the name of the key to extract, like this:

"camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}" 

And like this:

"camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}" 

You can POST this JSON to the Apache Kafka Connect REST API, for example using the curl command:

$ curl -X POST -H "Content-Type: application/json" -d @connector-config.json http://my-connect-cluster-connect-api:8083/connectors 
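
Once the connector has been created, the same REST API can be used to check that it is up and running; /status is a standard Kafka Connect endpoint:

$ curl http://my-connect-cluster-connect-api:8083/connectors/s3-connector/status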

One of the advantages of configuration providers is that they let you avoid exposing configuration parameters that need to stay secret:

$ curl http://my-connect-cluster-connect-api:8083/connectors/s3-connector
{
  "name": "s3-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector",
    "camel.source.maxPollDuration": "10000",
    "camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false",
    "camel.component.aws-s3.configuration.region": "US_EAST_1",
    "camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
    "tasks.max": "1",
    "name": "s3-connector",
    "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter",
    "camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "camel.source.kafka.topic": "s3-topic"
  },
  "tasks": [
    {
      "connector": "s3-connector",
      "task": 0
    }
  ],
  "type": "source"
}

Create a connector using the Strimzi operator


Starting with version 0.16.0, Strimzi also provides a new operator for creating connectors from a KafkaConnector custom resource, and the configuration provider can be used directly in the KafkaConnector as well:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.camel.kafkaconnector.converters.S3ObjectConverter
    camel.source.kafka.topic: s3-topic
    camel.source.url: aws-s3://camel-connector-test?autocloseBody=false
    camel.source.maxPollDuration: 10000
    camel.component.aws-s3.configuration.access-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.configuration.secret-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.configuration.region: US_EAST_1
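
The resource is applied like any other custom resource. Assuming it is saved as s3-connector.yaml (an arbitrary filename), creating and inspecting the connector could look like this:

$ kubectl apply -f s3-connector.yaml
$ kubectl get kafkaconnector s3-connector -o yaml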

To summarize


Kubernetes secrets have their security limitations, and any user with the rights to exec into the container can read the mounted secrets anyway. But the method described in this article at least keeps credentials, API keys, and other secret information from being exposed through the REST API or the KafkaConnector custom resources.
