精确遗忘:重训练与分片缓存实现完全移除

FreeGuideOnline 最新 2026-06-27

python import hashlib import pickle from collections import defaultdict from pathlib import Path

class ShardManager:
    """Map sample IDs to shards and persist each shard's sub-model state.

    The state of shard ``k`` is pickled to ``<cache_dir>/shard_<k>.pkl``.
    """

    def __init__(self, num_shards, cache_dir):
        """Store the configuration and make sure the cache directory exists.

        Args:
            num_shards: Number of shards; must be positive.
            cache_dir: Directory holding the per-shard pickle files. It is
                created (parents included) when missing.

        Raises:
            ValueError: If ``num_shards`` is not positive.
        """
        # Fail early: a zero shard count would otherwise surface later as a
        # ZeroDivisionError inside get_shard_id.
        if num_shards <= 0:
            raise ValueError(f"num_shards must be positive, got {num_shards!r}")
        self.num_shards = num_shards
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _cache_file(self, shard_id):
        """Return the path of the pickle file that backs ``shard_id``."""
        return self.cache_dir / f"shard_{shard_id}.pkl"

    def get_shard_id(self, sample_id):
        """Return the shard index in ``[0, num_shards)`` for ``sample_id``.

        The mapping is deterministic: the MD5 digest of ``str(sample_id)`` is
        read as an integer and reduced modulo ``num_shards``.
        """
        # MD5 serves only as a stable hash here, not as a security primitive.
        digest = hashlib.md5(str(sample_id).encode()).hexdigest()
        return int(digest, 16) % self.num_shards

    def load_shard_model(self, shard_id):
        """Load the cached state of a shard.

        Returns:
            The object previously passed to ``save_shard_model``, or ``None``
            when no cache file exists for the shard (not trained yet).
        """
        # Open first and handle the missing file, instead of exists()+open(),
        # so a file removed in between cannot raise.
        try:
            f = open(self._cache_file(shard_id), 'rb')
        except FileNotFoundError:
            return None  # new shard, nothing trained yet
        with f:
            # SECURITY: pickle.load can execute arbitrary code; the cache
            # directory must only ever contain files written by this process.
            return pickle.load(f)

    def save_shard_model(self, shard_id, model_state):
        """Pickle ``model_state`` into the cache file of ``shard_id``."""
        with open(self._cache_file(shard_id), 'wb') as f:
            pickle.dump(model_state, f)

    def remove_shard_model(self, shard_id):
        """Delete the cached state of ``shard_id``; a missing file is ignored."""
        try:
            self._cache_file(shard_id).unlink()
        except FileNotFoundError:
            pass  # already absent, so the shard is effectively removed

#### 训练分片子模型(带缓存)

假设我们有一个简单的线性模型类 `LinearModel`,它包含 `weights`、`bias` 和 `optimizer_state`,并支持 `fit`、`partial_fit` 和 `predict` 方法。

```python
class ShardedTrainer:
    """Train one sub-model per shard and cache it through the shard manager."""

    def __init__(self, shard_manager, base_model_factory):
        """Keep the collaborators used for training.

        Args:
            shard_manager: Object providing ``get_shard_id``,
                ``load_shard_model`` and ``save_shard_model``.
            base_model_factory: Zero-argument callable returning a new,
                untrained model that exposes ``fit(X, Y)``.
        """
        self.shard_manager = shard_manager
        self.base_model_factory = base_model_factory

    def train_on_dataset(self, data):
        """Group ``data`` by shard and retrain every shard that received data.

        Args:
            data: Iterable of ``(sample_id, x, y)`` triples.
        """
        shard_data = defaultdict(list)
        for sid, x, y in data:
            shard_data[self.shard_manager.get_shard_id(sid)].append((x, y))

        for shard_id, samples in shard_data.items():
            self.train_on_shard(shard_id, samples)

    def train_on_shard(self, shard_id, samples):
        """Fully retrain the sub-model of one shard and refresh its cache entry.

        Args:
            shard_id: Index of the shard to retrain.
            samples: Non-empty iterable of ``(x, y)`` pairs holding the
                shard's current data.
                NOTE(review): assumes the data store yields ``(x, y)`` pairs;
                confirm against the caller that supplies them.

        Raises:
            ValueError: If ``samples`` is empty.
        """
        samples = list(samples)
        if not samples:
            raise ValueError(f"no samples to train shard {shard_id!r}")

        model_state = self.shard_manager.load_shard_model(shard_id)
        if model_state is None:
            # No cache entry yet: start from a fresh model.
            model = self.base_model_factory()
        else:
            # Reuse the cached model object; it is refit on the shard's
            # current data only, because the shard content may have changed.
            # NOTE(review): exact removal relies on fit() discarding all
            # previously learned state - confirm for the model class in use.
            model = model_state['model']

        X, Y = zip(*samples)
        model.fit(X, Y)
        self.shard_manager.save_shard_model(shard_id, {'model': model})

```

#### 处理遗忘请求

遗忘接口接收样本ID,先将该样本从其所属分片的存储中删除(数据层删除),然后仅对该分片触发重训练;若该分片已无剩余样本,则直接移除对应的子模型。

```python

class UnlearningService:
    """Handle forget requests by deleting a sample and refreshing its shard."""

    def __init__(self, shard_manager, trainer, data_store):
        """Keep references to the shard manager, the trainer and the data store."""
        self.shard_manager = shard_manager
        self.trainer = trainer
        self.data_store = data_store

    def forget(self, sample_id):
        """Remove ``sample_id`` from storage and update only the affected shard.

        The sample is deleted from the data store first. If the shard still
        holds samples afterwards, the trainer retrains that shard on them;
        otherwise the shard's cached sub-model is removed.
        """
        target_shard = self.shard_manager.get_shard_id(sample_id)

        # Delete at the data source before touching any model.
        self.data_store.remove(sample_id, target_shard)

        survivors = self.data_store.get_all_samples_of_shard(target_shard)
        if not survivors:
            # The shard is now empty: drop its sub-model rather than retrain.
            self.shard_manager.remove_shard_model(target_shard)
            return

        # Only the shard that contained the sample is retrained.
        self.trainer.train_on_shard(target_shard, survivors)

```

#### 推理时聚合

预测时,需要聚合所有已缓存子模型的分片输出。简单的实现是按各分片当前样本量做加权平均;若没有任何可用的子模型,则抛出 `ValueError`。

```python

class Predictor:
    """Combine the predictions of all cached shard sub-models."""

    def __init__(self, shard_manager, data_store):
        """Keep references to the shard manager and the data store."""
        self.shard_manager = shard_manager
        self.data_store = data_store

    def predict(self, x):
        """Return the shard-size-weighted average of the sub-model predictions.

        Shards without a cached model are skipped. Each remaining sub-model's
        ``predict(x)`` is weighted by the size the data store reports for its
        shard.

        Raises:
            ValueError: If the accumulated weight is zero, i.e. no sub-model
                contributed.
        """
        numerator = 0.0
        denominator = 0
        for idx in range(self.shard_manager.num_shards):
            state = self.shard_manager.load_shard_model(idx)
            if state is not None:
                submodel = state['model']
                weight = self.data_store.get_shard_size(idx)
                numerator += submodel.predict(x) * weight
                denominator += weight

        if denominator == 0:
            raise ValueError("No model available")
        return numerator / denominator