MLOps 实战:模型训练、部署与监控流水线

FreeGuideOnline 最新 2026-06-12

MLOps 机器学习运维体系实战教程

引言:为什么我们需要 MLOps?

在传统软件开发中,DevOps 已经将开发、测试与运维无缝连接,实现了持续集成与持续交付(CI/CD)。然而,机器学习项目面临独特的挑战:数据漂移、模型退化、实验可复现性、特征工程与模型训练的复杂依赖等。MLOps(Machine Learning Operations)正是为了解决这些问题而生,它将 DevOps 原则与机器学习生命周期结合,使模型从实验到生产的全流程自动化、可监控且可持续迭代。

本教程面向初学者,但包含可直接落地的实战内容。我们将构建一条完整的 MLOps 流水线,覆盖模型训练、部署与监控三大核心环节。

MLOps 成熟度与核心组件

在动手之前,先理解 MLOps 的层级架构:

  • Level 0 (手动过程):脚本驱动,数据科学家手动执行每一步,完全没有自动化。
  • Level 1 (ML 流水线自动化):自动化训练流水线,模型持续训练(CT),实验追踪,模型注册与版本控制。
  • Level 2 (CI/CD 流水线):完整的自动化构建、测试、部署,包括数据与模型验证,一键回滚。

本节教程将聚焦于构建 Level 1~2 的混合流水线,涉及以下核心组件:

组件 功能 常用工具
数据与特征仓库 数据版本控制、特征定义与共享 Feast, DVC
实验追踪 记录参数、指标、模型文件 MLflow, Weights & Biases
模型注册 集中管理模型版本与阶段 MLflow Model Registry, Neptune
训练编排 自动化训练管道,资源调度 Kubeflow Pipelines, Airflow
模型服务 低延迟推理 API,部署策略 BentoML, KServe, Seldon Core
监控与告警 数据漂移、模型性能、系统健康 Evidently, Prometheus + Grafana

我们将以 MLflow 作为实验追踪和模型注册的核心,用 Kubeflow Pipelines 演示编排,并以 FastAPI + Docker 实现轻量级部署,最终加入 Evidently AI 进行生产监控。

实战环境准备

为了降低门槛,你可以选择在本地或云上搭建。以下为最小环境要求:

  • Python 3.9+
  • Docker & Docker Compose
  • Minikube(或使用 Kind)用于 Kubeflow 演示
  • Git

安装核心库:

pip install mlflow scikit-learn pandas numpy evidently fastapi uvicorn docker

启动 Mlflow Tracking Server(本地模式):

mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./mlruns --host 0.0.0.0 --port 5000

访问 http://localhost:5000 即可打开 MLflow UI。


模型训练流水线:从实验到可复现

1. 实验追踪与代码结构

一个稳健的训练脚本必须记录参数、指标、模型和环境。使用 MLflow 的自动日志(autolog)可以快速覆盖主流框架,但对于更精细的控制,推荐显式调用 API。

创建项目结构:

mlops-project/
├── data/
│   └── raw/
├── src/
│   ├── train.py
│   ├── evaluate.py
│   └── pipeline.py
├── docker/
└── pipelines/
    └── training_pipeline.py

2. 训练脚本示例

下面是一个使用 sklearn 训练分类模型的完整示例,同时集成 MLflow:

# src/train.py
import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import pandas as pd
import os

# 设置或获取实验
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("iris_classification")

def train():
    with mlflow.start_run(run_name="random_forest_baseline") as run:
        # 参数记录
        params = {"n_estimators": 100, "max_depth": 5, "random_state": 42}
        mlflow.log_params(params)

        # 加载数据
        data = load_iris()
        X, y = data.data, data.target
        X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

        # 训练模型
        model = RandomForestClassifier(**params)
        model.fit(X_train, y_train)

        # 评估指标
        preds = model.predict(X_val)
        acc = accuracy_score(y_val, preds)
        f1 = f1_score(y_val, preds, average='macro')
        mlflow.log_metrics({"accuracy": acc, "f1_macro": f1})

        # 记录输入数据样本(用于后续监控基线)
        train_df = pd.DataFrame(X_train, columns=data.feature_names)
        train_df.to_csv("train_sample.csv", index=False)
        mlflow.log_artifact("train_sample.csv")

        # 注册模型到 MLflow Model Registry
        signature = mlflow.models.infer_signature(X_train, model.predict(X_train))
        mlflow.sklearn.log_model(model, "model", signature=signature)

        # 打印 run id 供后续部署使用
        print(f"Run ID: {run.info.run_id}")
        return run.info.run_id

