Pipeline function
Write pipeline function
Refer to the Kubeflow Pipelines documentation for details on how to implement the pipeline.
Important: Kfops executes the pipeline function using the Kubeflow Pipelines SDK v1.7.0 (pip install kfp==1.7.0). Make sure your function is compatible with this version.
Remember that the execution mode defined in your config.yaml should match the mode used in the pipeline function definition. By default, the execution mode is set to V2_COMPATIBLE.
The model saving step has to be defined explicitly in the pipeline code.
Save ML model from the pipeline
The kfops Python module contains the convenience function materialize_model, which simplifies saving the model to a MinIO bucket. The function requires your pipeline to be written using the v2 execution mode and can be imported with: from kfops.model import materialize_model
The example below saves the model directly from the pipeline's "train" task:
import kfp
from kfops.model import materialize_model
from kfp.v2.dsl import component, Output, Model


@component
def train(model: Output[Model]):  # Important: Make sure the variable name is 'model'
    # ... Here are all the steps required to train the model ...
    # (they produce the object referred to below as my_trained_model)

    # Save the model to a temporary folder
    import os
    import pickle
    from distutils.dir_util import copy_tree

    os.mkdir('/tmp/models')
    with open('/tmp/models/model.pickle', 'wb') as f:
        pickle.dump(my_trained_model, f)

    # Copy the saved folder to the "Output".
    # Notice: whether your model is a single file or multiple files, make sure
    # you copy the folder, not the file itself.
    copy_tree('/tmp/models/', model.path)


# Pipeline
@kfp.v2.dsl.pipeline(name='Example_pipeline')
def example_pipeline(version_id: 'str'):
    train_model_task = train()
    materialize_model_task = materialize_model(model=train_model_task.outputs['model'])
    # Notice: it is important to disable caching for this task
    materialize_model_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
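To try the save logic without a cluster, the body of train can be factored into a plain function and pointed at a temporary directory that stands in for model.path. This is a minimal local sketch, not part of kfops: the function name save_model_folder is hypothetical, and it uses shutil.copytree instead of distutils.dir_util.copy_tree because distutils was removed in Python 3.12 (without dirs_exist_ok, shutil.copytree expects the destination not to exist yet).

```python
import os
import pickle
import shutil
import tempfile


def save_model_folder(trained_model, output_path: str) -> None:
    """Pickle trained_model into a staging folder, then copy that folder to output_path.

    Hypothetical helper mirroring the train component; output_path plays the role of model.path.
    """
    staging_dir = tempfile.mkdtemp()  # stand-in for '/tmp/models'
    try:
        with open(os.path.join(staging_dir, 'model.pickle'), 'wb') as f:
            pickle.dump(trained_model, f)
        # Copy the folder, not the single file, so output_path ends up as a directory.
        shutil.copytree(staging_dir, output_path)
    finally:
        shutil.rmtree(staging_dir)


if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as workdir:
        fake_model_path = os.path.join(workdir, 'model')  # plays the role of model.path
        save_model_folder({'weights': [0.1, 0.2]}, fake_model_path)
        print(sorted(os.listdir(fake_model_path)))  # ['model.pickle']
```

The resulting model.path is a folder, which matches the "copy the folder, not the file" rule in the component above.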
Note that if the container image used by your pipeline step does not have Python with the kfops package installed, you will need to reproduce the functionality of materialize_model manually. Refer to the project's code in package/kfops/model.py for details.
The materialized model is copied to the MinIO location s3://trained_models/<pipeline_run_id>/, where pipeline_run_id is the ID of the pipeline run that produced the model.
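If you reproduce materialize_model by hand, the main thing to get right is the destination layout. The sketch below only computes where each file of a model folder would land under the bucket; it assumes files keep their paths relative to the model folder, which you should confirm against package/kfops/model.py. The helper names model_uri and model_object_keys are hypothetical, and the actual upload is left to whichever S3/MinIO client your image provides.

```python
import os
import tempfile
from typing import List, Tuple

BUCKET = 'trained_models'  # bucket name from the location documented above


def model_uri(pipeline_run_id: str) -> str:
    """Return the folder URI that holds the model produced by a given run."""
    return f's3://{BUCKET}/{pipeline_run_id}/'


def model_object_keys(model_dir: str, pipeline_run_id: str) -> List[Tuple[str, str]]:
    """Pair every file under model_dir with an object key under '<pipeline_run_id>/'.

    Assumption: files keep their path relative to model_dir.
    """
    pairs = []
    for root, _dirs, files in os.walk(model_dir):
        for name in files:
            local_path = os.path.join(root, name)
            relative = os.path.relpath(local_path, model_dir)
            # Object keys always use '/', whatever the local path separator is.
            key = '/'.join([pipeline_run_id] + relative.split(os.sep))
            pairs.append((local_path, key))
    return sorted(pairs, key=lambda pair: pair[1])


if __name__ == '__main__':
    with tempfile.TemporaryDirectory() as model_dir:
        open(os.path.join(model_dir, 'model.pickle'), 'wb').close()
        print(model_uri('run-123'))  # s3://trained_models/run-123/
        for _local_path, key in model_object_keys(model_dir, 'run-123'):
            print(key)  # run-123/model.pickle
```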
Synchronize container image tag with compiled Kubeflow Pipeline
When your container images are built by Kaniko, they need an image tag. In the simplest case they could always be tagged latest, but if you want your pipelines to be reproducible and both backward and forward compatible, the container image tag should match the compiled Kubeflow Pipeline.
To make the synchronization work, the pipeline is compiled first, and then the ID of the compiled pipeline (the Kubeflow Pipelines version ID) is used as the tag of the built container image.
Each compiled pipeline must have an input parameter "version_id"; when it is filled in with the pipeline version ID, the pipeline runs the container images with the matching tag. Simply put, during a pipeline run, the version ID of the compiled Kubeflow Pipeline is used as the image tag.
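As an illustration of the naming scheme only (not of how kfops implements it), the image reference used at run time can be thought of as the registry and image name plus the version ID as the tag. The hypothetical helper image_reference below also checks the version ID against the Docker tag grammar (a word character followed by up to 127 letters, digits, underscores, periods, or dashes), since a value outside that grammar could never have been pushed as a tag.

```python
import re

# Docker tag grammar: a word character, then up to 127 word characters, periods or dashes.
_TAG_PATTERN = re.compile(r'[A-Za-z0-9_][A-Za-z0-9_.-]{0,127}')


def image_reference(registry_uri: str, image_name: str, version_id: str) -> str:
    """Return '<registry_uri>/<image_name>:<version_id>', validating version_id as a tag."""
    if not _TAG_PATTERN.fullmatch(version_id):
        raise ValueError(f'{version_id!r} is not a valid image tag')
    return f"{registry_uri.rstrip('/')}/{image_name}:{version_id}"


if __name__ == '__main__':
    # Made-up values: a registry, an image name and a UUID-shaped version ID.
    print(image_reference('registry.example.com/ml', 'trainer',
                          '0f8fad5b-d9cb-469f-a165-70867728950e'))
    # registry.example.com/ml/trainer:0f8fad5b-d9cb-469f-a165-70867728950e
```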
Kfops simplifies this setup with the versioned_image function, which can be used inside your pipeline function definition as follows:
import kfp
from kfops.tagger import versioned_image

my_op = kfp.components.create_component_from_func(
    func=my_step_function,  # the Python function implementing this pipeline step
    base_image=versioned_image('<IMAGE NAME>'),
)
Here, <IMAGE NAME> is the image name defined in the image_builder.images section of the config.yaml file:
image_builder:
  container_registry_uri: <YOUR REGISTRY URI>
  images:
    - name: <IMAGE NAME>
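A typo in <IMAGE NAME> is easy to make, so a small pre-flight check can help. The sketch below is hypothetical and not a kfops API: it takes config.yaml already parsed into a dict (for example with a YAML library), confirms the name is listed under image_builder.images, and returns the registry-prefixed name, assuming the image is addressed as <container_registry_uri>/<IMAGE NAME>.

```python
from typing import Any, Dict


def declared_image(config: Dict[str, Any], image_name: str) -> str:
    """Return '<container_registry_uri>/<image_name>' if image_name is declared in config.yaml."""
    builder = config.get('image_builder', {})
    declared = {image['name'] for image in builder.get('images', []) if 'name' in image}
    if image_name not in declared:
        raise KeyError(f'{image_name!r} is not listed in image_builder.images '
                       f'(declared: {sorted(declared)})')
    return f"{builder['container_registry_uri'].rstrip('/')}/{image_name}"


if __name__ == '__main__':
    # The same structure as the config.yaml fragment above, with made-up values.
    config = {
        'image_builder': {
            'container_registry_uri': 'registry.example.com/ml',
            'images': [{'name': 'trainer'}],
        },
    }
    print(declared_image(config, 'trainer'))  # registry.example.com/ml/trainer
```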
The image tag (the Kubeflow Pipelines version ID) has to be passed into the pipeline. This requires a simple change to your pipeline function: add version_id as an input parameter:
@kfp.dsl.pipeline(name='My pipeline')
def my_pipeline(version_id: 'str'):
    ...
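Because the version_id input is required, a quick check that a pipeline function declares it can catch the omission before compiling. This is a hypothetical helper built on inspect.signature; the two plain functions below stand in for your decorated pipeline functions.

```python
import inspect


def accepts_version_id(pipeline_func) -> bool:
    """Return True if pipeline_func declares a parameter named 'version_id'."""
    return 'version_id' in inspect.signature(pipeline_func).parameters


# Plain stand-ins for decorated pipeline functions.
def good_pipeline(version_id: 'str'):
    ...


def bad_pipeline(learning_rate: float = 0.1):
    ...


if __name__ == '__main__':
    print(accepts_version_id(good_pipeline))  # True
    print(accepts_version_id(bad_pipeline))   # False
```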
Pull images from private docker registry
Note: this section is not directly related to Kfops but might save you time when composing your pipeline.
Kfops with Kaniko lets you push images to a private registry, but additional changes are needed to pull those images into your Kubeflow pipeline.
Steps:
- Create a docker registry secret in the namespace where the pipeline is going to be executed. Refer to the Kubernetes documentation for details. The secret has to be created in each namespace in which your pipeline is going to be executed. With Kfops, the Kubeflow pipeline is always executed in a single namespace, configured with the pipeline.namespace parameter in the config.yaml file.
- Add the set_image_pull_secrets function to the Kubeflow Pipeline with the secret you just created. Check the example here for details.
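For the first step, the Kubernetes documentation describes kubectl create secret docker-registry. As an alternative sketch, the Python below builds the equivalent Secret manifest of type kubernetes.io/dockerconfigjson, which can be saved as JSON and applied with kubectl apply -f. The secret name regcred, the registry, and the credentials are placeholders; set the namespace to the value of pipeline.namespace in config.yaml and pass the same secret name to set_image_pull_secrets.

```python
import base64
import json


def docker_registry_secret(name: str, namespace: str, server: str,
                           username: str, password: str) -> dict:
    """Build a kubernetes.io/dockerconfigjson Secret manifest as a plain dict."""
    auth = base64.b64encode(f'{username}:{password}'.encode()).decode()
    docker_config = {'auths': {server: {'username': username, 'password': password, 'auth': auth}}}
    return {
        'apiVersion': 'v1',
        'kind': 'Secret',
        'metadata': {'name': name, 'namespace': namespace},
        'type': 'kubernetes.io/dockerconfigjson',
        # Values under 'data' must be base64-encoded.
        'data': {'.dockerconfigjson': base64.b64encode(json.dumps(docker_config).encode()).decode()},
    }


if __name__ == '__main__':
    # Placeholder values; do not hard-code real credentials.
    manifest = docker_registry_secret('regcred', 'my-pipeline-namespace',
                                      'registry.example.com', 'ci-bot', 'change-me')
    print(json.dumps(manifest, indent=2))  # save as secret.json, then: kubectl apply -f secret.json
```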