Traffine I/O

Bahasa Indonesia

2023-01-14

Kedro

Apa itu Kedro

Kedro adalah alat manajemen pipeline OSS yang dikembangkan oleh Quantum Black Lab, cabang penelitian McKinsey & Company, yang memungkinkan Anda membuat kode data science yang sangat dapat direproduksi, dapat dipelihara, dan modular.

Fitur Kedro

Kedro memiliki fitur-fitur utama berikut.

  • Visualisasi pipeline
    • Anda dapat memvisualisasikan DAG dari pipeline Anda.
  • Katalog data
    • Tersedia konektor data ringan untuk membaca dan menulis data antara banyak format file (Pandas, Spark, Dask, NetworkX, Pickle, Plotly, Matplotlib, dll.) Dan sistem file (S3, GCP, Azure, sFTP, DBFS, lokal).
  • Integrasi
    • Integrasi yang sangat baik dengan Apache Spark, Pandas, Dask, Matplotlib, Plotly, fsspec, Apache Airflow, Jupyter Notebook, Docker, dll.
  • Template proyek
    • Templat proyek memungkinkan Anda untuk menstandarkan cara Anda mengatur konfigurasi, kode sumber, pengujian, dokumentasi, buku catatan, dll.
  • Abstraksi pipeline
    • Kedro mendukung alur kerja berdatabaseset yang secara otomatis menyelesaikan ketergantungan antar fungsi, menghilangkan kebutuhan untuk memberi label urutan pelaksanaan tugas dalam pipeline.
  • Fleksibilitas dalam penyebaran
    • Pipeline dapat diterapkan ke Argo, Prefect, Kubeflow, AWS Batch, AWS Sagemaker, Databricks, Dask, dll.

Komponen Kedro

Kedro memiliki empat komponen berikut.

  • Node
  • Pipeline
  • DataCatalog
  • Runner

Node

Node adalah unit dari setiap proses dalam pipeline. Anda menggunakan node() untuk mendefinisikan pemrosesan, input, dan output.

Dalam kode berikut, fungsi return_greeting dibungkus dalam sebuah node yang disebut return_greeting_node, yang tidak memiliki input dan menamai outputnya may_salutation.

from kedro.pipeline import node

# Prepare first node
def return_greeting():
    return "Hello"

return_greeting_node = node(func=return_greeting,
                            inputs=None,
                            outputs="my_salutation")

Dalam kode berikut, fungsi join_statements dibungkus dalam sebuah node yang disebut join_statements_node dengan my_salutation sebagai input dan output bernama my_message.

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"

join_statements_node = node(
    join_statements,
    inputs="my_salutation",
    outputs="my_message"
)

Ini berarti bahwa my_salutation adalah output dari return_greeting_node dan input dari join_statements_node. Dengan demikian, input dan output dari node() dapat mendefinisikan ketergantungan dari pipeline.

Pipeline

Pipeline adalah sebuah jalur eksekusi. Pipeline dibangun dengan melewatkan daftar Node ke Pipeline(). Urutan daftar yang akan dioper ke pipeline adalah sembarang.

Dalam pipeline berikut ini, return_greeting_node dieksekusi terlebih dahulu, diikuti oleh join_statements_node.

from kedro.pipeline import Pipeline

# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])

DataCatalog

DataCatalog adalah katalog yang mendefinisikan data yang akan digunakan dalam pipeline; DataCatalog mengambil Node inputs dan outputs sebagai kunci dan membaca dan menulisnya ke dalam Dataset. Dataset mendukung berbagai format file dan sistem file. Silakan lihat dokumentasi resmi di bawah ini untuk Dataset yang tersedia.

https://kedro.readthedocs.io/en/0.17.6/kedro.extras.datasets.html

Kode berikut menggunakan MemoryDataSet untuk menyimpan my_salutation di memori.

from kedro.io import DataCatalog, MemoryDataSet

# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})

Runner

Ada tiga jenis runner: SequentialRunner untuk mengeksekusi pipeline secara seri, ParallelRunner untuk mengeksekusi pipeline secara paralel, dan ThreadRunner untuk mengeksekusi pipeline secara thread.

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(pipeline, data_catalog))

Hello Kedro

Kode lengkap untuk komponen di atas ditunjukkan di bawah ini.

hello_kedro.py
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, Pipeline
from kedro.runner import SequentialRunner

# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})

# Prepare first node
def return_greeting():
    return "Hello"


return_greeting_node = node(return_greeting, inputs=None, outputs="my_salutation")

# Prepare second node
def join_statements(greeting):
    return f"{greeting} Kedro!"


join_statements_node = node(
    join_statements, inputs="my_salutation", outputs="my_message"
)

# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])

# Create a runner to run the pipeline
runner = SequentialRunner()

# Run the pipeline
print(runner.run(pipeline, data_catalog))