if __name__ == "__main__":
    train()

运行脚本后,在 MLflow UI 中可看到本次运行的参数、指标和模型工件。点击模型可查看输入输出 schema。

3. 模型注册与版本管理

在 MLflow UI 的 Artifacts 部分点击 model,然后选择 Register Model,创建一个名为 iris_rf 的注册模型。每次运行后都可以注册新版本,并添加阶段标签(Staging、Production、Archived)。

通过 API 也可以实现自动化注册:

from mlflow.tracking import MlflowClient

client = MlflowClient()
model_uri = f"runs:/{run.info.run_id}/model"
result = mlflow.register_model(model_uri, "iris_rf")
client.transition_model_version_stage(
    name="iris_rf",
    version=result.version,
    stage="Staging"
)

这为 CI/CD 管道中的模型升级提供了清晰的审计轨迹。

4. 构建可重用的训练管道 (Pipeline)

将上述步骤封装为 Kubeflow Pipeline 组件,实现在 Kubernetes 上自动运行。以下为简化版示例,展示如何将训练步骤定义为轻量级 Python 函数:

# pipelines/training_pipeline.py
from kfp import dsl, compiler
from kfp.components import create_component_from_func

def load_and_preprocess():
    # 实际可从 Feature Store 拉取
    from sklearn.datasets import load_iris
    import pandas as pd
    data = load_iris()
    df = pd.DataFrame(data.data, columns=data.feature_names)
    df['target'] = data.target
    return df

def train_model(df, n_estimators: int, max_depth: int):
    import mlflow
    # ... 同 train.py 中的逻辑,使用传入参数
    # 返回 model_uri

@dsl.pipeline(name="iris-training-pipeline")
def training_pipeline(n_estimators: int = 100, max_depth: int = 5):
    load_op = create_component_from_func(load_and_preprocess)()
    train_op = create_component_from_func(train_model)(load_op.output, n_estimators, max_depth)

compiler.Compiler().compile(training_pipeline, 'iris_training_pipeline.yaml')

编译后的 YAML 文件可以直接上传到 Kubeflow 运行。对于不想引入 Kubernetes 的初学者,可以使用 Apache AirflowGitHub Actions 触发定时训练。


模型部署:从注册表到生产 API

部署是 MLOps 的关键环节,需要兼顾低延迟、弹性伸缩和策略灰度。我们采用两步走:使用 MLflow 直接加载模型构建 REST API,并用 Docker 容器化。

1. 通过 MLflow 加载模型构建服务

# serve_model.py
import mlflow
import mlflow.sklearn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np

app = FastAPI(title="Iris Classifier")

model_uri = "models:/iris_rf/Production"  # 加载 Production 阶段的模型版本
model = mlflow.sklearn.load_model(model_uri)

class InputData(BaseModel):
    sepal_length: float
    sepal_width: float
    petal_length: float
    petal_width: float

@app.post("/predict")
async def predict(data: InputData):
    try:
        input_array = np.array([[data.sepal_length, data.sepal_width, data.petal_length, data.petal_width]])
        prediction = model.predict(input_array).tolist()
        return {"class": prediction[0]}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
def health():
    return {"status": "ok"}

也可以直接使用 mlflow models serve -m models:/iris_rf/Production -p 1234 启动内置服务,但自己包装 FastAPI 可获得更高的定制性。

2. 容器化与部署策略

编写 Dockerfile:

FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY serve_model.py .
EXPOSE 8000
CMD ["uvicorn", "serve_model:app", "--host", "0.0.0.0", "--port", "8000"]

requirements.txt 应包含 mlflow, fastapi, uvicorn, numpy, pandas 等。

构建镜像并运行:

docker build -t iris-classifier:latest .
docker run -p 8000:8000 iris-classifier:latest

对于生产环境,我们需要多阶段构建容器编排。以下是常见的部署策略:

策略 描述 实施工具
滚动更新 逐步替换旧版本 Pod Kubernetes Deployment
金丝雀发布 将小部分流量导到新版本 Istio, Linkerd, nginx traffic-split
蓝绿部署 同时运行两套环境,切换流量 Service Mesh
影子流量 复制真实请求到新版本但忽略其响应 Envoy route mirroring

在 Kubernetes 中,可创建 Deployment 和 Service,并通过 ConfigMap 注入模型 URI。进阶做法是使用 KServe,它直接支持 MLflow 模型格式,通过 InferenceService 资源一键拉起带有弹性扩缩的微服务。


