Triton 模型编排:多模型管道与业务逻辑集成

FreeGuideOnline 最新 2026-06-29

Triton 模型编排:多模型管道与业务逻辑集成

什么是模型编排?

模型编排是指将多个推理模型按特定顺序连接起来,形成一个端到端的推理管道,同时可以在管道节点之间插入自定义的业务逻辑。在 NVIDIA Triton Inference Server 中,编排能力让你能够将预处理、模型推理、后处理、条件判断甚至循环等步骤组合成一个统一的推理服务,客户端只需一次请求即可获得最终结果。

没有编排时,客户端需要依次调用多个独立模型,处理中间数据的转换与传输,这会导致更高的延迟和复杂的客户端代码。Triton 的编排机制将管道执行全部放在服务端,最大限度地减少网络往返,并利用 GPU 加速的中间张量传输实现高性能。

Triton 的两种编排方式

Triton 提供了两种互补的编排方法,适用于不同复杂度的场景。

Ensemble 模型

Ensemble(集成模型)是一种声明式编排方式。你通过一个 config.pbtxt 文件定义管道中各个模型之间的连接关系,Triton 会根据数据流图自动调度和执行。

  • 优点:配置简单,无需编写额外代码,Triton 会自动优化数据传递(例如使用 GPU 显存零拷贝),非常高效。
  • 适用场景:管道结构固定,没有复杂条件分支或循环,输入/输出张量直接匹配或仅需简单的形状调整。

一个典型的 Ensemble 配置文件如下:

name: "image_classification_pipeline"
platform: "ensemble"
max_batch_size: 8
input [
  {
    name: "raw_image"
    data_type: TYPE_UINT8
    dims: [ 3, 224, 224 ]
  }
]
output [
  {
    name: "top5_labels"
    data_type: TYPE_STRING
    dims: [ 5 ]
  }
]
ensemble_scheduling {
  step [
    {
      model_name: "image_preprocess"
      model_version: -1
      input_map { key: "input_image" value: "raw_image" }
      output_map { key: "preprocessed_output" value: "preprocessed_image" }
    },
    {
      model_name: "resnet50_classifier"
      model_version: -1
      input_map { key: "input" value: "preprocessed_image" }
      output_map { key: "output" value: "class_probabilities" }
    },
    {
      model_name: "top5_extractor"
      model_version: -1
      input_map { key: "probabilities" value: "class_probabilities" }
      output_map { key: "labels" value: "top5_labels" }
    }
  ]
}

每个步骤指定了一个模型,通过 input_mapoutput_map 映射张量名称,从而定义了数据流动路径。Triton 会根据输入依赖关系自动并行执行无依赖的步骤。

业务逻辑脚本 (BLS)

BLS(Business Logic Scripting)允许你在 Python 或 C++ 后端中编写自定义的执行逻辑,从而实现动态编排。你可以在一个模型推理过程中,通过 Python 代码向同一个 Triton 实例发送其他模型的推理请求,并获得结果后继续处理。

  • 优点:完全灵活,可以实现条件分支、循环、基于中间结果的动态模型选择,以及与数据库、文件系统等外部服务交互。
  • 适用场景:管道中包含复杂业务规则、需要根据输入数据动态调整推理流程、或集成非模型的自定义计算。

BLS 需要将你的模型后端配置为 Python,并在 model.py 中调用 tritonclient 或直接使用 pb_utils 进行子请求。

快速上手:构建一个简单的图像分类管道

我们以语音情感识别管道为例(可替换为你的实际场景),演示如何用 Ensemble 把预处理、推理、后处理三个模型串联起来。

1. 准备模型

假设已有三个模型:

  • audio_preprocess:输入原始音频波形,输出梅尔频谱图张量。
  • emotion_cnn:输入频谱图,输出情感类别概率。
  • label_mapping:输入概率向量,输出前 K 个情感标签(字符串)。

每个模型各自有 config.pbtxt,并放入模型仓库,如 /models/audio_preprocess/1/model.onnx 等。

2. 创建 Ensemble 配置

在模型仓库中新建目录 /models/emotion_pipeline/1/(版本目录可为空),并创建 /models/emotion_pipeline/config.pbtxt

name: "emotion_pipeline"
platform: "ensemble"
max_batch_size: 64
input [
  {
    name: "waveform"
    data_type: TYPE_FP32
    dims: [ -1 ]   # 可变长度波形
  }
]
output [
  {
    name: "emotions"
    data_type: TYPE_STRING
    dims: [ 3 ]
  }
]
ensemble_scheduling {
  step [
    {
      model_name: "audio_preprocess"
      model_version: -1
      input_map { key: "raw_audio" value: "waveform" }
      output_map { key: "mel_spec" value: "spec" }
    },
    {
      model_name: "emotion_cnn"
      model_version: -1
      input_map { key: "input_spec" value: "spec" }
      output_map { key: "probs" value: "probabilities" }
    },
    {
      model_name: "label_mapping"
      model_version: -1
      input_map { key: "prob_in" value: "probabilities" }
      output_map { key: "top_labels" value: "emotions" }
    }
  ]
}

3. 启动 Triton 并测试

启动 Triton 并加载所有模型后,管道模型 emotion_pipeline 将自动可见。你可以像调用普通模型一样调用它:

curl -X POST http://localhost:8000/v2/models/emotion_pipeline/infer \
  -H "Content-Type: application/json" \
  -d '{
    "inputs": [{
      "name": "waveform",
      "shape": [1, 16000],
      "datatype": "FP32",
      "data": [0.1, 0.2, ...]
    }],
    "outputs": [{"name": "emotions"}]
  }'

