Traffine I/O

日本語

2023-01-14

Kedro

Kedro とは

Kedroとは、McKinsey & Companyの研究機関であるQuantum Black Labが開発するOSSのパイプライン管理ツールです。Kedroにより再現性、保守性、モジュール性に優れたデータサイエンスコードを作成することができます。

Kedro の特徴

Kedroは主に次の特徴があります。

  • パイプラインの可視化
    • パイプラインの DAG を可視化することができます。
  • データカタログ
    • 多くのファイル形式(Pandas、Spark、Dask、NetworkX、Pickle、Plotly、Matplotlibなど)やファイルシステム(S3、GCP、Azure、sFTP、DBFS、ローカル)間でデータの読み書きをするための軽量なデータコネクタが提供されています。
  • 統合
    • Apache Spark、Pandas、Dask、Matplotlib、Plotly、fsspec、Apache Airflow、Jupyter Notebook、Dockerなどとの連携が秀逸です。
  • プロジェクトテンプレート
    • 設定、ソースコード、テスト、ドキュメント、ノートブックなどの整理方法をプロジェクトテンプレートで標準化することができます。
  • パイプラインの抽象化
    • Kedroは関数間の依存関係を自動的に解決するデータセット駆動型のワークフローをサポートしており、パイプライン内のタスクの実行順序をラベル付けする必要がありません。
  • デプロイメントの柔軟性
    • Argo、Prefect、Kubeflow、AWS Batch、AWS Sagemaker、Databricks、Daskなどにパイプラインをデプロイすることができます。

Kedro の構成要素

Kedroは次の4つのコンポーネントがあります。

  • Node
  • Pipeline
  • DataCatalog
  • Runner

Node

Nodeとは、パイプラインの各処理の単位です。node()を使って処理、入力、出力を定義します。

次のコードでは、return_greeting関数がreturn_greeting_nodeというノードでラップされ、入力を持たず、出力をmy_salutationと名付けています。

from kedro.pipeline import node

# Prepare first node
def return_greeting():
    return "Hello"

return_greeting_node = node(func=return_greeting,
                            inputs=None,
                            outputs="my_salutation")

次のコードでは、join_statements関数がjoin_statements_nodeというノードでラップされ、my_salutationを入力とし、出力をmy_messageと名付けています。

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"

join_statements_node = node(
    join_statements,
    inputs="my_salutation",
    outputs="my_message"
)

my_salutationreturn_greeting_nodeの出力であり、join_statements_nodeの入力でもあるということになります。このようにnode()inputsoutputsによりパイプラインの依存関係を定義することができます。

Pipeline

Pipelineとは、処理の実行パイプラインです。Pipeline()にNodeのリストを渡すことでパイプラインが構築されます。この時、Pipelineに渡すリストの順序は任意で構いません。

次のパイプラインでは、return_greeting_nodeが先に実行され,その後にjoin_statements_nodeが実行されます。

from kedro.pipeline import Pipeline

# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])

DataCatalog

DataCatalogとは、パイプラインで使用するデータを定義するカタログです。DataCatalogはNodeのinputsoutputsをキーとして受け取り、Datasetに読み書きします。Datasetは様々なファイル形式、ファイルシステムに対応しています。利用可能なDatasetは次の公式ドキュメントをご参照ください。

https://kedro.readthedocs.io/en/0.17.6/kedro.extras.datasets.html

次のコードでは、MemoryDataSetを使ってメモリにmy_salutationを保存しています。

from kedro.io import DataCatalog, MemoryDataSet

# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})

Runner

Runnerとは、パイプラインを実行するものです。Runnerにはパイプラインを直列に実行するSequentialRunner、並列に実行するParallelRunner、スレッド実行するThreadRunnerの3種類があります。

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(pipeline, data_catalog))

Hello Kedro

上記のコンポーネントをまとめたコードは以下になります。

hello_kedro.py
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, Pipeline
from kedro.runner import SequentialRunner

# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})

# Prepare first node
def return_greeting():
    return "Hello"


return_greeting_node = node(return_greeting, inputs=None, outputs="my_salutation")

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"


join_statements_node = node(
    join_statements, inputs="my_salutation", outputs="my_message"
)

# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(pipeline, data_catalog))

コードを実行すると次の処理が実行されます。

  1. return_greeting_nodeを実行する(return_greetingを実行し、文字列 "Hello" を出力する)
  2. 出力された文字列はmy_salutationという名前のMemoryDataSetに格納される
  3. join_statements_nodeを実行する(my_salutationデータセットがロードされ、join_statements関数にインジェクトされる)
  4. "Kedro!" と結合して、出力文字列 "Hello Kedro!" を生成する
  5. パイプラインの出力をmy_messageをキーとするディクショナリで返す
