Autoformer:用自相关替代自注意力的时序分解模型

FreeGuideOnline 最新 2026-06-21

python import torch import torch.nn as nn import torch.fft

class AutoCorrelation(nn.Module): def init(self, d_model, n_heads, factor=1): super().init() self.n_heads = n_heads self.d_k = d_model // n_heads self.factor = factor # 控制选择top-k的比例

    self.wq = nn.Linear(d_model, d_model)
    self.wk = nn.Linear(d_model, d_model)
    self.wv = nn.Linear(d_model, d_model)
    self.out_proj = nn.Linear(d_model, d_model)

def time_delay_agg(self, q, k, v):
    # q, k, v: [B, H, L, D_k]
    B, H, L, D = q.shape
    
    # 1. FFT加速计算自相关
    q_fft = torch.fft.rfft(q.float(), dim=2)  # 实数FFT
    k_fft = torch.fft.rfft(k.float(), dim=2)
    res = q_fft * torch.conj(k_fft)            # 频域相乘
    corr = torch.fft.irfft(res, n=L, dim=2)   # [B, H, L, D] 自相关值
    
    # 2. 选择top-k延迟
    top_k = int(self.factor * torch.log(torch.tensor(L)).item())
    # 对每个head,平均所有D维得到总体相关性,选出最大的k个延迟
    weights = torch.mean(corr, dim=-1)         # [B, H, L]
    top_weights, top_indices = torch.topk(weights, top_k, dim=-1) # [B, H, k]
    
    # 3. 时延聚合
    v_rolled = []   # 存储不同延迟滚动的v
    for i in range(top_k):
        tau = top_indices[:,:,i:i+1]           # [B, H, 1]
        # 对每个批次和头,将v沿时间轴滚动 tau 步(对齐)
        # 需要实现针对批次和头的滚动,这里简化为循环处理(实际可用gather优化)
        v_roll = torch.zeros_like(v)
        for b in range(B):
            for h in range(H):
                shift = tau[b, h].item()
                v_roll[b, h] = torch.roll(v[b, h], shifts=shift, dims=0)
        v_rolled.append(v_roll)
        
    v_stack = torch.stack(v_rolled, dim=0)     # [k, B, H, L, D]
    # 聚合:用softmax归一化的权重加权
    top_weights = torch.softmax(top_weights, dim=-1)  # [B, H, k]
    # 调整形状以便乘法
    top_weights = top_weights.permute(2,0,1,2) # [k, B, H, L] -> 增加一维用于广播
    agg = torch.sum(v_stack * top_weights.unsqueeze(-1), dim=0) # [B, H, L, D]
    return agg

def forward(self, x):
    B, L, C = x.shape
    H = self.n_heads
    q = self.wq(x).view(B, L, H, self.d_k).transpose(1,2)
    k = self.wk(x).view(B, L, H, self.d_k).transpose(1,2)
    v = self.wv(x).view(B, L, H, self.d_k).transpose(1,2)
    
    out = self.time_delay_agg(q, k, v)
    out = out.transpose(1,2).contiguous().view(B, L, C)
    return self.out_proj(out)