2023-01-14

Kedro modular pipelines

Modular pipelines

Kedro lets you break a project into modular pipelines that can be reused and instantiated multiple times. For example, you can run several copies of the same modelling pipeline that differ only in their parameters, such as the feature set. Modular pipelines are easier to develop, test, and maintain, and they are more portable between projects.

Extending project with modular pipelines

Let us extend the pipeline from the spaceflights starter provided by Kedro. First, create and prepare the spaceflights project.

$ kedro new --starter=spaceflights
$ cd spaceflights
$ pip install -r src/requirements.txt

We will now add the following two namespaces, each with its own parameters, to the modelling component of the data science pipeline:

  • active_modelling_pipeline
  • candidate_modelling_pipeline

Edit conf/base/parameters/data_science.yml as follows.

conf/base/parameters/data_science.yml
-  model_options:
-    test_size: 0.2
-    random_state: 3
-    features:
-      - engines
-      - passenger_capacity
-      - crew
-      - d_check_complete
-      - moon_clearance_complete
-      - iata_approved
-      - company_rating
-      - review_scores_rating

+  active_modelling_pipeline:
+      model_options:
+        test_size: 0.2
+        random_state: 3
+        features:
+          - engines
+          - passenger_capacity
+          - crew
+          - d_check_complete
+          - moon_clearance_complete
+          - iata_approved
+          - company_rating
+          - review_scores_rating
+
+  candidate_modelling_pipeline:
+      model_options:
+        test_size: 0.2
+        random_state: 8
+        features:
+          - engines
+          - passenger_capacity
+          - crew
+          - review_scores_rating

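The two namespaced parameter blocks can be checked from an interactive session. Below is a minimal sketch, assuming a Kedro 0.18 project where kedro ipython exposes a ready-made context variable:

# Minimal sketch: run `kedro ipython` in the project root; `context.params` holds
# the merged configuration from conf/base/parameters/.
print(context.params["active_modelling_pipeline"]["model_options"]["random_state"])     # -> 3
print(context.params["candidate_modelling_pipeline"]["model_options"]["random_state"])  # -> 8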
Edit conf/base/catalog.yml as follows.

conf/base/catalog.yml
-  regressor:
-    type: pickle.PickleDataSet
-    filepath: data/06_models/regressor.pickle
-    versioned: true

+  active_modelling_pipeline.regressor:
+    type: pickle.PickleDataSet
+    filepath: data/06_models/regressor_active.pickle
+    versioned: true
+
+  candidate_modelling_pipeline.regressor:
+    type: pickle.PickleDataSet
+    filepath: data/06_models/regressor_candidate.pickle
+    versioned: true

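Because each namespace now has its own catalog entry, the two trained models are saved to separate files and can be loaded independently. A minimal sketch, assuming a kedro ipython session (which provides a catalog variable) after a completed kedro run:

# Minimal sketch: `catalog` is provided by `kedro ipython` (Kedro 0.18)
active_model = catalog.load("active_modelling_pipeline.regressor")
candidate_model = catalog.load("candidate_modelling_pipeline.regressor")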
Edit src/<package_name>/pipelines/data_science/pipeline.py as follows.

src/<package_name>/pipelines/data_science/pipeline.py
  from kedro.pipeline import Pipeline, node
- from kedro.pipeline import pipeline
+ from kedro.pipeline.modular_pipeline import pipeline

  from .nodes import evaluate_model, split_data, train_model


  def create_pipeline(**kwargs) -> Pipeline:
-      return pipeline(
-          [
-              node(
-                  func=split_data,
-                  inputs=["model_input_table", "params:model_options"],
-                  outputs=["X_train", "X_test", "y_train", "y_test"],
-                  name="split_data_node",
-              ),
-              node(
-                  func=train_model,
-                  inputs=["X_train", "y_train"],
-                  outputs="regressor",
-                  name="train_model_node",
-              ),
-              node(
-                  func=evaluate_model,
-                  inputs=["regressor", "X_test", "y_test"],
-                  outputs=None,
-                  name="evaluate_model_node",
-              ),
-          ]
-      )

+      pipeline_instance = pipeline(
+          [
+              node(
+                  func=split_data,
+                  inputs=["model_input_table", "params:model_options"],
+                  outputs=["X_train", "X_test", "y_train", "y_test"],
+                  name="split_data_node",
+              ),
+              node(
+                  func=train_model,
+                  inputs=["X_train", "y_train"],
+                  outputs="regressor",
+                  name="train_model_node",
+              ),
+              node(
+                  func=evaluate_model,
+                  inputs=["regressor", "X_test", "y_test"],
+                  outputs=None,
+                  name="evaluate_model_node",
+              ),
+          ]
+      )
+      ds_pipeline_1 = pipeline(
+          pipe=pipeline_instance,
+          inputs="model_input_table",
+          namespace="active_modelling_pipeline",
+      )
+      ds_pipeline_2 = pipeline(
+          pipe=pipeline_instance,
+          inputs="model_input_table",
+          namespace="candidate_modelling_pipeline",
+      )
+
+      return ds_pipeline_1 + ds_pipeline_2

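The pipeline() wrapper prefixes every dataset, parameter, and node name inside each namespace, except the dataset declared in inputs (here model_input_table), which stays shared between both instances. A minimal sketch to verify this, assuming the starter's default package name spaceflights:

# Minimal sketch (the package name `spaceflights` is the starter's default and may differ)
from spaceflights.pipelines.data_science.pipeline import create_pipeline

p = create_pipeline()
print(sorted(p.inputs()))
# ['model_input_table',
#  'params:active_modelling_pipeline.model_options',
#  'params:candidate_modelling_pipeline.model_options']
print(sorted(n.name for n in p.nodes))
# node names are prefixed with their namespace,
# e.g. 'active_modelling_pipeline.split_data_node'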
If you run kedro run, you will see that both the active_modelling_pipeline and the candidate_modelling_pipeline are executed.

$ kedro run

[11/02/22 10:41:06] WARNING  /Users/<username>/opt/anaconda3/envs/py38/lib/python3.8/site-packages/plotly/graph_objects/ warnings.py:109
                             __init__.py:288: DeprecationWarning: distutils Version classes are deprecated. Use
                             packaging.version instead.
                               if LooseVersion(ipywidgets.__version__) >= LooseVersion("7.0.0"):

[11/02/22 10:41:07] INFO     Kedro project kedro-tutorial                                                                   session.py:340
[11/02/22 10:41:08] INFO     Loading data from 'companies' (CSVDataSet)...                                             data_catalog.py:343
                    INFO     Running node: preprocess_companies_node: preprocess_companies([companies]) ->                     node.py:327
                             [preprocessed_companies]
                    INFO     Saving data to 'preprocessed_companies' (ParquetDataSet)...                               data_catalog.py:382
                    INFO     Completed 1 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'shuttles' (ExcelDataSet)...                                            data_catalog.py:343
[11/02/22 10:41:13] INFO     Running node: preprocess_shuttles_node: preprocess_shuttles([shuttles]) ->                        node.py:327
                             [preprocessed_shuttles]
                    WARNING  /Users/<username>/Documents/kedro-projects/kedro-tutorial/src/kedro_tutorial/pipelines/data warnings.py:109
                             _processing/nodes.py:19: FutureWarning: The default value of regex will change from True to
                             False in a future version. In addition, single character regular expressions will *not* be
                             treated as literal strings when regex=True.
                               x = x.str.replace("$", "").str.replace(",", "")

                    INFO     Saving data to 'preprocessed_shuttles' (ParquetDataSet)...                                data_catalog.py:382
                    INFO     Completed 2 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'preprocessed_shuttles' (ParquetDataSet)...                             data_catalog.py:343
                    INFO     Loading data from 'preprocessed_companies' (ParquetDataSet)...                            data_catalog.py:343
                    INFO     Loading data from 'reviews' (CSVDataSet)...                                               data_catalog.py:343
                    INFO     Running node: create_model_input_table_node:                                                      node.py:327
                             create_model_input_table([preprocessed_shuttles,preprocessed_companies,reviews]) ->
                             [model_input_table]
[11/02/22 10:41:14] INFO     Saving data to 'model_input_table' (ParquetDataSet)...                                    data_catalog.py:382
[11/02/22 10:41:15] INFO     Completed 3 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'model_input_table' (ParquetDataSet)...                                 data_catalog.py:343
                    INFO     Loading data from 'params:active_modelling_pipeline.model_options' (MemoryDataSet)...     data_catalog.py:343
                    INFO     Running node: split_data_node:                                                                    node.py:327
                             split_data([model_input_table,params:active_modelling_pipeline.model_options]) ->
                             [active_modelling_pipeline.X_train,active_modelling_pipeline.X_test,active_modelling_pipeline.y_t
                             rain,active_modelling_pipeline.y_test]
                    INFO     Saving data to 'active_modelling_pipeline.X_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'active_modelling_pipeline.X_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Saving data to 'active_modelling_pipeline.y_train' (MemoryDataSet)...                     data_catalog.py:382
                    INFO     Saving data to 'active_modelling_pipeline.y_test' (MemoryDataSet)...                      data_catalog.py:382
                    INFO     Completed 4 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'model_input_table' (ParquetDataSet)...                                 data_catalog.py:343
                    INFO     Loading data from 'params:candidate_modelling_pipeline.model_options' (MemoryDataSet)...  data_catalog.py:343
                    INFO     Running node: split_data_node:                                                                    node.py:327
                             split_data([model_input_table,params:candidate_modelling_pipeline.model_options]) ->
                             [candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.X_test,candidate_modelling_pip
                             eline.y_train,candidate_modelling_pipeline.y_test]
                    INFO     Saving data to 'candidate_modelling_pipeline.X_train' (MemoryDataSet)...                  data_catalog.py:382
                    INFO     Saving data to 'candidate_modelling_pipeline.X_test' (MemoryDataSet)...                   data_catalog.py:382
                    INFO     Saving data to 'candidate_modelling_pipeline.y_train' (MemoryDataSet)...                  data_catalog.py:382
                    INFO     Saving data to 'candidate_modelling_pipeline.y_test' (MemoryDataSet)...                   data_catalog.py:382
                    INFO     Completed 5 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'active_modelling_pipeline.X_train' (MemoryDataSet)...                  data_catalog.py:343
                    INFO     Loading data from 'active_modelling_pipeline.y_train' (MemoryDataSet)...                  data_catalog.py:343
                    INFO     Running node: train_model_node:                                                                   node.py:327
                             train_model([active_modelling_pipeline.X_train,active_modelling_pipeline.y_train]) ->
                             [active_modelling_pipeline.regressor]
                    INFO     Saving data to 'active_modelling_pipeline.regressor' (PickleDataSet)...                   data_catalog.py:382
                    INFO     Completed 6 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'candidate_modelling_pipeline.X_train' (MemoryDataSet)...               data_catalog.py:343
                    INFO     Loading data from 'candidate_modelling_pipeline.y_train' (MemoryDataSet)...               data_catalog.py:343
                    INFO     Running node: train_model_node:                                                                   node.py:327
                             train_model([candidate_modelling_pipeline.X_train,candidate_modelling_pipeline.y_train]) ->
                             [candidate_modelling_pipeline.regressor]
                    INFO     Saving data to 'candidate_modelling_pipeline.regressor' (PickleDataSet)...                data_catalog.py:382
                    INFO     Completed 7 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'active_modelling_pipeline.regressor' (PickleDataSet)...                data_catalog.py:343
                    INFO     Loading data from 'active_modelling_pipeline.X_test' (MemoryDataSet)...                   data_catalog.py:343
                    INFO     Loading data from 'active_modelling_pipeline.y_test' (MemoryDataSet)...                   data_catalog.py:343
                    INFO     Running node: evaluate_model_node:                                                                node.py:327
                             evaluate_model([active_modelling_pipeline.regressor,active_modelling_pipeline.X_test,active_model
                             ling_pipeline.y_test]) -> None
                    INFO     Model has a coefficient R^2 of 0.462 on test data.                                                nodes.py:60
                    INFO     Completed 8 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Loading data from 'candidate_modelling_pipeline.regressor' (PickleDataSet)...             data_catalog.py:343
                    INFO     Loading data from 'candidate_modelling_pipeline.X_test' (MemoryDataSet)...                data_catalog.py:343
                    INFO     Loading data from 'candidate_modelling_pipeline.y_test' (MemoryDataSet)...                data_catalog.py:343
                    INFO     Running node: evaluate_model_node:                                                                node.py:327
                             evaluate_model([candidate_modelling_pipeline.regressor,candidate_modelling_pipeline.X_test,candid
                             ate_modelling_pipeline.y_test]) -> None
                    INFO     Model has a coefficient R^2 of 0.449 on test data.                                                nodes.py:60
                    INFO     Completed 9 out of 9 tasks                                                            sequential_runner.py:85
                    INFO     Pipeline execution completed successfully.

pipeline() wrapper

The following import allows the creation of multiple instances of a pipeline with different inputs, outputs, and parameters.

from kedro.pipeline.modular_pipeline import pipeline

The keyword arguments to pipeline() are as follows:

  • pipe: the Pipeline object you want to wrap
  • inputs: input overrides provided to this instance of the underlying wrapped Pipeline object
  • outputs: output overrides provided to this instance of the underlying wrapped Pipeline object
  • parameters: parameter overrides provided to this instance of the underlying wrapped Pipeline object
  • namespace: the namespace that will be encapsulated by this pipeline instance

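As a minimal, self-contained sketch of how these overrides interact (the names below are illustrative, not taken from the spaceflights project): a dataset mapped in inputs keeps the name you give it, while everything else, including parameters, is prefixed with the namespace.

from kedro.pipeline import Pipeline, node
from kedro.pipeline.modular_pipeline import pipeline


def fit(data, options):
    return {"fitted_with": options}


base = Pipeline(
    [node(fit, inputs=["data", "params:options"], outputs="model", name="fit_node")]
)

wrapped = pipeline(
    pipe=base,
    inputs={"data": "model_input_table"},  # keep this dataset un-namespaced / shared
    namespace="experiment_a",              # everything else gets the "experiment_a." prefix
)

print(wrapped.inputs())       # {'model_input_table', 'params:experiment_a.options'}
print(wrapped.all_outputs())  # {'experiment_a.model'}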
Pipeline lineage can be visualized with the kedro viz command.
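For example, from the project root (assuming Kedro-Viz is installed in the environment):

$ kedro viz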

Kedro-Viz visualization

References

https://kedro.readthedocs.io/en/stable/nodes_and_pipelines/modular_pipelines.html
https://kedro.readthedocs.io/en/stable/tutorial/add_another_pipeline.html#optional-modular-pipelines
