2023-01-14

Kedro DataCatalog

DataCatalog

Kedro has a feature called the DataCatalog. In catalog.yml you can describe the data sources used in your project and manage how they are loaded and saved. The file paths for the datasets that Kedro supports by default are as follows (a minimal usage sketch follows the list).

  • Local / Network File System
    • file://
  • Hadoop File System (HDFS)
    • hdfs://user@server:port/path/to/data
  • Amazon S3
    • s3://my-bucket-name/path/to/data
  • S3 Compatible Storage
    • s3://my-bucket-name/path/to/data
  • Google Cloud Storage
    • gcs://
  • Azure Blob Storage / Azure Data Lake Storage Gen2
    • abfs://
  • HTTP(s)
    • http://
    • https://
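
The catalog wires such paths to concrete dataset classes. As a minimal sketch of the same wiring done programmatically (assuming the pandas extras are installed; the dataset name and path are illustrative), the DataCatalog API can be used directly:

from kedro.io import DataCatalog
from kedro.extras.datasets.pandas import CSVDataSet

# Register a dataset under the name "bikes", as a catalog.yml entry would
catalog = DataCatalog({"bikes": CSVDataSet(filepath="data/01_raw/bikes.csv")})

df = catalog.load("bikes")  # read the CSV into a pandas DataFrame
catalog.save("bikes", df)   # write the DataFrame back to the same path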

Example catalog.yml

Below are examples of how to write dataset entries, as shown in the official documentation.

conf/base/catalog.yml
# Provide the project value to the underlying filesystem class (GCSFileSystem) to interact with Google Cloud Storage (GCS)
test_dataset:
  type: ...
  fs_args:
    project: test_project


# Load data from a local binary file using utf-8 encoding
test_dataset:
  type: ...
  fs_args:
    open_args_load:
      mode: "rb"
      encoding: "utf-8"


# Save data to a CSV file without row names (index) using utf-8 encoding
test_dataset:
  type: pandas.CSVDataSet
  ...
  save_args:
    index: False
    encoding: "utf-8"

---

# Loads / saves a CSV file from / to a local file system
bikes:
  type: pandas.CSVDataSet
  filepath: data/01_raw/bikes.csv


# Loads and saves a CSV on a local file system, using specified load and save arguments
cars:
  type: pandas.CSVDataSet
  filepath: data/01_raw/company/cars.csv
  load_args:
    sep: ','
  save_args:
    index: False
    date_format: '%Y-%m-%d %H:%M'
    decimal: .


# Loads and saves a compressed CSV on a local file system
boats:
  type: pandas.CSVDataSet
  filepath: data/01_raw/company/boats.csv.gz
  load_args:
    sep: ','
    compression: 'gzip'
  fs_args:
    open_args_load:
      mode: 'rb'


# Loads a CSV file from a specific S3 bucket, using credentials and load arguments
motorbikes:
  type: pandas.CSVDataSet
  filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
  credentials: dev_s3
  load_args:
    sep: ','
    skiprows: 5
    skipfooter: 1
    na_values: ['#NA', NA]


# Loads / saves a pickle file from / to a local file system
airplanes:
  type: pickle.PickleDataSet
  filepath: data/06_models/airplanes.pkl
  backend: pickle


# Loads an Excel file from Google Cloud Storage
rockets:
  type: pandas.ExcelDataSet
  filepath: gcs://your_bucket/data/02_intermediate/company/motorbikes.xlsx
  fs_args:
    project: my-project
  credentials: my_gcp_credentials
  save_args:
    sheet_name: Sheet1


# Loads a multi-sheet Excel file from a local file system
trains:
  type: pandas.ExcelDataSet
  filepath: data/02_intermediate/company/trains.xlsx
  load_args:
    sheet_name: [Sheet1, Sheet2, Sheet3]


# Saves an image created with Matplotlib on Google Cloud Storage
results_plot:
  type: matplotlib.MatplotlibWriter
  filepath: gcs://your_bucket/data/08_results/plots/output_1.jpeg
  fs_args:
    project: my-project
  credentials: my_gcp_credentials


# Loads / saves an HDF file on local file system storage, using specified load and save arguments
skateboards:
  type: pandas.HDFDataSet
  filepath: data/02_intermediate/skateboards.hdf
  key: name
  load_args:
    columns: [brand, length]
  save_args:
    mode: w  # Overwrite even when the file already exists
    dropna: True


# Loads / saves a parquet file on local file system storage, using specified load and save arguments
trucks:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/trucks.parquet
  load_args:
    columns: [name, gear, disp, wt]
    categories: list
    index: name
  save_args:
    compression: GZIP
    file_scheme: hive
    has_nulls: False
    partition_on: [name]


# Loads / saves a Spark table on S3, using specified load and save arguments
weather:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather*
  credentials: dev_s3
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True


# Loads / saves a SQL table using credentials, a database connection, using specified load and save arguments
scooters:
  type: pandas.SQLTableDataSet
  credentials: scooters_credentials
  table_name: scooters
  load_args:
    index_col: [name]
    columns: [name, gear]
  save_args:
    if_exists: replace


# Loads an SQL table with credentials, a database connection, and applies a SQL query to the table
scooters_query:
  type: pandas.SQLQueryDataSet
  credentials: scooters_credentials
  sql: select * from cars where gear=4
  load_args:
    index_col: [name]


# Loads data from an API endpoint, example US corn yield data from USDA
us_corn_yield_data:
  type: api.APIDataSet
  url: https://quickstats.nass.usda.gov
  credentials: usda_credentials
  params:
    key: SOME_TOKEN
    format: JSON
    commodity_desc: CORN
    statisticcat_des: YIELD
    agg_level_desc: STATE
    year: 2000


# Loads data from Minio (S3 API Compatible Storage)
test:
  type: pandas.CSVDataSet
  filepath: s3://your_bucket/test.csv # assume `test.csv` is uploaded to the Minio server.
  credentials: dev_minio


# Loads a model saved as a pickle from Azure Blob Storage
ml_model:
  type: pickle.PickleDataSet
  filepath: "abfs://models/ml_models.pickle"
  versioned: True
  credentials: dev_abs

# Loads a CSV file stored in a remote location through SSH
cool_dataset:
  type: pandas.CSVDataSet
  filepath: "sftp:///path/to/remote_cluster/cool_data.csv"
  credentials: cluster_credentials

The credentials referenced by the entries above are defined separately, for example:

conf/local/credentials.yml
dev_s3:
  client_kwargs:
    aws_access_key_id: key
    aws_secret_access_key: secret

scooters_credentials:
  con: sqlite:///kedro.db

my_gcp_credentials:
  id_token: key

usda_credentials:
  - username
  - password

dev_minio:
  key: token
  secret: key
  client_kwargs:
    endpoint_url : 'http://localhost:9000'

dev_abs:
  account_name: accountname
  account_key: key

cluster_credentials:
  username: my_username
  host: host_address
  port: 22
  password: password
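
Kedro resolves each `credentials:` key in catalog.yml against these entries at run time. As a rough sketch (assuming it is run from the project root of a Kedro >= 0.18 project), a catalog entry such as `cars` can also be loaded interactively through a session:

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

bootstrap_project(Path.cwd())  # read pyproject.toml and configure the project

with KedroSession.create(project_path=Path.cwd()) as session:
    context = session.load_context()
    cars = context.catalog.load("cars")  # resolves the "cars" entry above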

Generalizing dataset notation

Datasets that share the same file format and load and save arguments can be described concisely with YAML anchors, using the following notation.

conf/base/catalog.yml
_csv: &csv
  type: spark.SparkDataSet
  file_format: csv
  load_args:
    sep: ','
    na_values: ['#NA', NA]
    header: True
    inferSchema: False

cars:
  <<: *csv
  filepath: s3a://data/01_raw/cars.csv

trucks:
  <<: *csv
  filepath: s3a://data/01_raw/trucks.csv

bikes:
  <<: *csv
  filepath: s3a://data/01_raw/bikes.csv
  load_args:
    header: False

Load arguments can also be anchored and partially overridden per dataset:

conf/base/catalog.yml
_csv: &csv
  type: spark.SparkDataSet
  file_format: csv
  load_args: &csv_load_args
    header: True
    inferSchema: False

airplanes:
  <<: *csv
  filepath: s3a://data/01_raw/airplanes.csv
  load_args:
    <<: *csv_load_args
    sep: ;
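
For reference, YAML anchors and merge keys (`<<`) are expanded when the file is parsed, so the `airplanes` entry above is equivalent to writing it out in full:

airplanes:
  type: spark.SparkDataSet
  file_format: csv
  filepath: s3a://data/01_raw/airplanes.csv
  load_args:
    header: True
    inferSchema: False
    sep: ;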

Supported datasets

The list of datasets that Kedro supports by default can be found in the official documentation below.

https://kedro.readthedocs.io/en/stable/kedro.extras.datasets.html

Custom datasets

Creating custom datasets allows you to work with files that cannot be loaded by Kedro's built-in datasets.

Create custom_datasets.py as follows. Note that the file name is arbitrary.

src/<project_name>/extras
├── __init__.py
└── datasets
    ├── __init__.py
    └── custom_datasets.py

Write your custom datasets in custom_datasets.py as follows. Note that for a subclass of AbstractDataSet, the _load, _save, and _describe methods must be defined.

src/<project_name>/extras/datasets/custom_datasets.py
from pathlib import PurePosixPath
from typing import Any, Dict

import fsspec
import gensim
import numpy as np
from PIL import Image

from kedro.io import AbstractDataSet, AbstractVersionedDataSet, Version
from kedro.io.core import get_filepath_str, get_protocol_and_path


class ImageDataSet(AbstractVersionedDataSet[np.ndarray, np.ndarray]):
    def __init__(
        self,
        filepath: str,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ):
        """Creates a new instance of ImageDataSet to load / save image data for given filepath.

        Args:
            filepath: The location of the image file to load / save data.
            version: The version of the dataset being saved and loaded.
            credentials: Credentials required to get access to the underlying filesystem.
                E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
            fs_args: Extra arguments to pass into underlying filesystem class.
                E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`.
        """
        protocol, path = get_protocol_and_path(filepath)
        self._protocol = protocol
        self._fs = fsspec.filesystem(
            self._protocol, **(credentials or {}), **(fs_args or {})
        )
        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(filepath=self._filepath, protocol=self._protocol)

    def _load(self) -> np.ndarray:
        """Loads image data from the specified filepath."""
        load_path = get_filepath_str(self._get_load_path(), self._protocol)
        with self._fs.open(load_path, mode="rb") as f:
            image = Image.open(f).convert("RGBA")
            return np.asarray(image)

    def _save(self, data: np.ndarray) -> None:
        """Saves image data to the specified filepath."""
        # using get_filepath_str ensures that the protocol and path are appended correctly for different filesystems
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        with self._fs.open(save_path, mode="wb") as f:
            image = Image.fromarray(data)
            image.save(f)


class WordVec(AbstractDataSet):
    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self):
        # Reads word vectors in the plain-text word2vec format
        return gensim.models.KeyedVectors.load_word2vec_format(
            self._filepath, binary=False
        )

    def _save(self, data) -> None:
        # This dataset is read-only, so saving is intentionally a no-op
        ...

    def _describe(self) -> Dict[str, Any]:
        return dict(filepath=self._filepath)

You can then use the custom datasets by adding the following to catalog.yml.

conf/base/catalog.yml
pikachu:
  type: <project_name>.extras.datasets.custom_datasets.ImageDataSet
  filepath: data/01_raw/pokemon-images-and-types/images/images/pikachu.png
  # Note: the duplicated `images` path is part of the original Kaggle dataset

fast_text_model:
  type: <project_name>.extras.datasets.custom_datasets.WordVec
  filepath: data/01_raw/fast_text_model.vec
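
Because AbstractDataSet wraps `_load` and `_save` in public `load()` and `save()` methods, a custom dataset can also be exercised directly for a quick check. The sketch below reuses the pikachu entry above; `<project_name>` stands for your actual package name:

# <project_name> is a placeholder for your actual package name
from <project_name>.extras.datasets.custom_datasets import ImageDataSet

dataset = ImageDataSet(
    filepath="data/01_raw/pokemon-images-and-types/images/images/pikachu.png"
)
image = dataset.load()  # calls _load and returns an np.ndarray
dataset.save(image)     # calls _save, writing the array back as an image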

References

https://kedro.readthedocs.io/en/stable/data/data_catalog.html
https://kedro.readthedocs.io/en/stable/extend_kedro/custom_datasets.html
https://kedro.readthedocs.io/en/stable/kedro.extras.datasets.html
