{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "48abe67f-0609-4c63-9904-59464f39ae19", "metadata": {}, "outputs": [], "source": [ "# Oracle AI Data Platform v1.0\n", "#\n", "# Copyright © 2025, Oracle and/or its affiliates.\n", "#\n", "# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/" ] }, { "cell_type": "markdown", "id": "67cd2293-baa8-4651-a6b3-ade2027a910c", "metadata": { "execution": { "iopub.status.busy": "2025-09-15T23:32:43.347Z" }, "type": "markdown" }, "source": [ "# Reading Data Streams from Kafka with Spark Structured Streaming\n", "\n", "This example shows how to configure **Spark Structured Streaming** to consume messages from an **OCI Streaming (Kafka-compatible) endpoint**." ] }, { "cell_type": "code", "execution_count": null, "id": "f813229d-e370-4c10-a41a-9fbddbe81467", "metadata": { "type": "python" }, "outputs": [], "source": [ "import os\n", "\n", "# SASL/PLAIN credentials: set KAFKA_USERNAME / KAFKA_PASSWORD in the environment\n", "# instead of committing real secrets; the placeholders are used when they are unset.\n", "kafkaUsername = os.environ.get(\"KAFKA_USERNAME\", \"USERNAME\")\n", "kafkaPassword = os.environ.get(\"KAFKA_PASSWORD\", \"PASSWORD\")\n", "\n", "jaasConfig = (\n", "    \"org.apache.kafka.common.security.plain.PlainLoginModule required \"\n", "    f'username=\"{kafkaUsername}\" password=\"{kafkaPassword}\";'\n", ")\n", "\n", "# Configure the reader in a single chain so no option depends on option()\n", "# mutating an already-built reader whose return value is discarded.\n", "dataStreamReader = (\n", "    spark.readStream\n", "    .format(\"kafka\")\n", "    .option(\"kafka.bootstrap.servers\", \"cell-1.streaming.us-phoenix-1.oci.oraclecloud.com:9092\")\n", "    .option(\"subscribe\", \"word-streampool\")\n", "    .option(\"kafka.security.protocol\", \"SASL_SSL\")\n", "    .option(\"kafka.sasl.mechanism\", \"PLAIN\")\n", "    .option(\"kafka.sasl.jaas.config\", jaasConfig)\n", "    .option(\"kafka.max.partition.fetch.bytes\", 1024 * 1024)  # 1 MiB\n", "    .option(\"startingOffsets\", \"latest\")  # a new query starts from the latest offsets\n", "    .option(\"maxOffsetsPerTrigger\", \"5\")  # rate-limit each trigger to 5 offsets\n", ")\n", "\n", "# Keep only the message payload, cast to a string column.\n", "lines = (\n", "    dataStreamReader\n", "    .load()\n", "    .selectExpr(\"CAST(value AS STRING)\")\n", ")" ] }, { "cell_type": "code", "execution_count": 1, "id": "b75744f8-866e-4aa1-953f-12ec382e0afa", "metadata": { "cellType": "streaming", "chart": { 
"ef0f42b4-4fcb-4e82-9299-3b541d886102": [ { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" }, { "batchDuration": 5159, "input_rows_per_second": 0, "processed_rows_per_second": 0, "timestamp": "04:34:02 PM" } ] }, "events": { "ef0f42b4-4fcb-4e82-9299-3b541d886102": { "batch_id": 0, "duration_ms": { "add_batch": 3177, "commit_offsets": 1623, "get_batch": 93, "latest_offset": 0, "query_planning": 8, "trigger_execution": 5159, "wal_commit": 
0 }, "id": "ef0f42b4-4fcb-4e82-9299-3b541d886102", "input_rows_per_second": 0, "name": "OCIStreamingSource", "num_input_rows": 0, "processed_rows_per_second": 0, "run_id": "3e29ae80-ec8c-4512-af9b-9300e9de7812", "sink": { "description": "DeltaSink[/Volumes/default/default/streaming/kafkaStreamingSink]", "num_output_rows": -1 }, "sources": [ { "description": "KafkaV2[Subscribe[word-streampool]]", "end_offset": null, "input_rows_per_second": 0, "latest_offset": null, "num_input_rows": 0, "processed_rows_per_second": 0, "start_offset": null } ], "state_operators": [], "timestamp": "2025-09-15T23:34:02.461000+00:00" } }, "execution": { "iopub.status.busy": "2025-09-15T23:34:03.968Z" }, "type": "python" }, "outputs": [ { "data": { "application/vnd.stream+json": "{\"events\":{\"ef0f42b4-4fcb-4e82-9299-3b541d886102\":{\"batch_id\":0,\"duration_ms\":{\"add_batch\":3177,\"commit_offsets\":1623,\"get_batch\":93,\"latest_offset\":0,\"query_planning\":8,\"trigger_execution\":5159,\"wal_commit\":0},\"id\":\"ef0f42b4-4fcb-4e82-9299-3b541d886102\",\"input_rows_per_second\":0,\"name\":\"OCIStreamingSource\",\"num_input_rows\":0,\"processed_rows_per_second\":0,\"run_id\":\"3e29ae80-ec8c-4512-af9b-9300e9de7812\",\"sink\":{\"description\":\"DeltaSink[/Volumes/default/default/streaming/kafkaStreamingSink]\",\"num_output_rows\":-1},\"sources\":[{\"description\":\"KafkaV2[Subscribe[word-streampool]]\",\"end_offset\":null,\"input_rows_per_second\":0,\"latest_offset\":null,\"num_input_rows\":0,\"processed_rows_per_second\":0,\"start_offset\":null}],\"state_operators\":[],\"timestamp\":\"2025-09-15T23:34:02.461000+00:00\"}},\"chart\":{\"ef0f42b4-4fcb-4e82-9299-3b541d886102\":[{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 
PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"},{\"batchDuration\":5159,\"input_rows_per_second\":0,\"processed_rows_per_second\":0,\"timestamp\":\"04:34:02 PM\"}]}}" }, "metadata": { "application/vnd.stream+json": { "height": "auto", "width": "auto" }, "modelType": "streaming_event" }, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "opc-request-id: 4D04F81F6ABB4CD2B1064FB859804188\n", "\n", "Request is cancelled by the user." 
] } ], "source": [ "# Start the streaming query that writes the `lines` stream to the Delta sink path.\n", "# checkpointLocation is the directory Structured Streaming uses to persist query progress.\n", "query = (\n", "    lines.writeStream\n", "    .queryName(\"OCIStreamingSource\")\n", "    .format(\"delta\")\n", "    .option(\"checkpointLocation\", \"/Volumes/default/default/streaming/kafkaStreamingCheckpoint\")\n", "    .start(\"/Volumes/default/default/streaming/kafkaStreamingSink\")\n", ")" ] } ], "metadata": { "Last_Active_Cell_Index": 2, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 5 }