FlashDecoding:长序列推理的并行解码加速

FreeGuideOnline 最新 2026-06-22

python import torch import torch.nn as nn from transformers import AutoModelForCausalLM, AutoTokenizer import torch.nn.functional as F


### 4.2 加载模型

```python
target_model_name = "gpt2-large"
draft_model_name = "distilgpt2"

target_model = AutoModelForCausalLM.from_pretrained(target_model_name).cuda().eval()
draft_model = AutoModelForCausalLM.from_pretrained(draft_model_name).cuda().eval()
tokenizer = AutoTokenizer.from_pretrained(target_model_name)
tokenizer.pad_token = tokenizer.eos_token

4.3 实现验证函数

验证需要计算草稿模型提出的每个 token 在被目标模型接受的概率。采用 Speculative Sampling 中的概率比较方式:对于位置 ( i ),如果 ( p_{targ}(x_i) \ge p_{draft}(x_i) ),则接受;否则以概率 ( \frac{p_{targ}(x_i)}{p_{draft}(x_i)} ) 接受,不接受时从这个位置重新采样。

def speculative_verify(target_model, draft_model, prefix_ids, draft_ids, k):
    """
    prefix_ids: [1, L] 原始上下文 token IDs
    draft_ids:  [k]     草稿模型生成的 k 个 token IDs
    k:          一次尝试步数
    返回: accepted_tokens(实际被接受的 token 列表)
    """
    # 拼接序列,一次性计算目标模型的 logits
    full_ids = torch.cat([prefix_ids, draft_ids.unsqueeze(0)], dim=-1)  # [1, L+k]
    with torch.no_grad():
        # 获取目标模型对每个新位置的预测
        target_logits = target_model(full_ids).logits  # [1, L+k, V]
        # 只关心草稿 token 所在位置,位置索引:L...L+k-1
        target_logits = target_logits[0, prefix_ids.shape[1]-1:prefix_ids.shape[1]-1+k, :]
        
        # 计算目标模型对这些 token 的概率
        target_probs = F.softmax(target_logits, dim=-1)
        target_token_probs = target_probs[torch.arange(k), draft_ids]  # [k]

        # 获取草稿模型的概率(可以在生成时记录,这里简化重新前向)
        draft_logits = draft_model(full_ids).logits
        draft_logits = draft_logits[0, prefix_ids.shape[1]-1:prefix_ids.shape[1]-1+k, :]
        draft_probs = F.softmax(draft_logits, dim=-1)
        draft_token_probs = draft_probs[torch.arange(k), draft_ids]  # [k]

    accepted = []
    for i in range(k):
        # 均匀随机数
        r = torch.rand(1).item()
        if r < min(1.0, (target_token_probs[i] / draft_token_probs[i]).item()):
            accepted.append(draft_ids[i].item())
        else:
            # 拒绝,从 (target_probs - draft_probs)_+ 归一化分布中重新采样
            corrected_probs = torch.clamp(target_probs[i] - draft_probs[i], min=0.0)
            corrected_probs /= corrected_probs.sum()
            # 采样替代 token
            next_token = torch.multinomial(corrected_probs, num_samples=1).item()
            accepted.append(next_token)
            break  # 一旦发生拒绝,停止接受后续所有草稿 token
    return accepted

4.4 主解码循环

def flash_decoding(target_model, draft_model, tokenizer, prompt, max_new_tokens=256, k=4):
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.cuda()  # [1, L_prompt]
    generated = input_ids.clone()
    while generated.shape[1] < input_ids.shape[1] + max_new_tokens:
        # 1. 草稿模型生成 k 个候选
        draft_input = generated.clone()
        draft_output = draft_model.generate(
            draft_input,
            max_new_tokens=k,
            do_sample=True,
            temperature=1.0,
            pad_token_id=tokenizer.eos_token_id,
            output_scores=False,
            return_dict_in_generate=True  # 用于获取 token ids
        )
        draft_new_ids = draft_output.sequences[0, generated.shape[1]:]  # [k]
        
        # 2. 验证并接受
        accepted = speculative_verify(target_model, draft_model, generated, draft_new_ids, k)
        
        # 3. 更新序列
        accepted_tensor = torch.tensor([accepted], device=generated.device)
        generated = torch.cat([generated, accepted_tensor], dim=-1)
        
        # 遇到结束符则停止
        if tokenizer.eos_token_id in accepted:
            break
            
    return tokenizer.decode(generated[0], skip_special_tokens=True)

4.5 使用示例

prompt = "In the year 2050, humanity had finally achieved interstellar travel. "
result = flash_decoding(target_model, draft_model, tokenizer, prompt, max_new_tokens=256, k=4)
print(result)