Skip to content

10.6 LLM流式视角:流式生成与增量推理

核心问题:LLM时代的流式算法有什么新挑战?


开场问题:LLM的流式生成

想象你在使用ChatGPT,输入问题后:

你:请解释什么是流式算法?

ChatGPT:流式算法是一种处理无限数据流的算法...
         流式|算法|是一种|处理|无限|数据|流|的|算法|...
         Token逐个生成,用户边看边读

困惑:LLM为什么会"逐Token生成"?这不是"慢"吗?

LLM生成的本质

关键洞察:LLM的生成本质上是一个流式过程。

输入:前缀(用户的问题)
生成:Token1 → Token2 → Token3 → ...

每个Token的生成:
  输入:前缀 + 已生成的Token
  输出:下一个Token的概率分布
  选择:从分布中采样一个Token
  
约束:
  上下文窗口有限 → 无法存储无限历史
  只能看已生成的Token → 单遍扫描
  生成的Token不可撤销 → 在线决策

LLM的流式本质

Token级生成的流式特性

时间 → 

T0: 输入前缀 + Token1 → Token1
T1: 输入前缀 + Token1 + Token2 → Token2
T2: 输入前缀 + Token1 + Token2 + Token3 → Token3
...

每次生成:
  只计算最新Token的输出
  已计算的Token(KV Cache)被缓存

流式约束的映射

传统流式约束LLM流式约束
单遍扫描只能看已生成的Token
内存有限上下文窗口有限
实时响应用户期望即时反馈
不可撤销Token生成后不可删除

关键洞察:上下文窗口 = 流式算法的"内存约束"。


流式上下文处理

问题:上下文窗口有限,如何处理无限输入?

LLM上下文窗口:例如 4096 Tokens
输入文档:可能 10000 Tokens

问题:
  - 无法一次处理整个文档
  - 如何处理超过窗口的内容?

方案对比

方案思想优点缺点
截断只保留最近的上下文简单实现丢失早期信息
摘要压缩早期上下文保留关键信息摘要质量依赖模型
滑动窗口类似流式算法的滑动窗口自然过渡窗口大小权衡
分块处理分多次处理,汇总可处理长文档需要合并策略

滑动窗口注意力

python
class SlidingWindowAttention:
    """滑动窗口注意力机制"""
    def __init__(self, window_size=512):
        self.W = window_size
    
    def forward(self, query, key, value):
        """只关注最近W个Token"""
        L = query.shape[1]  # 序列长度
        
        # 创建滑动窗口掩码
        mask = torch.zeros(L, L)
        for i in range(L):
            for j in range(L):
                if j < i - self.W or j > i:  # 窗口外
                    mask[i, j] = -inf  # 掩盖
        
        # 缩放点积注意力
        scores = (query @ key.transpose(-2, -1)) / sqrt(d_k)
        scores = scores + mask  # 应用掩码
        return softmax(scores) @ value

复杂度:O(L · W) 而非 O(L²)

类比:这正是DGIM算法的滑动窗口思想!


KV Cache:增量推理的摘要结构

什么是KV Cache?

传统注意力计算:
  每次生成新Token,都要重新计算所有Token的Key和Value
  
KV Cache:
  缓存已计算Token的Key和Value
  新Token只需计算自己的KV,然后与缓存的KV一起做注意力

KV Cache的流式性质

python
class StreamingLLM:
    """流式LLM推理示意"""
    def __init__(self, model):
        self.model = model
        self.kv_cache = {}  # KV Cache
    
    def stream_generate(self, prompt, max_tokens=100):
        # 编码prompt(预填充)
        input_ids = self.model.encode(prompt)
        
        # 预填充:可以并行处理prompt
        past_key_values = None
        for token in input_ids:
            past_key_values = self.model.forward(
                token, past_key_values, use_cache=True
            )
        
        # 流式生成
        for _ in range(max_tokens):
            # 只计算最新Token
            next_token, past_key_values = self.model.forward(
                past_key_values=past_key_values,
                use_cache=True
            )
            yield self.model.decode(next_token)

KV Cache的空间复杂度

KV Cache大小:O(L · d)
  L: 序列长度
  d: 隐藏维度

问题:
  序列长度L增长 → KV Cache增长
  上下文窗口限制 → KV Cache不能无限增长

类比:KV Cache正是流式算法的"摘要结构"!


前沿流式技术(P1改进)

1. KV Cache的滑动窗口管理

问题:KV Cache增长超过内存限制。

解决方案:滑动窗口管理KV Cache。

