Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/code/src/main/services/cloud-task/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export const sendCommandInput = z.object({
"close",
"permission_response",
"set_config_option",
"shell_execute",
]),
params: z.record(z.string(), z.unknown()).optional(),
});
Expand Down
148 changes: 148 additions & 0 deletions apps/code/src/main/services/cloud-task/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -571,4 +571,152 @@ describe("CloudTaskService", () => {
]);
expect(mockNetFetch).toHaveBeenCalledTimes(3);
});

it("routes shell_output and shell_exit notifications to typed cloud updates without log batching", async () => {
const updates: unknown[] = [];
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));

mockNetFetch
.mockResolvedValueOnce(
createJsonResponse({
id: "run-1",
status: "in_progress",
stage: null,
output: null,
error_message: null,
branch: "main",
updated_at: "2026-01-01T00:00:00Z",
}),
)
.mockResolvedValueOnce(
createJsonResponse([], 200, { "X-Has-More": "false" }),
);

const shellOutputFrame = {
type: "notification",
timestamp: "2026-01-01T00:00:01Z",
notification: {
jsonrpc: "2.0",
method: "_posthog/shell_output",
params: {
executionId: "exec-1",
stream: "stdout",
chunk: "vojta\n",
},
},
};
const shellExitFrame = {
type: "notification",
timestamp: "2026-01-01T00:00:02Z",
notification: {
jsonrpc: "2.0",
method: "_posthog/shell_exit",
params: { executionId: "exec-1", exitCode: 0, signal: null },
},
};

mockStreamFetch.mockResolvedValueOnce(
createOpenSseResponse(
`id: 1\ndata: ${JSON.stringify(shellOutputFrame)}\n\n` +
`id: 2\ndata: ${JSON.stringify(shellExitFrame)}\n\n`,
),
);

service.watch({
taskId: "task-1",
runId: "run-1",
apiHost: "https://app.example.com",
teamId: 2,
});

await waitFor(() =>
updates.some((u) => (u as { kind?: string }).kind === "shell_exit"),
);

const shellUpdates = updates.filter((u) => {
const kind = (u as { kind?: string }).kind;
return kind === "shell_output" || kind === "shell_exit";
});

expect(shellUpdates).toEqual([
{
taskId: "task-1",
runId: "run-1",
kind: "shell_output",
executionId: "exec-1",
stream: "stdout",
chunk: "vojta\n",
},
{
taskId: "task-1",
runId: "run-1",
kind: "shell_exit",
executionId: "exec-1",
exitCode: 0,
signal: null,
},
]);

// Shell notifications must not also be batched as log entries, otherwise
// the renderer re-processes the same chunks through the log conversion
// pipeline and the transcript gets noisy duplicates.
const logUpdates = updates.filter(
(u) => (u as { kind?: string }).kind === "logs",
);
expect(logUpdates).toEqual([]);
});

it("ignores shell_output with invalid params", async () => {
const updates: unknown[] = [];
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));

mockNetFetch
.mockResolvedValueOnce(
createJsonResponse({
id: "run-1",
status: "in_progress",
stage: null,
output: null,
error_message: null,
branch: "main",
updated_at: "2026-01-01T00:00:00Z",
}),
)
.mockResolvedValueOnce(
createJsonResponse([], 200, { "X-Has-More": "false" }),
);

// Missing executionId — extractor should reject and the event falls
// through to normal log batching.
const malformedFrame = {
type: "notification",
timestamp: "2026-01-01T00:00:01Z",
notification: {
jsonrpc: "2.0",
method: "_posthog/shell_output",
params: { stream: "stdout", chunk: "orphan\n" },
},
};

mockStreamFetch.mockResolvedValueOnce(
createOpenSseResponse(
`id: 1\ndata: ${JSON.stringify(malformedFrame)}\n\n`,
),
);

service.watch({
taskId: "task-1",
runId: "run-1",
apiHost: "https://app.example.com",
teamId: 2,
});

await waitFor(() =>
updates.some((u) => (u as { kind?: string }).kind === "logs"),
);

expect(
updates.some((u) => (u as { kind?: string }).kind === "shell_output"),
).toBe(false);
});
});
101 changes: 101 additions & 0 deletions apps/code/src/main/services/cloud-task/service.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { POSTHOG_NOTIFICATIONS } from "@posthog/agent";
import type { CloudTaskPermissionRequestUpdate } from "@shared/types";
import type { StoredLogEntry } from "@shared/types/session-events";
import { net } from "electron";
Expand Down Expand Up @@ -141,6 +142,78 @@ function isPermissionRequestEvent(
);
}

interface ShellOutputNotification {
executionId: string;
stream: "stdout" | "stderr";
chunk: string;
}

