[Optimize] Import performance while enable cpu prefix caching #7520
Jiang-Jia-Jun wants to merge 1 commit into
Thanks for your contribution!
Pull request overview
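This PR aims to optimize import/runtime performance when CPU prefix caching is enabled, focusing on reducing blocking and polling overhead on the path that collects data-transfer results.
Changes:
- In `EngineCacheQueue`, adds a transfer-done queue based on `queue.Queue` and provides both non-blocking and blocking getters.
- In `PrefixCacheManager`, adds a `_transfer_pending_event.set()` marker when transfer tasks are issued, and changes the GPU block recycling policy to "direct recycle".
- In `PrefixCacheManager.recv_data_transfer_result`, introduces a subprocess that polls over BaseManager RPC and passes results back to the main process through a `multiprocessing.Queue`.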
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
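| fastdeploy/inter_communicator/engine_cache_queue.py | Introduces a server-side queue.Queue for the done signal and adds blocking and non-blocking getters |
| fastdeploy/cache_manager/prefix_cache_manager.py | Adjusts the swap/recycle and result-receiving mechanisms (including subprocess polling) and adds a transfer-pending event marker |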
    # Always use direct recycle: immediately free GPU blocks without
    # GPU→CPU DMA transfer. The synchronous DMA path causes severe GIL
    # contention (~10% throughput loss) due to busy-waits across 3 threads:
    # main scheduler, executor pool (sync_swap_task), and recv thread.
    # CPU cache can still be utilized via other fill paths (e.g., storage).
    if self.cache_config.num_cpu_blocks < need_block_num or True:
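`if self.cache_config.num_cpu_blocks < need_block_num or True:` is always True, which makes the SWAP2CPU eviction branch of the tiered CPU cache unreachable and therefore changes/disables the existing "GPU→CPU eviction" behavior. Consider removing `or True` and controlling "forced direct recycle" through an explicit configuration switch; otherwise the hit rate and capacity management of CPU prefix caching are affected.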
    - # Always use direct recycle: immediately free GPU blocks without
    - # GPU→CPU DMA transfer. The synchronous DMA path causes severe GIL
    - # contention (~10% throughput loss) due to busy-waits across 3 threads:
    - # main scheduler, executor pool (sync_swap_task), and recv thread.
    - # CPU cache can still be utilized via other fill paths (e.g., storage).
    - if self.cache_config.num_cpu_blocks < need_block_num or True:
    + # Use direct recycle only when CPU cache capacity is insufficient
    + # or when it is explicitly enabled by configuration. This keeps
    + # the GPU→CPU eviction path reachable for CPU prefix caching while
    + # still allowing deployments to force direct recycle if needed.
    + force_direct_recycle = getattr(self.cache_config, "force_direct_recycle", False)
    + if self.cache_config.num_cpu_blocks < need_block_num or force_direct_recycle:
    import multiprocessing as mp

    result_queue = mp.Queue()

    def _rpc_poller(cache_task_queue, result_queue):
        """Runs in a separate process - does BaseManager RPC polling without
        affecting the main process's GIL."""
        import time
        while True:
            try:
                data = cache_task_queue.get_transfer_done_signal()
                if data is not None:
                    result_queue.put(data)
                else:
                    time.sleep(0.001)
            except Exception:
                time.sleep(0.01)

    poller = mp.Process(target=_rpc_poller, args=(self.cache_task_queue, result_queue), daemon=True)
    poller.start()
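`recv_data_transfer_result` starts a `multiprocessing.Process` from inside a thread: the method itself is launched via `threading.Thread(..., daemon=True)` (see `launch_cache_manager` in the same file), and forking or spawning a child process from an already multithreaded process carries a significant risk of deadlock or incompatibility (especially when the default start_method is "fork"). Avoid creating the child process inside this thread; prefer the newly added server-side blocking queue interface (e.g. `get_transfer_done_signal_blocking`), or start the process from the main thread or set start_method explicitly.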
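A minimal sketch of the thread-only alternative mentioned above, assuming the done queue can be read through a blocking getter. `DoneSignalQueue`, `put_transfer_done_signal`, `recv_loop`, and `run_demo` are hypothetical in-process stand-ins; the real queue sits behind a BaseManager proxy, which this sketch does not model.

```python
import queue
import threading


class DoneSignalQueue:
    """Hypothetical in-process stand-in for the server-side done queue."""

    def __init__(self):
        self._q = queue.Queue()

    def put_transfer_done_signal(self, item):
        # Hypothetical producer-side helper, used only by this demo.
        self._q.put(item)

    def get_transfer_done_signal_blocking(self, timeout=0.1):
        # Block for up to `timeout` seconds; None means "nothing arrived".
        try:
            return self._q.get(block=True, timeout=timeout)
        except queue.Empty:
            return None


def recv_loop(done_queue, handle, stop_event):
    """Consume done signals in a plain thread; no child process is created."""
    while not stop_event.is_set():
        data = done_queue.get_transfer_done_signal_blocking(timeout=0.05)
        if data is not None:
            handle(data)


def run_demo(num_tasks=3):
    done_queue = DoneSignalQueue()
    received = []
    all_received = threading.Event()
    stop_event = threading.Event()

    def handle(data):
        received.append(data)
        if len(received) == num_tasks:
            all_received.set()

    worker = threading.Thread(
        target=recv_loop, args=(done_queue, handle, stop_event), daemon=True
    )
    worker.start()
    for task_id in range(num_tasks):
        done_queue.put_transfer_done_signal(("swap", task_id))

    # Wait for the consumer to see every signal, then shut it down cleanly.
    all_received.wait(timeout=2.0)
    stop_event.set()
    worker.join(timeout=2.0)
    return received
```

The receiver sleeps inside the blocking `get` instead of spinning with `time.sleep`, and the stop event gives reset or shutdown code a defined way to end the loop.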
    result_queue = mp.Queue()

    def _rpc_poller(cache_task_queue, result_queue):
        """Runs in a separate process - does BaseManager RPC polling without
        affecting the main process's GIL."""
        import time
        while True:
            try:
                data = cache_task_queue.get_transfer_done_signal()
                if data is not None:
                    result_queue.put(data)
                else:
                    time.sleep(0.001)
            except Exception:
                time.sleep(0.01)

    poller = mp.Process(target=_rpc_poller, args=(self.cache_task_queue, result_queue), daemon=True)
    poller.start()

    while True:
        try:
            data = self.cache_task_queue.get_transfer_done_signal()
            # Queue.get() blocks in C code, releasing GIL while waiting.
            # No BaseManager RPC calls in this process.
            try:
                data = result_queue.get(block=True, timeout=0.1)
            except Exception:
                data = None
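The newly introduced poller process first takes data off the done queue in the BaseManager and then re-queues it into `result_queue`. This breaks the "have all tasks been processed" check in `reset(wait_for_tasks_done=True)` in the same file, which relies on `cache_task_queue.result_queue_empty()`: the manager side may already be empty while this process still holds unconsumed data in `result_queue`, so reset runs too early and results are lost. Consider not introducing the intermediate queue/process, or at least include draining `result_queue` and stopping the poller in the reset logic.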
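If the intermediate queue stays, one way to keep the reset check honest is to treat the manager-side queue and the local buffer as a single unit. A minimal sketch, assuming a `queue.Queue` stand-in for the local buffer; `drain_local_results`, `results_fully_consumed`, and `demo` are hypothetical helpers, and the sketch does not cover an item the poller has fetched but not yet forwarded.

```python
import queue


def drain_local_results(local_queue, handle):
    """Process everything buffered locally; return how many items were handled."""
    handled = 0
    while True:
        try:
            item = local_queue.get_nowait()
        except queue.Empty:
            return handled
        handle(item)
        handled += 1


def results_fully_consumed(manager_queue_empty, local_queue):
    """Reset is only safe when the manager side and the local buffer are both empty."""
    return manager_queue_empty and local_queue.empty()


def demo():
    local_queue = queue.Queue()
    local_queue.put(("swap", 1))
    local_queue.put(("swap", 2))
    seen = []
    # The manager side reports empty, but two results are still buffered locally.
    before = results_fully_consumed(True, local_queue)
    handled = drain_local_results(local_queue, seen.append)
    after = results_fully_consumed(True, local_queue)
    return before, handled, after, seen
```

With a real `multiprocessing.Queue`, `empty()` is documented as unreliable, so an actual implementation would probably need an explicit in-flight counter rather than this emptiness check.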
    def recv_data_transfer_result(self):
        """
        recv data transfer result
        recv data transfer result.
        Uses a subprocess to do BaseManager RPC polling, communicating results
        back via a multiprocessing.Queue (pipe-based, GIL-friendly).
        This eliminates GIL contention from TCP RPC calls in the main process.
        """
        import multiprocessing as mp

        result_queue = mp.Queue()

        def _rpc_poller(cache_task_queue, result_queue):
            """Runs in a separate process - does BaseManager RPC polling without
            affecting the main process's GIL."""
            import time
            while True:
                try:
                    data = cache_task_queue.get_transfer_done_signal()
                    if data is not None:
                        result_queue.put(data)
                    else:
                        time.sleep(0.001)
                except Exception:
                    time.sleep(0.01)

        poller = mp.Process(target=_rpc_poller, args=(self.cache_task_queue, result_queue), daemon=True)
        poller.start()

        while True:
            try:
                data = self.cache_task_queue.get_transfer_done_signal()
                # Queue.get() blocks in C code, releasing GIL while waiting.
                # No BaseManager RPC calls in this process.
                try:
                    data = result_queue.get(block=True, timeout=0.1)
                except Exception:
                    data = None
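The new `recv_data_transfer_result` implementation introduces a subprocess, an inter-process queue, and polling logic, but the existing unit tests (tests/cache_manager/test_prefix_cache_manager.py and others) do not cover this path, so platform-specific problems (fork/spawn, queue leaks, changed reset semantics) can slip past CI. Please add unit tests that at least verify that no extra fork happens after the thread starts, that `reset(wait_for_tasks_done)` does not lose events, and that the blocking/timeout semantics behave as expected; if this cannot be tested, explain why in the PR description.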
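A minimal sketch of what the blocking/timeout part of such a test could look like, assuming a plain `queue.Queue` in place of the manager-backed queue. `get_blocking`, `BlockingGetterTest`, and `run_tests` are hypothetical names and are not part of the existing test suite.

```python
import queue
import time
import unittest


def get_blocking(done_q, timeout):
    """Stand-in for a blocking getter: return the item, or None on timeout."""
    try:
        return done_q.get(block=True, timeout=timeout)
    except queue.Empty:
        return None


class BlockingGetterTest(unittest.TestCase):
    def test_returns_item_when_available(self):
        q = queue.Queue()
        q.put(("swap", 7))
        self.assertEqual(get_blocking(q, timeout=0.05), ("swap", 7))

    def test_returns_none_after_timeout(self):
        q = queue.Queue()
        start = time.monotonic()
        self.assertIsNone(get_blocking(q, timeout=0.05))
        # The call should have waited roughly the timeout instead of returning at once.
        self.assertGreaterEqual(time.monotonic() - start, 0.04)


def run_tests():
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(BlockingGetterTest)
    return unittest.TextTestRunner(verbosity=0).run(suite).wasSuccessful()
```

The fork and reset checks named in the comment would need the real `PrefixCacheManager` and are left out of this sketch.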
    self.transfer_task_queue_init: List[List[Any]] = [list() for _ in range(self.local_data_parallel_size)]
    self.tansfer_done_queue_init: List[List[Any]] = [list() for _ in range(self.local_data_parallel_size)]
    self.transfer_done_q_init = [queue.Queue() for _ in range(self.local_data_parallel_size)]
    self.cache_sync_value_init: List[Value] = [Value("i", 0) for _ in range(self.local_data_parallel_size)]
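`tansfer_done_queue_init`/`get_tansfer_done_queue` are still kept but are no longer used by any method in the current implementation (which now uses `transfer_done_q`). Consider removing these old shared lists and registrations together with the `task_done_lock`-related fields, or at least marking their compatibility purpose explicitly in the code, so that maintainers do not mistakenly use two sets of "done queue".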
    try:
        data = self.transfer_done_q.get_nowait()
        logger.info(f"get_transfer_done_signal: Get swap task {data[-1]} finished signal from queue successful")
        self.task_done_lock.release()
        return data
        return data
    except Exception:
        return None
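Swallowing all exceptions with `except Exception` here hides real failures (for example a broken manager connection or a failed proxy call) and leads the caller to believe the queue is merely empty. Catch only the "queue is empty" exception (such as `queue.Empty`); other exceptions should be logged and re-raised.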
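A minimal sketch of the narrower handling described above, written as a free function over any object with a `get_nowait()` method. `get_done_signal_nowait` is a hypothetical name, and the log messages are illustrative rather than taken from the PR.

```python
import logging
import queue

logger = logging.getLogger(__name__)


def get_done_signal_nowait(done_q):
    """Return the next done signal, or None only when the queue is empty."""
    try:
        data = done_q.get_nowait()
    except queue.Empty:
        # Expected case: nothing has finished yet.
        return None
    except Exception:
        # Anything else is a real failure: record it and let the caller see it.
        logger.exception("get_done_signal_nowait: failed to read the done queue")
        raise
    logger.info("get_done_signal_nowait: got finished signal for task %s", data[-1])
    return data
```

Returning `None` now means exactly one thing to the caller, so a dropped connection can no longer be mistaken for an idle queue.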
    try:
        data = self.transfer_done_q.get(block=True, timeout=timeout)
        logger.info(f"get_transfer_done_signal_blocking: Get swap task {data[-1]} finished signal from queue successful")
        return data
    except Exception:
        return None
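`get_transfer_done_signal_blocking` likewise returns None directly from `except Exception`, which silences errors other than a timeout or an empty queue. Handle the timeout/empty-queue case separately (for example by catching `queue.Empty`); other exceptions should be logged and raised, so that swap completion signals are not lost.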
    self._transfer_pending_event = Event()
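The newly added `_transfer_pending_event` is currently only `.set()` in several places; no `.wait()`/`.clear()` usage was found in the repository, so it is an unfinished or ineffective piece of state. Either complete the usage logic (for example, use it to let the recv thread block and wait, reducing polling) or remove it to avoid misleading readers.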
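A minimal sketch of how the event could be given a consumer side, assuming it is a `threading.Event` (the source does not show which `Event` class is imported). `TransferPendingTracker` and its methods are hypothetical and not part of the PR.

```python
import threading


class TransferPendingTracker:
    """Hypothetical helper: the event is set while any transfer task is outstanding."""

    def __init__(self):
        self._lock = threading.Lock()
        self._outstanding = 0
        self._event = threading.Event()

    def task_issued(self):
        # Called where the PR currently calls _transfer_pending_event.set().
        with self._lock:
            self._outstanding += 1
            self._event.set()

    def task_done(self):
        # Called by the recv thread after it has handled one result.
        with self._lock:
            self._outstanding -= 1
            if self._outstanding == 0:
                self._event.clear()

    def wait_until_pending(self, timeout=None):
        # The recv thread can block here instead of polling while nothing is in flight.
        return self._event.wait(timeout)
```

Counting outstanding tasks under a lock keeps `set()` and `clear()` from racing, which a bare set-then-clear pattern on the event alone would not guarantee.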
PaddlePaddle-bot left a comment
🤖 AI Code Review | 2026-04-20 20:02:04
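📋 Review Summary
PR overview: Optimizes inference performance when CPU prefix caching is enabled by moving RPC polling into a subprocess to eliminate GIL contention, and by skipping the GPU→CPU DMA transfer path to avoid the throughput loss caused by synchronous waits.
Scope of changes: cache_manager/prefix_cache_manager.py, inter_communicator/engine_cache_queue.py
Impact Tag: KVCache
📝 PR Convention Check
- PR title: the Tag [Optimize] in the current title
[Optimize] Import performance while enable cpu prefix caching is not in the official list; use [Optimization] instead. In addition, the target branch is release/2.6 (not develop), so the [Cherry-Pick] tag must be added along with the original PR ID.
Suggested title (can be copied directly):
[Cherry-Pick][Optimization] Improve performance while enable cpu prefix caching(#ORIGINAL_PR_ID)
- PR description: Motivation / Modifications / Usage or Command are all left blank; please add the motivation for the change and a description of the specific modifications.
Description template (can be copied directly):
## Motivation
With CPU prefix caching enabled, BaseManager RPC polling and synchronous GPU→CPU DMA waits cause severe GIL contention, reducing throughput by about 10%.
## Modifications
1. Move the RPC polling in `recv_data_transfer_result` into a separate subprocess; the main process receives results through a `multiprocessing.Queue` (pipe-based, GIL-friendly).
2. In `engine_cache_queue.py`, replace the done-signal queue, previously a manually locked list, with a `queue.Queue` that supports blocking get.
3. In `free_block_ids_async`, always take the direct GPU block recycle path, skipping the GPU→CPU DMA transfer to eliminate synchronous waits.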
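Issues
| Level | File | Summary |
|---|---|---|
| 🟡 Suggestion | prefix_cache_manager.py:1470 | `or True` turns the else branch into dead code; clean it up |
| 🟡 Suggestion | prefix_cache_manager.py:2191 | Exceptions in the `_rpc_poller` subprocess are silently swallowed, with no logging |
| 🟡 Suggestion | prefix_cache_manager.py:2204 | The main loop's `except Exception` is too broad; catch `queue.Empty` explicitly |
| ❓ Question | prefix_cache_manager.py:125 | `_transfer_pending_event` is only set, never waited on or cleared; appears unused |
| 🟡 Suggestion | engine_cache_queue.py:355 | `get_transfer_done_signal_blocking` has no callers; appears to be dead code |
Overall assessment
The optimization approach of this PR (isolating RPC polling in a subprocess plus skipping GPU→CPU DMA) is on the right track and can effectively relieve GIL contention. The main suggestions concern code cleanliness: remove the `or True` dead code, narrow the exception handling, and remove unused code to improve maintainability.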
    # contention (~10% throughput loss) due to busy-waits across 3 threads:
    # main scheduler, executor pool (sync_swap_task), and recv thread.
    # CPU cache can still be utilized via other fill paths (e.g., storage).
    if self.cache_config.num_cpu_blocks < need_block_num or True:
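🟡 Suggestion: `or True` makes the condition always true, so the else branch (the GPU→CPU swap path) becomes unreachable dead code.
Although the comment explains why the DMA transfer is skipped, implementing it with `or True` is unclear. Consider deleting the else branch and the redundant condition outright, or introducing an explicit configuration switch (e.g. disable_gpu_to_cpu_swap) so the intent is clearer and the code easier to maintain.
    # Suggestion: remove the condition and the else branch directly
    if node.shared_count == 0 and node.is_gpu_leaf_node:
        self._handle_free_gpu_node_without_cpu(node)
    ...

                result_queue.put(data)
            else:
                time.sleep(0.001)
        except Exception: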
🟡 Suggestion: the `except Exception` in `_rpc_poller` silently swallows every exception (including serious errors such as pickle serialization failures and dropped connections) and merely sleeps before retrying. At a minimum, add logging so that problems can be diagnosed in production.
    except Exception as e:
        import logging
        logging.getLogger(__name__).warning(f"_rpc_poller error: {e}")
        time.sleep(0.01)

    # No BaseManager RPC calls in this process.
    try:
        data = result_queue.get(block=True, timeout=0.1)
    except Exception:
🟡 Suggestion: the `except Exception` in the main loop is likewise too broad. `queue.Empty` is the expected timeout exception, but other exceptions (such as `pickle.UnpicklingError`) should not be silently ignored. Catch `queue.Empty` explicitly.
    from queue import Empty
    try:
        data = result_queue.get(block=True, timeout=0.1)
    except Empty:
        data = None

    self.gpu_free_task_future = None
    self.cpu_free_future = None
    self.cache_status_lock = Lock()
    self._transfer_pending_event = Event()
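❓ Question: `_transfer_pending_event` is initialized here and `.set()` is called in 3 places, but no `.wait()` or `.clear()` call can be found anywhere in the codebase. Is this an interface reserved for a follow-up PR, or was the consumer-side logic left out? If it is unused for now, consider documenting its purpose in a comment or removing it to reduce confusion.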
        except Exception:
            return None

    def get_transfer_done_signal_blocking(self, timeout=0.1):
🟡 Suggestion: `get_transfer_done_signal_blocking` has no callers anywhere in the codebase. If it is an interface reserved for external extension, note that in the docstring; otherwise remove it to avoid maintaining unused code.
Motivation
Modifications
Usage or Command
Accuracy Tests
Checklist
[FDConfig], [APIServer], [Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]
pre-commit before commit.
release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.