Spot / Preemptible Instance Training: Fault-Tolerant Training on Low-Cost Compute

FreeGuideOnline, latest update 2026-06-28

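```python
import requests

# EC2 instance metadata endpoint. It returns 404 until an interruption is
# scheduled, and 200 once the Spot instance has been marked for reclaim.
SPOT_ACTION_URL = "http://169.254.169.254/latest/meta-data/spot/instance-action"

def is_spot_termination_notice():
    """Return True if AWS has scheduled this Spot instance for interruption."""
    try:
        resp = requests.get(SPOT_ACTION_URL, timeout=1)
    except requests.RequestException:
        # Metadata service unreachable (for example, not running on EC2).
        return False
    return resp.status_code == 200
```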
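On instances configured to require IMDSv2, an unauthenticated GET like the one above is rejected with 401, so the check would always report "no notice". The following is a minimal sketch of a token-based variant, not a definitive implementation. The function name `is_spot_termination_notice_v2`, the `opener` parameter (there only so the function can be exercised without a real metadata service), and the 60-second token TTL are assumptions made for illustration. It uses the standard library `urllib` instead of `requests` so it runs without extra packages.

```python
import urllib.error
import urllib.request

IMDS_BASE = "http://169.254.169.254/latest"

def is_spot_termination_notice_v2(opener=urllib.request.urlopen, timeout=1):
    """IMDSv2 variant: fetch a session token, then query instance-action."""
    try:
        # Step 1: request a short-lived session token with a PUT.
        token_req = urllib.request.Request(
            f"{IMDS_BASE}/api/token",
            method="PUT",
            headers={"X-aws-ec2-metadata-token-ttl-seconds": "60"},
        )
        with opener(token_req, timeout=timeout) as resp:
            token = resp.read().decode()

        # Step 2: query the Spot action endpoint, presenting the token.
        action_req = urllib.request.Request(
            f"{IMDS_BASE}/meta-data/spot/instance-action",
            headers={"X-aws-ec2-metadata-token": token},
        )
        with opener(action_req, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # HTTPError (such as 404 when nothing is scheduled), URLError, and
        # timeouts all derive from OSError: treat them as "no notice".
        return False
```

Because both variants return a plain boolean, either one can be dropped into the same polling code.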


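### 3. Automatic Recovery and Resumed Training

The training launch script first checks external storage for the latest checkpoint. If one exists, the script loads it and resumes training; if not, it initializes from scratch. Combined with the cloud provider's **auto scaling group or instance fleet**, an interrupted instance can be replaced automatically, and the replacement runs the same launch script.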
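The resume-or-initialize decision can be sketched as a small pure function. This is an illustrative sketch under stated assumptions: `pick_latest_checkpoint` and `resume_or_init` are hypothetical helper names, and the input is assumed to be a list of dicts with `Key` and `LastModified` fields, the shape S3 object listings use. Picking by modification time rather than by name matters because file names such as `ep9` and `ep10` do not sort chronologically as strings.

```python
from datetime import datetime, timezone

def pick_latest_checkpoint(objects):
    """Return the key of the most recently modified object, or None if empty."""
    if not objects:
        return None
    return max(objects, key=lambda obj: obj["LastModified"])["Key"]

def resume_or_init(objects):
    """Decide whether the launch script should resume or start fresh."""
    key = pick_latest_checkpoint(objects)
    if key is None:
        return {"mode": "fresh", "checkpoint": None}
    return {"mode": "resume", "checkpoint": key}

# Example listing (made-up keys and timestamps, for illustration only).
example_objects = [
    {"Key": "checkpoints/interrupt_ep9_step900.pth.tar",
     "LastModified": datetime(2026, 1, 1, 10, 0, tzinfo=timezone.utc)},
    {"Key": "checkpoints/interrupt_ep10_step1000.pth.tar",
     "LastModified": datetime(2026, 1, 1, 11, 0, tzinfo=timezone.utc)},
]
decision = resume_or_init(example_objects)
print(decision["mode"], decision["checkpoint"])
```

Keeping the decision separate from the download and load calls makes it easy to test the launch logic without touching cloud storage.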

## Implementation Steps (PyTorch + AWS Example)

### Step 1: Prepare Persistent Storage

Use **Amazon S3** or an **EFS file system**. This example uses S3, uploading files via `s3fs` or `boto3`.

```python
import boto3

s3 = boto3.client("s3")
bucket = "my-training-checkpoints"
```

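### Step 2: Write Checkpoint Save and Load Logic

```python
import torch

# `model`, `optimizer`, `scheduler`, and `device` are assumed to be
# module-level objects created earlier in the training script.

def save_checkpoint(state, filename="checkpoint.pth.tar"):
    """Write the checkpoint to local disk, then copy it to S3."""
    torch.save(state, filename)
    s3.upload_file(filename, bucket, f"checkpoints/{filename}")

def load_checkpoint(checkpoint_path):
    """Restore model, optimizer, and scheduler state; return where to resume."""
    state = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(state['model'])
    optimizer.load_state_dict(state['optimizer'])
    scheduler.load_state_dict(state['scheduler'])
    start_epoch = state['epoch']
    global_step = state['global_step']
    return start_epoch, global_step
```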

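### Step 3: Signal Handling and Saving on Interruption

```python
import signal
import sys

def grace_exit(signum, frame):
    """Save a checkpoint and exit cleanly when the instance is being reclaimed."""
    # `epoch` and `global_step` are read from the training script's globals.
    print("Interrupt signal received, saving checkpoint...")
    save_checkpoint({
        'epoch': epoch,
        'global_step': global_step,
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict(),
        'scheduler': scheduler.state_dict(),
    }, f"interrupt_ep{epoch}_step{global_step}.pth.tar")
    sys.exit(0)

# Run grace_exit when the process receives SIGTERM.
signal.signal(signal.SIGTERM, grace_exit)
```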

Inside the training loop, you can also poll for the Spot interruption notice, in case the platform does not send SIGTERM:

```python
if is_spot_termination_notice():
    grace_exit(None, None)
```
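Calling the metadata endpoint on every training step adds an HTTP round trip per step. Since AWS issues the Spot interruption notice two minutes ahead, polling every few seconds is usually enough. Below is a minimal sketch of a rate-limited wrapper. `ThrottledCheck`, its `interval_s` default of 5 seconds, and the injectable `clock` are hypothetical names and values chosen for illustration, not part of the original script.

```python
import time

class ThrottledCheck:
    """Wrap a boolean check so it runs at most once per `interval_s` seconds."""

    def __init__(self, check, interval_s=5.0, clock=time.monotonic):
        self._check = check
        self._interval_s = interval_s
        self._clock = clock
        self._next_due = clock()  # the first call runs the check immediately

    def __call__(self):
        now = self._clock()
        if now < self._next_due:
            # Too soon since the last real check: report "no notice" cheaply.
            return False
        self._next_due = now + self._interval_s
        return bool(self._check())
```

In the training loop this could be used as `should_stop = ThrottledCheck(is_spot_termination_notice)` followed by `if should_stop(): grace_exit(None, None)`. The interval should stay well below the notice window so that enough time remains to write and upload the checkpoint.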

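### Step 4: Add Automatic Recovery to the Launch Script

```python
# List everything under the checkpoint prefix
# (a single list_objects_v2 call returns at most 1,000 keys).
s3_checkpoints = s3.list_objects_v2(Bucket=bucket, Prefix="checkpoints/")
if 'Contents' in s3_checkpoints:
    # Pick the newest object by upload time. Comparing key strings would
    # rank "ep9" above "ep10" and could resume from a stale checkpoint.
    latest = max(s3_checkpoints['Contents'], key=lambda obj: obj['LastModified'])['Key']
    s3.download_file(bucket, latest, "resume.pth.tar")
    start_epoch, global_step = load_checkpoint("resume.pth.tar")
else:
    # No checkpoint found: start from scratch.
    start_epoch, global_step = 0, 0
```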
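To show how `start_epoch` and `global_step` feed back into training, here is a framework-free sketch of a resumable loop. It is an illustration under stated assumptions rather than the original training code: `run_training`, `train_step`, `should_stop`, and `save` are hypothetical names, every epoch is assumed to have a fixed `steps_per_epoch`, and skipping already-completed steps is only valid if the data order is reproducible across restarts.

```python
def run_training(start_epoch, global_step, num_epochs, steps_per_epoch,
                 train_step, should_stop, save):
    """Train from (start_epoch, global_step); return (epoch, global_step, finished)."""
    for epoch in range(start_epoch, num_epochs):
        # Steps of this epoch that were already completed before an interruption.
        done_in_epoch = max(0, global_step - epoch * steps_per_epoch)
        for step in range(done_in_epoch, steps_per_epoch):
            if should_stop():
                # Interruption pending: persist progress and stop early.
                save(epoch, global_step)
                return epoch, global_step, False
            train_step(epoch, step)
            global_step += 1
    return num_epochs, global_step, True
```

In a real script, `save` would wrap `save_checkpoint`, and `should_stop` would wrap the Spot interruption check. Without the `done_in_epoch` offset, a run resumed from a mid-epoch checkpoint would repeat the steps of the interrupted epoch.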