[Optimize] Import performance while enable cpu prefix caching #7520

Closed
Jiang-Jia-Jun wants to merge 1 commit into release/2.6 from optim/cpu-cache-0420

Conversation

@Jiang-Jia-Jun
Collaborator

Motivation

💡 If this PR is a Cherry Pick, the PR title needs to follow the format by adding the [Cherry-Pick] label at the very beginning and appending the original PR ID at the end. For example, [Cherry-Pick][CI] Add check trigger and logic(#5191)

Modifications

Usage or Command

Accuracy Tests

Checklist

  • Add at least a tag in the PR title.
    • Tag list: [[FDConfig],[APIServer],[Engine], [Scheduler], [PD Disaggregation], [Executor], [Graph Optimization], [Speculative Decoding], [RL], [Models], [Quantization], [Loader], [OP], [KVCache], [DataProcessor], [BugFix], [Docs], [CI], [Optimization], [Feature], [Benchmark], [Others], [XPU], [HPU], [GCU], [DCU], [Iluvatar], [Metax]]
    • You can add new tags based on the PR content, but the semantics must be clear.
  • Format your code, run pre-commit before commit.
  • Add unit tests. Please write the reason in this PR if no unit tests.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.

Copilot AI review requested due to automatic review settings April 20, 2026 11:41
@CLAassistant

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.

@paddle-bot

paddle-bot Bot commented Apr 20, 2026

Thanks for your contribution!

Contributor

Copilot AI left a comment

Pull request overview

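This PR aims to optimize import/runtime performance when CPU prefix caching is enabled, focusing on reducing the blocking and polling overhead on the path that collects data-transfer results.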

Changes:

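  • Adds a queue.Queue-based transfer-done queue to EngineCacheQueue, with both non-blocking and blocking getters.
  • In PrefixCacheManager, marks issued transfer tasks with _transfer_pending_event.set() and changes the GPU block recycling policy to "direct recycle".
  • In PrefixCacheManager.recv_data_transfer_result, introduces a subprocess that polls over BaseManager RPC and passes results back to the main process through a multiprocessing.Queue for consumption.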

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 8 comments.

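| File | Description |
| --- | --- |
| fastdeploy/inter_communicator/engine_cache_queue.py | Introduces a server-side queue.Queue for the done signal and adds blocking and non-blocking getters |
| fastdeploy/cache_manager/prefix_cache_manager.py | Adjusts the swap/recycle and result-receiving mechanisms (including subprocess polling) and adds a transfer-pending event flag |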

Comment on lines +1465 to +1470
# Always use direct recycle: immediately free GPU blocks without
# GPU→CPU DMA transfer. The synchronous DMA path causes severe GIL
# contention (~10% throughput loss) due to busy-waits across 3 threads:
# main scheduler, executor pool (sync_swap_task), and recv thread.
# CPU cache can still be utilized via other fill paths (e.g., storage).
if self.cache_config.num_cpu_blocks < need_block_num or True:

Copilot AI Apr 20, 2026

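if self.cache_config.num_cpu_blocks < need_block_num or True: is always True, which makes the SWAP2CPU eviction branch of the tiered CPU cache permanently unreachable and so changes or disables the existing "GPU→CPU eviction" behavior. Suggest removing or True and controlling "forced direct recycle" through an explicit config switch; otherwise the hit rate and capacity management of CPU prefix caching are affected.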

Suggested change
- # Always use direct recycle: immediately free GPU blocks without
- # GPU→CPU DMA transfer. The synchronous DMA path causes severe GIL
- # contention (~10% throughput loss) due to busy-waits across 3 threads:
- # main scheduler, executor pool (sync_swap_task), and recv thread.
- # CPU cache can still be utilized via other fill paths (e.g., storage).
- if self.cache_config.num_cpu_blocks < need_block_num or True:
+ # Use direct recycle only when CPU cache capacity is insufficient
+ # or when it is explicitly enabled by configuration. This keeps
+ # the GPU→CPU eviction path reachable for CPU prefix caching while
+ # still allowing deployments to force direct recycle if needed.
+ force_direct_recycle = getattr(self.cache_config, "force_direct_recycle", False)
+ if self.cache_config.num_cpu_blocks < need_block_num or force_direct_recycle:

Copilot uses AI. Check for mistakes.
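
The gating decision discussed in this comment can be pulled into a small pure function so it is easy to unit-test. This is only a sketch: `CacheConfig` here is a stand-in dataclass, and `force_direct_recycle` and `should_direct_recycle` are hypothetical names, not existing FastDeploy options or functions.

```python
from dataclasses import dataclass


@dataclass
class CacheConfig:
    """Stand-in for the real cache config; only the fields used here."""

    num_cpu_blocks: int
    # Hypothetical switch, not an existing FastDeploy option.
    force_direct_recycle: bool = False


def should_direct_recycle(cfg: CacheConfig, need_block_num: int) -> bool:
    """Return True when GPU blocks should be freed without a GPU->CPU swap."""
    cpu_cache_too_small = cfg.num_cpu_blocks < need_block_num
    return cpu_cache_too_small or cfg.force_direct_recycle
```

With the switch left at its default, the GPU→CPU eviction branch stays reachable whenever the CPU cache has room, which is the behavior the comment asks to preserve.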
Comment on lines +2176 to +2196
+import multiprocessing as mp
+
+result_queue = mp.Queue()
+
+def _rpc_poller(cache_task_queue, result_queue):
+    """Runs in a separate process - does BaseManager RPC polling without
+    affecting the main process's GIL."""
+    import time
+    while True:
+        try:
+            data = cache_task_queue.get_transfer_done_signal()
+            if data is not None:
+                result_queue.put(data)
+            else:
+                time.sleep(0.001)
+        except Exception:
+            time.sleep(0.01)
+
+poller = mp.Process(target=_rpc_poller, args=(self.cache_task_queue, result_queue), daemon=True)
+poller.start()

Copilot AI Apr 20, 2026

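recv_data_transfer_result starts a multiprocessing.Process from inside a thread: the method itself is launched via threading.Thread(..., daemon=True) (see launch_cache_manager in the same file), and forking or spawning a child process from an already multithreaded process carries a significant risk of deadlock or incompatibility (especially with the default start_method="fork"). Suggest not creating the subprocess inside this thread; prefer the newly added server-side blocking queue interface (e.g. get_transfer_done_signal_blocking), or start the process from the main thread / set start_method explicitly.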
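
A minimal sketch of the thread-only alternative: the receiving thread blocks on a queue with a timeout instead of delegating to a helper process. The in-process `queue.Queue` stands in for the manager-side done queue, and `recv_loop` and `run_demo` are hypothetical names used only for illustration.

```python
import queue
import threading


def recv_loop(done_q, handle, stop_event, timeout=0.1):
    """Consume done signals until stop_event is set.

    queue.Queue.get(timeout=...) waits without holding the GIL, so no
    helper process or sleep-based polling is needed.
    """
    while not stop_event.is_set():
        try:
            data = done_q.get(block=True, timeout=timeout)
        except queue.Empty:
            continue  # timed out: loop around and re-check stop_event
        try:
            handle(data)
        finally:
            done_q.task_done()


def run_demo():
    done_q = queue.Queue()
    stop_event = threading.Event()
    received = []
    worker = threading.Thread(
        target=recv_loop, args=(done_q, received.append, stop_event), daemon=True
    )
    worker.start()
    for task_id in range(3):
        done_q.put(("swap_done", task_id))
    done_q.join()      # wait until every queued signal has been handled
    stop_event.set()   # then ask the worker to exit
    worker.join(timeout=1.0)
    return received
```

In the real code the queue lives behind a BaseManager proxy, so the blocking call would be an RPC rather than a local `get`; the loop structure is what the sketch is meant to show.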

Comment on lines +2178 to +2206
+result_queue = mp.Queue()
+
+def _rpc_poller(cache_task_queue, result_queue):
+    """Runs in a separate process - does BaseManager RPC polling without
+    affecting the main process's GIL."""
+    import time
+    while True:
+        try:
+            data = cache_task_queue.get_transfer_done_signal()
+            if data is not None:
+                result_queue.put(data)
+            else:
+                time.sleep(0.001)
+        except Exception:
+            time.sleep(0.01)
+
+poller = mp.Process(target=_rpc_poller, args=(self.cache_task_queue, result_queue), daemon=True)
+poller.start()
+
 while True:

     try:
-        data = self.cache_task_queue.get_transfer_done_signal()
+        # Queue.get() blocks in C code, releasing GIL while waiting.
+        # No BaseManager RPC calls in this process.
+        try:
+            data = result_queue.get(block=True, timeout=0.1)
+        except Exception:
+            data = None

Copilot AI Apr 20, 2026

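The new poller process first takes data out of the done queue in BaseManager and then re-stores it in result_queue. This breaks the "have all tasks been processed" check in reset(wait_for_tasks_done=True) in the same file, which relies on cache_task_queue.result_queue_empty(): the manager side may already be empty while this process still holds unconsumed result_queue data, so reset can run too early and lose results. Suggest not introducing an intermediate queue/process, or at least making the draining of result_queue and the stopping of the poller part of the reset logic.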
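
The hazard can be reproduced in a single process with two plain queues. The sketch below is an illustration under stated assumptions, not the project's code: `ResultRelay` and its methods are hypothetical, and a real cross-process version would need a shared counter (for example a `multiprocessing.Value`) instead of a thread lock.

```python
import queue
import threading


class ResultRelay:
    """Relays items from an upstream queue to a local queue and tracks
    how many submitted items the consumer has not handled yet."""

    def __init__(self):
        self.upstream = queue.Queue()  # stand-in for the manager-side done queue
        self.local = queue.Queue()     # stand-in for the intermediate result_queue
        self._pending = 0
        self._lock = threading.Lock()

    def submit(self, item):
        """Producer side: count the item before it becomes visible."""
        with self._lock:
            self._pending += 1
        self.upstream.put(item)

    def relay_once(self):
        """Move one item upstream -> local; False if upstream is empty."""
        try:
            item = self.upstream.get_nowait()
        except queue.Empty:
            return False
        self.local.put(item)
        return True

    def consume_once(self, handle):
        """Handle one relayed item; False if nothing has been relayed."""
        try:
            item = self.local.get_nowait()
        except queue.Empty:
            return False
        handle(item)
        with self._lock:
            self._pending -= 1
        return True

    def all_done(self):
        """True only when every submitted item has actually been handled."""
        with self._lock:
            return self._pending == 0


def run_demo():
    relay = ResultRelay()
    handled = []
    relay.submit("a")
    relay.submit("b")
    while relay.relay_once():
        pass
    # Upstream is drained, yet nothing has been handled.
    snapshot = (relay.upstream.empty(), relay.all_done())
    while relay.consume_once(handled.append):
        pass
    return snapshot, relay.all_done(), handled
```

A reset that waits on `all_done()` rather than on the upstream queue being empty would not fire while relayed results are still unconsumed.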

Comment on lines 2169 to +2205
 def recv_data_transfer_result(self):
     """
-    recv data transfer result
+    recv data transfer result.
+    Uses a subprocess to do BaseManager RPC polling, communicating results
+    back via a multiprocessing.Queue (pipe-based, GIL-friendly).
+    This eliminates GIL contention from TCP RPC calls in the main process.
     """
+    import multiprocessing as mp
+
+    result_queue = mp.Queue()
+
+    def _rpc_poller(cache_task_queue, result_queue):
+        """Runs in a separate process - does BaseManager RPC polling without
+        affecting the main process's GIL."""
+        import time
+        while True:
+            try:
+                data = cache_task_queue.get_transfer_done_signal()
+                if data is not None:
+                    result_queue.put(data)
+                else:
+                    time.sleep(0.001)
+            except Exception:
+                time.sleep(0.01)
+
+    poller = mp.Process(target=_rpc_poller, args=(self.cache_task_queue, result_queue), daemon=True)
+    poller.start()
+
     while True:

         try:
-            data = self.cache_task_queue.get_transfer_done_signal()
+            # Queue.get() blocks in C code, releasing GIL while waiting.
+            # No BaseManager RPC calls in this process.
+            try:
+                data = result_queue.get(block=True, timeout=0.1)
+            except Exception:
+                data = None

Copilot AI Apr 20, 2026

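The new recv_data_transfer_result implementation introduces a subprocess, an inter-process queue, and polling logic, but the existing unit tests (tests/cache_manager/test_prefix_cache_manager.py, etc.) do not cover this path, so platform-specific problems (fork/spawn, queue leaks, changed reset semantics) can slip past CI. Suggest adding unit tests that at least verify that no extra fork happens after the thread starts, that reset(wait_for_tasks_done) does not lose events, and that the blocking/timeout semantics behave as expected; if this cannot be tested, explain why in the PR description.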

Copilot generated this review using guidance from repository custom instructions.
Comment on lines 83 to 86
self.transfer_task_queue_init: List[List[Any]] = [list() for _ in range(self.local_data_parallel_size)]
self.tansfer_done_queue_init: List[List[Any]] = [list() for _ in range(self.local_data_parallel_size)]
self.transfer_done_q_init = [queue.Queue() for _ in range(self.local_data_parallel_size)]
self.cache_sync_value_init: List[Value] = [Value("i", 0) for _ in range(self.local_data_parallel_size)]

Copilot AI Apr 20, 2026

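tansfer_done_queue_init/get_tansfer_done_queue are still kept, but no method in the current implementation uses them any more (it has switched to transfer_done_q). Suggest removing these old shared lists and registrations together with the task_done_lock-related fields, or at least marking their compatibility purpose clearly in the code, so that maintainers do not mix up the two "done queue" implementations.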

Comment on lines +348 to +353
+try:
+    data = self.transfer_done_q.get_nowait()
     logger.info(f"get_transfer_done_signal: Get swap task {data[-1]} finished signal from queue successful")
-self.task_done_lock.release()
-return data
+    return data
+except Exception:
+    return None

Copilot AI Apr 20, 2026

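Swallowing every exception with except Exception here hides real failures (e.g. a broken manager connection or a failed proxy call), so the caller assumes the queue was merely empty. Suggest catching only the "queue is empty" exception (e.g. queue.Empty); other exceptions should be logged and re-raised.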
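
A minimal sketch of the narrower handling suggested here, written as a free function over any object with a `get_nowait()` method. The name `get_done_signal` is hypothetical, and the logging setup is an assumption rather than the project's own logger.

```python
import logging
import queue

logger = logging.getLogger(__name__)


def get_done_signal(done_q):
    """Non-blocking fetch: returns None only when the queue is really empty."""
    try:
        return done_q.get_nowait()
    except queue.Empty:
        # Expected case: nothing has finished yet.
        return None
    except Exception:
        # Anything else is a real failure: record it and let it propagate.
        logger.exception("unexpected failure while reading the done queue")
        raise
```

The same shape applies to a blocking getter, since `queue.Queue.get(timeout=...)` also raises `queue.Empty` when the timeout expires.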

Comment on lines +361 to +366
+try:
+    data = self.transfer_done_q.get(block=True, timeout=timeout)
+    logger.info(f"get_transfer_done_signal_blocking: Get swap task {data[-1]} finished signal from queue successful")
+    return data
+except Exception:
+    return None

Copilot AI Apr 20, 2026

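get_transfer_done_signal_blocking likewise returns None directly under except Exception, which silences errors that are neither a timeout nor an empty queue. Suggest handling the timeout/empty-queue case separately (e.g. by catching queue.Empty) and logging and re-raising every other exception, to avoid losing swap-done signals.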

Comment on lines +125 to 126
self._transfer_pending_event = Event()

Copilot AI Apr 20, 2026

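The new _transfer_pending_event is currently only .set() in several places; no .wait()/.clear() call site was found in the repository, so it is an unfinished or ineffective piece of state. Suggest either completing the logic (e.g. using it to let the recv thread block while waiting, reducing polling) or removing it to avoid confusion.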
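
For reference, a complete set/wait/clear cycle might look like the sketch below. It assumes the event is a `threading.Event`; `TransferNotifier` and its method names are hypothetical and only illustrate the consumer side that the comment says is missing.

```python
import threading


class TransferNotifier:
    """Pairs the producer-side set() with a consumer-side wait()/clear()."""

    def __init__(self):
        self._pending = threading.Event()

    def notify(self):
        """Producer: call after enqueuing a transfer task."""
        self._pending.set()

    def wait_for_work(self, timeout=0.1):
        """Consumer: block until signalled or timed out.

        Returns True if work was signalled. The flag is cleared before the
        caller drains its queue, so a set() that arrives during draining
        simply causes one extra, harmless wake-up.
        """
        signalled = self._pending.wait(timeout)
        if signalled:
            self._pending.clear()
        return signalled
```

The consumer would call `wait_for_work()` at the top of its loop and drain all available results whenever it returns True, which replaces fixed-interval sleeping.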

@PaddlePaddle-bot PaddlePaddle-bot left a comment

🤖 AI Code Review | 2026-04-20 20:02:04

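📋 Review Summary

PR overview: Optimizes inference performance when CPU prefix caching is enabled by moving RPC polling into a subprocess to eliminate GIL contention, and by skipping the GPU→CPU DMA transfer path to avoid the throughput drop caused by synchronous waits.
Scope of changes: cache_manager/prefix_cache_manager.py, inter_communicator/engine_cache_queue.py
Affected tag: KVCache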

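📝 PR Convention Check

  1. PR title: The tag [Optimize] in the current title [Optimize] Import performance while enable cpu prefix caching is not in the official list; [Optimization] should be used. In addition, the target branch is release/2.6 (not develop), so the [Cherry-Pick] tag must be added and the original PR ID appended.

Suggested title (can be copied directly):

  • [Cherry-Pick][Optimization] Improve performance while enable cpu prefix caching(#ORIGINAL_PR_ID)
  2. PR description: Motivation / Modifications / Usage or Command are all unfilled; suggest adding the motivation for the change and a description of the specific modifications.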

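Description template (can be copied directly):

## Motivation
With CPU prefix caching enabled, BaseManager RPC polling and synchronous GPU→CPU DMA waits cause severe GIL contention, reducing throughput by about 10%.

## Modifications
1. Move the RPC polling in `recv_data_transfer_result` into a separate subprocess; the main process receives results through a `multiprocessing.Queue` (pipe-based, GIL-friendly).
2. In `engine_cache_queue.py`, replace the manually locked list used as the done-signal queue with a `queue.Queue`, which supports a blocking get.
3. In `free_block_ids_async`, always take the direct GPU block recycle path, skipping the GPU→CPU DMA transfer to eliminate synchronous waits.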

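Issues

| Level | File | Summary |
| --- | --- | --- |
| 🟡 Suggestion | prefix_cache_manager.py:1470 | or True turns the else branch into dead code; suggest cleaning it up |
| 🟡 Suggestion | prefix_cache_manager.py:2191 | Exceptions in the _rpc_poller subprocess are silently swallowed, with no logging |
| 🟡 Suggestion | prefix_cache_manager.py:2204 | except Exception in the main loop is too broad; queue.Empty should be caught explicitly |
| ❓ Question | prefix_cache_manager.py:125 | _transfer_pending_event is only set, never waited on or cleared; apparently unused |
| 🟡 Suggestion | engine_cache_queue.py:355 | get_transfer_done_signal_blocking has no callers; apparently dead code |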

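Overall Assessment

The optimization approach in this PR (isolating RPC polling in a subprocess and skipping the GPU→CPU DMA) is heading in the right direction and can effectively relieve GIL contention. The main suggestions concern code cleanliness: clean up the or True dead code, narrow the exception handling, and remove unused code to improve maintainability.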

# contention (~10% throughput loss) due to busy-waits across 3 threads:
# main scheduler, executor pool (sync_swap_task), and recv thread.
# CPU cache can still be utilized via other fill paths (e.g., storage).
if self.cache_config.num_cpu_blocks < need_block_num or True:

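🟡 Suggestion: or True makes the condition always true, so the else branch (the GPU→CPU swap path) becomes permanently unreachable dead code.

Although the comment explains why the DMA transfer is skipped, implementing it with or True is not clear enough. Suggest deleting the else branch and the pointless condition outright, or introducing an explicit config switch (e.g. disable_gpu_to_cpu_swap) so that the intent is clearer and the code easier to maintain.

# Suggestion: remove the condition and the else branch outright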
if node.shared_count == 0 and node.is_gpu_leaf_node:
    self._handle_free_gpu_node_without_cpu(node)
    ...

        result_queue.put(data)
    else:
        time.sleep(0.001)
except Exception:

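🟡 Suggestion: the except Exception in _rpc_poller silently swallows all exceptions (including serious errors such as pickle serialization failures and dropped connections) and merely sleeps before retrying. Suggest at least adding logging so that problems are easier to diagnose in production.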

except Exception as e:
    import logging
    logging.getLogger(__name__).warning(f"_rpc_poller error: {e}")
    time.sleep(0.01)

# No BaseManager RPC calls in this process.
try:
    data = result_queue.get(block=True, timeout=0.1)
except Exception:

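🟡 Suggestion: the except Exception in the main loop is likewise too broad: queue.Empty is the expected timeout exception, but other exceptions (such as pickle.UnpicklingError) should not be silently ignored. Suggest catching queue.Empty explicitly: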

from queue import Empty
try:
    data = result_queue.get(block=True, timeout=0.1)
except Empty:
    data = None

self.gpu_free_task_future = None
self.cpu_free_future = None
self.cache_status_lock = Lock()
self._transfer_pending_event = Event()

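❓ Question: _transfer_pending_event is initialized here and .set() is called in 3 places, but no .wait() or .clear() call can be found anywhere in the codebase. Is this an interface reserved for a follow-up PR, or was the consumer-side logic left out? If it is unused for now, suggest documenting its purpose in a comment or removing it to reduce confusion.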

    except Exception:
        return None

def get_transfer_done_signal_blocking(self, timeout=0.1):

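🟡 Suggestion: get_transfer_done_signal_blocking has no callers anywhere in the codebase. If it is an interface reserved for external extension, suggest noting that in the docstring; otherwise suggest removing it to avoid maintaining unused code.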

@Jiang-Jia-Jun Jiang-Jia-Jun deleted the optim/cpu-cache-0420 branch May 7, 2026 03:51

5 participants