OpenFL:英特尔开源的灵活联邦学习框架
bash pip install openfl
这将安装 OpenFL 的核心库,包含 `aggregator` 和 `collaborator` 命令行入口。
### 带 PyTorch 后端的安装
如果你使用 PyTorch,建议同时安装对应依赖:
```bash
pip install openfl[pytorch]
对于 TensorFlow:
pip install openfl[tensorflow]
安装完成后,可通过以下命令验证:
fx --help
fx 是 OpenFL 提供的主命令行工具,用于工作空间管理、实验启动等。
创建工作空间
OpenFL 通过“工作空间”管理所有实验产物。使用以下命令创建一个新的工作空间:
fx workspace create --prefix my_fl_project --template torch_unet
--prefix指定工作空间目录名前缀;--template基于内置模板快速生成项目骨架。常用模板有torch_unet、tf_2d_unet、keras_cnn_mnist等。
进入生成的工作空间目录,结构如下:
my_fl_project/
├── plan/ # 联邦计划存于此
│ └── plan.yaml
├── src/ # 自定义数据加载器、网络定义等
├── cert/ # 证书目录(用于安全通信)
└── works/ # 每个协作者的独立工作目录
构建你的第一个联邦训练计划
计划文件 plan.yaml 是 OpenFL 的配置中心。我们以 PyTorch 的图像分类为例,逐步拆解一个最小可行的计划。
定义全局参数
task_name: demo_torch_classification
aggregator:
settings:
rounds_to_train: 10
log_metric_callback:
template: src.pt_utils.write_metric
best_state_path: save/best.pbuf
这里指定了训练轮次 rounds_to_train 和聚合器端用于记录指标的 Python 函数引用。
定义协作者参数
collaborator:
settings:
epochs_per_round: 2
batch_size: 64
opt_treatment: RESET
opt_treatment: RESET 表示每轮开始时优化器状态会被重置,这是联邦学习中的常见做法,能避免梯度方向延续上一轮的偏差。
定义数据加载器
OpenFL 要求协作者必须实现一个 get_data 函数,该函数接收数据路径和 collaborator rank,返回训练集和验证集的 DataLoader。
在 src/pt_utils.py 中编写:
def get_data(data_path, collaborator_count, collaborator_rank, **kwargs):
# 假设数据已按 collaborator_rank 预先分区存放
train_set = MyDataset(root=f"{data_path}/client_{collaborator_rank}/train")
val_set = MyDataset(root=f"{data_path}/client_{collaborator_rank}/val")
train_loader = DataLoader(train_set, batch_size=kwargs['batch_size'], shuffle=True)
val_loader = DataLoader(val_set, batch_size=kwargs['batch_size'])
return train_loader, val_loader
然后在计划中引用:
data_loader:
template: src.pt_utils.MyDataLoader
settings:
collaborator_count: 2
定义模型、优化器和损失函数
OpenFL 计划允许你直接指定模型构建函数、优化器工厂和损失函数。这些函数将在协作者端被调用。
model:
template: src.pt_utils.Net
settings:
num_classes: 10
optimizer:
template: torch.optim.Adam
settings:
lr: 0.001
loss_fn:
template: torch.nn.CrossEntropyLoss
这里的 template 均为 Python 可导入路径,OpenFL 会动态加载。
定义任务(Tasks)
任务是 OpenFL 执行流的核心。默认计划中内置了 train、locally_tuned_model_validate 等任务。你必须实现 train 任务对应的逻辑。
在 src/pt_utils.py 中实现 train 函数:
def train(model, train_loader, optimizer, device, loss_fn, **kwargs):
model.train()
for data, target in train_loader:
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = loss_fn(output, target)
loss.backward()
optimizer.step()
return model
协调计划中的任务定义:
tasks:
train:
function: src.pt_utils.train
kwargs:
device: cpu
这样,协作者就会在每个联邦轮中调用 train 函数进行本地训练。
启动联邦实验
单机模拟多协作者
对于开发和调试,可以在单台机器上用虚拟协作者模拟联邦学习。
- 初始化协调器证书和协作者:
fx collaborator generate -n 2
这将在 works/ 下生成 collaborator0 和 collaborator1 的工作目录。
-
准备数据:确保两个协作者目录下都有对应的分区数据。
-
启动聚合器:
fx aggregator start
- 启动所有协作者(分别在两个终端中执行):
cd works/collaborator0 && fx collaborator start -n collaborator0
cd works/collaborator1 && fx collaborator start -n collaborator1
聚合器终端会输出每轮的训练进度和汇总指标,实验结束后模型保存于 save/best.pbuf。
跨真实节点的部署
在生产环境中,聚合器和协作者可能位于不同机构的不同服务器上。核心区别在于通信地址的设置。
在计划中指定聚合器的监听地址:
network:
scheme: grpc
aggregator_address: 192.168.1.100:8080
然后为每个协作者配置其证书和聚合器信息。启动方式与单机类似,只需确保网络可达且证书正确。
自定义聚合算法与回调
OpenFL 的默认聚合算法是 FedAvg,但你完全可以定义自己的聚合逻辑。
自定义聚合函数
在聚合器端,你可以通过继承 Aggregator 类并重写 aggregate 方法来实现自定义聚合。
from openfl.aggregator import Aggregator
class MyAggregator(Aggregator):
def aggregate(self, tasks, task_name, round_num):
# tasks 包含了所有协作者上传的模型更新
# 在这里实现你的加权聚合、鲁棒聚合等
return aggregated_model
然后在聚合器启动脚本中指定你的类。
使用回调扩展行为
OpenFL 提供了丰富的回调钩子,你可以在计划中注册回调函数,在联邦训练的各个生命周期节点执行自定义操作,例如:
- on_round_start/end:记录每一轮的开始和结束时间。
- on_collaborator_task_completed:监控单个协作者的训练时长。
回调函数的签名固定,只需在计划中添加:
aggregator:
settings:
round_start_callback:
template: src.callbacks.log_round_start