MLOps 实战:模型训练、部署与监控流水线
MLOps 机器学习运维体系实战教程
引言:为什么我们需要 MLOps?
在传统软件开发中,DevOps 已经将开发、测试与运维无缝连接,实现了持续集成与持续交付(CI/CD)。然而,机器学习项目面临独特的挑战:数据漂移、模型退化、实验可复现性、特征工程与模型训练的复杂依赖等。MLOps(Machine Learning Operations)正是为了解决这些问题而生,它将 DevOps 原则与机器学习生命周期结合,使模型从实验到生产的全流程自动化、可监控且可持续迭代。
本教程面向初学者,但包含可直接落地的实战内容。我们将构建一条完整的 MLOps 流水线,覆盖模型训练、部署与监控三大核心环节。
MLOps 成熟度与核心组件
在动手之前,先理解 MLOps 的层级架构:
- Level 0 (手动过程):脚本驱动,数据科学家手动执行每一步,完全没有自动化。
- Level 1 (ML 流水线自动化):自动化训练流水线,模型持续训练(CT),实验追踪,模型注册与版本控制。
- Level 2 (CI/CD 流水线):完整的自动化构建、测试、部署,包括数据与模型验证,一键回滚。
本节教程将聚焦于构建 Level 1~2 的混合流水线,涉及以下核心组件:
| 组件 | 功能 | 常用工具 |
|---|---|---|
| 数据与特征仓库 | 数据版本控制、特征定义与共享 | Feast, DVC |
| 实验追踪 | 记录参数、指标、模型文件 | MLflow, Weights & Biases |
| 模型注册 | 集中管理模型版本与阶段 | MLflow Model Registry, Neptune |
| 训练编排 | 自动化训练管道,资源调度 | Kubeflow Pipelines, Airflow |
| 模型服务 | 低延迟推理 API,部署策略 | BentoML, KServe, Seldon Core |
| 监控与告警 | 数据漂移、模型性能、系统健康 | Evidently, Prometheus + Grafana |
我们将以 MLflow 作为实验追踪和模型注册的核心,用 Kubeflow Pipelines 演示编排,并以 FastAPI + Docker 实现轻量级部署,最终加入 Evidently AI 进行生产监控。
实战环境准备
为了降低门槛,你可以选择在本地或云上搭建。以下为最小环境要求:
- Python 3.9+
- Docker & Docker Compose
- Minikube(或使用 Kind)用于 Kubeflow 演示
- Git
安装核心库:
pip install mlflow scikit-learn pandas numpy evidently fastapi uvicorn docker
启动 Mlflow Tracking Server(本地模式):
mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./mlruns --host 0.0.0.0 --port 5000
访问 http://localhost:5000 即可打开 MLflow UI。
模型训练流水线:从实验到可复现
1. 实验追踪与代码结构
一个稳健的训练脚本必须记录参数、指标、模型和环境。使用 MLflow 的自动日志(autolog)可以快速覆盖主流框架,但对于更精细的控制,推荐显式调用 API。
创建项目结构:
mlops-project/
├── data/
│ └── raw/
├── src/
│ ├── train.py
│ ├── evaluate.py
│ └── pipeline.py
├── docker/
└── pipelines/
└── training_pipeline.py
2. 训练脚本示例
下面是一个使用 sklearn 训练分类模型的完整示例,同时集成 MLflow:
# src/train.py
import mlflow
import mlflow.sklearn
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import pandas as pd
import os
# 设置或获取实验
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("iris_classification")
def train():
with mlflow.start_run(run_name="random_forest_baseline") as run:
# 参数记录
params = {"n_estimators": 100, "max_depth": 5, "random_state": 42}
mlflow.log_params(params)
# 加载数据
data = load_iris()
X, y = data.data, data.target
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)
# 训练模型
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# 评估指标
preds = model.predict(X_val)
acc = accuracy_score(y_val, preds)
f1 = f1_score(y_val, preds, average='macro')
mlflow.log_metrics({"accuracy": acc, "f1_macro": f1})
# 记录输入数据样本(用于后续监控基线)
train_df = pd.DataFrame(X_train, columns=data.feature_names)
train_df.to_csv("train_sample.csv", index=False)
mlflow.log_artifact("train_sample.csv")
# 注册模型到 MLflow Model Registry
signature = mlflow.models.infer_signature(X_train, model.predict(X_train))
mlflow.sklearn.log_model(model, "model", signature=signature)
# 打印 run id 供后续部署使用
print(f"Run ID: {run.info.run_id}")
return run.info.run_id
if __name__ == "__main__":
train()
运行脚本后,在 MLflow UI 中可看到本次运行的参数、指标和模型工件。点击模型可查看输入输出 schema。
3. 模型注册与版本管理
在 MLflow UI 的 Artifacts 部分点击 model,然后选择 Register Model,创建一个名为 iris_rf 的注册模型。每次运行后都可以注册新版本,并添加阶段标签(Staging、Production、Archived)。
通过 API 也可以实现自动化注册:
from mlflow.tracking import MlflowClient
client = MlflowClient()
model_uri = f"runs:/{run.info.run_id}/model"
result = mlflow.register_model(model_uri, "iris_rf")
client.transition_model_version_stage(
name="iris_rf",
version=result.version,
stage="Staging"
)
这为 CI/CD 管道中的模型升级提供了清晰的审计轨迹。
4. 构建可重用的训练管道 (Pipeline)
将上述步骤封装为 Kubeflow Pipeline 组件,实现在 Kubernetes 上自动运行。以下为简化版示例,展示如何将训练步骤定义为轻量级 Python 函数:
# pipelines/training_pipeline.py
from kfp import dsl, compiler
from kfp.components import create_component_from_func
def load_and_preprocess():
# 实际可从 Feature Store 拉取
from sklearn.datasets import load_iris
import pandas as pd
data = load_iris()
df = pd.DataFrame(data.data, columns=data.feature_names)
df['target'] = data.target
return df
def train_model(df, n_estimators: int, max_depth: int):
import mlflow
# ... 同 train.py 中的逻辑,使用传入参数
# 返回 model_uri
@dsl.pipeline(name="iris-training-pipeline")
def training_pipeline(n_estimators: int = 100, max_depth: int = 5):
load_op = create_component_from_func(load_and_preprocess)()
train_op = create_component_from_func(train_model)(load_op.output, n_estimators, max_depth)
compiler.Compiler().compile(training_pipeline, 'iris_training_pipeline.yaml')
编译后的 YAML 文件可以直接上传到 Kubeflow 运行。对于不想引入 Kubernetes 的初学者,可以使用 Apache Airflow 或 GitHub Actions 触发定时训练。
模型部署:从注册表到生产 API
部署是 MLOps 的关键环节,需要兼顾低延迟、弹性伸缩和策略灰度。我们采用两步走:使用 MLflow 直接加载模型构建 REST API,并用 Docker 容器化。
1. 通过 MLflow 加载模型构建服务
# serve_model.py
import mlflow
import mlflow.sklearn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np
app = FastAPI(title="Iris Classifier")
model_uri = "models:/iris_rf/Production" # 加载 Production 阶段的模型版本
model = mlflow.sklearn.load_model(model_uri)
class InputData(BaseModel):
sepal_length: float
sepal_width: float
petal_length: float
petal_width: float
@app.post("/predict")
async def predict(data: InputData):
try:
input_array = np.array([[data.sepal_length, data.sepal_width, data.petal_length, data.petal_width]])
prediction = model.predict(input_array).tolist()
return {"class": prediction[0]}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
def health():
return {"status": "ok"}
也可以直接使用 mlflow models serve -m models:/iris_rf/Production -p 1234 启动内置服务,但自己包装 FastAPI 可获得更高的定制性。
2. 容器化与部署策略
编写 Dockerfile:
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY serve_model.py .
EXPOSE 8000
CMD ["uvicorn", "serve_model:app", "--host", "0.0.0.0", "--port", "8000"]
requirements.txt 应包含 mlflow, fastapi, uvicorn, numpy, pandas 等。
构建镜像并运行:
docker build -t iris-classifier:latest .
docker run -p 8000:8000 iris-classifier:latest
对于生产环境,我们需要多阶段构建和容器编排。以下是常见的部署策略:
| 策略 | 描述 | 实施工具 |
|---|---|---|
| 滚动更新 | 逐步替换旧版本 Pod | Kubernetes Deployment |
| 金丝雀发布 | 将小部分流量导到新版本 | Istio, Linkerd, nginx traffic-split |
| 蓝绿部署 | 同时运行两套环境,切换流量 | Service Mesh |
| 影子流量 | 复制真实请求到新版本但忽略其响应 | Envoy route mirroring |
在 Kubernetes 中,可创建 Deployment 和 Service,并通过 ConfigMap 注入模型 URI。进阶做法是使用 KServe,它直接支持 MLflow 模型格式,通过 InferenceService 资源一键拉起带有弹性扩缩的微服务。
模型监控与反馈循环
部署不是终点,监控数据漂移和模型精度衰减才能让 MLOps 形成闭环。我们采用 Evidently AI 生成监控报告。
1. 生成监控基线
在训练时,我们已经将训练数据样本保存为 Artifact (train_sample.csv)。从 MLflow 下载该文件作为参考数据:
import mlflow
client = mlflow.tracking.MlflowClient()
run_id = "你的训练 run_id"
local_path = client.download_artifacts(run_id, "train_sample.csv")
import pandas as pd
reference_data = pd.read_csv(local_path)
2. 实时推理数据抓取与漂移检测
在模型服务中,我们可以把每次请求的输入特征记录到日志或数据库中。以下展示一个简单的周期性监控脚本:
# monitor.py
from evidently.metrics import DataDriftTable, ClassificationQualityMetric
from evidently.report import Report
from sqlalchemy import create_engine
import pandas as pd
import mlflow
# 假设推理数据已存入 PostgreSQL 表 inference_logs
engine = create_engine("postgresql://user:pass@host/db")
current_data = pd.read_sql("SELECT * FROM inference_logs WHERE timestamp > NOW() - INTERVAL '1 DAY'", engine)
# 加载参考数据
ref = pd.read_csv("train_sample.csv")
report = Report(metrics=[DataDriftTable(), ClassificationQualityMetric()])
report.run(reference_data=ref, current_data=current_data, column_mapping=None)
report.save_html("monitoring_report.html")
# 上传报告到 MLflow 或发送邮件通知
with mlflow.start_run(run_name="monitoring"):
mlflow.log_artifact("monitoring_report.html")
Evidently 会计算每个特征的分布差异,并给出漂移分数。我们可设置阈值(例如 PSI > 0.2 或 JS Divergence > 0.1)触发自动重训练流水线。
3. 系统健康监控
除了模型质量,还要监控:
- 推理延迟(P50, P99)
- 请求吞吐量
- 错误率(HTTP 5xx)
- 资源利用率(CPU/GPU)
使用 Prometheus + Grafana 方案:在 FastAPI 应用中嵌入 prometheus_fastapi_instrumentator 暴露指标,通过 Grafana 仪表盘可视化。配置告警规则,当延迟超过 200ms 或错误率超过 1% 时发送通知。
端到端自动化:构建 CI/CD 管道
为了将上述环节连成一体,我们可以使用 GitHub Actions 或 Jenkins 实现以下逻辑:
- 当新数据到达或代码推送到仓库时,触发训练管道。
- 训练完成后,运行评估并与生产模型对比。
- 如果新模型指标更优,自动注册为 Production 阶段,并触发镜像构建与部署。
- 部署后启动自动化测试(冒烟测试)。
- 持续监控指标,一旦漂移超过阈值,回滚或触发重训练。
一个简化版的 GitHub Actions 工作流示例如下:
name: MLOps CI/CD
on:
push:
branches: [main]
schedule:
- cron: '0 3 * * 1' # 每周一凌晨3点执行
jobs:
train:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.9'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run training
run: python src/train.py
- name: Evaluate and register
run: python src/evaluate.py
- name: Deploy if accepted
run: ./deploy.sh
evaluate.py 脚本负责比较新旧模型表现,并调用 MLflow API 更新阶段,deploy.sh 则执行镜像构建、推送和 Rollout 命令。
总结与最佳实践
通过本教程,你从零开始搭建了一条可工作的 MLOps 流水线。回顾关键要点:
- 一切皆版本化:代码、数据、模型、环境。
- 自动化测试不仅是代码,还包括数据和模型验证。
- 从简单开始,逐步引入复杂性:先本地 MLflow + Docker 部署,再考虑 K8s 等编排系统。
- 监控形成闭环:没有反馈流水线是不完整的。
- 协作与文化:MLOps 不只是工具,更是数据科学家、ML 工程师和运维团队的合作模式。
推荐下一步学习
- 尝试引入 Feast 特征存储,实现离线/在线特征一致性。
- 探索 KServe 的高级功能:Transformer、Explainer 集成。
- 使用 Metaflow 或 Flyte 简化工作流编排。
- 学习 MLflow Tracking 的扩展及安全加固。
MLOps 是持续进化的工程实践,希望本教程能成为你构建可靠机器学习系统的起点。