Traffine I/O

日本語

2023-01-14

Kedroのチュートリアル

はじめに

この記事では、Kedro spaceflights tutorial(月への旅行とその帰りの便の料金を予測するモデルを構築するプロジェクト)に沿って Kedro のチュートリアルを行います。

Kedroを用いた開発フローは以下のようになります。

  1. プロジェクトテンプレートの作成
    • 新規プロジェクトを作成し、ライブラリをインストールする
    • クレデンシャルとログを設定する
  2. データフォルダの設定
    • dataフォルダにデータを格納する
    • プロジェクトの全てのデータセットを参照する
  3. パイプラインの作成
    • Nodeを構築する
    • Pipelineの実行方法を選択する
  4. プロジェクトのパッケージ化
    • プロジェクトのドキュメントを構築する
    • プロジェクトを配布するためにパッケージ化する

プロジェクトテンプレートの作成

まずは仮想環境を構築します。

$ python -m venv venv
$ . venv/bin/activate
$ pip install --upgrade pip

Kedroをインストールします。

$ pip install kedro

新規プロジェクトを作成します。

$ kedro new

Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
 [New Kedro Project]: spaceflights

The project name 'spaceflights' has been applied to:
- The project title in /Users/ryu/spaceflights/README.md
- The folder created for your project in /Users/ryu/spaceflights
- The project's python package in /Users/ryu/demo/src/spaceflights

A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/

作成されたプロジェクトのディレクトリに移動します。

$ cd spaceflights

src/requirements.txtに必要なライブラリを記述します。

src/requirements.txt
black~=22.0
flake8>=3.7.9, <5.0
ipython>=7.31.1, <8.0
isort~=5.0
jupyter~=1.0
jupyterlab_server>=2.11.1, <2.16.0
jupyterlab~=3.0
kedro~=0.18.4
kedro-datasets[pandas.CSVDataSet, pandas.ExcelDataSet, pandas.ParquetDataSet]~=1.0.0
kedro-telemetry~=0.2.0
kedro-viz~=5.0
nbstripout~=0.4
pytest-cov~=3.0
pytest-mock>=1.7.1, <2.0
pytest~=6.2
scikit-learn~=1.0

ライブラリをインストールします。

$ pip install -r src/requirements.txt

ユーザー名やパスワードなどの認証情報を保存する場合はconf/local/credentials.ymlに追加します。

conf/local/credentials.yml
dev_sql:
    username: admin
    password: admin

また、conf/base/logging.ymlを編集することでロギングの設定を変更することができます。デフォルトでは以下のようになっています。

conf/base/logging.yml
version: 1

disable_existing_loggers: False

formatters:
  simple:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: simple
    stream: ext://sys.stdout

  info_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: simple
    filename: logs/info.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

  error_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: ERROR
    formatter: simple
    filename: logs/errors.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

  rich:
    class: rich.logging.RichHandler

loggers:
  kedro:
    level: INFO

  spaceflights:
    level: INFO

root:
  handlers: [rich, info_file_handler, error_file_handler]

データフォルダの設定

以下の3つのファイルをダウンロードし、data/01_rawに格納します。

https://kedro-org.github.io/kedro/companies.csv
https://kedro-org.github.io/kedro/reviews.csv
https://kedro-org.github.io/kedro/shuttles.xlsx

ダウンロードしたデータをconf/base/catalog.ymlに登録します。

conf/base/catalog.yml
companies:
  type: pandas.CSVDataSet
  filepath: data/01_raw/companies.csv

reviews:
  type: pandas.CSVDataSet
  filepath: data/01_raw/reviews.csv

shuttles:
  type: pandas.ExcelDataSet
  filepath: data/01_raw/shuttles.xlsx
  load_args:
    engine: openpyxl # Use modern Excel engine (the default since Kedro 0.18.0)

csvデータをロードすることができるかテストします。

$ kedro ipython

In [1]: companies = catalog.load("Companies")
[01/14/23 15:15:57] INFO     Loading data from 'companies' (CSVDataSet)...

In [2]: companies.head()
Out[2]:
      id company_rating       company_location  total_fleet_count iata_approved
0  35029           100%                   Niue                4.0             f
1  30292            67%               Anguilla                6.0             f
2  19032            67%     Russian Federation                4.0             f
3   8238            91%               Barbados               15.0             t
4  30342            NaN  Sao Tome and Principe                2.0             t

xlsxデータをロードすることができるかテストします。

In [3]: shuttles = catalog.load("shuttles")
[01/14/23 15:20:09] INFO     Loading data from 'shuttles' (ExcelDataSet)...                                          data_catalog.py:343

In [4]: shuttles.head()
Out[4]:
      id       shuttle_location shuttle_type engine_type  ... d_check_complete  moon_clearance_complete     price company_id
