diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 83fbe1c40..26b36454b 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -58,7 +58,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({ removeSession: vi.fn(), updateSession: vi.fn(), appendEvents: vi.fn(), - enqueueMessage: vi.fn(), + enqueueMessage: vi.fn(() => "queue-id-default"), removeQueuedMessage: vi.fn(), clearMessageQueue: vi.fn(), dequeueMessagesAsText: vi.fn(() => null), @@ -70,6 +70,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({ appendOptimisticItem: vi.fn(), clearOptimisticItems: vi.fn(), replaceOptimisticWithEvent: vi.fn(), + updateCloudStatus: vi.fn(), })); const mockGetConfigOptionByCategory = vi.hoisted(() => @@ -915,6 +916,68 @@ describe("SessionService", () => { }); }); + describe("handleCloudTaskUpdate terminal-status routing", () => { + it("dequeues queued messages instead of clearing them when the run reaches a terminal status", async () => { + const service = getSessionService(); + let capturedOnData: + | ((payload: Record) => void) + | undefined; + mockTrpcCloudTask.onUpdate.subscribe.mockImplementation( + ( + _input: unknown, + opts: { onData: (p: Record) => void }, + ) => { + capturedOnData = opts.onData; + return { unsubscribe: vi.fn() }; + }, + ); + + const queuedMessage = { + id: "queue-1", + content: "gimme a joke", + queuedAt: Date.now(), + }; + const session = createMockSession({ + isCloud: true, + cloudStatus: "in_progress", + cloudBranch: "main", + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(session); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": session, + }); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([ + queuedMessage, + ] as never); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + expect(capturedOnData).toBeDefined(); + capturedOnData?.({ + kind: "status", + taskId: "task-123", + status: "completed", + stage: undefined, + output: undefined, + errorMessage: undefined, + branch: "main", + }); + + // Queue is drained via dequeueMessages (so resumeCloudRun can replay it), + // not silently dropped via clearMessageQueue. + expect(mockSessionStoreSetters.dequeueMessages).toHaveBeenCalledWith( + "task-123", + ); + expect(mockSessionStoreSetters.clearMessageQueue).not.toHaveBeenCalled(); + }); + }); + describe("reset", () => { it("clears connecting tasks", () => { const service = getSessionService(); @@ -1050,7 +1113,7 @@ describe("SessionService", () => { ); }); - it("preserves cloud attachment prompts when queueing a follow-up", async () => { + it("queues a cloud follow-up locally without dispatching while a prior turn is in flight", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( createMockSession({ @@ -1060,24 +1123,22 @@ describe("SessionService", () => { }), ); - const prompt: ContentBlock[] = [ - { type: "text", text: "read this" }, - { - type: "resource_link", - uri: "file:///tmp/test.txt", - name: "test.txt", - mimeType: "text/plain", - }, - ]; - - const result = await service.sendPrompt("task-123", prompt); + const result = await service.sendPrompt("task-123", "Hello cloud"); expect(result.stopReason).toBe("queued"); + // Held in the local messageQueue (queued bubble visible). expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( "task-123", - "read this\n\nAttached files: test.txt", - prompt, + "Hello cloud", + "Hello cloud", ); + // user_message during a running turn would preempt the prior turn on + // the cloud — handleCloudTaskUpdate dispatches via the auto-flush + // once end_turn lands. + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + expect( + mockSessionStoreSetters.appendOptimisticItem, + ).not.toHaveBeenCalled(); }); it("sends prompt via tRPC when session is ready", async () => { diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 5262ca607..175940115 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -92,6 +92,23 @@ const LOCAL_SESSION_RECOVERY_MESSAGE = const LOCAL_SESSION_RECOVERY_FAILED_MESSAGE = "Connecting to to the agent has been lost. Retry, or start a new session."; +function isUserPromptEcho(acpMsg: AcpMessage): boolean { + return ( + isJsonRpcRequest(acpMsg.message) && + acpMsg.message.method === "session/prompt" + ); +} + +function extractOptimisticUserMessage(session: AgentSession): string | null { + const parts: string[] = []; + for (const item of session.optimisticItems) { + if (item.type === "user_message" && item.content.length > 0) { + parts.push(item.content); + } + } + return parts.length > 0 ? parts.join("\n\n") : null; +} + /** * Build default configOptions for cloud sessions so the mode switcher * is available in the UI even without a local agent connection. @@ -1457,7 +1474,6 @@ export class SessionService { private async sendCloudPrompt( session: AgentSession, prompt: string | ContentBlock[], - options?: { skipQueueGuard?: boolean }, ): Promise<{ stopReason: string }> { const transport = getCloudPromptTransport(prompt); if (!transport.messageText && transport.filePaths.length === 0) { @@ -1469,7 +1485,14 @@ export class SessionService { } if (session.cloudStatus !== "in_progress") { - sessionStoreSetters.enqueueMessage(session.taskId, transport.promptText); + // Sandbox isn't accepting commands yet — keep the message in a local + // queue and let the in_progress handler in handleCloudTaskUpdate flush + // it once the run is ready. + sessionStoreSetters.enqueueMessage( + session.taskId, + transport.promptText, + prompt, + ); sessionStoreSetters.updateSession(session.taskRunId, { isPromptPending: true, }); @@ -1480,13 +1503,19 @@ export class SessionService { return { stopReason: "queued" }; } - if (!options?.skipQueueGuard && session.isPromptPending) { + // user_message commands sent during an in-flight turn preempt the prior + // turn on the cloud, breaking the response that's still being generated. + // Match the local agent: hold the follow-up locally as a queued bubble + // until the current turn's end_turn lands. handleCloudTaskUpdate's + // auto-flush re-enters this function with priorTurnInFlight=false once + // the agent is idle, and only then do we fire the user_message mutate. + if (session.isPromptPending) { sessionStoreSetters.enqueueMessage( session.taskId, transport.promptText, prompt, ); - log.info("Cloud message queued", { + log.info("Cloud message queued (prior turn in flight)", { taskId: session.taskId, queueLength: session.messageQueue.length + 1, }); @@ -1517,6 +1546,11 @@ export class SessionService { sessionStoreSetters.updateSession(session.taskRunId, { isPromptPending: true, }); + sessionStoreSetters.appendOptimisticItem(session.taskRunId, { + type: "user_message", + content: transport.promptText, + timestamp: Date.now(), + }); track(ANALYTICS_EVENTS.PROMPT_SENT, { task_id: session.taskId, @@ -1535,10 +1569,6 @@ export class SessionService { params, }); - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: false, - }); - if (!result.success) { throw new Error(result.error ?? "Failed to send cloud command"); } @@ -1546,94 +1576,30 @@ export class SessionService { const stopReason = (result.result as { stopReason?: string })?.stopReason ?? "end_turn"; - const freshSession = sessionStoreSetters.getSessionByTaskId( - session.taskId, - ); - if (freshSession && freshSession.messageQueue.length > 0) { - setTimeout(() => { - this.sendQueuedCloudMessages(session.taskId).catch((err) => { - log.error("Failed to send queued cloud messages", { - taskId: session.taskId, - error: err, - }); - }); - }, 0); - } - return { stopReason }; } catch (error) { - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: false, - }); - throw error; - } - } - - private async sendQueuedCloudMessages( - taskId: string, - attempt = 0, - pendingPrompt?: string | ContentBlock[], - ): Promise<{ stopReason: string }> { - // First attempt: atomically dequeue. Retries reuse the already-dequeued prompt. - const combinedPrompt = - pendingPrompt ?? - combineQueuedCloudPrompts(sessionStoreSetters.dequeueMessages(taskId)); - if (!combinedPrompt) return { stopReason: "skipped" }; - - const session = sessionStoreSetters.getSessionByTaskId(taskId); - if (!session) { - log.warn("No session found for queued cloud messages, message lost", { - taskId, - }); - return { stopReason: "no_session" }; - } - - log.info("Sending queued cloud messages", { - taskId, - promptLength: combinedPrompt.length, - attempt, - }); - - try { - return await this.sendCloudPrompt(session, combinedPrompt, { - skipQueueGuard: true, + // user_message commands can fail when the cloud's run is in a + // wind-down state — sometimes detectable via cloudStatus, sometimes + // not (the rejection arrives before the status update does). Either + // way the right recovery is resumeCloudRun, which spins up a fresh + // run inheriting the prior conversation state and carrying our + // prompt as `pending_user_message`. + const fresh = + sessionStoreSetters.getSessionByTaskId(session.taskId) ?? session; + log.warn("Cloud user_message failed; falling back to resume", { + taskId: session.taskId, + cloudStatus: fresh.cloudStatus, + error: String(error), }); - } catch (error) { - const maxRetries = 5; - if (attempt < maxRetries) { - const delayMs = Math.min(1000 * 2 ** attempt, 10_000); - log.warn("Cloud message send failed, scheduling retry", { - taskId, - attempt, - delayMs, - error: String(error), - }); - return new Promise((resolve) => { - setTimeout(() => { - resolve( - this.sendQueuedCloudMessages( - taskId, - attempt + 1, - combinedPrompt, - ).catch((err) => { - log.error("Queued cloud message retry failed", { - taskId, - attempt: attempt + 1, - error: err, - }); - return { stopReason: "error" }; - }), - ); - }, delayMs); + sessionStoreSetters.clearOptimisticItems(session.taskRunId); + try { + return await this.resumeCloudRun(fresh, prompt); + } catch (resumeError) { + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: false, }); + throw resumeError; } - - log.error("Queued cloud message send failed after max retries", { - taskId, - attempts: attempt + 1, - }); - toast.error("Failed to send follow-up message. Please try again."); - return { stopReason: "error" }; } } @@ -2890,26 +2856,24 @@ export class SessionService { if (delta <= 0) { // Already caught up — skip duplicate entries - } else if (delta <= update.newEntries.length) { - // Normal case: append only the tail (last `delta` entries) - const entriesToAppend = update.newEntries.slice(-delta); - let newEvents = convertStoredEntriesToEvents(entriesToAppend); - newEvents = this.filterSkippedPromptEvents( - taskRunId, - session, - newEvents, - ); - sessionStoreSetters.appendEvents(taskRunId, newEvents, expectedCount); - this.updatePromptStateFromEvents(taskRunId, newEvents); } else { - // Gap in data — append everything we have but don't jump processedLineCount - log.warn("Cloud task log count inconsistency", { - taskRunId, - currentCount, - expectedCount, - entriesReceived: update.newEntries.length, - }); - let newEvents = convertStoredEntriesToEvents(update.newEntries); + const isGap = delta > update.newEntries.length; + const entriesToAppend = isGap + ? update.newEntries + : update.newEntries.slice(-delta); + const nextProcessedCount = isGap + ? currentCount + update.newEntries.length + : expectedCount; + if (isGap) { + log.warn("Cloud task log count inconsistency", { + taskRunId, + currentCount, + expectedCount, + entriesReceived: update.newEntries.length, + }); + } + + let newEvents = convertStoredEntriesToEvents(entriesToAppend); newEvents = this.filterSkippedPromptEvents( taskRunId, session, @@ -2918,25 +2882,55 @@ export class SessionService { sessionStoreSetters.appendEvents( taskRunId, newEvents, - currentCount + update.newEntries.length, + nextProcessedCount, ); this.updatePromptStateFromEvents(taskRunId, newEvents); + + if (newEvents.some(isUserPromptEcho)) { + sessionStoreSetters.clearOptimisticItems(taskRunId); + } } } - // Flush queued messages when a cloud turn completes (detected via live log updates) + const isTerminalUpdate = + (update.kind === "status" || update.kind === "snapshot") && + isTerminalStatus(update.status); + + // Auto-flush queued cloud follow-ups once the agent's turn ends. The + // `user_message` command path turned out to be unreliable for this + // cloud — accepted during in_progress (preempting the prior turn), + // rejected or silently dropped after end_turn. resumeCloudRun creates + // a fresh task run that inherits the prior conversation state and + // processes the queued prompt as `pending_user_message` — the same + // path the initial run uses. Skip when this update brings the run + // terminal; the terminal-status block below handles that path. const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; if ( + !isTerminalUpdate && sessionAfterLogs && !sessionAfterLogs.isPromptPending && sessionAfterLogs.messageQueue.length > 0 ) { - this.sendQueuedCloudMessages(sessionAfterLogs.taskId).catch((err) => { - log.error("Failed to send queued cloud messages after turn complete", { - taskId: sessionAfterLogs.taskId, - error: err, + const dequeued = sessionStoreSetters.dequeueMessages( + sessionAfterLogs.taskId, + ); + const combinedPrompt = combineQueuedCloudPrompts(dequeued); + if (combinedPrompt) { + log.info( + "Auto-flushing queued cloud messages via resume after turn end", + { + taskId: sessionAfterLogs.taskId, + queuedCount: dequeued.length, + }, + ); + this.resumeCloudRun(sessionAfterLogs, combinedPrompt).catch((err) => { + log.error("Failed to resume with queued cloud messages", { + taskId: sessionAfterLogs.taskId, + error: err, + }); + toast.error("Failed to send follow-up message. Please try again."); }); - }); + } } // Update cloud status fields if present @@ -2949,32 +2943,64 @@ export class SessionService { branch: update.branch, }); - // Auto-send queued messages when a resumed run becomes active + // The local messageQueue only ever holds messages submitted before the + // sandbox was ready (cloudStatus !== "in_progress"). Once the run goes + // in_progress, drain them through the regular send path — the cloud + // accepts user_message commands at any time once the sandbox is up. if (update.status === "in_progress") { const session = sessionStoreSetters.getSessions()[taskRunId]; if (session && session.messageQueue.length > 0) { - // Clear the pending flag first — resumeCloudRun sets it as a guard - // while waiting for the run to start. Now that the run is active, - // sendCloudPrompt needs the flag clear to actually send. + const dequeued = sessionStoreSetters.dequeueMessages(session.taskId); + const combinedPrompt = combineQueuedCloudPrompts(dequeued); sessionStoreSetters.updateSession(taskRunId, { isPromptPending: false, }); - this.sendQueuedCloudMessages(session.taskId).catch(() => { - // Retries exhausted — message was re-enqueued by - // sendQueuedCloudMessages, future stream-based completion detection - // will keep trying - }); + if (combinedPrompt) { + this.sendCloudPrompt(session, combinedPrompt).catch((err) => { + log.error("Failed to flush sandbox-ready queue", { + taskId: session.taskId, + error: err, + }); + toast.error( + "Failed to send follow-up message. Please try again.", + ); + }); + } } } if (isTerminalStatus(update.status)) { - // Clean up any pending resume messages that couldn't be sent const session = sessionStoreSetters.getSessions()[taskRunId]; - if ( - session && - (session.messageQueue.length > 0 || session.isPromptPending) - ) { - sessionStoreSetters.clearMessageQueue(session.taskId); + // Anything still local (sandbox-not-ready queue or an optimistic + // bubble whose user_message command never echoed back) cannot be + // delivered to a finished run. Replay through resumeCloudRun, which + // spins up a fresh task run carrying the prompt as + // `pending_user_message`. + const queuedPrompt = + session && session.messageQueue.length > 0 + ? combineQueuedCloudPrompts( + sessionStoreSetters.dequeueMessages(session.taskId), + ) + : null; + const optimisticPrompt = + !queuedPrompt && session + ? extractOptimisticUserMessage(session) + : null; + const replayPrompt = queuedPrompt ?? optimisticPrompt; + if (session && replayPrompt) { + sessionStoreSetters.clearOptimisticItems(taskRunId); + sessionStoreSetters.updateSession(taskRunId, { + isPromptPending: false, + }); + this.resumeCloudRun(session, replayPrompt).catch((err) => { + log.error("Failed to resume cloud run with queued messages", { + taskId: session.taskId, + error: err, + }); + toast.error("Failed to send follow-up message. Please try again."); + }); + } else if (session?.isPromptPending) { + sessionStoreSetters.clearOptimisticItems(taskRunId); sessionStoreSetters.updateSession(taskRunId, { isPromptPending: false, }); diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 08b7c7f11..32c4b05ce 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -317,7 +317,7 @@ export const sessionStoreSetters = { taskId: string, content: string, rawPrompt?: string | ContentBlock[], - ) => { + ): string => { const id = `queue-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`; useSessionStore.setState((state) => { const taskRunId = state.taskIdIndex[taskId]; @@ -333,6 +333,7 @@ export const sessionStoreSetters = { }); } }); + return id; }, removeQueuedMessage: (taskId: string, messageId: string) => {