2023-01-14

Kedro

What is Kedro

Kedro is an open-source pipeline management tool, originally developed by QuantumBlack Labs (part of McKinsey & Company), that allows you to create highly reproducible, maintainable, and modular data science code.

Kedro features

Kedro has the following main features.

  • Pipeline visualization
    • You can visualize the DAG of your pipeline (see the kedro-viz commands after this list).
  • Data catalog
    • Lightweight data connectors are provided for reading and writing data across many file formats (Pandas, Spark, Dask, NetworkX, Pickle, Plotly, Matplotlib, etc.) and file systems (S3, GCP, Azure, SFTP, DBFS, local).
  • Integration
    • Excellent integration with Apache Spark, Pandas, Dask, Matplotlib, Plotly, fsspec, Apache Airflow, Jupyter Notebook, Docker, etc.
  • Project templates
    • Project templates allow you to standardize how you organize your configuration, source code, tests, documentation, notebooks, etc.
  • Pipeline abstraction
    • Kedro supports a dataset-driven workflow that automatically resolves dependencies between functions, eliminating the need to label the order of execution of tasks in the pipeline.
  • Flexibility in deployment
    • Pipelines can be deployed to Argo, Prefect, Kubeflow, AWS Batch, AWS SageMaker, Databricks, Dask, etc.
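
Of these, the DAG visualization is provided by the kedro-viz plugin. Assuming you are inside a Kedro project directory, it can be installed and launched as follows.

$ pip install kedro-viz
$ kedro viz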

Kedro components

Kedro has the following four components.

  • Node
  • Pipeline
  • DataCatalog
  • Runner

Node

A node is the unit of processing in a pipeline. You use node() to wrap a Python function and declare its inputs and outputs.

In the following code, the return_greeting function is wrapped in a node called return_greeting_node, which has no input and names its output my_salutation.

from kedro.pipeline import node

# Prepare first node
def return_greeting():
    return "Hello"

return_greeting_node = node(func=return_greeting,
                            inputs=None,
                            outputs="my_salutation")

In the following code, the join_statements function is wrapped in a node called join_statements_node with my_salutation as input and the output named my_message.

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"

join_statements_node = node(
    join_statements,
    inputs="my_salutation",
    outputs="my_message"
)

This means that my_salutation is an output of return_greeting_node and an input of join_statements_node. In other words, the inputs and outputs declared in node() define the dependencies of the pipeline.
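
Incidentally, node() also accepts an optional name argument, which labels the node in logs and visualizations (the name below is illustrative).

# A node can be given an explicit name, shown in logs and in kedro viz
named_node = node(
    join_statements,
    inputs="my_salutation",
    outputs="my_message",
    name="join_statements",
)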

Pipeline

A pipeline connects nodes into an executable workflow. It is constructed by passing a list of nodes to Pipeline(). The order of the list is arbitrary, because Kedro derives the execution order from each node's declared inputs and outputs.

In the following pipeline, return_greeting_node is executed first, followed by join_statements_node.

from kedro.pipeline import Pipeline

# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])
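
Because the execution order is derived from the declared inputs and outputs, passing the nodes in the opposite order builds an equivalent pipeline.

# Node order in the list does not matter; Kedro resolves the execution
# order from the inputs and outputs declared on each node.
same_pipeline = Pipeline([join_statements_node, return_greeting_node])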

DataCatalog

DataCatalog defines the data to be used in the pipeline. It takes node inputs and outputs as keys and reads and writes them through Datasets, which support a variety of file formats and file systems. Refer to the official documentation below for the available Datasets.

https://kedro.readthedocs.io/en/0.17.6/kedro.extras.datasets.html

The following code uses MemoryDataSet to store my_salutation in memory.

from kedro.io import DataCatalog, MemoryDataSet

# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})
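
File-backed datasets can be registered in the same way; here is a minimal sketch using a pandas CSVDataSet (the dataset name and file path are illustrative).

from kedro.extras.datasets.pandas import CSVDataSet
from kedro.io import DataCatalog, MemoryDataSet

# Mix an in-memory dataset with a CSV file on disk
data_catalog = DataCatalog({
    "my_salutation": MemoryDataSet(),
    "iris_data": CSVDataSet(filepath="data/01_raw/iris.csv"),
})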

Runner

A runner executes the pipeline. There are three types: SequentialRunner, which executes nodes one after another; ParallelRunner, which executes independent nodes in separate processes; and ThreadRunner, which executes them in separate threads.

from kedro.runner import SequentialRunner

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(pipeline, data_catalog))
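
Switching the execution strategy only requires swapping the runner. For example, ThreadRunner runs independent nodes in separate threads, which can help with I/O-bound pipelines; a sketch:

from kedro.runner import ThreadRunner

# Same pipeline and catalog, different execution strategy
print(ThreadRunner().run(pipeline, data_catalog))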

Hello Kedro

The full code for the above components is shown below.

hello_kedro.py
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, Pipeline
from kedro.runner import SequentialRunner

# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})

# Prepare first node
def return_greeting():
    return "Hello"


return_greeting_node = node(return_greeting, inputs=None, outputs="my_salutation")

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"


join_statements_node = node(
    join_statements, inputs="my_salutation", outputs="my_message"
)

# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(pipeline, data_catalog))

When the code is executed, the following steps are performed.

  1. Execute return_greeting_node (which calls return_greeting and outputs the string "Hello")
  2. Store the output string in the MemoryDataSet named my_salutation
  3. Run join_statements_node (the my_salutation dataset is loaded and injected into the join_statements function)
  4. Concatenate with "Kedro!" to produce the output string "Hello Kedro!"
  5. Return the output of the pipeline in a dictionary with my_message as its key (runner.run returns any outputs that are not registered in the DataCatalog)
Execute the script:

$ python hello_kedro.py

{'my_message': 'Hello Kedro!'}

How to use Kedro

Here is how to use Kedro.

Installation

Install Kedro with the following command.

$ pip install kedro

Verify that kedro has been installed.

$ kedro info

 _            _
| | _____  __| |_ __ ___
| |/ / _ \/ _` | '__/ _ \
|   <  __/ (_| | | | (_) |
|_|\_\___|\__,_|_|  \___/
v0.18.4

Kedro is a Python framework for
creating reproducible, maintainable
and modular data science code.

No plugins installed

Project creation

Kedro can create a project from a template, and in most cases this is the recommended way to start a new project.

kedro new

Create a project with the kedro new command. You will be asked a series of interactive questions about the project.

$ kedro new

Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
 [New Kedro Project]: get_started

The project name 'get_started' has been applied to:
- The project title in /Users/ryu/get_started/README.md
- The folder created for your project in /Users/ryu/get_started
- The project's python package in /Users/ryu/get_started/src/get_started

A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/

The following directories are created.

get-started
├── README.md
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml
│   │   ├── logging.yml
│   │   └── parameters.yml
│   └── local
│       └── credentials.yml
├── data
│   ├── 01_raw
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── docs
│   └── source
│       ├── conf.py
│       └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
    ├── get_started
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── pipeline_registry.py
    │   ├── pipelines
    │   │   └── __init__.py
    │   └── settings.py
    ├── requirements.txt
    ├── setup.py
    └── tests
        ├── __init__.py
        ├── pipelines
        │   └── __init__.py
        └── test_run.py

Each directory and file is described below.

File / folder    Description
conf             Project configuration files
data             Local project data (not committed to version control)
docs             Project documentation
logs             Project output logs (not committed to version control)
notebooks        Project-related Jupyter notebooks (useful for experimental code before moving it to src)
pyproject.toml   Identifies the project root and contains configuration information
setup.cfg        Configuration options for pytest (used by kedro test) and for the isort utility (used by kedro lint)
src              Project source code
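
In a full project, datasets are usually declared in conf/base/catalog.yml rather than constructed in code; a minimal sketch (the dataset name and file path are illustrative):

# conf/base/catalog.yml
example_iris_data:
  type: pandas.CSVDataSet
  filepath: data/01_raw/iris.csv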

Install the project dependencies with the following commands.

# Move to project directory
$ cd get_started

# Install dependencies
$ pip install -r src/requirements.txt

You can run the pipeline with the kedro run command.

$ kedro run

However, at this point the above command will fail, because no pipeline logic has been written yet.
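
To make kedro run work, a pipeline must be registered in src/get_started/pipeline_registry.py. The following is a minimal sketch, assuming a trivial node (the function and dataset names are illustrative).

# src/get_started/pipeline_registry.py
from typing import Dict

from kedro.pipeline import Pipeline, node


def say_hello():
    return "Hello Kedro!"


def register_pipelines() -> Dict[str, Pipeline]:
    # "__default__" is the pipeline that `kedro run` executes
    return {
        "__default__": Pipeline(
            [node(say_hello, inputs=None, outputs="greeting")]
        ),
    }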

kedro new --config

You can create a project from a YAML file prepared in advance by adding the --config flag when running kedro new.

code/config.yaml
output_dir: ~/code
project_name: My First Kedro Project
repo_name: testing-kedro
python_package: test_kedro
$ kedro new --config <path>/config.yaml

kedro new --starter=pandas-iris

You can create a template project for the iris dataset by specifying pandas-iris as the starter.

$ kedro new --starter=pandas-iris

Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
 [Iris]:

The project name 'Iris' has been applied to:
- The project title in /Users/ryu/iris/README.md
- The folder created for your project in /Users/ryu/iris
- The project's python package in /Users/ryu/iris/src/iris

A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/

iris
├── README.md
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml
│   │   ├── logging.yml
│   │   └── parameters.yml
│   └── local
│       └── credentials.yml
├── data
│   ├── 01_raw
│   │   └── iris.csv
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── docs
│   └── source
│       ├── conf.py
│       └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
    ├── iris
    │   ├── README.md
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── nodes.py
    │   ├── pipeline.py
    │   ├── pipeline_registry.py
    │   └── settings.py
    ├── requirements.txt
    ├── setup.py
    └── tests
        ├── __init__.py
        ├── test_pipeline.py
        └── test_run.py

Install the libraries.

$ cd iris
$ pip install -r src/requirements.txt

Execute the pipeline.

$ kedro run

You have opted out of product usage analytics, so none will be collected.
[01/14/23 11:53:19] INFO     Kedro project iris                                                                           session.py:340
[01/14/23 11:53:20] INFO     Loading data from 'example_iris_data' (CSVDataSet)...                                   data_catalog.py:343
                    INFO     Loading data from 'parameters' (MemoryDataSet)...                                       data_catalog.py:343
                    INFO     Running node: split: split_data([example_iris_data,parameters]) ->                              node.py:327
                             [X_train,X_test,y_train,y_test]
                    INFO     Saving data to 'X_train' (MemoryDataSet)...                                             data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataSet)...                                              data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataSet)...                                             data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataSet)...                                              data_catalog.py:382
                    INFO     Completed 1 out of 3 tasks                                                          sequential_runner.py:85
                    INFO     Loading data from 'X_train' (MemoryDataSet)...                                          data_catalog.py:343
                    INFO     Loading data from 'X_test' (MemoryDataSet)...                                           data_catalog.py:343
                    INFO     Loading data from 'y_train' (MemoryDataSet)...                                          data_catalog.py:343
                    INFO     Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred]          node.py:327
                    INFO     Saving data to 'y_pred' (MemoryDataSet)...                                              data_catalog.py:382
                    INFO     Completed 2 out of 3 tasks                                                          sequential_runner.py:85
                    INFO     Loading data from 'y_pred' (MemoryDataSet)...                                           data_catalog.py:343
                    INFO     Loading data from 'y_test' (MemoryDataSet)...                                           data_catalog.py:343
                    INFO     Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None                         node.py:327
                    INFO     Model has accuracy of 0.933 on test data.                                                       nodes.py:74
                    INFO     Completed 3 out of 3 tasks                                                          sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.

A log of the processing performed is output to logs/info.log.

logs/info.log
2023-01-14 11:53:19,849 - kedro.framework.session.session - INFO - Kedro project iris
2023-01-14 11:53:20,796 - kedro.io.data_catalog - INFO - Loading data from 'example_iris_data' (CSVDataSet)...
2023-01-14 11:53:20,805 - kedro.io.data_catalog - INFO - Loading data from 'parameters' (MemoryDataSet)...
2023-01-14 11:53:20,807 - kedro.pipeline.node - INFO - Running node: split: split_data([example_iris_data,parameters]) -> [X_train,X_test,y_train,y_test]
2023-01-14 11:53:20,811 - kedro.io.data_catalog - INFO - Saving data to 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,813 - kedro.io.data_catalog - INFO - Saving data to 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,814 - kedro.io.data_catalog - INFO - Saving data to 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,815 - kedro.io.data_catalog - INFO - Saving data to 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,817 - kedro.runner.sequential_runner - INFO - Completed 1 out of 3 tasks
2023-01-14 11:53:20,818 - kedro.io.data_catalog - INFO - Loading data from 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,820 - kedro.io.data_catalog - INFO - Loading data from 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,821 - kedro.io.data_catalog - INFO - Loading data from 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,822 - kedro.pipeline.node - INFO - Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred]
2023-01-14 11:53:20,824 - kedro.io.data_catalog - INFO - Saving data to 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,826 - kedro.runner.sequential_runner - INFO - Completed 2 out of 3 tasks
2023-01-14 11:53:20,827 - kedro.io.data_catalog - INFO - Loading data from 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,829 - kedro.io.data_catalog - INFO - Loading data from 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,830 - kedro.pipeline.node - INFO - Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None
2023-01-14 11:53:20,832 - iris.nodes - INFO - Model has accuracy of 0.933 on test data.
2023-01-14 11:53:20,833 - kedro.runner.sequential_runner - INFO - Completed 3 out of 3 tasks
2023-01-14 11:53:20,835 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

Other starters

In addition to pandas-iris, Kedro offers the following starters.

Starter                 Description
astro-airflow-iris      The Kedro Iris dataset example project with a minimal setup for deploying the pipeline on Airflow with Astronomer
standalone-datacatalog  A minimal setup for using the traditional Iris dataset with Kedro's DataCatalog, a core component of Kedro. Useful in the exploratory phase of a project; for more information, read the guide to standalone use of the DataCatalog. Formerly known as mini-kedro
pandas-iris             The Kedro Iris dataset example project
pyspark-iris            An alternative Kedro Iris dataset example, using PySpark
pyspark                 The configuration and initialisation code for a Kedro pipeline using PySpark
spaceflights            The spaceflights tutorial example code
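
Any of these can be used in the same way as pandas-iris, for example:

$ kedro new --starter=spaceflights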

References

https://kedro.org/
https://kedro.readthedocs.io/en/stable/get_started/kedro_concepts.html
https://kedro.readthedocs.io/en/0.17.6/02_get_started/03_hello_kedro.html
https://kedro.readthedocs.io/en/0.17.6/kedro.extras.datasets.html
https://kedro.readthedocs.io/en/stable/kedro_project_setup/starters.html
https://neptune.ai/blog/data-science-pipelines-with-kedro
