{"nbformat":4,"nbformat_minor":0,"metadata":{"colab":{"name":"2022-01-25-kafka-colab.ipynb","provenance":[{"file_id":"https://github.com/recohut/nbs/blob/main/raw/T301207%20%7C%20Setup%20Kafka%20on%20Colab.ipynb","timestamp":1644669790532}],"collapsed_sections":[],"authorship_tag":"ABX9TyNPfg1LHOA8qg/SFGPQhL2G"},"kernelspec":{"name":"python3","display_name":"Python 3"},"language_info":{"name":"python"}},"cells":[{"cell_type":"markdown","source":["# Setup Kafka in Colab"],"metadata":{"id":"X1hs_38u9Stu"}},{"cell_type":"markdown","metadata":{"id":"qpqm3c_YbxwO"},"source":["Download and setup Kafka and Zookeeper instances"]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"XuUIW7WAbY_K","executionInfo":{"status":"ok","timestamp":1629007882610,"user_tz":-330,"elapsed":20307,"user":{"displayName":"Sparsh Agarwal","photoUrl":"","userId":"13037694610922482904"}},"outputId":"e6062e63-7a00-44a2-d115-ee0bb4762d11"},"source":["!curl -sSOL https://downloads.apache.org/kafka/2.7.0/kafka_2.13-2.7.0.tgz\n","!tar -xzf kafka_2.13-2.7.0.tgz\n","!./kafka_2.13-2.7.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.0/config/zookeeper.properties\n","!./kafka_2.13-2.7.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.0/config/server.properties\n","!echo \"Waiting for 10 secs until kafka and zookeeper services are up and running\"\n","!sleep 10"],"execution_count":null,"outputs":[{"output_type":"stream","text":["Waiting for 10 secs until kafka and zookeeper services are up and running\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"alx-OsSddQhi","executionInfo":{"status":"ok","timestamp":1629008244304,"user_tz":-330,"elapsed":940,"user":{"displayName":"Sparsh Agarwal","photoUrl":"","userId":"13037694610922482904"}},"outputId":"0254dd6b-5640-46cb-88ca-f2434104dfe6"},"source":["!ls ./kafka_2.13-2.7.0/bin/"],"execution_count":null,"outputs":[{"output_type":"stream","text":["connect-distributed.sh\t kafka-preferred-replica-election.sh\n","connect-mirror-maker.sh kafka-producer-perf-test.sh\n","connect-standalone.sh\t kafka-reassign-partitions.sh\n","kafka-acls.sh\t\t kafka-replica-verification.sh\n","kafka-broker-api-versions.sh kafka-run-class.sh\n","kafka-configs.sh\t kafka-server-start.sh\n","kafka-console-consumer.sh kafka-server-stop.sh\n","kafka-console-producer.sh kafka-streams-application-reset.sh\n","kafka-consumer-groups.sh kafka-topics.sh\n","kafka-consumer-perf-test.sh kafka-verifiable-consumer.sh\n","kafka-delegation-tokens.sh kafka-verifiable-producer.sh\n","kafka-delete-records.sh trogdor.sh\n","kafka-dump-log.sh\t windows\n","kafka-features.sh\t zookeeper-security-migration.sh\n","kafka-leader-election.sh zookeeper-server-start.sh\n","kafka-log-dirs.sh\t zookeeper-server-stop.sh\n","kafka-mirror-maker.sh\t zookeeper-shell.sh\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"id":"LtMSDDqabJGn"},"source":["!pip install kafka-python"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"id":"jNzVgmR0bRKn"},"source":["from kafka import KafkaProducer\n","from kafka.errors import KafkaError"],"execution_count":null,"outputs":[]},{"cell_type":"markdown","metadata":{"id":"VmMjdL_wcLnT"},"source":["### Create a Kafka Topic\n","Before we start developing our Kafka Streams application, we should pre-create any topics we expect it to interact with. 
For example, to create a Kafka topic named tweets, we can run the following command:"]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"-FAZUE7ncPkb","executionInfo":{"status":"ok","timestamp":1629008028073,"user_tz":-330,"elapsed":3807,"user":{"displayName":"Sparsh Agarwal","photoUrl":"","userId":"13037694610922482904"}},"outputId":"d54e5bee-1295-420b-eb3b-181b7df89bf1"},"source":["!./kafka_2.13-2.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic tweets"],"execution_count":null,"outputs":[{"output_type":"stream","text":["Created topic tweets.\n"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"zKMRhFW8cvd9"},"source":["### List Kafka Topics\n","Another common task is to list the available Kafka topics in your local cluster. You can do this by passing the --list flag to the kafka-topics script. The full command is shown below:"]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"ndgkqgQPcT8d","executionInfo":{"status":"ok","timestamp":1629008145129,"user_tz":-330,"elapsed":3407,"user":{"displayName":"Sparsh Agarwal","photoUrl":"","userId":"13037694610922482904"}},"outputId":"e29e2886-b262-4301-ff11-4b0445d01d66"},"source":["# !./kafka_2.13-2.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic tweets\n","!./kafka_2.13-2.7.0/bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list"],"execution_count":null,"outputs":[{"output_type":"stream","text":["tweets\n"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"gxPLqVKtchqJ"},"source":["### Producing Test Data\n","We often need to produce test data to our local Kafka topics to observe our code in action. Some common methods for doing this are described next. The simplest method for producing data to a Kafka topic is to use the kafka-console-producer script. If you run the following command, you will be dropped into a prompt where you can type each message that you want to produce to the tweets topic."]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"CxV-SUBmdNYY","executionInfo":{"status":"ok","timestamp":1629008654135,"user_tz":-330,"elapsed":28309,"user":{"displayName":"Sparsh Agarwal","photoUrl":"","userId":"13037694610922482904"}},"outputId":"9608e8eb-f183-4a06-b0c0-5138d16f126b"},"source":["!./kafka_2.13-2.7.0/bin/kafka-console-producer.sh \\\n"," --bootstrap-server localhost:9092 \\\n"," --topic tweets \\\n"," --property 'key.separator=|' \\\n"," --property 'parse.key=true'"],"execution_count":null,"outputs":[{"output_type":"stream","text":[">1|{\"id\": 1, \"name\": \"Elyse\"}\n",">2|{\"id\": 2, \"name\": \"Mitch\"}\n",">"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"j3JQ2ecjdnN7"},"source":["The --bootstrap-server and --topic flags tell the script to which Kafka cluster and topic we intend to produce data. The last two flags are optional, but they are useful when we want to specify a message key in addition to a message value. In this case, we will be producing both a message key and a message value, and the message key will appear before the | (our key separator)."]},{"cell_type":"markdown","metadata":{"id":"_hVJmv2udy5I"},"source":["### Verifying\n","You can verify that the data was produced to your Kafka topic by reading the data back out using a Kafka consumer. The kafka-console-consumer script can be used for this purpose. 
Run the following command to see an example usage:"]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"3-iJ2sAWeCaq","executionInfo":{"status":"ok","timestamp":1629008686343,"user_tz":-330,"elapsed":15030,"user":{"displayName":"Sparsh Agarwal","photoUrl":"","userId":"13037694610922482904"}},"outputId":"eaec5a2c-5e04-4a5d-f091-4416809a1228"},"source":["!./kafka_2.13-2.7.0/bin/kafka-console-consumer.sh \\\n"," --bootstrap-server localhost:9092 \\\n"," --topic tweets \\\n"," --from-beginning \\\n"," --property print.key=true"],"execution_count":null,"outputs":[{"output_type":"stream","text":["1\t{\"id\": 1, \"name\": \"Elyse\"} 2|{\"id\": 2, \"name\": \"Mitch\"}\n","1\t{\"id\": 1, \"name\": \"Elyse\"}\n","2\t{\"id\": 2, \"name\": \"Mitch\"}\n","Processed a total of 3 messages\n"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"p0k1V6SeeKX4"},"source":["### Producing Test Data From a File\n","In the previous step, we manually typed in each of our topic inputs at a prompt. However, this can be time-consuming and repetitive if we need to do it on a regular basis.\n","\n","Another option is to save all of your inputs to a file, and use the kafka-console-producer script to produce all of the messages in the file to the topic of your choice. The benefit of this approach is that you can save a file with test data alongside your application code, and repopulate your application's source topic deterministically."]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"IcMYE2YTfJvf","executionInfo":{"status":"ok","timestamp":1629008866825,"user_tz":-330,"elapsed":635,"user":{"displayName":"Sparsh Agarwal","photoUrl":"","userId":"13037694610922482904"}},"outputId":"7da227bf-8156-4699-f9c3-00dab54f5c84"},"source":["%%writefile inputs.txt\n","3|{\"id\": 3, \"name\": \"Isabelle\"}\n","4|{\"id\": 4, \"name\": \"Chloe\"}"],"execution_count":null,"outputs":[{"output_type":"stream","text":["Writing inputs.txt\n"],"name":"stdout"}]},{"cell_type":"code","metadata":{"id":"w7EmL-rwlZUA"},"source":["%%writefile inputs2.txt\n","1|{\"CreatedAt\":1577933872630,\"Id\":10005,\"Text\":\"Bitcoin has a lot of promise. I'm not too sure about #ethereum\",\"Lang\":\"en\",\"Retweet\":false,\"Source\":\"\",\"User\":{\"Id\":\"14377870\",\"Name\":\"MagicalPipelines\",\"Description\":\"Learn something magical today.\",\"ScreenName\":\"MagicalPipelines\",\"URL\":\"http://www.magicalpipelines.com\",\"FollowersCount\":\"248247\",\"FriendsCount\":\"16417\"}}\n","2|{\"CreatedAt\":1577933871912,\"Id\":10006,\"Text\":\"RT Bitcoin has a lot of promise. 
I'm not too sure about #ethereum\",\"Lang\":\"en\",\"Retweet\":true,\"Source\":\"\",\"User\":{\"Id\":\"14377871\",\"Name\":\"MagicalPipelines\",\"Description\":\"\",\"ScreenName\":\"Mitch\",\"URL\":\"http://blog.mitchseymour.com/\",\"FollowersCount\":\"120\",\"FriendsCount\":\"120\"}}"],"execution_count":null,"outputs":[]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"we3lxLL0fqi4","executionInfo":{"status":"ok","timestamp":1629008914127,"user_tz":-330,"elapsed":2995,"user":{"displayName":"Sparsh Agarwal","photoUrl":"","userId":"13037694610922482904"}},"outputId":"d80738c3-430f-4b15-fd1c-18bc3c11459c"},"source":["!./kafka_2.13-2.7.0/bin/kafka-console-producer.sh \\\n"," --bootstrap-server localhost:9092 \\\n"," --topic tweets \\\n"," --property 'parse.key=true' \\\n"," --property 'key.separator=|' < inputs.txt"],"execution_count":null,"outputs":[{"output_type":"stream","text":[">>>"],"name":"stdout"}]},{"cell_type":"markdown","metadata":{"id":"mAGwhuhpf1gt"},"source":["### Verifying\n","Once again, you can verify using the kafka-console-consumer script. Run the following command to verify that the messages in our file were produced to the tweets topic."]},{"cell_type":"code","metadata":{"colab":{"base_uri":"https://localhost:8080/"},"id":"S_9yg-4Ef5hK","executionInfo":{"status":"ok","timestamp":1629008958653,"user_tz":-330,"elapsed":17831,"user":{"displayName":"Sparsh Agarwal","photoUrl":"","userId":"13037694610922482904"}},"outputId":"f667b398-55ec-47b8-ea48-8b5426f19bff"},"source":["!./kafka_2.13-2.7.0/bin/kafka-console-consumer.sh \\\n"," --bootstrap-server localhost:9092 \\\n"," --topic tweets \\\n"," --from-beginning \\\n"," --property print.key=true"],"execution_count":null,"outputs":[{"output_type":"stream","text":["1\t{\"id\": 1, \"name\": \"Elyse\"} 2|{\"id\": 2, \"name\": \"Mitch\"}\n","1\t{\"id\": 1, \"name\": \"Elyse\"}\n","2\t{\"id\": 2, \"name\": \"Mitch\"}\n","3\t{\"id\": 3, \"name\": \"Isabelle\"}\n","3\t{\"id\": 4, \"name\": \"Chloe\"}\n","Processed a total of 5 messages\n"],"name":"stdout"}]},
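{"cell_type":"markdown","metadata":{},"source":["### Producing and Consuming with kafka-python\n","The kafka-python package was installed and its KafkaProducer and KafkaError imports were loaded above, but they were never actually used. The cell below is a minimal sketch of the same produce-and-read-back round trip done from Python instead of the console scripts. It assumes the broker started earlier is still listening on localhost:9092 and that the tweets topic exists; the record with key 5 and name Ada is made-up sample data."]},{"cell_type":"code","metadata":{},"source":["import json\n","from kafka import KafkaProducer, KafkaConsumer\n","from kafka.errors import KafkaError\n","\n","# Produce one keyed JSON message, mirroring the key|value records used above.\n","producer = KafkaProducer(\n","    bootstrap_servers='localhost:9092',\n","    key_serializer=str.encode,\n","    value_serializer=lambda v: json.dumps(v).encode('utf-8'))\n","try:\n","    producer.send('tweets', key='5', value={'id': 5, 'name': 'Ada'}).get(timeout=10)\n","except KafkaError as err:\n","    print('Failed to produce message:', err)\n","producer.flush()\n","\n","# Read everything back from the beginning of the topic; iteration stops after\n","# 5 seconds without new messages because of consumer_timeout_ms.\n","consumer = KafkaConsumer(\n","    'tweets',\n","    bootstrap_servers='localhost:9092',\n","    auto_offset_reset='earliest',\n","    consumer_timeout_ms=5000)\n","for record in consumer:\n","    print(record.key, record.value)\n","consumer.close()"],"execution_count":null,"outputs":[]}]}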