训练流水线 - 从随机权重到智能
"训练通过三个阶段将随机权重转化为智能系统:预训练、微调和对齐。"
现代LLM不是单次端到端训练的。它们经历多阶段流水线,每个阶段都建立在之前的基础上:预训练创建基础模型,SFT(监督微调)教授指令遵循,RLHF/DPO将模型与人类偏好对齐。本文档详细介绍每个阶段,包括算法、数据要求和生产级LLM训练的实现考虑。
训练阶段概览
阶段比较
| 阶段 | 数据 | 目标 | Token数 | 成本 | 结果 |
|---|---|---|---|---|---|
| 预训练 | 网络文本 | 下一个token预测 | 1T-3T | ~$2M | 基础模型 |
| SFT | 指令 | 指令遵循 | 10M-100M | ~$10K | 可聊天 |
| RLHF/DPO | 比较对 | 偏好对齐 | 1M-10M | ~$5K | 对齐行为 |
预训练
下一个Token预测目标
所有现代LLM的基本训练目标:
L = -sum_{t=1}^{T} log P(x_t | x_1, x_2, ..., x_{t-1})
对于token序列,模型学习在给定所有先前token的情况下最大化每个token的概率。这个简单的目标在大规模应用时,能够促成复杂推理、世界知识和语言能力的涌现。
为什么下一个Token预测有效
下一个token预测的力量来自:
- 规模: 在万亿token上训练使模型接触到多样化模式
- 上下文: 预测下一个token需要理解完整上下文
- 压缩: 模型学习语言结构的高效表示
- 泛化: 学习的模式泛化到未见过的组合
数据处理流水线
import re
from typing import List, Tuple
from collections import defaultdict
class TextPreprocessor:
    """Preprocess raw text for LLM training.

    Offers three independent steps: PII scrubbing, heuristic quality
    filtering, and n-gram based near-duplicate removal.

    Note: "words" are whitespace-separated tokens (``str.split()``), so text
    in languages written without spaces counts as very few, very long words.
    """

    def __init__(self, min_length: int = 128, max_length: int = 4096):
        """
        Args:
            min_length: Minimum number of whitespace-separated words a text
                must contain to pass ``check_quality``.
            max_length: Maximum number of words allowed by ``check_quality``.
        """
        self.min_length = min_length
        self.max_length = max_length
        # PII patterns. The TLD class is [A-Za-z]; a '|' inside a character
        # class is a literal pipe, not alternation, so it must not appear.
        self.email_pattern = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
        self.phone_pattern = re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b')
        self.ssn_pattern = re.compile(r'\b\d{3}-\d{2}-\d{4}\b')
        self.ip_pattern = re.compile(r'\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b')

    def remove_pii(self, text: str) -> str:
        """Replace e-mail addresses, phone numbers, SSNs and IPv4-like
        strings with the placeholders [EMAIL], [PHONE], [SSN] and [IP].

        Patterns are applied in that order.
        """
        text = self.email_pattern.sub('[EMAIL]', text)
        text = self.phone_pattern.sub('[PHONE]', text)
        text = self.ssn_pattern.sub('[SSN]', text)
        text = self.ip_pattern.sub('[IP]', text)
        return text

    def check_quality(self, text: str) -> bool:
        """Return True when ``text`` passes every heuristic quality check.

        Checks: word count within [min_length, max_length], mean word length
        within [3, 10], at most 30% non-alphanumeric/non-space characters,
        and at least 20% distinct words.
        """
        words = text.split()
        # Empty or whitespace-only text can never be useful, and would
        # otherwise divide by zero below when min_length <= 0.
        if not words:
            return False

        word_count = len(words)
        if word_count < self.min_length or word_count > self.max_length:
            return False

        # Mean word length: reject text made of very short or very long tokens
        mean_word_len = sum(len(w) for w in words) / word_count
        if mean_word_len < 3 or mean_word_len > 10:
            return False

        # Share of special characters: too many usually means junk/markup
        special_count = sum(1 for c in text if not c.isalnum() and not c.isspace())
        if special_count / len(text) > 0.3:
            return False

        # Repetition: very low vocabulary diversity ("aaa aaa aaa ...")
        if len(set(words)) / word_count < 0.2:
            return False

        return True

    def deduplicate_by_ngram(self, texts: List[str], n: int = 13) -> List[str]:
        """Drop texts that share a sampled word n-gram with an earlier text.

        Each text is cut into non-overlapping n-grams of ``n`` words. A text
        is treated as a duplicate when any of its first five sampled n-grams
        was already registered by a previously kept text. This is exact
        n-gram matching, not MinHash.

        Texts shorter than ``n`` words cannot be fingerprinted and are
        dropped, as in the original behavior.

        Args:
            texts: Candidate documents, in priority order (first one wins).
            n: n-gram size in words; must be >= 1.

        Returns:
            The kept texts, in their original order.

        Raises:
            ValueError: If ``n`` is smaller than 1.
        """
        if n < 1:
            raise ValueError("n must be >= 1")

        seen_ngrams = set()
        unique_texts = []
        for text in texts:
            words = text.split()
            if len(words) < n:
                continue
            # "+ 1" so the last full window is included; without it a text of
            # exactly n words produced no n-grams and was never deduplicated.
            ngrams = [' '.join(words[i:i + n]) for i in range(0, len(words) - n + 1, n)]
            # Probe only the leading n-grams to keep the check cheap
            if not any(ngram in seen_ngrams for ngram in ngrams[:5]):
                seen_ngrams.update(ngrams)
                unique_texts.append(text)
        return unique_texts
