{ "cells": [ { "cell_type": "code", "execution_count": null, "id": "broken-hardwood", "metadata": { "tags": [ "parameters" ] }, "outputs": [], "source": [ "# notebook parameters\n", "\n", "import os\n", "\n", "spark_master = \"local[*]\"\n", "app_name = \"data-summary\"\n", "input_prefix = \"\"\n", "input_file = \"churn-etl\"\n", "output_prefix = \"\"\n", "input_kind = \"parquet\"\n", "driver_memory = \"8g\"\n", "executor_memory = \"8g\"" ] }, { "cell_type": "code", "execution_count": null, "id": "eight-leisure", "metadata": {}, "outputs": [], "source": [ "import pyspark\n", "\n", "# create (or reuse) a Spark session with the configured master and memory settings\n", "session = pyspark.sql.SparkSession.builder \\\n", "    .master(spark_master) \\\n", "    .appName(app_name) \\\n", "    .config(\"spark.driver.memory\", driver_memory) \\\n", "    .config(\"spark.executor.memory\", executor_memory) \\\n", "    .getOrCreate()\n", "session" ] }, { "cell_type": "code", "execution_count": null, "id": "blind-agenda", "metadata": {}, "outputs": [], "source": [ "# read the input data; input_kind determines both the reader format and the file extension\n", "df = session.read.format(input_kind).load(\"%s%s.%s\" % (input_prefix, input_file, input_kind))" ] }, { "cell_type": "code", "execution_count": null, "id": "skilled-tablet", "metadata": {}, "outputs": [], "source": [ "df.columns" ] }, { "cell_type": "code", "execution_count": null, "id": "working-astronomy", "metadata": {}, "outputs": [], "source": [ "import pyspark.sql.types as T\n", "import pyspark.sql.functions as F\n", "\n", "# partition the columns by type so that each kind can be summarized appropriately\n", "string_columns = []\n", "boolean_columns = []\n", "numeric_columns = []\n", "other_columns = []\n", "\n", "def isnumeric(data_type):\n", "    # True if data_type is one of Spark's numeric types\n", "    numeric_types = [T.ByteType, T.ShortType, T.IntegerType, T.LongType, T.FloatType, T.DoubleType, T.DecimalType]\n", "    return any(isinstance(data_type, t) for t in numeric_types)\n", "\n", "\n", "for field in df.schema.fields:\n", "    if isinstance(field.dataType, T.StringType):\n", "        string_columns.append(field.name)\n", "    elif isinstance(field.dataType, T.BooleanType):\n", "        boolean_columns.append(field.name)\n", "    elif isnumeric(field.dataType):\n", "        numeric_columns.append(field.name)\n", "    else:\n", "        other_columns.append(field.name)" ] }, { "cell_type": "code", "execution_count": null, "id": "contemporary-shuttle", "metadata": {}, "outputs": [], "source": [ "def percent_true(df, cols):\n", "    # fraction of true values in each of the given boolean columns\n", "    denominator = df.count()\n", "    return {col: df.where(F.col(col) == True).count() / denominator for col in cols}" ] }, { "cell_type": "code", "execution_count": null, "id": "organic-fusion", "metadata": {}, "outputs": [], "source": [ "percent_true(df, boolean_columns)" ] }, { "cell_type": "code", "execution_count": null, "id": "based-leone", "metadata": {}, "outputs": [], "source": [ "def approx_cardinalities(df, cols):\n", "    # returns a dict mapping each column name to its approximate distinct count,\n", "    # plus a 'total' entry holding the overall row count\n", "    from functools import reduce\n", "\n", "    # compute the row count and all approximate distinct counts in a single aggregation\n", "    counts = df.groupBy(\n", "        F.lit(True).alias(\"drop_me\")\n", "    ).agg(\n", "        F.count(\"*\").alias(\"total\"),\n", "        *[F.approx_count_distinct(F.col(c)).alias(c) for c in cols]\n", "    ).drop(\"drop_me\").cache()\n", "\n", "    # reshape the single wide row into (field, approx_count) pairs\n", "    result = reduce(\n", "        lambda l, r: l.unionAll(r),\n", "        [counts.select(F.lit(c).alias(\"field\"), F.col(c).alias(\"approx_count\")) for c in counts.columns]\n", "    ).collect()\n", "    counts.unpersist()\n", "\n", "    return dict([(r[0], r[1]) for r in result])\n", "\n", "\n", "def likely_unique(counts):\n", "    # columns whose approximate cardinality is within 15% of the row count\n", "    total = counts[\"total\"]\n", "    return [k for (k, v) in counts.items() if k != \"total\" and abs(total - v) < total * 0.15]\n", "\n", "\n", "def likely_categoricals(counts):\n", "    # columns with few distinct values relative to the row count (or fewer than 128 distinct values overall)\n", "    total = counts[\"total\"]\n", "    return [k for (k, v) in counts.items() if k != \"total\" and (v < total * 0.15 or v < 128)]" ] }, { "cell_type": "code", "execution_count": null, "id": 
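"cardinality-sanity-check", "metadata": {}, "outputs": [], "source": [ "# A quick, self-contained sanity check of the heuristics above on a tiny\n", "# synthetic frame (illustrative only; these column names are made up):\n", "# 'id' should look likely-unique and 'plan' likely-categorical.\n", "_demo = session.createDataFrame(\n", "    [(i, \"basic\" if i % 2 else \"premium\") for i in range(1000)],\n", "    [\"id\", \"plan\"]\n", ")\n", "_demo_counts = approx_cardinalities(_demo, [\"id\", \"plan\"])\n", "likely_unique(_demo_counts), likely_categoricals(_demo_counts)" ] }, { "cell_type": "code", "execution_count": null, "id": 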
"danish-vessel", "metadata": {}, "outputs": [], "source": [ "cardinalities = approx_cardinalities(df, string_columns)" ] }, { "cell_type": "code", "execution_count": null, "id": "mineral-congo", "metadata": {}, "outputs": [], "source": [ "cardinalities" ] }, { "cell_type": "code", "execution_count": null, "id": "mexican-protection", "metadata": {}, "outputs": [], "source": [ "likely_unique(cardinalities)" ] }, { "cell_type": "code", "execution_count": null, "id": "worldwide-sheffield", "metadata": {}, "outputs": [], "source": [ "likely_categoricals(cardinalities)" ] }, { "cell_type": "code", "execution_count": null, "id": "stopped-flesh", "metadata": {}, "outputs": [], "source": [ "def unique_values(df, cols):\n", " from functools import reduce\n", " \n", " counts = df.groupBy(\n", " F.lit(True).alias(\"drop_me\")\n", " ).agg(\n", " *[F.array_sort(F.collect_set(F.col(c))).alias(c) for c in cols]\n", " ).drop(\"drop_me\").cache()\n", " \n", " result = reduce(lambda l, r: l.unionAll(r), [counts.select(F.lit(c).alias(\"field\"), F.col(c).alias(\"unique_vals\")) for c in counts.columns]).collect()\n", " counts.unpersist()\n", " \n", " return dict([(r[0],r[1]) for r in result])\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "arctic-firewall", "metadata": {}, "outputs": [], "source": [ "unique_values(df, likely_categoricals(cardinalities))" ] }, { "cell_type": "code", "execution_count": null, "id": "amateur-aluminum", "metadata": {}, "outputs": [], "source": [ "def approx_ecdf(df, cols):\n", " from functools import reduce\n", " \n", " quantiles = [0.0, 0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 1.0]\n", "\n", " qs = df.approxQuantile(cols, quantiles, 0.01)\n", " \n", " result = dict(zip(cols, qs))\n", " return {c: dict(zip(quantiles, vs)) for (c, vs) in result.items()}\n" ] }, { "cell_type": "code", "execution_count": null, "id": "greenhouse-object", "metadata": {}, "outputs": [], "source": [ "approx_ecdf(df, numeric_columns)" ] }, { "cell_type": "code", "execution_count": null, "id": "affiliated-violence", "metadata": {}, "outputs": [], "source": [ "dir(df.schema)" ] }, { "cell_type": "code", "execution_count": null, "id": "smart-motion", "metadata": {}, "outputs": [], "source": [ "df.schema.jsonValue()" ] }, { "cell_type": "code", "execution_count": null, "id": "documentary-output", "metadata": {}, "outputs": [], "source": [ "def gen_summary(df):\n", " summary = {}\n", " \n", " string_cols = []\n", " boolean_cols = []\n", " numeric_cols = []\n", " other_cols = []\n", "\n", " \n", " for field in df.schema.fields:\n", " if isinstance(field.dataType, T.StringType):\n", " string_cols.append(field.name)\n", " elif isinstance(field.dataType, T.BooleanType):\n", " boolean_cols.append(field.name)\n", " elif isnumeric(field.dataType):\n", " numeric_cols.append(field.name)\n", " else:\n", " other_cols.append(field.name)\n", " \n", " cardinalities = approx_cardinalities(df, string_cols)\n", " uniques = likely_unique(cardinalities)\n", " categoricals = unique_values(df, likely_categoricals(cardinalities))\n", " \n", " encoding_struct = {\n", " \"categorical\" : categoricals,\n", " \"numeric\" : numeric_cols + boolean_cols,\n", " \"unique\": uniques\n", " }\n", " \n", " summary[\"schema\"] = df.schema.jsonValue()\n", " summary[\"ecdfs\"] = approx_ecdf(df, numeric_cols)\n", " summary[\"true_percentage\"] = percent_true(df, boolean_cols)\n", " summary[\"encoding\"] = encoding_struct\n", " \n", " return summary" ] }, { "cell_type": "code", "execution_count": null, "id": 
"neutral-refrigerator", "metadata": {}, "outputs": [], "source": [ "gen_summary(df)" ] }, { "cell_type": "code", "execution_count": null, "id": "simplified-somewhere", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "celltoolbar": "Edit Metadata", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.7" } }, "nbformat": 4, "nbformat_minor": 5 }