{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "c0a1e5f0",
   "metadata": {},
   "source": [
    "# Kafka to S3 consumer\n",
    "\n",
    "Reads JSON messages from a Kafka topic and writes each message to S3 as its own JSON object.\n",
    "\n",
    "Before running, set the `KAFKA_BOOTSTRAP_SERVERS` and `S3_BUCKET` environment variables (or edit the defaults in the configuration cell)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b6675043",
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import os\n",
    "from json import dumps, loads\n",
    "from time import sleep\n",
    "\n",
    "from kafka import KafkaConsumer\n",
    "from s3fs import S3FileSystem"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d4b7a2c9",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Connection settings. Override them through environment variables instead of editing code.\n",
    "# The defaults are unfilled placeholders and must be replaced before a real run.\n",
    "KAFKA_TOPIC = 'demo_test'\n",
    "# Comma-separated list of host:port pairs, e.g. \"10.0.0.1:9092,10.0.0.2:9092\".\n",
    "KAFKA_BOOTSTRAP_SERVERS = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', ':9092').split(',')\n",
    "# Bucket name only, without the s3:// prefix.\n",
    "S3_BUCKET = os.environ.get('S3_BUCKET', '<>')"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e3f9b1a4",
   "metadata": {},
   "source": [
    "## Connect to Kafka"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9eeff3ef",
   "metadata": {},
   "outputs": [],
   "source": [
    "# Each message payload is decoded as UTF-8 and parsed as JSON before it is handed back.\n",
    "consumer = KafkaConsumer(\n",
    "    KAFKA_TOPIC,\n",
    "    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,\n",
    "    value_deserializer=lambda raw: loads(raw.decode('utf-8')),\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "eda5a608",
   "metadata": {},
   "source": [
    "### Optional: inspect the stream\n",
    "\n",
    "To look at incoming messages, run the loop below in a scratch cell. It keeps reading from the same `consumer` object until it is interrupted, so stop it manually before moving on; messages printed here have already been read and will not be seen again by the upload loop.\n",
    "\n",
    "```python\n",
    "for c in consumer:\n",
    "    print(c.value)\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f7c2d8e1",
   "metadata": {},
   "source": [
    "## Write messages to S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "8d60dc6c",
   "metadata": {},
   "outputs": [],
   "source": [
    "# No credentials are passed here, so s3fs falls back to its default credential lookup.\n",
    "s3 = S3FileSystem()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "0f135e81",
   "metadata": {},
   "outputs": [],
   "source": [
    "# One S3 object per Kafka message, named by the running index of the message in this run.\n",
    "# NOTE: the index restarts at 0 every time this cell is run, so objects written by an\n",
    "# earlier run under the same index are overwritten.\n",
    "for count, message in enumerate(consumer):\n",
    "    with s3.open(\"s3://{}/stock_market_{}.json\".format(S3_BUCKET, count), 'w') as file:\n",
    "        json.dump(message.value, file)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.10 (v3.9.10:f2f3f53782, Jan 13 2022, 17:02:14) \n[Clang 6.0 (clang-600.0.57)]"
  },
  "vscode": {
   "interpreter": {
    "hash": "aee8b7b246df8f9039afb4144a1f6fd8d2ca17a180786b69acc140d282b71a49"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}