interface ShellExitNotification {
executionId: string;
exitCode: number | null;
signal: string | null;
}

function extractShellNotification(
data: unknown,
):
| { kind: "output"; params: ShellOutputNotification }
| { kind: "exit"; params: ShellExitNotification }
| null {
if (typeof data !== "object" || data === null) return null;
const d = data as Record<string, unknown>;
if (d.type !== "notification") return null;
const notification = d.notification as
| { method?: unknown; params?: unknown }
| undefined;
const method = notification?.method;
if (
method !== POSTHOG_NOTIFICATIONS.SHELL_OUTPUT &&
method !== POSTHOG_NOTIFICATIONS.SHELL_EXIT
) {
return null;
}
const params = notification?.params as Record<string, unknown> | undefined;
if (!params || typeof params !== "object") return null;

if (method === POSTHOG_NOTIFICATIONS.SHELL_OUTPUT) {
const executionId = params.executionId;
const stream = params.stream;
const chunk = params.chunk;
if (
typeof executionId === "string" &&
(stream === "stdout" || stream === "stderr") &&
typeof chunk === "string"
) {
return { kind: "output", params: { executionId, stream, chunk } };
}
return null;
}

if (method === POSTHOG_NOTIFICATIONS.SHELL_EXIT) {
const executionId = params.executionId;
const exitCode = params.exitCode;
const signal = params.signal;
if (
typeof executionId === "string" &&
(exitCode === null || typeof exitCode === "number") &&
(signal === null || typeof signal === "string")
) {
return {
kind: "exit",
params: {
executionId,
exitCode: exitCode as number | null,
signal: signal as string | null,
},
};
}
return null;
}

return null;
}

function createStreamStatusError(status: number): CloudTaskStreamError {
switch (status) {
case 401:
Expand Down Expand Up @@ -713,6 +786,34 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
return;
}

const shellNotification = extractShellNotification(event.data);
if (shellNotification) {
log.info("Shell notification received", {
kind: shellNotification.kind,
executionId: shellNotification.params.executionId,
});
if (shellNotification.kind === "output") {
this.emit(CloudTaskEvent.Update, {
taskId: watcher.taskId,
runId: watcher.runId,
kind: "shell_output" as const,
executionId: shellNotification.params.executionId,
stream: shellNotification.params.stream,
chunk: shellNotification.params.chunk,
});
} else {
this.emit(CloudTaskEvent.Update, {
taskId: watcher.taskId,
runId: watcher.runId,
kind: "shell_exit" as const,
executionId: shellNotification.params.executionId,
exitCode: shellNotification.params.exitCode,
signal: shellNotification.params.signal,
});
}
return;
}

watcher.pendingLogEntries.push(event.data as StoredLogEntry);
if (watcher.pendingLogEntries.length >= EVENT_BATCH_MAX_SIZE) {
this.flushLogBatch(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export function CommandCenterSessionView({
isPromptPending={isPromptPending}
promptStartedAt={promptStartedAt}
onSendPrompt={handleSendPrompt}
onBashCommand={isCloud ? undefined : handleBashCommand}
onBashCommand={handleBashCommand}
onCancelPrompt={handleCancelPrompt}
repoPath={repoPath}
cloudBranch={cloudBranch}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ export const MessageEditor = forwardRef<EditorHandle, MessageEditorProps>(
autoFocus,
context: { taskId, repoPath },
getPromptHistory,
capabilities: { bashMode: !isCloud },
capabilities: { bashMode: true },
onSubmit,
onBashCommand,
onBashModeChange,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,26 +112,33 @@ export function useSessionCallbacks({

const handleBashCommand = useCallback(
async (command: string) => {
if (!repoPath) return;
const currentSession = sessionRef.current;
const isCloud = currentSession?.isCloud === true;

if (!isCloud && !repoPath) return;

const execId = `user-shell-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`;
const displayCwd = repoPath ?? "cloud-sandbox";

await getSessionService().startUserShellExecute(
taskId,
execId,
command,
repoPath,
displayCwd,
);

try {
const result = await trpcClient.shell.execute.mutate({
cwd: repoPath,
command,
});
const result = isCloud
? await getSessionService().executeCloudShellCommand(taskId, command)
: await trpcClient.shell.execute.mutate({
cwd: repoPath as string,
command,
});
await getSessionService().completeUserShellExecute(
taskId,
execId,
command,
repoPath,
displayCwd,
result,
);
} catch (error) {
Expand All @@ -140,7 +147,7 @@ export function useSessionCallbacks({
taskId,
execId,
command,
repoPath,
displayCwd,
{
stdout: "",
stderr: error instanceof Error ? error.message : "Command failed",
Expand Down
Loading
Loading