diff --git a/docs/decisions/0026-message-merger-invariants.md b/docs/decisions/0026-message-merger-invariants.md new file mode 100644 index 0000000000..58b32c3e0a --- /dev/null +++ b/docs/decisions/0026-message-merger-invariants.md @@ -0,0 +1,191 @@ +--- +status: accepted +contact: lokitoth +date: 2026-05-13 +deciders: lokitoth +consulted: +informed: +--- + +# MessageMerger Streaming Merge Invariants + +## Context and Problem Statement + +`Microsoft.Agents.AI.Workflows.MessageMerger` is the internal component that +folds a stream of `AgentResponseUpdate` items emitted by an agent (or by a +hosting executor wrapping an agent) into a single `AgentResponse` for a turn. +Multi-agent workflows (handoff, group chat, orchestration) rely on this +merger to produce a coherent transcript even when updates arrive interleaved +across responses, messages, and timestamps. + +Prior to this change the contract that hosting executors and the merger +should jointly enforce was implicit. The implementation also carried a small +amount of dead state (`createdTimes`) that was collected but never consumed, +suggesting that an earlier, timestamp-driven ordering scheme had been +abandoned without documentation, and the live code still ran an unreliable +`CreatedAt`-based sort that could reorder messages across concurrent agents +inside a single workflow super-step. There were no tests pinning down the +ordering or grouping behavior, so any future refactor risked silently +regressing it. + +The problem this ADR addresses is therefore: **what merge guarantees does +`MessageMerger` make to its callers, and how do we lock those guarantees in +without changing observable behavior in non-pathological cases?** + +## Decision Drivers + +- **A. Predictable ordering.** Developers consuming a merged `AgentResponse` + must be able to reason about the order in which messages appear without + having to know whether updates carried `CreatedAt`. +- **B. Coherent multi-agent transcripts.** When several agents stream into + one merger within a single workflow super-step, each agent's contribution + must read as a contiguous block; and a step's updates must precede the + next step's updates. +- **C. Stable hosting-executor contract.** A turn must be addressable by a + single `ResponseId`; updates without one are an exceptional, "dangling" + case rather than the norm. +- **D. Minimal behavioral change for non-pathological inputs.** This work + is intended to document and test current behavior, not to alter what + users see today in well-formed agent streams. +- **E. Discoverability for future contributors.** Known sharp edges (e.g. + cross-`ResponseId` ordering, dangling-update placement) should be written + down so they are not rediscovered as bugs. + +## Considered Options + +1. **Option 1 — Document invariants, pin them with tests, and use pure + emission/insertion order for both responses and the messages inside + each response.** Removes the unreliable `CreatedAt`-based sort and the + dead `createdTimes` collection. Behavioral change only for inputs that + relied on `CreatedAt` to re-order updates after the fact — which the + prior comparer could not do correctly anyway (non-transitive). +2. **Option 2 — Rewrite `MessageMerger` to group strictly by `AgentId` + (rather than `ResponseId`), and to use a stable, transitive comparer + that mixes `CreatedAt` with insertion index.** Behavioral change; would + also require updating hosting executors that currently assume + `ResponseId`-based grouping. +3. **Option 3 — Leave the code and tests as-is; capture edge cases only + in a working note.** No code or test change. + +## Decision Outcome + +Chosen option: **Option 1 — Document invariants, pin them with tests, +and use pure emission/insertion order; remove the dead `createdTimes` +collection.** + +This option satisfies driver A (predictable ordering: emission order is +the simplest reasoning model), driver B (per-`ResponseId` grouping plus +first-seen ordering across responses gives both per-agent blocks within a +step and step-ordering across steps), driver C (single ResponseId per +turn is unchanged), and driver E (invariants and edge cases are now +written down and covered by tests). + +Driver D (minimal behavioral change) is satisfied because well-formed +agent streams already emit updates in the order they want to appear; the +prior `CreatedAt`-based sort only mattered for pathological inputs (mixed +or out-of-order timestamps from concurrent agents), and on those inputs +the old comparer was non-transitive and therefore unreliable. Removing +the sort makes those inputs deterministic — they now follow emission +order — without disturbing the well-formed case. + +Option 2 was rejected for this iteration because it changes what callers +observe in well-formed flows and would require a coordinated change +across hosting executors. It is a candidate for a follow-up ADR if the +known edge cases below are reported as bugs in practice. + +Option 3 was rejected because it leaves the invariants un-tested and the +dead code in place, so the next refactor can break the contract without +any signal. + +### Invariants + +The following invariants are now part of the contract of +`MessageMerger`/hosting executors and are covered by tests in +`MessageMergerTests`: + +1. **Single `ResponseId` per turn.** Every `AgentResponseUpdate` produced + by a hosting executor in a single agent turn shares one `ResponseId`. + If the underlying agent does not supply one, the executor assigns it. + Updates with `ResponseId == null` are treated as "dangling" and + flattened into loose messages at the end of the merged response. +2. **Pure emission-order preservation.** Within a `ResponseId` block, + messages appear in the order their updates first arrived at the + merger. Across `ResponseId` blocks, blocks appear in first-seen order. + `CreatedAt` is **not** consulted when ordering messages or blocks — + only when stamping the merged response and its child messages. +3. **Per-`ResponseId` grouping (no interleaving).** Messages produced + under one `ResponseId` are emitted as a contiguous block in the merged + `AgentResponse`. Combined with Invariant 1, this yields: + - **Within a workflow super-step with one agent**: all messages + appear together, in emission order. + - **Within a super-step with multiple agents**: each agent's messages + are a contiguous block, ordered by which agent emitted first. + - **Across super-steps**: a step's blocks all precede the next step's + blocks, because the next step cannot start emitting until the + current step's emissions have arrived at the merger. + +### Consequences + +- Good, because the merge contract is now explicit, regression-tested, + and trivially reasoned about (it is just emission order with + per-`ResponseId` grouping). +- Good, because removing the unreliable `CreatedAt`-based sort eliminates + a latent bug — the prior comparer was non-transitive on mixed-timestamp + inputs, so `List.Sort` could in principle return any of several + orderings or throw on some runtimes. +- Good, because removing the unused `createdTimes` collection eliminates + a misleading code smell. +- Good, because hosting-executor authors have a written checklist of + invariants to satisfy. +- Neutral, because well-formed agent streams (those that emit updates in + the order they want to appear) see no change in output. +- Bad, because callers who relied on a server-supplied `CreatedAt` to + retro-correct out-of-order emissions will no longer see that + correction — they must ensure emission order matches desired output + order, or attach to `RawRepresentation` for original timestamps. + +## Validation + +- Unit tests in + `dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MessageMergerTests.cs` + cover each invariant: + - Insertion-order preservation with no timestamps. + - Insertion-order preservation with mixed timestamps (emission order + wins over `CreatedAt`). + - Determinism across repeated runs with mixed timestamps. + - Per-`ResponseId` grouping for interleaved multi-agent streams within + a single step. + - Per-`ResponseId` grouping with distinct response ids. +- Existing tests for assembly, function-call/result ordering, and + `FinishReason` propagation continue to pass. + +## More Information + +### Known edge cases (intentionally not fixed in this ADR) + +These are properties of the *current* `MessageMerger` that callers should +be aware of. They are not invariants — they may change in a future ADR — +but they are present in the shipped behavior covered by tests above. + +| # | Edge case | Risk | Notes | +|---|-----------|------|-------| +| 1 | Cross-`ResponseId` ordering follows first-seen-`ResponseId` order, not chronological order across responses. | Medium | Acceptable today because each turn has a single `ResponseId` (Invariant 1); only matters if a caller deliberately interleaves multiple response ids inside one step. | +| 2 | Updates with `ResponseId == null` are always emitted **after** all keyed responses, regardless of arrival time. | Medium | Documented as the "dangling" path; agents should always emit a `ResponseId`. | +| 3 | Within a response, updates with `MessageId == null` are always emitted **after** keyed messages. | Low | Same rationale as #2, scoped to messages within a response. | +| 4 | The merged response's `CreatedAt` is set to `DateTimeOffset.UtcNow`; per-response `CreatedAt` is propagated onto each contained `ChatMessage` instead of being preserved at the response level. | Low | Callers who need original per-update timestamps should read them from `RawRepresentation` or capture them before merging. | +| 5 | Metadata on dangling (`ResponseId == null`) updates — `FinishReason`, `Usage`, `AgentId`, `AdditionalProperties` — is **not** merged into the final `AgentResponse`; only their `Messages` are surfaced. | Medium | Hosting executors must attach metadata to a keyed update if they want it reflected in the merged response. | +| 6 | Emission order is the **only** ordering signal — `CreatedAt` differences between updates are ignored when ordering. | Low | This is the intended behavior under Invariant 2; producers must emit in the desired output order. | + +If any of these become observable problems in production, the appropriate +follow-up is a new ADR that supersedes this one (likely realizing +"Option 2") rather than a silent fix. + +### Code references + +- `dotnet/src/Microsoft.Agents.AI.Workflows/MessageMerger.cs` — merger + implementation. The previously-unused `createdTimes` collection and + the `CreatedAt`-based sort have both been removed in this change. +- `dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MessageMergerTests.cs` — + invariant tests added in this change. +- `AgentInvocationContext.ResponseId` and `ToStreamingResponseAsync` — + the hosting-executor side of Invariant 1. diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/MessageMerger.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/MessageMerger.cs index 4b702034ce..32919d1c3b 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/MessageMerger.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/MessageMerger.cs @@ -97,28 +97,6 @@ public void AddUpdate(AgentResponseUpdate update) } } - private int CompareByDateTimeOffset(AgentResponse left, AgentResponse right) - { - const int LESS = -1, EQ = 0, GREATER = 1; - - if (left.CreatedAt == right.CreatedAt) - { - return EQ; - } - - if (!left.CreatedAt.HasValue) - { - return GREATER; - } - - if (!right.CreatedAt.HasValue) - { - return LESS; - } - - return left.CreatedAt.Value.CompareTo(right.CreatedAt.Value); - } - public AgentResponse ComputeMerged(string primaryResponseId, string? primaryAgentId = null, string? primaryAgentName = null) { List messages = []; @@ -126,6 +104,15 @@ public AgentResponse ComputeMerged(string primaryResponseId, string? primaryAgen HashSet agentIds = []; HashSet finishReasons = []; + // Ordering contract (see docs/decisions/0026-message-merger-invariants.md): + // - Outer loop iterates ResponseIds in first-seen order, which preserves step + // ordering: each agent invocation owns its own ResponseId, and successive + // super-steps emit their first update only after the prior step's updates + // have all arrived. Iterating Dictionary<,>.Keys preserves insertion order. + // - Inner loop iterates MessageIds in first-seen order, then appends dangling + // updates last. This preserves emission order within an agent's block. + // We deliberately do NOT sort by CreatedAt: timestamps from concurrent agents + // or different clocks would interleave per-agent blocks and break Goal 1. foreach (string responseId in this._mergeStates.Keys) { ResponseMergeState mergeState = this._mergeStates[responseId]; @@ -136,14 +123,12 @@ public AgentResponse ComputeMerged(string primaryResponseId, string? primaryAgen responseList.Add(mergeState.ComputeDangling()); } - responseList.Sort(this.CompareByDateTimeOffset); responses[responseId] = responseList.Aggregate(MergeResponses); messages.AddRange(GetMessagesWithCreatedAt(responses[responseId])); } UsageDetails? usage = null; AdditionalPropertiesDictionary? additionalProperties = null; - HashSet createdTimes = []; foreach (AgentResponse response in responses.Values) { @@ -152,11 +137,6 @@ public AgentResponse ComputeMerged(string primaryResponseId, string? primaryAgen agentIds.Add(response.AgentId); } - if (response.CreatedAt.HasValue) - { - createdTimes.Add(response.CreatedAt.Value); - } - if (response.FinishReason.HasValue) { finishReasons.Add(response.FinishReason.Value); diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MessageMergerTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MessageMergerTests.cs index 09d83966aa..a0e2124cfc 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MessageMergerTests.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/MessageMergerTests.cs @@ -1,6 +1,7 @@ // Copyright (c) Microsoft. All rights reserved. using System; +using System.Linq; using FluentAssertions; using Microsoft.Extensions.AI; @@ -42,6 +43,47 @@ public void Test_MessageMerger_AssemblesMessage() response.FinishReason.Should().BeNull(); } + [Fact] + public void Test_MessageMerger_PreservesFunctionCallOrderingWhenToolResultHasCreatedAt() + { + // Arrange + string responseId = Guid.NewGuid().ToString("N"); + string functionCallMessageId = Guid.NewGuid().ToString("N"); + string functionResultMessageId = Guid.NewGuid().ToString("N"); + string callId = Guid.NewGuid().ToString("N"); + DateTimeOffset toolResultCreatedAt = DateTimeOffset.UtcNow; + + MessageMerger merger = new(); + + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = functionCallMessageId, + AgentId = TestAgentId1, + Role = ChatRole.Assistant, + Contents = [new FunctionCallContent(callId, "handoff_to_TestAgent2")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = functionResultMessageId, + AgentId = TestAgentId1, + CreatedAt = toolResultCreatedAt, + Role = ChatRole.Tool, + Contents = [new FunctionResultContent(callId, "Transferred.")], + }); + + // Act + AgentResponse response = merger.ComputeMerged(responseId); + + // Assert + response.Messages.Should().HaveCount(2); + response.Messages[0].Role.Should().Be(ChatRole.Assistant); + response.Messages[0].Contents.Should().ContainSingle().Which.Should().BeOfType(); + response.Messages[1].Role.Should().Be(ChatRole.Tool); + response.Messages[1].Contents.Should().ContainSingle().Which.Should().BeOfType(); + } + [Fact] public void Test_MessageMerger_PropagatesFinishReasonFromUpdates() { @@ -71,4 +113,494 @@ public void Test_MessageMerger_PropagatesFinishReasonFromUpdates() // Assert - FinishReason from the update should propagate through response.FinishReason.Should().Be(ChatFinishReason.ContentFilter); } + + #region Invariant 2: Output Order Preservation Tests + + [Fact] + public void Test_MessageMerger_PreservesInsertionOrder_WhenNoTimestamps() + { + // Arrange: Multiple updates without CreatedAt, in specific order A, B, C + string responseId = Guid.NewGuid().ToString("N"); + string messageIdA = Guid.NewGuid().ToString("N"); + string messageIdB = Guid.NewGuid().ToString("N"); + string messageIdC = Guid.NewGuid().ToString("N"); + + MessageMerger merger = new(); + + // Add updates without CreatedAt in order A, B, C + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdA, + Role = ChatRole.Assistant, + Contents = [new TextContent("Message A")], + // No CreatedAt + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdB, + Role = ChatRole.Assistant, + Contents = [new TextContent("Message B")], + // No CreatedAt + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdC, + Role = ChatRole.Assistant, + Contents = [new TextContent("Message C")], + // No CreatedAt + }); + + // Act + AgentResponse response = merger.ComputeMerged(responseId); + + // Assert: Output order should be A, B, C (insertion order) + response.Messages.Should().HaveCount(3); + response.Messages[0].Text.Should().Be("Message A"); + response.Messages[1].Text.Should().Be("Message B"); + response.Messages[2].Text.Should().Be("Message C"); + } + + [Fact] + public void Test_MessageMerger_PreservesInsertionOrder_WhenMixedTimestamps() + { + // Arrange: Updates with OUT-OF-ORDER timestamps relative to emission order. + // Emission order: A, B, C. + // Timestamp order: C (oldest), A, B (newest) - reverse of emission. + // Per Invariant 2, emission order wins; timestamps are ignored for ordering. + string responseId = Guid.NewGuid().ToString("N"); + string messageIdA = Guid.NewGuid().ToString("N"); + string messageIdB = Guid.NewGuid().ToString("N"); + string messageIdC = Guid.NewGuid().ToString("N"); + + DateTimeOffset timeOldest = DateTimeOffset.UtcNow.AddMinutes(-10); + DateTimeOffset timeMiddle = DateTimeOffset.UtcNow.AddMinutes(-5); + DateTimeOffset timeNewest = DateTimeOffset.UtcNow; + + MessageMerger merger = new(); + + // Emit A first but stamp it with timeMiddle. + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdA, + Role = ChatRole.Assistant, + CreatedAt = timeMiddle, + Contents = [new TextContent("Message A")], + }); + // Emit B second, without a timestamp. + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdB, + Role = ChatRole.Assistant, + // No CreatedAt + Contents = [new TextContent("Message B")], + }); + // Emit C third but stamp it with timeOldest - if we sorted by timestamp, + // C would come first; the new merger MUST keep emission order (A, B, C). + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdC, + Role = ChatRole.Assistant, + CreatedAt = timeOldest, + Contents = [new TextContent("Message C")], + }); + // Stamp a fourth message with the newest timestamp - it should still come last + // because it was emitted last, not because of its timestamp. + string messageIdD = Guid.NewGuid().ToString("N"); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdD, + Role = ChatRole.Assistant, + CreatedAt = timeNewest, + Contents = [new TextContent("Message D")], + }); + + // Act + AgentResponse response = merger.ComputeMerged(responseId); + + // Assert: Emission order wins; CreatedAt is ignored for ordering. + response.Messages.Should().HaveCount(4); + response.Messages[0].Text.Should().Be("Message A"); + response.Messages[1].Text.Should().Be("Message B"); + response.Messages[2].Text.Should().Be("Message C"); + response.Messages[3].Text.Should().Be("Message D"); + } + + [Fact] + public void Test_MessageMerger_ReproducibleOrdering_WithMixedTimestamps() + { + // Arrange: 3+ messages with mixed null/non-null CreatedAt values + // This tests that the same input sequence produces the same output + // (run-to-run reproducibility for a fixed input) + string responseId = Guid.NewGuid().ToString("N"); + string messageIdA = Guid.NewGuid().ToString("N"); + string messageIdB = Guid.NewGuid().ToString("N"); + string messageIdC = Guid.NewGuid().ToString("N"); + + DateTimeOffset time10 = DateTimeOffset.UtcNow.AddSeconds(10); + DateTimeOffset time5 = DateTimeOffset.UtcNow.AddSeconds(5); + + MessageMerger merger = new(); + + // A: CreatedAt = time10, idx=0 + // B: CreatedAt = null, idx=1 + // C: CreatedAt = time5, idx=2 + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdA, + Role = ChatRole.Assistant, + CreatedAt = time10, + Contents = [new TextContent("Message A (T=10)")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdB, + Role = ChatRole.Assistant, + // No CreatedAt + Contents = [new TextContent("Message B (no timestamp)")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdC, + Role = ChatRole.Assistant, + CreatedAt = time5, + Contents = [new TextContent("Message C (T=5)")], + }); + + // Act - Run multiple times to verify reproducibility + AgentResponse response1 = merger.ComputeMerged(responseId); + + // Create a fresh merger with same data to verify reproducibility + MessageMerger merger2 = new(); + merger2.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdA, + Role = ChatRole.Assistant, + CreatedAt = time10, + Contents = [new TextContent("Message A (T=10)")], + }); + merger2.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdB, + Role = ChatRole.Assistant, + Contents = [new TextContent("Message B (no timestamp)")], + }); + merger2.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = messageIdC, + Role = ChatRole.Assistant, + CreatedAt = time5, + Contents = [new TextContent("Message C (T=5)")], + }); + AgentResponse response2 = merger2.ComputeMerged(responseId); + + // Assert: Result is reproducible and consistent across runs with same input order. + // Per Invariant 2, both runs must produce emission order (A, B, C) regardless + // of the messages' CreatedAt values. + response1.Messages.Should().HaveCount(3); + response2.Messages.Should().HaveCount(3); + + response1.Messages.Select(m => m.Text).Should().ContainInOrder( + "Message A (T=10)", "Message B (no timestamp)", "Message C (T=5)"); + + // Both runs should produce identical ordering + for (int i = 0; i < 3; i++) + { + response1.Messages[i].Text.Should().Be(response2.Messages[i].Text); + } + } + + #endregion + + #region Invariant 3: Agent Message Grouping Tests + + [Fact] + public void Test_MessageMerger_GroupsMessagesByResponseId_InMultiAgentScenario() + { + // Arrange: Interleaved updates from Agent1 (R1) and Agent2 (R2) + string responseIdR1 = Guid.NewGuid().ToString("N"); + string responseIdR2 = Guid.NewGuid().ToString("N"); + string messageIdA1M1 = Guid.NewGuid().ToString("N"); + string messageIdA1M2 = Guid.NewGuid().ToString("N"); + string messageIdA2M1 = Guid.NewGuid().ToString("N"); + string messageIdA2M2 = Guid.NewGuid().ToString("N"); + + MessageMerger merger = new(); + + // Interleaved arrival: A1-msg1, A2-msg1, A1-msg2, A2-msg2 + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR1, + MessageId = messageIdA1M1, + AgentId = TestAgentId1, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent1 Message 1")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR2, + MessageId = messageIdA2M1, + AgentId = TestAgentId2, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent2 Message 1")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR1, + MessageId = messageIdA1M2, + AgentId = TestAgentId1, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent1 Message 2")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR2, + MessageId = messageIdA2M2, + AgentId = TestAgentId2, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent2 Message 2")], + }); + + // Act + AgentResponse response = merger.ComputeMerged(responseIdR1); + + // Assert: Messages should be grouped by ResponseId (which groups by agent) + // Output should be either [A1-msg1, A1-msg2, A2-msg1, A2-msg2] or [A2-msg1, A2-msg2, A1-msg1, A1-msg2] + // The key invariant: Agent1's messages are contiguous, Agent2's messages are contiguous + response.Messages.Should().HaveCount(4); + + // Verify grouping - collect message texts and verify they're grouped by agent + var messageTexts = response.Messages.Select(m => m.Text).ToList(); + + // Find first Agent1 message index and first Agent2 message index + int firstA1Index = messageTexts.FindIndex(t => t.StartsWith("Agent1", StringComparison.Ordinal)); + int firstA2Index = messageTexts.FindIndex(t => t.StartsWith("Agent2", StringComparison.Ordinal)); + + // Assert both indices are valid (messages were found) + firstA1Index.Should().BeGreaterThanOrEqualTo(0, "Agent1 messages should be present in response"); + firstA2Index.Should().BeGreaterThanOrEqualTo(0, "Agent2 messages should be present in response"); + + // All Agent1 messages should be contiguous (either at start or after all Agent2 messages) + var a1Messages = messageTexts.Where(t => t.StartsWith("Agent1", StringComparison.Ordinal)).ToList(); + var a2Messages = messageTexts.Where(t => t.StartsWith("Agent2", StringComparison.Ordinal)).ToList(); + + a1Messages.Should().HaveCount(2); + a2Messages.Should().HaveCount(2); + + // Verify no interleaving: if A1 comes first, A2 should come after all A1 messages + if (firstA1Index < firstA2Index) + { + // A1 messages at indices 0, 1 and A2 messages at indices 2, 3 + messageTexts[0].Should().StartWith("Agent1"); + messageTexts[1].Should().StartWith("Agent1"); + messageTexts[2].Should().StartWith("Agent2"); + messageTexts[3].Should().StartWith("Agent2"); + } + else + { + // A2 messages at indices 0, 1 and A1 messages at indices 2, 3 + messageTexts[0].Should().StartWith("Agent2"); + messageTexts[1].Should().StartWith("Agent2"); + messageTexts[2].Should().StartWith("Agent1"); + messageTexts[3].Should().StartWith("Agent1"); + } + } + + [Fact] + public void Test_MessageMerger_MaintainsAgentGrouping_WithDifferentResponseIds() + { + // Arrange: Agent1 uses ResponseId=R1, Agent2 uses ResponseId=R2 + // Multiple messages per ResponseId to properly test contiguity + string responseIdR1 = Guid.NewGuid().ToString("N"); + string responseIdR2 = Guid.NewGuid().ToString("N"); + string messageIdA1M1 = Guid.NewGuid().ToString("N"); + string messageIdA1M2 = Guid.NewGuid().ToString("N"); + string messageIdA1M3 = Guid.NewGuid().ToString("N"); + string messageIdA2M1 = Guid.NewGuid().ToString("N"); + string messageIdA2M2 = Guid.NewGuid().ToString("N"); + string messageIdA2M3 = Guid.NewGuid().ToString("N"); + + MessageMerger merger = new(); + + // Interleaved arrival: A1-1, A2-1, A1-2, A2-2, A1-3, A2-3 + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR1, + MessageId = messageIdA1M1, + AgentId = TestAgentId1, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent1 Response 1")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR2, + MessageId = messageIdA2M1, + AgentId = TestAgentId2, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent2 Response 1")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR1, + MessageId = messageIdA1M2, + AgentId = TestAgentId1, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent1 Response 2")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR2, + MessageId = messageIdA2M2, + AgentId = TestAgentId2, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent2 Response 2")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR1, + MessageId = messageIdA1M3, + AgentId = TestAgentId1, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent1 Response 3")], + }); + merger.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseIdR2, + MessageId = messageIdA2M3, + AgentId = TestAgentId2, + Role = ChatRole.Assistant, + Contents = [new TextContent("Agent2 Response 3")], + }); + + // Act + AgentResponse response = merger.ComputeMerged(responseIdR1); + + // Assert: Messages from each agent are contiguous (not interleaved) + response.Messages.Should().HaveCount(6); + + var messageTexts = response.Messages.Select(m => m.Text).ToList(); + + // Verify all messages are present + messageTexts.Should().Contain("Agent1 Response 1"); + messageTexts.Should().Contain("Agent1 Response 2"); + messageTexts.Should().Contain("Agent1 Response 3"); + messageTexts.Should().Contain("Agent2 Response 1"); + messageTexts.Should().Contain("Agent2 Response 2"); + messageTexts.Should().Contain("Agent2 Response 3"); + + // Find indices to verify contiguity + int firstA1Index = messageTexts.FindIndex(t => t.StartsWith("Agent1", StringComparison.Ordinal)); + int lastA1Index = messageTexts.FindLastIndex(t => t.StartsWith("Agent1", StringComparison.Ordinal)); + int firstA2Index = messageTexts.FindIndex(t => t.StartsWith("Agent2", StringComparison.Ordinal)); + int lastA2Index = messageTexts.FindLastIndex(t => t.StartsWith("Agent2", StringComparison.Ordinal)); + + // Assert indices are valid + firstA1Index.Should().BeGreaterThanOrEqualTo(0, "Agent1 messages should be present"); + firstA2Index.Should().BeGreaterThanOrEqualTo(0, "Agent2 messages should be present"); + + // Verify contiguity: all Agent1 messages should span exactly 3 consecutive indices + (lastA1Index - firstA1Index).Should().Be(2, "Agent1 messages should be contiguous (3 messages spanning 2 index gaps)"); + (lastA2Index - firstA2Index).Should().Be(2, "Agent2 messages should be contiguous (3 messages spanning 2 index gaps)"); + + // Verify no interleaving: ranges should not overlap + bool a1BeforeA2 = lastA1Index < firstA2Index; + bool a2BeforeA1 = lastA2Index < firstA1Index; + (a1BeforeA2 || a2BeforeA1).Should().BeTrue("Agent message blocks should not interleave"); + } + + #endregion + + #region Step Ordering Tests (workflow goals) + + [Fact] + public void Test_MessageMerger_OrdersStepsBeforeNextStep_AndGroupsAgentsWithinStep() + { + // Scenario mirroring the workflow ordering goals: + // Step 1: Agent1 alone emits 2 updates. + // Step 2: Agent1 and Agent2 emit updates interleaved (concurrent in the step). + // Step 3: Agent2 alone emits 2 updates. + // Each agent invocation has its own ResponseId, so per-ResponseId grouping + // yields per-agent grouping. Steps are naturally serialized in emission + // order (the next step cannot emit until the prior step's updates arrive), + // so the first-seen-ResponseId ordering preserves step boundaries. + const string step1Agent1 = "step1-agent1"; + const string step2Agent1 = "step2-agent1"; + const string step2Agent2 = "step2-agent2"; + const string step3Agent2 = "step3-agent2"; + + MessageMerger merger = new(); + + // Step 1: Agent1 emits 2 messages. + AddSimpleUpdate(merger, step1Agent1, TestAgentId1, "S1-A1-m1"); + AddSimpleUpdate(merger, step1Agent1, TestAgentId1, "S1-A1-m2"); + + // Step 2: Agent1 and Agent2 emit interleaved (A1, A2, A1, A2). + AddSimpleUpdate(merger, step2Agent1, TestAgentId1, "S2-A1-m1"); + AddSimpleUpdate(merger, step2Agent2, TestAgentId2, "S2-A2-m1"); + AddSimpleUpdate(merger, step2Agent1, TestAgentId1, "S2-A1-m2"); + AddSimpleUpdate(merger, step2Agent2, TestAgentId2, "S2-A2-m2"); + + // Step 3: Agent2 emits 2 messages. + AddSimpleUpdate(merger, step3Agent2, TestAgentId2, "S3-A2-m1"); + AddSimpleUpdate(merger, step3Agent2, TestAgentId2, "S3-A2-m2"); + + // Act + AgentResponse response = merger.ComputeMerged(step1Agent1); + + // Assert: expected output order + // Step 1 block (Agent1): S1-A1-m1, S1-A1-m2 + // Step 2 Agent1 block: S2-A1-m1, S2-A1-m2 + // Step 2 Agent2 block: S2-A2-m1, S2-A2-m2 + // Step 3 block (Agent2): S3-A2-m1, S3-A2-m2 + response.Messages.Should().HaveCount(8); + var texts = response.Messages.Select(m => m.Text).ToList(); + texts.Should().ContainInOrder( + "S1-A1-m1", "S1-A1-m2", + "S2-A1-m1", "S2-A1-m2", + "S2-A2-m1", "S2-A2-m2", + "S3-A2-m1", "S3-A2-m2"); + + // Step boundaries: no step-2 or step-3 message may appear before the last step-1 message. + int lastStep1Index = texts.FindLastIndex(t => t.StartsWith("S1-", StringComparison.Ordinal)); + int firstStep2Index = texts.FindIndex(t => t.StartsWith("S2-", StringComparison.Ordinal)); + int lastStep2Index = texts.FindLastIndex(t => t.StartsWith("S2-", StringComparison.Ordinal)); + int firstStep3Index = texts.FindIndex(t => t.StartsWith("S3-", StringComparison.Ordinal)); + lastStep1Index.Should().BeLessThan(firstStep2Index, "all step-1 messages precede step-2"); + lastStep2Index.Should().BeLessThan(firstStep3Index, "all step-2 messages precede step-3"); + + // Within step 2 (multi-agent), per-agent blocks must be contiguous. + int firstS2A1 = texts.FindIndex(t => t.StartsWith("S2-A1", StringComparison.Ordinal)); + int lastS2A1 = texts.FindLastIndex(t => t.StartsWith("S2-A1", StringComparison.Ordinal)); + int firstS2A2 = texts.FindIndex(t => t.StartsWith("S2-A2", StringComparison.Ordinal)); + int lastS2A2 = texts.FindLastIndex(t => t.StartsWith("S2-A2", StringComparison.Ordinal)); + (lastS2A1 - firstS2A1).Should().Be(1, "Agent1 step-2 messages should be contiguous"); + (lastS2A2 - firstS2A2).Should().Be(1, "Agent2 step-2 messages should be contiguous"); + (lastS2A1 < firstS2A2 || lastS2A2 < firstS2A1).Should().BeTrue("agent blocks within a step must not interleave"); + + static void AddSimpleUpdate(MessageMerger m, string responseId, string agentId, string text) + { + m.AddUpdate(new AgentResponseUpdate + { + ResponseId = responseId, + MessageId = Guid.NewGuid().ToString("N"), + AgentId = agentId, + Role = ChatRole.Assistant, + Contents = [new TextContent(text)], + }); + } + } + + #endregion }