10.6 LLM流式视角:流式生成与增量推理
核心问题:LLM时代的流式算法有什么新挑战?
开场问题:LLM的流式生成
想象你在使用ChatGPT,输入问题后:
你:请解释什么是流式算法?
ChatGPT:流式算法是一种处理无限数据流的算法...
流式|算法|是一种|处理|无限|数据|流|的|算法|...
Token逐个生成,用户边看边读困惑:LLM为什么会"逐Token生成"?这不是"慢"吗?
LLM生成的本质
关键洞察:LLM的生成本质上是一个流式过程。
输入:前缀(用户的问题)
生成:Token1 → Token2 → Token3 → ...
每个Token的生成:
输入:前缀 + 已生成的Token
输出:下一个Token的概率分布
选择:从分布中采样一个Token
约束:
上下文窗口有限 → 无法存储无限历史
只能看已生成的Token → 单遍扫描
生成的Token不可撤销 → 在线决策LLM的流式本质
Token级生成的流式特性
时间 →
T0: 输入前缀 + Token1 → Token1
T1: 输入前缀 + Token1 + Token2 → Token2
T2: 输入前缀 + Token1 + Token2 + Token3 → Token3
...
每次生成:
只计算最新Token的输出
已计算的Token(KV Cache)被缓存流式约束的映射
| 传统流式约束 | LLM流式约束 |
|---|---|
| 单遍扫描 | 只能看已生成的Token |
| 内存有限 | 上下文窗口有限 |
| 实时响应 | 用户期望即时反馈 |
| 不可撤销 | Token生成后不可删除 |
关键洞察:上下文窗口 = 流式算法的"内存约束"。
流式上下文处理
问题:上下文窗口有限,如何处理无限输入?
LLM上下文窗口:例如 4096 Tokens
输入文档:可能 10000 Tokens
问题:
- 无法一次处理整个文档
- 如何处理超过窗口的内容?方案对比
| 方案 | 思想 | 优点 | 缺点 |
|---|---|---|---|
| 截断 | 只保留最近的上下文 | 简单实现 | 丢失早期信息 |
| 摘要 | 压缩早期上下文 | 保留关键信息 | 摘要质量依赖模型 |
| 滑动窗口 | 类似流式算法的滑动窗口 | 自然过渡 | 窗口大小权衡 |
| 分块处理 | 分多次处理,汇总 | 可处理长文档 | 需要合并策略 |
滑动窗口注意力
python
class SlidingWindowAttention:
"""滑动窗口注意力机制"""
def __init__(self, window_size=512):
self.W = window_size
def forward(self, query, key, value):
"""只关注最近W个Token"""
L = query.shape[1] # 序列长度
# 创建滑动窗口掩码
mask = torch.zeros(L, L)
for i in range(L):
for j in range(L):
if j < i - self.W or j > i: # 窗口外
mask[i, j] = -inf # 掩盖
# 缩放点积注意力
scores = (query @ key.transpose(-2, -1)) / sqrt(d_k)
scores = scores + mask # 应用掩码
return softmax(scores) @ value复杂度:O(L · W) 而非 O(L²)
类比:这正是DGIM算法的滑动窗口思想!
KV Cache:增量推理的摘要结构
什么是KV Cache?
传统注意力计算:
每次生成新Token,都要重新计算所有Token的Key和Value
KV Cache:
缓存已计算Token的Key和Value
新Token只需计算自己的KV,然后与缓存的KV一起做注意力KV Cache的流式性质
python
class StreamingLLM:
"""流式LLM推理示意"""
def __init__(self, model):
self.model = model
self.kv_cache = {} # KV Cache
def stream_generate(self, prompt, max_tokens=100):
# 编码prompt(预填充)
input_ids = self.model.encode(prompt)
# 预填充:可以并行处理prompt
past_key_values = None
for token in input_ids:
past_key_values = self.model.forward(
token, past_key_values, use_cache=True
)
# 流式生成
for _ in range(max_tokens):
# 只计算最新Token
next_token, past_key_values = self.model.forward(
past_key_values=past_key_values,
use_cache=True
)
yield self.model.decode(next_token)KV Cache的空间复杂度
KV Cache大小:O(L · d)
L: 序列长度
d: 隐藏维度
问题:
序列长度L增长 → KV Cache增长
上下文窗口限制 → KV Cache不能无限增长类比:KV Cache正是流式算法的"摘要结构"!
前沿流式技术(P1改进)
1. KV Cache的滑动窗口管理
问题:KV Cache增长超过内存限制。
解决方案:滑动窗口管理KV Cache。
python
class SlidingKVCache:
"""滑动窗口KV Cache"""
def __init__(self, window_size=4096):
self.window_size = window_size
self.kv_cache = []
def add(self, key, value):
"""添加新的KV"""
self.kv_cache.append((key, value))
# 滑动窗口:移除过期的KV
if len(self.kv_cache) > self.window_size:
self.kv_cache.pop(0)
def get_all(self):
"""获取窗口内的所有KV"""
return self.kv_cache思想:类似DGIM算法,用有限窗口管理无限数据。
2. PagedAttention(vLLM)
问题:KV Cache内存碎片化,浪费空间。
解决方案:将KV Cache分页管理,类似操作系统虚拟内存。
python
class PagedAttention:
"""分页注意力(vLLM思想)"""
def __init__(self, page_size=16):
self.page_size = page_size
self.pages = {} # 物理页池
self.page_table = {} # 逻辑页到物理页映射
def allocate_page(self):
"""分配一个物理页"""
page_id = self._find_free_page()
self.pages[page_id] = Page()
return page_id
def map_logical_to_physical(self, logical_page, physical_page):
"""逻辑页映射到物理页"""
self.page_table[logical_page] = physical_page优势:
- 内存利用率高(避免碎片)
- 可以共享KV Cache(多请求共享前缀)
3. 上下文压缩技术
问题:长上下文需要压缩。
解决方案:
| 技术 | 思想 | 空间节省 |
|---|---|---|
| Gist | 训练模型压缩上下文 | 训练代价高 |
| AutoCompressor | 自动生成摘要 | 摘要质量依赖模型 |
| LLMLingua | 删除冗余Token | 简单实现 |
python
class ContextCompressor:
"""上下文压缩示意"""
def compress(self, long_context):
"""压缩长上下文"""
# 识别关键Token(类似Top-K)
important_tokens = self._extract_important(long_context)
# 生成摘要(类似摘要结构)
summary = self._summarize(long_context)
# 压缩结果 = 摘要 + 关键Token
compressed = summary + important_tokens
return compressed类比:这正是流式算法"摘要"思想的体现!
4. 流式RAG
问题:RAG需要实时更新知识。
解决方案:增量检索。
python
class StreamingRAG:
"""流式RAG示意"""
def __init__(self, vector_db, chunk_size=500):
self.vector_db = vector_db
self.chunk_size = chunk_size
self.chunk_buffer = []
def add_document_stream(self, doc_stream):
"""流式添加文档"""
for chunk in doc_stream:
self.chunk_buffer.append(chunk)
if len(self.chunk_buffer) >= self.chunk_size:
# 增量索引:不需要重建整个索引
self.vector_db.add_batch(self.chunk_buffer)
self.chunk_buffer = []
def query_streaming(self, query):
"""流式查询"""
# 检索相关文档(类似频率估计)
relevant_docs = self.vector_db.search(query)
# 增量生成回答
context = self._build_context(relevant_docs)
return self.llm.generate_streaming(query, context)类比:
- 增量索引 → 流式算法的"增量维护"
- 实时检索 → 流式算法的"实时响应"
在线学习:流式数据的持续学习
传统学习 vs 在线学习
| 批处理学习 | 在线学习 |
|---|---|
| 多轮遍历数据 | 单遍,即时更新 |
| 收敛到全局最优 | 追踪数据分布变化 |
| 静态模型 | 动态适应 |
流式微调
python
class OnlineFineTuner:
"""流式微调示意"""
def __init__(self, model, learning_rate=0.01):
self.model = model
self.lr = learning_rate
self.loss_history = []
def process_stream(self, data_stream):
"""处理流式数据"""
for x, y in data_stream:
# 预测
pred = self.model(x)
# 即时损失
loss = self.loss_fn(pred, y)
# 在线梯度下降
self.model.update(loss.gradient(), self.lr)
# 记录损失用于检测概念漂移
self.loss_history.append(loss)
# 概念漂移检测
if self._detect_drift(loss_history):
self.model.adapt() # 适应性调整
def _detect_drift(self, losses):
"""检测数据分布变化"""
# 使用滑动窗口监控损失
recent = losses[-100:]
historical = losses[:-100]
if mean(recent) > mean(historical) * 1.5:
return True # 检测到漂移
return False类比:
- 概念漂移检测 → 滑动窗口的"过期处理"
- 在线梯度下降 → 流式算法的"增量更新"
LLM与流式算法的交叉
LLM可以做什么?
| 任务 | LLM能力 |
|---|---|
| 生成流式算法代码 | ✓ 能生成CMS、HLL的Python实现 |
| 解释流式算法原理 | ✓ 能解释"为什么取最小值" |
| 分析流式约束 | 部分 能识别部分约束,但可能遗漏 |
| 设计摘要结构 | ✗ 无法自动设计最优摘要结构 |
| 权衡精度和空间 | ✗ 需要人类指导权衡决策 |
LLM不能做什么?
- 自动分析流式约束:LLM可能忽略"空间爆炸"问题
- 设计最优摘要结构:需要算法理论的数学证明
- 权衡精度和空间:需要理解业务需求和技术约束
人的价值
LLM时代,人的价值在于:
1. 识别问题是否适合流式
- LLM可能给出"静态方案"
- 人需要识别"数据永不停止"的约束
2. 设计合适的摘要结构
- LLM可能给出"精确哈希表"
- 人需要设计CMS或HLL
3. 分析精度保证
- LLM可能说"这个方案可行"
- 人需要分析"误差不超过10%"War Story:LLM的上下文窗口教训
案例分析
场景:用户上传100页PDF,要求LLM回答问题
LLM方案:
1. 把整个PDF编码成Token → 50000 Tokens
2. 上下文窗口只有4096 → 无法一次处理
LLM尝试:
截断前4096个Token → 用户问的问题在第50页 → 无法回答
正确方案:
1. 分块处理PDF(类似流式分块)
2. 用向量检索找到相关块(类似流式RAG)
3. 只编码相关块到上下文窗口教训:
- LLM的上下文窗口就是流式约束
- 需要"摘要结构"(检索+压缩)处理长文档
与分布式算法的对比
💡 与分布式算法的区别
分布式LLM推理需要多机协调:每台机器处理部分Token,然后汇总。
分布式KV Cache管理更复杂:需要跨机器同步。
单机流式LLM只需要考虑内存约束,分布式还需要考虑通信成本。
| 维度 | 分布式LLM推理 | 单机LLM推理 |
|---|---|---|
| KV Cache管理 | 需要跨机器同步 | 本地管理 |
| 通信成本 | 传输KV Cache | 无 |
| 约束 | 空间 + 通信 | 仅空间 |
LLM时代的流式应用
1. 流式文本处理
python
class StreamingTextProcessor:
"""流式文本处理"""
def __init__(self, llm):
self.llm = llm
self.cms = CountMinSketch(0.1, 0.01) # 词频估计
self.hll = HyperLogLog(14) # 不同词估计
def process_stream(self, text_stream):
for chunk in text_stream:
words = chunk.split()
for word in words:
self.cms.add(word)
self.hll.add(word)
def analyze(self):
"""分析文本统计"""
return {
'total_words': self.cms.get_total(),
'unique_words': self.hll.count(),
'top_words': self._get_top_k_words()
}2. 流式推荐
python
class StreamingRecommender:
"""流式推荐系统"""
def __init__(self):
self.user_profiles = {} # 用户画像(滑动窗口)
self.item_frequency = CountMinSketch(0.1, 0.01) # 物品频率
def process_interaction(self, user_id, item_id):
"""处理用户交互"""
# 更新物品频率
self.item_frequency.add(item_id)
# 更新用户画像(滑动窗口)
if user_id not in self.user_profiles:
self.user_profiles[user_id] = SlidingWindow(100)
self.user_profiles[user_id].add(item_id)
def recommend(self, user_id):
"""推荐"""
profile = self.user_profiles[user_id]
# 基于用户画像推荐
return self._generate_recommendations(profile)3. 流式检测
python
class StreamingAnomalyDetector:
"""流式异常检测"""
def __init__(self):
self.window = ExponentialWindow(decay_rate=0.01)
self.threshold = 2.0
def process(self, value):
"""处理数据点"""
self.window.add(value)
mean = self.window.query()
if value > mean * self.threshold:
return "异常!"
return "正常"小结
核心收获
- LLM的流式本质:逐Token生成是流式过程
- 上下文窗口约束:类比流式算法的内存约束
- KV Cache:增量推理的摘要结构
- 前沿技术:滑动窗口KV Cache、上下文压缩、流式RAG
- 在线学习:追踪数据分布变化
检查你的理解
- LLM生成为什么是流式过程?
- 上下文窗口如何对应流式约束?
- KV Cache的流式性质是什么?
- LLM时代流式算法的新挑战是什么?
下一步
学会了LLM流式视角后,让我们进行综合练习:10.7 综合练习
综合练习将检验:如何审查LLM给出的流式方案?