dbt の Source とは
dbtにおいてSourceとは変換の元となるDWH上のローデータのことです。dbtでは次のようなSourceを扱う機能があります。
- Sourceに名前をつけてモデルから参照させる
- Sourceのデータをテストする
- Sourceのデータの鮮度を確認する
Sources の宣言
任意の名前の.yml
ファイルをdbt_project.yml
のmodel-paths
に宣言されたディレクトリに配置します。
BigQueryに次のようなローデータがあるとします。
- プロジェクトID:
my_project
- データセット:
jaffle_shop
- テーブル:
raw_customers
raw_orders
raw_payments
- テーブル:
データセットjaffle_shop
にあるローデータをSourceとしたい場合は次のように記述します。
version: 2
sources:
- name: jaffle ## Any name
database: my_project
schema: jaffle_shop
tables:
- name: raw_customers
- name: raw_orders
- name: raw_payments
Sources の利用
宣言されたSourcesはモデルから参照することができます。次の例では、jaffle
と宣言したSourceからraw_orders
テーブルとraw_customers
テーブルを参照しています。
SELECT
*
FROM {{ source('jaffle', 'raw_orders') }}
LEFT JOIN {{ source('jaffle', 'raw_customers') }} USING (customer_id)
dbtは内部的に次のようにコンパイルしています。
SELECT
*
FROM jaffle.jaffle_shop.raw_orders
LEFT JOIN jaffle.jaffle_shop.raw_customers USING (customer_id)
Sources のテスト
次のように記述し、dbt test
を実行するとSourceのテストをすることができます。
version: 2
sources:
- name: jaffle
database: my_project
schema: jaffle_shop
tables:
- name: raw_customers
+ columns:
+ - name: id
+ tests:
+ - unique
+ - not_null
- name: raw_orders
+ columns:
+ - name: id
+ tests:
+ - unique
- name: raw_payments
Source の鮮度チェック
loaded_at_field
とfreshness
について記述することでSourceの鮮度をチェックすることができます。
version: 2
sources:
- name: jaffle
database: my_project
schema: jaffle_shop
+ freshness: # default freshness
+ warn_after: {count: 12, period: hour}
+ error_after: {count: 24, period: hour}
+ loaded_at_field: _etl_loaded_at
tables:
- name: raw_customers # this will use the freshness defined above
- name: raw_orders
+ freshness: # make this a little more strict
+ warn_after: {count: 6, period: hour}
+ error_after: {count: 12, period: hour}
- name: raw_payments
+ freshness: null # do not check freshness for this table
loaded_at_field
には鮮度の計算に使用するための時間データのカラムを指定します。dbtは指定したカラムのMAX値をとり、現在時刻との差を計ることで、データの鮮度をチェックします。ただし、時間はtimestampかつUTCである必要があります。そうでない場合は次のようにキャストをかけるクエリを記述する必要があります。
loaded_at_field: "convert_timezone('UTC', 'Asia/Tokyo', _etl_loaded_at)"
freshness
には現在時刻とデータの時刻の差についての警告やエラーを定義をします。例えば、warn_after: {count: 12, period: hour}
は最新データが現在時刻より12時間以上離れていたら警告を出すという意味になります。
dbt source freshness
を実行するとSourcesの鮮度を計ることができます。
$ dbt source freshness
Running with dbt=0.18.1
Found 5 models, 11 tests, 0 snapshots, 0 analyses, 155 macros, 0 operations, 0 seed files, 2 sources
20:35:05 | Concurrency: 4 threads (target='learn')
20:35:05 |
20:35:06 | 1 of 1 START freshness of jaffle.raw_orders........................... [RUN]
20:35:09 | 1 of 1 WARN freshness of jaffle.raw_orders............................ [WARN in 3.98s]
20:35:09 | Done.
WARN
が出たので、warn_after
で定めた鮮度を超えている、つまり現在時刻(コマンド実行時刻)から6時間以上経過しているということになります。なお、鮮度に問題がなければPASSS
と表示され、error_after
の場合はERROR STALE
と表示されます。
参考