Apa itu Kedro
Kedro adalah alat manajemen pipeline OSS yang dikembangkan oleh Quantum Black Lab, cabang penelitian McKinsey & Company, yang memungkinkan Anda membuat kode data science yang sangat dapat direproduksi, dapat dipelihara, dan modular.
Fitur Kedro
Kedro memiliki fitur-fitur utama berikut.
- Visualisasi pipeline
- Anda dapat memvisualisasikan DAG dari pipeline Anda.
- Katalog data
- Tersedia konektor data ringan untuk membaca dan menulis data antara banyak format file (Pandas, Spark, Dask, NetworkX, Pickle, Plotly, Matplotlib, dll.) Dan sistem file (S3, GCP, Azure, sFTP, DBFS, lokal).
- Integrasi
- Integrasi yang sangat baik dengan Apache Spark, Pandas, Dask, Matplotlib, Plotly, fsspec, Apache Airflow, Jupyter Notebook, Docker, dll.
- Template proyek
- Templat proyek memungkinkan Anda untuk menstandarkan cara Anda mengatur konfigurasi, kode sumber, pengujian, dokumentasi, buku catatan, dll.
- Abstraksi pipeline
- Kedro mendukung alur kerja berdatabaseset yang secara otomatis menyelesaikan ketergantungan antar fungsi, menghilangkan kebutuhan untuk memberi label urutan pelaksanaan tugas dalam pipeline.
- Fleksibilitas dalam penyebaran
- Pipeline dapat diterapkan ke Argo, Prefect, Kubeflow, AWS Batch, AWS Sagemaker, Databricks, Dask, dll.
Komponen Kedro
Kedro memiliki empat komponen berikut.
- Node
- Pipeline
- DataCatalog
- Runner
Node
Node adalah unit dari setiap proses dalam pipeline. Anda menggunakan node()
untuk mendefinisikan pemrosesan, input, dan output.
Dalam kode berikut, fungsi return_greeting
dibungkus dalam sebuah node yang disebut return_greeting_node
, yang tidak memiliki input dan menamai outputnya may_salutation
.
from kedro.pipeline import node
# Prepare first node
def return_greeting():
return "Hello"
return_greeting_node = node(func=return_greeting,
inputs=None,
outputs="my_salutation")
Dalam kode berikut, fungsi join_statements
dibungkus dalam sebuah node yang disebut join_statements_node
dengan my_salutation
sebagai input dan output bernama my_message
.
# Prepare second node
def join_statements(greeting):
return f"{greeting} Kedro!"
join_statements_node = node(
join_statements,
inputs="my_salutation",
outputs="my_message"
)
Ini berarti bahwa my_salutation
adalah output dari return_greeting_node
dan input dari join_statements_node
. Dengan demikian, input
dan output
dari node()
dapat mendefinisikan ketergantungan dari pipeline.
Pipeline
Pipeline adalah sebuah jalur eksekusi. Pipeline dibangun dengan melewatkan daftar Node ke Pipeline()
. Urutan daftar yang akan dioper ke pipeline adalah sembarang.
Dalam pipeline berikut ini, return_greeting_node
dieksekusi terlebih dahulu, diikuti oleh join_statements_node
.
from kedro.pipeline import Pipeline
# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])
DataCatalog
DataCatalog adalah katalog yang mendefinisikan data yang akan digunakan dalam pipeline; DataCatalog mengambil Node inputs
dan outputs
sebagai kunci dan membaca dan menulisnya ke dalam Dataset
. Dataset
mendukung berbagai format file dan sistem file. Silakan lihat dokumentasi resmi di bawah ini untuk Dataset
yang tersedia.
Kode berikut menggunakan MemoryDataSet
untuk menyimpan my_salutation
di memori.
from kedro.io import DataCatalog, MemoryDataSet
# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})
Runner
Ada tiga jenis runner: SequentialRunner
untuk mengeksekusi pipeline secara seri, ParallelRunner
untuk mengeksekusi pipeline secara paralel, dan ThreadRunner
untuk mengeksekusi pipeline secara thread.
# Create a runner to run the pipeline
runner = SequentialRunner()
# Run the pipeline
print(runner.run(pipeline, data_catalog))
Hello Kedro
Kode lengkap untuk komponen di atas ditunjukkan di bawah ini.
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, Pipeline
from kedro.runner import SequentialRunner
# Prepare a data catalog
data_catalog = DataCatalog({"my_salutation": MemoryDataSet()})
# Prepare first node
def return_greeting():
return "Hello"
return_greeting_node = node(return_greeting, inputs=None, outputs="my_salutation")
# Prepare second node
def join_statements(greeting):
return f"{greeting} Kedro!"
join_statements_node = node(
join_statements, inputs="my_salutation", outputs="my_message"
)
# Assemble nodes into a pipeline
pipeline = Pipeline([return_greeting_node, join_statements_node])
# Create a runner to run the pipeline
runner = SequentialRunner()
# Run the pipeline
print(runner.run(pipeline, data_catalog))
Apabila kode dieksekusi, proses berikut ini dilakukan.
- Eksekusi
return_greeting_node
(yang mengeksekusireturn_greeting
dan mengeluarkan string "Hello") - Simpan string output ke
MemoryDataSet
bernamamy_salutation
- Jalankan
join_statements_node
(set datamy_salutation
dimuat dan diinjeksikan ke dalam fungsijoin_statements
) - Gabungkan dengan "Kedro!" untuk menghasilkan string output "Hello Kedro!
- Kembalikan output dari pipeline dalam kamus dengan
my_message
sebagai kunci
$ python hello_kedro.py
{'my_message': 'Hello Kedro!'}
Bagaimana cara menggunakan Kedro
Berikut adalah cara menggunakan Kedro.
Instalasi
Instal Kedro dengan perintah berikut.
$ pip install kedro
Verifikasi bahwa kedro telah diinstal.
$ kedro info
| | _____ __| |_ __ ___
| |/ / _ \/ _` | '__/ _ \
| < __/ (_| | | | (_) |
|_|\_\___|\__,_|_| \___/
v0.18.4
Kedro is a Python framework for
creating reproducible, maintainable
and modular data science code.
No plugins installed
Pembuatan proyek
Kedro memiliki fungsi untuk membuat sebuah proyek dari sebuah template. Pada dasarnya, anda dapat menggunakan fungsi template ini untuk membuat sebuah proyek.
kedro new
Buat sebuah proyek dengan perintah kedro new
. Pada titik ini, anda akan ditanyai serangkaian pertanyaan interaktif, yang akan anda jawab.
$ kedro new
Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
[New Kedro Project]: get_started
The project name 'get_started' has been applied to:
- The project title in /Users/ryu/get_started/README.md
- The folder created for your project in /Users/ryu/get_started
- The project's python package in /Users/ryu/demo/src/get_started
A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
Direktori berikut ini dibuat.
get-started
├── README.md
├── conf
│ ├── README.md
│ ├── base
│ │ ├── catalog.yml
│ │ ├── logging.yml
│ │ └── parameters.yml
│ └── local
│ └── credentials.yml
├── data
│ ├── 01_raw
│ ├── 02_intermediate
│ ├── 03_primary
│ ├── 04_feature
│ ├── 05_model_input
│ ├── 06_models
│ ├── 07_model_output
│ └── 08_reporting
├── docs
│ └── source
│ ├── conf.py
│ └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
├── get_started
│ ├── __init__.py
│ ├── __main__.py
│ ├── pipeline_registry.py
│ ├── pipelines
│ │ └── __init__.py
│ └── settings.py
├── requirements.txt
├── setup.py
└── tests
├── __init__.py
├── pipelines
│ └── __init__.py
└── test_run.py
Berikut ini adalah deskripsi direktori.
Nama file (nama folder) | Deskripsi |
---|---|
conf | Project configuration files |
data | Local project data (not committed to version control) |
docs | Project documentation |
logs | Project output logs (not committed to version control) |
notebooks | Project-related Jupyter notebooks (can be used for experimental code before moving the code to src) |
pyproject.toml | Identifies the project root and contains configuration information |
setup.cfg | Configuration options for pytest when doing kedro test and for the isort utility when doing kedro lint |
src | Project source code |
Perintah berikut akan menginstal paket.
# Move to project directory
$ cd get_started
# Install dependencies
$ pip install -r src/requirements.txt
Anda bisa menjalankan pipeline dengan perintah kedro run
.
$ kedro run
Namun, dalam keadaan saat ini, perintah di atas akan menghasilkan error karena logika pipeline belum dituliskan.
kedro new --config
Anda dapat membuat proyek dari file Yaml yang telah disiapkan sebelumnya dengan menambahkan flag --config
saat menjalankan kedro new
.
output_dir: ~/code
project_name: My First Kedro Project
repo_name: testing-kedro
python_package: test_kedro
$ kedro new --config <path>/config.yml
kedro new --starter=pandas-iris
Anda dapat membuat proyek template untuk dataset iris
dengan menentukan pandas-iris
sebagai starter
.
$ kedro new --starter=pandas-iris
Project Name
============
Please enter a human readable name for your new project.
Spaces, hyphens, and underscores are allowed.
[Iris]:
The project name 'Iris' has been applied to:
- The project title in /Users/ryu/iris/README.md
- The folder created for your project in /Users/ryu/iris
- The project's python package in /Users/ryu/iris/src/iris
A best-practice setup includes initialising git and creating a virtual environment before running 'pip install -r src/requirements.txt' to install project-specific dependencies. Refer to the Kedro documentation: https://kedro.readthedocs.io/
iris
├── README.md
├── conf
│ ├── README.md
│ ├── base
│ │ ├── catalog.yml
│ │ ├── logging.yml
│ │ └── parameters.yml
│ └── local
│ └── credentials.yml
├── data
│ ├── 01_raw
│ │ └── iris.csv
│ ├── 02_intermediate
│ ├── 03_primary
│ ├── 04_feature
│ ├── 05_model_input
│ ├── 06_models
│ ├── 07_model_output
│ └── 08_reporting
├── docs
│ └── source
│ ├── conf.py
│ └── index.rst
├── logs
├── notebooks
├── pyproject.toml
├── setup.cfg
└── src
├── iris
│ ├── README.md
│ ├── __init__.py
│ ├── __main__.py
│ ├── nodes.py
│ ├── pipeline.py
│ ├── pipeline_registry.py
│ └── settings.py
├── requirements.txt
├── setup.py
└── tests
├── __init__.py
├── test_pipeline.py
└── test_run.py
Instal pustaka.
$ cd iris
$ pip install -r src/requirements.txt
Mengeksekusi pipeline.
$ kedro run
You have opted out of product usage analytics, so none will be collected.
[01/14/23 11:53:19] INFO Kedro project iris session.py:340
[01/14/23 11:53:20] INFO Loading data from 'example_iris_data' (CSVDataSet)... data_catalog.py:343
INFO Loading data from 'parameters' (MemoryDataSet)... data_catalog.py:343
INFO Running node: split: split_data([example_iris_data,parameters]) -> node.py:327
[X_train,X_test,y_train,y_test]
INFO Saving data to 'X_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'X_test' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_train' (MemoryDataSet)... data_catalog.py:382
INFO Saving data to 'y_test' (MemoryDataSet)... data_catalog.py:382
INFO Completed 1 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'X_train' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'X_test' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_train' (MemoryDataSet)... data_catalog.py:343
INFO Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred] node.py:327
INFO Saving data to 'y_pred' (MemoryDataSet)... data_catalog.py:382
INFO Completed 2 out of 3 tasks sequential_runner.py:85
INFO Loading data from 'y_pred' (MemoryDataSet)... data_catalog.py:343
INFO Loading data from 'y_test' (MemoryDataSet)... data_catalog.py:343
INFO Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None node.py:327
INFO Model has accuracy of 0.933 on test data. nodes.py:74
INFO Completed 3 out of 3 tasks sequential_runner.py:85
INFO Pipeline execution completed successfully.
Log dari pemrosesan yang dilakukan adalah output ke logs/info.log
.
2023-01-14 11:53:19,849 - kedro.framework.session.session - INFO - Kedro project iris
2023-01-14 11:53:20,796 - kedro.io.data_catalog - INFO - Loading data from 'example_iris_data' (CSVDataSet)...
2023-01-14 11:53:20,805 - kedro.io.data_catalog - INFO - Loading data from 'parameters' (MemoryDataSet)...
2023-01-14 11:53:20,807 - kedro.pipeline.node - INFO - Running node: split: split_data([example_iris_data,parameters]) -> [X_train,X_test,y_train,y_test]
2023-01-14 11:53:20,811 - kedro.io.data_catalog - INFO - Saving data to 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,813 - kedro.io.data_catalog - INFO - Saving data to 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,814 - kedro.io.data_catalog - INFO - Saving data to 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,815 - kedro.io.data_catalog - INFO - Saving data to 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,817 - kedro.runner.sequential_runner - INFO - Completed 1 out of 3 tasks
2023-01-14 11:53:20,818 - kedro.io.data_catalog - INFO - Loading data from 'X_train' (MemoryDataSet)...
2023-01-14 11:53:20,820 - kedro.io.data_catalog - INFO - Loading data from 'X_test' (MemoryDataSet)...
2023-01-14 11:53:20,821 - kedro.io.data_catalog - INFO - Loading data from 'y_train' (MemoryDataSet)...
2023-01-14 11:53:20,822 - kedro.pipeline.node - INFO - Running node: make_predictions: make_predictions([X_train,X_test,y_train]) -> [y_pred]
2023-01-14 11:53:20,824 - kedro.io.data_catalog - INFO - Saving data to 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,826 - kedro.runner.sequential_runner - INFO - Completed 2 out of 3 tasks
2023-01-14 11:53:20,827 - kedro.io.data_catalog - INFO - Loading data from 'y_pred' (MemoryDataSet)...
2023-01-14 11:53:20,829 - kedro.io.data_catalog - INFO - Loading data from 'y_test' (MemoryDataSet)...
2023-01-14 11:53:20,830 - kedro.pipeline.node - INFO - Running node: report_accuracy: report_accuracy([y_pred,y_test]) -> None
2023-01-14 11:53:20,832 - iris.nodes - INFO - Model has accuracy of 0.933 on test data.
2023-01-14 11:53:20,833 - kedro.runner.sequential_runner - INFO - Completed 3 out of 3 tasks
2023-01-14 11:53:20,835 - kedro.runner.sequential_runner - INFO - Pipeline execution completed successfully.
Starter lainnya
Selain pandas-iris
, Kedro menawarkan Starter berikut ini.
Starter | Deskripsi |
---|---|
astro-airflow-iris |
The Kedro Iris dataset example project with a minimal setup for deploying the pipeline on Airflow with Astronomer. |
standalone-datacatalog |
A minimum setup to use the traditional Iris dataset with Kedro’s DataCatalog, which is a core component of Kedro. This starter is of use in the exploratory phase of a project. For more information, read the guide to standalone use of the DataCatalog. This starter was formerly known as mini-kedro. |
pandas-iris |
The Kedro Iris dataset example project |
pyspark-iris |
An alternative Kedro Iris dataset example, using PySpark |
pyspark |
The configuration and initialisation code for a Kedro pipeline using PySpark |
spaceflights |
The spaceflights tutorial example code |
Referensi