精确遗忘:重训练与分片缓存实现完全移除
FreeGuideOnline
最新
2026-06-27
import hashlib
import pickle
from collections import defaultdict
from pathlib import Path
class ShardManager:
    """Assign samples to shards and persist each shard's model state on disk.

    The state of shard ``i`` is pickled to ``<cache_dir>/shard_<i>.pkl``.
    """

    def __init__(self, num_shards, cache_dir):
        """Store the shard count and make sure the cache directory exists.

        Args:
            num_shards: Number of shards; must be positive because it is
                used as the modulus in ``get_shard_id``.
            cache_dir: Directory that holds one pickle file per shard.

        Raises:
            ValueError: If ``num_shards`` is not positive.
        """
        if num_shards <= 0:
            raise ValueError("num_shards must be a positive integer")
        self.num_shards = num_shards
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _shard_path(self, shard_id):
        """Return the cache file path used for ``shard_id``."""
        return self.cache_dir / f"shard_{shard_id}.pkl"

    def get_shard_id(self, sample_id):
        """Return the shard index in ``[0, num_shards)`` for ``sample_id``.

        The index is derived from the MD5 digest of ``str(sample_id)``, so the
        same ID always maps to the same shard. MD5 serves only as a
        deterministic hash here, not as a security measure.
        """
        digest = hashlib.md5(str(sample_id).encode()).hexdigest()
        return int(digest, 16) % self.num_shards

    def load_shard_model(self, shard_id):
        """Load the cached state of a shard.

        Returns:
            The object previously passed to ``save_shard_model``, or ``None``
            when the shard has no cache file (it has not been trained yet).
        """
        # SECURITY: unpickling can execute arbitrary code; cache_dir must
        # only ever contain files written by a trusted process.
        try:
            with open(self._shard_path(shard_id), 'rb') as f:
                return pickle.load(f)
        except FileNotFoundError:
            return None

    def save_shard_model(self, shard_id, model_state):
        """Pickle ``model_state`` as the cached state of ``shard_id``."""
        with open(self._shard_path(shard_id), 'wb') as f:
            pickle.dump(model_state, f)

    def remove_shard_model(self, shard_id):
        """Delete the cached state of ``shard_id``; a missing file is ignored."""
        try:
            self._shard_path(shard_id).unlink()
        except FileNotFoundError:
            pass
#### 训练分片子模型(带缓存)
假设我们有一个简单的线性模型类 `LinearModel`,它包含 `weights`、`bias` 和 `optimizer_state`,并支持 `fit`、`partial_fit` 和 `predict` 方法。
```python
class ShardedTrainer:
    """Train one sub-model per shard and cache it through the shard manager."""

    def __init__(self, shard_manager, base_model_factory):
        """Keep the collaborators used for training.

        Args:
            shard_manager: Object providing ``get_shard_id``,
                ``load_shard_model`` and ``save_shard_model``.
            base_model_factory: Zero-argument callable returning a new,
                untrained model that exposes ``fit(X, Y)``.
        """
        self.shard_manager = shard_manager
        self.base_model_factory = base_model_factory

    def train_on_dataset(self, data):
        """Group ``data`` by shard and (re)train every shard that got samples.

        Args:
            data: Iterable of ``(sample_id, x, y)`` triples.
        """
        shard_data = defaultdict(list)
        for sid, x, y in data:
            shard_id = self.shard_manager.get_shard_id(sid)
            shard_data[shard_id].append((x, y))

        for shard_id, samples in shard_data.items():
            self.train_on_shard(shard_id, samples)

    def train_on_shard(self, shard_id, samples):
        """Fit the sub-model of one shard on ``samples`` and update the cache.

        Args:
            shard_id: Index of the shard to train.
            samples: Non-empty sequence of ``(x, y)`` pairs holding the
                shard's complete current data.

        Returns:
            The fitted model that was written to the cache.

        Raises:
            ValueError: If ``samples`` is empty.
        """
        if not samples:
            raise ValueError("samples must not be empty")

        model_state = self.shard_manager.load_shard_model(shard_id)
        if model_state is None:
            # No cache entry yet: start from a fresh model.
            model = self.base_model_factory()
        else:
            # Reuse the cached model object and refit it on the shard's
            # current data only, since the shard contents may have changed.
            # NOTE(review): this relies on ``fit`` resetting all parameters
            # (as the original comment states); otherwise removed samples
            # could still influence the refitted model -- confirm.
            model = model_state['model']

        X, Y = zip(*samples)
        model.fit(X, Y)

        self.shard_manager.save_shard_model(shard_id, {'model': model})
        return model
```
#### 处理遗忘请求
遗忘接口接收样本ID,先将该样本从其所属分片的存储中删除(数据层删除),然后仅对该分片触发重训练。
```python
class UnlearningService:
    """Handle forget requests by deleting a sample and retraining its shard."""

    def __init__(self, shard_manager, trainer, data_store):
        """Keep the collaborators used by ``forget``.

        Args:
            shard_manager: Must provide ``get_shard_id(sample_id)`` and
                ``remove_shard_model(shard_id)``.
            trainer: Must provide ``train_on_shard(shard_id, samples)``.
            data_store: Must provide ``remove(sample_id, shard_id)`` and
                ``get_all_samples_of_shard(shard_id)``.
        """
        self.shard_manager = shard_manager
        self.trainer = trainer
        self.data_store = data_store

    def forget(self, sample_id):
        """Remove ``sample_id`` from storage and rebuild only its shard.

        The shard is retrained on whatever samples remain; when nothing
        remains, its cached sub-model is removed instead.
        """
        shard_id = self.shard_manager.get_shard_id(sample_id)

        # 1. Delete the sample from persistent storage first, so the data is
        #    gone at the source before any retraining happens.
        self.data_store.remove(sample_id, shard_id)

        # 2. Fetch everything that is still stored for this shard.
        remaining_samples = self.data_store.get_all_samples_of_shard(shard_id)

        # 3. Retrain only the affected shard.
        if remaining_samples:
            self.trainer.train_on_shard(shard_id, remaining_samples)
        else:
            # The shard is now empty: drop its sub-model.
            self.shard_manager.remove_shard_model(shard_id)
```
#### 推理时聚合
预测时,需要聚合所有非空分片子模型的输出。简单实现为加权平均(权重可基于各分片样本量)。
```python
class Predictor:
    """Combine the per-shard sub-models into a single prediction."""

    def __init__(self, shard_manager, data_store):
        """Keep the collaborators used by ``predict``.

        Args:
            shard_manager: Must provide ``num_shards`` and
                ``load_shard_model(shard_id)``.
            data_store: Must provide ``get_shard_size(shard_id)``.
        """
        self.shard_manager = shard_manager
        self.data_store = data_store

    def predict(self, x):
        """Return the sample-count-weighted average of all shard predictions.

        Shards without a cached model, or whose recorded size is zero, do
        not take part in the average.

        Raises:
            ValueError: If no shard contributes to the prediction.
        """
        total_samples = 0
        weighted_sum = 0.0

        for shard_id in range(self.shard_manager.num_shards):
            model_state = self.shard_manager.load_shard_model(shard_id)
            if model_state is None:
                continue

            shard_size = self.data_store.get_shard_size(shard_id)
            if shard_size <= 0:
                # A weight of zero adds nothing to the average; skipping the
                # model also keeps a NaN/inf output of a stale sub-model from
                # poisoning the sum.
                continue

            pred = model_state['model'].predict(x)
            weighted_sum += pred * shard_size
            total_samples += shard_size

        if total_samples == 0:
            raise ValueError("No model available")
        return weighted_sum / total_samples