dbt の Source とは
dbtにおいてSourceとは変換の元となるDWH上のローデータのことです。dbtでは次のようなSourceを扱う機能があります。
- Sourceに名前をつけてモデルから参照させる
- Sourceのデータをテストする
- Sourceのデータの鮮度を確認する
Sources の宣言
任意の名前の.ymlファイルをdbt_project.ymlのmodel-pathsに宣言されたディレクトリに配置します。
BigQueryに次のようなローデータがあるとします。
- プロジェクトID:
my_project - データセット:
jaffle_shop- テーブル:
raw_customersraw_ordersraw_payments
- テーブル:
データセットjaffle_shopにあるローデータをSourceとしたい場合は次のように記述します。
version: 2
sources:
- name: jaffle ## Any name
database: my_project
schema: jaffle_shop
tables:
- name: raw_customers
- name: raw_orders
- name: raw_payments
Sources の利用
宣言されたSourcesはモデルから参照することができます。次の例では、jaffleと宣言したSourceからraw_ordersテーブルとraw_customersテーブルを参照しています。
SELECT
*
FROM {{ source('jaffle', 'raw_orders') }}
LEFT JOIN {{ source('jaffle', 'raw_customers') }} USING (customer_id)
dbtは内部的に次のようにコンパイルしています。
SELECT
*
FROM jaffle.jaffle_shop.raw_orders
LEFT JOIN jaffle.jaffle_shop.raw_customers USING (customer_id)
Sources のテスト
次のように記述し、dbt testを実行するとSourceのテストをすることができます。
version: 2
sources:
- name: jaffle
database: my_project
schema: jaffle_shop
tables:
- name: raw_customers
+ columns:
+ - name: id
+ tests:
+ - unique
+ - not_null
- name: raw_orders
+ columns:
+ - name: id
+ tests:
+ - unique
- name: raw_payments
Source の鮮度チェック
loaded_at_fieldとfreshnessについて記述することでSourceの鮮度をチェックすることができます。
version: 2
sources:
- name: jaffle
database: my_project
schema: jaffle_shop
+ freshness: # default freshness
+ warn_after: {count: 12, period: hour}
+ error_after: {count: 24, period: hour}
+ loaded_at_field: _etl_loaded_at
tables:
- name: raw_customers # this will use the freshness defined above
- name: raw_orders
+ freshness: # make this a little more strict
+ warn_after: {count: 6, period: hour}
+ error_after: {count: 12, period: hour}
- name: raw_payments
+ freshness: null # do not check freshness for this table
loaded_at_fieldには鮮度の計算に使用するための時間データのカラムを指定します。dbtは指定したカラムのMAX値をとり、現在時刻との差を計ることで、データの鮮度をチェックします。ただし、時間はtimestampかつUTCである必要があります。そうでない場合は次のようにキャストをかけるクエリを記述する必要があります。
loaded_at_field: "convert_timezone('UTC', 'Asia/Tokyo', _etl_loaded_at)"
freshnessには現在時刻とデータの時刻の差についての警告やエラーを定義をします。例えば、warn_after: {count: 12, period: hour}は最新データが現在時刻より12時間以上離れていたら警告を出すという意味になります。
dbt source freshnessを実行するとSourcesの鮮度を計ることができます。
$ dbt source freshness
Running with dbt=0.18.1
Found 5 models, 11 tests, 0 snapshots, 0 analyses, 155 macros, 0 operations, 0 seed files, 2 sources
20:35:05 | Concurrency: 4 threads (target='learn')
20:35:05 |
20:35:06 | 1 of 1 START freshness of jaffle.raw_orders........................... [RUN]
20:35:09 | 1 of 1 WARN freshness of jaffle.raw_orders............................ [WARN in 3.98s]
20:35:09 | Done.
WARNが出たので、warn_afterで定めた鮮度を超えている、つまり現在時刻(コマンド実行時刻)から6時間以上経過しているということになります。なお、鮮度に問題がなければPASSSと表示され、error_afterの場合はERROR STALEと表示されます。
参考