# Usage
preprocessor = TextPreprocessor(min_length=128, max_length=4096)

# Run a small batch through PII scrubbing, then quality filtering
raw_texts = [
    "这是一个示例文档,包含一些contact@example.com内容...",
    "另一个文档质量问题" * 10,  # overly repetitive
]

# The generator is lazy, so each text is scrubbed right before it is checked
scrubbed_texts = (preprocessor.remove_pii(raw) for raw in raw_texts)
clean_texts = [doc for doc in scrubbed_texts if preprocessor.check_quality(doc)]

print(f"处理了 {len(clean_texts)}/{len(raw_texts)} 个文本")
课程学习策略
训练通过精心设计的课程阶段进行:
from enum import Enum
from dataclasses import dataclass
class TrainingStage(Enum):
    """Phases of the pre-training curriculum, in the order they are run."""

    FOUNDATION = "foundation"  # high-quality, diverse text
    KNOWLEDGE = "knowledge"  # emphasis on factual content
    REASONING = "reasoning"  # logical reasoning patterns
    SYNTHESIS = "synthesis"  # multi-step problem solving
@dataclass
class StageConfig:
    """Settings that describe one stage of the training curriculum."""

    stage: TrainingStage
    data_proportion: float  # fraction of the total data used by this stage
    learning_rate: float
    batch_size: int
    duration_steps: int
    description: str
# Curriculum schedule, built from a compact table of
# (stage, data share, learning rate, step count, description).
# Every stage uses the same batch size of 512.
CURRICULUM = [
    StageConfig(
        stage=stage,
        data_proportion=share,
        learning_rate=lr,
        batch_size=512,
        duration_steps=steps,
        description=summary,
    )
    for stage, share, lr, steps, summary in (
        (TrainingStage.FOUNDATION, 0.40, 3e-4, 400000, "构建基础语言理解"),
        (TrainingStage.KNOWLEDGE, 0.30, 2e-4, 300000, "获取世界知识和事实"),
        (TrainingStage.REASONING, 0.20, 1.5e-4, 200000, "开发推理能力"),
        (TrainingStage.SYNTHESIS, 0.10, 1e-4, 100000, "集成技能以完成复杂任务"),
    )
]
def get_curriculum_lr(step: int, curriculum: "list[StageConfig]") -> float:
    """Return the learning rate of the curriculum stage active at ``step``.

    Stages are laid end to end in list order; stage ``i`` covers
    ``duration_steps`` consecutive steps starting where stage ``i - 1`` ends.
    Steps past the end of the curriculum keep the final stage's learning
    rate instead of wrapping back to the first stage. A negative ``step``
    falls into the first stage.

    Args:
        step: Global training step (0-based).
        curriculum: Ordered stage configs; each needs ``duration_steps`` and
            ``learning_rate`` attributes.

    Returns:
        The learning rate configured for the active stage.

    Raises:
        ValueError: If ``curriculum`` is empty.
    """
    if not curriculum:
        raise ValueError("curriculum must contain at least one stage")

    stage_end = 0
    for config in curriculum:
        stage_end += config.duration_steps
        if step < stage_end:
            return config.learning_rate
    # Past the last stage: hold the final learning rate
    return curriculum[-1].learning_rate
