-
Notifications
You must be signed in to change notification settings - Fork 881
fix(graph): introduce Status.SKIPPED for cancel_node; graph continues downstream #2258
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
5eda187
759efed
788472b
27f957b
6e2f93c
94e9ea3
9198f8e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -43,8 +43,8 @@ | |
| from ..telemetry import get_tracer | ||
| from ..types._events import ( | ||
| MultiAgentHandoffEvent, | ||
| MultiAgentNodeCancelEvent, | ||
| MultiAgentNodeInterruptEvent, | ||
| MultiAgentNodeSkipEvent, | ||
| MultiAgentNodeStartEvent, | ||
| MultiAgentNodeStopEvent, | ||
| MultiAgentNodeStreamEvent, | ||
|
|
@@ -68,7 +68,8 @@ class GraphState: | |
|
|
||
| Attributes: | ||
| status: Current execution status of the graph. | ||
| completed_nodes: Set of nodes that have completed execution. | ||
| completed_nodes: Set of nodes whose execution is settled — either completed normally or skipped via skip_node. | ||
| Both statuses satisfy downstream readiness checks; inspect node.execution_status to distinguish them. | ||
| failed_nodes: Set of nodes that failed during execution. | ||
| interrupted_nodes: Set of nodes that user interrupted during execution. | ||
| execution_order: List of nodes in the order they were executed. | ||
|
|
@@ -134,6 +135,9 @@ class GraphResult(MultiAgentResult): | |
|
|
||
| total_nodes: int = 0 | ||
| completed_nodes: int = 0 | ||
| """Number of nodes that successfully ran to completion (excludes skipped nodes).""" | ||
| skipped_nodes: int = 0 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: The Suggestion: Document this breaking change prominently in the PR description. Users who relied on
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The direction is the opposite — skipped nodes are added to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correction to my previous reply: |
||
| """Number of nodes bypassed via skip_node or cancel_node; downstream nodes continued executing.""" | ||
| failed_nodes: int = 0 | ||
| interrupted_nodes: int = 0 | ||
| execution_order: list["GraphNode"] = field(default_factory=list) | ||
|
|
@@ -929,13 +933,29 @@ async def _execute_node(self, node: GraphNode, invocation_state: dict[str, Any]) | |
| yield self._activate_interrupt(node, interrupts, from_hook=True) | ||
| return | ||
|
|
||
| if before_event.cancel_node: | ||
| cancel_message = ( | ||
| before_event.cancel_node if isinstance(before_event.cancel_node, str) else "node cancelled by user" | ||
| skip_value = before_event.skip_node or before_event.cancel_node | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The variable is still named
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. +1 on this observation. The
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed —
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: The Suggestion: Add a brief note in the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. Added a note to the |
||
|
|
||
| if skip_value: | ||
| skip_message = skip_value if isinstance(skip_value, str) else "node skipped by user" | ||
| logger.debug("reason=<%s> | node skipped, graph continues", skip_message) | ||
| yield MultiAgentNodeSkipEvent(node.node_id, skip_message) | ||
| node_result = NodeResult( | ||
| result=None, | ||
| execution_time=0, | ||
| status=Status.SKIPPED, | ||
| accumulated_usage=Usage(inputTokens=0, outputTokens=0, totalTokens=0), | ||
| accumulated_metrics=Metrics(latencyMs=0), | ||
| execution_count=0, | ||
| ) | ||
| logger.debug("reason=<%s> | cancelling execution", cancel_message) | ||
| yield MultiAgentNodeCancelEvent(node.node_id, cancel_message) | ||
| raise RuntimeError(cancel_message) | ||
| node.result = node_result | ||
| node.execution_time = 0 | ||
| node.execution_status = Status.SKIPPED | ||
| self.state.completed_nodes.add(node) | ||
| self.state.results[node.node_id] = node_result | ||
| self.state.execution_order.append(node) | ||
| self._accumulate_metrics(node_result) | ||
| yield MultiAgentNodeStopEvent(node_id=node.node_id, node_result=node_result) | ||
| return | ||
|
|
||
| # Build node input from satisfied dependencies | ||
| node_input = self._build_node_input(node) | ||
|
|
@@ -1120,7 +1140,7 @@ def _build_node_input(self, node: GraphNode) -> list[ContentBlock]: | |
|
|
||
| return node_responses | ||
|
|
||
| # Get satisfied dependencies | ||
| # Get satisfied dependencies, excluding skipped nodes (they produced no output) | ||
| dependency_results = {} | ||
| for edge in self.edges: | ||
| if ( | ||
|
|
@@ -1129,7 +1149,9 @@ def _build_node_input(self, node: GraphNode) -> list[ContentBlock]: | |
| and edge.from_node.node_id in self.state.results | ||
| ): | ||
| if edge.should_traverse(self.state): | ||
| dependency_results[edge.from_node.node_id] = self.state.results[edge.from_node.node_id] | ||
| nr = self.state.results[edge.from_node.node_id] | ||
| if nr.status != Status.SKIPPED: | ||
| dependency_results[edge.from_node.node_id] = nr | ||
|
|
||
| if not dependency_results: | ||
| # No dependencies - return task as ContentBlocks | ||
|
|
@@ -1180,7 +1202,8 @@ def _build_result(self, interrupts: list[Interrupt]) -> GraphResult: | |
| execution_count=self.state.execution_count, | ||
| execution_time=self.state.execution_time, | ||
| total_nodes=self.state.total_nodes, | ||
| completed_nodes=len(self.state.completed_nodes), | ||
| completed_nodes=sum(1 for n in self.state.completed_nodes if n.execution_status == Status.COMPLETED), | ||
| skipped_nodes=sum(1 for n in self.state.completed_nodes if n.execution_status == Status.SKIPPED), | ||
| failed_nodes=len(self.state.failed_nodes), | ||
| interrupted_nodes=len(self.state.interrupted_nodes), | ||
| execution_order=self.state.execution_order, | ||
|
|
@@ -1285,7 +1308,8 @@ def _from_dict(self, payload: dict[str, Any]) -> None: | |
| self.nodes[node_id] for node_id in (payload.get("completed_nodes") or []) if node_id in self.nodes | ||
| ) | ||
| for node in self.state.completed_nodes: | ||
| node.execution_status = Status.COMPLETED | ||
| nr = results.get(node.node_id) | ||
| node.execution_status = Status.SKIPPED if (nr and nr.status == Status.SKIPPED) else Status.COMPLETED | ||
|
|
||
| # Execution order (only nodes that still exist) | ||
| order_node_ids = payload.get("execution_order") or [] | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,8 +44,8 @@ | |
| from ..tools.decorator import tool | ||
| from ..types._events import ( | ||
| MultiAgentHandoffEvent, | ||
| MultiAgentNodeCancelEvent, | ||
| MultiAgentNodeInterruptEvent, | ||
| MultiAgentNodeSkipEvent, | ||
| MultiAgentNodeStartEvent, | ||
| MultiAgentNodeStopEvent, | ||
| MultiAgentNodeStreamEvent, | ||
|
|
@@ -774,14 +774,13 @@ async def _execute_swarm(self, invocation_state: dict[str, Any]) -> AsyncIterato | |
| yield self._activate_interrupt(current_node, interrupts) | ||
| break | ||
|
|
||
| if before_event.cancel_node: | ||
| cancel_message = ( | ||
| before_event.cancel_node | ||
| if isinstance(before_event.cancel_node, str) | ||
| else "node cancelled by user" | ||
| ) | ||
| logger.debug("reason=<%s> | cancelling execution", cancel_message) | ||
| yield MultiAgentNodeCancelEvent(current_node.node_id, cancel_message) | ||
| skip_value = before_event.skip_node or before_event.cancel_node | ||
|
|
||
| if skip_value: | ||
| skip_message = skip_value if isinstance(skip_value, str) else "node skipped by user" | ||
| logger.debug("reason=<%s> | node skipped, stopping swarm sequence", skip_message) | ||
| yield MultiAgentNodeSkipEvent(current_node.node_id, skip_message) | ||
| # Linear swarm: nodes depend on prior output; skip sets FAILED (unlike graph, which continues). | ||
| self.state.completion_status = Status.FAILED | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: Setting Suggestion: Consider introducing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Intentional. In a linear swarm, each node's output feeds the next — skip one and downstream nodes never get their input, so the sequence can't complete. |
||
| break | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -564,7 +564,11 @@ def __init__(self, node_id: str, agent_event: dict[str, Any]) -> None: | |
|
|
||
|
|
||
| class MultiAgentNodeCancelEvent(TypedEvent): | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Issue: The Suggestion: Either add a runtime deprecation warning in
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No |
||
| """Event emitted when a user cancels node execution from their BeforeNodeCallEvent hook.""" | ||
| """Planned event for when a node stops graph execution entirely (see issue #2401). | ||
|
|
||
| Not currently emitted by the library. To handle bypassed nodes, subscribe to | ||
| :class:`MultiAgentNodeSkipEvent` (type ``multiagent_node_skip``) instead. | ||
| """ | ||
|
|
||
| def __init__(self, node_id: str, message: str) -> None: | ||
| """Initialize with cancel message. | ||
|
|
@@ -582,6 +586,30 @@ def __init__(self, node_id: str, message: str) -> None: | |
| ) | ||
|
|
||
|
|
||
| class MultiAgentNodeSkipEvent(TypedEvent): | ||
| """Event emitted when a node is bypassed via :attr:`BeforeNodeCallEvent.skip_node`. | ||
|
|
||
| Also triggered by the deprecated :attr:`BeforeNodeCallEvent.cancel_node` alias. The | ||
| orchestrator's behavior after skip depends on its type: a graph continues executing | ||
| downstream nodes, while a swarm stops the current run. | ||
| """ | ||
|
|
||
| def __init__(self, node_id: str, message: str) -> None: | ||
| """Initialize with skip message. | ||
|
|
||
| Args: | ||
| node_id: Unique identifier for the node. | ||
| message: The node skip message. | ||
| """ | ||
| super().__init__( | ||
| { | ||
| "type": "multiagent_node_skip", | ||
| "node_id": node_id, | ||
| "message": message, | ||
| } | ||
| ) | ||
|
|
||
|
|
||
| class MultiAgentNodeInterruptEvent(TypedEvent): | ||
| """Event emitted when a node is interrupted.""" | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.