# Set Up Kafka in Colab

Download and set up Kafka and ZooKeeper instances.

In [None]:
!curl -sSOL https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
!tar -xzf kafka_2.13-2.7.0.tgz
!./kafka_2.13-2.7.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.0/config/zookeeper.properties
!./kafka_2.13-2.7.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.0/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

Waiting for 10 secs until kafka and zookeeper services are up and running
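A fixed 10-second sleep can be too short on a slow runtime and needlessly long on a fast one. As a rough alternative, the sketch below polls a TCP port until it accepts connections. The helper name `wait_for_port` is hypothetical, and the ports in the usage comment assume the defaults in the bundled config files (2181 for ZooKeeper, 9092 for the broker). An open port only shows that the process is listening, not that the broker has finished starting.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once host:port accepts TCP connections, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # create_connection raises OSError while nothing is listening yet.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False


# Usage in the notebook (assumed default ports):
# wait_for_port("127.0.0.1", 2181)  # ZooKeeper
# wait_for_port("127.0.0.1", 9092)  # Kafka broker
```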


In [None]:
!ls ./kafka_2.13-2.7.0/bin/

connect-distributed.sh	      kafka-preferred-replica-election.sh
connect-mirror-maker.sh       kafka-producer-perf-test.sh
connect-standalone.sh	      kafka-reassign-partitions.sh
kafka-acls.sh		      kafka-replica-verification.sh
kafka-broker-api-versions.sh  kafka-run-class.sh
kafka-configs.sh	      kafka-server-start.sh
kafka-console-consumer.sh     kafka-server-stop.sh
kafka-console-producer.sh     kafka-streams-application-reset.sh
kafka-consumer-groups.sh      kafka-topics.sh
kafka-consumer-perf-test.sh   kafka-verifiable-consumer.sh
kafka-delegation-tokens.sh    kafka-verifiable-producer.sh
kafka-delete-records.sh       trogdor.sh
kafka-dump-log.sh	      windows
kafka-features.sh	      zookeeper-security-migration.sh
kafka-leader-election.sh      zookeeper-server-start.sh
kafka-log-dirs.sh	      zookeeper-server-stop.sh
kafka-mirror-maker.sh	      zookeeper-shell.sh


In [None]:
!pip install kafka-python

In [None]:
from kafka import KafkaProducer
from kafka.errors import KafkaError

### Create a Kafka Topic
Before we start developing our Kafka Streams application, we should pre-create any topics we expect it to interact with. For example, to create a Kafka topic named tweets, we can run the following command:

In [None]:
!./kafka_2.13-2.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic tweets

Created topic tweets.


### List Kafka Topics
Another common task is to list the available Kafka topics in your local cluster. You can do this by passing the --list flag to the kafka-topics script. The full command is shown below:

In [None]:
# !./kafka_2.13-2.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic tweets
!./kafka_2.13-2.7.0/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

tweets


### Producing Test Data
We often need to produce test data to our local Kafka topics to observe our code in action. Some common methods for doing this are described next. The simplest method for producing data to a Kafka topic is to use the kafka-console-producer script. If you run the following command, you will be dropped into a prompt where you can type each message that you want to produce to the tweets topic.

In [None]:
!./kafka_2.13-2.7.0/bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic tweets \
    --property 'key.separator=|' \
    --property 'parse.key=true'

>1|{"id": 1, "name": "Elyse"}
>2|{"id": 2, "name": "Mitch"}
>

The --bootstrap-server and --topic flags tell the script which Kafka cluster and topic to produce data to. The last two flags are optional, but they are useful when we want to specify a message key in addition to a message value. Here we produce both, and the message key is everything that appears before the | (our key separator).
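To make the key/value split concrete, here is a minimal Python sketch of how a keyed line can be separated. The function name `split_keyed_line` is hypothetical and this is not the console producer's own code; it assumes the split happens at the first occurrence of the separator. Raising `ValueError` for a line without a separator is a choice made for this sketch.

```python
def split_keyed_line(line, separator="|"):
    """Split 'key<sep>value' at the first separator only."""
    key, found, value = line.partition(separator)
    if not found:
        raise ValueError(f"no key separator {separator!r} in line: {line!r}")
    return key, value


print(split_keyed_line('1|{"id": 1, "name": "Elyse"}'))

# Only the first separator counts, so a second record typed on the same
# line ends up inside the value of the first one.
print(split_keyed_line('1|{"id": 1} 2|{"id": 2}'))
```

Under that assumption, typing two records on one line yields a single message whose value contains the second record, separator included.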

### Verifying
You can verify that the data was produced to your Kafka topic by reading it back with a Kafka consumer. The kafka-console-consumer script can be used for this purpose. Run the following command to see an example:

In [None]:
!./kafka_2.13-2.7.0/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic tweets \
    --from-beginning \
    --property print.key=true

1	{"id": 1, "name": "Elyse"} 2|{"id": 2, "name": "Mitch"}
1	{"id": 1, "name": "Elyse"}
2	{"id": 2, "name": "Mitch"}
Processed a total of 3 messages


### Producing Test Data From a File
In the previous step, we manually typed in each of our topic inputs using a prompt. However, this can be time-consuming and repetitive if we need to do it on a regular basis.

Another option is to save all of your inputs to a file and use the kafka-console-producer script to produce every message in the file to the topic of your choice. The benefit of this approach is that you can keep a file of test data alongside your application code and repopulate your application's source topic deterministically.

In [None]:
%%writefile inputs.txt
3|{"id": 3, "name": "Isabelle"}
4|{"id": 4, "name": "Chloe"}

Writing inputs.txt


In [None]:
%%writefile inputs2.txt
1|{"CreatedAt":1577933872630,"Id":10005,"Text":"Bitcoin has a lot of promise. I'm not too sure about #ethereum","Lang":"en","Retweet":false,"Source":"","User":{"Id":"14377870","Name":"MagicalPipelines","Description":"Learn something magical today.","ScreenName":"MagicalPipelines","URL":"http://www.magicalpipelines.com","FollowersCount":"248247","FriendsCount":"16417"}}
2|{"CreatedAt":1577933871912,"Id":10006,"Text":"RT Bitcoin has a lot of promise. I'm not too sure about #ethereum","Lang":"en","Retweet":true,"Source":"","User":{"Id":"14377871","Name":"MagicalPipelines","Description":"","ScreenName":"Mitch","URL":"http://blog.mitchseymour.com/","FollowersCount":"120","FriendsCount":"120"}}

In [None]:
!./kafka_2.13-2.7.0/bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic tweets \
    --property 'parse.key=true' \
    --property 'key.separator=|' < inputs.txt

>>>
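The same file can also be produced from Python with the kafka-python package installed earlier. The sketch below is one possible approach, not a replacement for the command above: the helper names `read_keyed_lines` and `produce_file` are hypothetical, and the code assumes every non-blank line contains the | separator. Running it in addition to the shell command would produce the messages a second time.

```python
def read_keyed_lines(path, separator="|"):
    """Yield (key, value) byte pairs from a file of 'key<sep>value' lines."""
    with open(path, encoding="utf-8") as handle:
        for line in handle:
            line = line.rstrip("\n")
            if not line:
                continue  # skip blank lines
            # Assumes the separator is present; split at its first occurrence.
            key, _, value = line.partition(separator)
            yield key.encode("utf-8"), value.encode("utf-8")


def produce_file(producer, topic, path):
    """Send every keyed line in path to topic and return the number sent."""
    futures = [
        producer.send(topic, key=key, value=value)
        for key, value in read_keyed_lines(path)
    ]
    producer.flush()  # block until buffered records have been sent
    for future in futures:
        future.get(timeout=10)  # raises KafkaError if a send failed
    return len(futures)


# Usage against the broker started earlier in this notebook:
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers="localhost:9092")
# produce_file(producer, "tweets", "inputs.txt")
# producer.close()
```

Passing the producer in as an argument keeps the file handling separate from the connection, so the parsing can be checked without a running broker.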

### Verifying
Once again, you can verify using the kafka-console-consumer script. Run the following command to confirm that the messages in our file were produced to the tweets topic.

In [None]:
!./kafka_2.13-2.7.0/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic tweets \
    --from-beginning \
    --property print.key=true

1	{"id": 1, "name": "Elyse"} 2|{"id": 2, "name": "Mitch"}
1	{"id": 1, "name": "Elyse"}
2	{"id": 2, "name": "Mitch"}
3	{"id": 3, "name": "Isabelle"}
4	{"id": 4, "name": "Chloe"}
Processed a total of 5 messages
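The console consumer keeps waiting for new messages until it is interrupted, which is awkward in a notebook cell. As a hedged alternative, the topic can be read back with kafka-python and a consumer timeout. The helper `format_records` is hypothetical; it renders each record as key, tab, value to resemble the print.key=true output, and the "null" placeholder for a missing key or value is an assumption of this sketch.

```python
def format_records(records):
    """Render records as 'key<TAB>value' lines, similar to print.key=true."""
    lines = []
    for record in records:
        key = record.key.decode("utf-8") if record.key is not None else "null"
        value = record.value.decode("utf-8") if record.value is not None else "null"
        lines.append(f"{key}\t{value}")
    return lines


# Usage against the broker started earlier in this notebook:
# from kafka import KafkaConsumer
# consumer = KafkaConsumer(
#     "tweets",
#     bootstrap_servers="localhost:9092",
#     auto_offset_reset="earliest",  # read from the start of the topic
#     enable_auto_commit=False,
#     consumer_timeout_ms=5000,      # stop iterating after 5 s without messages
# )
# print("\n".join(format_records(consumer)))
# consumer.close()
```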