Python实现
import torch
import torch.nn as nn
import torch.nn.functional as F
def compute_language_model_loss(logits: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:
    """Compute the mean cross-entropy loss for language modelling.

    Positions whose target equals -100 are excluded from the loss.

    Args:
        logits: Model outputs of shape (batch, seq_len, vocab_size).
        targets: Target token ids of shape (batch, seq_len).

    Returns:
        Scalar tensor holding the mean cross-entropy over non-ignored targets.
    """
    vocab_size = logits.size(-1)
    # reshape (not view) so sliced, non-contiguous inputs such as
    # logits[:, :-1] or input_ids[:, 1:] are accepted without .contiguous()
    logits_flat = logits.reshape(-1, vocab_size)
    targets_flat = targets.reshape(-1)
    return F.cross_entropy(logits_flat, targets_flat, ignore_index=-100)
# Example: loss on random logits (no model involved)
batch_size, seq_len, vocab_size = 2, 128, 50000
logits = torch.randn(batch_size, seq_len, vocab_size)
targets = torch.randint(0, vocab_size, (batch_size, seq_len))
loss = compute_language_model_loss(logits, targets)
print(f"损失: {loss.item():.4f}")
# NOTE(review): the original comment gave 2.0-4.0 as the typical pre-training
# loss "before training". A uniform guess over a 50000-token vocabulary scores
# ln(50000) ~= 10.8, so that range presumably describes loss during training
# rather than at initialization - confirm.
# Original note: good models converge to ~1.8-2.5
混合精度训练
混合精度训练使用FP16/BF16进行计算,同时保持FP32主权重:
import torch
from torch.cuda.amp import autocast, GradScaler
class MixedPrecisionTrainer:
    """Trainer that runs the forward pass under autocast.

    BF16 is used when the CUDA device reports support for it; otherwise the
    trainer uses FP16 together with a GradScaler.
    """

    def __init__(self, model, optimizer, device='cuda'):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.device = device

        # Pick the autocast dtype; only the FP16 path creates a scaler
        if not torch.cuda.is_bf16_supported():
            self.dtype = torch.float16
            self.scaler = GradScaler()
            print("使用FP16和GradScaler进行训练")
        else:
            self.dtype = torch.bfloat16
            print("使用BF16进行训练")

    def training_step(self, batch):
        """Run one optimization step and return the loss as a float.

        The model sees tokens [0, T-1) and is scored against tokens [1, T).
        Gradients are clipped to a max norm of 1.0 before the optimizer step.
        """
        tokens = batch['input_ids'].to(self.device)
        mask = batch['attention_mask'].to(self.device)
        labels = tokens[:, 1:].contiguous()

        self.optimizer.zero_grad()

        use_bf16 = self.dtype == torch.bfloat16
        amp_dtype = torch.bfloat16 if use_bf16 else torch.float16

        # The forward pass is the same for both precisions
        with autocast(dtype=amp_dtype):
            logits = self.model(tokens[:, :-1], mask[:, :-1])
            loss = compute_language_model_loss(logits, labels)

        if use_bf16:
            # BF16: plain backward, no gradient scaling
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
        else:
            # FP16: scale the loss, unscale before clipping, step via the scaler
            self.scaler.scale(loss).backward()
            self.scaler.unscale_(self.optimizer)
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.scaler.step(self.optimizer)
            self.scaler.update()

        return loss.item()
FlashAttention集成
FlashAttention是一种内存高效的注意力机制,对大规模训练至关重要:
try:
    from flash_attn import flash_attn_func
    FLASH_ATTENTION_AVAILABLE = True
except ImportError:
    FLASH_ATTENTION_AVAILABLE = False
    print("FlashAttention不可用,使用标准注意力")


class FlashAttentionBlock(nn.Module):
    """Pre-norm Transformer block with causal self-attention.

    Uses FlashAttention when the package is importable and the projected
    activations are CUDA half-precision tensors; otherwise falls back to a
    standard attention implementation.

    FlashAttention avoids materializing the (seq_len x seq_len) score matrix,
    so attention *memory* grows linearly with sequence length; the amount of
    computation is still quadratic.
    """

    def __init__(self, d_model: int, num_heads: int, d_ff: int, dropout: float = 0.1):
        """
        Args:
            d_model: Model (embedding) width.
            num_heads: Number of attention heads; must divide ``d_model``.
            d_ff: Hidden width of the feed-forward network.
            dropout: Dropout probability used inside the feed-forward network.

        Raises:
            ValueError: If ``d_model`` is not divisible by ``num_heads``.
        """
        super().__init__()
        if d_model % num_heads != 0:
            raise ValueError("d_model must be divisible by num_heads")
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.qkv_proj = nn.Linear(d_model, 3 * d_model, bias=False)
        self.out_proj = nn.Linear(d_model, d_model, bias=False)
        self.ffn = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(d_ff, d_model),
            nn.Dropout(dropout)
        )
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)

    def forward(self, x, attention_mask=None):
        """
        Args:
            x: Input of shape (batch, seq_len, d_model).
            attention_mask: (batch, seq_len) or None. NOTE(review): currently
                unused by both attention paths - only the causal mask is
                applied, so padding positions are not masked out.

        Returns:
            Tensor of shape (batch, seq_len, d_model).
        """
        # Self-attention sub-layer (pre-norm + residual)
        residual = x
        x = self.norm1(x)
        # Shapes are needed by both branches; previously they were only
        # assigned inside the FlashAttention branch, so the fallback crashed.
        batch_size, seq_len, _ = x.shape
        qkv = self.qkv_proj(x)

        # The FlashAttention kernels need CUDA fp16/bf16 inputs; anything
        # else (CPU, fp32) goes through the standard path.
        use_flash = (
            FLASH_ATTENTION_AVAILABLE
            and qkv.is_cuda
            and qkv.dtype in (torch.float16, torch.bfloat16)
        )

        if use_flash:
            # flash_attn_func takes q, k, v each shaped
            # (batch, seq_len, heads, head_dim)
            q, k, v = qkv.reshape(
                batch_size, seq_len, 3, self.num_heads, self.head_dim
            ).unbind(dim=2)
            # Causal masking is handled inside the kernel
            x = flash_attn_func(q, k, v, causal=True)
            x = self.out_proj(x.reshape(batch_size, seq_len, -1))
        else:
            # Standard attention: (batch, heads, seq_len, head_dim)
            q, k, v = qkv.chunk(3, dim=-1)
            q = q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
            k = k.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
            v = v.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
            attn = (q @ k.transpose(-2, -1)) / (self.head_dim ** 0.5)
            # Causal mask: True above the diagonal marks future positions
            causal_mask = torch.ones(seq_len, seq_len, device=x.device).triu(diagonal=1).bool()
            attn = attn.masked_fill(causal_mask, float('-inf'))
            attn = attn.softmax(dim=-1)
            x = (attn @ v).transpose(1, 2).reshape(batch_size, seq_len, -1)
            x = self.out_proj(x)
        x = x + residual

        # Feed-forward sub-layer (pre-norm + residual)
        residual = x
        x = self.norm2(x)
        x = self.ffn(x) + residual
        return x
