+++
title = "Literate Kedro: Using Org-mode for Data Science"
author = ["Justin"]
date = 2022-07-25T01:43:00-04:00
lastmod = 2022-08-01T05:41:48-04:00
tags = ["org", "kedro", "python", "data-science", "emacs"]
draft = false
+++

## Summary {#summary}

This is both a general(ish) explainer and a template for usage in data science, specifically utilizing Kedro, (formerly) QuantumBlack's now open-source (Linux Foundation) framework for creating {{% sidenote "f1" "reproducible, maintainable, and modular data science code." %}} What does that mean? At its simplest, it ties things into "nodes" and "pipelines", where nodes are utility functions and functions that tie things together, and pipelines chain together multiple nodes, allowing for steps to be easily repeated. DAGs, they're all the rage. {{% /sidenote %}}

Make sure to read their tutorial if you're not familiar with it; it's good, and I won't be covering nearly as much here, under the assumption you've skimmed it.

Some benefits you get using it with org-mode and emacs, however:

- Single-file navigation / no need to bounce around VSCode haphazardly when you can jump between headings and tangle to the appropriate places.
- Transition to a presentation from inside Org-mode seamlessly.
- Include data exploration and other documentation in the same file.
- Use other languages with org-babel.

Kedro uses a [specific workflow](https://kedro.readthedocs.io/en/stable/tutorial/spaceflights_tutorial.html#kedro-project-development-workflow) which I will follow for this template/tutorial. I'm not entirely sure how useful this will be, but it lets me get some practice with org-babel and gives me a project excuse to tinker around with kedro some more in a use case I haven't seen. This will be more of an "update as I come up with new ideas" type of article, and, of course, if you see something that would be better served by a different method, feel free to leave a comment / issue.

**High-level / TL;DR**: _This is for using org to configure Kedro, similar to using a monolithic org file for configuring emacs. Geared towards people who already use emacs and something like ivy/helm/vertico to traverse headings easily._

### Assumptions {#assumptions}

- I'm using conda for environment management (conda.el / jupyter-python); a quick sanity check follows this list.
- You've created the kedro project already, since there are interactive y/n prompts (telemetry, etc.).
- TODO: maybe put the steps here? Probably not worth the time sink for a template; 95% of the time those steps are run on the command line, and only once.
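Since the environment management leans on conda.el and jupyter-python, it can be worth confirming that org-babel's Python blocks are actually talking to the right interpreter before going further. A minimal sketch, assuming the `kedro-example` environment used later in this file; adjust to your own setup:

```python
# Throwaway check from a jupyter-python (or plain python) org-babel block.
# Assumes the kedro-example conda environment is the active kernel/interpreter.
import sys

import kedro

print(sys.executable)     # should point inside the kedro-example environment
print(kedro.__version__)  # expecting 0.18.x, matching requirements.txt below
```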
## Kedro Spaceflights Tutorial {#kedro-spaceflights-tutorial}

**Scenario:** It is 2160 and the space tourism industry is booming. Globally, thousands of space shuttle companies take tourists to the Moon and back. You have been able to source amenities offered in each space shuttle, customer reviews and company information.

**Project:** You want to construct a model that predicts the price for each trip to the Moon and the corresponding return flight.

### Setup {#setup}

This is for environment setup, utility functions that you might not necessarily want to put in your `init.el`, and other things not directly related to the core of the project.

#### Emacs and Shell {#emacs-and-shell}

- Elisp

Commands and utility functions that help reduce repeated code and make the process more seamless. Run manually with `C-c C-c`. One could alternatively add this to their `init.el`. The following code is used to prevent org-mode from complaining about the variables; otherwise you can set these as local variables in a real-life scenario.

```elisp
(setq projdir "~/code/literate-kedro")
(setq projsub "/src/literate_kedro")
(print projdir)
```

- Shell / Other

If you plan on using the terminal for all shell commands, this isn't necessary. You can simply activate your conda environment with `conda-env-activate`, under the assumption you're using `conda.el`. Because the session is set at the top level, it should persist through the following commands; see the echo commands below for confirmation.

```bash
conda activate kedro-example
```

Making sure I'm in the correct directory / using the correct environment:

```bash
echo $(pwd)
echo $(which pip)
```

#### requirements.txt {#requirements-dot-txt}

The requirements for the kedro project, in the form of a `requirements.txt`:

```cfg
black~=22.0
flake8>=3.7.9, <4.0
ipython>=7.31.1, <8.0
isort~=5.0
jupyter~=1.0
jupyterlab~=3.0
kedro[pandas.CSVDataSet, pandas.ExcelDataSet, pandas.ParquetDataSet, plotly.PlotlyDataSet, plotly.JSONDataSet]==0.18.2
kedro-telemetry~=0.2.0
kedro-viz~=4.7
nbstripout~=0.4
pytest-cov~=3.0
pytest-mock>=1.7.1, <2.0
pytest~=6.2
scikit-learn~=1.0
```

#### Configuration {#configuration}

The `conf` folder in kedro is for:

- Logging
- Credentials
- Other sensitive/personal content

TODO: These are things I would generally be more comfortable configuring outside of org. You could probably get fancy with piping things with keys back and forth, but that remains outside of my scope for now. You could probably get away with safely configuring your logging here without needing anything fancy.

- data_science.yml

```yaml
data_science:
  active_modelling_pipeline:
    model_options:
      test_size: 0.2
      random_state: 3
      features:
        - engines
        - passenger_capacity
        - crew
        - d_check_complete
        - moon_clearance_complete
        - iata_approved
        - company_rating
        - review_scores_rating
  candidate_modelling_pipeline:
    model_options:
      test_size: 0.2
      random_state: 8
      features:
        - engines
        - passenger_capacity
        - crew
        - review_scores_rating

data_processing.companies_columns:
  type: tracking.JSONDataSet
  filepath: data/09_tracking/companies_columns.json

data_science.active_modelling_pipeline.metrics:
  type: tracking.MetricsDataSet
  filepath: data/09_tracking/metrics.json
```

- settings.py

Normally you don't edit this, but there are some cases where it's necessary. This is for storing metrics in a SQLite database.

```python
from kedro_viz.integrations.kedro.sqlite_store import SQLiteStore
from pathlib import Path

SESSION_STORE_CLASS = SQLiteStore
SESSION_STORE_ARGS = {"path": str(Path(__file__).parents[2] / "data")}
```

### Data {#data}

The second part of the kedro workflow is the data phase, which involves adding data to the `data` folder and then referencing the datasets for the project in the `conf/base/catalog.yml` file.

#### Preparation {#preparation}

Steps involved in acquiring the data and registering it in the appropriate catalog(s). You could also explain the business case or the variables here.

- Acquisition

In some cases data is easily acquired with curl/wget, specifically for one-off analyses. It can also be helpful to show where you downloaded your data from (and to take a quick peek at it, as sketched after the block below).

```shell
mkdir -p $directory/

# reviews
curl -o "$directory/reviews.csv" https://kedro-org.github.io/kedro/reviews.csv

# companies
curl -o "$directory/companies.csv" https://kedro-org.github.io/kedro/companies.csv

# shuttles
curl -o "$directory/shuttles.xlsx" https://kedro-org.github.io/kedro/shuttles.xlsx
```
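Before wiring anything into Kedro, it doesn't hurt to eyeball what actually landed on disk. A rough exploration sketch, assuming the files were saved under `data/01_raw/` as the catalog below expects:

```python
# Quick peek at the raw downloads; paths assume data/01_raw/, matching
# the entries in conf/base/catalog.yml further down.
import pandas as pd

companies = pd.read_csv("data/01_raw/companies.csv")
reviews = pd.read_csv("data/01_raw/reviews.csv")
shuttles = pd.read_excel("data/01_raw/shuttles.xlsx")

for name, df in [("companies", companies), ("reviews", reviews), ("shuttles", shuttles)]:
    print(name, df.shape)
    print(df.head(2))
```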
- Data Registration
  - catalog.yml

You now need to register the datasets so they can be loaded by Kedro. All Kedro projects have a `conf/base/catalog.yml` file.

```yaml
companies:
  type: pandas.CSVDataSet
  filepath: data/01_raw/companies.csv
  layer: raw

reviews:
  type: pandas.CSVDataSet
  filepath: data/01_raw/reviews.csv
  layer: raw

shuttles:
  type: pandas.ExcelDataSet
  filepath: data/01_raw/shuttles.xlsx
  layer: raw

data_processing.preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq
  layer: intermediate

data_processing.preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq
  layer: intermediate

model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq
  layer: primary

data_science.active_modelling_pipeline.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor_active.pickle
  versioned: true
  layer: models

data_science.candidate_modelling_pipeline.regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor_candidate.pickle
  versioned: true
  layer: models

data_science.active_modelling_pipeline.metrics:
  type: tracking.MetricsDataSet
  filepath: data/09_tracking/metrics.json

data_processing.companies_columns:
  type: tracking.JSONDataSet
  filepath: data/09_tracking/companies_columns.json
```

### Pipelines {#pipelines}

These are the node and pipeline definitions for the project, split into modular `data_processing` and `data_science` pipelines.

> In many typical Kedro projects, a single (“main”) pipeline
> increases in complexity as the project evolves. To keep your
> project fit for purpose, you can create modular pipelines, which
> are logically isolated and can be reused. Modular pipelines are
> easier to develop, test and maintain, and are portable so they can
> be copied and reused between projects.

- data_processing

This pipeline is for processing the data.

- nodes.py

> A Kedro node is a wrapper for a Python function that names the inputs
> and outputs of that function. It is the building block of a pipeline.
> Nodes can be linked when the output of one node is the input of another.

NOTE: Could theoretically break this into separate code blocks (imports, utilities, preprocessing), but it might be a bit cumbersome. Input welcome for best practices.

```python
from typing import Tuple, Dict

import pandas as pd


def _is_true(x: pd.Series) -> pd.Series:
    return x == "t"


def _parse_percentage(x: pd.Series) -> pd.Series:
    x = x.str.replace("%", "")
    x = x.astype(float) / 100
    return x


def _parse_money(x: pd.Series) -> pd.Series:
    x = x.str.replace("$", "").str.replace(",", "")
    x = x.astype(float)
    return x


def preprocess_companies(companies: pd.DataFrame) -> Tuple[pd.DataFrame, Dict]:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies, {"columns": companies.columns.tolist(), "data_type": "companies"}


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and
        `d_check_complete`, `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles


def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a model input table.

    Args:
        shuttles: Preprocessed data for shuttles.
        companies: Preprocessed data for companies.
        reviews: Raw data for reviews.
    Returns:
        Model input table.
    """
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
    model_input_table = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )
    model_input_table = model_input_table.dropna()
    return model_input_table
```
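Since pytest is already in the requirements, these node functions lend themselves to small unit tests in the same literate style. A minimal, hypothetical example; the tangle path and the toy values are made up for illustration:

```python
# Hypothetical test, e.g. tangled to src/tests/pipelines/data_processing/test_nodes.py
import pandas as pd
import pytest

from literate_kedro.pipelines.data_processing.nodes import preprocess_companies


def test_preprocess_companies():
    companies = pd.DataFrame(
        {"id": [1], "iata_approved": ["t"], "company_rating": ["90%"]}
    )
    preprocessed, extra = preprocess_companies(companies)
    assert bool(preprocessed["iata_approved"].iloc[0]) is True  # "t" -> True
    assert preprocessed["company_rating"].iloc[0] == pytest.approx(0.9)  # "90%" -> 0.9
    assert extra["columns"] == ["id", "iata_approved", "company_rating"]
    assert extra["data_type"] == "companies"
```

Run it with plain `pytest` from the project root (the default kedro template already ships a `src/tests` folder).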
""" shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"]) shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"]) shuttles["price"] = _parse_money(shuttles["price"]) return shuttles def create_model_input_table( shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame ) -> pd.DataFrame: """Combines all data to create a model input table. Args: shuttles: Preprocessed data for shuttles. companies: Preprocessed data for companies. reviews: Raw data for reviews. Returns: model input table. """ rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id") model_input_table = rated_shuttles.merge( companies, left_on="company_id", right_on="id" ) model_input_table = model_input_table.dropna() return model_input_table ``` - pipeline.py > A Kedro pipeline organises the dependencies and execution order of a collection > of nodes, and connects inputs and outputs. The pipeline determines the node > execution order by resolving dependencies. ```python from kedro.pipeline import Pipeline, node from kedro.pipeline.modular_pipeline import pipeline from .nodes import ( preprocess_companies, preprocess_shuttles, create_model_input_table, ) def create_pipeline(**kwargs) -> Pipeline: return pipeline( [ node( func=preprocess_companies, inputs="companies", outputs=["preprocessed_companies","companies_columns"], name="preprocess_companies_node", ), node( func=preprocess_shuttles, inputs="shuttles", outputs="preprocessed_shuttles", name="preprocess_shuttles_node", ), node( func=create_model_input_table, inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"], outputs="model_input_table", name="create_model_input_table_node", ), ], namespace="data_processing", inputs=["companies", "shuttles", "reviews"], outputs="model_input_table", ) ``` - data_science This pipeline is for performing data science on the data previously processed. - nodes.py ```python import logging from typing import Dict, Tuple import pandas as pd from sklearn.linear_model import LinearRegression from sklearn.metrics import r2_score, mean_absolute_error, max_error from sklearn.model_selection import train_test_split def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple: """Splits data into features and targets training and test sets. Args: data: Data containing features and target. parameters: Parameters defined in parameters/data_science.yml. Returns: Split data. """ X = data[parameters["features"]] y = data["price"] X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=parameters["test_size"], random_state=parameters["random_state"] ) return X_train, X_test, y_train, y_test def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression: """Trains the linear regression model. Args: X_train: Training data of independent features. y_train: Training data for price. Returns: Trained model. """ regressor = LinearRegression() regressor.fit(X_train, y_train) return regressor def evaluate_model( regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series ) -> Dict[str, float]: """Calculates and logs the coefficient of determination. Args: regressor: Trained model. X_test: Testing data of independent features. y_test: Testing data for price. 
""" y_pred = regressor.predict(X_test) score = r2_score(y_test, y_pred) mae = mean_absolute_error(y_test, y_pred) me = max_error(y_test, y_pred) logger = logging.getLogger(__name__) logger.info("Model has a coefficient R^2 of %.3f on test data.", score) return {"r2_score": score, "mae": mae, "max_error": me} ``` - pipeline.py ```python from kedro.pipeline import Pipeline, node from kedro.pipeline.modular_pipeline import pipeline from .nodes import evaluate_model, split_data, train_model def create_pipeline(**kwargs) -> Pipeline: pipeline_instance = pipeline( [ node( func=split_data, inputs=["model_input_table", "params:model_options"], outputs=["X_train", "X_test", "y_train", "y_test"], name="split_data_node", ), node( func=train_model, inputs=["X_train", "y_train"], outputs="regressor", name="train_model_node", ), node( func=evaluate_model, inputs=["regressor", "X_test", "y_test"], outputs="metrics", name="evaluate_model_node", ), ] ) ds_pipeline_1 = pipeline( pipe=pipeline_instance, inputs="model_input_table", namespace="active_modelling_pipeline", ) ds_pipeline_2 = pipeline( pipe=pipeline_instance, inputs="model_input_table", namespace="candidate_modelling_pipeline", ) return pipeline( pipe=ds_pipeline_1 + ds_pipeline_2, inputs="model_input_table", namespace="data_science", ) ``` - pipeline_registry.py This is the pipeline registry, where you add the appropriate pipelines defined further down into the register function. This is how kedro recognizes which pipelines to run. ```python from typing import Dict from kedro.pipeline import Pipeline from literate_kedro.pipelines import data_processing as dp from literate_kedro.pipelines import data_science as ds def register_pipelines() -> Dict[str, Pipeline]: """Register the project's pipelines. Returns: A mapping from a pipeline name to a ``Pipeline`` object. """ data_processing_pipeline = dp.create_pipeline() data_science_pipeline = ds.create_pipeline() return { "__default__": data_processing_pipeline + data_science_pipeline, "dp": data_processing_pipeline, "ds": data_science_pipeline, } ``` ## Conclusions {#conclusions} This is pretty much all you need to replicate the kedro tutorial from within a single org-file. I would be remiss to not mention that kedro itself has a pretty good alternative of simply using their jupyter plugin, but I tend to use emacs all the time anyways, so I figured, what the hey. There are still a couple things I'd like to ponder over insofar as best practices and future articles like: - Should you split up the nodes and pipelines further? Using literate programmign to basically make a utility, processing, etc. code block. - Version control, only on the tangled files? - Making use of transclusion from other files. - Turning the "relevant parts"' into a presentation - slicing the org document into exploratory analysis that ignores the code depending on audience. I'll probably try to write an article doing a "real" analysis with this workflow in the future. ```sh cp ./literate-kedro-spaceflight.org ~/code/justin.vc/static/org/literate-kedro-spaceflight.org ``` This copies the file when I export, which allows you to see the [original org file](https://raw.githubusercontent.com/brickfrog/justin.vc/master/static/org/literate-kedro-spaceflight.org) if curious. That specifically includes how I tangled the files to the correct places. (And, once again, input welcome if I there's better ways to do it).