{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
">### 🚩 *Create a free WhyLabs account to get more value out of whylogs!*
\n",
">*Did you know you can store, visualize, and monitor whylogs profiles with the [WhyLabs Observability Platform](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Pyspark_Constraints)? Sign up for a [free WhyLabs account](https://whylabs.ai/whylogs-free-signup?utm_source=whylogs-Github&utm_medium=whylogs-example&utm_campaign=Pyspark_Constraints) to leverage the power of whylogs and WhyLabs together!*"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"# Data Validation for Spark Dataframes with whylogs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[![Open in Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/whylabs/whylogs/blob/mainline/python/examples/tutorials/Pyspark_and_Constraints.ipynb)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"In this example, we will show how you can perform data validation for profiles that were created from a PySpark dataframe. This example is an advanced scenario that combines three topics. If you want to know more about each of these topics, please refer to the following tutorials:\n",
"\n",
"1. [How to create a whylogs profile from a PySpark dataframe](https://nbviewer.org/github/whylabs/whylogs/blob/mainline/python/examples/integrations/Pyspark_Profiling.ipynb)\n",
"2. [How to create user defined condition count metrics](https://nbviewer.org/github/whylabs/whylogs/blob/mainline/python/examples/advanced/Condition_Count_Metrics.ipynb)\n",
"3. [How to create and visualize constraints](https://nbviewer.org/github/whylabs/whylogs/blob/mainline/python/examples/basic/Constraints_Suite.ipynb)\n",
"\n",
"In this example, we will:\n",
"- Create a PySpark dataframe\n",
"- Create a whylogs profile from a PySpark dataframe\n",
"- Create two condition count metrics to check date format and url addresses\n",
"- Create and visualize a set of constraints based on the condition count and other standard metrics"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## About the Dataset - 🛏️ Airbnb Listings in Rio de Janeiro, Brazil\n",
"\n",
"We will read data made available from Airbnb. It's a listing dataset from the city of Rio de Janeiro, Brazil. We'll access data that was adapted from the following location: \"http://data.insideairbnb.com/brazil/rj/rio-de-janeiro/2021-01-26/data/listings.csv.gz\"\n",
"\n",
"In this example, we want to do some basic data validation. Let's define those:\n",
"\n",
"- Completeness Checks\n",
" - `id` (long): should not contain any missing values\n",
" - `listing_url` (string): should not contain any missing values\n",
" - `last_review` (string): should not contain any missing values\n",
"- Consistency Checks\n",
" - `last_review` (string): date should be in the format YYYY-MM-DD\n",
" - `listing_url` (string): should be an url from airbnb (starting with https://www.airbnb.com/rooms/)\n",
" - `latitude` and `longitude` (double): should be within the range of -24 to -22 and -44 to -43 respectively\n",
" - `room_type` (string): frequent strings should be in the set of expected values\n",
"- Statistics Checks\n",
" - `reviews_per_month` (double): standard deviation should be in expected range\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Installing the extra dependency\n",
"\n",
"As we want to enable users to have exactly what they need to use from whylogs, the `pyspark` integration comes as an extra dependency. In order to have it available, simply uncomment and run the following cell:"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"# Note: you may need to restart the kernel to use updated packages.\n",
"%pip install 'whylogs[spark]'"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Initializing a SparkSession\n",
"\n",
"Here we will initialize a SparkSession. I'm also setting the `pyarrow` execution config, because it makes our methods even more performant. \n",
"\n",
">**IMPORTANT**: Make sure you have Spark 3.0+ available in your environment, as our implementation relies on it for a smoother integration"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.appName('whylogs-testing').getOrCreate()\n",
"arrow_config_key = \"spark.sql.execution.arrow.pyspark.enabled\"\n",
"spark.conf.set(arrow_config_key, \"true\")"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating the PySpark dataframe\n",
"\n",
"\n",
"This is a relatively small dataset, so we can run this example locally."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"from pyspark import SparkFiles\n",
"\n",
"data_url = \"https://whylabs-public.s3.us-west-2.amazonaws.com/whylogs_examples/Listings/airbnb_listings.parquet\"\n",
"spark.sparkContext.addFile(data_url)\n",
"\n",
"spark_dataframe = spark.read.parquet(SparkFiles.get(\"airbnb_listings.parquet\"))"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"-RECORD 0--------------------------------------\n",
" name | Very Nice 2Br in ... \n",
" description | Discounts for lon... \n",
" listing_url | https://www.airbn... \n",
" last_review | 2020-12-26 \n",
" number_of_reviews_ltm | 13 \n",
" number_of_reviews_l30d | 0 \n",
" id | 17878 \n",
" latitude | -22.96592 \n",
" longitude | -43.17896 \n",
" availability_365 | 286 \n",
" bedrooms | 2.0 \n",
" bathrooms | null \n",
" reviews_per_month | 2.01 \n",
" room_type | Entire home/apt \n",
"only showing top 1 row\n",
"\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"spark_dataframe.show(n=1, vertical=True)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"root\n",
" |-- name: string (nullable = true)\n",
" |-- description: string (nullable = true)\n",
" |-- listing_url: string (nullable = true)\n",
" |-- last_review: string (nullable = true)\n",
" |-- number_of_reviews_ltm: long (nullable = true)\n",
" |-- number_of_reviews_l30d: long (nullable = true)\n",
" |-- id: long (nullable = true)\n",
" |-- latitude: double (nullable = true)\n",
" |-- longitude: double (nullable = true)\n",
" |-- availability_365: long (nullable = true)\n",
" |-- bedrooms: double (nullable = true)\n",
" |-- bathrooms: double (nullable = true)\n",
" |-- reviews_per_month: double (nullable = true)\n",
" |-- room_type: string (nullable = true)\n",
"\n"
]
}
],
"source": [
"spark_dataframe.printSchema()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating the Condition Count Metrics\n",
"\n",
"To create a profile with the standard metrics, we can simply call `collect_dataset_profile_view` from whylog's PySpark extra module. However, if we look at our defined set of constraints, there are two of those that need to checked agains individual values:\n",
"\n",
"- `last_review` (string): date should be in the format YYYY-MM-DD\n",
"- `listing_url` (string): should be an url from airbnb (starting with https://www.airbnb.com/rooms/)\n",
"\n",
"As opposed to the other constraints, that can be checked against aggregate metrics, these two need to be checked against individual values. For that, we will create two condition count metrics. Later on, we will create metric constraints based on these metrics."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import datetime\n",
"from whylogs.core.relations import Predicate\n",
"from typing import Any\n",
"from whylogs.core.metrics.condition_count_metric import Condition\n",
"from whylogs.core.schema import DeclarativeSchema\n",
"from whylogs.core.resolvers import STANDARD_RESOLVER\n",
"from whylogs.core.specialized_resolvers import ConditionCountMetricSpec\n",
"\n",
"def date_format(x: Any) -> bool:\n",
" date_format = '%Y-%m-%d'\n",
" try:\n",
" datetime.datetime.strptime(x, date_format)\n",
" return True\n",
" except ValueError:\n",
" return False\n",
"\n",
"last_review_conditions = {\"is_date_format\": Condition(Predicate().is_(date_format))}\n",
"listing_url_conditions = {\"url_matches_airbnb_domain\": Condition(Predicate().matches(\"^https:\\/\\/www.airbnb.com\\/rooms\"))}"
]
},
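{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before wiring these conditions into a schema, it can be useful to sanity-check the helper logic against a couple of sample values. The quick check below is only an illustration with made-up sample strings, and it exercises plain Python (the `date_format` function and the same regex pattern) rather than the whylogs `Predicate` API:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import re\n",
"\n",
"# The date helper should accept ISO-formatted dates and reject other formats\n",
"assert date_format(\"2020-12-26\") is True\n",
"assert date_format(\"26/12/2020\") is False\n",
"\n",
"# The same pattern used in the url_matches_airbnb_domain condition, against a sample listing URL\n",
"assert re.match(r\"^https://www\\.airbnb\\.com/rooms\", \"https://www.airbnb.com/rooms/17878\") is not None"
]
},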
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Now that we have the our set of conditions for both columns, we can create the condition count metrics. We can do so by creating a Standard Schema and then extending it by adding the condition count metrics with `add_condition_count_metrics`:"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"schema = DeclarativeSchema(STANDARD_RESOLVER)\n",
"\n",
"schema.add_resolver_spec(column_name=\"last_review\", metrics=[ConditionCountMetricSpec(last_review_conditions)])\n",
"schema.add_resolver_spec(column_name=\"listing_url\", metrics=[ConditionCountMetricSpec(listing_url_conditions)])"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"> To know more about condition count metrics and how to use them, check out the [Metric Constraints with Condition Count Metrics](https://nbviewer.org/github/whylabs/whylogs/blob/mainline/python/examples/advanced/Metric_Constraints_with_Condition_Count_Metrics.ipynb) example."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Profiling the PySpark DataFrame\n",
"\n",
"Now, we can use the schema to pass to our logger through `collect_dataset_profile_view`"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"from whylogs.api.pyspark.experimental import collect_dataset_profile_view\n",
"\n",
"dataset_profile_view = collect_dataset_profile_view(input_df=spark_dataframe, schema=schema)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"This will create a profile with the standard metrics, as well as the two condition count metrics that we created. As a sanity check, let's see the metrics for the `last_review` column:"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"['types',\n",
" 'cardinality',\n",
" 'counts',\n",
" 'distribution',\n",
" 'frequent_items',\n",
" 'condition_count']"
]
},
"execution_count": 9,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"dataset_profile_view.get_column(\"last_review\").get_metric_names()"
]
},
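{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you want to peek at the raw condition counts before building any constraints, you can read them straight from the profile. The snippet below is a small sketch that assumes the standard whylogs column profile API (`get_metric` and `to_summary_dict`):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Summary of the condition count metric for last_review:\n",
"# the total number of evaluated values and how many of them matched each condition\n",
"dataset_profile_view.get_column(\"last_review\").get_metric(\"condition_count\").to_summary_dict()"
]
},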
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Creating and Visualizing Metric Constraints\n",
"\n",
"We have all that we need to build our set of constraints. We will use out-of-the-box factory constraints to do that:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[ReportResult(name='last_review meets condition is_date_format', passed=1, failed=0, summary=None),\n",
" ReportResult(name='last_review has no missing values', passed=0, failed=1, summary=None),\n",
" ReportResult(name='listing_url meets condition url_matches_airbnb_domain', passed=1, failed=0, summary=None),\n",
" ReportResult(name='listing_url has no missing values', passed=1, failed=0, summary=None),\n",
" ReportResult(name='latitude is in range [-24,-22]', passed=1, failed=0, summary=None),\n",
" ReportResult(name='longitude is in range [-44,-43]', passed=1, failed=0, summary=None),\n",
" ReportResult(name='id has no missing values', passed=1, failed=0, summary=None),\n",
" ReportResult(name='reviews_per_month standard deviation between 0.8 and 1.1 (inclusive)', passed=1, failed=0, summary=None),\n",
" ReportResult(name=\"room_type values in set {'Shared room', 'Hotel room', 'Private room', 'Entire home/apt'}\", passed=1, failed=0, summary=None)]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from whylogs.core.constraints.factories import condition_meets\n",
"from whylogs.core.constraints import ConstraintsBuilder\n",
"from whylogs.core.constraints.factories import no_missing_values\n",
"from whylogs.core.constraints.factories import is_in_range\n",
"from whylogs.core.constraints.factories import stddev_between_range\n",
"from whylogs.core.constraints.factories import frequent_strings_in_reference_set\n",
"\n",
"builder = ConstraintsBuilder(dataset_profile_view=dataset_profile_view)\n",
"reference_set = {\"Entire home/apt\", \"Private room\", \"Shared room\", \"Hotel room\"}\n",
"\n",
"builder.add_constraint(condition_meets(column_name=\"last_review\", condition_name=\"is_date_format\"))\n",
"builder.add_constraint(condition_meets(column_name=\"listing_url\", condition_name=\"url_matches_airbnb_domain\"))\n",
"builder.add_constraint(no_missing_values(column_name=\"last_review\"))\n",
"builder.add_constraint(no_missing_values(column_name=\"listing_url\"))\n",
"builder.add_constraint(is_in_range(column_name=\"latitude\",lower=-24,upper=-22))\n",
"builder.add_constraint(is_in_range(column_name=\"longitude\",lower=-44,upper=-43))\n",
"builder.add_constraint(no_missing_values(column_name=\"id\"))\n",
"builder.add_constraint(stddev_between_range(column_name=\"reviews_per_month\", lower=0.8, upper=1.1))\n",
"builder.add_constraint(frequent_strings_in_reference_set(column_name=\"room_type\", reference_set=reference_set))\n",
"\n",
"constraints = builder.build()\n",
"constraints.generate_constraints_report()"
]
},
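{
"cell_type": "markdown",
"metadata": {},
"source": [
"The report above lists each constraint individually. If you only need a single pass/fail signal, for instance to gate a data pipeline, the constraints object also exposes `validate()`, which should return `False` whenever any constraint fails (a minimal sketch of that usage):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Expected to be False here, since the `last_review has no missing values` constraint failed\n",
"constraints.validate()"
]
},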
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"> If you're interested in a more complete list of helper constraints, please check out the [Constraints Suite](https://nbviewer.org/github/whylabs/whylogs/blob/mainline/python/examples/basic/Constraints_Suite.ipynb) example."
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, we can visualize the constraints report using the __Notebook Profile Visualizer__:"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"