高级优化器配置
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR, LinearLR, SequentialLR
def create_optimizer_and_scheduler(model, config):
    """Build an AdamW optimizer and a linear-warmup + cosine-decay scheduler.

    Weight decay is skipped for parameters with fewer than two dimensions
    (biases, normalization scales) and for parameters whose name contains
    'bias', 'layer_norm.weight' or 'lm_head.weight'. Relying on names alone
    missed normalization layers named differently (e.g. ``norm1``).

    Args:
        model: Module whose ``named_parameters()`` are optimized.
        config: Object exposing ``weight_decay``, ``learning_rate``,
            ``beta1``, ``beta2``, ``warmup_steps``, ``max_steps`` and
            ``min_lr_ratio``.

    Returns:
        ``(optimizer, scheduler)``. The scheduler is a ``SequentialLR``
        (warmup, then cosine) or, when ``warmup_steps <= 0``, the cosine
        scheduler alone.
    """
    no_decay_names = ('bias', 'layer_norm.weight', 'lm_head.weight')
    decay_params = []
    no_decay_params = []
    for name, param in model.named_parameters():
        if param.ndim < 2 or any(nd in name for nd in no_decay_names):
            no_decay_params.append(param)
        else:
            decay_params.append(param)

    optimizer = AdamW(
        [
            {'params': decay_params, 'weight_decay': config.weight_decay},
            {'params': no_decay_params, 'weight_decay': 0.0},
        ],
        lr=config.learning_rate,
        betas=(config.beta1, config.beta2),
        eps=1e-8,
    )

    warmup_steps = config.warmup_steps
    eta_min = config.learning_rate * config.min_lr_ratio
    # Guard against a non-positive decay horizon
    decay_steps = max(1, config.max_steps - max(0, warmup_steps))

    if warmup_steps <= 0:
        # No warmup requested: cosine decay starts from the peak LR
        cosine_scheduler = CosineAnnealingLR(optimizer, T_max=decay_steps, eta_min=eta_min)
        return optimizer, cosine_scheduler

    # LinearLR requires 0 < start_factor <= 1, so warmup starts from a small
    # positive fraction of the peak LR instead of exactly zero.
    warmup_scheduler = LinearLR(
        optimizer,
        start_factor=1.0 / warmup_steps,
        end_factor=1.0,
        total_iters=warmup_steps,
    )
    cosine_scheduler = CosineAnnealingLR(optimizer, T_max=decay_steps, eta_min=eta_min)

    # Warmup first, then switch to cosine decay at the milestone
    scheduler = SequentialLR(
        optimizer,
        schedulers=[warmup_scheduler, cosine_scheduler],
        milestones=[warmup_steps],
    )
    return optimizer, scheduler
