
2023-01-14

Kedro Tutorial

Introduction

This article is a tutorial on Kedro that follows the Kedro spaceflights tutorial (a project that builds a model to predict the price of a trip to the moon and back).

The development workflow with Kedro is as follows:

  1. Create the project template
    • Create a new project and install the libraries.
    • Set up credentials and logging.
  2. Set up the data folder
    • Store the data in the data folder.
    • Register all data sets used in the project.
  3. Create pipelines
    • Build the nodes.
    • Choose how to run the pipelines.
  4. Package the project
    • Build the project documentation.
    • Package the project for distribution.

Create the project template

First, create a virtual environment.

$ python -m venv venv
$ . venv/bin/activate
$ pip install --upgrade pip

Install Kedro.

$ pip install kedro

Create a new project.

$ kedro new

Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
 [New Kedro Project]: spaceflights

The project name 'spaceflights' has been applied to:
- The project title in /Users/ryu/spaceflights/README.md
- The folder created for your project in /Users/ryu/spaceflights
- The project's python package in /Users/ryu/demo/src/spaceflights

A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/

Move into the newly created project directory.

$ cd spaceflights

List the required libraries in src/requirements.txt.

src/requirements.txt
black~=22.0
flake8>=3.7.9, <5.0
ipython>=7.31.1, <8.0
isort~=5.0
jupyter~=1.0
jupyterlab_server>=2.11.1, <2.16.0
jupyterlab~=3.0
kedro~=0.18.4
kedro-datasets[pandas.CSVDataSet, pandas.ExcelDataSet, pandas.ParquetDataSet]~=1.0.0
kedro-telemetry~=0.2.0
kedro-viz~=5.0
nbstripout~=0.4
pytest-cov~=3.0
pytest-mock>=1.7.1, <2.0
pytest~=6.2
scikit-learn~=1.0

Install the libraries.

$ pip install -r src/requirements.txt

To store authentication information such as usernames and passwords, add it to conf/local/credentials.yml.

conf/local/credentials.yml
dev_sql:
    username: admin
    password: admin
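
In practice these credentials are referenced by key from other configuration (for example, a data set entry in the catalog can point at dev_sql). If you want to inspect them programmatically, here is a minimal sketch, assuming the Kedro 0.18.x ConfigLoader API; dev_sql is the example entry defined above.

from kedro.config import ConfigLoader

# Sketch only (assumes Kedro 0.18.x): merge credentials from conf/base and conf/local.
conf_loader = ConfigLoader(conf_source="conf", env="local")
credentials = conf_loader.get("credentials*", "credentials*/**")
print(credentials["dev_sql"]["username"])  # -> "admin"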

You can also modify the logging settings by editing conf/base/logging.yml. The default configuration is as follows.

conf/base/logging.yml
version: 1

disable_existing_loggers: False

formatters:
  simple:
    format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: simple
    stream: ext://sys.stdout

  info_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: simple
    filename: logs/info.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

  error_file_handler:
    class: logging.handlers.RotatingFileHandler
    level: ERROR
    formatter: simple
    filename: logs/errors.log
    maxBytes: 10485760 # 10MB
    backupCount: 20
    encoding: utf8
    delay: True

  rich:
    class: rich.logging.RichHandler

loggers:
  kedro:
    level: INFO

  spaceflights:
    level: INFO

root:
  handlers: [rich, info_file_handler, error_file_handler]
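
With this configuration, any logger created under the spaceflights package logs at INFO level to the console (via rich) and to logs/info.log. A minimal sketch of how node code would emit such logs, using only the standard logging module (the function below is a hypothetical example, not part of the tutorial):

import logging

# When this lives inside the spaceflights package, __name__ falls under the
# "spaceflights" logger configured above.
logger = logging.getLogger(__name__)


def count_rows(n_rows: int) -> int:  # hypothetical example function
    logger.info("Processing %d rows", n_rows)
    return n_rows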

Set up the data folder

Download the following three files and save them in data/01_raw.

https://kedro-org.github.io/kedro/companies.csv
https://kedro-org.github.io/kedro/reviews.csv
https://kedro-org.github.io/kedro/shuttles.xlsx

Register the downloaded data in conf/base/catalog.yml.

conf/base/catalog.yml
companies:
  type: pandas.CSVDataSet
  filepath: data/01_raw/companies.csv

reviews:
  type: pandas.CSVDataSet
  filepath: data/01_raw/reviews.csv

shuttles:
  type: pandas.ExcelDataSet
  filepath: data/01_raw/shuttles.xlsx
  load_args:
    engine: openpyxl # Use modern Excel engine (the default since Kedro 0.18.0)

Test that the CSV data can be loaded.

$ kedro ipython

In [1]: companies = catalog.load("companies")
[01/14/23 15:15:57] INFO     Loading data from 'companies' (CSVDataSet)...

In [2]: companies.head()
Out[2]:
      id company_rating       company_location  total_fleet_count iata_approved
0  35029           100%                   Niue                4.0             f
1  30292            67%               Anguilla                6.0             f
2  19032            67%     Russian Federation                4.0             f
3   8238            91%               Barbados               15.0             t
4  30342            NaN  Sao Tome and Principe                2.0             t

Test that the xlsx data can be loaded.

In [3]: shuttles = catalog.load("shuttles")
[01/14/23 15:20:09] INFO     Loading data from 'shuttles' (ExcelDataSet)...                                          data_catalog.py:343

In [4]: shuttles.head()
Out[4]:
      id       shuttle_location shuttle_type engine_type  ... d_check_complete  moon_clearance_complete     price company_id
0  63561                   Niue      Type V5     Quantum  ...                f                        f  $1,325.0      35029
1  36260               Anguilla      Type V5     Quantum  ...                t                        f  $1,780.0      30292
2  57015     Russian Federation      Type V5     Quantum  ...                f                        f  $1,715.0      19032
3  14035               Barbados      Type V5      Plasma  ...                f                        f  $4,770.0       8238
4  10036  Sao Tome and Principe      Type V2      Plasma  ...                f                        f  $2,820.0      30342

To exit the ipython session, run exit().

In [5]: exit()
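
The catalog entries simply map data set names to files on disk, so as a quick sanity check outside of Kedro (assuming the files were saved to data/01_raw as above) the same data can also be read directly with pandas:

import pandas as pd

# Direct reads equivalent to catalog.load("companies") and catalog.load("shuttles")
companies = pd.read_csv("data/01_raw/companies.csv")
shuttles = pd.read_excel("data/01_raw/shuttles.xlsx", engine="openpyxl")
print(companies.shape, shuttles.shape)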

Create pipelines

Data processing pipeline

Create the data processing pipeline.

$ kedro pipeline create data_processing

The command above generates the following files and folders:

  • src/spaceflights/pipelines/data_processing/__init__.py
  • src/spaceflights/pipelines/data_processing/nodes.py
  • src/spaceflights/pipelines/data_processing/pipeline.py
  • conf/base/parameters/data_processing.yml
  • src/tests/pipelines/data_processing

Write the following code in src/spaceflights/pipelines/data_processing/nodes.py.

src/spaceflights/pipelines/data_processing/nodes.py
import pandas as pd


def _is_true(x: pd.Series) -> pd.Series:
    return x == "t"


def _parse_percentage(x: pd.Series) -> pd.Series:
    x = x.str.replace("%", "")
    x = x.astype(float) / 100
    return x


def _parse_money(x: pd.Series) -> pd.Series:
    x = x.str.replace("$", "").str.replace(",", "")
    x = x.astype(float)
    return x


def preprocess_companies(companies: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for companies.

    Args:
        companies: Raw data.
    Returns:
        Preprocessed data, with `company_rating` converted to a float and
        `iata_approved` converted to boolean.
    """
    companies["iata_approved"] = _is_true(companies["iata_approved"])
    companies["company_rating"] = _parse_percentage(companies["company_rating"])
    return companies


def preprocess_shuttles(shuttles: pd.DataFrame) -> pd.DataFrame:
    """Preprocesses the data for shuttles.

    Args:
        shuttles: Raw data.
    Returns:
        Preprocessed data, with `price` converted to a float and `d_check_complete`,
        `moon_clearance_complete` converted to boolean.
    """
    shuttles["d_check_complete"] = _is_true(shuttles["d_check_complete"])
    shuttles["moon_clearance_complete"] = _is_true(shuttles["moon_clearance_complete"])
    shuttles["price"] = _parse_money(shuttles["price"])
    return shuttles


def create_model_input_table(
    shuttles: pd.DataFrame, companies: pd.DataFrame, reviews: pd.DataFrame
) -> pd.DataFrame:
    """Combines all data to create a model input table.

    Args:
        shuttles: Preprocessed data for shuttles.
        companies: Preprocessed data for companies.
        reviews: Raw data for reviews.
    Returns:
        Model input table.

    """
    rated_shuttles = shuttles.merge(reviews, left_on="id", right_on="shuttle_id")
    model_input_table = rated_shuttles.merge(
        companies, left_on="company_id", right_on="id"
    )
    model_input_table = model_input_table.dropna()
    return model_input_table
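
As a quick sanity check, the preprocessing helpers can be exercised on a tiny hand-made frame (illustrative values only; run this where the spaceflights package is importable, e.g. inside kedro ipython):

import pandas as pd

from spaceflights.pipelines.data_processing.nodes import preprocess_companies

toy = pd.DataFrame(
    {
        "id": [1, 2],
        "company_rating": ["100%", "67%"],
        "iata_approved": ["t", "f"],
    }
)
print(preprocess_companies(toy))
# company_rating becomes 1.00 / 0.67 and iata_approved becomes True / False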

Write the following code in src/spaceflights/pipelines/data_processing/pipeline.py.

src/spaceflights/pipelines/data_processing/pipeline.py
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import create_model_input_table, preprocess_companies, preprocess_shuttles


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=preprocess_companies,
                inputs="companies",
                outputs="preprocessed_companies",
                name="preprocess_companies_node",
            ),
            node(
                func=preprocess_shuttles,
                inputs="shuttles",
                outputs="preprocessed_shuttles",
                name="preprocess_shuttles_node",
            ),
            node(
                func=create_model_input_table,
                inputs=["preprocessed_shuttles", "preprocessed_companies", "reviews"],
                outputs="model_input_table",
                name="create_model_input_table_node",
            ),
        ]
    )

The code above creates a node for each function and assembles them into the data processing pipeline.
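
Kedro finds this create_pipeline() function through the project's pipeline registry. In Kedro 0.18.3+, the generated src/spaceflights/pipeline_registry.py auto-discovers pipelines; the following is a sketch of what that file typically contains (check your generated project rather than copying this verbatim):

from typing import Dict

from kedro.framework.project import find_pipelines
from kedro.pipeline import Pipeline


def register_pipelines() -> Dict[str, Pipeline]:
    # Auto-discover pipelines under src/spaceflights/pipelines and combine them
    # into "__default__", which is what a plain `kedro run` executes.
    pipelines = find_pipelines()
    pipelines["__default__"] = sum(pipelines.values())
    return pipelines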

The following command tests the preprocess_companies_node node on its own.

$ kedro run --node=preprocess_companies_node

[08/09/22 16:43:10] INFO     Kedro project kedro-tutorial                                         session.py:346
[08/09/22 16:43:11] INFO     Loading data from 'companies' (CSVDataSet)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataSet)...      data_catalog.py:382
                    INFO     Completed 1 out of 1 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataSet)...   data_catalog.py:343

To test all of the nodes together as the complete data processing pipeline, run kedro run.

$ kedro run

[08/09/22 17:00:54] INFO     Kedro project kedro-tutorial                                         session.py:346
[08/09/22 17:01:10] INFO     Reached after_catalog_created hook                                     plugin.py:17
                    INFO     Loading data from 'companies' (CSVDataSet)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataSet)...      data_catalog.py:382
                    INFO     Completed 1 out of 3 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataSet)...                  data_catalog.py:343
[08/09/22 17:01:25] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
                             -> [preprocessed_shuttles]

                    INFO     Saving data to 'preprocessed_shuttles' (MemoryDataSet)...       data_catalog.py:382
                    INFO     Completed 2 out of 3 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'preprocessed_shuttles' (MemoryDataSet)...    data_catalog.py:343
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataSet)...   data_catalog.py:343
                    INFO     Loading data from 'reviews' (CSVDataSet)...                     data_catalog.py:343
                    INFO     Running node: create_model_input_table_node:                            node.py:327
                             create_model_input_table([preprocessed_shuttles,preprocessed_companies,
                             reviews]) -> [model_input_table]
[08/09/22 17:01:28] INFO     Saving data to 'model_input_table' (MemoryDataSet)...           data_catalog.py:382
[08/09/22 17:01:29] INFO     Completed 3 out of 3 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89
                    INFO     Loading data from 'model_input_table' (MemoryDataSet)...        data_catalog.py:343

If you want to save the data produced by the preprocessing nodes to files, add the following to conf/base/catalog.yml.

conf/base/catalog.yml
preprocessed_companies:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_companies.pq

preprocessed_shuttles:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/preprocessed_shuttles.pq

model_input_table:
  type: pandas.ParquetDataSet
  filepath: data/03_primary/model_input_table.pq
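
With these entries in place, the next kedro run persists the intermediate outputs as Parquet files, which can then be inspected directly, for example with pandas (paths as registered above):

import pandas as pd

preprocessed_companies = pd.read_parquet("data/02_intermediate/preprocessed_companies.pq")
model_input_table = pd.read_parquet("data/03_primary/model_input_table.pq")
print(model_input_table.columns.tolist())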

Data science pipeline

Create the data science pipeline.

$ kedro pipeline create data_science

Write the following code in src/spaceflights/pipelines/data_science/nodes.py. LinearRegression from scikit-learn is used for the data science code.

src/spaceflights/pipelines/data_science/nodes.py
import logging
from typing import Dict, Tuple

import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split


def split_data(data: pd.DataFrame, parameters: Dict) -> Tuple:
    """Splits data into features and targets training and test sets.

    Args:
        data: Data containing features and target.
        parameters: Parameters defined in parameters/data_science.yml.
    Returns:
        Split data.
    """
    X = data[parameters["features"]]
    y = data["price"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=parameters["test_size"], random_state=parameters["random_state"]
    )
    return X_train, X_test, y_train, y_test


def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> LinearRegression:
    """Trains the linear regression model.

    Args:
        X_train: Training data of independent features.
        y_train: Training data for price.

    Returns:
        Trained model.
    """
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)
    return regressor


def evaluate_model(
    regressor: LinearRegression, X_test: pd.DataFrame, y_test: pd.Series
):
    """Calculates and logs the coefficient of determination.

    Args:
        regressor: Trained model.
        X_test: Testing data of independent features.
        y_test: Testing data for price.
    """
    y_pred = regressor.predict(X_test)
    score = r2_score(y_test, y_pred)
    logger = logging.getLogger(__name__)
    logger.info("Model has a coefficient R^2 of %.3f on test data.", score)
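
These three functions can also be exercised on their own with a tiny synthetic frame, which helps to see the shapes that flow between them (illustrative data and parameters only; the logged R^2 is only visible where logging is configured, e.g. inside kedro ipython):

import pandas as pd

from spaceflights.pipelines.data_science.nodes import evaluate_model, split_data, train_model

df = pd.DataFrame({"engines": range(10), "price": [2.0 * e + 1.0 for e in range(10)]})
params = {"test_size": 0.2, "random_state": 3, "features": ["engines"]}

X_train, X_test, y_train, y_test = split_data(df, params)
regressor = train_model(X_train, y_train)
evaluate_model(regressor, X_test, y_test)  # R^2 is 1.0 for this perfectly linear toy data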

Set the parameters that will be referenced during pipeline execution in conf/base/parameters/data_science.yml.

conf/base/parameters/data_science.yml
model_options:
  test_size: 0.2
  random_state: 3
  features:
    - engines
    - passenger_capacity
    - crew
    - d_check_complete
    - moon_clearance_complete
    - iata_approved
    - company_rating
    - review_scores_rating
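
At run time, the model_options block is injected into split_data as a plain dictionary through the params:model_options input declared in the pipeline below. It can also be inspected interactively, for example in a kedro ipython session (where Kedro's IPython extension provides the context variable; 0.18.x behaviour assumed):

# In a `kedro ipython` session
model_options = context.params["model_options"]
print(model_options["test_size"], model_options["features"])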

Write the following code in src/spaceflights/pipelines/data_science/pipeline.py.

src/spaceflights/pipelines/data_science/pipeline.py
from kedro.pipeline import Pipeline, node, pipeline

from .nodes import evaluate_model, split_data, train_model


def create_pipeline(**kwargs) -> Pipeline:
    return pipeline(
        [
            node(
                func=split_data,
                inputs=["model_input_table", "params:model_options"],
                outputs=["X_train", "X_test", "y_train", "y_test"],
                name="split_data_node",
            ),
            node(
                func=train_model,
                inputs=["X_train", "y_train"],
                outputs="regressor",
                name="train_model_node",
            ),
            node(
                func=evaluate_model,
                inputs=["regressor", "X_test", "y_test"],
                outputs=None,
                name="evaluate_model_node",
            ),
        ]
    )

Add the dataset to conf/base/catalog.yml.

conf/base/catalog.yml
regressor:
  type: pickle.PickleDataSet
  filepath: data/06_models/regressor.pickle
  versioned: true

Run kedro run to test the full pipeline.

$ kedro run

[08/09/22 16:56:00] INFO     Kedro project kedro-tutorial                                         session.py:346
                    INFO     Loading data from 'companies' (CSVDataSet)...                   data_catalog.py:343
                    INFO     Running node: preprocess_companies_node:                                node.py:327
                             preprocess_companies([companies]) -> [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (MemoryDataSet)...      data_catalog.py:382
                    INFO     Completed 1 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataSet)...                  data_catalog.py:343
[08/09/22 16:56:15] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) node.py:327
                             -> [preprocessed_shuttles]
                    INFO     Saving data to 'preprocessed_shuttles' (MemoryDataSet)...       data_catalog.py:382
                    INFO     Completed 2 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'preprocessed_shuttles' (MemoryDataSet)...    data_catalog.py:343
                    INFO     Loading data from 'preprocessed_companies' (MemoryDataSet)...   data_catalog.py:343
                    INFO     Loading data from 'reviews' (CSVDataSet)...                     data_catalog.py:343
                    INFO     Running node: create_model_input_table_node:                            node.py:327
                             create_model_input_table([preprocessed_shuttles,preprocessed_companies,
                             reviews]) -> [model_input_table]
[08/09/22 16:56:18] INFO     Saving data to 'model_input_table' (MemoryDataSet)...           data_catalog.py:382
[08/09/22 16:56:19] INFO     Completed 3 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'model_input_table' (MemoryDataSet)...        data_catalog.py:343
                    INFO     Loading data from 'params:model_options' (MemoryDataSet)...     data_catalog.py:343
                    INFO     Running node: split_data_node:                                          node.py:327
                             split_data([model_input_table,params:model_options]) ->
                             [X_train,X_test,y_train,y_test]
                    INFO     Saving data to 'X_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Completed 4 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'X_train' (MemoryDataSet)...                  data_catalog.py:343
                    INFO     Loading data from 'y_train' (MemoryDataSet)...                  data_catalog.py:343
                    INFO     Running node: train_model_node: train_model([X_train,y_train]) ->       node.py:327
                             [regressor]
[08/09/22 16:56:20] INFO     Saving data to 'regressor' (PickleDataSet)...                   data_catalog.py:382
                    INFO     Completed 5 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Loading data from 'regressor' (PickleDataSet)...                data_catalog.py:343
                    INFO     Loading data from 'X_test' (MemoryDataSet)...                   data_catalog.py:343
                    INFO     Loading data from 'y_test' (MemoryDataSet)...                   data_catalog.py:343
                    INFO     Running node: evaluate_model_node:                                      node.py:327
                             evaluate_model([regressor,X_test,y_test]) -> None
                    INFO     Model has a coefficient R^2 of 0.462 on test data.                      nodes.py:55
                    INFO     Completed 6 out of 6 tasks                                  sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.                             runner.py:89
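
Because the regressor entry is declared with versioned: true, each run writes a new timestamped pickle under data/06_models/regressor.pickle/. After a successful run, the latest version can be reloaded through the catalog, for example in a kedro ipython session (a sketch; catalog.load resolves to the most recent version by default):

# In a `kedro ipython` session
regressor = catalog.load("regressor")
print(regressor.coef_, regressor.intercept_)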

Package the project

Generate documentation

Run the following command from the project root directory to generate the documentation.

$ kedro build-docs --open

HTML documentation is generated in docs/build/html and opened automatically in your browser.

Packaging

Run the following command from the project root directory to package the project.

$ kedro package

The package is built in the dist/ folder, producing a .egg file and a .whl file. Note that the package contains only the Python code, not conf, data, logs, and so on.

The wheel file can then be installed into a Python environment with pip.

$ pip install <path-to-wheel-file>

References

https://kedro.readthedocs.io/en/stable/tutorial/spaceflights_tutorial.html
