{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Spark event log metrics\n",
    "\n",
    "Loads a Spark history (event) log, charts per-stage and per-task metrics, and exports the resulting tables to SQLite."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# Replace this with a Spark history log of your own, or parameterize with Papermill.\n",
    "metrics_file = \"metrics/application_1601392010735_0030\"\n",
    "\n",
    "# SQLite destinations. When both names are equal, the separate writes to the wide database are skipped.\n",
    "wide_output_file = \"output.db\"\n",
    "output_file = \"output.db\"\n",
    "\n",
    "# Forwarded to eventlog.init_eventlog.\n",
    "transform_structure_prefixes = True\n",
    "use_structure_prefixes = False\n",
    "\n",
    "# Spark session settings.\n",
    "driver_memory = '8g'\n",
    "executor_memory = '8g'\n",
    "master = 'local[*]'\n",
    "\n",
    "# debug_me lifts the pandas display limits and is also forwarded to eventlog.init_eventlog.\n",
    "debug_me = False\n",
    "# When False, the Spark session is stopped in the last cell.\n",
    "interactive = True\n",
    "# NOTE(review): store_parquet is not referenced anywhere else in this notebook.\n",
    "store_parquet = True"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "\n",
    "if debug_me:\n",
    "    # Show every row and column when debugging.\n",
    "    pd.options.display.max_columns = None\n",
    "    pd.options.display.max_rows = None"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "import pyspark.sql.functions as F\n",
    "from pyspark import SparkConf\n",
    "\n",
    "import json\n",
    "import eventlog\n",
    "\n",
    "spark = (\n",
    "    pyspark.sql.SparkSession.builder\n",
    "    .master(master)\n",
    "    .config(\"spark.ui.showConsoleProgress\", False)\n",
    "    .config(\"spark.driver.memory\", driver_memory)\n",
    "    .config(\"spark.executor.memory\", executor_memory)\n",
    "    .getOrCreate()\n",
    ")\n",
    "\n",
    "# Silence Spark logging so it does not flood the notebook output.\n",
    "sc = spark.sparkContext\n",
    "sc.setLogLevel(\"OFF\")\n",
    "\n",
    "logger = sc._jvm.org.apache.log4j\n",
    "logger.LogManager.getLogger(\"org\").setLevel(logger.Level.ERROR)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The event log is read as JSON records.\n",
    "metrics = spark.read.json(metrics_file)\n",
    "eventlog.init_eventlog(\n",
    "    metrics,\n",
    "    use_structure_prefixes=use_structure_prefixes,\n",
    "    transform_structure_prefixes=transform_structure_prefixes,\n",
    "    debug_me=debug_me,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NOTE(review): wildcard import kept on purpose. It runs after init_eventlog and may pick up\n",
    "# names set during initialisation; confirm against eventlog before switching to explicit imports.\n",
    "from eventlog import *\n",
    "\n",
    "# First row that has both an application ID and an application name.\n",
    "app_id, app_name = metrics.select(\"App ID\", \"App Name\").dropna().collect()[0]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Distinct (job, stage) pairs, used later to attach a Job ID to per-stage rows.\n",
    "jobs_to_stages = all_stage_meta(metrics).select('Job ID', 'Stage ID').distinct()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "plan_nodes, accumulable_nodes = plan_dfs(metrics)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "metric_meta = spark.createDataFrame(data=eventlog.metric_metas)\n",
    "task_metrics = tidy_tasks(metrics)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tasks_to_plans = with_appmeta(\n",
    "    task_metrics\n",
    "    .join(accumulable_nodes, \"accumulatorId\")\n",
    "    .join(plan_nodes, \"plan_node\")\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import altair as alt\n",
    "\n",
    "# Lift Altair's default row limit for chart data.\n",
    "alt.data_transformers.disable_max_rows()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Shuffle metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Per-task totals of every metric whose unit is bytes, collected to pandas.\n",
    "task_byte_metrics = (\n",
    "    tidy_tasks(metrics)\n",
    "    .join(\n",
    "        metric_meta.withColumnRenamed(\"MetricName\", \"Metric Name\"),\n",
    "        \"Metric Name\",\n",
    "        how=\"outer\",\n",
    "    )\n",
    "    .where(F.col(\"unit\") == \"bytes\")\n",
    "    .groupBy(\"Stage ID\", \"Task ID\", \"Metric Name\")\n",
    "    .sum(\"Metric Value\")\n",
    "    .withColumnRenamed(\"sum(Metric Value)\", \"Metric Value\")\n",
    "    .toPandas()\n",
    ")\n",
    "\n",
    "is_shuffle_metric = task_byte_metrics['Metric Name'].str.contains('internal.metrics.shuffle')\n",
    "task_shuffle_metrics = task_byte_metrics[is_shuffle_metric].sort_values('Task ID')\n",
    "\n",
    "\n",
    "def shuffle_replacer(match):\n",
    "    \"\"\"Return the display name 'Shuffle <metric>' for a matched shuffle metric name.\"\"\"\n",
    "    return \"Shuffle %s\" % match.group('metric')\n",
    "\n",
    "\n",
    "# The named groups are required: shuffle_replacer reads match.group('metric').\n",
    "# regex=True is passed explicitly because a callable replacement only works in regex mode\n",
    "# and newer pandas releases no longer default to it.\n",
    "task_shuffle_metrics['Metric Name'] = task_shuffle_metrics['Metric Name'].str.replace(\n",
    "    r'internal\\.metrics\\.shuffle\\.(?P<kind>read|write).(?P<metric>.*)$',\n",
    "    shuffle_replacer,\n",
    "    regex=True,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "stage_and_task_charts(task_shuffle_metrics, \"bytes\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Executor time metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": true
   },
   "outputs": [],
   "source": [
    "# Rebinds task_metrics (until now the plain tidy_tasks output) to the version joined with the\n",
    "# metric metadata and with float metric values; the cells below, including split_metrics, use this one.\n",
    "task_metrics = (\n",
    "    tidy_tasks(metrics)\n",
    "    .join(\n",
    "        metric_meta.withColumnRenamed(\"MetricName\", \"Metric Name\"),\n",
    "        \"Metric Name\",\n",
    "        how=\"outer\",\n",
    "    )\n",
    "    .withColumn(\"Metric Value\", F.col(\"Metric Value\").cast(\"float\"))\n",
    ")\n",
    "\n",
    "task_ms_metrics = (\n",
    "    task_metrics\n",
    "    .where(F.col(\"unit\") == \"ms\")\n",
    "    .groupBy(\"Stage ID\", \"Task ID\", \"Metric Name\")\n",
    "    .sum(\"Metric Value\")\n",
    "    .withColumnRenamed(\"sum(Metric Value)\", \"Metric Value\")\n",
    ")\n",
    "\n",
    "# Nanosecond totals are divided by 1,000,000 so they line up with the millisecond metrics.\n",
    "task_ns_metrics = (\n",
    "    task_metrics\n",
    "    .where(F.col(\"unit\") == \"ns\")\n",
    "    .groupBy(\"Stage ID\", \"Task ID\", \"Metric Name\")\n",
    "    .sum(\"Metric Value\")\n",
    "    .withColumnRenamed(\"sum(Metric Value)\", \"Metric Value\")\n",
    "    .withColumn(\"Metric Value\", F.col(\"Metric Value\").cast(\"float\") / 1000000)\n",
    ")\n",
    "\n",
    "task_time_metrics = task_ms_metrics.union(task_ns_metrics).toPandas()\n",
    "\n",
    "# Chart every time metric except the ones whose name contains \"internal.metrics\".\n",
    "stage_and_task_charts(task_time_metrics[~task_time_metrics[\"Metric Name\"].str.contains(\"internal.metrics\")])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "time_metric_names = task_time_metrics[\"Metric Name\"]\n",
    "\n",
    "stage_and_task_charts(\n",
    "    task_time_metrics[\n",
    "        time_metric_names.str.contains(\"CPU\")\n",
    "        | time_metric_names.str.contains(\"GPU\")\n",
    "        | time_metric_names.str.contains(\"JVM GC\")\n",
    "    ]\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Plotting wall-clock vs CPU time with layered charts\n",
    "\n",
    "This gives us some sense of the relationship between CPU time and wall-clock run time."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cputime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorCpuTime')]\n",
    "runtime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorRunTime')]\n",
    "layered_stage_and_task_charts([runtime, cputime])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Memory and spill metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "byte_metric_names = task_byte_metrics['Metric Name']\n",
    "\n",
    "stage_and_task_charts(\n",
    "    task_byte_metrics[\n",
    "        byte_metric_names.str.contains(' memory')\n",
    "        | byte_metric_names.str.contains('size')\n",
    "        | byte_metric_names.str.contains('Bytes Spilled')\n",
    "    ],\n",
    "    \"bytes\",\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "stage_and_task_charts(task_byte_metrics, \"bytes\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Task metrics and metadata"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "task_metrics_table, task_meta_table = split_metrics(task_metrics)\n",
    "\n",
    "# Metrics and metadata rejoined on their shared keys, with the Job ID attached per stage.\n",
    "task_all_spark = (\n",
    "    task_metrics_table\n",
    "    .join(task_meta_table, [\"Task ID\", \"Stage ID\", \"Application ID\", \"Application Name\"])\n",
    "    .join(jobs_to_stages, \"Stage ID\")\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Query plan node metrics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "plan_metrics = (\n",
    "    plan_nodes\n",
    "    .join(accumulable_nodes, [\"plan_node\", \"Application ID\", \"Application Name\"])\n",
    "    .join(task_all_spark, [\"accumulatorId\", \"Application ID\", \"Application Name\"])\n",
    ")\n",
    "\n",
    "plan_metrics_full = plan_metrics_rollup(plan_metrics)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Configuration information"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "configs = meltconfig(metrics, [\"SparkListenerEnvironmentUpdate\",\"SparkListenerJobStart\"])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exporting tabular data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import sqlite3\n",
    "\n",
    "# Both connections are opened unconditionally; wide_conn is only written to when the two paths differ.\n",
    "conn = sqlite3.connect(output_file)\n",
    "wide_conn = sqlite3.connect(wide_output_file)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Collect once and reuse for both destinations.\n",
    "configs_pd = with_appmeta(configs).toPandas()\n",
    "\n",
    "if wide_output_file != output_file:\n",
    "    configs_pd.to_sql(\"configs\", wide_conn, index=False, if_exists='append')\n",
    "\n",
    "configs_pd.to_sql(\"configs\", conn, index=False, if_exists='append')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The tidy task tables are written to the main database only.\n",
    "task_metrics_table.join(jobs_to_stages, \"Stage ID\").toPandas().to_sql('task_metrics', conn, index=False, if_exists='append')\n",
    "task_meta_table.join(jobs_to_stages, \"Stage ID\").toPandas().to_sql('task_meta', conn, index=False, if_exists='append')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "task_all = task_all_spark.toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Columns that identify one task row in the wide table.\n",
    "index_columns = ['Application ID','Application Name','Attempt','Executor ID','Failed','Finish Time','Getting Result Time','Host','Index','Killed','Launch Time','Locality','Speculative','Job ID', 'Stage ID','Task ID']\n",
    "\n",
    "# The pivot input is the identifying columns plus the metric name/value pair.\n",
    "project_columns = index_columns + ['Metric Name', 'Metric Value']\n",
    "\n",
    "# One row per task, one column per metric name.\n",
    "wide_tasks = (\n",
    "    task_all[project_columns]\n",
    "    .pivot_table(index=index_columns, columns=\"Metric Name\", values=\"Metric Value\")\n",
    "    .reset_index()\n",
    "    .rename_axis(None, axis=1)\n",
    ")\n",
    "\n",
    "if wide_output_file != output_file:\n",
    "    safe_write(wide_tasks, 'wide_tasks', wide_conn, index=False)\n",
    "\n",
    "safe_write(wide_tasks, 'wide_tasks', conn, index=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Exporting query plans"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ppn = plan_nodes.toPandas()\n",
    "pan = accumulable_nodes.toPandas()\n",
    "\n",
    "ppn.to_sql('plans', conn, index=False, if_exists='append')\n",
    "pan.to_sql('accumulables', conn, index=False, if_exists='append')\n",
    "\n",
    "if wide_output_file != output_file:\n",
    "    ppn.to_sql('plans', wide_conn, index=False, if_exists='append')\n",
    "    pan.to_sql('accumulables', wide_conn, index=False, if_exists='append')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "metric_names = metric_names_for(plan_metrics)\n",
    "\n",
    "# Passing the metric names to pivot() fixes the set of output columns up front.\n",
    "wide_plan_metrics = (\n",
    "    plan_metrics_full\n",
    "    .groupBy([\"plan_node\", \"accumulatorId\", \"Application ID\", \"Application Name\", \"Task ID\", \"Stage ID\", \"Job ID\"])\n",
    "    .pivot(\"Metric Name\", metric_names)\n",
    "    .agg(F.sum(\"Metric Value\"))\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pmf = plan_metrics_full.toPandas()\n",
    "wpm = wide_plan_metrics.toPandas()\n",
    "\n",
    "if wide_output_file != output_file:\n",
    "    safe_write(pmf, 'plan_metrics', wide_conn, index=False)\n",
    "    safe_write(wpm, 'wide_plans', wide_conn, index=False)\n",
    "\n",
    "safe_write(pmf, 'plan_metrics', conn, index=False)\n",
    "safe_write(wpm, 'wide_plans', conn, index=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Miscellaneous metadata"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Each extractor's function name doubles as the name of the SQLite table it fills.\n",
    "for table in [job_info, stage_meta, stage_parents, stage_rddmeta, stage_rddparents]:\n",
    "    raw = table(metrics)\n",
    "    the_df = with_appmeta(raw).toPandas()\n",
    "    the_df.to_sql(table.__name__, conn, index=False, if_exists='append')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Index statements are kept as literal SQL and run in their original order.\n",
    "wide_index_statements = [\n",
    "    'CREATE INDEX IF NOT EXISTS accumulable_apps on accumulables ([Application ID])',\n",
    "    'CREATE INDEX IF NOT EXISTS plan_apps on plans ([Application ID])',\n",
    "    'CREATE INDEX IF NOT EXISTS plan_metrics_apps on plan_metrics ([Application ID])',\n",
    "    'CREATE INDEX IF NOT EXISTS wide_tasks_task on wide_tasks ([Application ID], [Task ID])',\n",
    "    'CREATE INDEX IF NOT EXISTS wide_tasks_app on wide_tasks ([Application ID])',\n",
    "]\n",
    "\n",
    "index_statements = [\n",
    "    'CREATE INDEX IF NOT EXISTS wide_tasks_task on wide_tasks ([Application ID], [Task ID])',\n",
    "    'CREATE INDEX IF NOT EXISTS wide_tasks_app on wide_tasks ([Application ID])',\n",
    "    'CREATE INDEX IF NOT EXISTS accumulable_apps on accumulables ([Application ID])',\n",
    "    'CREATE INDEX IF NOT EXISTS task_metric_apps on task_metrics ([Application ID])',\n",
    "    'CREATE INDEX IF NOT EXISTS task_metric_names on task_metrics ([Metric Name])',\n",
    "    'CREATE INDEX IF NOT EXISTS task_metric_kinds on task_metrics ([kind])',\n",
    "    'CREATE INDEX IF NOT EXISTS task_metric_agg on task_metrics ([Task ID], [Metric Name])',\n",
    "    'CREATE INDEX IF NOT EXISTS stage_metric_agg on task_metrics ([Stage ID], [Metric Name])',\n",
    "]\n",
    "\n",
    "if wide_output_file != output_file:\n",
    "    for statement in wide_index_statements:\n",
    "        wide_conn.execute(statement)\n",
    "\n",
    "for statement in index_statements:\n",
    "    conn.execute(statement)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "conn.close()\n",
    "wide_conn.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Keep the session alive for interactive use; stop it for batch runs.\n",
    "if not interactive:\n",
    "    spark.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}