$ python hello_kedro.py

{'my_message': 'Hello Kedro!'}

Kedro の使い方

Kedroの使い方を紹介します。

インストール

次のコマンドでKedroをインストールします。

$ pip install kedro

kedroがインストールされたことを確認します。

$ kedro info

| | _____  __| |_ __ ___
| |/ / _ \/ _` | '__/ _ \
|   <  __/ (_| | | | (_) |
|_|\_\___|\__,_|_|  \___/
v0.18.4

Kedro is a Python framework for
creating reproducible, maintainable
and modular data science code.

No plugins installed

プロジェクト作成

Kedroにはプロジェクトをテンプレートから作成する機能があります。基本的にはこのテンプレート機能を使ってプロジェクトを作成することになります。

kedro new

kedro newコマンドでプロジェクトを作成します。この時、インタラクティブに質問されるので答えていきます。

$ kedro new

Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
 [New Kedro Project]: get_started

The project name 'get_started' has been applied to:
- The project title in /Users/ryu/get_started/README.md
- The folder created for your project in /Users/ryu/get_started
- The project's python package in /Users/ryu/demo/src/get_started

A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/

次のようなディレクトリが作成されます。

get-started
├── README.md
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml
│   │   ├── logging.yml
│   │   └── parameters.yml
│   └── local
│       └── credentials.yml
├── data
│   ├── 01_raw
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── docs
│   └── source
│       ├── conf.py
│       └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
    ├── get_started
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── pipeline_registry.py
    │   ├── pipelines
    │   │   └── __init__.py
    │   └── settings.py
    ├── requirements.txt
    ├── setup.py
    └── tests
        ├── __init__.py
        ├── pipelines
        │   └── __init__.py
        └── test_run.py

ディレクトリについての説明は次のとおりです。

ファイル名(フォルダ名) 説明
conf Project configuration files
data Local project data (not committed to version control)
docs Project documentation
logs Project output logs (not committed to version control)
notebooks Project-related Jupyter notebooks (can be used for experimental code before moving the code to src)
pyproject.toml Identifies the project root and contains configuration information
setup.cfg Configuration options for pytest when doing kedro test and for the isort utility when doing kedro lint
src Project source code

次のコマンドでパッケージのインストールを行います。

# Move to project directory
$ cd get_started

# Install dependencies
$ pip install -r src/requirements.txt

kedro runコマンドでパイプラインの実行をすることができます。

$ kedro run

ただし、今の状態だとプロジェクトを作成しただけでパイプラインのロジックを記述していないので上記コマンドはエラーになります。

kedro new --config

kedro newの実行時に--configフラグをつけることで事前に用意したYamlファイルからプロジェクトを作成することができます。

code/config.yaml
output_dir: ~/code
project_name: My First Kedro Project
repo_name: testing-kedro
python_package: test_kedro
$ kedro new --config <path>/config.yml

kedro new --starter=pandas-iris

starterpandas-irisを指定することでirisデータセットのテンプレートプロジェクトを作成することができます。

$ kedro new --starter=pandas-iris

Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
 [Iris]:

The project name 'Iris' has been applied to:
- The project title in /Users/ryu/iris/README.md
- The folder created for your project in /Users/ryu/iris
- The project's python package in /Users/ryu/iris/src/iris

A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
iris
├── README.md
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml
│   │   ├── logging.yml
│   │   └── parameters.yml
│   └── local
│       └── credentials.yml
├── data
│   ├── 01_raw
│   │   └── iris.csv
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── docs
│   └── source
│       ├── conf.py
│       └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
    ├── iris
    │   ├── README.md
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── nodes.py
    │   ├── pipeline.py
    │   ├── pipeline_registry.py
    │   └── settings.py
    ├── requirements.txt
    ├── setup.py
    └── tests
        ├── __init__.py
        ├── test_pipeline.py
        └── test_run.py

ライブラリをインストールします。

$ cd iris
$ pip install -r src/requirements.txt

パイプラインを実行します。

$ kedro run

You have opted out of product usage analytics, so none will be collected.
[01/14/23 11:53:19] INFO     Kedro project iris                                                                           session.py:340
[01/14/23 11:53:20] INFO     Loading data from 'example_iris_data' (CSVDataSet)...                                   data_catalog.py:343
                    INFO     Loading data from 'parameters' (MemoryDataSet)...                                       data_catalog.py:343
                    INFO     Running node: split: split_data([example_iris_data,parameters]) ->                              node.py:327
                             [X_train,X_test,y_train,y_test]
                    INFO     Saving data to 'X_train' (MemoryDataSet)...                                             data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataSet)...                                              data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataSet)...                                             data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataSet)...                                              data_catalog.py:382
                    INFO     Completed 1 out of 3 tasks                                                          sequential_runner.py:85
                    INFO     Loading data from 'X_train' (MemoryDataSet)...                                          data_catalog.py:343
                    INFO     Loading data from 'X_test' (MemoryDataSet)...                                           data_catalog.py:343
                    INFO     Loading data from 'y_train' (MemoryDataSet)...                                          data_catalog.py:343
                    INFO     Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred]          node.py:327
                    INFO     Saving data to 'y_pred' (MemoryDataSet)...                                              data_catalog.py:382
                    INFO     Completed 2 out of 3 tasks                                                          sequential_runner.py:85
                    INFO     Loading data from 'y_pred' (MemoryDataSet)...                                           data_catalog.py:343
                    INFO     Loading data from 'y_test' (MemoryDataSet)...                                           data_catalog.py:343
                    INFO     Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None                         node.py:327
                    INFO     Model has accuracy of 0.933 on test data.                                                       nodes.py:74
                    INFO     Completed 3 out of 3 tasks                                                          sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.

行われた処理のログはlogs/info.logに出力されます。

logs/info.log
2023-01-14 11:53:19,849 - kedro.framework.session.session - INFO - Kedro project iris
2023-01-14 11:53:20,796 - kedro.io.data_catalog - INFO - Loading data from 'example_iris_data' (CSVDataSet)...
2023-01-14 11:53:20,805 - kedro.io.data_catalog - INFO - Loading data from 'parameters' (MemoryDataSet)...
2023-01-14 11:53:20,807 - kedro.pipeline.node - INFO - Running node: split: split_data([example_iris_data,parameters]) -> [X_train,X_test,y_train,y_test]
2023-01-14 11:53:20,811 - kedro.io.data_catalog - INFO - Saving data to 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,813 - kedro.io.data_catalog - INFO - Saving data to 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,814 - kedro.io.data_catalog - INFO - Saving data to 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,815 - kedro.io.data_catalog - INFO - Saving data to 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,817 - kedro.runner.sequential_runner - INFO - Completed 1 out of 3 tasks
2023-01-14 11:53:20,818 - kedro.io.data_catalog - INFO - Loading data from 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,820 - kedro.io.data_catalog - INFO - Loading data from 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,821 - kedro.io.data_catalog - INFO - Loading data from 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,822 - kedro.pipeline.node - INFO - Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred]
2023-01-14 11:53:20,824 - kedro.io.data_catalog - INFO - Saving data to 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,826 - kedro.runner.sequential_runner - INFO - Completed 2 out of 3 tasks
2023-01-14 11:53:20,827 - kedro.io.data_catalog - INFO - Loading data from 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,829 - kedro.io.data_catalog - INFO - Loading data from 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,830 - kedro.pipeline.node - INFO - Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None
2023-01-14 11:53:20,832 - iris.nodes - INFO - Model has accuracy of 0.933 on test data.
2023-01-14 11:53:20,833 - kedro.runner.sequential_runner - INFO - Completed 3 out of 3 tasks
2023-01-14 11:53:20,835 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

その他の Starter

pandas-irisの他にKedroでは次の Starter を提供しています。

Starter 説明
astro-airflow-iris The Kedro Iris dataset example project with a minimal setup for deploying the pipeline on Airflow with Astronomer.
standalone-datacatalog A minimum setup to use the traditional Iris dataset with Kedro’s DataCatalog, which is a core component of Kedro. This starter is of use in the exploratory phase of a project. For more information, read the guide to standalone use of the DataCatalog. This starter was formerly known as mini-kedro.
pandas-iris The Kedro Iris dataset example project
pyspark-iris An alternative Kedro Iris dataset example, using PySpark
pyspark The configuration and initialisation code for a Kedro pipeline using PySpark
spaceflights The spaceflights tutorial example code

参考

https://kedro.org/
https://kedro.readthedocs.io/en/stable/get_started/kedro_concepts.html
https://kedro.readthedocs.io/en/0.17.6/02_get_started/03_hello_kedro.html
https://kedro.readthedocs.io/en/0.17.6/kedro.extras.datasets.html
https://kedro.readthedocs.io/en/stable/kedro_project_setup/starters.html
https://neptune.ai/blog/data-science-pipelines-with-kedro

Ryusei Kakujo

researchgatelinkedingithub

Focusing on data science for mobility

Bench Press 100kg!