はじめに
この記事では、Kedro spaceflights tutorial(月への旅行とその帰りの便の料金を予測するモデルを構築するプロジェクト)に沿って Kedro のチュートリアルを行います。
Kedroを用いた開発フローは次のようになります。
- プロジェクトテンプレートの作成
- 新規プロジェクトを作成し、ライブラリをインストールする
- クレデンシャルとログを設定する
- データフォルダの設定
data
フォルダにデータを格納する- プロジェクトの全てのデータセットを参照する
- パイプラインの作成
- Nodeを構築する
- Pipelineの実行方法を選択する
- プロジェクトのパッケージ化
- プロジェクトのドキュメントを構築する
- プロジェクトを配布するためにパッケージ化する
プロジェクトテンプレートの作成
まずは仮想環境を構築します。
$ python -m venv venv
$ . venv/bin/activate
$ pip install --upgrade pip
Kedroをインストールします。
$ pip install kedro
新規プロジェクトを作成します。
$ kedro new
Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
[New Kedro Project]: spaceflights
The project name 'spaceflights' has been applied to:
- The project title in /Users/ryu/spaceflights/README.md
- The folder created for your project in /Users/ryu/spaceflights
- The project's python package in /Users/ryu/demo/src/spaceflights
A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
作成されたプロジェクトのディレクトリに移動します。
$ cd spaceflights
src/requirements.txt
に必要なライブラリを記述します。
black~=22.0
flake8>=3.7.9, <5.0
ipython>=7.31.1, <8.0
isort~=5.0
jupyter~=1.0
jupyterlab_server>=2.11.1, <2.16.0
jupyterlab~=3.0
kedro~=0.18.4
kedro-datasets[pandas.CSVDataSet, pandas.ExcelDataSet, pandas.ParquetDataSet]~=1.0.0
kedro-telemetry~=0.2.0
kedro-viz~=5.0
nbstripout~=0.4
pytest-cov~=3.0
pytest-mock>=1.7.1, <2.0
pytest~=6.2
scikit-learn~=1.0
ライブラリをインストールします。
$ pip install -r src/requirements.txt
ユーザー名やパスワードなどの認証情報を保存する場合はconf/local/credentials.yml
に追加します。
dev_sql:
username: admin
password: admin
また、conf/base/logging.yml
を編集することでロギングの設定を変更することができます。デフォルトでは次のようになっています。
version: 1
disable_existing_loggers: False
formatters:
simple:
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handlers:
console:
class: logging.StreamHandler
level: INFO
formatter: simple
stream: ext://sys.stdout
info_file_handler:
class: logging.handlers.RotatingFileHandler
level: INFO
formatter: simple
filename: logs/info.log
maxBytes: 10485760 # 10MB
backupCount: 20
encoding: utf8
delay: True
error_file_handler:
class: logging.handlers.RotatingFileHandler
level: ERROR
formatter: simple
filename: logs/errors.log
maxBytes: 10485760 # 10MB
backupCount: 20
encoding: utf8
delay: True
rich:
class: rich.logging.RichHandler
loggers:
kedro:
level: INFO
spaceflights:
level: INFO
root:
handlers: [rich, info_file_handler, error_file_handler]
データフォルダの設定
次の3つのファイルをダウンロードし、data/01_raw
に格納します。
ダウンロードしたデータをconf/base/catalog.yml
に登録します。
companies:
type: pandas.CSVDataSet
filepath: data/01_raw/companies.csv
reviews:
type: pandas.CSVDataSet
filepath: data/01_raw/reviews.csv
shuttles:
type: pandas.ExcelDataSet
filepath: data/01_raw/shuttles.xlsx
load_args:
engine: openpyxl # Use modern Excel engine (the default since Kedro 0.18.0)
csv
データをロードすることができるかテストします。
$ kedro ipython
In [1]: companies = catalog.load("Companies")
[01/14/23 15:15:57] INFO Loading data from 'companies' (CSVDataSet)...
In [2]: companies.head()
Out[2]:
id company_rating company_location total_fleet_count iata_approved
0 35029 100% Niue 4.0 f
1 30292 67% Anguilla 6.0 f
2 19032 67% Russian Federation 4.0 f
3 8238 91% Barbados 15.0 t
4 30342 NaN Sao Tome and Principe 2.0 t
xlsx
データをロードすることができるかテストします。
In [3]: shuttles = catalog.load("shuttles")
[01/14/23 15:20:09] INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
In [4]: shuttles.head()
Out[4]:
id shuttle_location shuttle_type engine_type ... d_check_complete moon_clearance_complete price company_id
0 63561 Niue Type V5 Quantum ... f f $1,325.0 35029
1 36260 Anguilla Type V5 Quantum ... t f $1,780.0 30292
2 57015 Russian Federation Type V5 Quantum ... f f $1,715.0 19032
3 14035 Barbados Type V5 Plasma ... f f $4,770.0 8238
4 10036 Sao Tome and Principe Type V2 Plasma ... f f $2,820.0 30342
ipython
のセッションを終了するにはexit()
を実行します。
In [5]: exit()
パイプラインの作成
データ処理のパイプライン
データ処理のパイプラインを作成します。
$ kedro pipeline create data_processing
上記のコマンドを実行すると次のファイルが生成されます。
src/spaceflights/pipelines/data_processing/nodes.py
src/spaceflights/pipelines/data_processing/pipelines.py
conf/base/parameters/data_processing.yml
src/tests/pipelines/data_processing
__init__.py
src/spaceflights/pipelines/data_processing/nodes.py
に次のコードを記述します。
import pandas as pd
def _is_true(x: pd.Series) -> pd.Series:
return x == "t"
def _parse_percentage(x: pd.Series) -> pd.Series:
x = x.str.replace("%", "")
x = x.astype(float) / 100
return x
def _parse_money(x: pd.Series) -> pd.Series:
x = x.str.replace("$", "").str.replace(",", "")
x = x.astype(float)
return x
def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
"""Preprocesses the data for companies.
Args:
companies: Raw data.
Returns:
Preprocessed data, with `company_rating` converted to a float and
`iata_approved` converted to boolean.
"""
companies["iata_approved"] = _is_true(companies["iata_approved"])
companies["company_rating"] = _parse_percentage(companies["company_rating"])
return companies
def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
"""Preprocesses the data for shuttles.
Args:
shuttles: Raw data.
Returns:
Preprocessed data, with `price` converted to a float and `d_check_complete`,
`moon_clearance_complete` converted to boolean.
"""
shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
shuttles["price"] = _parse_money(shuttles["price"])
return shuttles
def create_model_input_table(
shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
"""Combines all data to create a model input table.
Args:
shuttles: Preprocessed data for shuttles.
companies: Preprocessed data for companies.
reviews: Raw data for reviews.
Returns:
Model input table.
"""
rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
model_input_table = rated_shuttles.merge(
companies, left_on="company_id", right_on="id"
)
model_input_table = model_input_table.dropna()
return model_input_table
src/spaceflights/pipelines/data_processing/pipelines.py
に次のコードを記述します。
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=preprocess_companies,
inputs="companies",
outputs="preprocessed_companies",
name="preprocess_companies_node",
),
node(
func=preprocess_shuttles,
inputs="shuttles",
outputs="preprocessed_shuttles",
name="preprocess_shuttles_node",
),
node(
func=create_model_input_table,
inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
outputs="model_input_table",
name="create_model_input_table_node",
),
]
)
上記コードは、各関数のノードを生成し、データ処理のためのパイプラインを作成しています。
次のコードでpreprocess_companies_node
ノードのテストをします。
$ kedro run --node=preprocess_companies_node
[08/09/22 16:43:10] INFO Kedro project kedro-tutorial session.py:346
[08/09/22 16:43:11] INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 1 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
完全なデータ処理のパイプラインとして全てのノードをテストするにはkedro run
を実行します。
$ kedro run
[08/09/22 17:00:54] INFO Kedro project kedro-tutorial session.py:346
[08/09/22 17:01:10] INFO Reached after_catalog_created hook plugin.py:17
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 17:01:25] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 17:01:28] INFO Saving data to 'model_input_table' (MemoryDataSet)... data_catalog.py:382
[08/09/22 17:01:29] INFO Completed 3 out of 3 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'model_input_table' (MemoryDataSet)... data_catalog.py:343
前処理データにより生成されるデータをファイルに保存したい場合はconf/base/catalog.yml
に次のように追記します。
preprocessed_companies:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/preprocessed_companies.pq
preprocessed_shuttles:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/preprocessed_shuttles.pq
model_input_table:
type: pandas.ParquetDataSet
filepath: data/03_primary/model_input_table.pq
データサイエンスのパイプライン
データサイエンスのパイプラインを作成します。
$ kedro pipeline create data_science
src/spaceflights/pipelines/data_science/nodes.py
に次のコードを記述します。データサイエンスコードにはscikit-learnのLinearRegressionが使われます。
import logging
from typing import Dict, Tuple
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
"""Splits data into features and targets training and test sets.
Args:
data: Data containing features and target.
parameters: Parameters defined in parameters/data_science.yml.
Returns:
Split data.
"""
X = data[parameters["features"]]
y = data["price"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
)
return X_train, X_test, y_train, y_test
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
"""Trains the linear regression model.
Args:
X_train: Training data of independent features.
y_train: Training data for price.
Returns:
Trained model.
"""
regressor = LinearRegression()
regressor.fit(X_train, y_train)
return regressor
def evaluate_model(
regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
"""Calculates and logs the coefficient of determination.
Args:
regressor: Trained model.
X_test: Testing data of independent features.
y_test: Testing data for price.
"""
y_pred = regressor.predict(X_test)
score = r2_score(y_test, y_pred)
logger = logging.getLogger(__name__)
logger.info("Model has a coefficient R^2 of %.3f on test data.", score)
conf/base/parameters/data_science.yml
にパイプライン実行時に参照されるパラメータを設定します。
model_options:
test_size: 0.2
random_state: 3
features:
- engines
- passenger_capacity
- crew
- d_check_complete
- moon_clearance_complete
- iata_approved
- company_rating
- review_scores_rating
src/spaceflights/pipelines/data_science/pipeline.py
に次のコードを記述します。
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import evaluate_model, split_data, train_model
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=split_data,
inputs=["model_input_table", "params:model_options"],
outputs=["X_train", "X_test", "y_train", "y_test"],
name="split_data_node",
),
node(
func=train_model,
inputs=["X_train", "y_train"],
outputs="regressor",
name="train_model_node",
),
node(
func=evaluate_model,
inputs=["regressor", "X_test", "y_test"],
outputs=None,
name="evaluate_model_node",
),
]
)
conf/base/catalog.yml
にデータセットを追加します。
regressor:
type: pickle.PickleDataSet
filepath: data/06_models/regressor.pickle
versioned: true
kedro run
を実行してパイプラインをテストします。
$ kedro run
[08/09/22 16:56:00] INFO Kedro project kedro-tutorial session.py:346
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 16:56:15] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 16:56:18] INFO Saving data to 'model_input_table' (MemoryDataSet)... data_catalog.py:382
[08/09/22 16:56:19] INFO Completed 3 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'params:model_options' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:model_options]) ->
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 4 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model_node: train_model([X_train,y_train]) -> node.py:327
[regressor]
[08/09/22 16:56:20] INFO Saving data to 'regressor' (PickleDataSet)... data_catalog.py:382
INFO Completed 5 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'regressor' (PickleDataSet)... data_catalog.py:343
INFO Loading data from 'X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([regressor,X_test,y_test]) -> None
INFO Model has a coefficient R^2 of 0.462 on test data. nodes.py:55
INFO Completed 6 out of 6 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
プロジェクトのパッケージ化
ドキュメント作成
次のコマンドをルートディレクトリで実行するとドキュメントを生成することができます。
$ kedro build-docs --open
docs/build/html
にHTMLのドキュメントが生成され、自動的にブラウザが開きます。
パッケージング
次のコマンドをルートディレクトリで実行するとプロジェクトをパッケージ化することができます。
$ kedro package
/dist
フォルダにパッケージがビルドされ、.egg
ファイルと.whl
ファイルが生成されます。なお、パッケージはPythonコードのみが含まれており、conf
やdata
、logs
などは含んでいません。
次のコマンドを実行すると、Pythonがインストールされているフォルダのbin
フォルダにプロジェクトが生成されます。
$ pip install <path-to-wheel-file>
参考