Configuring Kafka Topics, Users and Brokers on Strimzi using Strimzi Kafka CLI

Topic, User and Broker Configuration

Strimzi Kafka CLI lets you describe, create, and delete the configurations of topics, users, and brokers, much like the native Apache Kafka shell commands do.

While the kfk configs command can be used to change the configuration of all three entity types, you can also manage each entity's configuration with the following commands:

  • kfk topics --config/--delete-config for adding and deleting topic configurations.

  • kfk users --quota/--delete-quota for managing quotas as part of the user configuration.

  • kfk clusters --config/--delete-config for adding and deleting configurations across all brokers.

In this example we will do the configuration using kfk configs only, but we will also point out the alternative options above where relevant. So let's start with topic configuration.

Topic Configuration

Assuming we already have a Kafka cluster called my-cluster in a namespace called kafka, let's create a topic on it called my-topic:

kfk topics --create --topic my-topic --partitions 12 --replication-factor 3 -c my-cluster -n kafka
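Under the hood, this command creates a KafkaTopic custom resource that the Strimzi Topic Operator reconciles into an actual topic on the cluster. As a rough sketch, the resource looks roughly like this (the apiVersion may differ depending on your Strimzi version):

apiVersion: kafka.strimzi.io/v1beta2   # may be v1beta1 on older Strimzi versions
kind: KafkaTopic
metadata:
  name: my-topic
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 12
  replicas: 3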

IMPORTANT

If you don't have a Kafka cluster created on your OpenShift/Kubernetes cluster yet, you can create one with the following command:

kfk clusters --create --cluster my-cluster -n kafka

If the Strimzi operator is not installed yet, you can easily install it on the current OpenShift/Kubernetes cluster before creating the Kafka cluster:

kfk operator --install -n kafka

First, let's see which configurations are already defined on our topic:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka --native

Since we are running the --describe command with the --native flag, we can see the topic's dynamic configurations as reported by Kafka itself:

Dynamic configs for topic my-topic are:
  segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=7200000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=7200000}

INFO

Additionally, you can natively describe the configurations of all topics in the current cluster. To do this, just remove the --entity-name option:

kfk configs --describe --entity-type topics -c my-cluster -n kafka --native

We can also describe the KafkaTopic custom resource itself by removing the --native flag:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka
...
Spec:
  Config:
    retention.ms:   7200000
    segment.bytes:  1073741824
...

Now let's add a configuration like min.insync.replicas, which sets the minimum number of in-sync replicas that must acknowledge a write for it to be considered successful. To add a configuration, use --alter together with --add-config in the kfk configs command:

kfk configs --alter --add-config min.insync.replicas=3 --entity-type topics --entity-name my-topic -c my-cluster -n kafka

You should see a message like this:

kafkatopic.kafka.strimzi.io/my-topic configured

Alternatively, you can set the topic configuration by using kfk topics with the --config option:

kfk topics --alter --topic my-topic --config min.insync.replicas=3 -c my-cluster -n kafka

To add two configs, say cleanup.policy=compact along with min.insync.replicas, pass them as a comma-separated list like in the following command:

kfk configs --alter --add-config 'min.insync.replicas=3,cleanup.policy=compact' --entity-type topics --entity-name my-topic -c my-cluster -n kafka

or

kfk topics --alter --topic my-topic --config min.insync.replicas=3 --config cleanup.policy=compact -c my-cluster -n kafka
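Whichever form you use, the added key-value pairs simply land under spec.config of the KafkaTopic resource. A rough sketch of the resulting spec:

spec:
  partitions: 12
  replicas: 3
  config:
    min.insync.replicas: 3    # added via --add-config / --config
    cleanup.policy: compact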

After setting the configurations, use --describe like we did before to see the changes:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka --native

The output is:

Dynamic configs for topic my-topic are:
  min.insync.replicas=3 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:min.insync.replicas=3, DEFAULT_CONFIG:min.insync.replicas=1}
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=7200000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=7200000}

To see the added configuration on the Strimzi custom resource, run the same command without the --native option:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka
...
  Config:
    cleanup.policy:       compact
    min.insync.replicas:  3
    retention.ms:         7200000
    segment.bytes:        1073741824
...

Deleting a configuration is just as easy as adding one. You can remove all the configurations you've just set with a single command:

kfk configs --alter --delete-config 'min.insync.replicas,cleanup.policy' --entity-type topics --entity-name my-topic -c my-cluster -n kafka

or you can use:

kfk topics --alter --topic my-topic --delete-config min.insync.replicas --delete-config cleanup.policy -c my-cluster -n kafka

When you run the describe command again, you will see that the relevant configurations have been removed:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka --native
Dynamic configs for topic my-topic are:
  segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=7200000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=7200000}

As you can see, we can easily manipulate topic configurations almost exactly as we would with the native Apache Kafka shell scripts. Now let's see how it is done for user configuration.

User Configuration

For the user configuration let's first create a user called my-user:

kfk users --create --user my-user --authentication-type tls -n kafka -c my-cluster

After creating the user, let's add two quota configurations: request_percentage=55 and consumer_byte_rate=2097152.

kfk configs --alter --add-config 'request_percentage=55,consumer_byte_rate=2097152' --entity-type users --entity-name my-user -c my-cluster -n kafka

Alternatively, you can set the user quota configuration by using kfk users with the --quota option:

kfk users --alter --user my-user --quota request_percentage=55 --quota consumer_byte_rate=2097152 -c my-cluster -n kafka
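Behind the scenes, the quotas end up in the KafkaUser custom resource that was created in the previous step. A rough sketch of the resulting resource, with field names following the Strimzi KafkaUser schema (the apiVersion may differ depending on your Strimzi version):

apiVersion: kafka.strimzi.io/v1beta2   # may be v1beta1 on older Strimzi versions
kind: KafkaUser
metadata:
  name: my-user
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  authentication:
    type: tls                   # set by --authentication-type tls
  quotas:
    requestPercentage: 55       # request_percentage
    consumerByteRate: 2097152   # consumer_byte_rate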

IMPORTANT

In the traditional kafka-configs.sh command there are actually five user-level configurations, three of which are quota related:

consumer_byte_rate
producer_byte_rate
request_percentage

and the other two are for the authentication type:

SCRAM-SHA-256
SCRAM-SHA-512

While these two are also handled by kafka-configs.sh in traditional Kafka usage, in Strimzi Kafka CLI they are configured by altering the cluster with the kfk clusters --alter command and altering the user with the kfk users --alter command to set the relevant authentication type. So the kfk configs command is not used for these two configurations, since it does not support them.
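For example, SCRAM authentication ends up on the KafkaUser resource itself (set via kfk users --alter) rather than going through kfk configs; a minimal sketch of the relevant part of the spec:

spec:
  authentication:
    type: scram-sha-512   # the matching listener on the Kafka cluster must also enable SCRAM authentication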


Now let's take a look at the configurations we just set:

kfk configs --describe --entity-type users --entity-name my-user -c my-cluster -n kafka --native
Configs for user-principal 'CN=my-user' are consumer_byte_rate=2097152.0, request_percentage=55.0

INFO

Additionally, you can natively describe the configurations of all users in the current cluster. To do this, just remove the --entity-name option:

kfk configs --describe --entity-type users -c my-cluster -n kafka --native

You can also see the changes in the Kubernetes-native description:

kfk configs --describe --entity-type users --entity-name my-user -c my-cluster -n kafka
...
Spec:
...
  Quotas:
    Consumer Byte Rate:  2097152
    Request Percentage:  55
...

Deleting the configurations works almost the same way as deleting topic configurations:

kfk configs --alter --delete-config 'request_percentage,consumer_byte_rate' --entity-type users --entity-name my-user -c my-cluster -n kafka

or

kfk users --alter --user my-user --delete-quota request_percentage=55 --delete-quota consumer_byte_rate=2097152 -c my-cluster -n kafka

You can see that an empty response is returned, since there is no configuration left after the deletion:

kfk configs --describe --entity-type users --entity-name my-user -c my-cluster -n kafka --native

So we can easily create, update, and delete user configurations on Strimzi, almost exactly as with the native Apache Kafka shell scripts. Now let's take the final step and see how it is done for broker configuration.

Broker Configuration

Adding configurations, whether dynamic or static, is as easy as it is for topics and users. Strimzi takes care of both types itself: static configurations trigger a rolling update of the brokers, while dynamic configurations are applied directly without a restart.

Here is how to add a static configuration, which will take effect after the rolling update of the brokers:

kfk configs --alter --add-config log.retention.hours=168 --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka

or alternatively using the kfk clusters command:

kfk clusters --alter --cluster my-cluster --config log.retention.hours=168 -n kafka
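Both commands write the configuration into spec.kafka.config of the Kafka custom resource; for a static configuration like this one, the Cluster Operator then performs a rolling update of the brokers to apply it. A rough sketch of the relevant part of the resource, with the other values matching the describe output below (the apiVersion may differ depending on your Strimzi version):

apiVersion: kafka.strimzi.io/v1beta2   # may be v1beta1 on older Strimzi versions
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    config:
      log.message.format.version: "2.6"
      log.retention.hours: 168                    # added by --add-config / --config
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      transaction.state.log.replication.factor: 3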

IMPORTANT

Unlike with the native kafka-configs.sh command, the --entity-name should be set to the Kafka cluster name rather than the broker ids.


Let's check the change on the Kafka custom resource by describing it:

kfk configs --describe --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka
...
  Kafka:
    Config:
      log.message.format.version:                2.6
      log.retention.hours:                       168
      offsets.topic.replication.factor:          3
      transaction.state.log.min.isr:             2
      transaction.state.log.replication.factor:  3
...

You can also describe the cluster configuration Kafka-natively like the following:

kfk configs --describe --entity-type brokers -c my-cluster -n kafka --native
Dynamic configs for broker 0 are:
Dynamic configs for broker 1 are:
Dynamic configs for broker 2 are:
Default configs for brokers in the cluster are:

All user provided configs for brokers in the cluster are:
log.message.format.version=2.6
log.retention.hours=168
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3

IMPORTANT

Note that using --describe with the --native flag doesn't require the --entity-name option, since it fetches the cluster-wide broker configuration. For a specific broker you can set --entity-name to a broker id, which will show only that broker's configuration; because Strimzi applies the configuration to all brokers, it will be identical to the cluster-wide one.


Now let's add a dynamic configuration in order to see it when describing with the --native flag. We will change the log.cleaner.threads configuration, which controls the number of background threads used for log compaction and is 1 by default.

kfk configs --alter --add-config log.cleaner.threads=2 --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka

or

kfk clusters --alter --cluster my-cluster --config log.cleaner.threads=2 -n kafka

Describing it via the Strimzi custom resource returns the full list again:

kfk configs --describe --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka
...
  Kafka:
    Config:
      log.cleaner.threads:                       2
      log.message.format.version:                2.6
      log.retention.hours:                       168
      offsets.topic.replication.factor:          3
      transaction.state.log.min.isr:             2
      transaction.state.log.replication.factor:  3
...

Describing it with the --native flag gives more detail about whether each configuration is dynamic or not:

kfk configs --describe --entity-type brokers -c my-cluster -n kafka --native
Dynamic configs for broker 0 are:
  log.cleaner.threads=2 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:log.cleaner.threads=2, DEFAULT_CONFIG:log.cleaner.threads=1}
Dynamic configs for broker 1 are:
  log.cleaner.threads=2 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:log.cleaner.threads=2, DEFAULT_CONFIG:log.cleaner.threads=1}
Dynamic configs for broker 2 are:
  log.cleaner.threads=2 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:log.cleaner.threads=2, DEFAULT_CONFIG:log.cleaner.threads=1}
Default configs for brokers in the cluster are:

All user provided configs for brokers in the cluster are:
log.cleaner.threads=2
log.message.format.version=2.6
log.retention.hours=168
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3

Deleting the configurations works exactly the same way as for topics and users:

kfk configs --alter --delete-config 'log.retention.hours,log.cleaner.threads' --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka

or use the following command:

kfk clusters --alter --cluster my-cluster --delete-config log.retention.hours --delete-config log.cleaner.threads -n kafka

After the deletion, only the initial configurations remain:

kfk configs --describe --entity-type brokers -c my-cluster -n kafka --native
Dynamic configs for broker 0 are:
Dynamic configs for broker 1 are:
Dynamic configs for broker 2 are:
Default configs for brokers in the cluster are:

All user provided configs for brokers in the cluster are:
log.message.format.version=2.6
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3

So that's all!

With Strimzi Kafka CLI we are able to create, update, and delete the configurations of topics, users, and the Kafka cluster itself, and to describe the changed configurations both Kubernetes-natively and Kafka-natively.

If you are interested in more, have a look at the short video where I demonstrate Apache Kafka configuration on Strimzi using Strimzi Kafka CLI.


About the author

Aykut Bulgu

Aykut Bulgu is a Principal Software Engineer at Red Hat, working on Apache Kafka and Strimzi. Previously, he worked as a software engineer, consultant, and trainer. He has worked on many enterprise projects (mainly Java™) and has used many open source projects, including Red Hat® JBoss® middleware.