Pendahuluan
Artikel ini adalah tutorial tentang Kedro sepanjang alur Kedro spaceflights tutorial (sebuah proyek untuk membangun model untuk memprediksi harga perjalanan ke bulan dan penerbangan kembalinya).
Alur pengembangan menggunakan Kedro adalah sebagai berikut:
- Buat templat proyek
- Buat proyek baru dan instal library.
- Siapkan kredensial dan log.
- Siapkan folder data
- Simpan data di folder
data
. - Rujuk semua set data dalam proyek.
- Simpan data di folder
- Buat pipeline
- Bangun Node.
- Pilih cara menjalankan Pipeline.
- Kemas proyek
- Bangun dokumentasi proyek.
- Kemas proyek untuk distribusi.
Buat templat proyek
Pertama, bangun lingkungan virtual.
$ python -m venv venv
$ . venv/bin/activate
$ pip install --upgrade pip
Pasang Kedro.
$ pip install kedro
Buat proyek baru.
$ kedro new
Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
[New Kedro Project]: spaceflights
The project name 'spaceflights' has been applied to:
- The project title in /Users/ryu/spaceflights/README.md
- The folder created for your project in /Users/ryu/spaceflights
- The project's python package in /Users/ryu/demo/src/spaceflights
A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
Arahkan ke direktori proyek yang dibuat.
$ cd spaceflights
Jelaskan pustaka-pustaka yang diperlukan di src/requirements.txt
.
black~=22.0
flake8>=3.7.9, <5.0
ipython>=7.31.1, <8.0
isort~=5.0
jupyter~=1.0
jupyterlab_server>=2.11.1, <2.16.0
jupyterlab~=3.0
kedro~=0.18.4
kedro-datasets[pandas.CSVDataSet, pandas.ExcelDataSet, pandas.ParquetDataSet]~=1.0.0
kedro-telemetry~=0.2.0
kedro-viz~=5.0
nbstripout~=0.4
pytest-cov~=3.0
pytest-mock>=1.7.1, <2.0
pytest~=6.2
scikit-learn~=1.0
Instal pustaka.
$ pip install -r src/requirements.txt
Untuk menyimpan informasi otentikasi seperti nama pengguna dan kata sandi, tambahkan ke conf/local/credentials.yml
.
dev_sql:
username: admin
password: admin
Anda juga bisa memodifikasi pengaturan logging dengan mengedit conf/base/logging.yml
. Standarnya adalah sebagai berikut.
version: 1
disable_existing_loggers: False
formatters:
simple:
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
handlers:
console:
class: logging.StreamHandler
level: INFO
formatter: simple
stream: ext://sys.stdout
info_file_handler:
class: logging.handlers.RotatingFileHandler
level: INFO
formatter: simple
filename: logs/info.log
maxBytes: 10485760 # 10MB
backupCount: 20
encoding: utf8
delay: True
error_file_handler:
class: logging.handlers.RotatingFileHandler
level: ERROR
formatter: simple
filename: logs/errors.log
maxBytes: 10485760 # 10MB
backupCount: 20
encoding: utf8
delay: True
rich:
class: rich.logging.RichHandler
loggers:
kedro:
level: INFO
spaceflights:
level: INFO
root:
handlers: [rich, info_file_handler, error_file_handler]
Siapkan folder data
Unduh tiga file berikut dan simpan di data/01_raw
.
Daftarkan data yang diunduh di conf/base/catalog.yml
.
companies:
type: pandas.CSVDataSet
filepath: data/01_raw/companies.csv
reviews:
type: pandas.CSVDataSet
filepath: data/01_raw/reviews.csv
shuttles:
type: pandas.ExcelDataSet
filepath: data/01_raw/shuttles.xlsx
load_args:
engine: openpyxl # Use modern Excel engine (the default since Kedro 0.18.0)
Menguji apakah memungkinkan untuk memuat data csv
.
$ kedro ipython
In [1]: companies = catalog.load("Companies")
[01/14/23 15:15:57] INFO Loading data from 'companies' (CSVDataSet)...
In [2]: companies.head()
Out[2]:
id company_rating company_location total_fleet_count iata_approved
0 35029 100% Niue 4.0 f
1 30292 67% Anguilla 6.0 f
2 19032 67% Russian Federation 4.0 f
3 8238 91% Barbados 15.0 t
4 30342 NaN Sao Tome and Principe 2.0 t
Menguji apakah memungkinkan untuk memuat data xlsx
.
In [3]: shuttles = catalog.load("shuttles")
[01/14/23 15:20:09] INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
In [4]: shuttles.head()
Out[4]:
id shuttle_location shuttle_type engine_type ... d_check_complete moon_clearance_complete price company_id
0 63561 Niue Type V5 Quantum ... f f $1,325.0 35029
1 36260 Anguilla Type V5 Quantum ... t f $1,780.0 30292
2 57015 Russian Federation Type V5 Quantum ... f f $1,715.0 19032
3 14035 Barbados Type V5 Plasma ... f f $4,770.0 8238
4 10036 Sao Tome and Principe Type V2 Plasma ... f f $2,820.0 30342
Untuk keluar dari sesi ipython
, jalankan exit()
.
In [5]: exit()
Membuat pipeline
Jalur pipeline pemrosesan data
Buat pipeline pemrosesan data.
$ kedro pipeline create data_processing
Perintah di atas akan menghasilkan file berikut ini.
src/spaceflights/pipelines/data_processing/nodes.py
src/spaceflights/pipelines/data_processing/pipelines.py
conf/base/parameters/data_processing.yml
src/tests/pipelines/data_processing
__init__.py
Tulis kode berikut di src/spaceflights/pipelines/data_processing/nodes.py
.
import pandas as pd
def _is_true(x: pd.Series) -> pd.Series:
return x == "t"
def _parse_percentage(x: pd.Series) -> pd.Series:
x = x.str.replace("%", "")
x = x.astype(float) / 100
return x
def _parse_money(x: pd.Series) -> pd.Series:
x = x.str.replace("$", "").str.replace(",", "")
x = x.astype(float)
return x
def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
"""Preprocesses the data for companies.
Args:
companies: Raw data.
Returns:
Preprocessed data, with `company_rating` converted to a float and
`iata_approved` converted to boolean.
"""
companies["iata_approved"] = _is_true(companies["iata_approved"])
companies["company_rating"] = _parse_percentage(companies["company_rating"])
return companies
def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
"""Preprocesses the data for shuttles.
Args:
shuttles: Raw data.
Returns:
Preprocessed data, with `price` converted to a float and `d_check_complete`,
`moon_clearance_complete` converted to boolean.
"""
shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
shuttles["price"] = _parse_money(shuttles["price"])
return shuttles
def create_model_input_table(
shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
"""Combines all data to create a model input table.
Args:
shuttles: Preprocessed data for shuttles.
companies: Preprocessed data for companies.
reviews: Raw data for reviews.
Returns:
Model input table.
"""
rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
model_input_table = rated_shuttles.merge(
companies, left_on="company_id", right_on="id"
)
model_input_table = model_input_table.dropna()
return model_input_table
Tulis kode berikut di src/spaceflights/pipelines/data_processing/pipelines.py
.
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=preprocess_companies,
inputs="companies",
outputs="preprocessed_companies",
name="preprocess_companies_node",
),
node(
func=preprocess_shuttles,
inputs="shuttles",
outputs="preprocessed_shuttles",
name="preprocess_shuttles_node",
),
node(
func=create_model_input_table,
inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
outputs="model_input_table",
name="create_model_input_table_node",
),
]
)
Kode di atas membuat node untuk setiap fungsi dan membuat pipeline untuk pemrosesan data.
Kode berikut ini menguji node preprocess_companies_node
.
$ kedro run --node=preprocess_companies_node
[08/09/22 16:43:10] INFO Kedro project kedro-tutorial session.py:346
[08/09/22 16:43:11] INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 1 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
Untuk menguji semua node sebagai pipeline pemrosesan data yang lengkap, jalankan kedro run
.
$ kedro run
[08/09/22 17:00:54] INFO Kedro project kedro-tutorial session.py:346
[08/09/22 17:01:10] INFO Reached after_catalog_created hook plugin.py:17
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 17:01:25] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 17:01:28] INFO Saving data to 'model_input_table' (MemoryDataSet)... data_catalog.py:382
[08/09/22 17:01:29] INFO Completed 3 out of 3 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
INFO Loading data from 'model_input_table' (MemoryDataSet)... data_catalog.py:343
Jika Anda ingin menyimpan data yang dihasilkan oleh data prapemrosesan dalam sebuah file, tambahkan yang berikut ini ke conf/base/catalog.yml
.
preprocessed_companies:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/preprocessed_companies.pq
preprocessed_shuttles:
type: pandas.ParquetDataSet
filepath: data/02_intermediate/preprocessed_shuttles.pq
model_input_table:
type: pandas.ParquetDataSet
filepath: data/03_primary/model_input_table.pq
Pipeline ilmu data
Buat pipeline ilmu data.
$ kedro pipeline create data_science
Tulis kode berikut di src/spaceflights/pipelines/data_science/nodes.py
. LinearRegression dari scikit-learn digunakan untuk kode data science.
import logging
from typing import Dict, Tuple
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split
def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
"""Splits data into features and targets training and test sets.
Args:
data: Data containing features and target.
parameters: Parameters defined in parameters/data_science.yml.
Returns:
Split data.
"""
X = data[parameters["features"]]
y = data["price"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
)
return X_train, X_test, y_train, y_test
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
"""Trains the linear regression model.
Args:
X_train: Training data of independent features.
y_train: Training data for price.
Returns:
Trained model.
"""
regressor = LinearRegression()
regressor.fit(X_train, y_train)
return regressor
def evaluate_model(
regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
"""Calculates and logs the coefficient of determination.
Args:
regressor: Trained model.
X_test: Testing data of independent features.
y_test: Testing data for price.
"""
y_pred = regressor.predict(X_test)
score = r2_score(y_test, y_pred)
logger = logging.getLogger(__name__)
logger.info("Model has a coefficient R^2 of %.3f on test data.", score)
Atur parameter di conf/base/parameter/data_science.yml
yang akan direferensikan selama eksekusi pipeline.
model_options:
test_size: 0.2
random_state: 3
features:
- engines
- passenger_capacity
- crew
- d_check_complete
- moon_clearance_complete
- iata_approved
- company_rating
- review_scores_rating
Tulis kode berikut di src/spaceflights/pipelines/data_science/pipeline.py
.
from kedro.pipeline import Pipeline, node, pipeline
from .nodes import evaluate_model, split_data, train_model
def create_pipeline(**kwargs) -> Pipeline:
return pipeline(
[
node(
func=split_data,
inputs=["model_input_table", "params:model_options"],
outputs=["X_train", "X_test", "y_train", "y_test"],
name="split_data_node",
),
node(
func=train_model,
inputs=["X_train", "y_train"],
outputs="regressor",
name="train_model_node",
),
node(
func=evaluate_model,
inputs=["regressor", "X_test", "y_test"],
outputs=None,
name="evaluate_model_node",
),
]
)
Tambahkan dataset ke conf/base/catalog.yml
.
regressor:
type: pickle.PickleDataSet
filepath: data/06_models/regressor.pickle
versioned: true
Jalankan kedro run
untuk menguji pipeline.
$ kedro run
[08/09/22 16:56:00] INFO Kedro project kedro-tutorial session.py:346
INFO Loading data from 'companies' (CSVDataSet)... data_catalog.py:343
INFO Running node: preprocess_companies_node: node.py:327
preprocess_companies([companies]) -> [preprocessed_companies]
INFO Saving data to 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'shuttles' (ExcelDataSet)... data_catalog.py:343
[08/09/22 16:56:15] INFO Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
-> [preprocessed_shuttles]
INFO Saving data to 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'preprocessed_shuttles' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'preprocessed_companies' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'reviews' (CSVDataSet)... data_catalog.py:343
INFO Running node: create_model_input_table_node: node.py:327
create_model_input_table([preprocessed_shuttles,preprocessed_companies,
reviews]) -> [model_input_table]
[08/09/22 16:56:18] INFO Saving data to 'model_input_table' (MemoryDataSet)... data_catalog.py:382
[08/09/22 16:56:19] INFO Completed 3 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'model_input_table' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'params:model_options' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split_data_node: node.py:327
split_data([model_input_table,params:model_options]) ->
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 4 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: train_model_node: train_model([X_train,y_train]) -> node.py:327
[regressor]
[08/09/22 16:56:20] INFO Saving data to 'regressor' (PickleDataSet)... data_catalog.py:382
INFO Completed 5 out of 6 tasks sequential_runner.py:85
INFO Loading data from 'regressor' (PickleDataSet)... data_catalog.py:343
INFO Loading data from 'X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: evaluate_model_node: node.py:327
evaluate_model([regressor,X_test,y_test]) -> None
INFO Model has a coefficient R^2 of 0.462 on test data. nodes.py:55
INFO Completed 6 out of 6 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully. runner.py:89
Kemas proyek
Hasilkan dokumentasi
Perintah-perintah berikut ini dapat dieksekusi di direktori root untuk menghasilkan dokumentasi.
$ kedro build-docs --open
Sebuah dokumen HTML akan dihasilkan di docs/build/html
dan secara otomatis dibuka di browser Anda.
Pengemasan
Perintah berikut ini dapat dieksekusi di direktori root untuk memaketkan sebuah proyek.
$ kedro package
Paket akan dibangun di folder /dist
dan akan menghasilkan file .egg
dan file .whl
. Perhatikan bahwa paket hanya berisi kode Python, bukan conf
, data
, logs
, dll.
Perintah berikut ini akan menghasilkan proyek di folder bin
dari instalasi Python Anda.
$ pip install <path-to-wheel-file>
Referensi