2023-01-14

Kedro DataCatalog

DataCatalog

Kedro provides a feature called the DataCatalog. You declare the data sources your project uses in catalog.yml, and the DataCatalog manages loading and saving that data. The file path protocols supported by Kedro's built-in datasets are listed below; a minimal usage sketch follows the list.

  • Local / Network File System
    • file://
  • Hadoop File System (HDFS)
    • hdfs://user@server:port/path/to/data
  • Amazon S3
    • s3://my-bucket-name/path/to/data
  • S3 Compatible Storage
    • s3://my-bucket-name/path/to/data
  • Google Cloud Storage
    • gcs://
  • Azure Blob Storage / Azure Data Lake Storage Gen2
    • abfs://
  • HTTP(s)
    • http://
    • https://
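
Before walking through catalog.yml examples, here is a minimal sketch of how such an entry is consumed programmatically. It assumes kedro and pandas are installed and uses the bikes dataset that also appears in the examples below:

from kedro.io import DataCatalog

# Build a catalog from a plain dict; this mirrors what Kedro does with
# conf/base/catalog.yml when a session starts.
catalog = DataCatalog.from_config(
    {
        "bikes": {
            "type": "pandas.CSVDataSet",
            "filepath": "data/01_raw/bikes.csv",
        }
    }
)

df = catalog.load("bikes")   # reads the CSV into a pandas DataFrame
catalog.save("bikes", df)    # writes it back to the same path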

Examples of catalog.yml

The following dataset definitions are examples shown in the official documentation.

conf/base/catalog.yml
# Provide the project value to the underlying filesystem class (GCSFileSystem) to interact with Google Cloud Storage (GCS)
test_dataset:
  type: ...
  fs_args:
    project: test_project


# Load data from a local binary file using utf-8 encoding
test_dataset:
  type: ...
  fs_args:
    open_args_load:
      mode: "rb"
      encoding: "utf-8"


# Save data to a CSV file without row names (index) using utf-8 encoding
test_dataset:
  type: pandas.CSVDataSet
  ...
  save_args:
    index: False
    encoding: "utf-8"

---

# Loads / saves a CSV file from / to a local file system
bikes:
  type: pandas.CSVDataSet
  filepath: data/01_raw/bikes.csv


# Loads and saves a CSV on a local file system, using specified load and save arguments
cars:
  type: pandas.CSVDataSet
  filepath: data/01_raw/company/cars.csv
  load_args:
    sep: ','
  save_args:
    index: False
    date_format: '%Y-%m-%d %H:%M'
    decimal: .


# Loads and saves a compressed CSV on a local file system
boats:
  type: pandas.CSVDataSet
  filepath: data/01_raw/company/boats.csv.gz
  load_args:
    sep: ','
    compression: 'gzip'
  fs_args:
    open_args_load:
      mode: 'rb'


# Loads a CSV file from a specific S3 bucket, using credentials and load arguments
motorbikes:
  type: pandas.CSVDataSet
  filepath: s3://your_bucket/data/02_intermediate/company/motorbikes.csv
  credentials: dev_s3
  load_args:
    sep: ','
    skiprows: 5
    skipfooter: 1
    na_values: ['#NA', NA]


# Loads / saves a pickle file from / to a local file system
airplanes:
  type: pickle.PickleDataSet
  filepath: data/06_models/airplanes.pkl
  backend: pickle


# Loads an Excel file from Google Cloud Storage
rockets:
  type: pandas.ExcelDataSet
  filepath: gcs://your_bucket/data/02_intermediate/company/motorbikes.xlsx
  fs_args:
    project: my-project
  credentials: my_gcp_credentials
  save_args:
    sheet_name: Sheet1


# Loads a multi-sheet Excel file from a local file system
trains:
  type: pandas.ExcelDataSet
  filepath: data/02_intermediate/company/trains.xlsx
  load_args:
    sheet_name: [Sheet1, Sheet2, Sheet3]


# Saves an image created with Matplotlib on Google Cloud Storage
results_plot:
  type: matplotlib.MatplotlibWriter
  filepath: gcs://your_bucket/data/08_results/plots/output_1.jpeg
  fs_args:
    project: my-project
  credentials: my_gcp_credentials


# Loads / saves an HDF file on local file system storage, using specified load and save arguments
skateboards:
  type: pandas.HDFDataSet
  filepath: data/02_intermediate/skateboards.hdf
  key: name
  load_args:
    columns: [brand, length]
  save_args:
    mode: w  # Overwrite even when the file already exists
    dropna: True


# Loads / saves a parquet file on local file system storage, using specified load and save arguments
trucks:
  type: pandas.ParquetDataSet
  filepath: data/02_intermediate/trucks.parquet
  load_args:
    columns: [name, gear, disp, wt]
    categories: list
    index: name
  save_args:
    compression: GZIP
    file_scheme: hive
    has_nulls: False
    partition_on: [name]


# Loads / saves a Spark table on S3, using specified load and save arguments
weather:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather*
  credentials: dev_s3
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True


# Loads / saves a SQL table using credentials, a database connection, using specified load and save arguments
scooters:
  type: pandas.SQLTableDataSet
  credentials: scooters_credentials
  table_name: scooters
  load_args:
    index_col: [name]
    columns: [name, gear]
  save_args:
    if_exists: replace


# Loads an SQL table with credentials, a database connection, and applies a SQL query to the table
scooters_query:
  type: pandas.SQLQueryDataSet
  credentials: scooters_credentials
  sql: select * from cars where gear=4
  load_args:
    index_col: [name]


# Loads data from an API endpoint, example US corn yield data from USDA
us_corn_yield_data:
  type: api.APIDataSet
  url: https://quickstats.nass.usda.gov
  credentials: usda_credentials
  params:
    key: SOME_TOKEN
    format: JSON
    commodity_desc: CORN
    statisticcat_des: YIELD
    agg_level_desc: STATE
    year: 2000


# Loads data from Minio (S3 API Compatible Storage)
test:
  type: pandas.CSVDataSet
  filepath: s3://your_bucket/test.csv # assume `test.csv` is uploaded to the Minio server.
  credentials: dev_minio


# Loads a model saved as a pickle from Azure Blob Storage
ml_model:
  type: pickle.PickleDataSet
  filepath: "abfs://models/ml_models.pickle"
  versioned: True
  credentials: dev_abs

# Loads a CSV file stored in a remote location through SSH
cool_dataset:
  type: pandas.CSVDataSet
  filepath: "sftp:///path/to/remote_cluster/cool_data.csv"
  credentials: cluster_credentials

conf/local/credentials.yml
dev_s3:
  client_kwargs:
    aws_access_key_id: key
    aws_secret_access_key: secret

scooters_credentials:
  con: sqlite:///kedro.db

my_gcp_credentials:
  id_token: key

usda_credentials:
  - username
  - password

dev_minio:
  key: token
  secret: key
  client_kwargs:
    endpoint_url : 'http://localhost:9000'

dev_abs:
  account_name: accountname
  account_key: key

cluster_credentials:
  username: my_username
  host: host_address
  port: 22
  password: password
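
Each credentials key in catalog.yml refers to a top-level key in credentials.yml, and Kedro resolves it when the catalog is built. The following rough sketch shows that wiring, assuming conf/base/catalog.yml holds just the motorbikes entry from above:

import yaml
from kedro.io import DataCatalog

with open("conf/base/catalog.yml") as f:
    catalog_conf = yaml.safe_load(f)
with open("conf/local/credentials.yml") as f:
    credentials = yaml.safe_load(f)

# "credentials: dev_s3" in the motorbikes entry is looked up as a key in
# the credentials dict and handed to the underlying S3 filesystem.
catalog = DataCatalog.from_config(catalog_conf, credentials=credentials)
motorbikes = catalog.load("motorbikes")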

Generalizing dataset definitions

Datasets that share the same file format and load/save arguments can be defined once and reused with YAML anchors, as shown below. Entries whose names start with an underscore (such as _csv) are ignored by Kedro, so they serve purely as templates.

conf/base/catalog.yml
_csv: &csv
  type: spark.SparkDataSet
  file_format: csv
  load_args:
    sep: ','
    na_values: ['#NA', NA]
    header: True
    inferSchema: False

cars:
  <<: *csv
  filepath: s3a://data/01_raw/cars.csv

trucks:
  <<: *csv
  filepath: s3a://data/01_raw/trucks.csv

bikes:
  <<: *csv
  filepath: s3a://data/01_raw/bikes.csv
  load_args:
    header: False

In the example above, the load_args given for bikes replaces the anchored load_args block entirely. To override only a single key while keeping the rest, anchor the nested block as well and merge it:

conf/base/catalog.yml
_csv: &csv
  type: spark.SparkDataSet
  file_format: csv
  load_args: &csv_load_args
    header: True
    inferSchema: False

airplanes:
  <<: *csv
  filepath: s3a://data/01_raw/airplanes.csv
  load_args:
    <<: *csv_load_args
    sep: ;
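
YAML merge keys (<<) are resolved by the YAML loader itself, so you can check what an entry expands to with plain PyYAML; a quick sketch against the file just shown:

import yaml

with open("conf/base/catalog.yml") as f:
    conf = yaml.safe_load(f)

# The merge pulls header and inferSchema in from &csv_load_args,
# while sep is overridden locally.
print(conf["airplanes"]["load_args"])
# {'header': True, 'inferSchema': False, 'sep': ';'}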

Supported datasets

The list of datasets Kedro supports out of the box can be found in the official documentation:

https://kedro.readthedocs.io/en/stable/kedro.extras.datasets.html

Custom datasets

By creating a custom dataset, you can handle files that Kedro cannot load out of the box.

Create custom_datasets.py as shown below. The file name itself can be anything.

src/<project_name>/extras
├── __init__.py
└── datasets
    ├── __init__.py
    └── custom_datasets.py

Describe your custom datasets in custom_datasets.py as follows. Note that _load, _save, and _describe must be defined for each dataset class.

src/<project_name>/extras/datasets/custom_datasets.py
import fsspec
import gensim
import numpy as np
from pathlib import PurePosixPath
from typing import Any, Dict
from PIL import Image

from kedro.io import AbstractDataSet, AbstractVersionedDataSet
from kedro.io.core import Version, get_filepath_str, get_protocol_and_path


class ImageDataSet(AbstractVersionedDataSet[np.ndarray, np.ndarray]):
    def __init__(
        self,
        filepath: str,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ):
        """Creates a new instance of ImageDataSet to load / save image data for given filepath.

        Args:
            filepath: The location of the image file to load / save data.
            version: The version of the dataset being saved and loaded.
            credentials: Credentials required to get access to the underlying filesystem.
                E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
            fs_args: Extra arguments to pass into underlying filesystem class.
                E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`.
        """
        protocol, path = get_protocol_and_path(filepath)
        self._protocol = protocol
        self._fs = fsspec.filesystem(
            self._protocol, **(credentials or {}), **(fs_args or {})
        )
        # The parent class keeps track of the filepath and versioning state
        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset."""
        return dict(filepath=self._filepath, protocol=self._protocol)

    def _load(self) -> np.ndarray:
        """Loads image data from the specified filepath as a numpy array."""
        load_path = get_filepath_str(self._get_load_path(), self._protocol)
        with self._fs.open(load_path, "rb") as f:
            image = Image.open(f).convert("RGBA")
            return np.asarray(image)

    def _save(self, data: np.ndarray) -> None:
        """Saves image data to the specified filepath."""
        # using get_filepath_str ensures that the protocol and path are appended correctly for different filesystems
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        with self._fs.open(save_path, "wb") as f:
            image = Image.fromarray(data)
            image.save(f)


class WordVec(AbstractDataSet):
    def __init__(self, filepath: str):
        self._filepath = filepath

    def _load(self):
        return gensim.models.KeyedVectors.load_word2vec_format(
            self._filepath, binary=False
        )

    def _save(self, data) -> None:
        # This dataset is read-only, so saving is intentionally unsupported
        raise NotImplementedError("WordVec does not support saving.")

    def _describe(self):
        return dict(filepath=self._filepath)

With the following entries in catalog.yml, the custom datasets can be loaded like any other dataset.

conf/base/catalog.yml
pikachu:
  type: <project_name>.extras.datasets.custom_datasets.ImageDataSet
  filepath: data/01_raw/pokemon-images-and-types/images/images/pikachu.png
  # Note: the duplicated `images` path is part of the original Kaggle dataset

fast_text_model:
  type: <project_name>.extras.datasets.custom_datasets.WordVec
  filepath: data/01_raw/fast_text_model.vec
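
For a quick check outside a pipeline run, a dataset class can also be instantiated directly. A minimal sketch, where my_project stands in for your actual package name:

from my_project.extras.datasets.custom_datasets import ImageDataSet

dataset = ImageDataSet(
    filepath="data/01_raw/pokemon-images-and-types/images/images/pikachu.png"
)
image = dataset.load()  # the public load() wraps _load() and returns an np.ndarray
print(image.shape)      # e.g. (height, width, 4) for an RGBA image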

References

https://kedro.readthedocs.io/en/stable/data/data_catalog.html
https://kedro.readthedocs.io/en/stable/extend_kedro/custom_datasets.html
https://kedro.readthedocs.io/en/stable/kedro.extras.datasets.html
