OpenFL:英特尔开源的灵活联邦学习框架

FreeGuideOnline 最新 2026-06-28

bash pip install openfl


这将安装 OpenFL 的核心库,包含 `aggregator` 和 `collaborator` 命令行入口。

### 带 PyTorch 后端的安装

如果你使用 PyTorch,建议同时安装对应依赖:

```bash
pip install openfl[pytorch]

对于 TensorFlow:

pip install openfl[tensorflow]

安装完成后,可通过以下命令验证:

fx --help

fx 是 OpenFL 提供的主命令行工具,用于工作空间管理、实验启动等。

创建工作空间

OpenFL 通过“工作空间”管理所有实验产物。使用以下命令创建一个新的工作空间:

fx workspace create --prefix my_fl_project --template torch_unet
  • --prefix 指定工作空间目录名前缀;
  • --template 基于内置模板快速生成项目骨架。常用模板有 torch_unettf_2d_unetkeras_cnn_mnist 等。

进入生成的工作空间目录,结构如下:

my_fl_project/
├── plan/               # 联邦计划存于此
│   └── plan.yaml
├── src/                # 自定义数据加载器、网络定义等
├── cert/               # 证书目录(用于安全通信)
└── works/              # 每个协作者的独立工作目录

构建你的第一个联邦训练计划

计划文件 plan.yaml 是 OpenFL 的配置中心。我们以 PyTorch 的图像分类为例,逐步拆解一个最小可行的计划。

定义全局参数

task_name: demo_torch_classification
aggregator:
  settings:
    rounds_to_train: 10
    log_metric_callback:
      template: src.pt_utils.write_metric
    best_state_path: save/best.pbuf

这里指定了训练轮次 rounds_to_train 和聚合器端用于记录指标的 Python 函数引用。

定义协作者参数

collaborator:
  settings:
    epochs_per_round: 2
    batch_size: 64
    opt_treatment: RESET

opt_treatment: RESET 表示每轮开始时优化器状态会被重置,这是联邦学习中的常见做法,能避免梯度方向延续上一轮的偏差。

定义数据加载器

OpenFL 要求协作者必须实现一个 get_data 函数,该函数接收数据路径和 collaborator rank,返回训练集和验证集的 DataLoader。

src/pt_utils.py 中编写:

def get_data(data_path, collaborator_count, collaborator_rank, **kwargs):
    # 假设数据已按 collaborator_rank 预先分区存放
    train_set = MyDataset(root=f"{data_path}/client_{collaborator_rank}/train")
    val_set = MyDataset(root=f"{data_path}/client_{collaborator_rank}/val")
    train_loader = DataLoader(train_set, batch_size=kwargs['batch_size'], shuffle=True)
    val_loader = DataLoader(val_set, batch_size=kwargs['batch_size'])
    return train_loader, val_loader

然后在计划中引用:

data_loader:
  template: src.pt_utils.MyDataLoader
  settings:
    collaborator_count: 2

定义模型、优化器和损失函数

OpenFL 计划允许你直接指定模型构建函数、优化器工厂和损失函数。这些函数将在协作者端被调用。

model:
  template: src.pt_utils.Net
  settings:
    num_classes: 10

optimizer:
  template: torch.optim.Adam
  settings:
    lr: 0.001

loss_fn:
  template: torch.nn.CrossEntropyLoss

这里的 template 均为 Python 可导入路径,OpenFL 会动态加载。

定义任务(Tasks)

任务是 OpenFL 执行流的核心。默认计划中内置了 trainlocally_tuned_model_validate 等任务。你必须实现 train 任务对应的逻辑。

src/pt_utils.py 中实现 train 函数:

def train(model, train_loader, optimizer, device, loss_fn, **kwargs):
    model.train()
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = loss_fn(output, target)
        loss.backward()
        optimizer.step()
    return model

协调计划中的任务定义:

tasks:
  train:
    function: src.pt_utils.train
    kwargs:
      device: cpu

这样,协作者就会在每个联邦轮中调用 train 函数进行本地训练。

启动联邦实验

单机模拟多协作者

对于开发和调试,可以在单台机器上用虚拟协作者模拟联邦学习。

  1. 初始化协调器证书和协作者
fx collaborator generate -n 2

这将在 works/ 下生成 collaborator0collaborator1 的工作目录。

  1. 准备数据:确保两个协作者目录下都有对应的分区数据。

  2. 启动聚合器

fx aggregator start
  1. 启动所有协作者(分别在两个终端中执行):
cd works/collaborator0 && fx collaborator start -n collaborator0
cd works/collaborator1 && fx collaborator start -n collaborator1

聚合器终端会输出每轮的训练进度和汇总指标,实验结束后模型保存于 save/best.pbuf

跨真实节点的部署

在生产环境中,聚合器和协作者可能位于不同机构的不同服务器上。核心区别在于通信地址的设置。

在计划中指定聚合器的监听地址:

network:
  scheme: grpc
  aggregator_address: 192.168.1.100:8080

然后为每个协作者配置其证书和聚合器信息。启动方式与单机类似,只需确保网络可达且证书正确。

自定义聚合算法与回调

OpenFL 的默认聚合算法是 FedAvg,但你完全可以定义自己的聚合逻辑。

自定义聚合函数

在聚合器端,你可以通过继承 Aggregator 类并重写 aggregate 方法来实现自定义聚合。

from openfl.aggregator import Aggregator

class MyAggregator(Aggregator):
    def aggregate(self, tasks, task_name, round_num):
        # tasks 包含了所有协作者上传的模型更新
        # 在这里实现你的加权聚合、鲁棒聚合等
        return aggregated_model

然后在聚合器启动脚本中指定你的类。

使用回调扩展行为

OpenFL 提供了丰富的回调钩子,你可以在计划中注册回调函数,在联邦训练的各个生命周期节点执行自定义操作,例如:

  • on_round_start/end:记录每一轮的开始和结束时间。
  • on_collaborator_task_completed:监控单个协作者的训练时长。

回调函数的签名固定,只需在计划中添加:

aggregator:
  settings:
    round_start_callback:
      template: src.callbacks.log_round_start