0  63561                   Niue      Type V5     Quantum  ...                f                        f  $1,325.0      35029
1  36260               Anguilla      Type V5     Quantum  ...                t                        f  $1,780.0      30292
2  57015     Russian Federation      Type V5     Quantum  ...                f                        f  $1,715.0      19032
3  14035               Barbados      Type V5      Plasma  ...                f                        f  $4,770.0       8238
4  10036  Sao Tome and Principe      Type V2      Plasma  ...                f                        f  $2,820.0      30342

ipythonのセッションを終了するにはexit()を実行します。

In [5]: exit()

パイプラインの作成

データ処理のパイプライン

データ処理のパイプラインを作成します。

$ kedro pipeline create data_processing

上記のコマンドを実行すると以下のファイルが生成されます。

  • src/spaceflights/pipelines/data_processing/nodes.py
  • src/spaceflights/pipelines/data_processing/pipelines.py
  • conf/base/parameters/data_processing.yml
  • src/tests/pipelines/data_processing
  • __init__.py

src/spaceflights/pipelines/data_processing/nodes.pyに以下のコードを記述します。

src/spaceflights/pipelines/data_processing/nodes.py
import pandas as pd


def _is_true(x: pd.Series) -> pd.Series:
    return x == "t"


def _parse_percentage(x: pd.Series) -> pd.Series:
    x = x.str.replace("%", "")
    x = x.astype(float) / 100
    return x


def _parse_money(x: pd.Series) -> pd.Series:
    x = x.str.replace("$", "").str.replace(",", "")
    x = x.astype(float)
    return x


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles


