FlashDecoding:长序列推理的并行解码加速
FreeGuideOnline
最新
2026-06-22
```python
import torch
import torch.nn as nn
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch.nn.functional as F
```
### 4.2 加载模型
```python
# Checkpoint identifiers: the target model defines the distribution we want to
# sample from; the draft model only proposes candidate tokens.
target_model_name, draft_model_name = "gpt2-large", "distilgpt2"

# Load both checkpoints (target first, then draft), move them to the GPU and
# switch them to inference mode.
target_model, draft_model = (
    AutoModelForCausalLM.from_pretrained(checkpoint).cuda().eval()
    for checkpoint in (target_model_name, draft_model_name)
)

# Only the target checkpoint's tokenizer is loaded.
# NOTE(review): this assumes both checkpoints share one vocabulary - confirm.
tokenizer = AutoTokenizer.from_pretrained(target_model_name)
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token
```
### 4.3 实现验证函数
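验证需要计算草稿模型提出的每个 token 被目标模型接受的概率。采用 Speculative Sampling 中的概率比较方式:对于位置 \( i \),如果 \( p_{\text{targ}}(x_i) \ge p_{\text{draft}}(x_i) \),则直接接受;否则以概率 \( \frac{p_{\text{targ}}(x_i)}{p_{\text{draft}}(x_i)} \) 接受。一旦某个位置被拒绝,就在该位置从归一化后的残差分布 \( \max\bigl(0,\ p_{\text{targ}} - p_{\text{draft}}\bigr) \) 中重新采样一个 token,并丢弃其后的所有草稿 token。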
def speculative_verify(target_model, draft_model, prefix_ids, draft_ids, k):
    """Accept or reject draft tokens using the speculative-sampling rule.

    Draft token ``x_i`` is accepted with probability
    ``min(1, p_target(x_i) / p_draft(x_i))``.  At the first rejection a
    replacement token is sampled from the normalised residual distribution
    ``max(0, p_target - p_draft)`` and verification stops.

    Args:
        target_model: Callable mapping token ids ``[1, T]`` to an object with a
            ``.logits`` tensor of shape ``[1, T, V]``.
        draft_model: Same calling convention as ``target_model``.
        prefix_ids: ``[1, L]`` tensor with the context token ids (``L >= 1``).
        draft_ids: 1-D tensor with the token ids proposed by the draft model.
        k: Maximum number of draft tokens to verify.  If ``draft_ids`` holds
            fewer than ``k`` tokens, only the available ones are verified.

    Returns:
        A list of token ids: the accepted draft tokens, followed by one
        resampled token if a rejection occurred.  Empty when there is nothing
        to verify.

    Raises:
        ValueError: If ``prefix_ids`` is empty, because no logits would exist
            to predict the first draft token.
    """
    # The draft model may stop early (e.g. on EOS) and hand over fewer than k
    # tokens; never index past what was actually proposed.
    k = min(k, draft_ids.shape[0])
    if k <= 0:
        return []
    prefix_len = prefix_ids.shape[1]
    if prefix_len == 0:
        raise ValueError("prefix_ids must contain at least one token")
    draft_ids = draft_ids[:k]

    # Score prefix + draft in a single forward pass per model.
    full_ids = torch.cat([prefix_ids, draft_ids.unsqueeze(0)], dim=-1)  # [1, L+k]
    # Logits at position t predict token t+1, so the distribution for draft
    # token i sits at position prefix_len - 1 + i.
    window = slice(prefix_len - 1, prefix_len - 1 + k)

    # Verification is inference only: keep both forward passes out of autograd.
    with torch.no_grad():
        target_probs = F.softmax(target_model(full_ids).logits[0, window, :], dim=-1)
        # The draft probabilities could be recorded during generation; they are
        # recomputed here for simplicity.
        draft_probs = F.softmax(draft_model(full_ids).logits[0, window, :], dim=-1)

    rows = torch.arange(k, device=target_probs.device)
    target_token_probs = target_probs[rows, draft_ids]  # [k]
    draft_token_probs = draft_probs[rows, draft_ids]  # [k]

    # Clamp the denominator so an underflowed draft probability cannot yield
    # NaN, and move all acceptance probabilities to the host in one transfer.
    tiny = torch.finfo(draft_token_probs.dtype).tiny
    accept_probs = (
        (target_token_probs / draft_token_probs.clamp_min(tiny))
        .clamp(max=1.0)
        .tolist()
    )

    accepted = []
    for i, (token, accept_prob) in enumerate(zip(draft_ids.tolist(), accept_probs)):
        if torch.rand(1).item() < accept_prob:
            accepted.append(token)
            continue

        # Rejected: resample from the normalised residual (target - draft)_+.
        residual = torch.clamp(target_probs[i] - draft_probs[i], min=0.0).float()
        total = residual.sum()
        if not torch.isfinite(total) or total <= 0:
            # Degenerate residual (numerically identical distributions): fall
            # back to sampling from the target distribution itself.
            residual = target_probs[i].float()
        else:
            residual = residual / total
        accepted.append(torch.multinomial(residual, num_samples=1).item())
        break  # every draft token after the first rejection is discarded

    return accepted
### 4.4 主解码循环
def flash_decoding(target_model, draft_model, tokenizer, prompt, max_new_tokens=256, k=4):
    """Generate a continuation of ``prompt`` with draft-and-verify decoding.

    Each round the draft model samples up to ``k`` candidate tokens, and
    ``speculative_verify`` decides how many of them to keep (plus one
    resampled token on rejection).

    Args:
        target_model: Causal LM whose output distribution should be matched.
        draft_model: Smaller causal LM used to propose candidate tokens.
        tokenizer: Tokenizer used to encode the prompt and decode the result.
        prompt: Text to continue.
        max_new_tokens: Upper bound on the number of generated tokens.
        k: Number of tokens the draft model proposes per round (``>= 1``).

    Returns:
        The decoded text (prompt plus continuation) with special tokens
        removed.

    Raises:
        ValueError: If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError("k must be at least 1")

    # Follow the target model's device instead of assuming CUDA.
    device = next(target_model.parameters()).device
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)  # [1, L_prompt]
    generated = input_ids
    eos_id = tokenizer.eos_token_id
    length_limit = input_ids.shape[1] + max_new_tokens

    while generated.shape[1] < length_limit:
        # Never draft more tokens than the remaining budget, so the result
        # cannot exceed max_new_tokens.
        budget = min(k, length_limit - generated.shape[1])

        # 1. The draft model proposes up to `budget` candidate tokens.
        draft_output = draft_model.generate(
            generated,
            attention_mask=torch.ones_like(generated),
            max_new_tokens=budget,
            do_sample=True,
            temperature=1.0,
            # Disable the default top-k / top-p truncation: verification
            # assumes drafts were sampled from the full softmax distribution.
            top_k=0,
            top_p=1.0,
            pad_token_id=eos_id,
            output_scores=False,
            return_dict_in_generate=True,  # gives access to .sequences
        )
        draft_new_ids = draft_output.sequences[0, generated.shape[1]:]
        if draft_new_ids.numel() == 0:
            break  # nothing was proposed; avoid spinning forever

        # 2. Verify. Pass the real draft length: generation may stop early on
        #    EOS and return fewer than `budget` tokens.
        accepted = speculative_verify(
            target_model, draft_model, generated, draft_new_ids, draft_new_ids.shape[0]
        )
        if not accepted:
            break

        # 3. Drop anything accepted after an end-of-sequence token.
        reached_eos = eos_id in accepted
        if reached_eos:
            accepted = accepted[: accepted.index(eos_id) + 1]

        accepted_tensor = torch.tensor([accepted], device=generated.device)
        generated = torch.cat([generated, accepted_tensor], dim=-1)

        if reached_eos:
            break

    return tokenizer.decode(generated[0], skip_special_tokens=True)
### 4.5 使用示例
# Continue a sample prompt for up to 256 new tokens, letting the draft model
# propose 4 tokens per round, then print the decoded text.
prompt = "In the year 2050, humanity had finally achieved interstellar travel. "
result = flash_decoding(
    target_model,
    draft_model,
    tokenizer,
    prompt,
    max_new_tokens=256,
    k=4,
)
print(result)