diff --git a/custom_ops/gpu_ops/cpp_extensions.cc b/custom_ops/gpu_ops/cpp_extensions.cc index 1b260d62769..6718e97d56c 100644 --- a/custom_ops/gpu_ops/cpp_extensions.cc +++ b/custom_ops/gpu_ops/cpp_extensions.cc @@ -1041,10 +1041,11 @@ void NgramMatch(const paddle::Tensor& token_ids_all, const paddle::Tensor& seq_lens_decoder, const paddle::Tensor& max_dec_len, const int max_ngram_size, - const int max_draft_tokens); + const int max_draft_tokens, + const bool pad_to_max); -void HybridMtpNgram(const paddle::Tensor& input_ids, - const paddle::Tensor& input_ids_len, +void HybridMtpNgram(const paddle::Tensor& token_ids_all, + const paddle::Tensor& prompt_lens, const paddle::Tensor& pre_ids, const paddle::Tensor& step_idx, const paddle::Tensor& draft_token_num, @@ -1054,7 +1055,8 @@ void HybridMtpNgram(const paddle::Tensor& input_ids, const paddle::Tensor& max_dec_len, const int max_ngram_size, const int min_ngram_size, - const int max_draft_tokens); + const int max_draft_tokens, + const bool pad_to_max); // MTP void DraftModelPostprocess(const paddle::Tensor& base_model_draft_tokens, diff --git a/custom_ops/gpu_ops/speculate_decoding/draft_model/ngram_match_mixed.cu b/custom_ops/gpu_ops/speculate_decoding/draft_model/ngram_match_mixed.cu index 778a6112367..e084c67381d 100644 --- a/custom_ops/gpu_ops/speculate_decoding/draft_model/ngram_match_mixed.cu +++ b/custom_ops/gpu_ops/speculate_decoding/draft_model/ngram_match_mixed.cu @@ -29,8 +29,8 @@ // Also copies tentative matched tokens to scratch buffers. // ============================================================ __global__ void ngram_match_mixed_search_kernel( - const int64_t *input_ids, - const int64_t *input_ids_len, + const int64_t *token_ids_all, + const int64_t *prompt_lens, const int64_t *pre_ids, const int64_t *step_idx, const int *draft_token_num, @@ -38,7 +38,7 @@ __global__ void ngram_match_mixed_search_kernel( const int64_t *max_dec_len, int64_t *draft_tokens_copy, int32_t *seq_lens_this_time_copy, - int64_t input_ids_stride, + int64_t max_model_len, int64_t pre_ids_stride, int64_t draft_tokens_stride, int64_t max_batch_size, @@ -69,8 +69,9 @@ __global__ void ngram_match_mixed_search_kernel( if (draft_budget <= 0 || remaining_dec <= 0) return; int max_draft_tokens = static_cast(min(draft_budget, remaining_dec)); - const int64_t *cur_input_ids = input_ids + batch_idx * input_ids_stride; - const int64_t cur_input_ids_len = input_ids_len[batch_idx]; + const int64_t prompt_len = prompt_lens[batch_idx]; + const int64_t *cur_input_ids = token_ids_all + batch_idx * max_model_len; + const int64_t cur_input_ids_len = prompt_len; const int64_t *cur_pre_ids = pre_ids + batch_idx * pre_ids_stride; const int64_t cur_step_idx = step_idx[batch_idx]; @@ -143,7 +144,9 @@ __global__ void ngram_match_mixed_gather_kernel( int32_t *seq_lens_this_time, int64_t draft_tokens_stride, int64_t max_batch_size, - int threshold) { + int threshold, + int max_draft_tokens_param, + bool pad_to_max) { typedef cub::BlockScan BlockScanInt; __shared__ typename BlockScanInt::TempStorage temp_storage1; __shared__ typename BlockScanInt::TempStorage temp_storage2; @@ -202,9 +205,8 @@ __global__ void ngram_match_mixed_gather_kernel( } actual = min(actual, tentative); - seq_lens_this_time[tid] = actual; - - // Copy ngram draft tokens from scratch to output + // Copy ngram draft tokens from scratch to output FIRST + // (so subsequent padding doesn't overwrite real ngram hits) int ngram_to_copy = actual - ori; if (ngram_to_copy > 0) { int64_t *dst = draft_tokens + tid * draft_tokens_stride; @@ -213,6 +215,38 @@ __global__ void ngram_match_mixed_gather_kernel( dst[ori + k] = src[ori + k]; } } + + // === Pad seq_lens_this_time to num_speculative_tokens+1 for cudagraph + // stability === Hybrid MTP-ngram produces variable seq_lens_this_time + // depending on how many ngram positions hit (range: [num_model_steps+1, + // num_speculative_tokens+1]). cudagraph captures launch params (grid dim, + // kernel args) at capture time; if the captured slt differs from + // replay-time slt, downstream kernels read past valid ranges of cu_seqlens + // / slot_mapping etc., causing CUDA 700. + // + // When pad_to_max=true (cudagraph enabled), force slt = + // num_speculative_tokens+1 = max_draft_tokens + 1: positions beyond actual + // ngram hits get padded with a placeholder token. The target model will + // verify these placeholders and (almost always) reject them, but the verify + // cost is fixed per iteration => grid dim is now invariant. When + // pad_to_max= false (cudagraph disabled), keep the natural variable slt to + // avoid wasting verify compute on placeholders. + if (pad_to_max) { + int target_slt = max_draft_tokens_param + 1; + if (actual < target_slt) { + int64_t *dst = draft_tokens + tid * draft_tokens_stride; + // Reuse the last valid draft token as placeholder. It is a token the + // model could plausibly have produced, so attention math stays + // well-defined; rejection happens at the sampler level. + int64_t pad_token = (actual > 0) ? dst[actual - 1] : 0; + for (int k = actual; k < target_slt; k++) { + dst[k] = pad_token; + } + actual = target_slt; + } + } + + seq_lens_this_time[tid] = actual; } } @@ -228,8 +262,8 @@ static int sum_mixed_cpu(const int *value, int num) { return sum_value; } -static void find_candidate_pred_tokens_mixed(const int64_t *input_ids, - const int64_t *input_ids_len, +static void find_candidate_pred_tokens_mixed(const int64_t *token_ids_all, + const int64_t *prompt_lens, const int64_t *pre_ids, const int64_t *step_idx, const int *draft_token_num, @@ -237,7 +271,7 @@ static void find_candidate_pred_tokens_mixed(const int64_t *input_ids, int32_t *seq_lens_this_time, int32_t *seq_lens_decoder, int64_t *max_dec_len, - int64_t input_ids_stride, + int64_t max_model_len, int64_t pre_ids_stride, int64_t draft_tokens_stride, int64_t max_batch_size, @@ -268,11 +302,12 @@ static void find_candidate_pred_tokens_mixed(const int64_t *input_ids, int max_draft_tokens_query = static_cast(std::min(draft_budget, remaining_dec)); - const int64_t *cur_input_ids = input_ids + batch_idx * input_ids_stride; + const int64_t prompt_len = prompt_lens[batch_idx]; + const int64_t *cur_input_ids = token_ids_all + batch_idx * max_model_len; int64_t *cur_draft_tokens = draft_tokens + batch_idx * draft_tokens_stride; const int64_t *cur_pre_ids = pre_ids + batch_idx * pre_ids_stride; const int64_t cur_step_idx = step_idx[batch_idx]; - const int64_t cur_input_ids_len = input_ids_len[batch_idx]; + const int64_t cur_input_ids_len = prompt_len; unprocessed_batch_size--; auto sum_token_num = sum_mixed_cpu(seq_lens_this_time, batch_idx); @@ -363,8 +398,8 @@ static void find_candidate_pred_tokens_mixed(const int64_t *input_ids, // threshold enforcement + final token copy. // ============================================================ -void HybridMtpNgram(const paddle::Tensor &input_ids, - const paddle::Tensor &input_ids_len, +void HybridMtpNgram(const paddle::Tensor &token_ids_all, + const paddle::Tensor &prompt_lens, const paddle::Tensor &pre_ids, const paddle::Tensor &step_idx, const paddle::Tensor &draft_token_num, @@ -374,9 +409,9 @@ void HybridMtpNgram(const paddle::Tensor &input_ids, const paddle::Tensor &max_dec_len, const int max_ngram_size, const int min_ngram_size, - const int max_draft_tokens) { - auto input_ids_shape = input_ids.shape(); - const int64_t input_ids_stride = input_ids_shape[1]; + const int max_draft_tokens, + const bool pad_to_max) { + const int64_t max_model_len = token_ids_all.shape()[1]; auto pre_ids_shape = pre_ids.shape(); const int64_t pre_ids_stride = pre_ids_shape[1]; @@ -392,8 +427,8 @@ void HybridMtpNgram(const paddle::Tensor &input_ids, threshold = std::stoi(env_var); } - if (input_ids.is_gpu()) { - auto stream = input_ids.stream(); + if (token_ids_all.is_gpu()) { + auto stream = token_ids_all.stream(); // NOTE: GPU path does not pass seq_lens_decoder to kernels — the mixed // variant uses ori_seq_len_this_time == 0 to skip inactive items. This @@ -403,21 +438,28 @@ void HybridMtpNgram(const paddle::Tensor &input_ids, // counts tentative > 0, which is equivalent under this invariant. // Allocate scratch buffers for Phase 1 → Phase 2 communication + static paddle::Tensor s_draft_copy_mixed; + static paddle::Tensor s_seqlens_copy_mixed; + static paddle::Tensor s_seqlens_orig_mixed; + static int64_t s_scratch_batch_mixed = 0; + static int64_t s_scratch_stride_mixed = 0; + + if (max_batch_size > s_scratch_batch_mixed || + draft_tokens_stride > s_scratch_stride_mixed) { + s_draft_copy_mixed = paddle::empty({max_batch_size, draft_tokens_stride}, + paddle::DataType::INT64, + token_ids_all.place()); + s_seqlens_copy_mixed = paddle::empty( + {max_batch_size}, paddle::DataType::INT32, token_ids_all.place()); + s_seqlens_orig_mixed = paddle::empty( + {max_batch_size}, paddle::DataType::INT32, token_ids_all.place()); + s_scratch_batch_mixed = max_batch_size; + s_scratch_stride_mixed = draft_tokens_stride; + } + auto &draft_tokens_copy = s_draft_copy_mixed; + auto &seq_lens_this_time_copy = s_seqlens_copy_mixed; + auto &seq_lens_this_time_orig = s_seqlens_orig_mixed; - // Scratch copy of draft_tokens (Phase 1 writes tentative tokens here) - auto draft_tokens_copy = - paddle::empty({max_batch_size, draft_tokens_stride}, - paddle::DataType::INT64, - input_ids.place()); - - // Scratch copy of seq_lens_this_time (Phase 1 writes tentative counts) - auto seq_lens_this_time_copy = paddle::empty( - {max_batch_size}, paddle::DataType::INT32, input_ids.place()); - - // Save a copy of original seq_lens_this_time for Phase 2 - // (Phase 1 reads from the original, Phase 2 needs ori values) - auto seq_lens_this_time_orig = paddle::empty( - {max_batch_size}, paddle::DataType::INT32, input_ids.place()); cudaMemcpyAsync(seq_lens_this_time_orig.data(), seq_lens_this_time.data(), max_batch_size * sizeof(int32_t), @@ -434,8 +476,8 @@ void HybridMtpNgram(const paddle::Tensor &input_ids, NGRAM_BLOCK_THREADS, 0, stream>>>( - input_ids.data(), - input_ids_len.data(), + token_ids_all.data(), + prompt_lens.data(), pre_ids.data(), step_idx.data(), draft_token_num.data(), @@ -443,7 +485,7 @@ void HybridMtpNgram(const paddle::Tensor &input_ids, max_dec_len.data(), draft_tokens_copy.data(), seq_lens_this_time_copy.data(), - input_ids_stride, + max_model_len, pre_ids_stride, draft_tokens_stride, max_batch_size, @@ -461,11 +503,13 @@ void HybridMtpNgram(const paddle::Tensor &input_ids, const_cast(seq_lens_this_time.data()), draft_tokens_stride, max_batch_size, - threshold); + threshold, + max_draft_tokens, + pad_to_max); } else { find_candidate_pred_tokens_mixed( - input_ids.data(), - input_ids_len.data(), + token_ids_all.data(), + prompt_lens.data(), pre_ids.data(), step_idx.data(), draft_token_num.data(), @@ -473,7 +517,7 @@ void HybridMtpNgram(const paddle::Tensor &input_ids, const_cast(seq_lens_this_time.data()), const_cast(seq_lens_decoder.data()), const_cast(max_dec_len.data()), - input_ids_stride, + max_model_len, pre_ids_stride, draft_tokens_stride, max_batch_size, @@ -484,8 +528,8 @@ void HybridMtpNgram(const paddle::Tensor &input_ids, } PD_BUILD_STATIC_OP(hybrid_mtp_ngram) - .Inputs({"input_ids", - "input_ids_len", + .Inputs({"token_ids_all", + "prompt_lens", "pre_ids", "step_idx", "draft_token_num", @@ -495,7 +539,8 @@ PD_BUILD_STATIC_OP(hybrid_mtp_ngram) "max_dec_len"}) .Attrs({"max_ngram_size: int", "min_ngram_size: int", - "max_draft_tokens: int"}) + "max_draft_tokens: int", + "pad_to_max: bool"}) .Outputs({"draft_tokens_out", "seq_lens_this_time_out"}) .SetKernelFn(PD_KERNEL(HybridMtpNgram)) .SetInplaceMap({{"draft_tokens", "draft_tokens_out"}, diff --git a/custom_ops/gpu_ops/speculate_decoding/ngram_match.cu b/custom_ops/gpu_ops/speculate_decoding/ngram_match.cu index 2c42ad49539..b4fd71c7921 100644 --- a/custom_ops/gpu_ops/speculate_decoding/ngram_match.cu +++ b/custom_ops/gpu_ops/speculate_decoding/ngram_match.cu @@ -138,7 +138,9 @@ __global__ void ngram_match_gather_kernel( int32_t *seq_lens_this_time, int64_t draft_tokens_stride, int64_t max_batch_size, - int threshold) { + int threshold, + int max_draft_tokens_param, + bool pad_to_max) { typedef cub::BlockScan BlockScanInt; __shared__ typename BlockScanInt::TempStorage temp_storage1; __shared__ typename BlockScanInt::TempStorage temp_storage2; @@ -203,9 +205,8 @@ __global__ void ngram_match_gather_kernel( actual = min(tentative, budget); } - seq_lens_this_time[tid] = actual; - - // Copy draft tokens (slots 1..actual-1) from scratch to output + // Copy draft tokens (slots 1..actual-1) from scratch to output FIRST + // (so subsequent padding doesn't overwrite real ngram hits) if (actual > 1) { int64_t *dst = draft_tokens + tid * draft_tokens_stride; const int64_t *src = draft_tokens_copy + tid * draft_tokens_stride; @@ -213,6 +214,31 @@ __global__ void ngram_match_gather_kernel( dst[k] = src[k]; } } + + // === Pad seq_lens_this_time to num_speculative_tokens+1 for cudagraph + // stability === Variable seq_lens_this_time (range [1, + // num_speculative_tokens+1]) clashes with cudagraph's fixed launch params + // captured at warm-up time; downstream kernels read past valid cu_seqlens / + // slot_mapping when replay sees a smaller slt, leading to OOB / CUDA 700. + // When pad_to_max=true (cudagraph enabled), pad missing positions with a + // placeholder so slt is fixed at num_speculative_tokens+1. pad_to_max=false + // skips the padding cost when cudagraph is off. + if (pad_to_max) { + int target_slt = max_draft_tokens_param + 1; + if (actual < target_slt) { + int64_t *dst = draft_tokens + tid * draft_tokens_stride; + // Reuse the last valid draft token as placeholder. It is a token the + // model could plausibly have produced, so attention math stays + // well-defined; rejection happens at the sampler level. + int64_t pad_token = (actual > 0) ? dst[actual - 1] : 0; + for (int k = actual; k < target_slt; k++) { + dst[k] = pad_token; + } + actual = target_slt; + } + } + + seq_lens_this_time[tid] = actual; } } @@ -374,7 +400,8 @@ void NgramMatch(const paddle::Tensor &token_ids_all, const paddle::Tensor &seq_lens_decoder, const paddle::Tensor &max_dec_len, const int max_ngram_size, - const int max_draft_tokens) { + const int max_draft_tokens, + const bool pad_to_max) { const int64_t max_model_len = token_ids_all.shape()[1]; auto draft_tokens_shape = draft_tokens.shape(); @@ -448,7 +475,9 @@ void NgramMatch(const paddle::Tensor &token_ids_all, const_cast(seq_lens_this_time.data()), draft_tokens_stride, max_batch_size, - threshold); + threshold, + max_draft_tokens, + pad_to_max); } else { find_candidate_pred_tokens( token_ids_all.data(), @@ -478,7 +507,7 @@ PD_BUILD_STATIC_OP(ngram_match) "seq_lens_encoder", "seq_lens_decoder", "max_dec_len"}) - .Attrs({"max_ngram_size: int", "max_draft_tokens: int"}) + .Attrs({"max_ngram_size: int", "max_draft_tokens: int", "pad_to_max: bool"}) .Outputs({"draft_tokens_out", "seq_lens_this_time_out"}) .SetKernelFn(PD_KERNEL(NgramMatch)) .SetInplaceMap({{"draft_tokens", "draft_tokens_out"}, diff --git a/fastdeploy/spec_decode/mtp.py b/fastdeploy/spec_decode/mtp.py index 9cf9ee1785a..1d37fb481a3 100644 --- a/fastdeploy/spec_decode/mtp.py +++ b/fastdeploy/spec_decode/mtp.py @@ -497,16 +497,10 @@ def insert_tasks_v1( input_ids = request.prompt_token_ids + request.output_token_ids - self.model_inputs["input_ids_len"][idx] = length - 1 async_set_value(self.model_inputs["pre_ids"][idx : idx + 1], -1) self.model_inputs["input_ids"][idx : idx + 1, : length - 1] = self.target_model_inputs["input_ids"][ idx : idx + 1, 1:length ] - # TODO: use token_all_ids replace with input_ids_cpu - if getattr(self, "hybrid_mode", False) and "input_ids_cpu" in self.model_inputs: - self.model_inputs["input_ids_cpu"][idx : idx + 1, : length - 1] = self.target_model_inputs[ - "input_ids" - ][idx : idx + 1, 1:length].cpu() encoder_block_num = len(request.block_tables) async_set_value(self.model_inputs["encoder_block_lens"][idx : idx + 1], encoder_block_num) async_set_value(self.model_inputs["block_tables"][idx : idx + 1, :], -1) @@ -594,7 +588,6 @@ def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: request = req_dicts[i] idx = request.idx length = len(request.prompt_token_ids) - self.model_inputs.input_ids_len[idx] = length - 1 if req_dicts[i].disaggregate_info is not None and req_dicts[i].disaggregate_info["role"] == "decode": length = len(request.prompt_token_ids) @@ -602,9 +595,6 @@ def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: self.model_inputs["input_ids"][idx : idx + 1, : length - 1] = self.target_model_inputs[ "input_ids" ][idx : idx + 1, 1:length] - self.model_inputs["input_ids_cpu"][idx : idx + 1, : length - 1] = np.array( - request.prompt_token_ids - )[1:] self.model_inputs["pre_ids"][idx : idx + 1] = request.prompt_token_ids[-1] prefill_token_num = self.max_draft_token_num + 1 self.model_inputs["draft_tokens"][idx : idx + 1, 0:1] = paddle.to_tensor( @@ -633,9 +623,6 @@ def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: self.model_inputs["input_ids"][idx : idx + 1, : length - 1] = self.target_model_inputs[ "input_ids" ][idx : idx + 1, 1:length] - self.model_inputs["input_ids_cpu"][idx : idx + 1, : length - 1] = np.array( - request.prompt_token_ids - )[1:] self.model_inputs["pre_ids"][idx : idx + 1] = -1 self.model_inputs["step_idx"][idx : idx + 1] = 0 if self.cache_config.enable_chunked_prefill: diff --git a/fastdeploy/spec_decode/mtp_cuda.py b/fastdeploy/spec_decode/mtp_cuda.py index d40b9f9229e..8eb3ec12a42 100644 --- a/fastdeploy/spec_decode/mtp_cuda.py +++ b/fastdeploy/spec_decode/mtp_cuda.py @@ -399,10 +399,13 @@ def _update_status(self): ) def _extend_draft_token_with_ngram_match(self): - # TODO: replace with gpu tensor + # pad_to_max forces hybrid kernel to write a fixed seq_lens_this_time + # = num_speculative_tokens + 1, padding unfilled ngram slots with a placeholder draft token. + # Required when target cudagraph is enabled (capture-time seq_lens_this_time + # must match replay-time seq_lens_this_time). hybrid_mtp_ngram( - self.model_inputs["input_ids_cpu"].cuda(), - self.model_inputs["input_ids_len"].cuda(), + self.model_inputs["token_ids_all"], + self.model_inputs["prompt_lens"], self.model_inputs["pre_ids"], self.model_inputs["step_idx"], self.target_model_inputs["actual_draft_token_num"], @@ -413,6 +416,7 @@ def _extend_draft_token_with_ngram_match(self): self.max_ngram_size, self.min_ngram_size, self.max_draft_token_num, + self.graph_opt_config.use_cudagraph, ) def padding_cudagraph_inputs(self) -> None: diff --git a/fastdeploy/spec_decode/ngram.py b/fastdeploy/spec_decode/ngram.py index f55d5fe83a9..ec6c5a35851 100644 --- a/fastdeploy/spec_decode/ngram.py +++ b/fastdeploy/spec_decode/ngram.py @@ -39,6 +39,11 @@ def _run_impl(self, share_inputs): """ run """ + # pad_to_max forces the kernel to write a fixed seq_lens_this_time = + # num_speculative_tokens + 1, padding unfilled draft slots with a placeholder token. + # Required when target cudagraph is enabled (capture-time slt must + # match replay-time slt; see ngram_match.cu for details). Disabled + # when cudagraph is off to avoid wasted verify on placeholders. ngram_match( share_inputs["token_ids_all"], share_inputs["prompt_lens"], @@ -51,4 +56,5 @@ def _run_impl(self, share_inputs): share_inputs["max_dec_len"], self.max_ngram_size, self.max_draft_token_num, + self.graph_opt_config.use_cudagraph, ) diff --git a/fastdeploy/worker/input_batch.py b/fastdeploy/worker/input_batch.py index fce8f240728..aa34784b2a5 100644 --- a/fastdeploy/worker/input_batch.py +++ b/fastdeploy/worker/input_batch.py @@ -767,12 +767,6 @@ def init_share_inputs(self): self.block_tables = paddle.clone(self.target_model_input_batch["block_tables"]) self.input_ids = paddle.clone(self.target_model_input_batch["input_ids"]) - self.input_ids_cpu = paddle.full( - shape=[self.scheduler_config.max_num_seqs, self.model_config.max_model_len], - fill_value=-1, - dtype="int64", - device="cpu", - ) self.seq_lens_this_time_buffer = paddle.clone(self.target_model_input_batch["seq_lens_this_time"]) self.seq_lens_encoder = paddle.clone(self.target_model_input_batch["seq_lens_encoder"]) @@ -785,7 +779,7 @@ def init_share_inputs(self): self.cu_seqlens_q_output = paddle.clone(self.target_model_input_batch["cu_seqlens_q_output"]) self.batch_id_per_token_output = paddle.clone(self.target_model_input_batch["batch_id_per_token_output"]) if "token_ids_all" in self.target_model_input_batch: - self.token_ids_all = paddle.clone(self.target_model_input_batch["token_ids_all"]) + self.token_ids_all = self.target_model_input_batch["token_ids_all"] # TODO: delete pre_ids in mtp self.pre_ids = paddle.full( [self.scheduler_config.max_num_seqs, self.model_config.max_model_len], @@ -902,7 +896,6 @@ def init_share_inputs(self): self.last_seq_lens_this_time = paddle.full_like( self.target_model_input_batch["seq_lens_this_time"], fill_value=-1, dtype="int32" ) - self.input_ids_len = paddle.zeros(shape=[self.scheduler_config.max_num_seqs, 1], dtype="int64", device="cpu") self.temp_scaled_logprobs = self.target_model_input_batch["temp_scaled_logprobs"] self.top_p_normalized_logprobs = self.target_model_input_batch["top_p_normalized_logprobs"] self.accept_num = self.target_model_input_batch["accept_num"] @@ -952,14 +945,12 @@ def swap_data(tensor, idx1, idx2): self.index_to_batch_id[i1], self.index_to_batch_id[i2] = self.index_to_batch_id[i2], self.index_to_batch_id[i1] swap_data(self.block_tables, i1, i2) swap_data(self.input_ids, i1, i2) - swap_data(self.input_ids_cpu, i1, i2) swap_data(self.seq_lens_this_time_buffer, i1, i2) swap_data(self.seq_lens_encoder, i1, i2) swap_data(self.seq_lens_decoder, i1, i2) swap_data(self.step_idx, i1, i2) swap_data(self.pre_ids, i1, i2) swap_data(self.encoder_block_lens, i1, i2) - swap_data(self.input_ids_len, i1, i2) swap_data(self.mask_rollback, i1, i2) swap_data(self.recompute_token_num, i1, i2) if self.enable_mm: @@ -982,7 +973,6 @@ def reset_model_inputs(self) -> None: # Clone the target model inputs to restore initial values self.block_tables = paddle.clone(self.target_model_input_batch["block_tables"]) self.input_ids = paddle.clone(self.target_model_input_batch["input_ids"]) - fill_paddle_tensor(self, "input_ids_cpu", -1) # NOTE(fix): Must reset seq_lens_this_time_buffer to avoid stale values from previous # RL round causing illegal memory access during CUDAGraph recapture (error 700). # When draft_model_use_cudagraph=true, padding_cudagraph_inputs() uses the full @@ -1011,7 +1001,7 @@ def reset_model_inputs(self) -> None: shape=[max_num_seqs * (max_draft_token_num + 1)], fill_value=0, dtype="int32" ) if "token_ids_all" in self.target_model_input_batch: - self.token_ids_all = paddle.clone(self.target_model_input_batch["token_ids_all"]) + self.token_ids_all = self.target_model_input_batch["token_ids_all"] # TODO: delete pre_ids in mtp self.pre_ids = paddle.full( [self.scheduler_config.max_num_seqs, self.model_config.max_model_len], @@ -1104,9 +1094,6 @@ def reset_model_inputs(self) -> None: if self.num_model_steps > 1: fill_paddle_tensor(self, "last_seq_lens_this_time", -1) - # Reset input IDs length - fill_paddle_tensor(self, "input_ids_len", 0) - # Reset various scores and flags self.temp_scaled_logprobs = self.target_model_input_batch["temp_scaled_logprobs"] self.top_p_normalized_logprobs = self.target_model_input_batch["top_p_normalized_logprobs"] diff --git a/tests/e2e/test_ernie_21b_mtp_ngram.py b/tests/e2e/test_ernie_21b_mtp_ngram.py new file mode 100644 index 00000000000..e80ba4cf6d8 --- /dev/null +++ b/tests/e2e/test_ernie_21b_mtp_ngram.py @@ -0,0 +1,373 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import shutil +import signal +import subprocess +import sys +import time + +import pytest +from utils.serving_utils import ( + FD_API_PORT, + FD_CACHE_QUEUE_PORT, + FD_ENGINE_QUEUE_PORT, + FD_METRICS_PORT, + clean, + extract_logprobs, + get_stream_chunks, + is_port_open, + send_request, +) + + +def _build_speculate_metrics_baseline( + accepted_tokens, + rejected_tokens, + accept_ratio, + average_accept_length, + accepted_tokens_per_head, + accept_ratio_per_head, +): + """ + Build a tolerance-based baseline for speculate metrics. + + Integer counters remain strict, while floating-point fields and + per-head metric arrays use approximate comparison to reduce + environment-sensitive test flakiness. + """ + return { + "accepted_tokens": accepted_tokens, + "rejected_tokens": rejected_tokens, + "accept_ratio": pytest.approx(accept_ratio, abs=0.02), + "average_accept_length": pytest.approx(average_accept_length, abs=0.1), + "accepted_tokens_per_head": pytest.approx(accepted_tokens_per_head, abs=2), + "accept_ratio_per_head": pytest.approx(accept_ratio_per_head, abs=0.05), + } + + +BASELINE_SPECULATE_METRICS = _build_speculate_metrics_baseline( + accepted_tokens=100, + rejected_tokens=188, + accept_ratio=0.52, + average_accept_length=2.0833333333333335, + accepted_tokens_per_head=[48, 34, 14, 2, 1, 1], + accept_ratio_per_head=[ + 0.7083333333333334, + 0.4117647058823529, + 0.14285714285714285, + 0.5, + 1.0, + ], +) +BASELINE_SPECULATE_METRICS_WITH_LOGPROBS = _build_speculate_metrics_baseline( + accepted_tokens=100, + rejected_tokens=182, + accept_ratio=0.53, + average_accept_length=2.127659574468085, + accepted_tokens_per_head=[47, 34, 14, 3, 1, 1], + accept_ratio_per_head=[ + 0.723404255319149, + 0.4117647058823529, + 0.21428571428571427, + 0.3333333333333333, + 1.0, + ], +) + + +def _assert_speculate_metrics_match(actual, baseline, label): + """Per-field comparison against a tolerance-based baseline. + + Avoids whole-dict ``==`` so that an AssertionError isn't masked by a + TypeError when json.dumps tries to serialize pytest.approx wrappers in + the failure message. + """ + missing = set(baseline) - set(actual) + extra = set(actual) - set(baseline) + assert not missing and not extra, ( + f"[{label}] speculate_metrics keys mismatch: missing={missing}, extra={extra}, " + f"got_keys={sorted(actual.keys())}" + ) + for key, expected in baseline.items(): + got = actual[key] + assert got == expected, f"[{label}] field '{key}' mismatch:\n" f" got: {got}\n" f" expected: {expected}" + + +@pytest.fixture(scope="session", autouse=True) +def setup_and_run_server(): + """ + Pytest fixture that runs once per test session: + - Cleans ports before tests + - Starts the API server in hybrid MTP-Ngram mode as a subprocess + - Waits for server port to open (up to 5 minutes) + - Tears down server after all tests finish + """ + print("Pre-test port cleanup...") + clean() + + if os.path.exists("log") and os.path.isdir("log"): + shutil.rmtree("log") + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "ernie-4_5-21b-a3b-bf16-paddle") + else: + model_path = "./ernie-4_5-21b-a3b-bf16-paddle" + mtp_model_path = os.path.join(model_path, "mtp") + + speculative_config = { + "method": "mtp", + "model": mtp_model_path, + "num_speculative_tokens": 5, + "num_model_steps": 3, + "mtp_strategy": "with_ngram", + "max_ngram_size": 3, + "min_ngram_size": 1, + } + + log_path = "server.log" + cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--tensor-parallel-size", + "1", + "--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--cache-queue-port", + str(FD_CACHE_QUEUE_PORT), + "--max-model-len", + "32768", + "--max-num-seqs", + "128", + "--quantization", + "wint4", + "--enable-overlap-schedule", + "--enable-logprob", + "--speculative-config", + json.dumps(speculative_config), + "--graph-optimization-config", + '{"use_cudagraph":true, "use_unique_memory_pool":true, "draft_model_use_cudagraph":true}', + ] + + with open(log_path, "w") as logfile: + process = subprocess.Popen( + cmd, + stdout=logfile, + stderr=subprocess.STDOUT, + start_new_session=True, + ) + + for _ in range(300): + if is_port_open("127.0.0.1", FD_API_PORT): + print(f"Server is up on port {FD_API_PORT}") + break + time.sleep(1) + else: + try: + os.killpg(process.pid, signal.SIGTERM) + clean() + except Exception as e: + print(f"Failed to kill process group: {e}") + raise RuntimeError(f"API server did not start on port {FD_API_PORT}") + + yield + + print("\n===== Post-test server cleanup... =====") + try: + os.killpg(process.pid, signal.SIGTERM) + clean() + print(f"server (pid={process.pid}) terminated") + except Exception as e: + print(f"Failed to terminate API server: {e}") + + +@pytest.fixture(scope="session") +def api_url(): + return f"http://0.0.0.0:{FD_API_PORT}/v1/chat/completions" + + +@pytest.fixture(scope="session") +def metrics_url(): + return f"http://0.0.0.0:{FD_METRICS_PORT}/metrics" + + +def test_mtp_ngram_stream(api_url): + """Hybrid MTP-Ngram streaming generation returns non-empty result with valid token counts.""" + payload = { + "model": "default", + "messages": [{"role": "user", "content": "牛顿的三大运动定律是什么?"}], + "max_tokens": 50, + "min_tokens": 10, + "temperature": 0, + "top_p": 0, + "seed": 42, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + } + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + result = "".join(x["choices"][0]["delta"]["content"] for x in chunks[:-1]) + assert result != "", "Generation result is empty" + usage = chunks[-1]["usage"] + assert usage["completion_tokens"] <= payload["max_tokens"] + assert usage["completion_tokens"] >= payload["min_tokens"] + assert usage["total_tokens"] == usage["completion_tokens"] + usage["prompt_tokens"] + + +def test_mtp_ngram_non_stream(api_url): + """Hybrid MTP-Ngram non-streaming generation returns non-empty result with valid token counts.""" + payload = { + "model": "default", + "messages": [{"role": "user", "content": "牛顿的三大运动定律是什么?"}], + "max_tokens": 50, + "min_tokens": 10, + "temperature": 0, + "top_p": 0, + "seed": 42, + "stream": False, + } + response = send_request(url=api_url, payload=payload).json() + result = response["choices"][0]["message"]["content"] + assert result != "", "Generation result is empty" + usage = response["usage"] + assert usage["completion_tokens"] <= payload["max_tokens"] + assert usage["completion_tokens"] >= payload["min_tokens"] + assert usage["total_tokens"] == usage["completion_tokens"] + usage["prompt_tokens"] + + +def test_mtp_ngram_speculate_metrics(api_url): + """speculate_metrics contains the MTP + ngram acceptance stats for a repeated-prompt request.""" + # Prompt with repeated fragments to increase ngram match rate + content = ( + "国外项目风险管理研究起步较早,理论体系成熟。早期研究集中于保险与金融领域,后逐步扩展至工程项目、" + "公共管理等多领域。在理论层面,COSO《企业风险管理——整合框架》和ISO31000标准为风险管理提供了系统性" + "指导,强调风险识别、评估、应对与监控的全流程管理。风险识别方法包括故障树分析、事件树分析等;风险评估" + "则广泛应用VaR模型、蒙特卡洛模拟等量化工具。应对策略涵盖规避、转移、减轻和接受等,并衍生出风险共享、" + "升级等复杂策略。此外,组织文化、管理层支持等因素对风险管理有效性影响显著。近年来,随着科技发展," + "人工智能、大数据等技术被引入风险管理,推动其向智能化、自动化方向发展。请介绍一下国外关于项目风险管理" + "的文献研究综述,300字以内" + ) + payload = { + "model": "default", + "messages": [{"role": "user", "content": content}], + "max_tokens": 100, + "min_tokens": 20, + "temperature": 0, + "top_p": 0, + "seed": 42, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + } + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + # chunks[-1] is the usage chunk; chunks[-2] is the last content chunk containing speculate_metrics + speculate_metrics = chunks[-2]["choices"][0].get("speculate_metrics") + # print(f"\n[test_mtp_ngram_speculate_metrics] speculate_metrics: {json.dumps(speculate_metrics, indent=2)}") + assert speculate_metrics is not None, "speculate_metrics is missing from last chunk" + + # accepted_tokens_per_head length should equal num_speculative_tokens + 1 (head 0 is the verified token). + # num_speculative_tokens=5 → 6 entries. + accepted_per_head = speculate_metrics.get("accepted_tokens_per_head") + assert accepted_per_head is not None, "accepted_tokens_per_head is missing" + assert len(accepted_per_head) == 6, ( + f"Expected 6 entries in accepted_tokens_per_head (num_speculative_tokens=5 + 1), " + f"got {len(accepted_per_head)}: {accepted_per_head}" + ) + # Monotonically non-increasing (each subsequent draft head is harder to accept) + for i in range(len(accepted_per_head) - 1): + assert ( + accepted_per_head[i] >= accepted_per_head[i + 1] + ), f"accepted_tokens_per_head is not monotonically non-increasing at index {i}: {accepted_per_head}" + + # Cross-field consistency: total accepted == sum of per-head + assert speculate_metrics["accepted_tokens"] == sum(accepted_per_head), ( + f"accepted_tokens ({speculate_metrics['accepted_tokens']}) != " + f"sum(accepted_tokens_per_head) ({sum(accepted_per_head)})" + ) + + # Baseline comparison (tolerance-based) against values captured in the reference environment. + if BASELINE_SPECULATE_METRICS is not None: + _assert_speculate_metrics_match( + speculate_metrics, BASELINE_SPECULATE_METRICS, label="test_mtp_ngram_speculate_metrics" + ) + + +def test_mtp_ngram_speculate_metrics_with_logprobs(api_url): + """speculate_metrics and logprobs coexist correctly when hybrid mode + logprobs are both enabled.""" + content = ( + "国外项目风险管理研究起步较早,理论体系成熟。早期研究集中于保险与金融领域,后逐步扩展至工程项目、" + "公共管理等多领域。在理论层面,COSO《企业风险管理——整合框架》和ISO31000标准为风险管理提供了系统性" + "指导,强调风险识别、评估、应对与监控的全流程管理。风险识别方法包括故障树分析、事件树分析等;风险评估" + "则广泛应用VaR模型、蒙特卡洛模拟等量化工具。应对策略涵盖规避、转移、减轻和接受等,并衍生出风险共享、" + "升级等复杂策略。此外,组织文化、管理层支持等因素对风险管理有效性影响显著。近年来,随着科技发展," + "人工智能、大数据等技术被引入风险管理,推动其向智能化、自动化方向发展。请介绍一下国外关于项目风险管理" + "的文献研究综述,300字以内" + ) + payload = { + "model": "default", + "messages": [{"role": "user", "content": content}], + "max_tokens": 100, + "min_tokens": 20, + "temperature": 0, + "top_p": 0, + "seed": 42, + "stream": True, + "stream_options": {"include_usage": True, "continuous_usage_stats": True}, + "logprobs": True, + "top_logprobs": 5, + } + response = send_request(url=api_url, payload=payload) + chunks = get_stream_chunks(response) + + # logprobs are present in each content chunk + logprobs_list = extract_logprobs(chunks) + assert len(logprobs_list) > 0, "No logprobs received" + for logprobs in logprobs_list: + assert "content" in logprobs + for item in logprobs["content"]: + assert "token" in item + assert "logprob" in item + assert "top_logprobs" in item + assert len(item["top_logprobs"]) <= 5 + + # speculate_metrics appears in the last content chunk and is consistent + speculate_metrics = chunks[-2]["choices"][0].get("speculate_metrics") + # print( + # f"\n[test_mtp_ngram_speculate_metrics_with_logprobs] " + # f"speculate_metrics: {json.dumps(speculate_metrics, indent=2)}" + # ) + assert speculate_metrics is not None, "speculate_metrics is missing from last chunk" + + accepted_per_head = speculate_metrics.get("accepted_tokens_per_head") + assert accepted_per_head is not None, "accepted_tokens_per_head is missing" + assert len(accepted_per_head) == 6 + assert speculate_metrics["accepted_tokens"] == sum(accepted_per_head) + + # Baseline comparison (tolerance-based) against values captured in the reference environment. + if BASELINE_SPECULATE_METRICS_WITH_LOGPROBS is not None: + _assert_speculate_metrics_match( + speculate_metrics, + BASELINE_SPECULATE_METRICS_WITH_LOGPROBS, + label="test_mtp_ngram_speculate_metrics_with_logprobs", + ) diff --git a/tests/operators/test_hybrid_mtp_ngram.py b/tests/operators/test_hybrid_mtp_ngram.py index 6c111f93763..4f473a9ec40 100644 --- a/tests/operators/test_hybrid_mtp_ngram.py +++ b/tests/operators/test_hybrid_mtp_ngram.py @@ -26,15 +26,19 @@ class TestNgramMatchMixed(unittest.TestCase): def setUp(self): self.max_bsz = 2 self.max_draft_tokens = 5 - self.max_len = 32 + self.max_model_len = 32 + self.prompt_len = 10 self.max_dec_len = 10 self.max_ngram_size = 5 self.min_ngram_size = 2 - # 初始化输入 tensor - self.input_ids = paddle.full(shape=[self.max_bsz, self.max_len], fill_value=-1, dtype="int64").cpu() - self.input_ids_len = paddle.full(shape=[self.max_bsz, 1], fill_value=-1, dtype="int64").cpu() - self.pre_ids = paddle.full(shape=[self.max_bsz, self.max_len], fill_value=-1, dtype="int64").cpu() + # token_ids_all layout: + # [0 .. prompt_len-1] <- prompt (Phase 1 search source) + # [prompt_len .. ] <- pad (-1) + # pre_ids carries the generated tokens used as Phase 2 search source. + self.token_ids_all = paddle.full(shape=[self.max_bsz, self.max_model_len], fill_value=-1, dtype="int64").cpu() + self.prompt_lens = paddle.full(shape=[self.max_bsz, 1], fill_value=-1, dtype="int64").cpu() + self.pre_ids = paddle.full(shape=[self.max_bsz, self.max_model_len], fill_value=-1, dtype="int64").cpu() self.step_idx = paddle.full(shape=[self.max_bsz, 1], fill_value=-1, dtype="int64").cpu() self.draft_token_num = paddle.full(shape=[self.max_bsz, 1], fill_value=-1, dtype="int32").cpu() self.draft_tokens = paddle.full( @@ -50,9 +54,10 @@ def setUp(self): dtype="int64", ).cpu() - # 设置具体数据 - self.input_ids[:, :10] = np.arange(0, 10) - self.input_ids_len[:] = 10 + # Fill prompt 0..9 into token_ids_all (Phase 1 search source) + self.token_ids_all[:, : self.prompt_len] = np.arange(0, self.prompt_len) + self.prompt_lens[:] = self.prompt_len + pre_ids_np = np.array([10, 9, 8, 7, 6, 10, 9, 8, 7], dtype="int32") self.pre_ids[:, : pre_ids_np.shape[0]] = pre_ids_np self.step_idx[:] = 8 @@ -63,14 +68,17 @@ def setUp(self): self.seq_lens_decoder[:] = 12 self.max_dec_len[:] = 512 - # 期望结果 + # Expected results (unchanged: kernel matching logic is identical; + # only the data source for prompt tokens moved from input_ids to + # token_ids_all[:, :prompt_len]). self.ref_seq_lens_this_time = np.array([[6], [6]], dtype="int32") self.ref_draft_tokens = np.array([[8, 7, 6, 10, 9, 8], [8, 7, 6, 10, 9, 8]], dtype="int64") def test_ngram_match_mixed(self): + """pad_to_max=False: GPU output matches the CPU reference baseline.""" hybrid_mtp_ngram( - self.input_ids, - self.input_ids_len, + self.token_ids_all, + self.prompt_lens, self.pre_ids, self.step_idx, self.draft_token_num, @@ -81,6 +89,7 @@ def test_ngram_match_mixed(self): self.max_ngram_size, self.min_ngram_size, self.max_draft_tokens, + False, ) np.testing.assert_allclose(self.seq_lens_this_time.numpy(), self.ref_seq_lens_this_time) diff --git a/tests/operators/test_ngram_match.py b/tests/operators/test_ngram_match.py index d4f36f51f06..978d65c3acb 100644 --- a/tests/operators/test_ngram_match.py +++ b/tests/operators/test_ngram_match.py @@ -61,6 +61,7 @@ def test_basic_match(self): max_dec_len, 3, 4, + False, # pad_to_max: match unchanged (no-pad) reference behavior ) # Extract non-zero tokens and assert the results. @@ -100,6 +101,7 @@ def test_no_match(self): max_dec_len, 3, 3, + False, # pad_to_max: match unchanged (no-pad) reference behavior ) # No match → should only keep 1 token diff --git a/tests/spec_decode/test_benchmark_ngram_kernel.py b/tests/spec_decode/test_benchmark_ngram_kernel.py index 270f611a0cc..3bc7308b8d7 100644 --- a/tests/spec_decode/test_benchmark_ngram_kernel.py +++ b/tests/spec_decode/test_benchmark_ngram_kernel.py @@ -155,6 +155,7 @@ def _run_gpu(ngram_match_fn, gpu_data): gpu_data["max_dec_len"], MAX_NGRAM_SIZE, MAX_DRAFT_TOKENS, + False, # pad_to_max: benchmark unrelated to cudagraph, measure no-pad cost ) diff --git a/tests/spec_decode/test_ngram_gpu_kernel.py b/tests/spec_decode/test_ngram_gpu_kernel.py index 83c88d78f12..0c16ef1d2c3 100644 --- a/tests/spec_decode/test_ngram_gpu_kernel.py +++ b/tests/spec_decode/test_ngram_gpu_kernel.py @@ -126,8 +126,8 @@ def _cpu_ngram_match( def _cpu_hybrid_mtp_ngram( - input_ids, - input_ids_len, + token_ids_all, + prompt_lens, pre_ids, step_idx, draft_token_num, @@ -145,7 +145,7 @@ def _cpu_hybrid_mtp_ngram( max_dec_len = max_dec_len.ravel() step_idx = step_idx.ravel() draft_token_num = draft_token_num.ravel() - input_ids_len = input_ids_len.ravel() + prompt_lens = prompt_lens.ravel() max_batch_size = seq_lens_this_time.shape[0] unprocessed = sum(1 for b in range(max_batch_size) if seq_lens_decoder[b] > 0) @@ -158,11 +158,11 @@ def _cpu_hybrid_mtp_ngram( if ori_slt == 0 or max_q <= 0: continue - cur_input_ids = input_ids[batch_idx] + cur_input_ids = token_ids_all[batch_idx] cur_draft = draft_tokens[batch_idx] cur_pre = pre_ids[batch_idx] cur_step = int(step_idx[batch_idx]) - cur_ids_len = int(input_ids_len[batch_idx]) + cur_ids_len = int(prompt_lens[batch_idx]) unprocessed -= 1 sum_tok = sum(int(seq_lens_this_time[i]) for i in range(batch_idx + 1)) @@ -263,9 +263,14 @@ def _make_mixed_test_data(batch_size=4, input_len=64, pre_ids_len=256, max_draft """Create realistic test tensors for hybrid_mtp_ngram op.""" rng = np.random.RandomState(seed) vocab_size = 1000 + # token_ids_all must be at least as large as the per-request budget; + # use pre_ids_len as a stand-in for max_model_len in tests. + max_model_len = max(pre_ids_len, input_len + 64) - input_ids = rng.randint(0, vocab_size, (batch_size, input_len)).astype(np.int64) - input_ids_len = np.full((batch_size, 1), input_len, dtype=np.int64) + prompt_tokens = rng.randint(0, vocab_size, (batch_size, input_len)).astype(np.int64) + token_ids_all = np.full((batch_size, max_model_len), -1, dtype=np.int64) + token_ids_all[:, :input_len] = prompt_tokens + prompt_lens = np.full((batch_size, 1), input_len, dtype=np.int64) pre_ids = np.zeros((batch_size, pre_ids_len), dtype=np.int64) step_idx = np.zeros((batch_size, 1), dtype=np.int64) @@ -280,13 +285,13 @@ def _make_mixed_test_data(batch_size=4, input_len=64, pre_ids_len=256, max_draft # Copy contiguous blocks from prompt to guarantee ngram matches gen_len = 20 src = rng.randint(0, max(1, input_len - gen_len)) - pre_ids[b, :gen_len] = input_ids[b, src : src + gen_len] - # step_idx = last valid position (0-based index), matches hybrid kernel semantics + pre_ids[b, :gen_len] = prompt_tokens[b, src : src + gen_len] + # step_idx = last valid position (0-based index) step_idx[b] = gen_len - 1 return { - "input_ids": input_ids, - "input_ids_len": input_ids_len, + "token_ids_all": token_ids_all, + "prompt_lens": prompt_lens, "pre_ids": pre_ids, "step_idx": step_idx, "draft_token_num": draft_token_num, @@ -360,6 +365,7 @@ def test_correctness_basic(self): gpu_data["max_dec_len"], max_ngram_size, max_draft_tokens, + False, ) paddle.device.synchronize() @@ -404,6 +410,7 @@ def test_correctness_varied_seeds(self): gpu_data["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() np.testing.assert_array_equal(gpu_data["seq_lens_this_time"].numpy(), cpu_slt) @@ -451,6 +458,7 @@ def test_large_batch_long_seq(self): gpu_data["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() finally: @@ -494,6 +502,7 @@ def test_single_batch_long_seq(self): gpu_data["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() np.testing.assert_array_equal(gpu_data["seq_lens_this_time"].numpy(), cpu_slt) @@ -537,6 +546,7 @@ def test_many_short_seqs(self): gpu_data["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() finally: @@ -564,6 +574,7 @@ def test_latency(self): d["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() @@ -586,6 +597,7 @@ def test_latency(self): gpu_data["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() t1 = time.perf_counter() @@ -636,6 +648,7 @@ def test_latency_scaling(self): gpu_data["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() @@ -655,6 +668,7 @@ def test_latency_scaling(self): gpu_data["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() gpu_ms = (time.perf_counter() - t0) / n_runs * 1000 @@ -737,6 +751,7 @@ def test_latency_extreme(self): gpu_data["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() @@ -756,6 +771,7 @@ def test_latency_extreme(self): gpu_data["max_dec_len"], 3, 10, + False, ) paddle.device.synchronize() t1 = time.perf_counter() @@ -813,8 +829,8 @@ def test_correctness_basic(self): cpu_draft = data["draft_tokens"].copy() cpu_slt = data["seq_lens_this_time"].copy() _cpu_hybrid_mtp_ngram( - data["input_ids"], - data["input_ids_len"], + data["token_ids_all"], + data["prompt_lens"], data["pre_ids"], data["step_idx"], data["draft_token_num"], @@ -829,8 +845,8 @@ def test_correctness_basic(self): gpu_data = _to_gpu(data) self.hybrid_mtp_ngram( - gpu_data["input_ids"], - gpu_data["input_ids_len"], + gpu_data["token_ids_all"], + gpu_data["prompt_lens"], gpu_data["pre_ids"], gpu_data["step_idx"], gpu_data["draft_token_num"], @@ -841,6 +857,7 @@ def test_correctness_basic(self): max_ngram_size, min_ngram_size, max_draft_tokens, + False, ) paddle.device.synchronize() @@ -857,8 +874,8 @@ def test_correctness_varied_seeds(self): cpu_draft = data["draft_tokens"].copy() cpu_slt = data["seq_lens_this_time"].copy() _cpu_hybrid_mtp_ngram( - data["input_ids"], - data["input_ids_len"], + data["token_ids_all"], + data["prompt_lens"], data["pre_ids"], data["step_idx"], data["draft_token_num"], @@ -872,8 +889,8 @@ def test_correctness_varied_seeds(self): ) gpu_data = _to_gpu(data) self.hybrid_mtp_ngram( - gpu_data["input_ids"], - gpu_data["input_ids_len"], + gpu_data["token_ids_all"], + gpu_data["prompt_lens"], gpu_data["pre_ids"], gpu_data["step_idx"], gpu_data["draft_token_num"], @@ -884,6 +901,7 @@ def test_correctness_varied_seeds(self): 3, 1, 10, + False, ) paddle.device.synchronize() np.testing.assert_array_equal(gpu_data["seq_lens_this_time"].numpy(), cpu_slt) @@ -900,8 +918,8 @@ def test_large_batch_long_seq(self): cpu_draft = data["draft_tokens"].copy() cpu_slt = data["seq_lens_this_time"].copy() _cpu_hybrid_mtp_ngram( - data["input_ids"], - data["input_ids_len"], + data["token_ids_all"], + data["prompt_lens"], data["pre_ids"], data["step_idx"], data["draft_token_num"], @@ -919,8 +937,8 @@ def test_large_batch_long_seq(self): os.environ["SPEC_TOKENUM_THRESHOLD"] = str(high_threshold) try: self.hybrid_mtp_ngram( - gpu_data["input_ids"], - gpu_data["input_ids_len"], + gpu_data["token_ids_all"], + gpu_data["prompt_lens"], gpu_data["pre_ids"], gpu_data["step_idx"], gpu_data["draft_token_num"], @@ -931,6 +949,7 @@ def test_large_batch_long_seq(self): 3, 1, 10, + False, ) paddle.device.synchronize() finally: @@ -947,8 +966,8 @@ def test_single_batch_long_seq(self): cpu_draft = data["draft_tokens"].copy() cpu_slt = data["seq_lens_this_time"].copy() _cpu_hybrid_mtp_ngram( - data["input_ids"], - data["input_ids_len"], + data["token_ids_all"], + data["prompt_lens"], data["pre_ids"], data["step_idx"], data["draft_token_num"], @@ -962,8 +981,8 @@ def test_single_batch_long_seq(self): ) gpu_data = _to_gpu(data) self.hybrid_mtp_ngram( - gpu_data["input_ids"], - gpu_data["input_ids_len"], + gpu_data["token_ids_all"], + gpu_data["prompt_lens"], gpu_data["pre_ids"], gpu_data["step_idx"], gpu_data["draft_token_num"], @@ -974,6 +993,7 @@ def test_single_batch_long_seq(self): 3, 1, 10, + False, ) paddle.device.synchronize() np.testing.assert_array_equal(gpu_data["seq_lens_this_time"].numpy(), cpu_slt) @@ -986,8 +1006,8 @@ def test_many_short_seqs(self): cpu_draft = data["draft_tokens"].copy() cpu_slt = data["seq_lens_this_time"].copy() _cpu_hybrid_mtp_ngram( - data["input_ids"], - data["input_ids_len"], + data["token_ids_all"], + data["prompt_lens"], data["pre_ids"], data["step_idx"], data["draft_token_num"], @@ -1005,8 +1025,8 @@ def test_many_short_seqs(self): os.environ["SPEC_TOKENUM_THRESHOLD"] = str(high_threshold) try: self.hybrid_mtp_ngram( - gpu_data["input_ids"], - gpu_data["input_ids_len"], + gpu_data["token_ids_all"], + gpu_data["prompt_lens"], gpu_data["pre_ids"], gpu_data["step_idx"], gpu_data["draft_token_num"], @@ -1017,6 +1037,7 @@ def test_many_short_seqs(self): 3, 1, 10, + False, ) paddle.device.synchronize() finally: @@ -1027,6 +1048,60 @@ def test_many_short_seqs(self): np.testing.assert_array_equal(gpu_data["seq_lens_this_time"].numpy(), cpu_slt) np.testing.assert_array_equal(gpu_data["draft_tokens"].numpy(), cpu_draft) + def test_pad_to_max(self): + """pad_to_max=True forces seq_lens_this_time to K+1 and fills unused + draft slots with a placeholder (= last valid draft token). + + Exercises the cudagraph-stability path: even when ngram has 0 hits, + slt must end up at max_draft_tokens + 1 so capture-time and replay- + time launch params match. + """ + # 8 batches with step_idx=0 → search short-circuits, no ngram match. + # ori_seq_len_this_time = 1 (the verified token only). Without pad, + # slt stays at 1; with pad_to_max=True it must be K+1. + data = _make_mixed_test_data(batch_size=8, seed=123) + data["step_idx"][:] = 0 # force no-match path + # Seed the verified token at position 0 of every draft_tokens row + # so we can identify the placeholder unambiguously. + data["draft_tokens"][:, 0] = 999 + data["seq_lens_this_time"][:] = 1 + + gpu_data = _to_gpu(data) + max_draft_tokens = 10 + self.hybrid_mtp_ngram( + gpu_data["token_ids_all"], + gpu_data["prompt_lens"], + gpu_data["pre_ids"], + gpu_data["step_idx"], + gpu_data["draft_token_num"], + gpu_data["draft_tokens"], + gpu_data["seq_lens_this_time"], + gpu_data["seq_lens_decoder"], + gpu_data["max_dec_len"], + 3, + 1, + max_draft_tokens, + True, # pad_to_max + ) + paddle.device.synchronize() + + target_slt = max_draft_tokens + 1 + slt = gpu_data["seq_lens_this_time"].numpy() + np.testing.assert_array_equal( + slt, + np.full_like(slt, target_slt), + err_msg=f"expected all slt == {target_slt} after pad, got {slt.tolist()}", + ) + + # ori was 1 so positions [1..K+1) should all hold the placeholder = + # draft_tokens[0] = 999. + drafts = gpu_data["draft_tokens"].numpy() + np.testing.assert_array_equal( + drafts[:, 1:target_slt], + np.full((drafts.shape[0], target_slt - 1), 999, dtype=drafts.dtype), + err_msg="padded slots should be filled with the placeholder (last valid draft token)", + ) + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/tests/spec_decode/test_ngram_proposer.py b/tests/spec_decode/test_ngram_proposer.py index cf54e5f28a5..001fb3b315c 100644 --- a/tests/spec_decode/test_ngram_proposer.py +++ b/tests/spec_decode/test_ngram_proposer.py @@ -46,6 +46,7 @@ def setUp(self): fd_config.speculative_config.max_ngram_size = 3 fd_config.speculative_config.min_ngram_size = 1 fd_config.scheduler_config.max_num_seqs = 2 + fd_config.graph_opt_config.use_cudagraph = False self.fd_config = fd_config bsz = fd_config.scheduler_config.max_num_seqs @@ -177,6 +178,29 @@ def test_run_with_match_respects_max_dec_len(self): slt = self.share_inputs["seq_lens_this_time"].numpy() np.testing.assert_array_equal(slt, [1, 1], err_msg="No drafts expected when max_dec_len budget exhausted") + # Pad-to-max path (cudagraph compatibility) + def test_run_pad_to_max_when_cudagraph_enabled(self): + """With use_cudagraph=True, NgramProposer passes pad_to_max=True to + the kernel; even with no ngram match, seq_lens_this_time is forced + to num_speculative_tokens + 1 so cudagraph capture/replay shapes + stay consistent. + """ + self.fd_config.graph_opt_config.use_cudagraph = True + proposer = NgramProposer(self.fd_config) + # No-match scenario (same as test_run_no_proposal_step_idx_zero). + self.share_inputs["step_idx"][:] = 0 + + proposer.run(self.share_inputs) + paddle.device.synchronize() + + target_slt = self.fd_config.speculative_config.num_speculative_tokens + 1 + slt = self.share_inputs["seq_lens_this_time"].numpy() + np.testing.assert_array_equal( + slt, + np.full_like(slt, target_slt), + err_msg=f"pad_to_max should force slt to {target_slt}, got {slt.tolist()}", + ) + if __name__ == "__main__": unittest.main(verbosity=2) diff --git a/tests/worker/test_reorder_split_prefill_and_decode.py b/tests/worker/test_reorder_split_prefill_and_decode.py index feb69c858c7..0e6c8546b05 100644 --- a/tests/worker/test_reorder_split_prefill_and_decode.py +++ b/tests/worker/test_reorder_split_prefill_and_decode.py @@ -1,4 +1,4 @@ -from unittest.mock import Mock +from unittest.mock import Mock, patch import paddle import pytest @@ -12,7 +12,11 @@ SpeculativeConfig, StructuredOutputsConfig, ) -from fastdeploy.worker.input_batch import InputBatch, reorder_split_prefill_and_decode +from fastdeploy.worker.input_batch import ( + InputBatch, + ProposerInputBatch, + reorder_split_prefill_and_decode, +) def create_mock_config(): @@ -61,6 +65,7 @@ def create_mock_config(): scheduler_config = Mock(spec=SchedulerConfig) scheduler_config.max_num_seqs = 10 scheduler_config.max_num_batched_tokens = 2048 + scheduler_config.max_extra_num_batched_tokens = 0 speculative_config = Mock(spec=SpeculativeConfig) speculative_config.method = None @@ -315,5 +320,119 @@ def test_reorder_all_prefill(self): assert paddle.equal_all(input_batch.input_ids[i], original_data[i]) +class TestProposerInputBatchReset: + """Cover ProposerInputBatch.reset_model_inputs CUDA + token_ids_all branch + (fastdeploy/worker/input_batch.py:972-985).""" + + def _make_config(self): + # Enable spec_decoding path so InputBatch.init_share_inputs allocates + # cu_seqlens_q_output / draft_tokens / accept_num etc. + fd_config = create_mock_config() + fd_config.speculative_config.method = "mtp" + fd_config.speculative_config.num_speculative_tokens = 1 + fd_config.speculative_config.num_model_steps = 1 + return fd_config + + def _make_target(self, fd_config): + target = InputBatch(fd_config) + target.init_share_inputs() + return target + + def _make_proposer(self, fd_config, target): + """Construct a ProposerInputBatch and manually populate only the + attributes that `reset_model_inputs` writes via `fill_paddle_tensor`. + Skipping full `init_share_inputs` avoids depending on rope_emb, + attention backends, and other heavy setup unrelated to the branch + under test.""" + proposer = ProposerInputBatch(fd_config, target) + + max_num_seqs = fd_config.scheduler_config.max_num_seqs + hidden_size = fd_config.model_config.hidden_size + max_draft_token_num = fd_config.speculative_config.num_speculative_tokens + + proposer.target_hidden_states = paddle.full([max_num_seqs, hidden_size], 0, dtype="bfloat16") + proposer.draft_tokens = paddle.full([max_num_seqs, max_draft_token_num + 1], -1, dtype="int64") + proposer.is_block_step = paddle.full([max_num_seqs, 1], False, dtype="bool") + proposer.batch_drop = paddle.full([max_num_seqs, 1], False, dtype="bool") + proposer.used_list_len = paddle.full([max_num_seqs], 0, dtype="int32") + proposer.first_token_hidden_states = paddle.full([max_num_seqs, hidden_size], -1) + proposer.batch_token_num = paddle.full([max_num_seqs], 0, dtype="int32") + proposer.next_token_num = paddle.full([max_num_seqs], 0, dtype="int32") + proposer.cu_batch_token_offset = paddle.full([max_num_seqs + 1], 0, dtype="int32") + proposer.cu_next_token_offset = paddle.full([max_num_seqs + 1], 0, dtype="int32") + proposer.mask_rollback = paddle.full([max_num_seqs, 1], 0, dtype="int32") + proposer.recompute_token_num = paddle.full([max_num_seqs, 1], 0, dtype="int32") + return proposer + + @patch("fastdeploy.worker.input_batch.current_platform") + def test_reset_rebinds_token_ids_all_on_cuda(self, mock_platform): + """When current_platform.is_cuda() and target has token_ids_all, + reset_model_inputs must re-pull token_ids_all from target (line 973) + and rebuild pre_ids from target.token_ids_all[bs_idx, prompt_len:].""" + mock_platform.is_cuda.return_value = True + mock_platform.is_xpu.return_value = False + + fd_config = self._make_config() + target = self._make_target(fd_config) + proposer = self._make_proposer(fd_config, target) + + max_num_seqs = fd_config.scheduler_config.max_num_seqs + max_model_len = fd_config.model_config.max_model_len + + # Rebind target.token_ids_all to a NEW tensor with known content so + # we can distinguish "reset re-pulled it" from "init_share_inputs + # already bound to the old reference". + new_token_ids_all = paddle.arange(max_num_seqs * max_model_len, dtype="int64").reshape( + [max_num_seqs, max_model_len] + ) + target.token_ids_all = new_token_ids_all + + # Set a non-zero prompt_len so the slice [:, prompt_len:] is non-trivial. + prompt_len_value = 3 + target.prompt_lens = paddle.full([max_num_seqs, 1], prompt_len_value, dtype="int64") + + proposer.reset_model_inputs() + + # Line 973 effect: token_ids_all rebound to target's new tensor. + assert proposer.token_ids_all is new_token_ids_all + + # Line 975-985 effect: pre_ids has correct shape and the prefix is + # token_ids_all[bs_idx, prompt_len:], suffix remains -1. + assert proposer.pre_ids.shape == [max_num_seqs, max_model_len] + valid_len = max_model_len - prompt_len_value + expected_prefix = new_token_ids_all[:, prompt_len_value:] + assert paddle.equal_all(proposer.pre_ids[:, :valid_len], expected_prefix) + assert paddle.equal_all( + proposer.pre_ids[:, valid_len:], + paddle.full([max_num_seqs, prompt_len_value], -1, dtype="int64"), + ) + + @patch("fastdeploy.worker.input_batch.current_platform") + def test_reset_falls_back_to_pre_ids_clone_when_no_token_ids_all(self, mock_platform): + """When current_platform.is_cuda() but target lacks token_ids_all, + reset_model_inputs takes the else branch (line 986-988): clone pre_ids, + set token_ids_all to None.""" + mock_platform.is_cuda.return_value = True + mock_platform.is_xpu.return_value = False + + fd_config = self._make_config() + target = self._make_target(fd_config) + proposer = self._make_proposer(fd_config, target) + + # Remove token_ids_all from target so the else branch fires. + del target.token_ids_all + # Provide a recognizable pre_ids on target. + max_num_seqs = fd_config.scheduler_config.max_num_seqs + max_model_len = fd_config.model_config.max_model_len + target.pre_ids = paddle.full([max_num_seqs, max_model_len], 42, dtype="int64") + + proposer.reset_model_inputs() + + assert proposer.token_ids_all is None + assert paddle.equal_all(proposer.pre_ids, paddle.full([max_num_seqs, max_model_len], 42, dtype="int64")) + # Clone, not reference share. + assert proposer.pre_ids is not target.pre_ids + + if __name__ == "__main__": pytest.main([__file__, "-v"])