December 19, 2022

Scaling ML Model Development With MLflow

Scaling ML models

Model prototyping and experimenting are crucial parts of the model development journey, where signals are extracted from data and new code is created. MLflow helps us keep track of all the chaos within this phase. In this blog post, we will see a possible Python SDK implementation to help your data science team keep track of all the model experiments, saving everything from code to artifacts, plots and related files.

In this article, let’s see how your data science team 👩‍💻👨‍💻 could enjoy experimenting 🧪 with new models with an MLflow SDK. If you’re interested in MLflow and you want to go the extra mile, follow me ✋

What is MLflow?

MLflow is an open-source platform for machine learning that covers the entire ML-model cycle, from development to production and retirement. MLflow comes directly from Databricks. It works with any library, language, and framework, it can run on the cloud, and it is a pivotal product for collaboration across teams.

MLflow has four components:

  • MLflow Tracking: this component covers the model experiment phase. Data scientists can study and develop different data and model parameters, gather all the relevant info in MLflow and choose which model could get to the production phase
  • MLflow Projects: in a big collaboration, data, APIs, and code often need to be shared. Collaborators constantly come across installation issues, problems in the model, code segmentation faults, and other similar “life-threatening” hurdles. Projects provide a common format for packaging data science code, making it more reproducible and shareable across teams without heart attacks
  • MLflow Models: once a team lands in production, the next step is to avoid a confusing mess of models. Models allow deploying machine learning models in diverse environments in an easy and intuitive way
  • MLflow Model Registry: and how to avoid the final mess in production? A registry is the solution, keeping track of every model’s status, whether it is in production, in staging or finally retired.

I will write 3 articles about MLflow, which will deal with MLflow Tracking and MLflow Models. In particular, in this story, I’ll show you how to help your team by designing an MLflow SDK, so that data scientists can skip some tedious parts of coding MLflow. In the second story, we’ll extend the SDK to retrieve metrics and create intuitive Plotly plots for data scientists, along with some examples. Finally, I will show you how to create an MLflow plugin that can be used for deploying the model to GCP AI Platform.

A ten-second introduction to the MLflow environment

MLflow developers have put together this fantastic and thorough guide, to allow everyone to install MLflow and start working with it: https://mlflow.org/docs/latest/quickstart.html

Here I am going to give you a very quick glimpse, running on a local machine. After MLflow has been installed, we can write a wee code like this one:

import os
from random import random, randint
from mlflow import log_metric, log_param, log_artifacts

log_param("this_param", randint(0,100))
log_metric("metric1", random())
log_metric("metric2", random())
log_metric("metric3", random())

if not os.path.exists("outputs"):
    os.makedirs("outputs")
with open("outputs/test.txt", "w") as f:
    f.write("This is my important file")

log_artifacts("outputs")

Fig.1: A wee introduction code to MLflow

Execute this code by running python name_of_your_file.py, and this will start a recording process in MLflow. Here is where MLflow comes into action! In your CLI just type mlflow ui to start the MLflow graphical interface. Head to http://localhost:5000 (or http://127.0.0.1:5000) and you’ll land in the MLflow UI (fig.2):

Fig.2: MLflow UI. On the left is the list of experiments and in the center the runs along with basic information and parameters, and metrics.

If you click on a specific run you’ll get a new window, as shown in fig.3. Here you can retrieve the run’s parameters (e.g. the number of layers, batch size, initial learning rate, and so on), the run’s metrics as interactive plots (e.g. accuracy as a function of steps/epochs) and all the saved artifacts.

Fig.3: MLflow single run UI. In this window you can get more details about the code run and check artifacts as well

Help out your team: towards an MLflow SDK

So far so good, but now it’s time to scale up and help your data science team to integrate MLflow into their existing models. An MLflow SDK, built on top of the MLflow package, is a good fit here: it spares the team the headache of digesting a new technology, and it accommodates data scientists’ requests in a simple Python class.

First, we need to understand what our MLflow class should do and what path we’d like to follow. Fig.4 shows a possible roadmap, from the creation of the MLflow SDK towards CI/CD and deployment through MLflow plugins.

Fig.4: Roadmap sketch. The MLflow SDK should take care of existing data scientists’ models and save artifacts. Then an MLflow deployment plugin could push final results to production

Implement the MLflow tracking SDK class

Thus, the very first step is to devise a class for implementing MLflow, which could have the following functions:

  • read_params: this function reads the tracking input parameters, which are needed to set up a new MLflow experiment
  • set_tracking_uri and get_tracking_uri: these functions set the MLflow tracking uri (mlflow.set_tracking_uri) and return the current tracking uri respectively
  • set_experiment_id and get_experiment_id: the set function checks whether an experiment_name has been given, otherwise it creates a new one. It then retrieves the experiment_id of that experiment, creating the experiment if it does not exist yet. The get function returns the experiment_id
  • run_training: this is the main function to capture the model’s parameters and metrics (mainly through the use of the Python module inspect). Model parameters and metrics are retrieved automatically by MLflow with the mlflow.<package>.autolog functions (e.g. mlflow.sklearn.autolog)
  • log_any_model_file: it sometimes happens that the MLflow autolog functionalities are not able to upload saved model files to the MLflow server. This function helps data scientists in these situations. Once the training has finished, this function is called and it finds all the .joblib, .h5, .pkl and .model files within the working directory, as well as metrics and json metadata files, and reports them correctly to the MLflow server.

Object constructor

So let’s now start to code this up, setting up a class interface, a protocol and its constructor. It could be useful to have a cloud storage option, so you can import it (e.g. from your_cloud_provider import storage_module) and save artifacts directly to the cloud.

import mlflow
from mlflow.tracking.client import MlflowClient
import abc
import os
import sys
import datetime
import plotly.express as px
import plotly.graph_objects as go
# import your cloud provider storage SDK
from your_cloud_provider import storage_module


# Define a class interface
class ExperimentTrackingInterface(object, metaclass=abc.ABCMeta):
    r"""A class to implement experiment tracking in a portable and integrable way
    """
    @abc.abstractmethod
    def __init__(self):
        r""" Default value if not specified"""
        pass
    ... # all methods
   
# Define the protocol
class ExperimentTrackingProtocol(ExperimentTrackingInterface):
    r""" Protocol for experiment tracking."""

    def __init__(self):
        r""" Initialize experiment tracking default values"""
        self.tracking_uri = 'http://localhost:5000' # or whatever uri you can access to
        self.tracking_storage = 'path_to_your_default_storage_option'
        self.run_name = 'default_run'
        self.tags = {}
        self.experiment_name = "tester"
        self.caller = "thisfile.py"
        self.run_id = "0000000"

Fig.5: Set up the MLflow SDK interface and protocol. The SDK will live on this main object
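The "all methods" placeholder in the interface can be filled with abstract methods, so that Python refuses to instantiate a protocol that forgets one of them. Below is a minimal sketch of that idea using only the standard library. The class names TrackingInterfaceSketch, IncompleteProtocol and CompleteProtocol are hypothetical and exist only for this illustration; the method names follow the list given earlier.

```python
import abc


class TrackingInterfaceSketch(abc.ABC):
    """Hypothetical interface listing the methods the SDK is expected to provide."""

    @abc.abstractmethod
    def read_params(self, experiment_tracking_params):
        """Read the tracking parameters from a dictionary."""

    @abc.abstractmethod
    def set_tracking_uri(self):
        """Point the tracker to the right server."""

    @abc.abstractmethod
    def set_experiment_id(self):
        """Retrieve or create the experiment."""

    @abc.abstractmethod
    def run_training(self):
        """Start recording a training run."""

    @abc.abstractmethod
    def log_any_model_file(self):
        """Upload model files that were not logged automatically."""


class IncompleteProtocol(TrackingInterfaceSketch):
    """Implements only one method, so it is still abstract."""

    def read_params(self, experiment_tracking_params):
        self.params = experiment_tracking_params


class CompleteProtocol(IncompleteProtocol):
    """Implements every remaining method with a trivial body."""

    def set_tracking_uri(self):
        return None

    def set_experiment_id(self):
        return None

    def run_training(self):
        return None

    def log_any_model_file(self):
        return None


def can_instantiate(cls):
    """Return True if cls can be instantiated, False if it is still abstract."""
    try:
        cls()
    except TypeError:
        return False
    return True


print(can_instantiate(IncompleteProtocol), can_instantiate(CompleteProtocol))
```

The benefit of this design is that a missing method is reported as soon as the object is created, rather than in the middle of a long training job.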

Read input parameters

Next, we can start thinking about parsing and reading the parameters. Data scientists could parse a yaml, json or txt input file with the given instructions and parameters into a Python dictionary, which can be read directly by the class:

def read_params(self, experiment_tracking_params):
    r""" Read input parameters from experiment_tracking_params dictionary for setting up MLflow
    Parameters
    ----------
    experiment_tracking_params: dict, parameters for mlflow with the following keys:
                                tracking_uri: str, tracking uri to run on mlflow (e.g. http://127.0.0.1:5000)
                                tracking_storage: str, output path, this can be a local folder or cloud storage
                                experiment_name: str, name of the model we're experimenting,
                                run_name: str, name of the runner of the experiment
                                tags: dict, specific tags for the experiment
                                caller: str, optional, path of the script that called the SDK
                                tracker: optional, tracker to use (None if not given)
    """
    # MLFLOW
    self.tracking_uri = experiment_tracking_params['tracking_uri']
    self.tracking_storage = experiment_tracking_params['tracking_storage']
    self.run_name = experiment_tracking_params['run_name']
    self.tags = experiment_tracking_params['tags']
    # caller is added by start_training_job, keep the default if it is missing
    self.caller = experiment_tracking_params.get('caller', self.caller)

    # define some metrics
    self.metrics_per_epochs = ['loss',
                              'accuracy',
                              'val_loss',
                              'val_accuracy',
                              'lr']
    self.experiment_name = experiment_tracking_params['experiment_name']

    # tracker is not always given, so do not fail if it is missing
    self.tracker = experiment_tracking_params.get('tracker')

Fig.6: Read parameters function. Data scientists could parse an input file to a dictionary, which provides instructions such as the tracking uri (e.g. http://localhost:5000), as well as the storage location, run name, and tags
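To make the input side concrete, here is a minimal sketch of how a json file could be turned into the dictionary expected by read_params. The helper load_tracking_params, the file name tracking.json and all the values in the configuration are assumptions made for this example; only the key names come from the docstring above. The sketch writes the file to a temporary folder just to be self-contained.

```python
import json
import os
import tempfile

# hypothetical configuration a data scientist might keep next to the model code
CONFIG_TEXT = """
{
  "tracking_uri": "http://localhost:5000",
  "tracking_storage": "mlruns_artifacts",
  "experiment_name": "tester",
  "run_name": "default_run",
  "tags": {"team": "data-science"},
  "tracker": "mlflow"
}
"""

# keys that read_params accesses directly
REQUIRED_KEYS = ("tracking_uri", "tracking_storage", "experiment_name", "run_name", "tags")


def load_tracking_params(path):
    """Read a json file and check that the expected keys are present."""
    with open(path) as handle:
        params = json.load(handle)
    missing = [key for key in REQUIRED_KEYS if key not in params]
    if missing:
        raise KeyError(f"Missing tracking parameters: {missing}")
    return params


with tempfile.TemporaryDirectory() as tmp_dir:
    config_path = os.path.join(tmp_dir, "tracking.json")
    with open(config_path, "w") as handle:
        handle.write(CONFIG_TEXT)
    experiment_tracking_params = load_tracking_params(config_path)

print(experiment_tracking_params["experiment_name"])
```

Checking the keys up front gives a clear error message before any connection to the tracking server is attempted.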

Here in this example, I added self.caller. We’ll see later how this info is passed to the SDK, but this variable aims to record which script has called the MLflow SDK, to copy that Python script to the artifacts and achieve a code-lineage as well. 

Code lineage is useful at run time, as many modifications can happen and, although data scientists could keep track of changes in GitHub, some changes may be missing, leading to puzzling and mysterious results.

Setters and getters

Next, we need to take care of the setters and getters (fig.7). These functions may sound useless, but they become very important when executing large, long-running code, and they could save your life!

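def set_tracking_uri(self):
    r""" This function sets the MLflow tracking uri, so results will be reported to that specific server. """
    try:
        mlflow.set_tracking_uri(self.tracking_uri)
    except Exception as error:
        # report the problem instead of failing silently
        print(f"Could not set tracking uri {self.tracking_uri}: {error}")

def get_tracking_uri(self):
    r""" Getter for tracking_uri"""
    try:
        tracking_uri = mlflow.get_tracking_uri()
    except Exception as error:
        print(f"Could not retrieve tracking uri: {error}")
        return None
    print(f"Tracking uri: {tracking_uri}")
    return tracking_uri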

     
def set_experiment_id(self):
    r""" Setter for experiment_id. Initially a check is done based on self.experiment_name.
    If the experiment already exists, the function retrieves the experiment_id, otherwise
    a new experiment is set up. If experiment_name is None, datetime is used to create a new
    experiment name"""

    if not self.experiment_name:
        # if experiment name is not given create a new one
        self.experiment_name = datetime.datetime.today().strftime("%Y-%m-%d_%H_%M_%S")
    # check if the experiment exists already and retrieve its id, otherwise create a new one
    experiment = mlflow.get_experiment_by_name(self.experiment_name)
    if experiment is not None:
        self.experiment_id = experiment.experiment_id
    else:
        # the experiment does not exist yet: create it and use
        # tracking_storage as the artifact location
        self.experiment_id = mlflow.create_experiment(self.experiment_name,
                                                      artifact_location=self.tracking_storage)

    return self.experiment_id


def get_experiment_id(self):
    r""" Getter for experiment id"""
    print(f"Experiment id {self.experiment_id}")
    return self.experiment_id
 
 
def set_run_id(self, run_id):
    r"""Set run id, this can be useful to retrieve info for a specific run"""
    self.run_id = run_id
   

def get_run_id(self):
    r""" return the run id"""
    return self.run_id

Fig.7: Setters and getters for the main class.

It’s worth spending a few seconds on set_experiment_id. To create a new experiment, experiment_name and tracking_storage are passed to mlflow.create_experiment(experiment_name, artifact_location=tracking_storage), which returns the experiment_id. The SDK passes the experiment_id, along with the run_name, to mlflow.start_run to start a new run, thus we need to pay special attention to these variables.
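The "retrieve it if it exists, otherwise create it" logic can be tried out without a running server. The sketch below relies on the fact that mlflow.get_experiment_by_name returns None when no experiment has that name. InMemoryBackend is a hypothetical stand-in written for this example, which only mimics the two calls used by the setter, and get_or_create_experiment_id is likewise an illustrative helper and not part of MLflow.

```python
class InMemoryBackend:
    """Hypothetical stand-in exposing the two calls used by set_experiment_id."""

    class _Experiment:
        def __init__(self, experiment_id):
            self.experiment_id = experiment_id

    def __init__(self):
        self._experiments = {}

    def get_experiment_by_name(self, name):
        # like MLflow, return None when the experiment does not exist
        return self._experiments.get(name)

    def create_experiment(self, name, artifact_location=None):
        # like MLflow, return the id of the newly created experiment
        experiment_id = str(len(self._experiments) + 1)
        self._experiments[name] = self._Experiment(experiment_id)
        return experiment_id


def get_or_create_experiment_id(backend, experiment_name, artifact_location):
    """Return the id of an existing experiment, or create the experiment first."""
    experiment = backend.get_experiment_by_name(experiment_name)
    if experiment is not None:
        return experiment.experiment_id
    return backend.create_experiment(experiment_name,
                                     artifact_location=artifact_location)


backend = InMemoryBackend()
first_id = get_or_create_experiment_id(backend, "tester", "artifacts/tester")
second_id = get_or_create_experiment_id(backend, "tester", "artifacts/tester")
print(first_id, second_id)
```

Since the mlflow module exposes functions with these same two names, passing mlflow itself as backend should behave the same way against a real tracking server.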

Training model routine

At this point we’re ready to deal with the training routine. Usually, MLflow keeps track of model training jobs through a context manager, a Python protocol that simplifies common resource management patterns:

with mlflow.start_run():
    lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
    lr.fit(train_x, train_y)

    predicted_qualities = lr.predict(test_x)

    (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

    print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
    print("  RMSE: %s" % rmse)
    print("  MAE: %s" % mae)
    print("  R2: %s" % r2)

    mlflow.log_param("alpha", alpha)
    mlflow.log_param("l1_ratio", l1_ratio)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)

Fig.8: Example from the MLflow tutorial (https://www.mlflow.org/docs/latest/tutorials-and-examples/tutorial.html) about training tracking
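A with block is shorthand for a "start" call before the block and an "end" call after it. The toy sketch below shows the equivalence with a hypothetical ToyRun class that only records what happens to it; it is not MLflow code. MLflow offers the same pair of explicit calls as mlflow.start_run() and mlflow.end_run().

```python
class ToyRun:
    """Hypothetical stand-in for a tracked run; it only records events."""

    def __init__(self):
        self.events = []

    def start(self):
        self.events.append("start")
        return self

    def end(self):
        self.events.append("end")

    # the context-manager protocol simply wraps start() and end()
    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.end()
        return False  # do not swallow exceptions raised inside the block


def train(run):
    # stands in for model.fit(...)
    run.events.append("fit")


# context-manager style: the training code lives inside the with block
with ToyRun() as managed_run:
    train(managed_run)

# explicit style: the training code stays where it is,
# with one call before and one call after it
explicit_run = ToyRun().start()
train(explicit_run)
explicit_run.end()

print(managed_run.events, explicit_run.events)
```

Both styles record the same sequence of events, which is what makes it possible to wrap existing training code with one call before and one call after it.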

The problem with the context manager is that with mlflow.start_run() can record artifacts and training sessions only if it is called directly within the training Python code. This may mean a very boring restructuring task for data scientists, who would have to rewrite part of their code to accommodate these functionalities. A preferable solution would look something like this:

from your_mlflow_sdk import experiment_tracking
...
if tracking: # if you decide to track:
  experiment_tracking.start_training_job(input_parameters)
  history = model.fit(**fit_params)
 
  # report additional metrics (we'll see below)
  experiment_tracking.end_training_job(input_parameters)
else:
  history = model.fit(**fit_params) # normal training

The run_training function is supposed to deal with this trick: it can listen to what the user is doing, without interfering with already written pieces of code:

def run_training(self):
    r""" Function to run an experiment tracking job, given a model and its fitting params
    """
    def run_mlflow():
        r""" Function to run experiment tracking on mlflow

        Returns
        -------
        run_id: str, id of the run that has been started
        """
        # autolog model parameters
        mlflow.autolog()
        # here run an inspect to check what's in input
        mlflow.xgboost.autolog(log_input_examples=True)

        mlflow.sklearn.autolog(log_models=True,
                              log_input_examples=True,
                              log_model_signatures=True,)

        mlflow.tensorflow.autolog(every_n_iter=100,
                                  log_models=True,
                                  )
        mlflow.keras.autolog(log_models=True)
        mlflow.lightgbm.autolog(log_input_examples=True)
        # record the model training
        starter = mlflow.start_run(experiment_id=self.experiment_id,
                        run_name=self.run_name,
                        nested=False,
                        tags=self.tags)

        # self.caller is the path of the model code, save the code to the artifacts
        mlflow.log_artifact(self.caller, artifact_path='code')
        # run_id
        run_id = starter.info.run_id
        #
        print(f"Run id: {run_id}")
        return run_id


    # run MLFLOW
    run_id = run_mlflow()
    return run_id

Fig.9: Main training function, that can be called to record all the model artifacts

The function calls the mlflow.autolog() functions, which provide fantastic support in listening to the xgboost, sklearn, tensorflow, keras, and lightgbm modules, automatically reporting all the model parameters to the main UI. The core of the function is mlflow.start_run(), which sets up the run to be recorded. As I said above, the function also reports the caller Python script self.caller to the artifacts folder code with mlflow.log_artifact(self.caller, artifact_path='code').
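A library-specific autolog call may fail if that library is not installed in the environment where the model is trained. One possible guard, sketched here with the standard library only, is to check which libraries can be imported before enabling their autologging. The helper installed_libraries and the candidate list are assumptions for this illustration and are not part of MLflow.

```python
import importlib.util

# top-level import names of the libraries the SDK would like to autolog
CANDIDATE_LIBRARIES = ["xgboost", "sklearn", "tensorflow", "lightgbm"]


def installed_libraries(candidates):
    """Return the candidates that can be imported in the current environment."""
    return [name for name in candidates
            if importlib.util.find_spec(name) is not None]


available = installed_libraries(CANDIDATE_LIBRARIES)
print(f"Libraries that could be autologged here: {available}")
```

With such a list, the SDK could call, for instance, mlflow.sklearn.autolog only when "sklearn" is among the available libraries.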

Log any model file/artifact to MLflow

One of the problems in avoiding the context manager is that not all the generated model files will be automatically saved to the MLflow server. In my case, for example, many of the model files were saved either locally or to the cloud, but I wanted them to be reported in the MLflow server as well. To patch this issue, log_any_model_file (fig.10) comes into play.

The function retrieves the most recent run of the experiment. It checks whether the experiment has any run with client.search_runs(…)[0].to_dictionary(). Then, it looks at the run’s storage location, either local or cloud, to see which files have already been saved there. Next, local_files_to_upload scans the working directory for a list of wanted formats, and a for-loop logs all the files found to a model folder with client.log_artifact(run_id, file_to_upload, artifact_path='model').
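For the cloud branch, the artifact location has to be split into a storage name (for example a bucket) and a prefix inside it, which is what the parse_storage helper in fig.10 does with plain string splitting. Here is a small sketch of the same split using urllib.parse. The uri is a made-up example, and split_artifact_uri is a hypothetical helper name.

```python
from urllib.parse import urlparse


def split_artifact_uri(artifact_uri):
    """Split a cloud artifact uri into (storage name, prefix)."""
    parsed = urlparse(artifact_uri)
    storage = parsed.netloc           # e.g. the bucket name
    prefix = parsed.path.lstrip("/")  # path inside the storage
    return storage, prefix


# hypothetical artifact location, only for illustration
example_uri = "s3://my-bucket/experiments/1/abc123/artifacts/model"
storage, prefix = split_artifact_uri(example_uri)

# the same values obtained with plain string splitting
split_storage = example_uri.split("/")[2]
split_prefix = "/".join(example_uri.split("/")[3:])

print(storage, prefix)
```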

def log_any_model_file(self):
    r""" This function looks for pkl, h5,.model, .joblib files in the current working
    directory, as well as metadata and metrics files.
    Log all these files to the MLflow server if they have not been saved yet.
    This ensures we can save all the possible models, including, for example,
    hybrid sklearn pipelines."""

    def parse_storage(artifact_uri):
        r""" Parse the storage
       
        Parameters
        ----------
        artifact_uri: str, path from self.tracking_storage
       
        Returns
        --------
        artifacts: list, list of all the files saved on the tracking storage
        """
        # you may need some preprocessing here
        storage = artifact_uri.split("/")[2]
        prefix = "/".join(artifact_uri.split("/")[3:])

        artifact_storage = storage_module(storage)
        artifact_list = artifact_storage.list_of_files(prefix)
        artifacts = []

        for artifact in artifact_list:
            artifacts.append(artifact.name.split("/")[-1])

        return artifacts


    def parse_local(artifact_path):
        r""" Function to return artifacts from local folder
       
        Parameters
        ----------
        artifact_path: str, path to local folder where model artifacts are saved
       
        Returns
        -------
        artifacts: list, list of all the files saved on the tracking storage
        """
        # given the current working folder check what files are contained
        artifacts = []

        for root, dirs, files in os.walk(artifact_path):
            for file in files:
                # just take the name of the file
                artifacts.append(file)

        return artifacts

    def list_artifacts(last_run):
        r""" From the last run retrieve all the files in the artifact storage
       
        Parameters
        ----------
        last_run: dict, last run info retrieved from MLflow server
       
        Return
        ------
        artifacts: list, list of all the files saved on the tracking storage
        (under model folder)
        """
        # enlist all the files in the cloud storage or local
        artifact_prefix = last_run['info']['artifact_uri']
        if artifact_prefix.startswith("your_storage_location"): # e.g. for aws is s3
            # create the usual cloud uri
            artifact_uri = artifact_prefix + "/model"
            artifacts = parse_storage(artifact_uri)
        else:
            # local mode
            artifacts = parse_local(artifact_prefix)

        return artifacts


    def local_files_to_upload(working_dir, artifacts, run_date):
        r""" This function looks for local files to upload. Firstly files in working_dir
        are scanned and added to a list. If the file is already present in artifacts
        the file is ignored. Then, we take only the most recent files
       
        Parameters
        ----------
        working_dir: str, current working directory (os.getcwd())
        artifacts: list, list of artifacts on MLflow server or local MLflow folder
        run_date: str, this is the run date timestamp
       
        Returns
        -------
        list_of_files: list, list of most recent files to be uploaded
        """
        format_files = ["h5",
                        "pkl",
                        "model",
                        "joblib",
                        "metadata.json",
                        "metrics.json",]
        # list of excluded directories, where we do not want to scan for files
        excluded_dirs = ["exclude_directories_you_do_not_want"]
        # define a list of files to be uploaded
        list_of_files = []
        # check if a file has this extension. if the file already is contained in artifacts skip it
        # otherwise add it to the upload list
        for root, dirs, files in os.walk(working_dir, topdown=True):
            # exclude these directories:
            dirs[:] = [d for d in dirs if d not in excluded_dirs]
            for file in files:
                extension = file.split(".")[-1]
                filename = file.split("/")[-1]
                if (extension in format_files) or (filename in format_files):
                    # skip files that are already present in artifacts
                    if filename in artifacts:
                        continue
                    file_path = os.path.join(root, file)
                    # keep only files created after the start of the run
                    creation_date = os.path.getctime(file_path)
                    if creation_date > run_date:
                        list_of_files.append(file_path)

        return list_of_files

    # MAIN FUNCTION
    client = MlflowClient(tracking_uri=self.tracking_uri)
    # read the experiment
    experiment = dict(client.get_experiment_by_name(self.experiment_name))
    # check if there's a run
    try:
        last_run = client.search_runs([experiment['experiment_id']])[0].to_dictionary()
    except:
        print("No runs found for the given experiment")
        sys.exit(-1)

    run_date = (last_run['info']['start_time'])/1000 # timestamp
    # take run_id
    run_id = last_run['info']['run_id']
    # enlist all the files on the cloud or local
    artifacts = list_artifacts(last_run)
    # now check if model/metrics files are on local or cloud and push them to MLflow server
    files_to_upload = local_files_to_upload(os.getcwd(), artifacts, run_date)
    # upload
    for file_to_upload in files_to_upload:
        print(f"Uploading {file_to_upload} to model")
        client.log_artifact(run_id, file_to_upload, artifact_path='model')

Fig.10: Log any metadata and model file to the MLflow server. Sometimes not all the files are directly saved by MLflow, especially when using a mixed model. Thus, this function looks for additional artifacts on the cloud or locally and uploads them to the MLflow server.
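The selection rule used by local_files_to_upload (wanted format, not already logged, newer than the start of the run) can be checked in isolation on a temporary folder. The sketch below is a simplified version of that rule and makes a few assumptions: the file names are invented, recent_model_files is a hypothetical helper, and it compares modification times (os.path.getmtime) instead of os.path.getctime, because modification times can be set explicitly with os.utime for the sake of the demo.

```python
import os
import tempfile
import time

WANTED_EXTENSIONS = {"h5", "pkl", "model", "joblib"}
WANTED_NAMES = {"metadata.json", "metrics.json"}


def recent_model_files(working_dir, already_logged, run_start):
    """Return wanted files that are not logged yet and are newer than run_start."""
    selected = []
    for root, _dirs, files in os.walk(working_dir):
        for name in files:
            extension = name.rsplit(".", 1)[-1]
            if extension not in WANTED_EXTENSIONS and name not in WANTED_NAMES:
                continue  # not a model, metrics or metadata file
            if name in already_logged:
                continue  # already on the tracking server
            path = os.path.join(root, name)
            if os.path.getmtime(path) > run_start:
                selected.append(path)
    return sorted(selected)


with tempfile.TemporaryDirectory() as working_dir:
    run_start = time.time()
    # (file name, offset in seconds with respect to the start of the run)
    demo_files = [("old_model.pkl", -3600), ("new_model.pkl", 3600),
                  ("metrics.json", 3600), ("notes.txt", 3600),
                  ("logged.h5", 3600)]
    for name, offset in demo_files:
        path = os.path.join(working_dir, name)
        with open(path, "w") as handle:
            handle.write("placeholder")
        os.utime(path, (run_start + offset, run_start + offset))

    found = [os.path.basename(path)
             for path in recent_model_files(working_dir, {"logged.h5"}, run_start)]

print(found)
```

Here old_model.pkl is skipped because it predates the run, notes.txt because of its format, and logged.h5 because it is already among the logged artifacts.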

Put it all together in a module

Now that you’ve created the basis of the MLflow SDK, we can go ahead with implementing a training module that data scientists could use for their experiments. The module name could be experiment_tracking_training.py, which implements the functions start_training_job and end_training_job.

The former function sets up the MLflow tracking job. It retrieves the caller Python script through the traceback module with caller = traceback.extract_stack()[-2][0], which will be reported amongst the artifacts. Then, it initializes the experiment parameters and finally kicks off the training run with runner.run_training().

end_training_job can be called after the model fitting, to retrieve any other file which was not saved, through log_any_model_file().

import traceback
from your_package import ExperimentTrackingInterface


def start_training_job(experiment_tracking_params=None,):
    r"""This function runs a given model with experiment tracker

    Parameters:
    --------
    experiment_tracking_params: dict, parameters for mlflow with the following keys:
                                tracking_uri: str, tracking uri to run on mlflow (e.g. http://127.0.0.1:5000)
                                tracking_storage: str, output path, this can be a local folder or cloud storage
                                experiment_name: str, name of the model we're experimenting,
                                run_name: str, name of the runner of the experiment
                                tags: dict, specific tags for the experiment

    Returns:
    --------
    run_id: str
            run id of the current run. This might be helpful if custom metrics need to be added later
    """

    # retrieve which script is running this function, this is the caller
    caller = traceback.extract_stack()[-2][0]
    # add the caller to the input parameters given by data scientists
    experiment_tracking_params['caller'] = caller
    # set up the protocol
    runner = ExperimentTrackingInterface.ExperimentTrackingProtocol()
    runner.read_params(experiment_tracking_params)
    print("Setting up tracking uri")
    runner.set_tracking_uri()
    print("Setting up experiment id")
    runner.set_experiment_id()
    print("Run training")
    # and here we go record the training session
    run_id = runner.run_training()
    return run_id
   

def end_training_job(experiment_tracking_params=None):
    r""" This function helps to complete an experiment. The function take the very last run
    of a given experiment family and reports artefacts and files
   
    Parameters
    -------
    experiment_tracking_params: dict, parameters for mlflow with the following keys:
                                tracking_uri: str, tracking uri to run on mlflow (e.g. http://127.0.0.1:5000)
                                tracking_storage: str, output path, this can be a local folder or cloud storage
                                experiment_name: str, name of the model we're experimenting,
                                run_name: str, name of the runner of the experiment
                                tags: dict, specific tags for the experiment
    """
    # set up the protocol
    runner = ExperimentTrackingInterface.ExperimentTrackingProtocol()
    runner.read_params(experiment_tracking_params)
    runner.set_tracking_uri()
    # check for any possible model which has not been saved - possibly due to mlflow experimental autologs
    runner.log_any_model_file()

Fig.11: Module to start and complete a training job session recording with MLflow. The start_training_job initializes the ExperimentTrackingProtocol while end_training_job checks for further artifacts which have not been saved to the MLflow server.
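The traceback trick used in start_training_job can be tried on its own. traceback.extract_stack() returns the current call stack, where the last entry is the function being executed and the one before it is its caller. In the minimal sketch below, who_called_me and train_model are hypothetical names used only for the demonstration.

```python
import traceback


def who_called_me():
    """Return (file name, function name) of the code that called this function."""
    # the last stack entry is this function, the one before it is its caller
    frame = traceback.extract_stack()[-2]
    # frame[0] is the file name, the same value as frame.filename
    return frame[0], frame.name


def train_model():
    # hypothetical training entry point standing in for a data scientist's script
    return who_called_me()


caller_file, caller_function = train_model()
print(f"Called from {caller_function} in {caller_file}")
```

In the SDK only the file name is kept, since that is the script copied to the code folder of the artifacts.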

That’s all for today! We went through a lot of things, and if you’re not familiar with MLflow you may want to digest everything step by step. Stay tuned for the next story, which will show how to report additional metrics to our experiments, along with some examples of how to use our MLflow SDK 🙂

If you have any questions or curiosity, just write me an email at stefanobosisio1 [at] gmail.com


Author’s Bio: Stefano is a Machine Learning Engineer at Trustpilot, based in Edinburgh. Stefano helps data science teams to have a smooth journey from model prototyping to model deployment. Stefano’s background is Biomedical Engineering (Polytechnic of Milan) and a Ph.D in Computational Chemistry (University of Edinburgh).