Installing Strimzi CLI with Homebrew

A while ago, a friend of mine asked whether Strimzi CLI could be installed with Homebrew.

I had to say "no" at the time. But now, my friend, we have the Homebrew installer. :)

I used to think that the Python package installer was enough, because any OS can have Python and pip, and therefore the CLI. After some time I changed my mind, since package managers are tools that people can be a bit fanatical about.

So why not start supporting other package managers? Asking myself this question, I decided to start with Homebrew, because macOS (and the Mac itself) is pretty popular among developers.

To install Strimzi CLI with Homebrew, first add the package's `tap`:

brew tap systemcraftsman/strimzi-kafka-cli

Then you can run the `brew install` command for Strimzi CLI:

brew install strimzi-kafka-cli

The installer will check whether you have Python 3 on your machine and install it if it is missing.
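
Once the installation finishes, a quick way to verify it is to run the CLI's help (assuming the formula has put the kfk executable on your PATH):

kfk --help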

Here is a short video in which I demonstrate how to install Strimzi CLI with Homebrew.

More curious about Strimzi CLI? Check out the Strimzi Kafka CLI Cheat Sheet to explore Strimzi Kafka CLI capabilities at a glance!

A Cheat Sheet for Strimzi Kafka CLI

TL;DR: You can download the Strimzi CLI Cheat Sheet from this link if you are curious about Strimzi CLI capabilities and want a kind of quick reference guide for it.

It has been a little more than a year since I first announced Strimzi Kafka CLI, a command-line interface for the Strimzi Kafka Operator.

It was a 4-month-old project back then, and I was too excited about it to wait for Beta before sharing it.

Well, it is now a nearly 1.5-year-old project that is still in Alpha, since there is still a lot to harden on the Beta roadmap. In the meantime it has gained many features, like creating the operator from the command line, creating a Kafka cluster with just one command, and managing a Kafka Connect cluster and its connectors in a way similar to the traditional one.

Over this time, I published a variety of videos on the System Craftsman YouTube channel about Strimzi CLI features and how it simplifies using Strimzi imperatively. Most importantly, I shared its advantages over the declarative model by explaining its story in a presentation at an internal Red Hat event. (I am planning to do it publicly as well by submitting the same talk to CFPs.)

With a lot of interest in the videos and the posts I have been sharing here, I thought a document that gathers all the current features of the CLI in a summarized way would be great. So I decided to create a cheat sheet for Strimzi Kafka CLI.

After a few months of work (yes, unfortunately, it took a bit long since I have a full-time job🙂), I was able to finish the cheat sheet and find a proper way to distribute it safely.

The cheat sheet has shell command examples for different features of Strimzi CLI. If you take a quick look inside, you will see it has 7-8 pages covering more or less the following concepts (a few representative commands follow the list):

  • A short definition of what Strimzi Kafka Operator is
  • How Strimzi Kafka CLI works
  • How to install it
  • Managing Strimzi Kafka Operator via CLI
  • Managing Kafka clusters via CLI
  • Managing Kafka topics
  • Producing and consuming messages
  • Managing users
  • Managing ACLs
  • Managing cluster, topic and user configurations
  • Kafka Connect and connectors
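
To give a flavour of the commands the cheat sheet gathers, here are a few representative ones, all covered in the examples later in this post series:

kfk operator --install -n kafka
kfk clusters --create --cluster my-cluster -n kafka
kfk topics --create --topic my-topic --partitions 12 --replication-factor 3 -c my-cluster -n kafka
kfk console-consumer --topic my-topic -c my-cluster -n kafka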

You can download the Strimzi CLI Cheat Sheet from this link if you are curious about Strimzi CLI capabilities and want a kind of quick reference guide for it.

Here is a short video in which I do a quick walk-through of the Strimzi Kafka CLI Cheat Sheet:

Creating a Kafka Connect Cluster on Strimzi using Strimzi CLI

Kafka Connect

In this example, we will create a Kafka Connect cluster and a few connectors that consume tweets on particular topics and write them to an Elasticsearch index so they can be searched easily.

We are not going to deal with any Strimzi custom resources. Instead, we will use the traditional .properties files that are used for Kafka Connect instances and connectors, with the help of Strimzi Kafka CLI.

Prerequisites

  • A Kubernetes/OpenShift cluster that has Strimzi Kafka Operator installed.
  • A namespace called kafka and a Kafka cluster called my-cluster
  • An Elasticsearch instance up and running in the same namespace. (You can use the elasticsearch.yaml file in the repository if you have the Elasticsearch Operator running.)
  • A public image registry that has a repository called demo-connect-cluster.
  • Most importantly, a Twitter Developer Account that enables you to use Twitter API for development purposes. In this example we are going to use it with one of our Kafka Connect connectors.
  • This part of the repository. Clone this repository to be able to use the scripts provided for this example.

We recommend creating the namespace first:

$ kubectl create namespace kafka

Or you can use the new-project command if you are using OpenShift.

$ oc new-project kafka

Then install the Strimzi operator if it's not already installed. You can use Strimzi CLI for this:

$ kfk operator --install -n kafka

Clone the repo if you haven't done so before, and cd into the example's directory.

$ git clone https://github.com/systemcraftsman/strimzi-kafka-cli.git
$ cd strimzi-kafka-cli/examples/5_connect

Create the Kafka and Elasticsearch clusters by running the setup_example.sh script in the example's directory:

$ chmod +x ./scripts/setup_example.sh
$ ./scripts/setup_example.sh

This will create a Kafka cluster with 2 brokers, and an Elasticsearch cluster that's accessible through a Route.

Keep in mind that this script does not install the Elasticsearch operator, which the Elasticsearch resource created by the script requires. So you first need to install the Elasticsearch operator before running the helper script.


NOTE

If you are using Kubernetes you can create an Ingress and expose the Elasticsearch instance.

Exposing the Elasticsearch instance is not mandatory; you can also access it cluster-internally.
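
As a minimal sketch -assuming your cluster has an Ingress controller, and using a hypothetical host name- such an Ingress could be created with:

$ kubectl create ingress elasticsearch --rule="elasticsearch.example.com/*=elasticsearch-es-http:9200" -n kafka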


Lastly, create an empty repository in any image registry of your choice. For this example we are going to use Quay.io; our repository will be quay.io/systemcraftsman/demo-connect-cluster.

Creating a Kafka Connect Cluster with a Twitter Source Connector

For this example, to show how easy it is to create a Kafka Connect cluster with a traditional properties file, we will use an example from a well-known Kafka instructor, Stephane Maarek, who demonstrates a very basic Twitter Source Connector in one of his courses.

So let's clone the repository https://github.com/simplesteph/kafka-beginners-course.git and change directory into the kafka-connect folder in the repository.

In the repository we have this twitter.properties file which has the following config in it:

name=TwitterSourceDemo
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector

# Set these required values
process.deletes=false
filter.keywords=bitcoin
kafka.status.topic=twitter_status_connect
kafka.delete.topic=twitter_deletes_connect
# put your own credentials here - don't share with anyone
twitter.oauth.consumerKey=
twitter.oauth.consumerSecret=
twitter.oauth.accessToken=
twitter.oauth.accessTokenSecret=

This connector gets tweet statuses or deletions and saves them into the twitter_status_connect or twitter_deletes_connect topic, depending on the action. The filter.keywords property defines the keywords the returned tweets are filtered by. In this case it is set to bitcoin, so the connector will consume every tweet that mentions bitcoin and put it in the relevant topic.

Now let's make a few changes to this file, both to the content and to comply with the restrictions Strimzi has for topic names.

Copy the twitter.properties file and save it as twitter_connector.properties which you will be editing.

In the new file, change twitter_status_connect to twitter-status-connect; Strimzi would complain about the original name since it is not a good name for a topic. Apache Kafka normally returns a warning about the underscore (_) convention but still allows it. Since Strimzi uses custom resources for managing Kafka resources, it is not good practice to use underscores in topic names, or in the name of any other Strimzi custom resource.

Also change twitter_deletes_connect to twitter-deletes-connect, and change the connector name to twitter-source-demo for a common convention.

Enter your Twitter OAuth keys, which you can get from your Twitter Developer Account. Stephane explains the creation of a Twitter Developer Account perfectly in his Kafka for Beginners course on Udemy, so I recommend taking a look at both the course and the Twitter setup it explains.

Finally, change the bitcoin filter to kafka for our demo (or change it to anything whose tweets you want to see).

The final connector configuration file should look like this:

name=twitter-source-demo
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector

# Set these required values
process.deletes=false
filter.keywords=kafka
kafka.status.topic=twitter-status-connect
kafka.delete.topic=twitter-deletes-connect
# put your own credentials here - don't share with anyone
twitter.oauth.consumerKey=_YOUR_CONSUMER_KEY_
twitter.oauth.consumerSecret=_YOUR_CONSUMER_SECRET_
twitter.oauth.accessToken=_YOUR_ACCESS_TOKEN_
twitter.oauth.accessTokenSecret=_YOUR_ACCESS_TOKEN_SECRET_

Notice how little we changed (actually just the names) in order to use it in the Strimzi Kafka Connect cluster.

Because we are going to need the twitter-status-connect and twitter-deletes-connect topics, let's create them upfront and then continue with our configuration. You may remember the kfk topics --create command for creating topics with Strimzi Kafka CLI:

$ kfk topics --create --topic twitter-status-connect --partitions 3 --replication-factor 1 -c my-cluster -n kafka
$ kfk topics --create --topic twitter-deletes-connect --partitions 3 --replication-factor 1 -c my-cluster -n kafka

Now let's continue with our Connect cluster's creation.

In the same repository we have this connect-standalone.properties file which has the following config in it:

...output omitted...
bootstrap.servers=localhost:9092

...output omitted...
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

...output omitted...
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

...output omitted...
plugin.path=connectors

Kafka Connect normally has this plugin.path key, which points to all the connector binaries to be used by connectors created for that Connect cluster. In our case, for Strimzi, it will be a bit different: because we are going to create our Connect cluster in a Kubernetes/OpenShift environment, we should either build an image locally or let Strimzi build the Connect image for us. We will use the second option, which is a fairly new feature of Strimzi.

The only thing we have to do, instead of defining a path, is define a set of URLs that point to the different connector resources. So let's copy the file that Stephane created for us and save it as connect.properties, since Kafka Connect runs in distributed mode in Strimzi (in short, there is no standalone mode).

In the connect.properties file, replace plugin.path with plugin.url and set it to the following source URL:

plugin.url=https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz

If you compare with the original repository, you can see that the connectors folder contains a bunch of JAR files that the Twitter Source Connector uses. The URL you set above points to the same resources, archived. Strimzi extracts them while building the Connect image in the Kubernetes/OpenShift cluster.

Speaking of the image, we have to set an image -actually an image repository path- that Strimzi can push the built image to. This can be either an internal registry of yours or a public one like Docker Hub or Quay. In this example we will use Quay, and we should set the image URL like the following:

image=quay.io/systemcraftsman/demo-connect-cluster:latest

Here you can set the repository URL of your choice instead of quay.io/systemcraftsman/demo-connect-cluster:latest. As a prerequisite, you have to create this repository and have the credentials ready for Strimzi to push the image.

Apart from the plugin.path change, we can make a few other changes, like switching the offset storage to a topic instead of a file and disabling the key/value converter schemas, because we just need to see the data itself; we don't need the JSON schemas.

Lastly, change the bootstrap.servers value to my-cluster-kafka-bootstrap:9092, as my-cluster-kafka-bootstrap is the Kubernetes-internal host name of the my-cluster Kafka cluster, provided by a Kubernetes Service.
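
If you want to double-check the service name, you can list the services in the namespace; the bootstrap service is created by the Strimzi operator next to the broker services:

$ kubectl get services -n kafka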

So the final connect.properties file should look like this:

...output omitted...

bootstrap.servers=my-cluster-kafka-bootstrap:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-cluster-offsets
config.storage.topic=connect-cluster-configs
status.storage.topic=connect-cluster-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

...output omitted...

image=quay.io/systemcraftsman/demo-connect-cluster:latest
plugin.url=https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz

Again, notice how small the changes are to make it compatible with a Strimzi Kafka Connect cluster. Now let's run the Kafka Connect cluster the way we used to with the traditional Kafka CLI.

To start a standalone Kafka Connect cluster traditionally, you might be familiar with a command like the following:

$ ./bin/connect-standalone.sh connect.properties connector.properties

The command syntax for Strimzi Kafka CLI is the same. This means you can create a Connect cluster along with one or more connectors by providing their config properties. The only difference is that Strimzi runs the Connect cluster in distributed mode.

Run the following command to create a Connect cluster called my-connect-cluster and a connector called twitter-source-demo. Don't forget to replace _YOUR_IMAGE_REGISTRY_USER_ with your image registry user.

$ kfk connect clusters --create --cluster my-connect-cluster --replicas 1 -n kafka connect.properties twitter_connector.properties -u _YOUR_IMAGE_REGISTRY_USER_ -y

IMPORTANT

You can also create the cluster in a more controlled way by not passing the -y flag. Without the -y flag, Strimzi Kafka CLI shows you the Kafka Connect cluster's resource YAML in an editor, and you can modify the resource or just save it before the creation. In this example we skip this step with the -y flag.


You should be prompted for the registry password. Enter the password and observe the CLI response as follows:

secret/my-connect-cluster-push-secret created
kafkaconnect.kafka.strimzi.io/my-connect-cluster created
kafkaconnector.kafka.strimzi.io/twitter-source-demo created
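
If you want to inspect what was just created, you can also list the Strimzi custom resources directly with kubectl:

$ kubectl get kafkaconnects,kafkaconnectors -n kafka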

IMPORTANT

Be careful while entering it, because there is no mechanism in Strimzi Kafka CLI that checks this password. If the password is wrong, the Connect image will still be built successfully, but Strimzi won't be able to push it to the registry you specified.

In case of any problem just delete the Connect cluster with the following command and create it again:

$ kfk connect clusters --delete --cluster my-connect-cluster -n kafka -y

Or, if you are experienced enough, you can delete and re-create the push secret instead.


Now you can check the pods and wait until the Connect cluster pod is running without a problem.

$ watch kubectl get pods -n kafka
...output omitted...
my-connect-cluster-connect-8444df69c9-x7xf6   1/1     Running     0          3m43s
my-connect-cluster-connect-build-1-build      0/1     Completed   0          6m47s
...output omitted...

If everything is OK with the Connect cluster, you should now see some messages in one of the topics we created before running the Connect cluster. Let's consume messages from the twitter-status-connect topic to see if our Twitter Source Connector works.

$ kfk console-consumer --topic twitter-status-connect -c my-cluster -n kafka
...output omitted...
{"CreatedAt":1624542267000,"Id":1408058441428439047,"Text":"@Ch1pmaster @KAFKA_Dev Of het is gewoon het zoveelste boelsjit verhaal van Bauke...
...output omitted...

Observe that tweets appear in the console one by one as they are produced to the twitter-status-connect topic and consumed by the consumer.

As you can see, we took a couple of traditional config files from the samples of one of the most loved Kafka instructors, and with just a few changes to the configuration we could easily create our Kafka Connect cluster along with a Twitter Source connector.

Now let's take a step forward and try another thing. What about putting these tweets into an Elasticsearch index and making them searchable?

Altering the Kafka Connect Cluster

In order to get the tweets from the twitter-status-connect topic and index them in Elasticsearch we need to use a connector that does this for us.

Camel Elasticsearch REST Kafka Sink Connector is the connector that will do the magic for us.

First we need to add the relevant plugin resources of the Camel Elasticsearch REST Sink Connector to our current connect.properties file, which configures our Kafka Connect cluster.

Add the URL of the connector like the following in the connect.properties file:

...output omitted...
plugin.url=https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz,https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-elasticsearch-rest-kafka-connector/0.10.0/camel-elasticsearch-rest-kafka-connector-0.10.0-package.tar.gz
...output omitted...

Now run the kfk connect clusters command, this time with the --alter flag.

$ kfk connect clusters --alter --cluster my-connect-cluster -n kafka connect.properties
kafkaconnect.kafka.strimzi.io/my-connect-cluster replaced

Observe that the Connect image is being built again by watching the pods:

$ watch kubectl get pods -n kafka

Wait until the build finishes, and the connector pod is up and running again.

...output omitted...
my-connect-cluster-connect-7b575b6cf9-rdmbt   1/1     Running     0          111s
...output omitted...
my-connect-cluster-connect-build-2-build      0/1     Completed   0          2m37s

Now that we have a running Connect cluster ready for a Camel Elasticsearch REST Sink Connector, we can create the connector, this time using the kfk connect connectors command.

Creating a Camel Elasticsearch REST Sink Connector

Create a file called camel_es_connector.properties and paste the following into it:

name=camel-elasticsearch-sink-demo
tasks.max=1
connector.class=org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector

value.converter=org.apache.kafka.connect.storage.StringConverter

topics=twitter-status-connect
camel.sink.endpoint.hostAddresses=elasticsearch-es-http:9200
camel.sink.endpoint.indexName=tweets
camel.sink.endpoint.operation=Index
camel.sink.path.clusterName=elasticsearch
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true

Observe that our connector's name is camel-elasticsearch-sink-demo and that we use the CamelElasticsearchrestSinkConnector class to read the tweets from the twitter-status-connect topic.

Properties starting with camel.sink. define the connector-specific properties. With these properties the connector will create an index called tweets in the Elasticsearch cluster, which is accessible at the elasticsearch-es-http:9200 host and port.

For more details on this connector, please visit the connector's configuration page linked above.

Creating a connector is very simple. If you have defined a topic or another Strimzi object via Strimzi Kafka CLI before, you will notice the syntax is pretty much the same.

Run the following command to create the connector for Camel Elasticsearch REST Sink:

$ kfk connect connectors --create -c my-connect-cluster -n kafka camel_es_connector.properties
kafkaconnector.kafka.strimzi.io/camel-elasticsearch-sink-demo created

You can list the created connectors so far:

$ kfk connect connectors --list -c my-connect-cluster -n kafka
NAME                            CLUSTER              CONNECTOR CLASS                                                                         MAX TASKS   READY
twitter-source-demo             my-connect-cluster   com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector                   1           1
camel-elasticsearch-sink-demo   my-connect-cluster   org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector   1           1
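
Before watching the document count, you can also quickly confirm that the tweets index has been created, assuming you exposed Elasticsearch externally as described in the prerequisites:

$ curl -s http://_ELASTIC_EXTERNAL_URL_/_cat/indices?v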

After the resource is created, run the following curl command in watch mode to observe how the number of indexed documents increases as tweets are consumed. Replace _ELASTIC_EXTERNAL_URL_ with the Route or Ingress URL of the Elasticsearch cluster you created as a prerequisite.

$ watch "curl -s http://_ELASTIC_EXTERNAL_URL_/tweets/_search | jq -r '.hits.total.value'"

In another terminal window you can run the console consumer again to see both the Twitter Source connector and the Camel Elasticsearch Sink connector in action:

[Screen recording: tweets flowing into the twitter-status-connect topic in the console consumer]

In a browser or with curl, call the following URL to search for the word Apache in the tweet texts:

$ curl -s http://_ELASTIC_EXTERNAL_URL_/tweets/_search?q=Text:Apache
{"took":3,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":3,"relation":"eq"},"max_score":5.769906,"hits":[{"_index":"tweets","_type":"_doc","_id":"bm6aPnoBRxta4q47oss0","_score":5.769906,"_source":{"CreatedAt":1624542084000,"Id":1408057673577345026,"Text":"RT @KCUserGroups: June 29: Kansas City Apache Kafka® Meetup by Confluent - Testing with AsyncAPI for Apache Kafka: Brokering the Complexity…","Source":"<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>","Truncated":false,"InReplyToStatusId":-1,"InReplyToUserId":-1,"InReplyToScreenName":null,"GeoLocation":null,"Place":null,"Favorited":false,"Retweeted":false,"FavoriteCount":0,"User":{"Id":87489271,"Name":"Fran Méndez","ScreenName":"fmvilas","Location":"Badajoz, España","Description":"Founder of @AsyncAPISpec. Director of Engineering at @getpostman.\n\nAtheist, feminist, proud husband of @e_morcillo, and father of Ada & 2 cats \uD83D\uDC31\uD83D\uDC08 he/him","ContributorsEnabled":false,"ProfileImageURL":"http://pbs.twimg.com/profile_images/1373387614238179328/cB1gp6Lh_normal.jpg","BiggerProfileImageURL":"http://pbs.twimg.com/profile_images/1373387614238179328/cB1gp6Lh_bigger.jpg","MiniProfileImageURL":"http://pbs.twimg.com/profile_images/1373387614238179328/cB1gp6Lh_mini.jpg","OriginalProfileImageURL":"http://pbs.twimg.com/profile_images/1373387614238179328/cB1gp6Lh.jpg","ProfileImageURLHttps":"https://pbs.twimg.com/profile_images/1373387614238179328/cB1gp6Lh_normal.jpg","BiggerProfileImageURLHttps":"https://pbs.twimg.com/profile_images/1373387614238179328/cB1gp6Lh_bigger.jpg","MiniProfileImageURLHttps":"https://pbs.twimg.com/profile_images/1373387614238179328/cB1gp6Lh_mini.jpg","OriginalProfileImageURLHttps":"https://pbs.twimg.com/profile_images/1373387614238179328/cB1gp6Lh.jpg","DefaultProfileImage":false,"URL":"http://www.fmvilas.com","Protected":false,"FollowersCount":1983,"ProfileBackgroundColor":"000000","ProfileTextColor":"000000","ProfileLinkColor":"1B95E0","ProfileSidebarFillColor":"000000","ProfileSidebarBorderColor":"000000","ProfileUseBackgroundImage":false,"DefaultProfile":false,"ShowAllInlineMedia":false,"FriendsCount":3197,
...output omitted...

Cool! Our Apache search hit some Apache Kafka tweets among the Kafka-related tweets. How about yours? If you don't hit anything, you can search for any word of your choice.

Since we are almost done with our example, let's delete the resources one by one to observe how Strimzi Kafka CLI handles the deletion of Kafka Connect resources.

Deleting Connectors and the Kafka Connect Cluster

First let's delete our connectors one by one:

$ kfk connect connectors --delete --connector twitter-source-demo -c my-connect-cluster -n kafka
kafkaconnector.kafka.strimzi.io "twitter-source-demo" deleted
$ kfk connect connectors --delete --connector camel-elasticsearch-sink-demo -c my-connect-cluster -n kafka
kafkaconnector.kafka.strimzi.io "camel-elasticsearch-sink-demo" deleted

Observe that no more tweets are produced to the twitter-status-connect topic and no more data is indexed in Elasticsearch.

Now we can also delete the my-connect-cluster Kafka Connect cluster. Notice that the syntax is pretty much the same as for Kafka cluster deletion in Strimzi CLI.

$ kfk connect clusters --delete --cluster my-connect-cluster -n kafka -y

This command deletes both the KafkaConnect resource and the push secret that was created for the Connect image:

kafkaconnect.kafka.strimzi.io "my-connect-cluster" deleted
secret "my-connect-cluster-push-secret" deleted

Check that the Connect cluster pod is terminated by the Strimzi operator:

$ kubectl get pods -n kafka
NAME                                          READY   STATUS    RESTARTS   AGE
elastic-operator-84774b4d49-v2lbr             1/1     Running   0          4h9m
elasticsearch-es-default-0                    1/1     Running   0          4h8m
my-cluster-entity-operator-5c84b64ddf-22t9p   3/3     Running   0          4h8m
my-cluster-kafka-0                            1/1     Running   0          4h8m
my-cluster-kafka-1                            1/1     Running   0          4h8m
my-cluster-zookeeper-0                        1/1     Running   0          4h8m

Congratulations!

In this example we were able to create a Kafka Connect cluster along with a Twitter Source connector using Strimzi Kafka CLI, to consume tweets from Twitter and write them to one of the topics we defined in the configuration. We also altered the Kafka Connect cluster and added new plugin resources for the Camel Elasticsearch REST Sink connector, to write the tweets from the relevant topic to an Elasticsearch index with a single --alter command of Strimzi Kafka CLI. This made the consumed tweets searchable, so we could search for the word Apache in our tweets Elasticsearch index. After finishing the example, we cleaned up our resources by deleting them easily with the CLI.

Access the repo of this post from here: https://github.com/systemcraftsman/strimzi-kafka-cli/tree/master/examples/5_connect


A Strimzi Kafka Quickstart for Quarkus with Strimzi CLI

Quarkus Strimzi Quickstart (with Strimzi Kafka CLI)

This project illustrates how you can interact with Apache Kafka on Kubernetes (Strimzi) using MicroProfile Reactive Messaging.

Creating the Kafka cluster (on OpenShift)

First you need a Kafka cluster on your OpenShift.

Create a namespace:

 oc new-project kafka-quickstart

Then install Strimzi Kafka CLI by running the following command (you will need Python 3 and pip):

 sudo pip install strimzi-kafka-cli

(See this link for other details about Strimzi Kafka CLI.)

After installing Strimzi Kafka CLI run the following command to install the operator on kafka-quickstart namespace:

 kfk operator --install -n kafka-quickstart

When the operator is ready to serve, run the following command in order to create a Kafka cluster:

 kfk clusters --create --cluster my-cluster -n kafka-quickstart

A vim interface will pop up. If you like, you can change the broker and ZooKeeper replicas to 1, but I suggest leaving them as they are if your Kubernetes cluster has enough resources. Save the cluster configuration file and respond Yes to make Strimzi CLI apply the changes.

Wait for the 3 broker and 3 ZooKeeper pods to be running and ready in your cluster:

 oc get pods -n kafka-quickstart -w

When all pods are ready, create your prices topic to be used by the application:

 kfk topics --create --topic prices --partitions 10 --replication-factor 2 -c my-cluster -n kafka-quickstart

Check that your topic has been created successfully by describing it natively:

kfk topics --describe --topic prices -c my-cluster -n kafka-quickstart --native

Deploying the application

The application can be deployed to OpenShift using:

 ./mvnw clean package -DskipTests

This will take a while, since the s2i build runs before the deployment. Make sure the application's pod is running and ready at the end. Then run the following command to get the URL of the Prices page:

echo http://$(oc get routes -n kafka-quickstart -o json | jq -r '.items[0].spec.host')/prices.html 

Copy the URL to your browser, and you should see a fluctuating price.

Anatomy

In addition to the prices.html page, the application is composed of 3 components:

  • PriceGenerator
  • PriceConverter
  • PriceResource

We generate (random) prices in PriceGenerator. These prices are written to the prices Kafka topic that we created earlier. PriceConverter reads from the prices Kafka topic and applies some magic conversion to the price. The result is sent to an in-memory stream consumed by a JAX-RS resource, PriceResource. The data is sent to the browser using server-sent events.
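
If you prefer the terminal over the browser, you can also watch the raw server-sent events with curl. This assumes the quickstart exposes the stream at the usual /prices/stream path:

 curl -N http://$(oc get routes -n kafka-quickstart -o json | jq -r '.items[0].spec.host')/prices/stream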

Running and deploying in native

You can compile the application into a native binary using:

 ./mvnw clean install -Pnative

or deploy with:

 ./mvnw clean package -Pnative -DskipTests

This demo is based on the following resources:

Visit the following link to clone this demo:

https://github.com/systemcraftsman/quarkus-strimzi-cli-demos

Configuring Kafka Topics, Users and Brokers on Strimzi using Strimzi Kafka CLI

Topic, User and Broker Configuration

Strimzi Kafka CLI enables users to describe, create, and delete configurations of topics, users, and brokers, just like the native Apache Kafka commands do.

While the kfk configs command can be used to change the configuration of these three entity types, you can also change the relevant entity's configuration by using the following:

  • kfk topics --config/--delete-config for adding and deleting configurations to topics.

  • kfk users --quota/--delete-quota for managing quotas as part of the user configuration.

  • kfk clusters --config/--delete-config for adding and deleting configurations to all brokers.

In this example we will show you how to do the configuration using kfk configs only, but we will also mention the options above. So let's start with topic configuration.

Topic Configuration

Assuming we already have a Kafka cluster called my-cluster in our namespace called kafka, let's create a topic on it called my-topic:

kfk topics --create --topic my-topic --partitions 12 --replication-factor 3 -c my-cluster -n kafka

IMPORTANT

If you don't have a Kafka cluster created on your OpenShift/Kubernetes cluster yet, you can use the following command:

kfk clusters --create --cluster my-cluster -n kafka

You can easily install the operator on the current OpenShift/Kubernetes cluster before creating a Kafka cluster if you don't have one:

kfk operator --install -n kafka

First let's see what pre-defined configurations we have on our topic:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka --native

Since we are running the --describe command with the --native flag, we can see all the default dynamic configurations for the topic:

Dynamic configs for topic my-topic are:
  segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=7200000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=7200000}

INFO

Additionally, you can describe all of the topic configurations natively on the current cluster. To do this, just remove the --entity-name option:

kfk configs --describe --entity-type topics -c my-cluster -n kafka --native

We can also describe the KafkaTopic custom resource itself by removing the --native flag:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka
...
Spec:
  Config:
    retention.ms:   7200000
    segment.bytes:  1073741824
...

Now let's add a configuration like min.insync.replicas, which configures the minimum number of in-sync replicas required between leaders and followers. In order to add a configuration you must use --alter and, for each config to be added, --add-config with the kfk configs command:

kfk configs --alter --add-config min.insync.replicas=3 --entity-type topics --entity-name my-topic -c my-cluster -n kafka

You should see a message like this:

kafkatopic.kafka.strimzi.io/my-topic configured

Alternatively, you can set the topic configuration by using kfk topics with the --config option:

kfk topics --alter --topic my-topic --config min.insync.replicas=3 -c my-cluster -n kafka

In order to add two configs -let's say we want to add the cleanup.policy=compact configuration along with min.insync.replicas- run a command like the following:

kfk configs --alter --add-config 'min.insync.replicas=3,cleanup.policy=compact' --entity-type topics --entity-name my-topic -c my-cluster -n kafka

or

kfk topics --alter --topic my-topic --config min.insync.replicas=3 --config cleanup.policy=compact -c my-cluster -n kafka

After setting the configurations, use the --describe flag as we did before to see the changes:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka --native

Output is:

Dynamic configs for topic my-topic are:
  min.insync.replicas=3 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:min.insync.replicas=3, DEFAULT_CONFIG:min.insync.replicas=1}
  cleanup.policy=compact sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:cleanup.policy=compact, DEFAULT_CONFIG:log.cleanup.policy=delete}
  segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=7200000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=7200000}

In order to see the added configuration as a Strimzi resource, run the same command without the --native option:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka
...
  Config:
    cleanup.policy:       compact
    min.insync.replicas:  3
    retention.ms:         7200000
    segment.bytes:        1073741824
...

Like adding a configuration, deleting a configuration is very easy. You can remove all the configurations that you've just set with a single command:

kfk configs --alter --delete-config 'min.insync.replicas,cleanup.policy' --entity-type topics --entity-name my-topic -c my-cluster -n kafka

or you can use:

kfk topics --alter --topic my-topic --delete-config min.insync.replicas --delete-config cleanup.policy -c my-cluster -n kafka

When you run the describe command again you will see that the relevant configurations are removed:

kfk configs --describe --entity-type topics --entity-name my-topic -c my-cluster -n kafka --native
Dynamic configs for topic my-topic are:
  segment.bytes=1073741824 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:segment.bytes=1073741824, DEFAULT_CONFIG:log.segment.bytes=1073741824}
  retention.ms=7200000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=7200000}

As you can see, we can easily manipulate topic configurations almost as with the native shell executables of Apache Kafka. Now let's see how it is done for user configuration.

User Configuration

For the user configuration let's first create a user called my-user:

kfk users --create --user my-user --authentication-type tls -n kafka -c my-cluster

After creating the user, let's add two quota configurations: request_percentage=55 and consumer_byte_rate=2097152.

kfk configs --alter --add-config 'request_percentage=55,consumer_byte_rate=2097152' --entity-type users --entity-name my-user -c my-cluster -n kafka

Alternatively, you can set the user quota configuration by using kfk users with the --quota option:

kfk users --alter --user my-user --quota request_percentage=55 --quota consumer_byte_rate=2097152 -c my-cluster -n kafka

IMPORTANT

In the traditional kafka-configs.sh command there are actually 5 configurations for users, 3 of which are quota-related:

consumer_byte_rate                    
producer_byte_rate                    
request_percentage  

and the other 2 are for the authentication type:

SCRAM-SHA-256                         
SCRAM-SHA-512 

While these two configurations are also handled by kafka-configs.sh in traditional Kafka usage, in Strimzi CLI they are configured by altering the cluster with the kfk clusters --alter command and altering the user with the kfk users --alter command to add the relevant authentication type. So the kfk configs command is not used for these two configurations, since it is not supported.
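
For illustration, switching a user to SCRAM-SHA-512 authentication might look like the following; this assumes the --authentication-type option is accepted by --alter the same way it is by --create:

kfk users --alter --user my-user --authentication-type scram-sha-512 -c my-cluster -n kafka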


Now let's take a look at the configurations we just set:

kfk configs --describe --entity-type users --entity-name my-user -c my-cluster -n kafka --native
Configs for user-principal 'CN=my-user' are consumer_byte_rate=2097152.0, request_percentage=55.0

INFO

Additionally, you can describe all of the user configurations natively on the current cluster. To do this, just remove the --entity-name option:

kfk configs --describe --entity-type users -c my-cluster -n kafka --native

You can also see the changes in the Kubernetes native description:

kfk configs --describe --entity-type users --entity-name my-user -c my-cluster -n kafka
...
Spec:
...
  Quotas:
    Consumer Byte Rate:  2097152
    Request Percentage:  55
...

Deletion of the configurations is almost the same as deleting the topic configurations:

kfk configs --alter --delete-config 'request_percentage,consumer_byte_rate' --entity-type users --entity-name my-user -c my-cluster -n kafka

or

kfk users --alter --user my-user --delete-quota request_percentage=55 --delete-quota consumer_byte_rate=2097152 -c my-cluster -n kafka

You can see that an empty response is returned, since there is no configuration left after the deletion:

kfk configs --describe --entity-type users --entity-name my-user -c my-cluster -n kafka --native

So we can easily create, update, and delete user configurations for Strimzi, almost as with the native shell executables of Apache Kafka. Now let's take the final step and see how it is done for broker configuration.

Broker Configuration

Adding configurations, either dynamic or static, is as easy as it is for topics and users. For both configuration types Strimzi takes care of everything itself, by rolling-updating the brokers for static configurations and applying dynamic configurations directly.

Here is a way to add a static configuration that will be reflected after the rolling update of the brokers:

kfk configs --alter --add-config log.retention.hours=168 --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka

or alternatively using the kfk clusters command:

kfk clusters --alter --cluster my-cluster --config log.retention.hours=168 -n kafka

IMPORTANT

Unlike with the native kafka-configs.sh command, the --entity-name should be set to the Kafka cluster name rather than the broker IDs.


kfk configs --describe --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka
...
  Kafka:
    Config:
      log.message.format.version:                2.6
      log.retention.hours:                       168
      offsets.topic.replication.factor:          3
      transaction.state.log.min.isr:             2
      transaction.state.log.replication.factor:  3
...

You can describe the cluster configuration Kafka-natively like the following:

kfk configs --describe --entity-type brokers -c my-cluster -n kafka --native
Dynamic configs for broker 0 are:
Dynamic configs for broker 1 are:
Dynamic configs for broker 2 are:
Default configs for brokers in the cluster are:

All user provided configs for brokers in the cluster are:
log.message.format.version=2.6
log.retention.hours=168
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3

IMPORTANT

Note that using --describe with the --native flag doesn't require the --entity-name option, since it fetches the cluster-wide broker configuration. For a specific broker configuration you can set --entity-name to the broker ID, which will only show that broker's configuration; it will be exactly the same as the cluster-wide one.


Now let's add a dynamic configuration so we can see it while describing with the --native flag. We will change the log.cleaner.threads configuration, which controls the background threads that do log compaction and is 1 by default.

kfk configs --alter --add-config log.cleaner.threads=2 --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka

or

kfk clusters --alter --cluster my-cluster --config log.cleaner.threads=2 -n kafka

Describing it via the Strimzi custom resource will return the list again:

kfk configs --describe --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka
...
  Kafka:
    Config:
      log.cleaner.threads:                       2
      log.message.format.version:                2.6
      log.retention.hours:                       168
      offsets.topic.replication.factor:          3
      transaction.state.log.min.isr:             2
      transaction.state.log.replication.factor:  3
...

Describing it with the --native flag gives more detail about whether each configuration is dynamic or not:

kfk configs --describe --entity-type brokers -c my-cluster -n kafka --native
Dynamic configs for broker 0 are:
  log.cleaner.threads=2 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:log.cleaner.threads=2, DEFAULT_CONFIG:log.cleaner.threads=1}
Dynamic configs for broker 1 are:
  log.cleaner.threads=2 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:log.cleaner.threads=2, DEFAULT_CONFIG:log.cleaner.threads=1}
Dynamic configs for broker 2 are:
  log.cleaner.threads=2 sensitive=false synonyms={DYNAMIC_BROKER_CONFIG:log.cleaner.threads=2, DEFAULT_CONFIG:log.cleaner.threads=1}
Default configs for brokers in the cluster are:

All user provided configs for brokers in the cluster are:
log.cleaner.threads=2
log.message.format.version=2.6
log.retention.hours=168
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3

Deleting the configurations is exactly the same as for topics and users:

kfk configs --alter --delete-config 'log.retention.hours,log.cleaner.threads' --entity-type brokers --entity-name my-cluster -c my-cluster -n kafka

or use the following command:

kfk clusters --alter --cluster my-cluster --delete-config log.retention.hours --delete-config log.cleaner.threads -n kafka

You can see only the initial configurations after the deletion:

kfk configs --describe --entity-type brokers -c my-cluster -n kafka --native
Dynamic configs for broker 0 are:
Dynamic configs for broker 1 are:
Dynamic configs for broker 2 are:
Default configs for brokers in the cluster are:

All user provided configs for brokers in the cluster are:
log.message.format.version=2.6
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3

So that's all!

We are able to create, update, and delete the configurations of topics, users, and the Kafka cluster itself, and to describe the changed configurations both Kubernetes-natively and Kafka-natively using Strimzi Kafka CLI.

If you are curious for more, have a look at the short video in which I demonstrate Apache Kafka configuration on Strimzi using Strimzi Kafka CLI:


Messaging Architectures for Cloud-Native Applications

1. Traditional Applications and Monoliths

Do you remember the times when we used to create applications by modeling the business domains as data models and using those models as reflections of relational database objects -mostly tables- in order to do CRUD actions?

Business requirements were pouring down the waterfall and soaking us so thoroughly that we could not easily respond to change: new business requirements, bug fixes, enhancements, and so on.

When Agile methodologies came along, they eventually made us more flexible and able to respond to change quickly. Ideas like SOA, service buses, and distributed state management emerged, but business domains stayed more or less merged and monoliths survived.

Monolithic applications -which are actually not an anti-pattern- ruled the world for a considerable number of years, with different kinds of architectures that have their own benefits and drawbacks.

Figure 1. Traditional Application Design. https://speakerdeck.com/mbogoevici/data-strategies-for-microservice-architectures?slide=4

1.1. Benefits and Drawbacks

The Benefits:

  • Easy to start with
  • Easy transaction management
  • Sync communication
  • Can be powered up with a modular architecture

The Drawbacks:

  • Hard to change business domain and data model
  • Hard to scale
  • Tightly coupled components

2. Cloud-Native Applications and Microservices

One of the main motivations for approaches like Domain-Driven Design is that monoliths are tightly coupled across business domains, and those domains need to be separated in order to loosen the coupling and give each domain a single responsibility as a bounded context.

Figure 2. Bounded Contexts. https://martinfowler.com/bliki/BoundedContext.html

So these kinds of approaches led to the creation of microservices, with the motivation of loose coupling between bounded contexts, being polyglot (using the best-fitting tools for each service), easy horizontal scalability, and, most importantly, the ability to adapt easily to the cloud-native world.

Apart from bringing a lot of benefits, microservices and the cloud-native application architecture come with some challenges that may turn a developer's life into a nightmare.

2.1. Challenges

Microservice architectures have many challenges, like manageability of the services, traceability, monitoring, service discovery, distributed state and data management, and resilience, which are handled automatically by cloud-native platforms like Kubernetes. For example, service discovery is one of the requirements of an application that consists of microservices, and Kubernetes provides this service discovery mechanism out of the box.

What cloud-native platforms cannot provide, and leave to the guru developers, is state management itself.

2.1.1. State

Keeping state in a distributed system and making it flow through the microservices has its challenges. Keeping the state in a distributed cache system like Infinispan to create a kind of single source of truth is a common pattern, but in a mesh of services it is tough to manage, since there will be an Inception of caches.

Keeping the state distributed through services is even tougher.

Figure 3. Microservices & Data. https://speakerdeck.com/mbogoevici/data-strategies-for-microservice-architectures?slide=5

Since database-per-microservice is a common pattern and each bounded context should own and handle its own data, the need to share state/data across services makes direct point-to-point communication between microservices more important.

2.1.2. Synchronous Communication

Synchronous data retrieval is a way to get the data one microservice needs from another microservice. One can use comparatively new technologies like HTTP+REST and gRPC, or some old-school technologies like RMI and IIOP, but all of these synchronous point-to-point styles of data retrieval have costs.

Figure 4. Synchronous Data Retrieval. https://speakerdeck.com/yanaga/distribute-your-microservices-data-with-events-cqrs-and-event-sourcing?slide=5

Latency is one of the key concerns of messaging between services. With synchronous communication, if the service whose data is being retrieved has performance problems of its own, it will serve the data with some latency, which may cause data-retrieval delays or timeout exceptions.

Or a service may fail internally and be unavailable for a specific period of time, during which the synchronous data call will not work.

Also, any performance issue on the network will directly affect either latency or service availability. So everything depends on the network being reliable.

We know there are patterns like distributed caching, bulkhead, and circuit breaker for handling these kinds of failure scenarios by implementing fault-tolerance strategies, but is that really the right way to do it?

2.2. Challenging the Challenges

So there are some solutions, some of which were invented years ago but are still solid enough to be a 'solution', while others are brand-new architectures that will help us challenge the challenges of cloud-native application messaging and communication.

Let’s start by taking a look at the common asynchronous messaging architectures before jumping into the solutions.

2.2.1. Asynchronous Messaging and Messaging Architectures

Like synchronous communication, asynchronous communication -or, in other words, messaging- has to be done over protocols. The two sides of the communication should agree on the protocol, so that the message data that is produced and consumed can be understood by the consumer.

While HTTP+REST is the most used protocol for synchronous communication, there are several other protocols that asynchronous messaging systems widely use, like the AMQP, STOMP, XMPP, MQTT, or Kafka protocols.

There are three main types of messaging models:

  • Point-to-point
  • Publish-subscribe (Pub-sub)
  • Hybrid

Point-to-Point

Point-to-point messaging is like sending a parcel via mail post services. You go to a post office, write the address you want the parcel to be delivered to, and post the parcel knowing it will be delivered sometime later. The receiver does not have to be at home when the parcel is sent and at some point later the parcel will be received at the address.

Point-to-point messaging systems are mostly implemented as queues that use first-in, first-out (FIFO) ordering, and a specific message can be received by only one subscriber of the queue.

Figure 5. Point-to-Point Messaging. https://docs.oracle.com/cd/E19340-01/820-6424/aerbj/index.html

This brings up the topic of queue durability, which means that if there are no active subscribers, the messaging system retains the messages until a subscriber comes along and consumes them.

Point-to-point messaging is generally used for use cases where a message must be acted upon only once, as queues can best provide an at-least-once delivery guarantee.

Publish-Subscribe

To understand how publish-subscribe (pub-sub) works, imagine that you are an attendee of a webinar. While you are connected you can hear and watch what the speaker says, along with the other participants. When you disconnect you miss what the speaker says, but when you connect again you can hear what is being said.

So the webinar works like a pub-sub mechanism that while all the attendees are subscribers, the speaker is the broadcaster/publisher.

Pub-sub mechanisms are generally implemented through topics, which act like the webinar broadcast being subscribed to. So when a message is produced on a topic, all the subscribers get it, since the message is distributed to all of them.

Figure 6. Publish-Subscribe Messaging. https://engineering.carsguide.com.au/laravel-pub-sub-messaging-with-apache-kafka-3b27ed1ee5e8

Topics are nondurable, unlike queues. This means that a subscriber/consumer that is not consuming any messages -because it is not running, for example- misses the messages broadcast during that period. So topics can provide an at-most-once delivery guarantee for each subscriber.

Hybrid model

Hybrid messaging models include both point-to-point and publish-subscribe, as the use cases generally require a messaging system that has many consumers who each want a copy of the message with full durability; in other words, without message loss.

Technologies like ActiveMQ and Apache Kafka both implement this hybrid model with their own ways of persistence and distribution mechanisms.

Durability is a key factor, especially in cloud-native distributed systems, since persisting the state and being able to replay it somehow plays a key role in component communication. Adding durability to the capabilities of the publish-subscribe mechanism decreases the dependencies between components/services/microservices, as the message can be persisted and fetched again, either by the same subscriber or by another one.

So hybrid messaging systems are vital when it comes to passing states as messages -as events- through cloud-native microservices, since event-driven distributed architectures require these capabilities.

2.2.2. Events & Event Sourcing

In the process of developing microservice-based cloud-native architectures, approaches like Domain-Driven Design (DDD) make it easy to divide the bounded contexts and see the sub-domains related to the parent domain.

One of the best techniques to separate and define the bounded contexts is Event Storming, which takes events as entry points and surfaces everything including commands, data relationships, communication styles, and, most importantly, combobulators, which are mostly mapped as bounded contexts.

Figure 7. Events Storming Components. https://medium.com/@springdo/a-facilitators-recipe-for-event-storming-941dcb38db0d

After most of the event storming map emerges, one can see all the communication points between bounded contexts, which are mostly mapped as microservices or services in the system, each with its own database and data structure.

Figure 8. A real-life Event Storming example;)

This structure, as it all consists of events, suggests the main idea of using asynchronous communication via a publish-subscribe system that queues the events to be consumed; in other words, doing Event Sourcing.

Event Sourcing is a state-event-message pattern that captures all changes to an application state as a sequence of events that can be consumed by other applications -in this case, microservices.

Figure 9. Event Sourcing. https://speakerdeck.com/mbogoevici/data-strategies-for-microservice-architectures?slide=14

Event Sourcing is very important in a distributed cloud-native environment, because in the cloud-native world microservices can easily be scaled, new microservices can join, or -from an application modernization perspective- microservices can be separated from their big monolith mother in order to live their own lives.

So having an asynchronous publish-subscribe system with durable data and the ability to replay is very important. Additionally, queueing the events rather than the final data makes it flexible for other services, which means implementing dependency inversion in an asynchronous environment with the capability of eventual consistency.

The question here is: How to create/trigger those events?

There are many programmatic ways, regardless of language or framework library, to create events and publish them. One can create database listeners or interceptors programmatically (like Hibernate Envers does) or handle them in the DAO (Data Access Object) or service layer of the application. Even so, creating an Event Sourcing mechanism is not easy.

At this point, a relatively new pattern comes as a savior: Change Data Capture.

2.2.3. Change Data Capture

Change Data Capture (CDC) is a pattern used to track data changes -mostly in databases- in order to take action on them.

Figure 10. Change Data Capture. https://speakerdeck.com/mbogoevici/data-strategies-for-microservice-architectures?slide=15

A CDC mechanism should listen for data changes and create an event that includes both the type of the change (create/insert, update, delete) and the changed data itself. After the event is created, it can be published to any durable pub-sub system in order to be consumed.

Because it decouples the change-listening capability from the application code, CDC is one of the best patterns for the event-driven architectures of cloud-native applications.

Debezium is probably the most popular open-source implementation nowadays because of its easy integration with a popular set of databases and Apache Kafka, especially on platforms like Kubernetes/OpenShift.

Figure 11. CDC with Debezium. https://developers.redhat.com/blog/2020/04/14/capture-database-changes-with-debezium-apache-kafka-connectors/

Now we have our event listener and creator in the form of a CDC implementation like Debezium, and let's say we use Apache Kafka for event distribution between microservices.

Since what we publish is not the final data but the change itself, the subscribed microservice should get the change and reflect it in its own database. This change -landing in a microservice that has a database of its own, because it is a bounded context- is generally used for reads rather than writes, because it is just a reflection of an event that was triggered by a write on another database.

This brings to mind a pattern -one that our sample already implements by design- that has been used for many years, especially by enterprise-level relational database systems: CQRS.

2.2.4. Command Query Responsibility Segregation (CQRS)

CQRS is a pattern that suggests separating the read model from the write model, mainly motivated by separation of concerns and performance.

Figure 12. CQRS with Separate Datastores. https://speakerdeck.com/mbogoevici/data-strategies-for-microservice-architectures?slide=20

In a cloud-native system with a set of polyglot microservices, each with its own database -relational or NoSQL- the CQRS pattern fits well, since each microservice has to keep a reflection of the data owned by the applications it depends on.
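
As a rough sketch of that read side (hypothetical field names, loosely following the op/before/after envelope that Debezium produces later in the demo), the consuming microservice keeps its own query-optimized copy by projecting the change events into a local read model:

from typing import Dict, Optional

# A query-optimized read model kept by the consuming microservice.
# It is only updated from change events, never written to directly by commands.
read_model: Dict[int, Dict] = {}

def project(change_event: Dict) -> None:
    """Apply a CDC-style change event (op + before/after) to the read model."""
    op = change_event["op"]                      # "c" = create, "u" = update, "d" = delete
    after: Optional[Dict] = change_event.get("after")
    before: Optional[Dict] = change_event.get("before")
    if op in ("c", "u"):
        read_model[after["id"]] = after
    elif op == "d" and before is not None:
        read_model.pop(before["id"], None)

# Example: the write side created a post; the read side simply reflects it.
project({"op": "c", "before": None,
         "after": {"id": 3, "title": "Javaday Istanbul 2020"}})
print(read_model[3]["title"])  # Javaday Istanbul 2020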

2.3. Solutions Assemble: State Propagation

So in our imaginary, well-architected, distributed cloud-native system, in order to make our microservices communicate and transfer their state, we called in the best-of-breed, super-heroic patterns -some of which have great implementations.

Figure 13. State Propagation & Outbox Pattern. https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/

This whole flow -one component writing data, a change data capture mechanism like Debezium turning that write into an event, and a durable asynchronous pub-sub system like Apache Kafka carrying the event to be read by another component- is called State Propagation, and it is backed by the Outbox pattern on the microservices side.
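
The Outbox pattern itself fits in a small sketch (the table and column names below are hypothetical): the service writes its business data and the outgoing event into the same local transaction, and a CDC connector such as Debezium later relays the outbox rows to Kafka, so no dual write is needed.

import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, text TEXT);
    CREATE TABLE outbox (id INTEGER PRIMARY KEY AUTOINCREMENT,
                         aggregate_type TEXT, aggregate_id TEXT,
                         event_type TEXT, payload TEXT);
""")

def create_post_with_outbox(title: str, text: str) -> None:
    # The entity and its outbox event are written in ONE local transaction;
    # a CDC connector tails the outbox table and publishes the rows to Kafka.
    with conn:  # commits on success, rolls back on error
        cur = conn.execute("INSERT INTO posts (title, text) VALUES (?, ?)", (title, text))
        conn.execute(
            "INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload) "
            "VALUES (?, ?, ?, ?)",
            ("Post", str(cur.lastrowid), "PostCreated",
             json.dumps({"id": cur.lastrowid, "title": title})))

create_post_with_outbox("Javaday Istanbul 2020", "It was perfect as always!")
print(conn.execute("SELECT event_type, payload FROM outbox").fetchall())

Debezium even ships an outbox event router transformation for the relay step, so the consuming side only ever sees clean domain events.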

To sum it all up: bringing these solutions together -state propagation and event-driven mechanisms built with CDC, Event Sourcing, and CQRS- helps us solve the challenges of the cloud-native microservices era in a very elegant way.

Resources

Books
  • Jakub Korab. ‘Understanding Message Brokers’. O’Reilly Media, Inc. ISBN 9781491981535.
  • Todd Palino, Gwen Shapira, Neha Narkhede. ‘Kafka: The Definitive Guide’. O’Reilly Media, Inc. ISBN 9781491936160.


ASAP! – The Storified Demo of Introduction to Debezium and Kafka on Kubernetes

A few days ago, I had the chance to speak about “Change Data Capture with Debezium and Apache Kafka” at an Istanbul Java User Group event. After the presentation, I did a small demo that I think was very beneficial for the audience, so I thought it would be best to improve it and kind of “storify” it, both to have some fun and to reach a wider audience. So here is the demo, along with the resources that you might need. Enjoy:)

ASAP! – The Storified Demo of Introduction to Debezium and Kafka on Kubernetes

Pre-Demo Preparations

Install the prereqs:

  • Strimzi Kafka CLI:

sudo pip install strimzi-kafka-cli

  • oc or kubectl
  • helm

Login to a Kubernetes or OpenShift cluster and create a new namespace/project.

Let's say we create a namespace called debezium-demo by running the following command on OpenShift:

oc new-project debezium-demo

Install demo application 'The NeverEnding Blog'

Clone the repository:

git clone https://github.com/mabulgu/the-neverending-blog.git

Checkout the debezium-demo branch:

git checkout debezium-demo

Go into the application directory:

cd the-neverending-blog

Install the helm template:

helm template the-neverending-blog chart | oc apply -f - -n debezium-demo

Start the s2i build for the application:

oc start-build neverending-blog --from-dir=. -n debezium-demo

...and OpenShift will take care of the rest and you should have a blog application called 'The NeverEnding Blog' in the end:

Install Elasticsearch

Apply Elasticsearch resources to OpenShift:

oc apply -f resources/elasticsearch.yaml -n debezium-demo

Expose the route for Elasticsearch:

oc expose svc elasticsearch-es-http -n debezium-demo

By clicking on the route of the application in the browser you should see a page like this:

And for the overall set of applications before the demo, you should have something like this (the OpenShift Developer Perspective is used here):

So you should have a Django application which uses a MySQL database and an Elasticsearch that has no data connection to the application -yet:)

ASAP!

So you are working at a company called NeverEnding Inc. as a Software Person and you are responsible for the company's blog application, which runs on Django and uses MySQL as its database.

One day your boss comes and tells you this:

So getting the command from your boss, you think that this is a good use case for using Change Data Capture (CDC) pattern.

Since the boss wants it ASAP, and you don't want to do dual writes which may cause consistency problems, you have to find a way to implement this request easily, and you decide it is best to do it with Debezium on your OpenShift Office Space cluster, along with Strimzi: Kafka on Kubernetes.

Oh, you can wear a Hawaiian shirt and jeans while you are doing all these even if it's not Friday:)

Deploying a Kafka cluster with Strimzi Kafka CLI

In order to install a Strimzi cluster on OpenShift you decide to use Strimzi Kafka CLI, which can also install the cluster operator for you.

First install the Strimzi operator:

kfk operator --install -n debezium-demo

IMPORTANT

If you already have an operator installed, please check its version. If the Strimzi version you've been using is older than 0.20.0, you have to set the right version as an environment variable, so that the right version of the cluster custom resource is used.

export STRIMZI_KAFKA_CLI_STRIMZI_VERSION=0.19.0

Let's create a Kafka cluster called demo on our OpenShift namespace debezium-demo.

kfk clusters --create --cluster demo -n debezium-demo

In the editor that opens you may keep the default configuration of 3 brokers and 3 ZooKeeper nodes. After saving the configuration file of the Kafka cluster, you should see the resources that are created for it in the OpenShift Developer Perspective:

Deploying a Kafka Connect Cluster for Debezium

Now it's time to create a Kafka Connect cluster using Strimzi custom resources. Since Strimzi Kafka CLI is not capable of creating Connect objects yet at the time of writing this article, we will create it by using the sample resources in the demo project.

Create a custom resource like the following:

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  name: debezium
spec:
  bootstrapServers: 'demo-kafka-bootstrap:9092'
  config:
    config.storage.replication.factor: '1'
    config.storage.topic: debezium-cluster-configs
    group.id: debezium-cluster
    offset.storage.replication.factor: '1'
    offset.storage.topic: debezium-cluster-offsets
    status.storage.replication.factor: '1'
    status.storage.topic: debezium-cluster-status
  image: 'quay.io/hguerreroo/rhi-cdc-connect:2020-Q3'
  jvmOptions:
    gcLoggingEnabled: false
  replicas: 1
  resources:
    limits:
      memory: 2Gi
    requests:
      memory: 2Gi

And apply it to the debezium-demo namespace on OpenShift (or just apply the one that comes with the demo repository):

oc apply -f resources/kafka-connect-debezium.yaml -n debezium-demo

This will create a Kafka Connect cluster with the name debezium on your namespace:

Deploy a Debezium connector for MySQL

So you have the Kafka Connect cluster to be able to use with Debezium. Now it's time for the real magic; the Debezium connector for MySQL.

Create a custom resource like the following, paying attention to the configuration keys that start with database.

Since you have to capture the changes in the neverendingblog database, which contains the posts table, your configuration should be something like this:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: debezium
  name: debezium-mysql-connector
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  config:
    database.server.name: db
    database.hostname: mysql
    database.user: debezium
    database.password: dbz
    database.server.id: '184054'
    database.port: '3306'
    database.dbname: neverendingblog
    database.history.kafka.topic: db.history
    database.history.kafka.bootstrap.servers: 'demo-kafka-bootstrap:9092'
  tasksMax: 1

Apply this YAML by saving it or just run the following command in this repository:

oc apply -f resources/kafka-connector-mysql-debezium.yaml -n debezium-demo

You should have some action in your Kafka cluster by now, and the big picture should look like this:

In order to see whether any new topics have been created in your Kafka cluster, run this command to list the topics in the debezium-demo namespace and the demo Kafka cluster:

kfk topics --list -n debezium-demo -c demo

So you should see that some topics have been created for you:

NAME                                                                                PARTITIONS   REPLICATION FACTOR
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a                         50           1
db                                                                                  1            1
db.history                                                                          1            1
db.neverendingblog.auth-permission---68ff3df4ec8e6a44b01288a87974b27990a559d2       1            1
db.neverendingblog.auth-user---a76d163ac9b98b60f06bfda76e966523ee9ffad              1            1
db.neverendingblog.django-admin-log---889a02bc079f08f8adf60c1b1f1cc6782dd99531      1            1
db.neverendingblog.django-content-type---79cc865eac5ac5b439174d2165a8035d52062610   1            1
db.neverendingblog.django-migrations---adc510d5c63e7b6ccbbf460dfa8c03408559591d     1            1
db.neverendingblog.django-session---38f5de04ea83f7a9add8be00a2d695a9503505c6        1            1
db.neverendingblog.posts                                                            1            1
debezium-cluster-configs                                                            1            1
debezium-cluster-offsets                                                            25           1
debezium-cluster-status                                                             5            1

Now let's check whether this connector works or not. Start a consumer that listens to the db.neverendingblog.posts topic, which is where the data captured from the posts table is put:

kfk console-consumer --topic db.neverendingblog.posts -n debezium-demo -c demo

After starting the consumer let's make some changes in the NeverEnding Blog. Open the Django admin page by getting the route URL of the blog and putting a "/admin" at the end.


INFO

You can get the route URL of your application with the following command:

oc get routes -n debezium-demo

So log in to the admin page with the credentials mabulgu/123456, click on Posts, add a new post by clicking Add Post, fill in these test values, and save it:

In the consumer you must have already seen some movement, right? Copy that message into a JSON beautifier and see what you have. It should look something like this:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "title"
          },
          {
            "type": "string",
            "optional": false,
            "field": "text"
          },
          {
            "type": "int64",
            "optional": false,
            "name": "io.debezium.time.MicroTimestamp",
            "version": 1,
            "field": "created_date"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "io.debezium.time.MicroTimestamp",
            "version": 1,
            "field": "published_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "author_id"
          }
        ],
        "optional": true,
        "name": "db.neverendingblog.posts.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "title"
          },
          {
            "type": "string",
            "optional": false,
            "field": "text"
          },
          {
            "type": "int64",
            "optional": false,
            "name": "io.debezium.time.MicroTimestamp",
            "version": 1,
            "field": "created_date"
          },
          {
            "type": "int64",
            "optional": true,
            "name": "io.debezium.time.MicroTimestamp",
            "version": 1,
            "field": "published_date"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "author_id"
          }
        ],
        "optional": true,
        "name": "db.neverendingblog.posts.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.neverendingblog.posts.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 3,
      "title": "Javaday Istanbul 2020",
      "text": "It was perfect as always!",
      "created_date": 1606400139000000,
      "published_date": null,
      "author_id": 1
    },
    "source": {
      "version": "1.2.4.Final-redhat-00001",
      "connector": "mysql",
      "name": "db",
      "ts_ms": 1606400180000,
      "snapshot": "false",
      "db": "neverendingblog",
      "table": "posts",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 27078,
      "row": 0,
      "thread": 221,
      "query": null
    },
    "op": "c",
    "ts_ms": 1606400180703,
    "transaction": null
  }
}

So congratulations! You can capture changes on your neverendingblog database.

But your boss still wants you to put these changes on your search system Elasticsearch.

Before rolling up our sleeves to send this change data to Elasticsearch, let's purify it a bit, since, out of this big Debezium JSON, all you need to index is the record itself along with the operation type and table fields.

Simple Data Transformation

In order to transform the data, some key/value converters have to be set, along with the extract transformation, which will produce a different data model in the end.

So add these lines and apply it on your OpenShift cluster:

    key.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: 'false'
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: 'false'
    transforms: extract
    transforms.extract.add.fields: 'op,table'
    transforms.extract.type: io.debezium.transforms.ExtractNewRecordState

Or just run this sample in the repository:

oc apply -f resources/kafka-connector-mysql-debezium_transformed.yaml -n debezium-demo

This means the transformation will extract the new record state, add the op and table fields to it (as __op and __table), and return a new, flattened JSON.

After applying the changes, let's start consuming the messages again if we stopped the consumer earlier:

kfk console-consumer --topic db.neverendingblog.posts -n debezium-demo -c demo

Go to the blog admin page again but this time let's change one of the blog posts instead of adding one.

Edit the post titled Strimzi Kafka CLI: Managing Strimzi in a Kafka Native Way and put a "CHANGED -" at the very start of the body for example.

When you change the data, a relatively smaller JSON message should have been consumed in your console, something like this:

{
  "id": 2,
  "title": "Strimzi Kafka CLI: Managing Strimzi in a Kafka Native Way",
  "text": "CHANGED - Strimzi Kafka CLI is a CLI that helps traditional Apache Kafka users -mostly administrators- to easily adapt Strimzi, a Kubernetes operator for Apache Kafka.\r\n\r\nIntention here is to ramp up Strimzi usage by creating a similar CLI experience with traditional Apache Kafka binaries. \r\n\r\nkfk command stands for the usual kafka-* prefix of the Apache Kafka runnable files which are located in bin directory. There are options provided like topics, console-consumer, etc. which also mostly stand for the rest of the runnable file names like kafka-topic.sh.\r\n\r\nHowever, because of the nature of Strimzi and its capabilities, there are also unusual options like clusters which is used for cluster configuration or users which is used for user management and configuration.",
  "created_date": 1594644431000000,
  "published_date": 1594644489000000,
  "author_id": 1,
  "__op": "u",
  "__table": "posts"
}

So this will be the data that you will index in Elasticsearch. Now let's go for it!

Deploying a Kafka Connect Cluster for Camel

In order to use another connector that consumes the data from Kafka and puts it onto Elasticsearch, first we need another Kafka Connect cluster, this time for a Camel connector.

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  annotations:
    strimzi.io/use-connector-resources: 'true'
  name: camel
spec:
  bootstrapServers: 'demo-kafka-bootstrap:9092'
  config:
    config.storage.replication.factor: '1'
    config.storage.topic: camel-cluster-configs
    group.id: camel-cluster
    offset.storage.replication.factor: '1'
    offset.storage.topic: camel-cluster-offsets
    status.storage.replication.factor: '1'
    status.storage.topic: camel-cluster-status
  image: 'quay.io/hguerreroo/camel-kafka-connect:0.5.0'
  jvmOptions:
    gcLoggingEnabled: false
  replicas: 1
  resources:
    limits:
      memory: 2Gi
    requests:
      memory: 2Gi

Save and apply this YAML to your OpenShift namespace, or simply run this sample:

oc apply -f resources/kafka-connect-camel.yaml -n debezium-demo

This will create a Kafka Connect cluster with the name camel on your namespace:

Now let's deploy a connector on this Connect cluster.

Deploy a Camel Sink connector for Elasticsearch

In order to send the consumed data to Elasticsearch we can use Apache Camel project's connectors for Kafka Connect.

The following is a sample of an Elasticsearch Sink Connector of Camel, which takes Kafka as the source and Elasticsearch as the sink.

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  labels:
    strimzi.io/cluster: camel
  name: elasticsearch-connector
spec:
  class: >-
    org.apache.camel.kafkaconnector.elasticsearchrest.CamelElasticsearchrestSinkConnector
  config:
    camel.sink.endpoint.hostAddresses: 'elasticsearch-es-http:9200'
    camel.sink.endpoint.indexName: posts
    camel.sink.endpoint.operation: Index
    camel.sink.path.clusterName: elasticsearch
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    topics: db.neverendingblog.posts
  tasksMax: 1

By saving and applying this resource you tell the Connect cluster to consume the db.neverendingblog.posts topic from Kafka and put the messages into a posts index in Elasticsearch.

Or just run this command to create the connector:

oc apply -f resources/kafka-connector-elastic-camel.yaml -n debezium-demo

Now the big picture should look like this:

So let's test your Elasticsearch by running a few curl search requests.

Try out Elasticsearch

Just like with the other applications on OpenShift, in order to access Elasticsearch externally you should get its route with the command:

oc get routes -n debezium-demo

Let's say that we get the route as http://elasticsearch-es-http-debezium-demo.apps.cluster-jdayist-6d29.jdayist-6d29.example.opentlc.com.

So in order to see whether the index has been created and whether it has anything inside, just run the following command to search for everything in the index:

curl -X GET \
  http://elasticsearch-es-http-debezium-demo.apps.cluster-jdayist-6d29.jdayist-6d29.example.opentlc.com/posts/_search

You should get a response that has all the changes including the one for Javaday Istanbul. So let's see if we can find it or not:

curl -X GET \
  'http://elasticsearch-es-http-debezium-demo.apps.cluster-jdayist-6d29.jdayist-6d29.example.opentlc.com/posts/_search?q=title:Javaday%20Istanbul%202020'

So you should see something like this in return:

{
    "took": 8,
    "timed_out": false,
    "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
    },
    "hits": {
        "total": {
            "value": 1,
            "relation": "eq"
        },
        "max_score": 4.852654,
        "hits": [
            {
                "_index": "posts",
                "_type": "_doc",
                "_id": "8VI-FnYBP8VChxowl2Pr",
                "_score": 4.852654,
                "_source": {
                    "id": 3,
                    "title": "Javaday Istanbul 2020",
                    "text": "It was perfect as always!",
                    "created_date": 1606690949000000,
                    "published_date": null,
                    "author_id": 1,
                    "__op": "c",
                    "__table": "posts"
                }
            }
        ]
    }
}

Congratulations! You finished it ASAP! Now you can relax and may feel a little bit like a gangsta:)

By the way, if you are interested in the event presentation and the demo video, here it is! (p.s. Event was in Turkish)


Bootstrap Kafka on Kubernetes (Strimzi) with Just 5 Commands

1 – Install Strimzi Kafka CLI

sudo pip install strimzi-kafka-cli

2 – Create project namespace on Openshift/Kubernetes

oc new-project kafka

3 – Install Strimzi Kafka Operator

kfk operator --install -n kafka

4 – Create Kafka Cluster

kfk clusters --create --cluster my-cluster -n kafka

5 – Create a Kafka topic

kfk topics --create --topic my-topic --partitions 12 --replication-factor 3 -n kafka -c my-cluster

Let’s try it out!

kfk console-producer --topic my-topic -n kafka -c my-cluster
kfk console-consumer --topic my-topic -n kafka -c my-cluster

Special thanks to Timecop1983 for the great music!


Simple ACL Authorization on Strimzi using Strimzi Kafka CLI

Simple ACL Authorization

In the previous example we implemented TLS authentication on a Strimzi Kafka cluster with Strimzi Kafka CLI. In this example, we will continue by enabling ACL authorization, so that we can restrict access to our topics and allow only the users or groups we want.

Let's first see our cluster list.

kfk clusters --list
NAMESPACE    NAME              DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS
kafka        my-cluster        3                        3

IMPORTANT

You should have a cluster called my-cluster in the kafka namespace we created before. If you don't have the cluster and haven't done the authentication part yet, please go back to the previous example and do it first, since authorization requires authentication to be set up beforehand.

Also please copy the truststore.jks and user.p12 files, or recreate them as explained in the previous example, and put them in the example folder (they are ignored in git).


Considering you have the cluster my-cluster on namespace kafka, let's list our topics to see the topic we created before:

kfk topics --list -n kafka -c my-cluster
NAME                                                          PARTITIONS   REPLICATION FACTOR
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a   50           3
my-topic                                                      12           3

Lastly let's list our user that we created previously, which we will be setting the authorization for.

kfk users --list -n kafka -c my-cluster
NAME      AUTHENTICATION   AUTHORIZATION
my-user   tls

As you can see we have the my-user user that we created and authenticated in the previous example.

Now let's configure our cluster to enable ACL authorization. We have to alter our cluster for this:

kfk clusters --alter --cluster my-cluster -n kafka

and put the simple authorization definitions under kafka like the following:

authorization:
  type: simple

After saving the cluster configuration, wait for the brokers to be rolling-updated by checking their status:

watch kubectl get pods -n kafka

Now it's time to run the producer and consumer to check if authorization is enabled:

kfk console-producer --topic my-topic -n kafka -c my-cluster --producer.config client.properties
ERROR Error when sending message to topic my-topic with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my-topic]
kfk console-consumer --topic my-topic -n kafka -c my-cluster --consumer.config client.properties
ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [my-topic]
Processed a total of 0 messages

As you might also observe, both the producer and consumer returned TopicAuthorizationException by saying Not authorized to access topics: [my-topic]. So let's define authorization access to this topic for the user my-user.

In order to enable authorization for the user, we have to define both the user's authorization type as simple -so that Apache Kafka's SimpleAclAuthorizer is used- and the ACL definitions for the relevant topic, in this case my-topic. To do this, we alter the user with the following command options:

kfk users --alter --user my-user --authorization-type simple --add-acl --resource-type topic --resource-name my-topic -n kafka -c my-cluster

The --add-acl option requires arguments like:


--operation TEXT        Operation that is being allowed or denied. (default: All)
--host TEXT             Host which User will have access. (default: *)
--type [allow|deny]     Operation type for ACL. (default: allow)
--resource-type TEXT    This argument is mutually inclusive with ['add_acl', 'delete_acl']
--resource-name TEXT    This argument is mutually inclusive with ['add_acl', 'delete_acl']
In this example we only used --resource-type and --resource-name, since those are the required fields; the others have defaults that we can rely on.

So in this case we used the defaults of type: allow, host: * and operation: All. The equivalent command would look like this:

kfk users --alter --user my-user --authorization-type simple --add-acl --resource-type topic --resource-name my-topic --type allow --host * --operation All -n kafka -c my-cluster

In order to see the ACL that is defined for allowing all operations of my-topic for the user my-user, let's describe it, in this case as YAML format:

kfk users --describe --user my-user -n kafka -c my-cluster -o yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaUser
metadata:
...
spec:
  authentication:
    type: tls
  authorization:
    type: simple
    acls:
    - host: '*'
      operation: All
      resource:
        name: my-topic
        patternType: literal
        type: topic
      type: allow
status:
...

As you can see, the user has its authorization defined as simple, and an ACL that allows all operations (read, write, describe) on my-topic for this user.

Now with the updated configuration let's run our producer and consumer again:

kfk console-producer --topic my-topic -n kafka -c my-cluster --producer.config client.properties
>message1
>message2
>message3
>

It seems that we are able to produce messages to my-topic. Let's consume those messages then:

kfk console-consumer --topic my-topic -n kafka -c my-cluster --consumer.config client.properties
ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: console-consumer-96150
Processed a total of 0 messages

Whoops! It did not work like the producer did. But why? Because the consumer group that was randomly generated for us (since we did not define one anywhere) has no ACL defined, so our user is not authorized to read through that group.


IMPORTANT

In Apache Kafka, if you want to consume messages you have to do it via a consumer group. You might say "we did not specify any consumer group while using the console consumer". Well, just like with the traditional console consumer of Kafka, a randomly generated consumer group id is used, so you do have a consumer group; it is simply created for you (like console-consumer-96150 above) since we did not define one ourselves.


Ok then. Now let's add an ACL for a consumer group so that our user is allowed to read through it while consuming from my-topic. Let's call this group my-group, and we will also use it as the group id in our consumer client configuration. This time let's use the kfk acls command, which works like the kfk users --alter --add-acl command. In order to give the most traditional experience to Strimzi CLI users, the kfk acls command works mostly the same way as the traditional bin/kafka-acls.sh command.

With the following command, we give the my-group group the read right for consuming the messages.

kfk acls --add --allow-principal User:my-user --group my-group --operation Read -n kafka -c my-cluster

After adding the ACL, let's check whether our user has the ACL for the group or not:

kfk users --describe --user my-user -n kafka -c my-cluster -o yaml

In the acls section of the YAML you can see the entries are added:

    - host: '*'
      operation: Read
      resource:
        name: my-group
        patternType: literal
        type: group
      type: allow

You can also list the ACLs with the following command, which lists all the ACLs natively from Kafka:

kfk acls --list -n kafka -c my-cluster
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=my-group, patternType=LITERAL)`:
 	(principal=User:CN=my-user, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=my-topic, patternType=LITERAL)`:
 	(principal=User:CN=my-user, host=*, operation=ALL, permissionType=ALLOW)

Or you can list the topic and group ACLs separately:

kfk acls --list --topic my-topic -n kafka -c my-cluster
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=my-topic, patternType=LITERAL)`:
 	(principal=User:CN=my-user, host=*, operation=ALL, permissionType=ALLOW)

For the group ACLs:

kfk acls --list --group my-group -n kafka -c my-cluster
Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=my-group, patternType=LITERAL)`:
 	(principal=User:CN=my-user, host=*, operation=READ, permissionType=ALLOW)

The only thing we have to do right now is to put the group id definition in our client.properties file:

security.protocol=SSL
ssl.truststore.location=./truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=./user.p12
ssl.keystore.password=123456
group.id=my-group

Running the consumer again with the updated client configuration -this time consuming from the beginning- let's see the previously produced messages:

kfk console-consumer --topic my-topic -n kafka -c my-cluster --consumer.config client.properties --from-beginning
message1
message2
message3

Voilà!

We are able to configure the Strimzi cluster for ACL authorization, define ACLs easily with different methods and use the client configurations successfully with Strimzi Kafka CLI.

Access the repo of this post from here: https://github.com/systemcraftsman/strimzi-kafka-cli/tree/master/examples/3_simple_acl_authorization

If you are interested in more, you can have a look at the short video in which I explain the Simple ACL authorization example here:

Take care 👋


TLS Authentication on Strimzi by using Strimzi Kafka CLI

TLS Authentication

In this example we will demonstrate setting up TLS authentication for Strimzi using Strimzi Kafka CLI. So let's get started!

First, let's list the clusters to see what we have:

kfk clusters --list

IMPORTANT

If you don't have any Kafka cluster created on your OpenShift/Kubernetes, please see the Strimzi Quick Start document or simply run:

kfk clusters --create --cluster my-cluster -n kafka

Assuming we have a cluster called my-cluster already set up for us, let's list the topics in the cluster:

kfk topics --list -n kafka -c my-cluster

If it is a new cluster, there is probably no topic living in it yet. So let's create a new topic for our example.

Create a topic called my-topic with 12 partitions and a replication factor of 3 in the my-cluster cluster:

kfk topics --create --topic my-topic --partitions 12 --replication-factor 3 -n kafka -c my-cluster

Run console producer to produce messages to my-topic

kfk console-producer --topic my-topic -n kafka -c my-cluster

Run console consumer to consume messages from my-topic

kfk console-consumer --topic my-topic -n kafka -c my-cluster

After making sure we can produce and consume messages without a problem, let's now enable TLS authentication. In Strimzi, if you want to enable authentication, there are listener configurations that provide a couple of authentication methodologies like scram-sha-512, oauth and tls.

In order to enable the authentication we have to alter our Kafka cluster:

kfk clusters --alter --cluster my-cluster -n kafka

An editor will open so that you can change the Strimzi Kafka cluster configuration. Since the Strimzi Kafka cluster resource has many items inside, for now we don't have any special property flag to set the value directly while altering; that's why we simply open the cluster custom resource for editing.

In the opened editor we have to add the following listeners as:

listeners:
  plain: {}
  tls:
    authentication:
      type: tls

If you want to fully secure your cluster you also have to enable authentication on the plain listener, because with the configuration above any client that does not use the SSL security protocol will fall back to the plain listener, which does not require any authentication. To do that, we can tell the plain listener in the cluster config to use one of the other authentication methodologies, scram-sha-512 or oauth. In this example we will set it to scram-sha-512, but we will demonstrate authentication via scram-sha-512 in another example.

So the latest listener definition should be like this:

listeners:
  plain:
    authentication:
      type: scram-sha-512
  tls:
    authentication:
      type: tls

Save the file and see the successfully edited message.

After the configuration change, all the broker pods will be updated one by one, thanks to our operator. You can watch the pods' state, since we have to wait until each of them is in the ready state:

watch kubectl get pods -n kafka

Now let's run our console producer and consumer again and see what happens:

kfk console-producer --topic my-topic -n kafka -c my-cluster
kfk console-consumer --topic my-topic -n kafka -c my-cluster

You got some WARN log messages saying disconnected (org.apache.kafka.clients.NetworkClient) from both producer and consumer right?

When we check the logs of the first broker pod after running the producer and consumer commands, we can see the failed authentication message:

kubectl logs -f my-cluster-kafka-0 -c kafka -n kafka
2020-09-22 11:18:33,122 INFO [SocketServer brokerId=0] Failed authentication with /10.130.2.58 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector) [data-plane-kafka-network-thread-0-ListenerName(PLAIN-9092)-SASL_PLAINTEXT-3]

Since we are not yet using SSL for authentication but the PLAIN listener, which we just set up with scram-sha-512, we cannot authenticate to the Strimzi Kafka cluster.

In order to log in to this cluster via SSL authentication we have to:

  • Create a user that uses TLS authentication
  • Create truststore and keystore files by getting the certificates from Openshift/Kubernetes cluster
  • Create a client.properties file that is to be used by producer and consumer clients in order to be able to authenticate via TLS

Let's first create the user with the name my-user:

kfk users --create --user my-user --authentication-type tls -n kafka -c my-cluster

After creating the user let's describe it to view a few attributes:

kfk users --describe --user my-user -n kafka -c my-cluster

At the bottom of the details of the user; in the status section, you can see a secret and a username definition:

Name:         my-user
Namespace:    kafka
Labels:       strimzi.io/cluster=my-cluster
Annotations:  <none>
API Version:  kafka.strimzi.io/v1beta1
Kind:         KafkaUser
Metadata:
  Creation Timestamp:  2020-09-21T12:54:52Z
  Generation:          3
  Resource Version:    53996010
  Self Link:           /apis/kafka.strimzi.io/v1beta1/namespaces/kafka/kafkausers/my-user
  UID:                 1c1dad0c-4e7a-4e63-933c-a785e6941021
Spec:
  Authentication:
    Type:  tls
Status:
  Observed Generation:     3
  Secret:                  my-user
  Username:                CN=my-user
Events:                    <none>

This means that a secret named my-user is created for this user and with the username CN=my-user as a common name definition.

In the secrets there are private and public keys that should be imported in the truststore and the keystore files that will be created shortly.

kubectl describe secret/my-user -n kafka
Name:         my-user
Namespace:    kafka
Labels:       app.kubernetes.io/instance=my-user
              app.kubernetes.io/managed-by=strimzi-user-operator
              app.kubernetes.io/name=strimzi-user-operator
              app.kubernetes.io/part-of=strimzi-my-user
              strimzi.io/cluster=my-cluster
              strimzi.io/kind=KafkaUser
Annotations:  <none>

Type:  Opaque

Data
====
ca.crt:         1164 bytes
user.crt:       1009 bytes
user.key:       1704 bytes
user.p12:       2364 bytes
user.password:  12 bytes

In order to create the truststore and keystore files, just run the get_keys.sh script in the example directory:

chmod a+x ./get_keys.sh;./get_keys.sh 

This will generate two files:

  • truststore.jks for the client's truststore definition
  • user.p12 for the client's keystore definition

TLS authentication here is a bidirectional TLS handshake. For this, apart from a truststore with the public key imported, a keystore file that holds both the user's public and private keys has to be created and defined in the client configuration file.

So let's create our client configuration file.

Our client configuration should have a few definitions like:

  • Security protocol
  • Truststore location and password
  • Keystore location and password

Security protocol should be SSL and since the truststore and keystore files are located in the example directory the client config file should be something like this:

security.protocol=SSL
ssl.truststore.location=./truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=./user.p12
ssl.keystore.password=123456

Since the get_keys.sh script sets the store passwords as 123456 we use it in the config file.

Save it as client.properties (or just use the one that is already created in this directory with the name client.properties)

Now it's time to test it. Let's call the console producer and consumer again, but this time with the client configuration:


IMPORTANT

Be careful to run producer and consumer commands from example's directory. Otherwise you have to change the truststore and keystore paths in the client.properties file.


kfk console-producer --topic my-topic -n kafka -c my-cluster --producer.config client.properties

The console producer seems to be working just fine since we can produce messages.

>message1
>message2
>message3
>

Let's run the console consumer to consume the just produced messages:

kfk console-consumer --topic my-topic -n kafka -c my-cluster --consumer.config client.properties
message1
message2
message3

Worked like a charm!

We are able to configure the Strimzi cluster and use the client configurations for TLS authentication easily with Strimzi Kafka CLI.

Access the repo of this post from here: https://github.com/systemcraftsman/strimzi-kafka-cli/tree/master/examples/2_tls_authentication

If you are interested in more, you can have a look at the short video in which I explain the TLS authentication example here:

Take care 👋
