Kubeflow 机器学习流水线:K8s 上的 ML 编排

FreeGuideOnline 最新 2026-06-17

什么是 Kubeflow

Kubeflow 是一个专为 Kubernetes(K8s)设计的机器学习工具集,目标是让在 K8s 上部署、编排和管理机器学习工作流变得简单、可移植、可扩展。它最初由 Google 开源,现已成为云原生计算基金会(CNCF)的孵化项目。

核心思路:把机器学习流水线中的每一步(数据预处理、训练、评估、模型部署)都封装成可复用的容器化组件,然后像编排微服务一样在 Kubernetes 上执行。

为什么选择 Kubeflow

  • 可移植性:基于 Kubernetes,支持任何云平台(AWS、GCP、Azure)或本地集群。
  • 可重复性:流水线定义为代码,版本可控,实验可重现。
  • 扩展性:自动利用 K8s 的资源调度、弹性伸缩能力,处理大规模训练任务。
  • 生态集成:内置 Jupyter Notebook、Katib(超参调优)、KFServing(模型服务)等组件,覆盖 ML 全生命周期。

Kubeflow 核心组件速览

Kubeflow 不是一个单体应用,而是一组松散耦合的组件,部署在同一 K8s 命名空间中。初学者只需关注与流水线最相关的几个部分:

组件 功能
Kubeflow Pipelines 定义、执行和监控 ML 流水线
Katib 自动化超参数搜索和神经架构搜索
KFServing 高性能模型推理服务(支持金丝雀发布、解释等)
Notebooks 集成 Jupyter,可直接创建带 GPU 的 notebook 容器
Training Operators 专用于 TensorFlow、PyTorch、MXNet 等的分布式训练 job 控制器

Kubeflow Pipelines 内部结构

Kubeflow Pipelines(KFP)是 Kubeflow 的编排中枢,由下列组件构成:

  • Pipeline 定义:用 Python DSL 编写工作流(有 v1 与 v2 版本),描述步骤间的依赖关系。
  • Pipeline 编译器:将 Python 定义转化为 YAML(Argo Workflow 格式)。
  • Pipelines 后端:基于 Argo Workflows 引擎,负责在 K8s 上执行 DAG 任务。
  • Metadata Store:基于 ML Metadata(MLMD)记录实验、构件和执行信息。
  • UI 与 API:可视化流水线运行状态、比较多次实验的结果。

准备你的环境

开始之前需要有一个 Kubernetes 集群(v1.22 以上)及 kubectl 已配置。本地开发推荐以下轻量方式:

选项一:使用 Minikube(本地单节点)

minikube start --cpus 4 --memory 8192 --disk-size 30g

选项二:使用 Kind(Docker 内多节点)

kind create cluster --name kubeflow --config kind-config.yaml

安装 Kubeflow

官方提供不同平台的安装清单,最简单方式是使用 kfctl(或直接套用已打包的 manifests)。以 Kubeflow 1.8 为例,拉取一个针对通用 K8s 的 manifest:

# 下载 kubeflow manifests 仓库
git clone https://github.com/kubeflow/manifests.git
cd manifests

# 执行安装(稍等片刻)
while ! kustomize build example | kubectl apply -f -; do echo "Retrying..."; sleep 10; done

安装完成后,检查 kubeflow 命名空间中的 Pod 状态:

kubectl get pods -n kubeflow

一切就绪后,通过端口转发访问 Central Dashboard:

kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80

打开浏览器访问 http://localhost:8080,即可看到 Kubeflow 仪表板。

编写你的第一条流水线

安装 Python SDK

在本地开发机器上安装 KFP SDK(推荐使用 v2 版本):

pip install kfp

从最简单的“两步流水线”开始

我们将创建一个流水线:生成一个随机数 → 将数字加 10。这虽然简单,但能完整演示组件定义、流水线拼接、编译和提交的整个过程。

1. 定义组件

在 Kubeflow Pipelines v2 中,可以使用 @dsl.component 装饰器将普通 Python 函数转化为可运行的流水线组件:

from kfp import dsl
import random

@dsl.component
def generate_random_number() -> int:
    """生成 1~100 之间的随机整数"""
    num = random.randint(1, 100)
    print(f"Generated number: {num}")
    return num

@dsl.component
def add_ten(number: int) -> int:
    """将输入数字加 10"""
    result = number + 10
    print(f"Result after adding 10: {result}")
    return result

每个 @dsl.component 函数都会被自动打包成一个容器,并通过 K8s Pod 执行。函数参数和返回值会自动映射为组件输入输出。

2. 拼接流水线

使用 @dsl.pipeline 定义完整的工作流,描述组件之间的数据依赖:

@dsl.pipeline(name="my-first-pipeline")
def my_pipeline():
    gen_task = generate_random_number()
    add_task = add_ten(number=gen_task.output)

这里 gen_task.output 表示前一步生成的随机数,作为 add_ten 的输入。框架自动推导执行顺序。

3. 编译并提交

将流水线编译为 YAML 文件,然后通过 KFP 客户端提交:

from kfp import compiler
from kfp.client import Client

# 编译
compiler.Compiler().compile(my_pipeline, 'my_first_pipeline.yaml')

# 连接到 Kubeflow Pipelines 端点(请替换为实际地址)
client = Client(
    host='http://localhost:8888',  # pipeline-ui 的端口转发地址
)

# 创建一次运行
run = client.create_run_from_pipeline_package(
    'my_first_pipeline.yaml',
    arguments={},  # 本次运行不传额外参数
    experiment_name="demo-experiment",
)
print(f"Run submitted: {run.run_id}")