python
class SlidingKVCache:
    """滑动窗口KV Cache"""
    def __init__(self, window_size=4096):
        self.window_size = window_size
        self.kv_cache = []
    
    def add(self, key, value):
        """添加新的KV"""
        self.kv_cache.append((key, value))
        
        # 滑动窗口:移除过期的KV
        if len(self.kv_cache) > self.window_size:
            self.kv_cache.pop(0)
    
    def get_all(self):
        """获取窗口内的所有KV"""
        return self.kv_cache

思想:类似DGIM算法,用有限窗口管理无限数据。

2. PagedAttention(vLLM)

问题:KV Cache内存碎片化,浪费空间。

解决方案:将KV Cache分页管理,类似操作系统虚拟内存。

python
class PagedAttention:
    """分页注意力(vLLM思想)"""
    def __init__(self, page_size=16):
        self.page_size = page_size
        self.pages = {}  # 物理页池
        self.page_table = {}  # 逻辑页到物理页映射
    
    def allocate_page(self):
        """分配一个物理页"""
        page_id = self._find_free_page()
        self.pages[page_id] = Page()
        return page_id
    
    def map_logical_to_physical(self, logical_page, physical_page):
        """逻辑页映射到物理页"""
        self.page_table[logical_page] = physical_page

优势

  • 内存利用率高(避免碎片)
  • 可以共享KV Cache(多请求共享前缀)

3. 上下文压缩技术

问题:长上下文需要压缩。

解决方案

技术思想空间节省
Gist训练模型压缩上下文训练代价高
AutoCompressor自动生成摘要摘要质量依赖模型
LLMLingua删除冗余Token简单实现
python
class ContextCompressor:
    """上下文压缩示意"""
    def compress(self, long_context):
        """压缩长上下文"""
        # 识别关键Token(类似Top-K)
        important_tokens = self._extract_important(long_context)
        
        # 生成摘要(类似摘要结构)
        summary = self._summarize(long_context)
        
        # 压缩结果 = 摘要 + 关键Token
        compressed = summary + important_tokens
        return compressed

类比:这正是流式算法"摘要"思想的体现!

4. 流式RAG

问题:RAG需要实时更新知识。

解决方案:增量检索。

python
class StreamingRAG:
    """流式RAG示意"""
    def __init__(self, vector_db, chunk_size=500):
        self.vector_db = vector_db
        self.chunk_size = chunk_size
        self.chunk_buffer = []
    
    def add_document_stream(self, doc_stream):
        """流式添加文档"""
        for chunk in doc_stream:
            self.chunk_buffer.append(chunk)
            
            if len(self.chunk_buffer) >= self.chunk_size:
                # 增量索引:不需要重建整个索引
                self.vector_db.add_batch(self.chunk_buffer)
                self.chunk_buffer = []
    
    def query_streaming(self, query):
        """流式查询"""
        # 检索相关文档(类似频率估计)
        relevant_docs = self.vector_db.search(query)
        
        # 增量生成回答
        context = self._build_context(relevant_docs)
        return self.llm.generate_streaming(query, context)

类比

  • 增量索引 → 流式算法的"增量维护"
  • 实时检索 → 流式算法的"实时响应"

在线学习:流式数据的持续学习

传统学习 vs 在线学习

批处理学习在线学习
多轮遍历数据单遍,即时更新
收敛到全局最优追踪数据分布变化
静态模型动态适应

流式微调

python
class OnlineFineTuner:
    """流式微调示意"""
    def __init__(self, model, learning_rate=0.01):
        self.model = model
        self.lr = learning_rate
        self.loss_history = []
    
    def process_stream(self, data_stream):
        """处理流式数据"""
        for x, y in data_stream:
            # 预测
            pred = self.model(x)
            
            # 即时损失
            loss = self.loss_fn(pred, y)
            
            # 在线梯度下降
            self.model.update(loss.gradient(), self.lr)
            
            # 记录损失用于检测概念漂移
            self.loss_history.append(loss)
            
            # 概念漂移检测
            if self._detect_drift(loss_history):
                self.model.adapt()  # 适应性调整
    
    def _detect_drift(self, losses):
        """检测数据分布变化"""
        # 使用滑动窗口监控损失
        recent = losses[-100:]
        historical = losses[:-100]
        
        if mean(recent) > mean(historical) * 1.5:
            return True  # 检测到漂移
        return False

类比

  • 概念漂移检测 → 滑动窗口的"过期处理"
  • 在线梯度下降 → 流式算法的"增量更新"

LLM与流式算法的交叉

LLM可以做什么?