模型监控与反馈循环

部署不是终点,监控数据漂移和模型精度衰减才能让 MLOps 形成闭环。我们采用 Evidently AI 生成监控报告。

1. 生成监控基线

在训练时,我们已经将训练数据样本保存为 Artifact (train_sample.csv)。从 MLflow 下载该文件作为参考数据:

import mlflow
client = mlflow.tracking.MlflowClient()
run_id = "你的训练 run_id"
local_path = client.download_artifacts(run_id, "train_sample.csv")
import pandas as pd
reference_data = pd.read_csv(local_path)

2. 实时推理数据抓取与漂移检测

在模型服务中,我们可以把每次请求的输入特征记录到日志或数据库中。以下展示一个简单的周期性监控脚本:

# monitor.py
from evidently.metrics import DataDriftTable, ClassificationQualityMetric
from evidently.report import Report
from sqlalchemy import create_engine
import pandas as pd
import mlflow

# 假设推理数据已存入 PostgreSQL 表 inference_logs
engine = create_engine("postgresql://user:pass@host/db")
current_data = pd.read_sql("SELECT * FROM inference_logs WHERE timestamp > NOW() - INTERVAL '1 DAY'", engine)

# 加载参考数据
ref = pd.read_csv("train_sample.csv")

report = Report(metrics=[DataDriftTable(), ClassificationQualityMetric()])
report.run(reference_data=ref, current_data=current_data, column_mapping=None)
report.save_html("monitoring_report.html")

# 上传报告到 MLflow 或发送邮件通知
with mlflow.start_run(run_name="monitoring"):
    mlflow.log_artifact("monitoring_report.html")

Evidently 会计算每个特征的分布差异,并给出漂移分数。我们可设置阈值(例如 PSI > 0.2 或 JS Divergence > 0.1)触发自动重训练流水线。

3. 系统健康监控

除了模型质量,还要监控:

  • 推理延迟(P50, P99)
  • 请求吞吐量
  • 错误率(HTTP 5xx)
  • 资源利用率(CPU/GPU)

使用 Prometheus + Grafana 方案:在 FastAPI 应用中嵌入 prometheus_fastapi_instrumentator 暴露指标,通过 Grafana 仪表盘可视化。配置告警规则,当延迟超过 200ms 或错误率超过 1% 时发送通知。


端到端自动化:构建 CI/CD 管道

为了将上述环节连成一体,我们可以使用 GitHub ActionsJenkins 实现以下逻辑:

  1. 当新数据到达或代码推送到仓库时,触发训练管道。
  2. 训练完成后,运行评估并与生产模型对比。
  3. 如果新模型指标更优,自动注册为 Production 阶段,并触发镜像构建与部署。
  4. 部署后启动自动化测试(冒烟测试)。
  5. 持续监控指标,一旦漂移超过阈值,回滚或触发重训练。

一个简化版的 GitHub Actions 工作流示例如下:

name: MLOps CI/CD
on:
  push:
    branches: [main]
  schedule:
    - cron: '0 3 * * 1' # 每周一凌晨3点执行

jobs:
  train:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout
        uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Run training
        run: python src/train.py
      - name: Evaluate and register
        run: python src/evaluate.py
      - name: Deploy if accepted
        run: ./deploy.sh

evaluate.py 脚本负责比较新旧模型表现,并调用 MLflow API 更新阶段,deploy.sh 则执行镜像构建、推送和 Rollout 命令。


总结与最佳实践

通过本教程,你从零开始搭建了一条可工作的 MLOps 流水线。回顾关键要点:

  • 一切皆版本化:代码、数据、模型、环境。
  • 自动化测试不仅是代码,还包括数据和模型验证
  • 从简单开始,逐步引入复杂性:先本地 MLflow + Docker 部署,再考虑 K8s 等编排系统。
  • 监控形成闭环:没有反馈流水线是不完整的。
  • 协作与文化:MLOps 不只是工具,更是数据科学家、ML 工程师和运维团队的合作模式。

推荐下一步学习

  • 尝试引入 Feast 特征存储,实现离线/在线特征一致性。
  • 探索 KServe 的高级功能:Transformer、Explainer 集成。
  • 使用 Metaflow 或 Flyte 简化工作流编排。
  • 学习 MLflow Tracking 的扩展及安全加固。

MLOps 是持续进化的工程实践,希望本教程能成为你构建可靠机器学习系统的起点。