Apabila kode dieksekusi, proses berikut ini dilakukan.

  1. Eksekusi return_greeting_node (yang mengeksekusi return_greeting dan mengeluarkan string "Hello")
  2. Simpan string output ke MemoryDataSet bernama my_salutation
  3. Jalankan join_statements_node (set data my_salutation dimuat dan diinjeksikan ke dalam fungsi join_statements)
  4. Gabungkan dengan "Kedro!" untuk menghasilkan string output "Hello Kedro!
  5. Kembalikan output dari pipeline dalam kamus dengan my_message sebagai kunci
$ python hello_kedro.py

{'my_message': 'Hello Kedro!'}

Bagaimana cara menggunakan Kedro

Berikut adalah cara menggunakan Kedro.

Instalasi

Instal Kedro dengan perintah berikut.

$ pip install kedro

Verifikasi bahwa kedro telah diinstal.

$ kedro info

| | _____  __| |_ __ ___
| |/ / _ \/ _` | '__/ _ \
|   <  __/ (_| | | | (_) |
|_|\_\___|\__,_|_|  \___/
v0.18.4

Kedro is a Python framework for
creating reproducible, maintainable
and modular data science code.

No plugins installed

Pembuatan proyek

Kedro memiliki fungsi untuk membuat sebuah proyek dari sebuah template. Pada dasarnya, anda dapat menggunakan fungsi template ini untuk membuat sebuah proyek.

kedro new

Buat sebuah proyek dengan perintah kedro new. Pada titik ini, anda akan ditanyai serangkaian pertanyaan interaktif, yang akan anda jawab.

$ kedro new

Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
 [New Kedro Project]: get_started

The project name 'get_started' has been applied to:
- The project title in /Users/ryu/get_started/README.md
- The folder created for your project in /Users/ryu/get_started
- The project's python package in /Users/ryu/demo/src/get_started

A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/

Direktori berikut ini dibuat.

get-started
├── README.md
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml
│   │   ├── logging.yml
│   │   └── parameters.yml
│   └── local
│       └── credentials.yml
├── data
│   ├── 01_raw
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── docs
│   └── source
│       ├── conf.py
│       └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
    ├── get_started
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── pipeline_registry.py
    │   ├── pipelines
    │   │   └── __init__.py
    │   └── settings.py
    ├── requirements.txt
    ├── setup.py
    └── tests
        ├── __init__.py
        ├── pipelines
        │   └── __init__.py
        └── test_run.py

Berikut ini adalah deskripsi direktori.

Nama file (nama folder) Deskripsi
conf Project configuration files
data Local project data (not committed to version control)
docs Project documentation
logs Project output logs (not committed to version control)
notebooks Project-related Jupyter notebooks (can be used for experimental code before moving the code to src)
pyproject.toml Identifies the project root and contains configuration information
setup.cfg Configuration options for pytest when doing kedro test and for the isort utility when doing kedro lint
src Project source code

Perintah berikut akan menginstal paket.

# Move to project directory
$ cd get_started

# Install dependencies
$ pip install -r src/requirements.txt

Anda bisa menjalankan pipeline dengan perintah kedro run.

$ kedro run

Namun, dalam keadaan saat ini, perintah di atas akan menghasilkan error karena logika pipeline belum dituliskan.

kedro new --config

Anda dapat membuat proyek dari file Yaml yang telah disiapkan sebelumnya dengan menambahkan flag --config saat menjalankan kedro new.

code/config.yaml
output_dir: ~/code
project_name: My First Kedro Project
repo_name: testing-kedro
python_package: test_kedro
$ kedro new --config <path>/config.yml

kedro new --starter=pandas-iris

Anda dapat membuat proyek template untuk dataset iris dengan menentukan pandas-iris sebagai starter.

$ kedro new --starter=pandas-iris

Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
 [Iris]:

The project name 'Iris' has been applied to:
- The project title in /Users/ryu/iris/README.md
- The folder created for your project in /Users/ryu/iris
- The project's python package in /Users/ryu/iris/src/iris

A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
iris
├── README.md
├── conf
│   ├── README.md
│   ├── base
│   │   ├── catalog.yml
│   │   ├── logging.yml
│   │   └── parameters.yml
│   └── local
│       └── credentials.yml
├── data
│   ├── 01_raw
│   │   └── iris.csv
│   ├── 02_intermediate
│   ├── 03_primary
│   ├── 04_feature
│   ├── 05_model_input
│   ├── 06_models
│   ├── 07_model_output
│   └── 08_reporting
├── docs
│   └── source
│       ├── conf.py
│       └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
    ├── iris
    │   ├── README.md
    │   ├── __init__.py
    │   ├── __main__.py
    │   ├── nodes.py
    │   ├── pipeline.py
    │   ├── pipeline_registry.py
    │   └── settings.py
    ├── requirements.txt
    ├── setup.py
    └── tests
        ├── __init__.py
        ├── test_pipeline.py
        └── test_run.py

Instal pustaka.

$ cd iris
$ pip install -r src/requirements.txt

Mengeksekusi pipeline.

$ kedro run

You have opted out of product usage analytics, so none will be collected.
[01/14/23 11:53:19] INFO     Kedro project iris                                                                           session.py:340
[01/14/23 11:53:20] INFO     Loading data from 'example_iris_data' (CSVDataSet)...                                   data_catalog.py:343
                    INFO     Loading data from 'parameters' (MemoryDataSet)...                                       data_catalog.py:343
                    INFO     Running node: split: split_data([example_iris_data,parameters]) ->                              node.py:327
                             [X_train,X_test,y_train,y_test]
                    INFO     Saving data to 'X_train' (MemoryDataSet)...                                             data_catalog.py:382
                    INFO     Saving data to 'X_test' (MemoryDataSet)...                                              data_catalog.py:382
                    INFO     Saving data to 'y_train' (MemoryDataSet)...                                             data_catalog.py:382
                    INFO     Saving data to 'y_test' (MemoryDataSet)...                                              data_catalog.py:382
                    INFO     Completed 1 out of 3 tasks                                                          sequential_runner.py:85
                    INFO     Loading data from 'X_train' (MemoryDataSet)...                                          data_catalog.py:343
                    INFO     Loading data from 'X_test' (MemoryDataSet)...                                           data_catalog.py:343
                    INFO     Loading data from 'y_train' (MemoryDataSet)...                                          data_catalog.py:343
                    INFO     Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred]          node.py:327
                    INFO     Saving data to 'y_pred' (MemoryDataSet)...                                              data_catalog.py:382
                    INFO     Completed 2 out of 3 tasks                                                          sequential_runner.py:85
                    INFO     Loading data from 'y_pred' (MemoryDataSet)...                                           data_catalog.py:343
                    INFO     Loading data from 'y_test' (MemoryDataSet)...                                           data_catalog.py:343
                    INFO     Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None                         node.py:327
                    INFO     Model has accuracy of 0.933 on test data.                                                       nodes.py:74
                    INFO     Completed 3 out of 3 tasks                                                          sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.

Log dari pemrosesan yang dilakukan adalah output ke logs/info.log.

logs/info.log
2023-01-14 11:53:19,849 - kedro.framework.session.session - INFO - Kedro project iris
2023-01-14 11:53:20,796 - kedro.io.data_catalog - INFO - Loading data from 'example_iris_data' (CSVDataSet)...
2023-01-14 11:53:20,805 - kedro.io.data_catalog - INFO - Loading data from 'parameters' (MemoryDataSet)...
2023-01-14 11:53:20,807 - kedro.pipeline.node - INFO - Running node: split: split_data([example_iris_data,parameters]) -> [X_train,X_test,y_train,y_test]
2023-01-14 11:53:20,811 - kedro.io.data_catalog - INFO - Saving data to 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,813 - kedro.io.data_catalog - INFO - Saving data to 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,814 - kedro.io.data_catalog - INFO - Saving data to 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,815 - kedro.io.data_catalog - INFO - Saving data to 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,817 - kedro.runner.sequential_runner - INFO - Completed 1 out of 3 tasks
2023-01-14 11:53:20,818 - kedro.io.data_catalog - INFO - Loading data from 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,820 - kedro.io.data_catalog - INFO - Loading data from 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,821 - kedro.io.data_catalog - INFO - Loading data from 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,822 - kedro.pipeline.node - INFO - Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred]
2023-01-14 11:53:20,824 - kedro.io.data_catalog - INFO - Saving data to 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,826 - kedro.runner.sequential_runner - INFO - Completed 2 out of 3 tasks
2023-01-14 11:53:20,827 - kedro.io.data_catalog - INFO - Loading data from 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,829 - kedro.io.data_catalog - INFO - Loading data from 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,830 - kedro.pipeline.node - INFO - Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None
2023-01-14 11:53:20,832 - iris.nodes - INFO - Model has accuracy of 0.933 on test data.
2023-01-14 11:53:20,833 - kedro.runner.sequential_runner - INFO - Completed 3 out of 3 tasks
2023-01-14 11:53:20,835 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.

Starter lainnya

Selain pandas-iris, Kedro menawarkan Starter berikut ini.

Starter Deskripsi
astro-airflow-iris The Kedro Iris dataset example project with a minimal setup for deploying the pipeline on Airflow with Astronomer.
standalone-datacatalog A minimum setup to use the traditional Iris dataset with Kedro’s DataCatalog, which is a core component of Kedro. This starter is of use in the exploratory phase of a project. For more information, read the guide to standalone use of the DataCatalog. This starter was formerly known as mini-kedro.
pandas-iris The Kedro Iris dataset example project
pyspark-iris An alternative Kedro Iris dataset example, using PySpark
pyspark The configuration and initialisation code for a Kedro pipeline using PySpark
spaceflights The spaceflights tutorial example code

Referensi

https://kedro.org/
https://kedro.readthedocs.io/en/stable/get_started/kedro_concepts.html
https://kedro.readthedocs.io/en/0.17.6/02_get_started/03_hello_kedro.html
https://kedro.readthedocs.io/en/0.17.6/kedro.extras.datasets.html
https://kedro.readthedocs.io/en/stable/kedro_project_setup/starters.html
https://neptune.ai/blog/data-science-pipelines-with-kedro

Ryusei Kakujo

researchgatelinkedingithub

Focusing on data science for mobility

Bench Press 100kg!