Source code for lightning.pytorch.loggers.mlflow
# Copyright The Lightning AI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MLflow Logger
-------------
"""
import os
from argparse import Namespace
from collections.abc import Mapping
from typing import TYPE_CHECKING, Any, Literal, Optional, Union
from typing_extensions import override
from lightning.fabric.utilities.imports import _raise_enterprise_not_available
from lightning.pytorch.callbacks.model_checkpoint import ModelCheckpoint
from lightning.pytorch.loggers.logger import Logger, rank_zero_experiment
from lightning.pytorch.utilities.rank_zero import rank_zero_only
if TYPE_CHECKING:
from mlflow.tracking import MlflowClient
[docs]class MLFlowLogger(Logger):
"""Log using `MLflow <https://mlflow.org>`_.
Install it with pip:
.. code-block:: bash
pip install mlflow # or mlflow-skinny
.. code-block:: python
from lightning.pytorch import Trainer
from lightning.pytorch.loggers import MLFlowLogger
mlf_logger = MLFlowLogger(experiment_name="lightning_logs", tracking_uri="file:./ml-runs")
trainer = Trainer(logger=mlf_logger)
Use the logger anywhere in your :class:`~lightning.pytorch.core.LightningModule` as follows:
.. code-block:: python
from lightning.pytorch import LightningModule
class LitModel(LightningModule):
def training_step(self, batch, batch_idx):
# example
self.logger.experiment.whatever_ml_flow_supports(...)
def any_lightning_module_function_or_hook(self):
self.logger.experiment.whatever_ml_flow_supports(...)
Args:
experiment_name: The name of the experiment.
run_name: Name of the new run. The `run_name` is internally stored as a ``mlflow.runName`` tag.
If the ``mlflow.runName`` tag has already been set in `tags`, the value is overridden by the `run_name`.
tracking_uri: Address of local or remote tracking server.
If not provided, defaults to `MLFLOW_TRACKING_URI` environment variable if set, otherwise it falls
back to `file:<save_dir>`.
tags: A dictionary tags for the experiment.
save_dir: A path to a local directory where the MLflow runs get saved.
Defaults to `./mlruns` if `tracking_uri` is not provided.
Has no effect if `tracking_uri` is provided.
log_model: Log checkpoints created by :class:`~lightning.pytorch.callbacks.model_checkpoint.ModelCheckpoint`
as MLFlow artifacts.
* if ``log_model == 'all'``, checkpoints are logged during training.
* if ``log_model == True``, checkpoints are logged at the end of training, except when
:paramref:`~lightning.pytorch.callbacks.Checkpoint.save_top_k` ``== -1``
which also logs every checkpoint during training.
* if ``log_model == False`` (default), no checkpoint is logged.
prefix: A string to put at the beginning of metric keys.
artifact_location: The location to store run artifacts. If not provided, the server picks an appropriate
default.
run_id: The run identifier of the experiment. If not provided, a new run is started.
synchronous: Hints mlflow whether to block the execution for every logging call until complete where
applicable. Requires mlflow >= 2.8.0
Raises:
ModuleNotFoundError:
If required MLFlow package is not installed on the device.
"""
LOGGER_JOIN_CHAR = "-"
def __init__(
self,
experiment_name: str = "lightning_logs",
run_name: Optional[str] = None,
tracking_uri: Optional[str] = os.getenv("MLFLOW_TRACKING_URI"),
tags: Optional[dict[str, Any]] = None,
save_dir: Optional[str] = "./mlruns",
log_model: Literal[True, False, "all"] = False,
prefix: str = "",
artifact_location: Optional[str] = None,
run_id: Optional[str] = None,
synchronous: Optional[bool] = None,
):
_raise_enterprise_not_available()
from pytorch_lightning_enterprise.loggers.mlflow import MLFlowLogger as EnterpriseMLFlowLogger
super().__init__()
self.logger_impl = EnterpriseMLFlowLogger(
experiment_name=experiment_name,
run_name=run_name,
tracking_uri=tracking_uri,
tags=tags,
save_dir=save_dir,
log_model=log_model,
prefix=prefix,
artifact_location=artifact_location,
run_id=run_id,
synchronous=synchronous,
)
@property
@rank_zero_experiment
def experiment(self) -> "MlflowClient":
r"""Actual MLflow object. To use MLflow features in your :class:`~lightning.pytorch.core.LightningModule` do the
following.
Example::
self.logger.experiment.some_mlflow_function()
"""
return self.logger_impl.experiment
@property
def run_id(self) -> Optional[str]:
"""Create the experiment if it does not exist to get the run id.
Returns:
The run id.
"""
return self.logger_impl.run_id
@property
def experiment_id(self) -> Optional[str]:
"""Create the experiment if it does not exist to get the experiment id.
Returns:
The experiment id.
"""
return self.logger_impl.experiment_id
[docs] @override
@rank_zero_only
def log_hyperparams(self, params: Union[dict[str, Any], Namespace]) -> None:
return self.logger_impl.log_hyperparams(params)
[docs] @override
@rank_zero_only
def log_metrics(self, metrics: Mapping[str, float], step: Optional[int] = None) -> None:
return self.logger_impl.log_metrics(metrics, step)
[docs] @override
@rank_zero_only
def finalize(self, status: str = "success") -> None:
return self.logger_impl.finalize(status)
@property
@override
def save_dir(self) -> Optional[str]:
"""The root file directory in which MLflow experiments are saved.
Return:
Local path to the root experiment directory if the tracking uri is local.
Otherwise returns `None`.
"""
return self.logger_impl.save_dir
@property
@override
def name(self) -> Optional[str]:
"""Get the experiment id.
Returns:
The experiment id.
"""
return self.logger_impl.name
@property
@override
def version(self) -> Optional[str]:
"""Get the run id.
Returns:
The run id.
"""
return self.logger_impl.version
[docs] @override
def after_save_checkpoint(self, checkpoint_callback: ModelCheckpoint) -> None:
return self.logger_impl.after_save_checkpoint(checkpoint_callback)