In [None]:
# notebook parameters

import os

# Spark session settings.
spark_master = "local[*]"
app_name = "data-summary"
driver_memory = "8g"
executor_memory = "8g"

# Input location: the dataset path is "<input_prefix><input_file>.<input_kind>".
input_prefix = ""
input_file = "churn-etl"
input_kind = "parquet"

# Output location prefix (not referenced by the cells visible in this notebook).
output_prefix = ""

In [None]:
import pyspark

# Build (or reuse) the Spark session from the notebook parameters; the memory
# sizes are passed through as ordinary Spark configuration entries.
session = (
    pyspark.sql.SparkSession.builder
    .master(spark_master)
    .appName(app_name)
    .config("spark.driver.memory", driver_memory)
    .config("spark.executor.memory", executor_memory)
    .getOrCreate()
)
session

In [None]:
# Load the input dataset from "<input_prefix><input_file>.<input_kind>".
# The reader format follows `input_kind` instead of being hard-wired to Parquet,
# so changing the parameter changes how the file is parsed, not just its
# extension. With the default "parquet" this is the same read as before.
df = session.read.format(input_kind).load(
    "%s%s.%s" % (input_prefix, input_file, input_kind)
)

In [None]:
# Show the column names of the loaded DataFrame.
df.columns

In [None]:
import pyspark.sql.types as T
import pyspark.sql.functions as F

# One (initially empty) list of column names per broad type family; the schema
# loop further down in this cell fills them.
string_columns, boolean_columns, numeric_columns, other_columns = [], [], [], []

def isnumeric(data_type):
    """Return True when ``data_type`` is an instance of a numeric Spark SQL type.

    The types checked are byte, short, integer, long, float, double and
    decimal; anything else yields False.
    """
    return isinstance(
        data_type,
        (
            T.ByteType,
            T.ShortType,
            T.IntegerType,
            T.LongType,
            T.FloatType,
            T.DoubleType,
            T.DecimalType,
        ),
    )


# Route every schema field's name into exactly one bucket. The checks run in
# the order string -> boolean -> numeric, and anything left over is "other".
for field in df.schema.fields:
    (
        string_columns if isinstance(field.dataType, T.StringType)
        else boolean_columns if isinstance(field.dataType, T.BooleanType)
        else numeric_columns if isnumeric(field.dataType)
        else other_columns
    ).append(field.name)

In [None]:
def percent_true(df, cols):
    """Return, for each column in ``cols``, the fraction of rows where it is true.

    Despite the name, the values are fractions (``true_rows / all_rows``), not
    percentages. Rows where the column is NULL count towards the denominator
    but not the numerator.

    Args:
        df: Spark DataFrame to scan.
        cols: names of the columns to measure.

    Returns:
        dict mapping column name -> fraction of true rows. An empty ``cols``
        yields ``{}``; a DataFrame with no rows yields ``0.0`` for every
        column instead of raising ``ZeroDivisionError``.
    """
    cols = list(cols)
    if not cols:
        return {}

    # A single aggregation replaces one filter+count job per column.
    # when() without otherwise() yields NULL for rows that do not match and
    # count() skips NULLs, so each expression counts only the true rows.
    row = df.agg(
        F.count(F.lit(1)),
        *[F.count(F.when(F.col(c) == True, 1)) for c in cols]
    ).first()

    denominator = row[0]
    if denominator == 0:
        return {c: 0.0 for c in cols}

    # Position 0 is the row count; column i of ``cols`` sits at position i + 1.
    return {c: row[i] / denominator for i, c in enumerate(cols, start=1)}

In [None]:
# Fraction of true rows for each boolean column found in the schema.
percent_true(df, boolean_columns)

In [None]:
def approx_cardinalities(df, cols):
    """Approximate the number of distinct values in each of ``cols``.

    Args:
        df: Spark DataFrame to scan.
        cols: names of the columns to measure.

    Returns:
        dict whose first key, ``"total"``, holds the row count of ``df`` and
        whose remaining keys are the entries of ``cols`` mapped to their
        approximate distinct count.
    """
    cols = list(cols)

    # A global aggregation always yields exactly one row (even for an empty
    # DataFrame), so the result can be read directly. No grouping on a dummy
    # literal, no caching and no union-based transpose are needed.
    row = df.agg(
        F.count("*").alias("total"),
        *[F.approx_count_distinct(F.col(c)).alias(c) for c in cols]
    ).first()

    # Values come back in the order the expressions were given.
    return dict(zip(["total"] + cols, row))

def likely_unique(counts, tolerance=0.15):
    """Return the columns whose distinct count is close to the row count.

    Args:
        counts: dict with a ``"total"`` entry (row count) and one entry per
            column holding its (approximate) distinct count.
        tolerance: maximum allowed gap between a column's distinct count and
            the row count, as a fraction of the row count. Defaults to 0.15.

    Returns:
        list of column names, in ``counts`` order, for which
        ``abs(total - distinct) < total * tolerance``. The ``"total"`` entry
        itself is never returned.

    Raises:
        KeyError: if ``counts`` has no ``"total"`` entry.
    """
    total = counts["total"]
    slack = total * tolerance
    # abs() because an approximate distinct count can overshoot the row count.
    return [
        name
        for name, distinct in counts.items()
        if name != "total" and abs(total - distinct) < slack
    ]