Triton 会在内部依次执行三个模型,最终返回情感标签,无需客户端介入中间步骤。

使用 Python 后端实现 BLS

当 Ensemble 无法满足条件逻辑时,使用 Python 后端的 BLS 是绝佳选择。以下示例展示如何根据图像分类的前一结果决定是否调用另一个模型。

步骤 1:设置 Python 模型

在模型仓库创建目录 /models/decision_pipeline/1/,包含 model.pyconfig.pbtxt

config.pbtxt

backend: "python"
input [
  {
    name: "input_data"
    data_type: TYPE_FP32
    dims: [ -1 ]
  }
]
output [
  {
    name: "output_result"
    data_type: TYPE_STRING
    dims: [ 1 ]
  }
]

步骤 2:编写 model.py 实现动态逻辑

import triton_python_backend_utils as pb_utils
import numpy as np

class TritonPythonModel:
    def execute(self, requests):
        responses = []
        for request in requests:
            # 获取输入张量
            in_tensor = pb_utils.get_input_tensor_by_name(request, "input_data")
            input_data = in_tensor.as_numpy()

            # 第一步:调用快速分类模型
            input_tensor0 = pb_utils.Tensor("input", input_data)
            infer_request = pb_utils.InferenceRequest(
                model_name="fast_classifier",
                requested_output_names=["class_id"],
                inputs=[input_tensor0])
            infer_response = infer_request.exec()
            class_id = pb_utils.get_output_tensor_by_name(
                infer_response, "class_id").as_numpy()[0]

            # 业务逻辑:如果类别ID大于100,使用精细模型
            if class_id > 100:
                detail_input = pb_utils.Tensor("detail_input", input_data)
                detail_request = pb_utils.InferenceRequest(
                    model_name="detailed_classifier",
                    requested_output_names=["detailed_result"],
                    inputs=[detail_input])
                detail_response = detail_request.exec()
                result = pb_utils.get_output_tensor_by_name(
                    detail_response, "detailed_result").as_numpy()
                output_string = f"Detailed: {result}"
            else:
                output_string = f"Fast class: {class_id}"

            # 构造输出
            out_tensor = pb_utils.Tensor(
                "output_result",
                np.array([output_string.encode()], dtype=np.object_))
            inference_response = pb_utils.InferenceResponse(
                output_tensors=[out_tensor])
            responses.append(inference_response)
        return responses

model.py 中,通过 pb_utils.InferenceRequest 向同一 Triton 服务器发起子请求,并同步等待结果。这让你可以使用 Python 全部的表达能力来实现串行、并行、循环等复杂逻辑。

步骤 3:部署和调用

decision_pipeline 放入模型仓库,重启 Triton。客户端可以直接请求这个模型,体验与普通模型完全一致,但其内部执行了动态决策。

高级编排:条件分支与循环

使用 BLS 可以轻松实现基于中间结果的多分支管道,甚至循环直到满足某个条件。

示例:迭代式图像超分 某些超分模型需要多次迭代才能达到目标分辨率。你可以在 Python 模型中循环调用同一个超分模型,直到输出尺寸足够。

desired_size = 1024
current = input_image
while current.shape[2] < desired_size:
    in_tensor = pb_utils.Tensor("lr_image", current)
    req = pb_utils.InferenceRequest(
        model_name="super_res_step",
        inputs=[in_tensor],
        requested_output_names=["sr_image"])
    resp = req.exec()
    current = pb_utils.get_output_tensor_by_name(resp, "sr_image").as_numpy()
# 最终 current 即为目标

这种控制在 Ensemble 中无法实现,因为 Ensemble 的图是静态的。

最佳实践与性能调优

  1. 优先使用 Ensemble 处理固定管道
    它比 BLS 具有更低的开销,因为 Triton 可以直接在 C++ 层面调度模型,并利用共享内存传递张量。仅在需要动态行为时使用 BLS。

  2. 为 BLS 模型启用动态批处理
    在 BLS 模型的 config.pbtxt 中添加 dynamic_batching {},可以让多个客户端请求合并执行,提高吞吐量,尤其当子请求耗时较长时效果显著。

  3. 避免在 BLS 循环中进行大量小请求
    如果循环内部每次只发一个小批量推理,可以考虑将循环展开或使用批量推理接口,减少调度开销。

  4. 监控和调试
    使用 Triton 的 Perf Analyzer 和 Metrics 端点查看管道各步骤的延迟和吞吐量,定位瓶颈。对于 BLS,可以在代码中加入日志,通过 Triton 的标准输出查看。

  5. 版本策略
    Ensemble 和 BLS 模型本身也可以有多个版本,通过 Triton 的版本策略无缝切换。升级管道中的某个子模型时,只需更新其版本,管道自动引用最新版本(如果 model_version: -1)。

  6. 安全性
    如果 BLS 需要访问外部资源(如数据库),注意配置网络策略和凭据管理,避免将敏感信息硬编码在模型文件中。

总结

Triton 的模型编排能力让你能够将复杂的推理工作流封装成一个可复用的高内聚服务。通过 Ensemble 模型 快速搭建静态管道,用 BLS 实现动态逻辑,你可以应对从简单预处理串联到高度复杂的自适应推理管道的各种场景。掌握这两项技术,你的推理服务将不再是孤立的模型,而是完整的智能应用核心。

现在就开始设计你的第一个 Triton 编排管道,将模型调用从客户端解放出来,享受服务端高效执行带来的性能与简洁性吧。