I was lucky enough to use Landoop Lenses for a few weeks on my current project. It is a user-friendly Kafka UI tool that comes with a ton of functionality: I was able to inspect and query topic messages, delete and create topics, check the status of consumer groups, look at Avro schemas, quickly count messages in a topic, and more. Everything was a few clicks away. Sadly, this happy story ends here: the tool was decommissioned recently.
Basically, we were left without “eyes” into Kafka. Yes, we were still able to send and receive messages in our applications, but all of that happened in the background. Most of the time I need to see which data is transferred between apps; sometimes data is transformed in one app and passed to another, so a visual representation was critical for me when debugging.
How can I get all this information without any UI tool? The answer is simple: the Kafka scripts! You can download them from Apache or get them from Confluent (make sure to download the “Open Source” edition). The only difference is that the Apache distribution doesn’t ship with Avro support, which is why I chose the Confluent one.
Once you have downloaded the distribution, you need to add Confluent_or_Apache_Home/bin to your PATH. You do this in .bash_profile or .zshrc, depending on which shell you use.
export PATH=/Users/karengrygoryan/CLI-Tools/confluent-5.0.0/bin:$PATH
Before moving to the next step, let’s spin up a Kafka cluster in a local Docker container.
docker run --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 \
  -p 9581-9585:9581-9585 -p 9092:9092 -e ADV_HOST=127.0.0.1 \
  landoop/fast-data-dev:latest
The next step in the setup is to define the Kafka cluster environment variables: bootstrap servers, ZooKeeper server, schema registry, etc. Again, open your .bash_profile/.zshrc and add these environment variables; we will use them later in our scripts.
export PATH=/Users/karengrygoryan/CLI-Tools/confluent-5.0.0/bin:$PATH
ZK_DEV="127.0.0.1:2181"
export ZK_DEV
BRK_DEV="127.0.0.1:9092"
export BRK_DEV
SCH_REG_DEV="http://127.0.0.1:8081"
export SCH_REG_DEV
Once this is done, we can write our first script, which will create a topic:
dev-create-topic.sh
#!/usr/bin/env bash
# $ZK_DEV - environment variable we defined earlier
# $1 - topic name
# $2 - number of partitions
# $3 - replication factor
kafka-topics --zookeeper $ZK_DEV --create --topic $1 --partitions $2 --replication-factor $3
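One caveat with positional parameters is that they silently expand to empty strings when omitted. A defensive variant of the script can check the argument count first; this is only a sketch, and the `require_args` helper is a hypothetical name, not part of the Kafka tooling:

```shell
#!/usr/bin/env bash
# Hypothetical hardened variant of dev-create-topic.sh: refuse to call
# kafka-topics unless exactly three arguments were supplied.

# require_args <expected> <actual> -- succeeds only when the counts match
require_args() {
  [ "$1" -eq "$2" ]
}

if require_args 3 "$#"; then
  kafka-topics --zookeeper $ZK_DEV --create --topic $1 \
    --partitions $2 --replication-factor $3
else
  echo "usage: dev-create-topic.sh <topic> <partitions> <replication-factor>"
fi
```

Run with the wrong number of arguments, the script prints the usage line instead of creating a half-configured topic.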
Script to delete topic:
dev-delete-topic.sh
#!/usr/bin/env bash
kafka-topics --zookeeper $ZK_DEV --delete --topic $1
Script to list topics with a “search” filter:
dev-list-topic.sh
#!/usr/bin/env bash
kafka-topics --list --zookeeper $ZK_DEV | egrep "$1"
Script to list consumer groups with a “search” filter:
dev-cons-list.sh
#!/usr/bin/env bash
kafka-consumer-groups --list --bootstrap-server $BRK_DEV | egrep "$1"
Script to inspect consumer group:
dev-cons.sh
#!/usr/bin/env bash
kafka-consumer-groups --bootstrap-server $BRK_DEV --describe --group $1
Script to count the number of messages in a topic:
dev-count-topic.sh
#!/usr/bin/env bash
kafka-run-class kafka.tools.GetOffsetShell --broker-list $BRK_DEV --topic $1 --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
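To see why the awk part works: GetOffsetShell prints one `topic:partition:offset` line per partition, so summing the third colon-separated field gives the total message count across partitions. A quick illustration with made-up output (the topic and offsets below are invented for the example):

```shell
# Simulated GetOffsetShell output for a hypothetical 3-partition topic,
# piped through the same awk one-liner as the script above
printf '%s\n' 'my-topic:0:120' 'my-topic:1:98' 'my-topic:2:105' |
  awk -F ":" '{sum += $3} END {print sum}'
# prints 323
```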
Script to describe topic:
dev-desc-topic.sh
#!/usr/bin/env bash
kafka-topics --zookeeper $ZK_DEV --describe --topic $1
Script to consume from a topic:
dev-consume-topic.sh
#!/usr/bin/env bash
kafka-console-consumer --bootstrap-server $BRK_DEV --topic $1 --from-beginning
Script to consume Avro messages from a topic and write them to a local file, {topic name}-topic.json:
dev-avro-consume-topic.sh
#!/usr/bin/env bash
var=$(kafka-run-class kafka.tools.GetOffsetShell --broker-list $BRK_DEV --topic $1 --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}')
kafka-avro-console-consumer --topic $1 --bootstrap-server $BRK_DEV --from-beginning --property schema.registry.url=$SCH_REG_DEV --max-messages $var > $1-topic.json
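The trick here is that the console consumer would otherwise block forever waiting for new messages; counting the offsets first and passing that number to --max-messages makes it exit as soon as the existing messages are drained. The same “count first, then read exactly that many” pattern, sketched in plain shell so it can run anywhere (the `produce` function is just a stand-in for a topic):

```shell
# Stand-in "topic" holding a fixed set of messages (hypothetical data)
produce() { printf 'msg-1\nmsg-2\nmsg-3\n'; }

# Count what is there right now (tr strips the padding wc adds on some
# platforms), then read exactly that many lines and stop, instead of
# waiting for more input forever.
count=$(produce | wc -l | tr -d ' ')
produce | head -n "$count"
```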
Script to produce to a topic:
dev-produce-topic.sh
#!/usr/bin/env bash
kafka-console-producer --broker-list $BRK_DEV --topic $1
I saved all my scripts in one directory, “Kafka/KafkaScripts/dev”, and then added that directory to my PATH as well.
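One easy thing to forget: the scripts need the execute bit set before the shell will run them by name. A small demonstration in a temporary directory (in practice you would point chmod at your own scripts directory, e.g. ~/Programming/Kafka/KafkaScripts/dev):

```shell
# Create a throwaway directory with one stub script, then make every
# *.sh file in it executable -- the same chmod you would run once on
# your real scripts directory.
dir=$(mktemp -d)
echo '#!/usr/bin/env bash' > "$dir/dev-list-topic.sh"
chmod +x "$dir"/*.sh
ls -l "$dir/dev-list-topic.sh"
```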
export PATH=/Users/karengrygoryan/CLI-Tools/confluent-5.0.0/bin:/Users/karengrygoryan/Programming/Kafka/KafkaScripts/dev:$PATH
ZK_DEV="127.0.0.1:2181"
export ZK_DEV
BRK_DEV="127.0.0.1:9092"
export BRK_DEV
SCH_REG_DEV="http://127.0.0.1:8081"
export SCH_REG_DEV
Don’t forget to run
source ~/.zshrc or source ~/.bash_profile
Let’s create our first topic:
➜ ~ dev-create-topic.sh my-topic 1 1
Created topic "my-topic".
List topics:
➜ ~ dev-list-topic.sh
__consumer_offsets
_schemas
backblaze_smart
connect-configs
connect-offsets
connect-statuses
coyote-test-avro
coyote-test-binary
coyote-test-json
logs_broker
my-topic
nyc_yellow_taxi_trip_data
reddit_posts
sea_vessel_position_reports
telecom_italia_data
telecom_italia_grid
Push to the topic:
➜ ~ dev-produce-topic.sh my-topic
>hello
>world
Consume from the topic:
➜ ~ dev-consume-topic.sh my-topic
hello
world
List consumer groups:
➜ ~ dev-cons-list.sh
console-consumer-93851
Describe consumer group:
➜ ~ dev-cons.sh console-consumer-93851
Consumer group 'console-consumer-93851' has no active members.

TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-topic  0          2               2               0    -            -     -
Delete topic:
➜ ~ dev-delete-topic.sh my-topic
Topic my-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
Consume Avro messages and save them to a file:
➜ ~ dev-avro-consume-topic.sh reddit_posts
Processed a total of 30000 messages
➜ ~ ls
reddit_posts-topic.json
...
Now let’s query our file to see whether we have “hello” somewhere in the reddit_posts topic. I used jq for nice JSON formatting:
➜ ~ grep "hello" reddit_posts-topic.json | jq -C
{
  "created_utc": 1430439051,
  "ups": 1,
  "subreddit_id": "t5_2s7yr",
  "link_id": "t3_34criw",
  "name": "t1_cqugmtq",
  "score_hidden": 0,
  "author_flair_css_class": null,
  "author_flair_text": null,
  "subreddit": "stopdrinking",
  "id": "cqugmtq",
  "removal_reason": null,
  "gilded": 0,
  "downs": 0,
  "archived": false,
  "author": "hellomeru",
  "score": 1,
  "retrieved_on": 1432703259,
  "body": "That's awesome, and you're totally right. I think my head is still trying to get me to justify having at least one when I know that's just not going to work",
  "distinguished": null,
  "edited": 0,
  "controversiality": false,
  "parent_id": "t1_cqtstbl"
}
...
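You can try the same grep-based query without a cluster at all; here a hypothetical two-record file stands in for the dumped topic (jq is left out so the example needs nothing beyond grep):

```shell
# Two made-up records in the same one-JSON-object-per-line shape the
# Avro console consumer writes out:
printf '%s\n' \
  '{"author":"hellomeru","body":"hi there"}' \
  '{"author":"bob","body":"unrelated"}' > sample-topic.json

# Only the record whose author contains "hello" survives the filter
grep "hello" sample-topic.json
# prints: {"author":"hellomeru","body":"hi there"}
```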
Indeed, we have "author": "hellomeru". So that’s one of the basic ways you can “query” a topic. How far you can take this approach is limited only by your familiarity with the various terminal/CLI tools. I have listed only a tiny fraction of everything you can do with these scripts; please explore more on the Confluent and Apache websites.
Thanks for reading; I hope you find this information helpful and are able to put it to use.