def likely_categoricals(counts, max_fraction=0.15, max_distinct=128):
    """Return the columns with few enough distinct values to treat as categorical.

    A column qualifies when its distinct count is below ``max_fraction`` of
    the row count, or below ``max_distinct`` outright.

    Args:
        counts: dict with a ``"total"`` entry (row count) and one entry per
            column holding its (approximate) distinct count.
        max_fraction: relative threshold, as a fraction of the row count.
        max_distinct: absolute threshold on the distinct count.

    Returns:
        list of qualifying column names, in ``counts`` order.

    Raises:
        KeyError: if ``counts`` has no ``"total"`` entry.
    """
    total = counts["total"]
    return [
        name
        for name, distinct in counts.items()
        # "total" is bookkeeping, not a column: without this filter it would
        # be reported as categorical whenever the row count is under
        # max_distinct.
        if name != "total"
        and (distinct < total * max_fraction or distinct < max_distinct)
    ]




In [None]:
# Row count plus approximate distinct counts for every string column.
cardinalities = approx_cardinalities(df, string_columns)

In [None]:
# Display the computed counts.
cardinalities

In [None]:
# String columns whose distinct count is within 15% of the row count.
likely_unique(cardinalities)

In [None]:
# Entries whose distinct count is under 15% of the row count or under 128.
likely_categoricals(cardinalities)

In [None]:
def unique_values(df, cols):
    """Collect the sorted set of distinct values of each column in ``cols``.

    Args:
        df: Spark DataFrame to scan.
        cols: names of the columns whose distinct values are wanted.

    Returns:
        dict mapping each column name to a sorted list of its distinct
        values. An empty ``cols`` yields ``{}`` without running a Spark job.
    """
    cols = list(cols)
    if not cols:
        # agg() cannot be called with no expressions.
        return {}

    # One global aggregation produces a single row holding one sorted array
    # per column; reading that row directly avoids the cache/union transpose
    # (which also forced every column to share one element type).
    row = df.agg(
        *[F.array_sort(F.collect_set(F.col(c))).alias(c) for c in cols]
    ).first()

    # Values come back in the order the expressions were given.
    return dict(zip(cols, row))



In [None]:
# Distinct values of every entry flagged as likely categorical.
unique_values(df, likely_categoricals(cardinalities))

In [None]:
def approx_ecdf(df, cols, quantiles=None, relative_error=0.01):
    """Approximate each column's empirical CDF as a quantile -> value mapping.

    Args:
        df: Spark DataFrame to scan.
        cols: names of the numeric columns to summarise.
        quantiles: probabilities to evaluate. Defaults to
            0, 1%, 5%, 10%, 25%, 50%, 75%, 90%, 95%, 99% and 100%.
        relative_error: relative error passed to ``approxQuantile``.

    Returns:
        dict mapping each column name to a dict of ``{quantile: value}``.
        An empty ``cols`` yields ``{}`` without touching ``df``.
    """
    if quantiles is None:
        quantiles = [0.0, 0.01, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.99, 1.0]
    quantiles = list(quantiles)

    cols = list(cols)
    if not cols:
        return {}

    # approxQuantile returns one list of values per requested column, in the
    # same order as ``cols``; each list lines up with ``quantiles``.
    per_column = df.approxQuantile(cols, quantiles, relative_error)

    return {c: dict(zip(quantiles, values)) for c, values in zip(cols, per_column)}


In [None]:
# Approximate quantiles for every numeric column found in the schema.
approx_ecdf(df, numeric_columns)

In [None]:
# NOTE(review): exploratory introspection of the schema object's attributes;
# the result is not assigned or used by any later cell shown here.
dir(df.schema)

In [None]:
# Show the schema in the same form that gen_summary stores under "schema".
df.schema.jsonValue()

In [None]:
def _columns_by_kind(df):
    """Split ``df``'s column names into string / boolean / numeric / other lists."""
    kinds = {"string": [], "boolean": [], "numeric": [], "other": []}
    for field in df.schema.fields:
        if isinstance(field.dataType, T.StringType):
            kinds["string"].append(field.name)
        elif isinstance(field.dataType, T.BooleanType):
            kinds["boolean"].append(field.name)
        elif isnumeric(field.dataType):
            kinds["numeric"].append(field.name)
        else:
            kinds["other"].append(field.name)
    return kinds


def gen_summary(df):
    """Build a summary dict describing ``df``'s schema and column statistics.

    Args:
        df: Spark DataFrame to summarise.

    Returns:
        dict with four entries:
          * ``"schema"``: the schema as returned by ``df.schema.jsonValue()``.
          * ``"ecdfs"``: approximate quantiles of every numeric column.
          * ``"true_percentage"``: fraction of true rows per boolean column.
          * ``"encoding"``: a dict with ``"categorical"`` (column -> sorted
            distinct values), ``"numeric"`` (numeric then boolean column
            names) and ``"unique"`` (string columns that look unique).
    """
    kinds = _columns_by_kind(df)
    string_cols = kinds["string"]
    boolean_cols = kinds["boolean"]
    numeric_cols = kinds["numeric"]

    cardinalities = approx_cardinalities(df, string_cols)
    uniques = likely_unique(cardinalities)

    # Only real string columns may reach unique_values: this keeps out
    # non-column keys of ``cardinalities`` (such as the "total" row count),
    # and the lookup is skipped altogether when nothing qualifies.
    categorical_cols = [
        c for c in likely_categoricals(cardinalities) if c in string_cols
    ]
    categoricals = unique_values(df, categorical_cols) if categorical_cols else {}

    encoding_struct = {
        "categorical": categoricals,
        "numeric": numeric_cols + boolean_cols,
        "unique": uniques,
    }

    summary = {}
    summary["schema"] = df.schema.jsonValue()
    summary["ecdfs"] = approx_ecdf(df, numeric_cols)
    summary["true_percentage"] = percent_true(df, boolean_cols)
    summary["encoding"] = encoding_struct

    return summary

In [None]:
# Produce and display the full summary for the loaded DataFrame.
gen_summary(df)