{ "cells": [ { "cell_type": "markdown", "id": "5d856347-d81f-426f-8abc-b562fd1bcf19", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "# Multivariate anomaly detection in Real-Time Analytics in Microsoft Fabric\n", "\n", "For the full tutorial, see https://aka.ms/mvad-rti" ] }, { "cell_type": "code", "execution_count": null, "id": "fd10b8ec-384c-4a1b-b2ab-8f505a5da0c8", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "import numpy as np\n", "import pandas as pd" ] }, { "cell_type": "markdown", "id": "556320f9", "metadata": {}, "source": [ "## Load the table for training the model\n", "\n", "Spark needs an ABFSS URI to securely connect to OneLake storage, so the next cell defines a function that converts a OneLake URI to an ABFSS URI." ] }, { "cell_type": "code", "execution_count": null, "id": "93a7bfa1-5a33-4140-8c12-614b1aa29176", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "def convert_onelake_to_abfss(onelake_uri):\n", "    # Convert https://HOST/WORKSPACE/PATH to abfss://WORKSPACE@HOST/PATH\n", "    if not onelake_uri.startswith('https://'):\n", "        raise ValueError(\"Invalid OneLake URI. It should start with 'https://'.\")\n", "    uri_without_scheme = onelake_uri[len('https://'):]\n", "    parts = uri_without_scheme.split('/')\n", "    if len(parts) < 3:\n", "        raise ValueError(\"Invalid OneLake URI format.\")\n", "    host = parts[0]\n", "    container_name = parts[1]\n", "    path = '/'.join(parts[2:])\n", "    abfss_uri = f\"abfss://{container_name}@{host}/{path}\"\n", "    return abfss_uri" ] },
{ "cell_type": "markdown", "id": "4edeb178-3b1c-49a7-b948-776253e0df31", "metadata": { "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "source": [ "Convert the OneLake URI that you copied in part 6 of the tutorial to an ABFSS URI.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "5a582664-dd5c-4cc3-9ea5-c4b9697e8e5e", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "onelake_uri = \"OneLakeTableURI\"  # Replace with your OneLake table URI\n", "abfss_uri = convert_onelake_to_abfss(onelake_uri)\n", "print(abfss_uri)" ] }, { "cell_type": "markdown", "id": "d5526da1-854c-469c-aa28-d586119ddd66", "metadata": { "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "source": [ "Load the Delta table and convert it to a pandas DataFrame indexed by the `Date` column." ] }, { "cell_type": "code", "execution_count": null, "id": "f1d48fe0-a49f-4499-9092-77a83aad17f9", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "df = spark.read.format('delta').load(abfss_uri)\n", "df = df.toPandas().set_index('Date')\n", "print(df.shape)\n", "df[:3]" ] }, { "cell_type": "markdown", "id": 
"77873729", "metadata": {}, "source": [ "## Prepare the training DataFrame\n", "\n", "The actual predictions are run by the Eventhouse in [Part 9: Predict anomalies in the KQL queryset](https://learn.microsoft.com/fabric/real-time-intelligence/multivariate-anomaly-detection#part-9--predict-anomalies-in-the-kql-queryset). In a production scenario where data streams into the Eventhouse, predictions would be made on the newly arriving data. For this tutorial, the dataset is split by date into two sections, one for training and one for prediction, to simulate historical data and new streaming data." ] }, { "cell_type": "code", "execution_count": null, "id": "fe8bb0c4-1696-4c20-beff-156f4cc0a606", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "features_cols = ['AAPL', 'AMZN', 'GOOG', 'MSFT', 'SPY']  # stock ticker columns in the table\n", "cutoff_date = '2023-01-01'  # rows dated before this are used for training" ] }, { "cell_type": "code", "execution_count": null, "id": "de1e1a10-8894-4d1e-a436-825bcf79e428", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "train_df = df[df.index < cutoff_date]\n", "print(train_df.shape)\n", "train_df[:3]" ] }, { "cell_type": "code", "execution_count": null, "id": "70627dcc-aaa6-4c18-81b1-ec1510ec78f6", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "train_len = len(train_df)\n", "predict_len = len(df) - train_len\n", "print(f'Total samples: {len(df)}. 
Split into {train_len} for training and {predict_len} for prediction')" ] }, { "cell_type": "markdown", "id": "34c4324c-e0dc-4f4b-9b22-2bb635df63ea", "metadata": { "nteract": { "transient": { "deleting": false } } }, "source": [ "## Train the model and register it" ] }, { "cell_type": "code", "execution_count": null, "id": "0dbe5c1d-8369-4063-bfe5-dae4e834f095", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from anomaly_detector import MultivariateAnomalyDetector\n", "model = MultivariateAnomalyDetector()" ] }, { "cell_type": "code", "execution_count": null, "id": "14b84dae-cc1a-42db-9c78-4135f79727f8", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "sliding_window = 200  # number of consecutive time steps in each input window\n", "params = {\"sliding_window\": sliding_window}" ] }, { "cell_type": "code", "execution_count": null, "id": "3631f464-32e2-4d2c-991d-3321f52d51f6", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "model.fit(train_df, params=params)" ] }, { "cell_type": "code", "execution_count": null, "id": "f518d4ee", "metadata": {}, "outputs": [], "source": [ "model_name = \"mvad_5_stocks_model\"" ] }, { "cell_type": "code", "execution_count": null, "id": "8024acef-91f2-4e5f-8b6d-3dcb267851ab", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "import mlflow\n", "\n", "with 
mlflow.start_run():\n", "    mlflow.log_params(params)\n", "    mlflow.set_tag(\"Training Info\", \"MVAD on 5 Stocks Dataset\")\n", "\n", "    model_info = mlflow.pyfunc.log_model(\n", "        python_model=model,\n", "        artifact_path=\"mvad_artifacts\",\n", "        registered_model_name=model_name,\n", "    )" ] }, { "cell_type": "markdown", "id": "05b521c7", "metadata": {}, "source": [ "## Extract the registered model path for prediction in the Kusto Python sandbox" ] }, { "cell_type": "code", "execution_count": null, "id": "5b2e333d-492e-4b22-9e6f-2020a2a1a2d4", "metadata": { "jupyter": { "outputs_hidden": false, "source_hidden": false }, "microsoft": { "language": "python", "language_group": "synapse_pyspark" }, "nteract": { "transient": { "deleting": false } } }, "outputs": [], "source": [ "from mlflow.tracking import MlflowClient\n", "\n", "client = MlflowClient()\n", "mvs = client.search_model_versions(f\"name='{model_name}'\")\n", "latest = max(mvs, key=lambda v: v.creation_timestamp)  # most recently registered version\n", "model_abfss = latest.source\n", "print(model_abfss)" ] } ], "metadata": { "a365ComputeOptions": null, "dependencies": { "environment": {}, "lakehouse": null }, "kernel_info": { "name": "synapse_pyspark" }, "kernelspec": { "display_name": "Synapse PySpark", "language": "Python", "name": "synapse_pyspark" }, "language_info": { "name": "python" }, "microsoft": { "language": "python", "language_group": "synapse_pyspark", "ms_spell_check": { "ms_spell_check_language": "en" } }, "notebook_environment": {}, "nteract": { "version": "nteract-front-end@1.0.0" }, "save_output": true, "sessionKeepAliveTimeout": 0, "spark_compute": { "compute_id": "/trident/default", "session_options": { "conf": {}, "enableDebugMode": false } }, "synapse_widget": { "state": {}, "version": "0.1" }, "widgets": {} }, "nbformat": 4, "nbformat_minor": 5 }