Skip to content

Commit be33be6

Browse files
authored
.NET: AIAgentHostExecutor to use ToAgentRunResponse (microsoft#1439)
* AIAgentHostExecutor to use ToAgentRunResponse * Only run agent in stream mode when emit event is true
1 parent b6b29e6 commit be33be6

2 files changed

Lines changed: 22 additions & 78 deletions

File tree

dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs

Lines changed: 16 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -51,59 +51,31 @@ protected internal override async ValueTask OnCheckpointRestoredAsync(IWorkflowC
5151

5252
protected override async ValueTask TakeTurnAsync(List<ChatMessage> messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default)
5353
{
54-
emitEvents ??= this._emitEvents;
55-
IAsyncEnumerable<AgentRunResponseUpdate> agentStream = this._agent.RunStreamingAsync(messages, this.EnsureThread(context), cancellationToken: cancellationToken);
56-
57-
List<AIContent> updates = [];
58-
ChatMessage? currentStreamingMessage = null;
59-
60-
await foreach (AgentRunResponseUpdate update in agentStream.ConfigureAwait(false))
54+
if (emitEvents ?? this._emitEvents)
6155
{
62-
if (string.IsNullOrEmpty(update.MessageId))
63-
{
64-
// Ignore updates that don't have a message ID.
65-
continue;
66-
}
56+
// Run the agent in streaming mode only when agent run update events are to be emitted.
57+
IAsyncEnumerable<AgentRunResponseUpdate> agentStream = this._agent.RunStreamingAsync(messages, this.EnsureThread(context), cancellationToken: cancellationToken);
6758

68-
if (emitEvents ?? this._emitEvents)
59+
List<AgentRunResponseUpdate> updates = [];
60+
61+
await foreach (AgentRunResponseUpdate update in agentStream.ConfigureAwait(false))
6962
{
7063
await context.AddEventAsync(new AgentRunUpdateEvent(this.Id, update), cancellationToken).ConfigureAwait(false);
71-
}
72-
73-
// TODO: FunctionCall request handling, and user info request handling.
74-
// In some sense: We should just let it be handled as a ChatMessage, though we should consider
75-
// providing some mechanisms to help the user complete the request, or route it out of the
76-
// workflow.
7764

78-
if (currentStreamingMessage is null || currentStreamingMessage.MessageId != update.MessageId)
79-
{
80-
await PublishCurrentMessageAsync().ConfigureAwait(false);
81-
currentStreamingMessage = new(update.Role ?? ChatRole.Assistant, update.Contents)
82-
{
83-
AuthorName = update.AuthorName,
84-
CreatedAt = update.CreatedAt,
85-
MessageId = update.MessageId,
86-
RawRepresentation = update.RawRepresentation,
87-
AdditionalProperties = update.AdditionalProperties
88-
};
65+
// TODO: FunctionCall request handling, and user info request handling.
66+
// In some sense: We should just let it be handled as a ChatMessage, though we should consider
67+
// providing some mechanisms to help the user complete the request, or route it out of the
68+
// workflow.
69+
updates.Add(update);
8970
}
9071

91-
updates.AddRange(update.Contents);
72+
await context.SendMessageAsync(updates.ToAgentRunResponse().Messages, cancellationToken: cancellationToken).ConfigureAwait(false);
9273
}
93-
94-
await PublishCurrentMessageAsync().ConfigureAwait(false);
95-
96-
async ValueTask PublishCurrentMessageAsync()
74+
else
9775
{
98-
if (currentStreamingMessage is not null && updates.Count > 0)
99-
{
100-
currentStreamingMessage.Contents = updates;
101-
updates = [];
102-
103-
await context.SendMessageAsync(currentStreamingMessage, cancellationToken: cancellationToken).ConfigureAwait(false);
104-
}
105-
106-
currentStreamingMessage = null;
76+
// Otherwise, run the agent in non-streaming mode.
77+
AgentRunResponse response = await this._agent.RunAsync(messages, this.EnsureThread(context), cancellationToken: cancellationToken).ConfigureAwait(false);
78+
await context.SendMessageAsync(response.Messages, cancellationToken: cancellationToken).ConfigureAwait(false);
10779
}
10880
}
10981
}

dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/SpecializedExecutorSmokeTests.cs

Lines changed: 6 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ internal sealed class TestWorkflowContext(string executorId, bool concurrentRuns
116116
{
117117
private readonly StateManager _stateManager = new();
118118

119-
public List<List<ChatMessage>> Updates { get; } = [];
119+
public List<ChatMessage> Updates { get; } = [];
120120

121121
public ValueTask AddEventAsync(WorkflowEvent workflowEvent, CancellationToken cancellationToken = default) =>
122122
default;
@@ -145,11 +145,11 @@ public ValueTask SendMessageAsync(object message, string? targetId = null, Cance
145145
{
146146
if (message is List<ChatMessage> messages)
147147
{
148-
this.Updates.Add(messages);
148+
this.Updates.AddRange(messages);
149149
}
150150
else if (message is ChatMessage chatMessage)
151151
{
152-
this.Updates.Add([chatMessage]);
152+
this.Updates.Add(chatMessage);
153153
}
154154

155155
return default;
@@ -176,52 +176,24 @@ public async Task Test_AIAgentStreamingMessage_AggregationAsync()
176176
"Quisque dignissim ante odio, at facilisis orci porta a. Duis mi augue, fringilla eu egestas a, pellentesque sed lacus."
177177
];
178178

179-
string[][] splits = MessageStrings.Select(t => t.Split()).ToArray();
180-
foreach (string[] messageSplits in splits)
181-
{
182-
for (int i = 0; i < messageSplits.Length - 1; i++)
183-
{
184-
messageSplits[i] += ' ';
185-
}
186-
}
187-
188179
List<ChatMessage> expected = TestAIAgent.ToChatMessages(MessageStrings);
189180

190181
TestAIAgent agent = new(expected);
191182
AIAgentHostExecutor host = new(agent);
192183

193184
TestWorkflowContext collectingContext = new(host.Id);
194185

195-
await host.TakeTurnAsync(new TurnToken(emitEvents: false), collectingContext);
186+
await host.TakeTurnAsync(new TurnToken(emitEvents: true), collectingContext);
196187

197188
// The first empty message is skipped.
198189
collectingContext.Updates.Should().HaveCount(MessageStrings.Length - 1);
199190

200191
for (int i = 1; i < MessageStrings.Length; i++)
201192
{
202193
string expectedText = MessageStrings[i];
203-
string[] expectedSplits = splits[i];
204-
205-
ChatMessage equivalent = expected[i];
206-
List<ChatMessage> collected = collectingContext.Updates[i - 1];
207-
208-
collected.Should().HaveCount(1);
209-
collected[0].Text.Should().Be(expectedText);
210-
collected[0].Contents.Should().HaveCount(splits[i].Length);
194+
ChatMessage collected = collectingContext.Updates[i - 1];
211195

212-
Action<AIContent>[] splitCheckActions = splits[i].Select(MakeSplitCheckAction).ToArray();
213-
Assert.Collection(collected[0].Contents, splitCheckActions);
214-
}
215-
216-
Action<AIContent> MakeSplitCheckAction(string splitString)
217-
{
218-
return Check;
219-
220-
void Check(AIContent content)
221-
{
222-
TextContent? text = content as TextContent;
223-
text!.Text.Should().Be(splitString);
224-
}
196+
collected.Text.Should().Be(expectedText);
225197
}
226198
}
227199
}

0 commit comments

Comments
 (0)