任务LLM能力
生成流式算法代码✓ 能生成CMS、HLL的Python实现
解释流式算法原理✓ 能解释"为什么取最小值"
分析流式约束部分 能识别部分约束,但可能遗漏
设计摘要结构✗ 无法自动设计最优摘要结构
权衡精度和空间✗ 需要人类指导权衡决策

LLM不能做什么?

  1. 自动分析流式约束:LLM可能忽略"空间爆炸"问题
  2. 设计最优摘要结构:需要算法理论的数学证明
  3. 权衡精度和空间:需要理解业务需求和技术约束

人的价值

LLM时代,人的价值在于:

1. 识别问题是否适合流式
   - LLM可能给出"静态方案"
   - 人需要识别"数据永不停止"的约束

2. 设计合适的摘要结构
   - LLM可能给出"精确哈希表"
   - 人需要设计CMS或HLL

3. 分析精度保证
   - LLM可能说"这个方案可行"
   - 人需要分析"误差不超过10%"

War Story:LLM的上下文窗口教训

案例分析

场景:用户上传100页PDF,要求LLM回答问题

LLM方案:
  1. 把整个PDF编码成Token → 50000 Tokens
  2. 上下文窗口只有4096 → 无法一次处理
  
LLM尝试:
  截断前4096个Token → 用户问的问题在第50页 → 无法回答
  
正确方案:
  1. 分块处理PDF(类似流式分块)
  2. 用向量检索找到相关块(类似流式RAG)
  3. 只编码相关块到上下文窗口

教训

  • LLM的上下文窗口就是流式约束
  • 需要"摘要结构"(检索+压缩)处理长文档

与分布式算法的对比

💡 与分布式算法的区别

分布式LLM推理需要多机协调:每台机器处理部分Token,然后汇总。

分布式KV Cache管理更复杂:需要跨机器同步。

单机流式LLM只需要考虑内存约束,分布式还需要考虑通信成本。

维度分布式LLM推理单机LLM推理
KV Cache管理需要跨机器同步本地管理
通信成本传输KV Cache
约束空间 + 通信仅空间

LLM时代的流式应用

1. 流式文本处理

python
class StreamingTextProcessor:
    """流式文本处理"""
    def __init__(self, llm):
        self.llm = llm
        self.cms = CountMinSketch(0.1, 0.01)  # 词频估计
        self.hll = HyperLogLog(14)  # 不同词估计
    
    def process_stream(self, text_stream):
        for chunk in text_stream:
            words = chunk.split()
            for word in words:
                self.cms.add(word)
                self.hll.add(word)
    
    def analyze(self):
        """分析文本统计"""
        return {
            'total_words': self.cms.get_total(),
            'unique_words': self.hll.count(),
            'top_words': self._get_top_k_words()
        }

2. 流式推荐

python
class StreamingRecommender:
    """流式推荐系统"""
    def __init__(self):
        self.user_profiles = {}  # 用户画像(滑动窗口)
        self.item_frequency = CountMinSketch(0.1, 0.01)  # 物品频率
    
    def process_interaction(self, user_id, item_id):
        """处理用户交互"""
        # 更新物品频率
        self.item_frequency.add(item_id)
        
        # 更新用户画像(滑动窗口)
        if user_id not in self.user_profiles:
            self.user_profiles[user_id] = SlidingWindow(100)
        self.user_profiles[user_id].add(item_id)
    
    def recommend(self, user_id):
        """推荐"""
        profile = self.user_profiles[user_id]
        # 基于用户画像推荐
        return self._generate_recommendations(profile)

3. 流式检测

python
class StreamingAnomalyDetector:
    """流式异常检测"""
    def __init__(self):
        self.window = ExponentialWindow(decay_rate=0.01)
        self.threshold = 2.0
    
    def process(self, value):
        """处理数据点"""
        self.window.add(value)
        mean = self.window.query()
        
        if value > mean * self.threshold:
            return "异常!"
        return "正常"

小结

核心收获

  1. LLM的流式本质:逐Token生成是流式过程
  2. 上下文窗口约束:类比流式算法的内存约束
  3. KV Cache:增量推理的摘要结构
  4. 前沿技术:滑动窗口KV Cache、上下文压缩、流式RAG
  5. 在线学习:追踪数据分布变化

检查你的理解

  1. LLM生成为什么是流式过程?
  2. 上下文窗口如何对应流式约束?
  3. KV Cache的流式性质是什么?
  4. LLM时代流式算法的新挑战是什么?

下一步

学会了LLM流式视角后,让我们进行综合练习:10.7 综合练习

综合练习将检验:如何审查LLM给出的流式方案?

新时代的算法课程