{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# Optimus ETL scheduled with Prefect\n",
        "\n",
        "Loads `data/crime.csv` with Optimus on a Dask client, splits `OCCURRED_ON_DATE` into `year_0`/`year_1`/`year_2`, and wraps that pipeline in a Prefect flow that re-runs every 2 minutes."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 1,
      "metadata": {},
      "outputs": [],
      "source": [
        "%load_ext autoreload\n",
        "%autoreload 2"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 2,
      "metadata": {},
      "outputs": [],
      "source": [
        "import sys\n",
        "\n",
        "# Make packages in the parent directory (presumably a local optimus checkout) importable.\n",
        "sys.path.append(\"..\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 3,
      "metadata": {},
      "outputs": [],
      "source": [
        "from optimus import Optimus"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 4,
      "metadata": {},
      "outputs": [],
      "source": [
        "op = Optimus(\"dask\")"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 5,
      "metadata": {},
      "outputs": [
        {
          "data": {
            "text/html": [ "\n", "\n", "\n", "\n", "\n", "
\n", "

Client

\n", "\n", "
\n", "

Cluster

\n", "
    \n", "
  • Workers: 4
  • \n", "
  • Cores: 8
  • \n", "
  • Memory: 17.06 GB
  • \n", "
\n", "
" ],
            "text/plain": [ "" ]
          },
          "execution_count": 5,
          "metadata": {},
          "output_type": "execute_result"
        }
      ],
      "source": [
        "op.client"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Prefect setup\n",
        "\n",
        "Requires the `prefect` package; install it into this kernel with `%pip install prefect` if it is missing."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 7,
      "metadata": {},
      "outputs": [],
      "source": [
        "import prefect\n",
        "from prefect import task"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Load and transform the crime data"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 8,
      "metadata": {},
      "outputs": [
        {
          "data": {
            "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
INCIDENT_NUMBEROFFENSE_CODEOFFENSE_CODE_GROUPOFFENSE_DESCRIPTIONDISTRICTREPORTING_AREASHOOTINGOCCURRED_ON_DATEMONTHDAY_OF_WEEKHOURUCR_PARTSTREETLatLongLocationyear_0year_1year_2
0I182070945619LarcenyLARCENY ALL OTHERSD14808NaN2018-09-02 13:00:009Sunday13Part OneLINCOLN ST42.357791-71.139371(42.35779134, -71.13937053)20180902 13:00:00
1I1820709431402VandalismVANDALISMC11347NaN2018-08-21 00:00:008Tuesday0Part TwoHECLA ST42.306821-71.060300(42.30682138, -71.06030035)20180821 00:00:00
2I1820709413410TowedTOWED MOTOR VEHICLED4151NaN2018-09-03 19:27:009Monday19Part ThreeCAZENOVE ST42.346589-71.072429(42.34658879, -71.07242943)20180903 19:27:00
3I1820709403114Investigate PropertyINVESTIGATE PROPERTYD4272NaN2018-09-03 21:16:009Monday21Part ThreeNEWCOMB ST42.334182-71.078664(42.33418175, -71.07866441)20180903 21:16:00
4I1820709383114Investigate PropertyINVESTIGATE PROPERTYB3421NaN2018-09-03 21:05:009Monday21Part ThreeDELHI ST42.275365-71.090361(42.27536542, -71.09036101)20180903 21:05:00
............................................................
319068I050310906-003125Warrant ArrestsWARRANT ARRESTD4285NaN2016-06-05 17:25:006Sunday17Part ThreeCOVENTRY ST42.336951-71.085748(42.33695098, -71.08574813)20160605 17:25:00
319069I030217815-08111HomicideMURDER, NON-NEGLIGIENT MANSLAUGHTERE18520NaN2015-07-09 13:38:007Thursday13Part OneRIVER ST42.255926-71.123172(42.25592648, -71.12317207)20150709 13:38:00
319070I030217815-083125Warrant ArrestsWARRANT ARRESTE18520NaN2015-07-09 13:38:007Thursday13Part ThreeRIVER ST42.255926-71.123172(42.25592648, -71.12317207)20150709 13:38:00
319071I010370257-003125Warrant ArrestsWARRANT ARRESTE13569NaN2016-05-31 19:35:005Tuesday19Part ThreeNEW WASHINGTON ST42.302333-71.111565(42.30233307, -71.11156487)20160531 19:35:00
3190721420525503125Warrant ArrestsWARRANT ARRESTD4903NaN2015-06-22 00:12:006Monday0Part ThreeWASHINGTON ST42.333839-71.080290(42.33383935, -71.08029038)20150622 00:12:00
\n", "

319073 rows × 19 columns

\n", "
" ], "text/plain": [ " INCIDENT_NUMBER OFFENSE_CODE OFFENSE_CODE_GROUP \\\n", "0 I182070945 619 Larceny \n", "1 I182070943 1402 Vandalism \n", "2 I182070941 3410 Towed \n", "3 I182070940 3114 Investigate Property \n", "4 I182070938 3114 Investigate Property \n", "... ... ... ... \n", "319068 I050310906-00 3125 Warrant Arrests \n", "319069 I030217815-08 111 Homicide \n", "319070 I030217815-08 3125 Warrant Arrests \n", "319071 I010370257-00 3125 Warrant Arrests \n", "319072 142052550 3125 Warrant Arrests \n", "\n", " OFFENSE_DESCRIPTION DISTRICT REPORTING_AREA SHOOTING \\\n", "0 LARCENY ALL OTHERS D14 808 NaN \n", "1 VANDALISM C11 347 NaN \n", "2 TOWED MOTOR VEHICLE D4 151 NaN \n", "3 INVESTIGATE PROPERTY D4 272 NaN \n", "4 INVESTIGATE PROPERTY B3 421 NaN \n", "... ... ... ... ... \n", "319068 WARRANT ARREST D4 285 NaN \n", "319069 MURDER, NON-NEGLIGIENT MANSLAUGHTER E18 520 NaN \n", "319070 WARRANT ARREST E18 520 NaN \n", "319071 WARRANT ARREST E13 569 NaN \n", "319072 WARRANT ARREST D4 903 NaN \n", "\n", " OCCURRED_ON_DATE MONTH DAY_OF_WEEK HOUR UCR_PART \\\n", "0 2018-09-02 13:00:00 9 Sunday 13 Part One \n", "1 2018-08-21 00:00:00 8 Tuesday 0 Part Two \n", "2 2018-09-03 19:27:00 9 Monday 19 Part Three \n", "3 2018-09-03 21:16:00 9 Monday 21 Part Three \n", "4 2018-09-03 21:05:00 9 Monday 21 Part Three \n", "... ... ... ... ... ... 
\n", "319068 2016-06-05 17:25:00 6 Sunday 17 Part Three \n", "319069 2015-07-09 13:38:00 7 Thursday 13 Part One \n", "319070 2015-07-09 13:38:00 7 Thursday 13 Part Three \n", "319071 2016-05-31 19:35:00 5 Tuesday 19 Part Three \n", "319072 2015-06-22 00:12:00 6 Monday 0 Part Three \n", "\n", " STREET Lat Long Location \\\n", "0 LINCOLN ST 42.357791 -71.139371 (42.35779134, -71.13937053) \n", "1 HECLA ST 42.306821 -71.060300 (42.30682138, -71.06030035) \n", "2 CAZENOVE ST 42.346589 -71.072429 (42.34658879, -71.07242943) \n", "3 NEWCOMB ST 42.334182 -71.078664 (42.33418175, -71.07866441) \n", "4 DELHI ST 42.275365 -71.090361 (42.27536542, -71.09036101) \n", "... ... ... ... ... \n", "319068 COVENTRY ST 42.336951 -71.085748 (42.33695098, -71.08574813) \n", "319069 RIVER ST 42.255926 -71.123172 (42.25592648, -71.12317207) \n", "319070 RIVER ST 42.255926 -71.123172 (42.25592648, -71.12317207) \n", "319071 NEW WASHINGTON ST 42.302333 -71.111565 (42.30233307, -71.11156487) \n", "319072 WASHINGTON ST 42.333839 -71.080290 (42.33383935, -71.08029038) \n", "\n", " year_0 year_1 year_2 \n", "0 2018 09 02 13:00:00 \n", "1 2018 08 21 00:00:00 \n", "2 2018 09 03 19:27:00 \n", "3 2018 09 03 21:16:00 \n", "4 2018 09 03 21:05:00 \n", "... ... ... ... 
\n", "319068 2016 06 05 17:25:00 \n", "319069 2015 07 09 13:38:00 \n", "319070 2015 07 09 13:38:00 \n", "319071 2016 05 31 19:35:00 \n", "319072 2015 06 22 00:12:00 \n", "\n", "[319073 rows x 19 columns]" ]
          },
          "execution_count": 8,
          "metadata": {},
          "output_type": "execute_result"
        }
      ],
      "source": [
        "CRIME_FILE = \"data/crime.csv\"\n",
        "\n",
        "\n",
        "def load_crime(path=CRIME_FILE):\n",
        "    \"\"\"Load the crime CSV with Optimus and split OCCURRED_ON_DATE into parts.\n",
        "\n",
        "    Shared by this cell and the Prefect `run_job` task so the pipeline is\n",
        "    defined once. Returns an Optimus (Dask-backed) dataframe; call\n",
        "    `.compute()` on it to materialise the result.\n",
        "    \"\"\"\n",
        "    df = op.load.csv(path, sep=\",\", error_bad_lines=False, header=True, null_value=\"null\",\n",
        "                     infer_schema='true', charset=\"latin1\").persist()\n",
        "    # Split \"YYYY-MM-DD hh:mm:ss\" on \"-\" into year_0, year_1, year_2; keep the original column.\n",
        "    df = df.cols.unnest(\"OCCURRED_ON_DATE\", separator=\"-\", splits=3, output_cols=\"year\", drop=False)\n",
        "    # Drop the source YEAR column; the year is now carried by year_0.\n",
        "    return df.cols.drop(\"YEAR\")\n",
        "\n",
        "\n",
        "crime_df = load_crime()\n",
        "crime_df.compute()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## Schedule the ETL with Prefect"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 10,
      "metadata": {},
      "outputs": [],
      "source": [
        "from datetime import timedelta\n",
        "\n",
        "from prefect import Flow\n",
        "from prefect.schedules import IntervalSchedule\n",
        "\n",
        "\n",
        "@task\n",
        "def run_job(file=CRIME_FILE):\n",
        "    \"\"\"Prefect task: run the crime ETL (`load_crime`) and return the computed frame.\n",
        "\n",
        "    `file` defaults to CRIME_FILE so the flow can call `run_job()` with no arguments.\n",
        "    \"\"\"\n",
        "    logger = prefect.context.get(\"logger\")\n",
        "    logger.info(\"loading and transforming %s ...\", file)\n",
        "    return load_crime(file).compute()\n",
        "\n",
        "\n",
        "# Re-run the whole flow every 2 minutes.\n",
        "schedule = IntervalSchedule(interval=timedelta(minutes=2))\n",
        "\n",
        "with Flow(\"Optimus ETL\", schedule) as flow:\n",
        "    result = run_job()"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from prefect.engine.executors import LocalExecutor\n",
        "\n",
        "# The flow carries an IntervalSchedule, so flow.run() does not return: between runs it logs\n",
        "# \"Waiting for next scheduled run at ...\" and keeps re-running until the kernel is interrupted.\n",
        "flow.run(executor=LocalExecutor())"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "### Schedules\n",
        "\n",
        "Reference for building more complex schedules: https://docs.prefect.io/core/concepts/schedules.html#complex-schedules"
      ]
    }
  ],
  "metadata": {
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.7.6"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 4
}