{ "cells": [ { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "import json\n", "import boto3\n", "from snowflake.snowpark.session import Session" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "def get_secret(secret_name, region_name=\"us-east-1\"):\n", " session = boto3.session.Session()\n", " client = session.client(\n", " service_name='secretsmanager',\n", " region_name=region_name)\n", " get_secret_value_response = client.get_secret_value(SecretId=secret_name)\n", " get_secret_value_response = json.loads(get_secret_value_response['SecretString'])\n", " return get_secret_value_response" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "creds = get_secret(\"wysde\")\n", "\n", "connection_parameters = {\n", " \"account\": creds[\"SNOWFLAKE_ACCOUNT\"],\n", " \"user\": creds[\"SNOWFLAKE_USERNAME\"],\n", " \"password\": creds[\"SNOWFLAKE_PASSWORD\"],\n", " \"warehouse\": creds[\"SNOWFLAKE_WAREHOUSE\"],\n", " \"role\": creds[\"SNOWFLAKE_ROLE\"],\n", " \"database\": \"sparsh\",\n", " \"schema\": \"madcow\"\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Install needed packages\n", "# Be sure check that packages also exist in Acaconda repo for Snowflake\n", "!pip install scipy numpy seaborn pandas statsmodels" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir -p seeds\n", "!cp -r ../data/*.csv seeds" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\u001b[0m10:41:41 Running with dbt=1.3.2\n", "\u001b[0m10:41:41 Partial parse save file not found. Starting full parse.\n", "\u001b[0m10:41:43 [\u001b[33mWARNING\u001b[0m]: Configuration paths exist in your dbt_project.yml file which do not apply to any resources.\n", "There are 1 unused configuration paths:\n", "- models.fifapark.example\n", "\n", "\u001b[0m10:41:43 Found 0 models, 0 tests, 0 snapshots, 0 analyses, 303 macros, 0 operations, 3 seed files, 0 sources, 0 exposures, 0 metrics\n", "\u001b[0m10:41:43 \n", "\u001b[0m10:41:48 Concurrency: 1 threads (target='dev')\n", "\u001b[0m10:41:48 \n", "\u001b[0m10:41:48 1 of 3 START seed file MADCOW.goalscorers ...................................... [RUN]\n", "\u001b[0m10:43:04 1 of 3 OK loaded seed file MADCOW.goalscorers .................................. [\u001b[32mINSERT 41008\u001b[0m in 75.78s]\n", "\u001b[0m10:43:04 2 of 3 START seed file MADCOW.results .......................................... [RUN]\n", "\u001b[0m10:44:33 2 of 3 OK loaded seed file MADCOW.results ...................................... [\u001b[32mINSERT 44353\u001b[0m in 89.37s]\n", "\u001b[0m10:44:33 3 of 3 START seed file MADCOW.shootouts ........................................ [RUN]\n", "\u001b[0m10:44:37 3 of 3 OK loaded seed file MADCOW.shootouts .................................... [\u001b[32mINSERT 547\u001b[0m in 3.77s]\n", "\u001b[0m10:44:37 \n", "\u001b[0m10:44:37 Finished running 3 seeds in 0 hours 2 minutes and 53.44 seconds (173.44s).\n", "\u001b[0m10:44:37 \n", "\u001b[0m10:44:37 \u001b[32mCompleted successfully\u001b[0m\n", "\u001b[0m10:44:37 \n", "\u001b[0m10:44:37 Done. PASS=3 WARN=0 ERROR=0 SKIP=0 TOTAL=3\n" ] } ], "source": [ "!dbt seed" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "session = Session.builder.configs(connection_parameters).create()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import seaborn\n", "import statsmodels.api as sm\n", "import statsmodels.formula.api as smf\n", "from scipy.stats import poisson\n", "from snowflake.snowpark.functions import udf" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# Let's fetch training data that we have in Snowflake about FIFA games\n", "# Data is limited to games played after 2000 for validity reasons and also because this limits the size of the model\n", "df = session.sql(\"SELECT * FROM RESULTS WHERE DATE > '2000-01-01'\").to_pandas()" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "# It seems that score values are casted as strings, let's cast them to floats\n", "df = df[['HOME_TEAM','AWAY_TEAM','HOME_SCORE','AWAY_SCORE']].copy()\n", "df['HOME_SCORE'] = df['HOME_SCORE'].astype(float)\n", "df['AWAY_SCORE'] = df['AWAY_SCORE'].astype(float)\n", "df = df.sample(frac=0.1)" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "# Lets build the model using Poisson model\n", "goal_model_data = pd.concat([df[['HOME_TEAM','AWAY_TEAM','HOME_SCORE']].assign(home=1).rename(\n", " columns={'HOME_TEAM':'team', 'AWAY_TEAM':'opponent','HOME_SCORE':'goals'}),\n", " df[['AWAY_TEAM','HOME_TEAM','AWAY_SCORE']].assign(home=0).rename(\n", " columns={'AWAY_TEAM':'team', 'HOME_TEAM':'opponent','AWAY_SCORE':'goals'})])\n", "\n", "model = smf.glm(formula=\"goals ~ home + team + opponent\", data=goal_model_data, \n", " family=sm.families.Poisson()).fit()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(status='Stage area MODELSTAGE successfully created.')]" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Lets import the model to Snowflake stage\n", "session.sql('create or replace stage MODELSTAGE').collect() #Create a model stage if it does not already exist. " ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "# Let's build the Snowflake Python UDF using Snowpark @udf command\n", "# Necessary packages are imported in the same command\n", "# This is part is needed only once\n", "\n", "@udf(name='predict_result',\n", " is_permanent = True,\n", " stage_location = '@MODELSTAGE',\n", " replace=True,\n", " session=session,\n", " packages=[\"numpy\", \"pandas\", \"scipy\", \"statsmodels\"])\n", "def simulate_match(homeTeam: str, awayTeam: str) -> float:\n", " home_goals_avg = model.predict(pd.DataFrame(data={'team': homeTeam, \n", " 'opponent': awayTeam,'home':1},\n", " index=[1])).values[0]\n", " away_goals_avg = model.predict(pd.DataFrame(data={'team': awayTeam, \n", " 'opponent': homeTeam,'home':0},\n", " index=[1])).values[0]\n", " team_pred = [[poisson.pmf(i, team_avg) for i in range(0, 10)] for team_avg in [home_goals_avg, away_goals_avg]]\n", " home_win=(np.outer(np.array(team_pred[0]), np.array(team_pred[1])))\n", " result = np.sum(np.tril(home_win, -1))\n", " return result" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "!mkdir -p models" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile models/fifa_predictions.py\n", "def model(dbt, session):\n", " dbt.config(\n", " materialized=\"table\",\n", " packages=[\"pandas\", \"numpy\", \"statsmodels\", \"scipy\", \"joblib\"]\n", " )\n", " goal_model_data = dbt.ref(\"goal_model_data\").to_pandas()\n", "\n", " model = smf.glm(formula=\"goals ~ home + team + opponent\", data=goal_model_data, \n", " family=sm.families.Poisson()).fit()\n", "\n", " session.sql('create or replace stage MODELSTAGE').collect()\n", "\n", " model_file = 'tmp/model.joblib'\n", " dump(model, model_file)\n", " session.file.put(model_file, \"@MODELSTAGE\", overwrite=True)\n", "\n", " return goal_model_data #We need to always return a DF from dbt Python" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Predict 1st round winners using existing RESULTS Loop through the schedule and populate PREDICT_FOR_HOME_WIN column with value provided by previously build FUNCTION\n", " \n", "Note, full data predicton can take up to 20minutes with X-SMALL warehouse" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "SELECT * FROM SHOOTOUTS LIMIT 10\n", "SELECT * FROM GOALSCORERS LIMIT 10\n", "SELECT * FROM RESULTS LIMIT 10\n", "\n", "CREATE TABLE RESULTS_SMALL AS\n", "SELECT * FROM RESULTS LIMIT 10\n", "\n", "ALTER TABLE RESULTS_SMALL\n", "ADD PREDICT_FOR_HOME_WIN smallint;\n", "\n", "DECLARE\n", " c1 CURSOR FOR SELECT home_team, away_team FROM RESULTS_SMALL;\n", " home_team VARCHAR;\n", " away_team VARCHAR;\n", "BEGIN\n", " FOR record IN c1 DO\n", " home_team := record.home_team;\n", " away_team := record.away_team;\n", " UPDATE RESULTS_SMALL f SET predict_for_home_win = prediction.p \n", " FROM (SELECT predict_result(:home_team, :away_team) p ) AS prediction\n", " WHERE f.home_team = :home_team AND f.away_team = :away_team ;\n", " END FOR;\n", "END;\n", "\n", "SELECT * FROM RESULTS_SMALL LIMIT 10" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "-- 3. Create view to show winners of Group stage\n", "CREATE OR REPLACE VIEW A1_WINNERS_GROUP_STAGE(\n", "\tHOME_TEAM,\n", "\tAWAY_TEAM,\n", "\tWINNER\n", ") as (\n", "SELECT \n", " HOME_TEAM\n", " , AWAY_TEAM\n", " , CASE WHEN PREDICT_FOR_HOME_WIN > 0.5 THEN HOME_TEAM\n", " ELSE AWAY_TEAM END AS WINNER\n", "FROM RESULTS_SMALL\n", ");\n", "\n", "SELECT * FROM RESULTS_SMALL LIMIT 10" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%sql\n", "-- 4. Create view to show Round of 16 game pairs\n", "CREATE OR REPLACE VIEW A2_GAME_PAIRS_ROUND_OF_16(\n", "\tHOME_TEAM,\n", "\tAWAY_TEAM\n", ") as (\n", "WITH BRACKET_WINNERS AS (\n", "SELECT \n", " DISTINCT\n", " winners_group_stage.HOME_TEAM as W_HOME,\n", " winners_group_stage.AWAY_TEAM as W_AWAY,\n", " winners_group_stage.WINNER\n", "FROM A1_WINNERS_GROUP_STAGE winners_group_stage\n", "INNER JOIN RESULTS_SMALL fifa_schedule ON TRIM(fifa_schedule.HOME_TEAM) = TRIM(winners_group_stage.HOME_TEAM) AND TRIM(fifa_schedule.AWAY_TEAM) = TRIM(winners_group_stage.AWAY_TEAM)\n", " GROUP BY W_HOME, W_AWAY, WINNER\n", "), BRACKET_QUALIFIERS AS (\n", " SELECT \n", " WINNER,\n", " COUNT(WINNER) AS WIN_COUNT,\n", " RANK() OVER (ORDER BY COUNT(WINNER) DESC) AS rank \n", " FROM BRACKET_WINNERS\n", " GROUP BY WINNER\n", "), BRACKET_RANKS AS (\n", " SELECT \n", " bracket_qualifiers.WINNER AS COUNTRY\n", " , bracket_qualifiers.WIN_COUNT AS WINS\n", " , CASE WHEN rank = 1 THEN\n", " 1\n", " ELSE 2\n", " END AS RANK_IN_BRACKET\n", " FROM BRACKET_QUALIFIERS bracket_qualifiers\n", " WHERE RANK < 3\n", "), BRACKET_RANK_TO_COUNTRY_SCHEDULE AS (\n", " SELECT \n", " COUNTRY\n", " , REPLACE(RANK_IN_BRACKET, ' ', '') AS RANK_IN_BRACKET\n", " FROM BRACKET_RANKS\n", " ORDER BY RANK_IN_BRACKET\n", "), KNOCKOUT_GAMES AS (SELECT \n", " *\n", " FROM RESULTS_SMALL \n", ") SELECT \n", " bg_home.country AS HOME_TEAM\n", " , bg_away.country AS AWAY_TEAM\n", " FROM KNOCKOUT_GAMES kg \n", " INNER JOIN BRACKET_RANK_TO_COUNTRY_SCHEDULE bg_home ON bg_home.RANK_IN_BRACKET = kg.HOME_TEAM\n", " INNER JOIN BRACKET_RANK_TO_COUNTRY_SCHEDULE bg_away ON bg_away.RANK_IN_BRACKET = kg.AWAY_TEAM\n", ") ;\n", "\n", "SELECT * FROM A2_GAME_PAIRS_ROUND_OF_16 LIMIT 10;" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "env-snow", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.13 (default, Mar 28 2022, 06:16:26) \n[Clang 12.0.0 ]" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "56a1ac3adb378e653a21dff9ebdc18cce0787fb994608cdea40d0604c77313fd" } } }, "nbformat": 4, "nbformat_minor": 2 }