What is Kedro
Kedro is an OSS pipeline management tool developed by QuantumBlack Labs, a research arm of McKinsey & Company. It allows you to create highly reproducible, maintainable, and modular data science code.
Kedro features
Kedro has the following main features.
- Pipeline visualization
  - You can visualize the DAG of your pipeline.
- Data catalog
  - Lightweight data connectors are provided for reading and writing data across many file formats (Pandas, Spark, Dask, NetworkX, Pickle, Plotly, Matplotlib, etc.) and file systems (S3, GCP, Azure, SFTP, DBFS, local).
- Integration
  - Excellent integration with Apache Spark, Pandas, Dask, Matplotlib, Plotly, fsspec, Apache Airflow, Jupyter Notebook, Docker, etc.
- Project templates
  - Project templates allow you to standardize how you organize your configuration, source code, tests, documentation, notebooks, etc.
- Pipeline abstraction
  - Kedro supports a dataset-driven workflow that automatically resolves dependencies between functions, eliminating the need to specify the order in which tasks execute in the pipeline.
- Flexibility in deployment
  - Pipelines can be deployed to Argo, Prefect, Kubeflow, AWS Batch, AWS SageMaker, Databricks, Dask, etc.
Kedro components
Kedro has the following four components:
- Node
- Pipeline
- DataCatalog
- Runner
Node
A node is the unit of each process in the pipeline. You use node() to define the processing, its inputs, and its outputs.
In the following code, the return_greeting function is wrapped in a node called return_greeting_node, which has no input and names its output my_salutation.
from kedro.pipeline import node

# Prepare first node
def return_greeting():
    return "Hello"

return_greeting_node = node(func=return_greeting,
                            inputs=None,
                            outputs="my_salutation")
In the following code, the join_statements function is wrapped in a node called join_statements_node, with my_salutation as its input and its output named my_message.
# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"

join_statements_node = node(
    join_statements,
    inputs="my_salutation",
    outputs="my_message"
)
This means that my_salutation is an output of return_greeting_node and an input of join_statements_node. In this way, the inputs and outputs arguments of node() define the dependencies of the pipeline.
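For illustration, a node can also take multiple inputs; a minimal sketch, assuming hypothetical dataset names a, b, and summary:

from kedro.pipeline import node

# Hypothetical example: a node with two inputs and one named output.
def describe(x, y):
    return f"{x} and {y}"

describe_node = node(describe, inputs=["a", "b"], outputs="summary")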
Pipeline
A pipeline is constructed by passing a list of nodes to Pipeline(). The order of the nodes in the list is arbitrary; Kedro resolves the execution order from the dataset dependencies.
In the following pipeline, return_greeting_node is executed first, followed by join_statements_node.
from kedro.pipeline import Pipeline
# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])
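Because the order is resolved from dataset dependencies, passing the same nodes in reverse order yields an equivalent pipeline; a minimal sketch:

# The list order does not matter; Kedro sorts the nodes topologically
# by their dataset dependencies, so this pipeline behaves identically.
pipeline_reversed = Pipeline([join_statements_node, return_greeting_node])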
DataCatalog
DataCatalog is a catalog that defines the data used in the pipeline. It takes the inputs and outputs of nodes as keys and reads and writes them as Datasets, which support various file formats and file systems. Please refer to the official Kedro documentation for the available Datasets.
The following code uses MemoryDataSet to store my_salutation in memory.
from kedro.io import DataCatalog, MemoryDataSet
# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})
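A catalog entry can also be exercised directly, which is handy for quick checks; a minimal sketch using the catalog defined above:

# Save to and load from the "my_salutation" entry directly.
data_catalog.save("my_salutation", "Hello")
print(data_catalog.load("my_salutation"))  # -> Hello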
Runner
There are three types of runners: SequentialRunner for executing pipelines in series, ParallelRunner for executing pipelines in parallel across processes, and ThreadRunner for executing pipelines across threads.
from kedro.runner import SequentialRunner

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(pipeline, data_catalog))
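The runners are interchangeable; a minimal sketch using ThreadRunner instead, which can help when nodes are I/O-bound:

from kedro.runner import ThreadRunner

# ThreadRunner is a drop-in replacement for SequentialRunner.
print(ThreadRunner().run(pipeline, data_catalog))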
Hello Kedro
The full code for the above components is shown below.
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, Pipeline
from kedro.runner import SequentialRunner
# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})
# Prepare first node
def return_greeting():
    return "Hello"

return_greeting_node = node(return_greeting, inputs=None, outputs="my_salutation")

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"

join_statements_node = node(
    join_statements, inputs="my_salutation", outputs="my_message"
)
# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])
# Create a runner to run the pipeline
runner = SequentialRunner()
# Run the pipeline
print(runner.run(pipeline, data_catalog))
When the code is executed, the following steps are performed:
- Execute return_greeting_node (which runs return_greeting and outputs the string "Hello")
- Store the output string in the MemoryDataSet named my_salutation
- Run join_statements_node (the my_salutation dataset is loaded and injected into the join_statements function)
- Concatenate with "Kedro!" to produce the output string "Hello Kedro!"
- Return the output of the pipeline in a dictionary with my_message as the key
$ python hello_kedro.py
{'my_message': 'Hello Kedro!'}
How to use Kedro
This section describes how to install Kedro and create a project.
Installation
Install Kedro with the following command.
$ pip install kedro
Verify that kedro has been installed.
$ kedro info
 _            _
| | _____  __| |_ __ ___
| |/ / _ \/ _` | '__/ _ \
| <  __/ (_| | | | (_) |
|_|\_\___|\__,_|_|  \___/
v0.18.4
Kedro is a Python framework for
creating reproducible, maintainable
and modular data science code.
No plugins installed
Project creation
Kedro has a function to create a project from a template. Basically, you can use this template function to create a project.
kedro new
Create a project with the kedro new command. You will be asked a series of interactive questions; your answers configure the new project.
$ kedro new
Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
[New Kedro Project]: get_started
The project name 'get_started' has been applied to:
- The project title in /Users/ryu/get_started/README.md
- The folder created for your project in /Users/ryu/get_started
- The project's python package in /Users/ryu/get_started/src/get_started
A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
The following directories are created.
get-started
├── README.md
├── conf
│ ├── README.md
│ ├── base
│ │ ├── catalog.yml
│ │ ├── logging.yml
│ │ └── parameters.yml
│ └── local
│ └── credentials.yml
├── data
│ ├── 01_raw
│ ├── 02_intermediate
│ ├── 03_primary
│ ├── 04_feature
│ ├── 05_model_input
│ ├── 06_models
│ ├── 07_model_output
│ └── 08_reporting
├── docs
│ └── source
│ ├── conf.py
│ └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
├── get_started
│ ├── __init__.py
│ ├── __main__.py
│ ├── pipeline_registry.py
│ ├── pipelines
│ │ └── __init__.py
│ └── settings.py
├── requirements.txt
├── setup.py
└── tests
├── __init__.py
├── pipelines
│ └── __init__.py
└── test_run.py
The following table describes each directory.
File name (folder name) | Description |
---|---|
conf | Project configuration files |
data | Local project data (not committed to version control) |
docs | Project documentation |
logs | Project output logs (not committed to version control) |
notebooks | Project-related Jupyter notebooks (can be used for experimental code before moving the code to src) |
pyproject.toml | Identifies the project root and contains configuration information |
setup.cfg | Configuration options for pytest when doing kedro test and for the isort utility when doing kedro lint |
src | Project source code |
Run the following commands to install the project dependencies.
# Move to project directory
$ cd get_started
# Install dependencies
$ pip install -r src/requirements.txt
You can run the pipeline with the kedro run command.
$ kedro run
However, running this command now results in an error because no pipeline logic has been written yet; the sketch below shows where that logic is registered.
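To give kedro run something to execute, you register pipelines in src/get_started/pipeline_registry.py. A minimal sketch, assuming a single hypothetical node (say_hello and the my_greeting dataset name are illustrative):

# src/get_started/pipeline_registry.py
from typing import Dict

from kedro.pipeline import Pipeline, node


def say_hello():
    return "Hello Kedro!"


def register_pipelines() -> Dict[str, Pipeline]:
    # "__default__" is the pipeline that `kedro run` executes.
    greeting = Pipeline([node(say_hello, inputs=None, outputs="my_greeting")])
    return {"__default__": greeting}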
kedro new --config
You can create a project from a previously prepared YAML file by adding the --config flag when running kedro new. For example, given the following config.yml:
output_dir: ~/code
project_name: My First Kedro Project
repo_name: testing-kedro
python_package: test_kedro
$ kedro new --config <path>/config.yml
kedro new --starter=pandas-iris
You can create a template project that uses the iris dataset by specifying pandas-iris as the starter.
$ kedro new --starter=pandas-iris
Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
[Iris]:
The project name 'Iris' has been applied to:
- The project title in /Users/ryu/iris/README.md
- The folder created for your project in /Users/ryu/iris
- The project's python package in /Users/ryu/iris/src/iris
A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
iris
├── README.md
├── conf
│ ├── README.md
│ ├── base
│ │ ├── catalog.yml
│ │ ├── logging.yml
│ │ └── parameters.yml
│ └── local
│ └── credentials.yml
├── data
│ ├── 01_raw
│ │ └── iris.csv
│ ├── 02_intermediate
│ ├── 03_primary
│ ├── 04_feature
│ ├── 05_model_input
│ ├── 06_models
│ ├── 07_model_output
│ └── 08_reporting
├── docs
│ └── source
│ ├── conf.py
│ └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
├── iris
│ ├── README.md
│ ├── __init__.py
│ ├── __main__.py
│ ├── nodes.py
│ ├── pipeline.py
│ ├── pipeline_registry.py
│ └── settings.py
├── requirements.txt
├── setup.py
└── tests
├── __init__.py
├── test_pipeline.py
└── test_run.py
Install the libraries.
$ cd iris
$ pip install -r src/requirements.txt
Execute the pipeline.
$ kedro run
You have opted out of product usage analytics, so none will be collected.
[01/14/23 11:53:19] INFO Kedro project iris session.py:340
[01/14/23 11:53:20] INFO Loading data from 'example_iris_data' (CSVDataSet)... data_catalog.py:343
INFO Loading data from 'parameters' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split: split_data([example_iris_data,parameters]) -> node.py:327
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred] node.py:327
INFO Saving data to 'y_pred' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'y_pred' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None node.py:327
INFO Model has accuracy of 0.933 on test data. nodes.py:74
INFO Completed 3 out of 3 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully.
A log of the processing performed is output to logs/info.log.
2023-01-14 11:53:19,849 - kedro.framework.session.session - INFO - Kedro project iris
2023-01-14 11:53:20,796 - kedro.io.data_catalog - INFO - Loading data from 'example_iris_data' (CSVDataSet)...
2023-01-14 11:53:20,805 - kedro.io.data_catalog - INFO - Loading data from 'parameters' (MemoryDataSet)...
2023-01-14 11:53:20,807 - kedro.pipeline.node - INFO - Running node: split: split_data([example_iris_data,parameters]) -> [X_train,X_test,y_train,y_test]
2023-01-14 11:53:20,811 - kedro.io.data_catalog - INFO - Saving data to 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,813 - kedro.io.data_catalog - INFO - Saving data to 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,814 - kedro.io.data_catalog - INFO - Saving data to 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,815 - kedro.io.data_catalog - INFO - Saving data to 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,817 - kedro.runner.sequential_runner - INFO - Completed 1 out of 3 tasks
2023-01-14 11:53:20,818 - kedro.io.data_catalog - INFO - Loading data from 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,820 - kedro.io.data_catalog - INFO - Loading data from 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,821 - kedro.io.data_catalog - INFO - Loading data from 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,822 - kedro.pipeline.node - INFO - Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred]
2023-01-14 11:53:20,824 - kedro.io.data_catalog - INFO - Saving data to 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,826 - kedro.runner.sequential_runner - INFO - Completed 2 out of 3 tasks
2023-01-14 11:53:20,827 - kedro.io.data_catalog - INFO - Loading data from 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,829 - kedro.io.data_catalog - INFO - Loading data from 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,830 - kedro.pipeline.node - INFO - Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None
2023-01-14 11:53:20,832 - iris.nodes - INFO - Model has accuracy of 0.933 on test data.
2023-01-14 11:53:20,833 - kedro.runner.sequential_runner - INFO - Completed 3 out of 3 tasks
2023-01-14 11:53:20,835 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
Other starters
In addition to pandas-iris, Kedro offers the following starters.
Starter | Description |
---|---|
astro-airflow-iris | The Kedro Iris dataset example project with a minimal setup for deploying the pipeline on Airflow with Astronomer. |
standalone-datacatalog | A minimal setup to use the traditional Iris dataset with Kedro's DataCatalog, a core component of Kedro. This starter is of use in the exploratory phase of a project. For more information, see the official guide to standalone use of the DataCatalog. Formerly known as mini-kedro. |
pandas-iris | The Kedro Iris dataset example project |
pyspark-iris | An alternative Kedro Iris dataset example, using PySpark |
pyspark | The configuration and initialisation code for a Kedro pipeline using PySpark |
spaceflights | The spaceflights tutorial example code |
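Any of these starters can be used in the same way as pandas-iris, for example:
$ kedro new --starter=spaceflights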