def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a model input table.

    Args:
        shuttles: Preprocessed data for shuttles.
        companies: Preprocessed data for companies.
        reviews: Raw data for reviews.
    Returns:
        Model input table.

    """
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
    model_input_table = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )
    model_input_table = model_input_table.dropna()
    return model_input_table

src/spaceflights/pipelines/data_processing/pipelines.pyに以下のコードを記述します。

src/spaceflights/pipelines/data_processing/pipelines.py
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )

上記コードは、各関数のノードを生成し、データ処理のためのパイプラインを作成しています。

以下のコードでpreprocess_companies_nodeノードのテストをします。

$ kedro run --node=preprocess_companies_node

[08/09/22 16:43:10] INFO     Kedro project kedro-tutorial                                         session.py:346
[08/09/22 16:43:11] INFO     Loading data from 'companies' (CSVDataSet)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataSet)...      data_catalog.py:382
                    INFO     Completed 1 out of 1 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataSet)...   data_catalog.py:343

完全なデータ処理のパイプラインとして全てのノードをテストするにはkedro runを実行します。

$ kedro run

[08/09/22 17:00:54] INFO     Kedro project kedro-tutorial                                         session.py:346
[08/09/22 17:01:10] INFO     Reached after_catalog_created hook                                     plugin.py:17
                    INFO     Loading data from 'companies' (CSVDataSet)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataSet)...      data_catalog.py:382
                    INFO     Completed 1 out of 3 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataSet)...                  data_catalog.py:343
[08/09/22 17:01:25] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
                             -> [preprocessed_shuttles]

                    INFO     Saving data to 'preprocessed_shuttles' (MemoryDataSet)...       data_catalog.py:382
                    INFO     Completed 2 out of 3 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'preprocessed_shuttles' (MemoryDataSet)...    data_catalog.py:343
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataSet)...   data_catalog.py:343
                    INFO     Loading data from 'reviews' (CSVDataSet)...                     data_catalog.py:343
                    INFO     Running node: create_model_input_table_node:                            node.py:327
                             create_model_input_table([preprocessed_shuttles,preprocessed_companies,
                             reviews]) -> [model_input_table]
[08/09/22 17:01:28] INFO     Saving data to 'model_input_table' (MemoryDataSet)...           data_catalog.py:382
[08/09/22 17:01:29] INFO     Completed 3 out of 3 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89
                    INFO     Loading data from 'model_input_table' (MemoryDataSet)...        data_catalog.py:343

前処理データにより生成されるデータをファイルに保存したい場合はconf/base/catalog.ymlに以下のように追記します。

conf/base/catalog.yml
preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq

preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq

model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq

データサイエンスのパイプライン

データサイエンスのパイプラインを作成します。

$ kedro pipeline create data_science

src/spaceflights/pipelines/data_science/nodes.pyに以下のコードを記述します。データサイエンスコードにはscikit-learnのLinearRegressionが使われます。

src/spaceflights/pipelines/data_science/nodes.py
import logging
from typing import Dict, Tuple

import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split


def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
    """Splits data into features and targets training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters/data_science.yml.
    Returns:
        Split data.
    """
    X = data[parameters["features"]]
    y = data["price"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return X_train, X_test, y_train, y_test


def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
    """Trains the linear regression model.

    Args:
        X_train: Training data of independent features.
        y_train: Training data for price.

    Returns:
        Trained model.
    """
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)
    return regressor


def evaluate_model(
    regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
    """Calculates and logs the coefficient of determination.

    Args:
        regressor: Trained model.
        X_test: Testing data of independent features.
        y_test: Testing data for price.
    """
    y_pred = regressor.predict(X_test)
    score = r2_score(y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info("Model has a coefficient R^2 of %.3f on test data.", score)

conf/base/parameters/data_science.ymlにパイプライン実行時に参照されるパラメータを設定します。

conf/base/parameters/data_science.yml
model_options:
  test_size: 0.2
  random_state: 3
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating

src/spaceflights/pipelines/data_science/pipeline.pyに以下のコードを記述します。

src/spaceflights/pipelines/data_science/pipeline.py
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )

conf/base/catalog.ymlにデータセットを追加します。

conf/base/catalog.yml
regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor.pickle
  versioned: true

kedro runを実行してパイプラインをテストします。

$ kedro run

[08/09/22 16:56:00] INFO     Kedro project kedro-tutorial                                         session.py:346
                    INFO     Loading data from 'companies' (CSVDataSet)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataSet)...      data_catalog.py:382
                    INFO     Completed 1 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataSet)...                  data_catalog.py:343
[08/09/22 16:56:15] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
                             -> [preprocessed_shuttles]
                    INFO     Saving data to 'preprocessed_shuttles' (MemoryDataSet)...       data_catalog.py:382
                    INFO     Completed 2 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'preprocessed_shuttles' (MemoryDataSet)...    data_catalog.py:343
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataSet)...   data_catalog.py:343
                    INFO     Loading data from 'reviews' (CSVDataSet)...                     data_catalog.py:343
                    INFO     Running node: create_model_input_table_node:                            node.py:327
                             create_model_input_table([preprocessed_shuttles,preprocessed_companies,
                             reviews]) -> [model_input_table]
[08/09/22 16:56:18] INFO     Saving data to 'model_input_table' (MemoryDataSet)...           data_catalog.py:382
[08/09/22 16:56:19] INFO     Completed 3 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'model_input_table' (MemoryDataSet)...        data_catalog.py:343
                    INFO     Loading data from 'params:model_options' (MemoryDataSet)...     data_catalog.py:343
                    INFO     Running node: split_data_node:                                          node.py:327
                             split_data([model_input_table,params:model_options]) ->
                             [X_train,X_test,y_train,y_test]
                    INFO     Saving data to 'X_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Completed 4 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'X_train' (MemoryDataSet)...                  data_catalog.py:343
                    INFO     Loading data from 'y_train' (MemoryDataSet)...                  data_catalog.py:343
                    INFO     Running node: train_model_node: train_model([X_train,y_train]) ->       node.py:327
                             [regressor]
[08/09/22 16:56:20] INFO     Saving data to 'regressor' (PickleDataSet)...                   data_catalog.py:382
                    INFO     Completed 5 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'regressor' (PickleDataSet)...                data_catalog.py:343
                    INFO     Loading data from 'X_test' (MemoryDataSet)...                   data_catalog.py:343
                    INFO     Loading data from 'y_test' (MemoryDataSet)...                   data_catalog.py:343
                    INFO     Running node: evaluate_model_node:                                      node.py:327
                             evaluate_model([regressor,X_test,y_test]) -> None
                    INFO     Model has a coefficient R^2 of 0.462 on test data.                      nodes.py:55
                    INFO     Completed 6 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89

プロジェクトのパッケージ化

ドキュメント作成

以下のコマンドをルートディレクトリで実行するとドキュメントを生成することができます。

$ kedro build-docs --open

docs/build/htmlにHTMLのドキュメントが生成され、自動的にブラウザが開きます。

パッケージング

以下のコマンドをルートディレクトリで実行するとプロジェクトをパッケージ化することができます。

$ kedro package

/distフォルダにパッケージがビルドされ、.eggファイルと.whlファイルが生成されます。なお、パッケージはPythonコードのみが含まれており、confdatalogsなどは含んでいません。

以下のコマンドを実行すると、Pythonがインストールされているフォルダのbinフォルダにプロジェクトが生成されます。

$ pip install <path-to-wheel-file>

参考

https://kedro.readthedocs.io/en/stable/tutorial/spaceflights_tutorial.html

Ryusei Kakujo

researchgatelinkedingithub

Focusing on data science for mobility

Bench Press 100kg!