Introduction
This article is a Kedro tutorial modeled on the official Kedro spaceflights tutorial (a project that builds a model to predict the price of a round trip to the moon).
The development flow using Kedro is as follows:
- Create a project template
- Create a new project and install the library.
- Set up credentials and logs.
- Set up data folder
- Store data in the data folder.
- Reference all datasets in the project.
- Create pipelines
- Build the nodes.
- Choose how to run the pipeline.
- Package the project
- Build project documentation.
- Package the project for distribution.
Create a project template
First, create and activate a virtual environment.
$ python -m venv venv
$ . venv/bin/activate
$ pip install --upgrade pip
Install Kedro.
$ pip install kedro
Create a new project.
$ kedro new
Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
[New Kedro Project]: spaceflights
The project name 'spaceflights' has been applied to:
- The project title in /Users/ryu/spaceflights/README.md
- The folder created for your project in /Users/ryu/spaceflights
- The project's python package in /Users/ryu/spaceflights/src/spaceflights
A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
Navigate to the directory of the created project.
$ cd spaceflights
Add the required libraries to src/requirements.txt.
black~=22.0
flake8>=3.7.9, <5.0
ipython>=7.31.1, <8.0
isort~=5.0
jupyter~=1.0
jupyterlab_server>=2.11.1, <2.16.0
jupyterlab~=3.0
kedro~=0.18.4
kedro-datasets[pandas.CSVDataSet, pandas.ExcelDataSet, pandas.ParquetDataSet]~=1.0.0
kedro-telemetry~=0.2.0
kedro-viz~=5.0
nbstripout~=0.4
pytest-cov~=3.0
pytest-mock>=1.7.1, <2.0
pytest~=6.2
scikit-learn~=1.0
Install the libraries.
$ pip install -r src/requirements.txt
To store credentials such as usernames and passwords, add them to conf/local/credentials.yml.
dev_sql:
  username: admin
  password: admin
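Catalog entries can refer to these credentials by name (via a credentials key), and they can also be loaded programmatically. Below is a minimal sketch, assuming Kedro 0.18's ConfigLoader API:

from kedro.config import ConfigLoader

# A sketch: load conf/base plus the conf/local overrides for the "local" environment
conf_loader = ConfigLoader(conf_source="conf", env="local")
credentials = conf_loader.get("credentials*", "credentials*/**")
print(credentials["dev_sql"]["username"])  # -> admin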
You can also modify the logging settings by editing conf/base/logging.yml. The default configuration is as follows.
version: 1
disable_existing_loggers: False

formatters:
  simple:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: simple
    stream: ext://sys.stdout

  info_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: simple
    filename: logs/info.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

  error_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: ERROR
    formatter: simple
    filename: logs/errors.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

  rich:
    class: rich.logging.RichHandler

loggers:
  kedro:
    level: INFO
  spaceflights:
    level: INFO

root:
  handlers: [rich, info_file_handler, error_file_handler]
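Any module in the project can emit messages through Python's standard logging module; with the configuration above, Kedro routes them to the rich console handler and to the rotating files under logs/. For example:

import logging

logger = logging.getLogger(__name__)
logger.info("Written to the console and logs/info.log")
logger.error("Also written to logs/errors.log")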
Set up data folder
Download the following three files (companies.csv, reviews.csv, and shuttles.xlsx) and store them in data/01_raw.
Register the downloaded data in conf/base/catalog.yml.
companies:
  type: pandas.CSVDataSet
  filepath: data/01_raw/companies.csv

reviews:
  type: pandas.CSVDataSet
  filepath: data/01_raw/reviews.csv

shuttles:
  type: pandas.ExcelDataSet
  filepath: data/01_raw/shuttles.xlsx
  load_args:
    engine: openpyxl # Use modern Excel engine (the default since Kedro 0.18.0)
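Each entry maps a dataset name to a connector: type selects the dataset class, filepath points at the file, and load_args are passed through to the underlying pandas reader. Roughly, the three entries above correspond to the following pandas calls (shown for intuition only; Kedro makes them for you):

import pandas as pd

companies = pd.read_csv("data/01_raw/companies.csv")
reviews = pd.read_csv("data/01_raw/reviews.csv")
shuttles = pd.read_excel("data/01_raw/shuttles.xlsx", engine="openpyxl")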
Test that the CSV data can be loaded.
$ kedro ipython
In [1]: companies = catalog.load("companies")
[01/14/23 15:15:57] INFO Loading data from 'companies' (CSVDataSet)...
In [2]: companies.head()
Out[2]:
id company_rating company_location total_fleet_count iata_approved
0 35029 100% Niue 4.0 f
1 30292 67% Anguilla 6.0 f
2 19032 67% Russian Federation 4.0 f
3 8238 91% Barbados 15.0 t
4 30342 NaN Sao Tome and Principe 2.0 t
Test that the Excel (xlsx) data can be loaded.
In [3]: shuttles = catalog.load("shuttles")
[01/14/23 15:20:09] INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
In [4]: shuttles.head()
Out[4]:
id shuttle_location shuttle_type engine_type ... d_check_complete moon_clearance_complete price company_id
0 63561 Niue Type V5 Quantum ... f f $1,325.0 35029
1 36260 Anguilla Type V5 Quantum ... t f $1,780.0 30292
2 57015 Russian Federation Type V5 Quantum ... f f $1,715.0 19032
3 14035 Barbados Type V5 Plasma ... f f $4,770.0 8238
4 10036 Sao Tome and Principe Type V2 Plasma ... f f $2,820.0 30342
To exit the ipython session, run exit().
In [5]: exit()
Create pipelines
Data processing pipeline
Create a data processing pipeline.
$ kedro pipeline create data_processing
The above command will generate the following files.
src/spaceflights/pipelines/data_processing/nodes.py
src/spaceflights/pipelines/data_processing/pipeline.py
conf/base/parameters/data_processing.yml
src/tests/pipelines/data_processing/__init__.py
Write the following code in src/spaceflights/pipelines/data_processing/nodes.py.
import pandas as pd


def _is_true(x: pd.Series) -> pd.Series:
    return x == "t"


def _parse_percentage(x: pd.Series) -> pd.Series:
    x = x.str.replace("%", "")
    x = x.astype(float) / 100
    return x


def _parse_money(x: pd.Series) -> pd.Series:
    x = x.str.replace("$", "").str.replace(",", "")
    x = x.astype(float)
    return x


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles


def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a model input table.

    Args:
        shuttles: Preprocessed data for shuttles.
        companies: Preprocessed data for companies.
        reviews: Raw data for reviews.
    Returns:
        Model input table.
    """
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
    model_input_table = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )
    model_input_table = model_input_table.dropna()
    return model_input_table
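As a quick sanity check, the preprocessing helpers can be exercised on a toy DataFrame outside of any Kedro run (hypothetical values):

import pandas as pd

toy = pd.DataFrame({"iata_approved": ["t", "f"], "company_rating": ["100%", "67%"]})
print(preprocess_companies(toy))
#    iata_approved  company_rating
# 0           True            1.00
# 1          False            0.67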
Write the following code in src/spaceflights/pipelines/data_processing/pipeline.py.
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )
The above code wraps each function in a node and combines the nodes into the data processing pipeline.
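There is no need to register the pipeline by hand: recent 0.18.x templates generate src/spaceflights/pipeline_registry.py, which discovers pipelines automatically. For reference, it looks roughly like this:

from typing import Dict

from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> Dict[str, Pipeline]:
    """Collect all pipelines created with `kedro pipeline create`."""
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines

Because every discovered pipeline is folded into __default__, a plain kedro run executes everything; a single pipeline can be selected with kedro run --pipeline data_processing.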
The following command tests the preprocess_companies_node node by running it on its own.
$ kedro run --node=preprocess_companies_node
[08/09/22 16:43:10] INFO Kedro project kedro-tutorial session.py:346
[08/09/22 16:43:11] INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 1 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
To test all nodes as a complete data processing pipeline, run kedro run.
$ kedro run
[08/09/22 17:00:54] INFO Kedro project kedro-tutorial session.py:346
[08/09/22 17:01:10] INFO Reached after_catalog_created hook plugin.py:17
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 17:01:25] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 17:01:28] INFO Saving data to 'model_input_table' (MemoryDataSet)... data_catalog.py:382
[08/09/22 17:01:29] INFO Completed 3 out of 3 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'model_input_table' (MemoryDataSet)... data_catalog.py:343
If you want to save the data generated by the preprocessing nodes to files, add the following to conf/base/catalog.yml.
preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq

preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq

model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq
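After the next kedro run, these datasets are written to disk instead of being held only in memory, so they can be inspected later, for example from a kedro ipython session:

# In a `kedro ipython` session, after `kedro run`
model_input_table = catalog.load("model_input_table")
print(model_input_table.head())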
Data science pipeline
Create a data science pipeline.
$ kedro pipeline create data_science
Write the following code in src/spaceflights/pipelines/data_science/nodes.py. The data science code uses LinearRegression from scikit-learn.
import logging
from typing import Dict, Tuple

import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split


def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
    """Splits data into features and targets training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters/data_science.yml.
    Returns:
        Split data.
    """
    X = data[parameters["features"]]
    y = data["price"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return X_train, X_test, y_train, y_test


def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
    """Trains the linear regression model.

    Args:
        X_train: Training data of independent features.
        y_train: Training data for price.
    Returns:
        Trained model.
    """
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)
    return regressor


def evaluate_model(
    regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
    """Calculates and logs the coefficient of determination.

    Args:
        regressor: Trained model.
        X_test: Testing data of independent features.
        y_test: Testing data for price.
    """
    y_pred = regressor.predict(X_test)
    score = r2_score(y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info("Model has a coefficient R^2 of %.3f on test data.", score)
Set the parameters that will be referenced during pipeline execution in conf/base/parameters/data_science.yml.
model_options:
  test_size: 0.2
  random_state: 3
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating
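At run time Kedro loads this file and exposes its contents under the params: prefix, so the string "params:model_options" in a node's inputs resolves to the model_options dictionary. You can verify this from a kedro ipython session:

# In a `kedro ipython` session
params = context.params["model_options"]
print(params["test_size"], params["random_state"], params["features"])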
Write the following code in src/spaceflights/pipelines/data_science/pipeline.py.
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )
Register the trained model as a dataset in conf/base/catalog.yml.
regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor.pickle
  versioned: true
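Because versioned: true is set, each kedro run saves the model under a new timestamped path such as data/06_models/regressor.pickle/<timestamp>/regressor.pickle, and loading without specifying a version returns the latest one:

# In a `kedro ipython` session, after at least one `kedro run`
regressor = catalog.load("regressor")  # loads the most recent version
print(regressor.coef_)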
Run kedro run to test the pipeline.
$ kedro run
[08/09/22 16:56:00] INFO Kedro project kedro-tutorial session.py:346
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 16:56:15] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 16:56:18] INFO Saving data to 'model_input_table' (MemoryDataSet)... data_catalog.py:382
[08/09/22 16:56:19] INFO Completed 3 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'params:model_options' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:model_options]) ->
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 4 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model_node: train_model([X_train,y_train]) -> node.py:327
[regressor]
[08/09/22 16:56:20] INFO Saving data to 'regressor' (PickleDataSet)... data_catalog.py:382
INFO Completed 5 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'regressor' (PickleDataSet)... data_catalog.py:343
INFO Loading data from 'X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([regressor,X_test,y_test]) -> None
INFO Model has a coefficient R^2 of 0.462 on test data. nodes.py:55
INFO Completed 6 out of 6 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
Package the project
Generate documentation
The following command can be executed in the root directory to generate documentation.
$ kedro build-docs --open
An HTML document will be generated in docs/build/html and automatically opened in your browser.
Packaging
The following command can be executed in the root directory to package a project.
$ kedro package
The package will be built in the dist folder, producing a .egg file and a .whl file. Note that the package contains only Python code, not conf, data, logs, etc.
The following command installs the packaged project into your Python environment.
$ pip install <path-to-wheel-file>
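Once installed, the packaged project can be run without the source tree; the 0.18.x template includes a __main__.py, so the following should work (note that conf and data must still be supplied at the run location):
$ python -m spaceflights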