# Usage
optimizer, scheduler = create_optimizer_and_scheduler(model, config)

for step, batch in enumerate(dataloader):
    loss = pretrain_step(model, optimizer, config, batch)
    scheduler.step()

    # Log only every 100th step
    if step % 100:
        continue
    current_lr = scheduler.get_last_lr()[0]
    print(f"步骤 {step}: 损失={loss:.4f}, lr={current_lr:.2e}")
缩放定律
Chinchilla缩放定律(Hoffmann等人,2022)建立了最优计算分配:
| 参数数量 | 最优训练Token数 | 计算(FLOPs,约6ND) |
|---|---|---|
| 1B | 20B | 1.2e20 |
| 7B | 140B | 5.9e21 |
| 70B | 1.4T | 5.9e23 |
| 400B | 8T | 1.9e25 |
关键见解: 模型应该为每个参数训练约20个token以获得最佳性能。
缩放定律公式
L(N, D) = E + A/N^alpha + B/D^beta
其中:
L是损失;N是参数数量;D是数据量(token数);E、A、B、alpha、beta是拟合常数。
def chinchilla_loss(params: float, tokens: float) -> float:
    """Estimate loss with the Chinchilla parametric scaling law.

    Evaluates ``L(N, D) = E + A / N**alpha + B / D**beta`` with the constants
    fitted by Hoffmann et al. (2022). Those constants are fitted on raw
    parameter and token counts, so the inputs are converted from
    billions/trillions before use.

    Args:
        params: Number of parameters, in billions.
        tokens: Number of training tokens, in trillions.

    Returns:
        The predicted loss.

    Raises:
        ValueError: If ``params`` or ``tokens`` is not positive.
    """
    if params <= 0 or tokens <= 0:
        raise ValueError("params and tokens must be positive")

    E = 1.69  # irreducible loss
    A = 406.4
    B = 410.7
    alpha = 0.34
    beta = 0.28

    n_params = params * 1e9  # billions -> raw parameter count
    n_tokens = tokens * 1e12  # trillions -> raw token count
    return E + A / (n_params ** alpha) + B / (n_tokens ** beta)
# Find the compute-optimal amount of data for a 7B model
params_7b = 7
optimal_tokens_7b = 20 * params_7b # Chinchilla: ~20 tokens per parameter; params are in billions, so this is in billions of tokens
# chinchilla_loss documents tokens in trillions, hence the division by 1000
loss_7b = chinchilla_loss(params_7b, optimal_tokens_7b / 1000)
print(f"7B最优token数: {optimal_tokens_7b}B, 损失: {loss_7b:.3f}")
训练配置示例
from dataclasses import dataclass
from typing import Optional
@dataclass
class PreTrainingConfig:
    """Configuration for LLM pre-training."""

    # Model architecture
    d_model: int = 4096
    num_heads: int = 32
    num_layers: int = 32
    # Original note: 8/3 * d_model for SwiGLU.
    # NOTE(review): 8/3 * 4096 is about 10923, so confirm 10952 is intended.
    d_ff: int = 10952

    # Training hyper-parameters
    batch_size: int = 512  # global batch size
    micro_batch_size: int = 4  # per-GPU batch size
    learning_rate: float = 3e-4
    weight_decay: float = 0.1
    beta1: float = 0.9
    beta2: float = 0.95

    # Learning-rate schedule
    warmup_steps: int = 2000
    max_steps: int = 1000000
    min_lr_ratio: float = 0.1

    # Data
    vocab_size: int = 128000
    max_seq_len: int = 4096

    def get_lr(self, step: int) -> float:
        """Return the learning rate at ``step``: linear warmup, then cosine decay.

        During warmup the rate rises linearly from 0 to ``learning_rate``.
        Afterwards it follows a cosine curve down to
        ``min_lr_ratio * learning_rate`` at ``max_steps`` and stays there for
        any later step.

        Args:
            step: Global training step (0-based).
        """
        # Local import: the surrounding file does not import math
        import math

        if step < self.warmup_steps:
            return self.learning_rate * step / self.warmup_steps

        decay_steps = self.max_steps - self.warmup_steps
        if decay_steps <= 0:
            # No decay phase configured: treat the schedule as finished
            progress = 1.0
        else:
            # Clamp so the cosine does not climb back up past max_steps
            progress = min(1.0, (step - self.warmup_steps) / decay_steps)

        cosine_decay = 0.5 * (1 + math.cos(math.pi * progress))
        min_lr = self.min_lr_ratio * self.learning_rate
        return min_lr + (1 - self.min_lr_ratio) * self.learning_rate * cosine_decay
