{ "cells": [ { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "08b3c927-0b10-42d3-823b-b04dd2def3ac", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "# Connect Spark with Redis Cloud\n", "Spark and Redis together unlock powerful capabilities for data professionals. This guide demonstrates how to integrate these technologies for enhanced analytics, real-time processing, and machine learning applications.\n", "\n", "In this hands-on notebook, you'll learn how to make efficient use of Redis data structures alongside Spark's distributed computing framework. You'll see firsthand how to extract data from Redis, process it in Spark, and write results back to Redis for application use. Key topics include:\n", "1. Setting up the Spark-Redis connector in Databricks\n", "2. Writing data to Redis from Spark\n", "3. Reading data from Redis for application access\n", "\n", "## Databricks Cluster Setup with Redis Connector\n", "\n", "1. Set up a new Databricks cluster\n", "1. Go to the cluster's **Libraries** section\n", "1. Select **Install New**\n", "1. Choose **Maven** as your source and click **Search Packages**\n", "1. Enter `redis-spark-connector` and select `com.redis:redis-spark-connector:x.y.z`\n", "1. Finalize by clicking **Install**
\n", "Want to explore the connector's full capabilities? Check the [detailed documentation](https://redis-field-engineering.github.io/redis-spark)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "aafb8639-2a2f-4c0c-911d-aa4eec118608", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## Setting Up Redis Cloud Environment\n", "\n", "Redis Cloud offers a fully-managed Redis service ideal for this integration. Follow these steps:\n", "\n", "1. Register for a [Redis Cloud account](https://redis.io/cloud/)\n", "1. Follow the [quickstart guide](https://redis.io/docs/latest/operate/rc/rc-quickstart/) to create a free tier database\n", "1. From your Redis Cloud database dashboard in the Configuration tab click on Import. In the dialog window select Google Cloud Storage as the source type, paste the following in the source path: [gs://jrx/nobels.rdb](gs://jrx/nobels.rdb), and finally click Import.\n", "1. Examine the keys you've imported using **Redis Insight**, Redis' visual data browser. From your Redis Cloud database dashboard, click on **Redis Insight** and explore the data we imported in the previous step. \n", "\n", "## Configuring Spark with Redis Connection Details\n", "\n", "1. From your Redis Cloud database dashboard, find your connection endpoint under **Connect**. The string follows this pattern: `redis://:@:`\n", "1. In Databricks, open your cluster settings and locate **Advanced Options**. Under **Spark** in the **Spark config** text area, add your Redis connection string as both `spark.redis.read.connection.uri redis://...` and `spark.redis.write.connection.uri redis://...` parameters. This configuration applies to all notebooks using this cluster. Note that it is recommended to use secrets to store sensitive Redis URIs. 
Refer to the [Redis Spark documentation](https://redis-field-engineering.github.io/redis-spark/#_databricks) for more details." ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "6a73287f-3746-4126-be49-24931838a670", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## Reading from Redis\n", "\n", "To read data from Redis use the following line." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "574c93b6-8c31-4f4e-b5f8-e9414845c71c", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "df = spark.read.format(\"redis\").option(\"keyType\", \"hash\").load()\n", "display(df)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "63658b45-2684-440f-9d2d-b0d84bb4af8f", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## Writing to Redis\n", "\n", "Let's use the `df` data we imported earlier and write it back to Redis as JSON. Refresh **Redis Insight** and notice the new JSON keys prefixed with `spark:nobel`." 
] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "37786ad3-b0ec-49a9-8839-75ff907e4cae", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "df.write.format(\"redis\").option(\"type\", \"json\").option(\"keyspace\", \"spark:nobel\").option(\"key\", \"id\").mode(\"append\").save()" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "caf94ef3-5337-401f-b2d2-a43ff58b8e22", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## Reading from Redis in Streaming Mode\n", "\n", "The following code reads data from the Redis stream `nobels`, appending data to a streaming in-memory dataframe." ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "b72cc53e-9c7d-4e10-a828-83a98b1a9021", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "streamDf = spark.readStream.format(\"redis\").option(\"type\", \"stream\").option(\"streamKey\", \"nobels\").load()\n", "query = streamDf.writeStream.format(\"memory\").queryName(\"nobels\").outputMode(\"append\").trigger(processingTime=\"1 second\").start()\n", "\n", "import time\n", "time.sleep(3)\n", "display(spark.sql(\"SELECT * FROM nobels\"))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "db0205c2-3764-42c7-b90b-008afa288a89", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "With the previously created streaming dataframe still running we can add data to the stream and see the dataframe receiving that new 
data. In **Redis Insight** change the \"All Key Types\" filter to only show keys of **Stream** type. Double-click on the `nobels` stream and click `New Entry`. Add the following fields: `category`: `physics`, `id`: `123`, `share`: `1`, `year`: `2025`, `firstName`, `lastName`, and `motivation`. Hit `Save` and run the query again. You should now see your entry at the bottom of the table:" ] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "551e6a88-a9f8-48ec-82b7-34f7f8326569", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "display(spark.sql(\"SELECT * FROM nobels\"))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "f6ba51ee-705b-4e92-89e6-8530978ba987", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "## Writing to Redis in Streaming Mode\n", "\n", "We can also write to Redis in streaming mode.\n", "1. Replace ``, ``, and `` with names for a Unity Catalog volume." 
] }, { "cell_type": "code", "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "51a55f3b-8912-417d-8f78-57aad0cf86ca", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "outputs": [], "source": [ "catalog = \"\"\n", "schema = \"\"\n", "volume = \"\"\n", "\n", "path_volume = f\"/Volumes/{catalog}/{schema}/{volume}\"\n", "checkpoint_dir = f\"{path_volume}/mycp\"\n", "dbutils.fs.mkdirs(checkpoint_dir)\n", "\n", "streamDf.writeStream.format(\"redis\").outputMode(\"append\") \\\n", " .option(\"type\", \"hash\") \\\n", " .option(\"keyspace\", \"spark:nobel\") \\\n", " .option(\"key\", \"id\") \\\n", " .option(\"checkpointLocation\", checkpoint_dir) \\\n", " .start()\n" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "b494d5a2-3187-4f8a-8632-33fee60d6492", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [ "In **Redis Insight** select keys with the pattern `spark:nobel:*`. You should see hashes corresponding to the entries in the `nobels` stream that we used previously. If you add other entries to the stream like we did in the *Reading from Redis in Streaming Mode* section, you will see them reflected in that `spark:nobel` keyspace." 
] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "49fa26c8-cd66-49f8-a5d4-b0c29098ffc6", "showTitle": false, "tableResultSettingsMap": {}, "title": "" } }, "source": [] } ], "metadata": { "application/vnd.databricks.v1+notebook": { "computePreferences": null, "dashboards": [], "environmentMetadata": null, "inputWidgetPreferences": null, "language": "python", "notebookMetadata": { "pythonIndentUnit": 4 }, "notebookName": "redis-spark-notebook", "widgets": {} }, "language_info": { "name": "python" } }, "nbformat": 4, "nbformat_minor": 0 }