Hooks
Hooks let you attach extra behaviour to Kedro's main execution flow. Each Hook fires at one of the following points in that flow:
after_catalog_created
before_node_run
after_node_run
on_node_error
before_pipeline_run
after_pipeline_run
on_pipeline_error
before_dataset_loaded
after_dataset_loaded
before_dataset_saved
after_dataset_saved
after_context_created
These names follow the rule <before/after>_<noun>_<past_participle>. For example, before_node_run means that the Hook runs before a Node is executed. The error Hooks follow the pattern on_<noun>_error instead.
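As a rough illustration of the naming rule, the sketch below splits a Hook name into its timing, subject, and event. The helper `describe_hook` and the `HookName` type are hypothetical and not part of Kedro; they only encode the pattern visible in the list above.

```python
from typing import NamedTuple


class HookName(NamedTuple):
    timing: str   # "before", "after", or "on"
    subject: str  # e.g. "node", "pipeline", "dataset", "catalog", "context"
    event: str    # e.g. "run", "loaded", "saved", "created", "error"


def describe_hook(name: str) -> HookName:
    """Split a Hook name such as ``before_node_run`` into its three parts.

    Hypothetical helper for illustration only; it is not part of Kedro.
    """
    timing, subject, event = name.split("_")
    if timing not in {"before", "after", "on"}:
        raise ValueError(f"unexpected timing prefix in {name!r}")
    return HookName(timing, subject, event)


print(describe_hook("before_node_run"))
print(describe_hook("on_pipeline_error"))
```

Every name in the list above splits into exactly three parts, so the helper works for all of them.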
How to use Hooks
Follow the steps below to set up Hooks.
- Define Hooks in src/<project_name>/hooks.py
- Update HOOKS in src/<project_name>/settings.py
In src/<project_name>/hooks.py, use the @hook_impl decorator to mark a method as a Hook implementation. The following code declares a Hook that runs at the after_catalog_created timing, that is, after the DataCatalog has been created.
import logging

from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class DataCatalogHooks:
    @property
    def _logger(self):
        return logging.getLogger(self.__class__.__name__)

    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        # Log the names of all datasets registered in the catalog
        self._logger.info(catalog.list())
Update src/<project_name>/settings.py as follows to register the Hook.
from <project_name>.hooks import ProjectHooks, DataCatalogHooks
- HOOKS = (ProjectHooks(),)
+ HOOKS = (ProjectHooks(), DataCatalogHooks())
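To make the registration step more concrete, here is a toy dispatcher, written with the standard library only, that calls a method on every registered object that defines it. This is not how Kedro is implemented (Kedro relies on the pluggy library and the @hook_impl marker, and its Hooks receive different arguments); `fire`, `LoggingHooks`, and `CountingHooks` are hypothetical names used purely for illustration.

```python
from typing import Any, List, Tuple


class LoggingHooks:
    """Hypothetical Hook class that reacts to a single timing."""

    def __init__(self) -> None:
        self.messages: List[str] = []

    def before_node_run(self, node_name: str) -> None:
        self.messages.append(f"about to run {node_name}")


class CountingHooks:
    """Hypothetical Hook class that reacts to two timings."""

    def __init__(self) -> None:
        self.started = 0
        self.finished = 0

    def before_node_run(self, node_name: str) -> None:
        self.started += 1

    def after_node_run(self, node_name: str) -> None:
        self.finished += 1


def fire(hooks: Tuple[Any, ...], timing: str, **kwargs: Any) -> int:
    """Call ``timing`` on every registered object that implements it.

    Returns the number of implementations that were called.
    """
    called = 0
    for hook in hooks:
        method = getattr(hook, timing, None)
        if callable(method):
            method(**kwargs)
            called += 1
    return called


# Analogous in spirit to the HOOKS tuple in settings.py.
HOOKS = (LoggingHooks(), CountingHooks())

fire(HOOKS, "before_node_run", node_name="preprocess_shuttles_node")
fire(HOOKS, "after_node_run", node_name="preprocess_shuttles_node")
```

The point of the sketch is that each object in the tuple only needs to implement the timings it cares about; objects that lack a given method are simply skipped.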
Hooks examples
Tracking memory consumption
You can use memory-profiler to track memory consumption.
$ pip install memory_profiler
import logging

from kedro.framework.hooks import hook_impl
from memory_profiler import memory_usage


def _normalise_mem_usage(mem_usage):
    # memory_profiler < 0.56.0 returns list instead of float
    return mem_usage[0] if isinstance(mem_usage, (list, tuple)) else mem_usage


class MemoryProfilingHooks:
    def __init__(self):
        # Memory usage (MiB) recorded just before each dataset is loaded
        self._mem_usage = {}

    @property
    def _logger(self):
        return logging.getLogger(self.__class__.__name__)

    @hook_impl
    def before_dataset_loaded(self, dataset_name: str) -> None:
        before_mem_usage = memory_usage(
            -1,
            interval=0.1,
            max_usage=True,
            retval=True,
            include_children=True,
        )
        before_mem_usage = _normalise_mem_usage(before_mem_usage)
        self._mem_usage[dataset_name] = before_mem_usage

    @hook_impl
    def after_dataset_loaded(self, dataset_name: str) -> None:
        after_mem_usage = memory_usage(
            -1,
            interval=0.1,
            max_usage=True,
            retval=True,
            include_children=True,
        )
        after_mem_usage = _normalise_mem_usage(after_mem_usage)
        # Report the difference between the two measurements
        self._logger.info(
            "Loading %s consumed %2.2fMiB memory",
            dataset_name,
            after_mem_usage - self._mem_usage[dataset_name],
        )
Edit HOOKS in src/<project_name>/settings.py as follows.
- HOOKS = (ProjectHooks(),)
+ HOOKS = (MemoryProfilingHooks(),)
Running kedro run outputs a log of memory consumption.
$ kedro run
2021-10-05 12:02:34,946 - kedro.io.data_catalog - INFO - Loading data from `shuttles` (ExcelDataSet)...
2021-10-05 12:02:43,358 - MemoryProfilingHooks - INFO - Loading shuttles consumed 82.67MiB memory
2021-10-05 12:02:43,358 - kedro.pipeline.node - INFO - Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) -> [preprocessed_shuttles]
2021-10-05 12:02:43,440 - kedro.io.data_catalog - INFO - Saving data to `preprocessed_shuttles` (MemoryDataSet)...
2021-10-05 12:02:43,446 - kedro.runner.sequential_runner - INFO - Completed 1 out of 2 tasks
2021-10-05 12:02:43,559 - kedro.io.data_catalog - INFO - Loading data from `companies` (CSVDataSet)...
2021-10-05 12:02:43,727 - MemoryProfilingHooks - INFO - Loading companies consumed 4.16MiB memory
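If you want to compare datasets, log lines in the format shown above can be summarised with a few lines of standard-library Python. The helper name `memory_by_dataset` is hypothetical, and the regular expression assumes the exact message format logged by MemoryProfilingHooks above.

```python
import re
from typing import Dict, Iterable

# Matches the message emitted by MemoryProfilingHooks, for example:
#   "Loading shuttles consumed 82.67MiB memory"
_PATTERN = re.compile(
    r"Loading (?P<name>\S+) consumed (?P<mib>-?\d+(?:\.\d+)?)MiB memory"
)


def memory_by_dataset(lines: Iterable[str]) -> Dict[str, float]:
    """Return a mapping of dataset name to MiB consumed while loading it."""
    usage: Dict[str, float] = {}
    for line in lines:
        match = _PATTERN.search(line)
        if match:
            usage[match.group("name")] = float(match.group("mib"))
    return usage


# Lines copied from the sample output above.
sample_log = [
    "2021-10-05 12:02:34,946 - kedro.io.data_catalog - INFO - Loading data from `shuttles` (ExcelDataSet)...",
    "2021-10-05 12:02:43,358 - MemoryProfilingHooks - INFO - Loading shuttles consumed 82.67MiB memory",
    "2021-10-05 12:02:43,727 - MemoryProfilingHooks - INFO - Loading companies consumed 4.16MiB memory",
]
print(memory_by_dataset(sample_log))
```

Lines written by other loggers, such as the kedro.io.data_catalog entries, do not match the pattern and are ignored.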
Data validation
You can use Great Expectations to validate inputs and outputs.
$ pip install great-expectations
from typing import Any, Dict

import great_expectations as ge
from kedro.framework.hooks import hook_impl
from kedro.io import DataCatalog


class DataValidationHooks:
    # Map each dataset name to the expectation suite used to validate it
    DATASET_EXPECTATION_MAPPING = {
        "companies": "raw_companies_dataset_expectation",
        "preprocessed_companies": "preprocessed_companies_dataset_expectation",
    }

    @hook_impl
    def before_node_run(
        self, catalog: DataCatalog, inputs: Dict[str, Any], session_id: str
    ) -> None:
        """Validate the input data of a node with Great Expectations
        if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``.
        """
        self._run_validation(catalog, inputs, session_id)

    @hook_impl
    def after_node_run(
        self, catalog: DataCatalog, outputs: Dict[str, Any], session_id: str
    ) -> None:
        """Validate the output data of a node with Great Expectations
        if an expectation suite is defined in ``DATASET_EXPECTATION_MAPPING``.
        """
        self._run_validation(catalog, outputs, session_id)

    def _run_validation(
        self, catalog: DataCatalog, data: Dict[str, Any], session_id: str
    ):
        for dataset_name, dataset_value in data.items():
            # Skip datasets that have no expectation suite
            if dataset_name not in self.DATASET_EXPECTATION_MAPPING:
                continue

            # Note: this relies on private attributes of the catalog and dataset
            dataset = catalog._get_dataset(dataset_name)
            dataset_path = str(dataset._filepath)
            expectation_suite = self.DATASET_EXPECTATION_MAPPING[dataset_name]

            expectation_context = ge.data_context.DataContext()
            batch = expectation_context.get_batch(
                {"path": dataset_path, "datasource": "files_datasource"},
                expectation_suite,
            )
            expectation_context.run_validation_operator(
                "action_list_operator",
                assets_to_validate=[batch],
                session_id=session_id,
            )
Edit HOOKS in src/<project_name>/settings.py as follows.
- HOOKS = (ProjectHooks(),)
+ HOOKS = (DataValidationHooks(),)
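The skip logic in _run_validation can be exercised on its own, without Kedro or Great Expectations installed. The sketch below is a minimal stand-in, assuming the same mapping as above; `suites_for` and `node_inputs` are hypothetical names, and no validation is actually performed.

```python
from typing import Any, Dict, Iterator, Tuple

# Same shape as DataValidationHooks.DATASET_EXPECTATION_MAPPING above.
DATASET_EXPECTATION_MAPPING = {
    "companies": "raw_companies_dataset_expectation",
    "preprocessed_companies": "preprocessed_companies_dataset_expectation",
}


def suites_for(
    data: Dict[str, Any], mapping: Dict[str, str]
) -> Iterator[Tuple[str, str]]:
    """Yield (dataset_name, expectation_suite) for datasets that have a suite."""
    for dataset_name in data:
        if dataset_name in mapping:
            yield dataset_name, mapping[dataset_name]


# Hypothetical node inputs: only "companies" has an expectation suite.
node_inputs = {"companies": object(), "shuttles": object()}
print(list(suites_for(node_inputs, DATASET_EXPECTATION_MAPPING)))
```

Datasets without an entry in the mapping, such as shuttles here, pass through unvalidated, which mirrors the `continue` in the Hook.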
Metrics tracking
You can use MLflow to add metrics tracking.
$ pip install mlflow
from typing import Any, Dict

import mlflow
import mlflow.sklearn
from kedro.framework.hooks import hook_impl
from kedro.pipeline.node import Node


class ModelTrackingHooks:
    """Namespace for grouping all model-tracking hooks with MLflow together."""

    @hook_impl
    def before_pipeline_run(self, run_params: Dict[str, Any]) -> None:
        """Hook implementation to start an MLflow run
        with the session_id of the Kedro pipeline run.
        """
        mlflow.start_run(run_name=run_params["session_id"])
        mlflow.log_params(run_params)

    @hook_impl
    def after_node_run(
        self, node: Node, outputs: Dict[str, Any], inputs: Dict[str, Any]
    ) -> None:
        """Hook implementation to add model tracking after some node runs.
        In this example, we will:
        * Log the parameters after the data splitting node runs.
        * Log the model after the model training node runs.
        * Log the model's metrics after the model evaluating node runs.
        """
        if node._func_name == "split_data":
            mlflow.log_params(
                {"split_data_ratio": inputs["params:example_test_data_ratio"]}
            )
        elif node._func_name == "train_model":
            model = outputs["example_model"]
            mlflow.sklearn.log_model(model, "model")
            mlflow.log_params(inputs["parameters"])
        # The branch that logs metrics for the model evaluating node
        # is not included in this listing.

    @hook_impl
    def after_pipeline_run(self) -> None:
        """Hook implementation to end the MLflow run
        after the Kedro pipeline finishes.
        """
        mlflow.end_run()
Edit HOOKS in src/<project_name>/settings.py as follows.
- HOOKS = (ProjectHooks(),)
+ HOOKS = (ModelTrackingHooks(),)
References