Traffine I/O

日本語

2022-12-30

dbtのSource

dbt の Source とは

dbtにおいてSourceとは変換の元となるDWH上のローデータのことです。dbtでは以下のようなSourceを扱う機能があります。

  • Sourceに名前をつけてモデルから参照させる
  • Sourceのデータをテストする
  • Sourceのデータの鮮度を確認する

Sources の宣言

任意の名前の.ymlファイルをdbt_project.ymlmodel-pathsに宣言されたディレクトリに配置します。

BigQueryに以下のようなローデータがあるとします。

  • プロジェクトID: my_project
  • データセット: jaffle_shop
    • テーブル:
      • raw_customers
      • raw_orders
      • raw_payments

データセットjaffle_shopにあるローデータをSourceとしたい場合は以下のように記述します。

models/<filename>.yml
version: 2
sources:
  - name: jaffle ## Any name
    database: my_project
    schema: jaffle_shop
    tables:
      - name: raw_customers
      - name: raw_orders
      - name: raw_payments

Sources の利用

宣言されたSourcesはモデルから参照することができます。以下の例では、jaffleと宣言したSourceからraw_ordersテーブルとraw_customersテーブルを参照しています。

models/orders.sql
SELECT
  *
FROM {{ source('jaffle', 'raw_orders') }}
LEFT JOIN {{ source('jaffle', 'raw_customers') }} USING (customer_id)

dbtは内部的に以下のようにコンパイルしています。

SELECT
  *
FROM jaffle.jaffle_shop.raw_orders
LEFT JOIN jaffle.jaffle_shop.raw_customers USING (customer_id)

Sources のテスト

以下のように記述し、dbt testを実行するとSourceのテストをすることができます。

models/<filename>.yml
  version: 2
  sources:
    - name: jaffle
      database: my_project
      schema: jaffle_shop
      tables:
        - name: raw_customers
+         columns:
+          - name: id
+            tests:
+               - unique
+               - not_null
        - name: raw_orders
+         columns:
+           - name: id
+             tests:
+               - unique
        - name: raw_payments

Source の鮮度チェック

loaded_at_fieldfreshnessについて記述することでSourceの鮮度をチェックすることができます。

models/<filename>.yml
  version: 2
  sources:
    - name: jaffle
      database: my_project
      schema: jaffle_shop
+     freshness: # default freshness
+       warn_after: {count: 12, period: hour}
+       error_after: {count: 24, period: hour}
+     loaded_at_field: _etl_loaded_at
      tables:
      - name: raw_customers # this will use the freshness defined above
      - name: raw_orders
+       freshness: # make this a little more strict
+         warn_after: {count: 6, period: hour}
+         error_after: {count: 12, period: hour}
      - name: raw_payments
+       freshness: null # do not check freshness for this table

loaded_at_fieldには鮮度の計算に使用するための時間データのカラムを指定します。dbtは指定したカラムのMAX値をとり、現在時刻との差を計ることで、データの鮮度をチェックします。ただし、時間はtimestampかつUTCである必要があります。そうでない場合は以下のようにキャストをかけるクエリを記述する必要があります。

loaded_at_field: "convert_timezone('UTC', 'Asia/Tokyo', _etl_loaded_at)"

freshnessには現在時刻とデータの時刻の差についての警告やエラーを定義をします。例えば、warn_after: {count: 12, period: hour}は最新データが現在時刻より12時間以上離れていたら警告を出すという意味になります。

dbt source freshnessを実行するとSourcesの鮮度を計ることができます。

$ dbt source freshness

Running with dbt=0.18.1
Found 5 models, 11 tests, 0 snapshots, 0 analyses, 155 macros, 0 operations, 0 seed files, 2 sources

20:35:05 | Concurrency: 4 threads (target='learn')
20:35:05 |
20:35:06 | 1 of 1 START freshness of jaffle.raw_orders........................... [RUN]
20:35:09 | 1 of 1 WARN freshness of jaffle.raw_orders............................ [WARN in 3.98s]
20:35:09 | Done.

WARNが出たので、warn_afterで定めた鮮度を超えている、つまり現在時刻(コマンド実行時刻)から6時間以上経過しているということになります。なお、鮮度に問題がなければPASSSと表示され、error_afterの場合はERROR STALEと表示されます。

参考

https://docs.getdbt.com/docs/build/sources
https://dev.classmethod.jp/articles/dbt-sources/

Ryusei Kakujo

researchgatelinkedingithub

Focusing on data science for mobility

Bench Press 100kg!