# 训练循环骨架
def pretrain_step(model, optimizer, config, batch):
    """Run one pre-training update and return the loss as a Python float.

    The model is fed the full sequence; logits at position t are scored
    against the token at position t + 1. Gradients are clipped to a max norm
    of 1.0, and are cleared after the optimizer step.
    """
    tokens = batch['input_ids']  # (batch, seq_len)
    mask = batch['attention_mask']

    outputs = model(tokens, mask)

    # Align predictions with the tokens that follow them
    predictions = outputs[..., :-1, :].contiguous()
    next_tokens = tokens[..., 1:].contiguous()
    loss = compute_language_model_loss(predictions, next_tokens)

    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

    optimizer.step()
    optimizer.zero_grad()

    loss_value = loss.item()
    return loss_value
Emergent能力
在规模下出现但未明确训练的能力:
| 能力 | 出现位置 | 描述 |
|---|---|---|
| 上下文学习 | ~10B+ | 从prompt中的例子学习 |
| 思维链 | ~30B+ | 多步推理 |
| 指令遵循 | ~7B+(带SFT) | 理解并遵循指示 |
| 代码生成 | ~7B+ | 编写和调试代码 |
| 多语言 | ~7B+ | 跨语言迁移 |
重要: Emergent能力不是保证的 - 它们取决于训练数据和架构选择。
监督微调 (SFT)
指令微调
SFT教会基础模型遵循指令并适当格式化响应。
数据格式
SFT使用prompt-response对:
# Two sample SFT records; each holds an instruction, an optional input,
# and the expected output.
sft_data = [
    dict(
        instruction="用简单术语解释量子计算。",
        input="",
        output="量子计算就像...",
    ),
    dict(
        instruction="写一个Python函数来反转字符串。",
        input="",
        output="def reverse_string(s):\n return s[::-1]",
    ),
]
# 为训练格式化SFT示例
def format_sft_prompt(example, tokenizer):
    """Render one SFT example as a chat-formatted training string.

    The user turn is the instruction, followed by the input (separated by a
    single space) when an input is present. An empty or missing input adds
    nothing, so the user message carries no trailing space.

    Args:
        example: Mapping with "instruction" and "output" keys and an
            optional "input" key.
        tokenizer: Object providing ``apply_chat_template``.

    Returns:
        Whatever ``tokenizer.apply_chat_template`` returns for the two-turn
        conversation (called with ``tokenize=False``).
    """
    user_parts = [example["instruction"]]
    extra_input = example.get("input", "")
    if extra_input:
        user_parts.append(extra_input)

    messages = [
        {"role": "user", "content": " ".join(user_parts)},
        {"role": "assistant", "content": example["output"]}
    ]

    # Apply the tokenizer's chat template; no generation prompt is added
    # because the assistant turn is already part of the conversation
    prompt = tokenizer.apply_chat_template(
        messages,
        tokenize=False,
        add_generation_prompt=False
    )
    return prompt
# Llama 3的示例prompt:
# <|begin_of_text|><|start_header_id|>user<|end_header_id|>\n\n解释量子计算...<|eot_id|><|start_header_id|>assistant<|end_header_id|>\n\n量子计算就像...<|eot_id|>