Kedro とは
Kedroとは、McKinsey & Companyの研究機関であるQuantum Black Labが開発するOSSのパイプライン管理ツールです。Kedroにより再現性、保守性、モジュール性に優れたデータサイエンスコードを作成することができます。
Kedro の特徴
Kedroは主に次の特徴があります。
- パイプラインの可視化
- パイプラインの DAG を可視化することができます。
- データカタログ
- 多くのファイル形式(Pandas、Spark、Dask、NetworkX、Pickle、Plotly、Matplotlibなど)やファイルシステム(S3、GCP、Azure、sFTP、DBFS、ローカル)間でデータの読み書きをするための軽量なデータコネクタが提供されています。
- 統合
- Apache Spark、Pandas、Dask、Matplotlib、Plotly、fsspec、Apache Airflow、Jupyter Notebook、Dockerなどとの連携が秀逸です。
- プロジェクトテンプレート
- 設定、ソースコード、テスト、ドキュメント、ノートブックなどの整理方法をプロジェクトテンプレートで標準化することができます。
- パイプラインの抽象化
- Kedroは関数間の依存関係を自動的に解決するデータセット駆動型のワークフローをサポートしており、パイプライン内のタスクの実行順序をラベル付けする必要がありません。
- デプロイメントの柔軟性
- Argo、Prefect、Kubeflow、AWS Batch、AWS Sagemaker、Databricks、Daskなどにパイプラインをデプロイすることができます。
Kedro の構成要素
Kedroは次の4つのコンポーネントがあります。
- Node
- Pipeline
- DataCatalog
- Runner
Node
Nodeとは、パイプラインの各処理の単位です。node()
を使って処理、入力、出力を定義します。
次のコードでは、return_greeting
関数がreturn_greeting_node
というノードでラップされ、入力を持たず、出力をmy_salutation
と名付けています。
from kedro.pipeline import node
# Prepare first node
def return_greeting():
return "Hello"
return_greeting_node = node(func=return_greeting,
inputs=None,
outputs="my_salutation")
次のコードでは、join_statements
関数がjoin_statements_node
というノードでラップされ、my_salutation
を入力とし、出力をmy_message
と名付けています。
# Prepare second node
def join_statements(greeting):
return f"{greeting} Kedro!"
join_statements_node = node(
join_statements,
inputs="my_salutation",
outputs="my_message"
)
my_salutation
はreturn_greeting_node
の出力であり、join_statements_node
の入力でもあるということになります。このようにnode()
のinputs
とoutputs
によりパイプラインの依存関係を定義することができます。
Pipeline
Pipelineとは、処理の実行パイプラインです。Pipeline()
にNodeのリストを渡すことでパイプラインが構築されます。この時、Pipelineに渡すリストの順序は任意で構いません。
次のパイプラインでは、return_greeting_node
が先に実行され,その後にjoin_statements_node
が実行されます。
from kedro.pipeline import Pipeline
# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])
DataCatalog
DataCatalogとは、パイプラインで使用するデータを定義するカタログです。DataCatalogはNodeのinputs
やoutputs
をキーとして受け取り、Dataset
に読み書きします。Dataset
は様々なファイル形式、ファイルシステムに対応しています。利用可能なDataset
は次の公式ドキュメントをご参照ください。
次のコードでは、MemoryDataSet
を使ってメモリにmy_salutation
を保存しています。
from kedro.io import DataCatalog, MemoryDataSet
# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})
Runner
Runnerとは、パイプラインを実行するものです。Runnerにはパイプラインを直列に実行するSequentialRunner
、並列に実行するParallelRunner
、スレッド実行するThreadRunner
の3種類があります。
# Create a runner to run the pipeline
runner = SequentialRunner()
# Run the pipeline
print(runner.run(pipeline, data_catalog))
Hello Kedro
上記のコンポーネントをまとめたコードは以下になります。
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, Pipeline
from kedro.runner import SequentialRunner
# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})
# Prepare first node
def return_greeting():
return "Hello"
return_greeting_node = node(return_greeting, inputs=None, outputs="my_salutation")
# Prepare second node
def join_statements(greeting):
return f"{greeting} Kedro!"
join_statements_node = node(
join_statements, inputs="my_salutation", outputs="my_message"
)
# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])
# Create a runner to run the pipeline
runner = SequentialRunner()
# Run the pipeline
print(runner.run(pipeline, data_catalog))
コードを実行すると次の処理が実行されます。
return_greeting_node
を実行する(return_greeting
を実行し、文字列 "Hello" を出力する)- 出力された文字列は
my_salutation
という名前のMemoryDataSet
に格納される join_statements_node
を実行する(my_salutation
データセットがロードされ、join_statements
関数にインジェクトされる)- "Kedro!" と結合して、出力文字列 "Hello Kedro!" を生成する
- パイプラインの出力を
my_message
をキーとするディクショナリで返す
$ python hello_kedro.py
{'my_message': 'Hello Kedro!'}
Kedro の使い方
Kedroの使い方を紹介します。
インストール
次のコマンドでKedroをインストールします。
$ pip install kedro
kedroがインストールされたことを確認します。
$ kedro info
| | _____ __| |_ __ ___
| |/ / _ \/ _` | '__/ _ \
| < __/ (_| | | | (_) |
|_|\_\___|\__,_|_| \___/
v0.18.4
Kedro is a Python framework for
creating reproducible, maintainable
and modular data science code.
No plugins installed
プロジェクト作成
Kedroにはプロジェクトをテンプレートから作成する機能があります。基本的にはこのテンプレート機能を使ってプロジェクトを作成することになります。
kedro new
kedro new
コマンドでプロジェクトを作成します。この時、インタラクティブに質問されるので答えていきます。
$ kedro new
Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
[New Kedro Project]: get_started
The project name 'get_started' has been applied to:
- The project title in /Users/ryu/get_started/README.md
- The folder created for your project in /Users/ryu/get_started
- The project's python package in /Users/ryu/demo/src/get_started
A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
次のようなディレクトリが作成されます。
get-started
├── README.md
├── conf
│ ├── README.md
│ ├── base
│ │ ├── catalog.yml
│ │ ├── logging.yml
│ │ └── parameters.yml
│ └── local
│ └── credentials.yml
├── data
│ ├── 01_raw
│ ├── 02_intermediate
│ ├── 03_primary
│ ├── 04_feature
│ ├── 05_model_input
│ ├── 06_models
│ ├── 07_model_output
│ └── 08_reporting
├── docs
│ └── source
│ ├── conf.py
│ └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
├── get_started
│ ├── __init__.py
│ ├── __main__.py
│ ├── pipeline_registry.py
│ ├── pipelines
│ │ └── __init__.py
│ └── settings.py
├── requirements.txt
├── setup.py
└── tests
├── __init__.py
├── pipelines
│ └── __init__.py
└── test_run.py
ディレクトリについての説明は次のとおりです。
ファイル名(フォルダ名) | 説明 |
---|---|
conf | Project configuration files |
data | Local project data (not committed to version control) |
docs | Project documentation |
logs | Project output logs (not committed to version control) |
notebooks | Project-related Jupyter notebooks (can be used for experimental code before moving the code to src) |
pyproject.toml | Identifies the project root and contains configuration information |
setup.cfg | Configuration options for pytest when doing kedro test and for the isort utility when doing kedro lint |
src | Project source code |
次のコマンドでパッケージのインストールを行います。
# Move to project directory
$ cd get_started
# Install dependencies
$ pip install -r src/requirements.txt
kedro run
コマンドでパイプラインの実行をすることができます。
$ kedro run
ただし、今の状態だとプロジェクトを作成しただけでパイプラインのロジックを記述していないので上記コマンドはエラーになります。
kedro new --config
kedro new
の実行時に--config
フラグをつけることで事前に用意したYamlファイルからプロジェクトを作成することができます。
output_dir: ~/code
project_name: My First Kedro Project
repo_name: testing-kedro
python_package: test_kedro
$ kedro new --config <path>/config.yml
kedro new --starter=pandas-iris
starter
にpandas-iris
を指定することでiris
データセットのテンプレートプロジェクトを作成することができます。
$ kedro new --starter=pandas-iris
Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
[Iris]:
The project name 'Iris' has been applied to:
- The project title in /Users/ryu/iris/README.md
- The folder created for your project in /Users/ryu/iris
- The project's python package in /Users/ryu/iris/src/iris
A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
iris
├── README.md
├── conf
│ ├── README.md
│ ├── base
│ │ ├── catalog.yml
│ │ ├── logging.yml
│ │ └── parameters.yml
│ └── local
│ └── credentials.yml
├── data
│ ├── 01_raw
│ │ └── iris.csv
│ ├── 02_intermediate
│ ├── 03_primary
│ ├── 04_feature
│ ├── 05_model_input
│ ├── 06_models
│ ├── 07_model_output
│ └── 08_reporting
├── docs
│ └── source
│ ├── conf.py
│ └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
├── iris
│ ├── README.md
│ ├── __init__.py
│ ├── __main__.py
│ ├── nodes.py
│ ├── pipeline.py
│ ├── pipeline_registry.py
│ └── settings.py
├── requirements.txt
├── setup.py
└── tests
├── __init__.py
├── test_pipeline.py
└── test_run.py
ライブラリをインストールします。
$ cd iris
$ pip install -r src/requirements.txt
パイプラインを実行します。
$ kedro run
You have opted out of product usage analytics, so none will be collected.
[01/14/23 11:53:19] INFO Kedro project iris session.py:340
[01/14/23 11:53:20] INFO Loading data from 'example_iris_data' (CSVDataSet)... data_catalog.py:343
INFO Loading data from 'parameters' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split: split_data([example_iris_data,parameters]) -> node.py:327
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred] node.py:327
INFO Saving data to 'y_pred' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'y_pred' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None node.py:327
INFO Model has accuracy of 0.933 on test data. nodes.py:74
INFO Completed 3 out of 3 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully.
行われた処理のログはlogs/info.log
に出力されます。
2023-01-14 11:53:19,849 - kedro.framework.session.session - INFO - Kedro project iris
2023-01-14 11:53:20,796 - kedro.io.data_catalog - INFO - Loading data from 'example_iris_data' (CSVDataSet)...
2023-01-14 11:53:20,805 - kedro.io.data_catalog - INFO - Loading data from 'parameters' (MemoryDataSet)...
2023-01-14 11:53:20,807 - kedro.pipeline.node - INFO - Running node: split: split_data([example_iris_data,parameters]) -> [X_train,X_test,y_train,y_test]
2023-01-14 11:53:20,811 - kedro.io.data_catalog - INFO - Saving data to 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,813 - kedro.io.data_catalog - INFO - Saving data to 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,814 - kedro.io.data_catalog - INFO - Saving data to 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,815 - kedro.io.data_catalog - INFO - Saving data to 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,817 - kedro.runner.sequential_runner - INFO - Completed 1 out of 3 tasks
2023-01-14 11:53:20,818 - kedro.io.data_catalog - INFO - Loading data from 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,820 - kedro.io.data_catalog - INFO - Loading data from 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,821 - kedro.io.data_catalog - INFO - Loading data from 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,822 - kedro.pipeline.node - INFO - Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred]
2023-01-14 11:53:20,824 - kedro.io.data_catalog - INFO - Saving data to 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,826 - kedro.runner.sequential_runner - INFO - Completed 2 out of 3 tasks
2023-01-14 11:53:20,827 - kedro.io.data_catalog - INFO - Loading data from 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,829 - kedro.io.data_catalog - INFO - Loading data from 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,830 - kedro.pipeline.node - INFO - Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None
2023-01-14 11:53:20,832 - iris.nodes - INFO - Model has accuracy of 0.933 on test data.
2023-01-14 11:53:20,833 - kedro.runner.sequential_runner - INFO - Completed 3 out of 3 tasks
2023-01-14 11:53:20,835 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
その他の Starter
pandas-iris
の他にKedroでは次の Starter を提供しています。
Starter | 説明 |
---|---|
astro-airflow-iris |
The Kedro Iris dataset example project with a minimal setup for deploying the pipeline on Airflow with Astronomer. |
standalone-datacatalog |
A minimum setup to use the traditional Iris dataset with Kedro’s DataCatalog, which is a core component of Kedro. This starter is of use in the exploratory phase of a project. For more information, read the guide to standalone use of the DataCatalog. This starter was formerly known as mini-kedro. |
pandas-iris |
The Kedro Iris dataset example project |
pyspark-iris |
An alternative Kedro Iris dataset example, using PySpark |
pyspark |
The configuration and initialisation code for a Kedro pipeline using PySpark |
spaceflights |
The spaceflights tutorial example code |
参考