{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exploring listener events with glom\n",
    "\n",
    "Each non-blank line of `metrics_file` is parsed as one JSON object and grouped by its `Event` name. The `SparkListenerJobStart` records are then reshaped with [glom](https://glom.readthedocs.io/) specs into stage, RDD and property rows."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "from collections import defaultdict\n",
    "\n",
    "from glom import Assign, Invoke, Iter, Merge, S, Spec, T, flatten, glom, merge"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Path of the JSON-lines file to explore.\n",
    "metrics_file = 'metrics/app-20201104211738-0276'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load and group events"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def dequalify_event(ev):\n",
    "    \"\"\"Return the part of ``ev`` after its last '.', or ``ev`` itself if it has none.\"\"\"\n",
    "    return ev.rpartition('.')[2]\n",
    "\n",
    "\n",
    "def load_events(path):\n",
    "    \"\"\"Parse ``path`` as JSON lines and group the records by dequalified ``Event`` name.\n",
    "\n",
    "    Blank lines are skipped. Returns a ``defaultdict(list)`` that maps each event\n",
    "    name to the parsed records carrying it, in file order.\n",
    "    \"\"\"\n",
    "    events = defaultdict(list)\n",
    "    with open(path, 'r', encoding='utf-8') as mf:\n",
    "        for line in mf:\n",
    "            if not line.strip():\n",
    "                continue\n",
    "            # Parse once and reuse the record for both the grouping key and the value.\n",
    "            record = json.loads(line)\n",
    "            events[dequalify_event(glom(record, 'Event'))].append(record)\n",
    "    return events"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "evset = load_events(metrics_file)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "list(evset.keys())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Job start events"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "jobs = evset['SparkListenerJobStart']"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "len(jobs)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "job0 = jobs[0]\n",
    "job0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Jobs and their stages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Columnar view: one list of job IDs and one parallel list of per-job stage lists.\n",
    "job_stage_columns_spec = {\n",
    "    'job_ids': ['Job ID'],\n",
    "    'stage_ids': [\n",
    "        ('Stage Infos', [{'stage_id': 'Stage ID', 'parents': 'Parent IDs'}])\n",
    "    ],\n",
    "}\n",
    "\n",
    "glom(jobs, job_stage_columns_spec)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Record view: one dict per job holding its ID and its stages.\n",
    "job_stages_spec = [\n",
    "    {\n",
    "        'job_id': 'Job ID',\n",
    "        'stages': ('Stage Infos', [{'stage_id': 'Stage ID', 'parents': 'Parent IDs'}]),\n",
    "    }\n",
    "]\n",
    "\n",
    "glom(jobs, job_stages_spec)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# S(job_id=...) stores the job's ID in the glom scope so every stage row can read it back.\n",
    "stage_with_job_spec = [(\n",
    "    S(job_id=T['Job ID']),\n",
    "    {\n",
    "        'stage': ('Stage Infos', [\n",
    "            {\n",
    "                'stage_id': 'Stage ID',\n",
    "                'job_id': S['job_id'],\n",
    "                'parents': 'Parent IDs',\n",
    "            }\n",
    "        ])\n",
    "    },\n",
    ")]\n",
    "\n",
    "glom(jobs, stage_with_job_spec)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# One flat row per stage, tagged with the ID of the job it belongs to.\n",
    "stage_rows_spec = [(\n",
    "    S(job_id=T['Job ID']),\n",
    "    ('Stage Infos', [\n",
    "        {\n",
    "            'stage_id': 'Stage ID',\n",
    "            'attempt_id': 'Stage Attempt ID',\n",
    "            'name': 'Stage Name',\n",
    "            'details': 'Details',\n",
    "            'accumulables': 'Accumulables',\n",
    "            'job_id': S['job_id'],\n",
    "            'parents': 'Parent IDs',\n",
    "        }\n",
    "    ]),\n",
    ")]\n",
    "\n",
    "stage_rows = flatten(glom(jobs, stage_rows_spec))\n",
    "stage_rows[-1]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## RDD info per stage\n",
    "\n",
    "`glom.merge(...)` is a plain function that runs as soon as the spec is written, and a Python comprehension over `S[...]` or `T[...]` also runs before glom has any target, so neither can build one row per RDD record. `dict.update` is no help either because it returns `None`. `Invoke(dict).specs(...)` defers the `dict(...)` call until the spec is evaluated against each record."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Per job, per stage: the stage ID next to that stage's list of RDD records.\n",
    "stage_rdd_spec = [\n",
    "    ('Stage Infos', [\n",
    "        {'stage_id': 'Stage ID', 'rdd_info': 'RDD Info'}\n",
    "    ])\n",
    "]\n",
    "glom(jobs, stage_rdd_spec)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copy every RDD record and tag the copy with its stage's ID. T is the RDD record\n",
    "# being visited and S['stage_id'] is the value stored for the enclosing stage.\n",
    "rdd_with_stage_spec = [\n",
    "    ('Stage Infos', [(\n",
    "        S(stage_id=T['Stage ID']),\n",
    "        'RDD Info',\n",
    "        [Invoke(dict).specs(T, stage_id=S['stage_id'])],\n",
    "    )])\n",
    "]\n",
    "rdd_by_job_stage = glom(jobs, rdd_with_stage_spec)\n",
    "rdd_by_job_stage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Two nesting levels (job, then stage) are removed to get one flat list of RDD rows.\n",
    "rdd_rows = flatten(flatten(rdd_by_job_stage))\n",
    "rdd_rows[-1]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Job properties"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# One row per (job, property): T.items() yields (name, value) pairs, so T[0] / T[1] pick them apart.\n",
    "job_property_rows_spec = [(\n",
    "    S(job_id=T['Job ID']),\n",
    "    ('Properties', (\n",
    "        T.items(),\n",
    "        Iter().map({'job_id': S['job_id'], 'property': T[0], 'value': T[1]}).all(),\n",
    "    )),\n",
    ")]\n",
    "\n",
    "flatten(glom(jobs, job_property_rows_spec))"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}