{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Aggregate the stream data\n", "\n", "**Purpose of the notebook:** The purpose of this notebook is to load and aggregate the stream data every three second (try to approximate the live score). \n", "\n", "**Input of the notebook:** The input data are ingested streamed data.\n", "\n", "**Output of the notebook:** The output of this notebook is the output text, e.g livescore with live pitch.\n", "\n", "**Some notes:**:\n", "* The `spark.DataFrame` will always have notation `_df` at the end of the name of variable\n", "* the `pandas.DataFrame` will always have notation `_pd` at the end of the name of variable" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the environment\n", "\n", "In this part, the environment is set. The set up is:\n", "\n", "* Loading the necessary python modules and helper functions\n", "* Setting the path to data and metadata\n", "* Initialize the spark session\n", "\n", "Other config, such as `spark` application name, path, where the final `delta` table will be saved, etc. are defined in `config.yaml` file" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import modules" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "from pyspark.sql import SparkSession\n", "from utils import read_config, init_spark_session,plot_pitch,check_throw_corner\n", "import time\n", "from datetime import datetime\n", "import matplotlib.pyplot as plt\n", "import os\n", "from bs4 import BeautifulSoup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Read config" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "config = read_config()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set env variables" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "spark_app_name = config['spark_application']['spark_app_stream_name']\n", "\n", "file_format = config['streaming']['format']\n", "file_dir = config['streaming']['file_dir']\n", "\n", "agg_time = config['streaming_aggregation']['aggregating_time_s']\n", "wait_time = config['streaming_aggregation']['wait_for_agg_s']\n", "\n", "meta_data_path = \"/home/tomas/Personal_projects/Aston_Villa/data/g1059783_Metadata.xml\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Init spark session" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "22/07/18 23:20:30 WARN Utils: Your hostname, tomas-Yoga-Slim-7-Pro-14ACH5-O resolves to a loopback address: 127.0.1.1; using 192.168.0.53 instead (on interface wlp1s0)\n", "22/07/18 23:20:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n", "WARNING: An illegal reflective access operation has occurred\n", "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.2.0.jar) to constructor java.nio.DirectByteBuffer(long,int)\n", "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n", "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n", "WARNING: All illegal access operations will be denied in a future release\n", "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n", "Setting default log level to \"WARN\".\n", "To adjust logging level use 
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Init spark session" ] },
{ "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "22/07/18 23:20:30 WARN Utils: Your hostname, tomas-Yoga-Slim-7-Pro-14ACH5-O resolves to a loopback address: 127.0.1.1; using 192.168.0.53 instead (on interface wlp1s0)\n", "22/07/18 23:20:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address\n", "WARNING: An illegal reflective access operation has occurred\n", "WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/opt/spark/jars/spark-unsafe_2.12-3.2.0.jar) to constructor java.nio.DirectByteBuffer(long,int)\n", "WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform\n", "WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations\n", "WARNING: All illegal access operations will be denied in a future release\n", "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n", "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "22/07/18 23:20:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n", "22/07/18 23:20:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.\n", "22/07/18 23:20:33 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.\n", "22/07/18 23:20:33 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.\n" ] } ], "source": [ "spark = init_spark_session(spark_app_name)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Read the metadata" ] },
{ "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Match date: 2019-10-19\n", "Field dimension: (105.16, 67.97)\n" ] } ], "source": [ "with open(meta_data_path, 'r') as f:\n", "    metadata = f.read()\n", "\n", "match_metadata = BeautifulSoup(metadata, 'xml')\n", "\n", "metadata_match_date = match_metadata.find('match').get('dtDate').split(' ')[0]\n", "field_x = float(match_metadata.find('match').get('fPitchXSizeMeters'))\n", "field_y = float(match_metadata.find('match').get('fPitchYSizeMeters'))\n", "metadata_field_dim = (field_x, field_y)\n", "\n", "print(f\"Match date: {metadata_match_date}\")\n", "print(f\"Field dimension: {metadata_field_dim}\")" ] },
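{ "cell_type": "markdown", "metadata": {}, "source": [ "For illustration, the metadata XML is assumed to contain a `match` element carrying the three attributes read above. A minimal sketch (the date and pitch dimensions echo the printed output; the time component and the rest of the file are assumptions):\n", "\n", "```xml\n", "<match dtDate=\"2019-10-19 00:00:00\" fPitchXSizeMeters=\"105.16\" fPitchYSizeMeters=\"67.97\">\n", "  <!-- further match metadata omitted -->\n", "</match>\n", "```" ] },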
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Aggregate stream data\n", "\n", "The loop below polls the ingested files every `wait_time` seconds, keeps only the latest record (by `wallClock`), and prints live-score events: ball status changes, the ball entering or leaving the penalty box, and the ball leaving the field (classified as a throw-in or corner by `check_throw_corner`). The loop stops after `agg_time` seconds." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "now = datetime.now()\n", "delta = 0\n", "init_ball_status = 'Alive'\n", "init_ball_position = False  # ball is not in the penalty box at the beginning\n", "init_field_position = True  # ball starts inside the field\n", "field_dimen = metadata_field_dim\n", "# fig, ax = plot_pitch(field_dimen)\n", "\n", "xy_init = (0, 0)\n", "\n", "if os.path.isdir(file_dir):\n", "\n", "    while delta < agg_time:\n", "\n", "        # ax.plot(float(xy_init[0]), float(xy_init[1]), 'ro')\n", "        # fig.show()\n", "\n", "        time.sleep(wait_time)\n", "        df = spark.read.format(file_format).load(file_dir)\n", "\n", "        # keep only the most recent record of the stream\n", "        df_pd = df.toPandas()\n", "        df_pd = df_pd.sort_values(\"wallClock\")\n", "        df_pd = df_pd.tail(1)\n", "\n", "        alive = df_pd['ballStatus'].values[0]\n", "        box_position = df_pd['ballInsideBox'].values[0]\n", "        xy_position = df_pd['ballPosition'].values[0]\n", "        ts = df_pd['match_timestamp'].values[0]\n", "        field_position = df_pd['ballInsideField'].values[0]\n", "        team_last = df_pd['ballTeam'].values[0]\n", "\n", "        if alive != init_ball_status:\n", "            text = f'Timestamp: {ts}; Ball changed status from {init_ball_status} to {alive}'\n", "            print(text)\n", "\n", "        if box_position != init_ball_position:\n", "            if box_position:\n", "                text = f'Timestamp: {ts}; Action is coming! Ball is going into the penalty box!; Position: {xy_position}'\n", "                print(text)\n", "            else:\n", "                text = f'Timestamp: {ts}; There is no more danger! Ball is going out of the penalty box!; Position: {xy_position}'\n", "                print(text)\n", "\n", "        if field_position != init_field_position:\n", "            if field_position:\n", "                text = f'Timestamp: {ts}; Ball is back on the field!; Position: {xy_position}'\n", "                print(text)\n", "            else:\n", "                # classify the restart as a throw-in or a corner for the given team\n", "                pos, adjust_xy_pos = check_throw_corner(field_dimen=field_dimen, ball_pos=xy_position)\n", "                team = 'home team' if team_last == 'A' else 'away team'\n", "                throw_corner = f'{pos} for {team}'\n", "                text = f'Timestamp: {ts}; Ball is outside of the field!; {throw_corner}; Position: {xy_position}'\n", "                print(text)\n", "\n", "        # ax.plot(float(xy_init[0]), float(xy_init[1]), color='mediumseagreen', marker='o', mec='mediumseagreen')\n", "        # fig.show()\n", "\n", "        # remember the current state for the next iteration\n", "        then = datetime.now()\n", "        delta = (then - now).seconds\n", "        init_ball_position = box_position\n", "        init_field_position = field_position\n", "        init_ball_status = alive\n", "        xy_init = xy_position" ] } ], "metadata": { "interpreter": { "hash": "31f2aee4e71d21fbe5cf8b01ff0e069b9275f58929596ceb00d14d90e3e16cd6" }, "kernelspec": { "display_name": "Python 3.8.10 64-bit", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" }, "orig_nbformat": 4 }, "nbformat": 4, "nbformat_minor": 2 }