获取端点地址:在 Kubeflow Central Dashboard 中点击 “Pipelines”,URL 中的主机部分即为 API endpoint。你也可以 kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8888:80 后使用 http://localhost:8888

流水线中的关键概念

组件输入与输出

组件间通过输出-输入建立依赖及传值。Kubeflow Pipelines v2 支持将小数据直接传递,对于大数据集,会自动暂存到 MinIO 或 S3 对象存储中。

可以在组件中声明带类型的输入输出:

@dsl.component
def train_model(dataset_uri: str, learning_rate: float) -> str:
    """训练模型,返回模型 URI"""
    # ... 训练逻辑
    return model_uri

流水线参数

用户可在提交运行时传入参数,实现流水线复用:

@dsl.pipeline(name="parameterized-pipeline")
def training_pipeline(learning_rate: float = 0.01, epochs: int = 10):
    train_task = train_model(
        dataset_uri="s3://my-bucket/data.csv",
        learning_rate=learning_rate,
    )

在 UI 或通过 API 调用时可动态设置 learning_rateepochs

使用 Artifacts(构件)

当组件产生较大或需要追踪的输出(如模型文件、评估图表)时,应定义为 Artifact。组件会将 Artifact 序列化并交由 ML Metadata 存储,以便在 UI 中可视化。

from kfp.dsl import Output, Artifact

@dsl.component
def evaluate_model(model_uri: str, metrics: Output[Artifact]):
    # 计算准确率等指标,并写入 metrics.path
    import json
    results = {"accuracy": 0.93}
    with open(metrics.path, "w") as f:
        json.dump(results, f)

在 UI 的运行详情中,可以直接点击查看生成的构件文件。

进阶流水线设计

条件控制与循环

Kubeflow Pipelines v2 支持使用 dsl.Conditiondsl.ParallelFor 进行分支和循环控制:

@dsl.pipeline
def conditional_pipeline(deploy: bool = False):
    train_op = train_model(...)
    with dsl.Condition(deploy == True):
        deploy_op = deploy_model(model=train_op.outputs['model'])

循环可用于超参批量试验或并行数据预处理:

with dsl.ParallelFor(items=[0.001, 0.01, 0.1]) as lr:
    train_op = train_model(learning_rate=lr)

这些控制结构会在编译时转换为 Argo Workflows 的 DAG 条件与循环。

复用与模块化

通过将常用的组件封装成函数或独立模块,可以构建可复用的流水线库。也支持直接导入 YAML 组件或使用预置的 Google Cloud Pipeline Components。

集成 Katib 自动调参

在流水线中触发 Katib 实验,而不是手动写循环:

from kfp import components as comp

# 使用 Katib 提供的组件
katib_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/katib/experiment/component.yaml'
)

然后将 Katib 实验作为一个步骤嵌入流水线,待超参搜索完成后,下游步骤使用最优参数继续训练。

模型部署

流水线后期可通过 KFServing 将模型部署为 REST 推理服务:

kfserving_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/kfserving/component.yaml'
)

传入模型 URI 和框架名称,组件会创建对应的 InferenceService。

监控、调试与实验管理

使用 UI 管理实验

在 Pipelines 仪表板上,你可以:

  • 创建 Experiment:将相关运行归类,对比参数和指标。
  • 运行列表:查看每次运行的状态、耗时、各步骤日志。
  • 可视化构件:对比多次运行输出的准确率曲线、混淆矩阵等(通过 TensorBoard 等集成)。
  • 克隆与重跑:一键复制参数并重新执行。

查看任务日志

点击任一运行中的具体步骤,即可查看该容器输出的标准输出与标准错误。如果任务失败,可以从这里快速定位堆栈错误。也可直接通过 kubectl:

kubectl logs <pod-name> -n kubeflow-user-example-com

缓存与加速

Kubeflow Pipelines 支持 execution caching,如果某步的输入与之前某次成功执行完全一致,可直接复用之前的结果,极大缩短反复实验时间。在组件上通过 @dsl.component(base_image=..., caching=True) 显式启用。

常见问题与最佳实践

本地开发先测试组件

在将组件放入流水线前,可以单独作为 Python 脚本手动运行,保证逻辑正确。KFP 组件本质上就是 Python 函数 + 容器镜像。

理解 Pod 资源限制

对于训练任务,务必在组件中设置资源请求(内存和 GPU/CPU):

@dsl.component
def gpu_training():
    pass

gpu_training = gpu_training.set_cpu_limit('4').set_memory_limit('16G').add_node_selector_constraint(
    'cloud.google.com/gke-accelerator', 'nvidia-tesla-t4'
).set_gpu_limit(1)

使用 Secrets 管理凭据

不要将敏感信息(API密钥、数据集密码)硬编码在流水线中。可通过 K8s Secrets 挂载为环境变量或卷。

版本兼容性

务必保持 kfp 客户端 SDK 版本与集群中部署的 KFP 后端版本匹配(尤其 v1 vs v2)。建议使用与集群相同大版本的 SDK。

总结

Kubeflow Pipelines 将机器学习工作流视为一等代码,让你可以在 Kubernetes 上可靠地定义、执行和复现从数据处理到模型部署的整个流程。通过组件化设计、强大的 DAG 控制和深度 K8s 集成,它有效地解决了 ML 工程中的可重复性、可扩展性和协作难题。

下一步建议:

  • 跟着官方示例,用真实数据集构建训练流水线。
  • 尝试加入 Katib 自动调参。
  • 探索用 KServe 部署模型并测试端到端链路。

掌握 Kubeflow 不仅意味着学会一个工具,更是迈向生产级机器学习工程的坚实一步。