Skip to content

Commit d256061

Browse files
authored
[BREAKING] Remove NotifyThreadOfNewMessagesAsync AIAgent helper (microsoft#2450)
* Remove NotifyThreadOfNewMessagesAsync helper from AIAgent * Fix bug * Update comment
1 parent 8515c0c commit d256061

10 files changed

Lines changed: 31 additions & 181 deletions

File tree

dotnet/samples/GettingStarted/AgentProviders/Agent_With_CustomImplementation/Program.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,16 @@ public override async Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> m
3939
// Create a thread if the user didn't supply one.
4040
thread ??= this.GetNewThread();
4141

42+
if (thread is not CustomAgentThread typedThread)
43+
{
44+
throw new ArgumentException($"The provided thread is not of type {nameof(CustomAgentThread)}.", nameof(thread));
45+
}
46+
4247
// Clone the input messages and turn them into response messages with upper case text.
4348
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
4449

4550
// Notify the thread of the input and output messages.
46-
await NotifyThreadOfNewMessagesAsync(thread, messages.Concat(responseMessages), cancellationToken);
51+
await typedThread.MessageStore.AddMessagesAsync(messages.Concat(responseMessages), cancellationToken);
4752

4853
return new AgentRunResponse
4954
{
@@ -58,11 +63,16 @@ public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync
5863
// Create a thread if the user didn't supply one.
5964
thread ??= this.GetNewThread();
6065

66+
if (thread is not CustomAgentThread typedThread)
67+
{
68+
throw new ArgumentException($"The provided thread is not of type {nameof(CustomAgentThread)}.", nameof(thread));
69+
}
70+
6171
// Clone the input messages and turn them into response messages with upper case text.
6272
List<ChatMessage> responseMessages = CloneAndToUpperCase(messages, this.DisplayName).ToList();
6373

6474
// Notify the thread of the input and output messages.
65-
await NotifyThreadOfNewMessagesAsync(thread, messages.Concat(responseMessages), cancellationToken);
75+
await typedThread.MessageStore.AddMessagesAsync(messages.Concat(responseMessages), cancellationToken);
6676

6777
foreach (var message in responseMessages)
6878
{

dotnet/src/Microsoft.Agents.AI.Abstractions/AIAgent.cs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -328,28 +328,4 @@ public abstract IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(
328328
AgentThread? thread = null,
329329
AgentRunOptions? options = null,
330330
CancellationToken cancellationToken = default);
331-
332-
/// <summary>
333-
/// Notifies the specified thread about new messages that have been added to the conversation.
334-
/// </summary>
335-
/// <param name="thread">The conversation thread to notify about the new messages.</param>
336-
/// <param name="messages">The collection of new messages to report to the thread.</param>
337-
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
338-
/// <returns>A task that represents the asynchronous notification operation.</returns>
339-
/// <exception cref="ArgumentNullException"><paramref name="thread"/> or <paramref name="messages"/> is <see langword="null"/>.</exception>
340-
/// <remarks>
341-
/// <para>
342-
/// This method ensures that conversation threads are kept informed about message additions, which
343-
/// is important for threads that manage their own state, memory components, or derived context.
344-
/// While all agent implementations should notify their threads, the specific actions taken by
345-
/// each thread type may vary.
346-
/// </para>
347-
/// </remarks>
348-
protected static async Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IEnumerable<ChatMessage> messages, CancellationToken cancellationToken)
349-
{
350-
_ = Throw.IfNull(thread);
351-
_ = Throw.IfNull(messages);
352-
353-
await thread.MessagesReceivedAsync(messages, cancellationToken).ConfigureAwait(false);
354-
}
355331
}

dotnet/src/Microsoft.Agents.AI.Abstractions/AgentThread.cs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

33
using System;
4-
using System.Collections.Generic;
54
using System.Text.Json;
6-
using System.Threading;
7-
using System.Threading.Tasks;
8-
using Microsoft.Extensions.AI;
95
using Microsoft.Shared.Diagnostics;
106

117
namespace Microsoft.Agents.AI;
@@ -65,19 +61,6 @@ protected AgentThread()
6561
public virtual JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null)
6662
=> default;
6763

68-
/// <summary>
69-
/// This method is called when new messages have been contributed to the chat by any participant.
70-
/// </summary>
71-
/// <remarks>
72-
/// Inheritors can use this method to update their context based on the new message.
73-
/// </remarks>
74-
/// <param name="newMessages">The new messages.</param>
75-
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests. The default is <see cref="CancellationToken.None"/>.</param>
76-
/// <returns>A task that completes when the context has been updated.</returns>
77-
/// <exception cref="InvalidOperationException">The thread has been deleted.</exception>
78-
protected internal virtual Task MessagesReceivedAsync(IEnumerable<ChatMessage> newMessages, CancellationToken cancellationToken = default)
79-
=> Task.CompletedTask;
80-
8164
/// <summary>Asks the <see cref="AgentThread"/> for an object of the specified type <paramref name="serviceType"/>.</summary>
8265
/// <param name="serviceType">The type of object being requested.</param>
8366
/// <param name="serviceKey">An optional key that can be used to help identify the target service.</param>

dotnet/src/Microsoft.Agents.AI.Abstractions/InMemoryAgentThread.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44
using System.Collections.Generic;
55
using System.Diagnostics;
66
using System.Text.Json;
7-
using System.Threading;
8-
using System.Threading.Tasks;
97
using Microsoft.Extensions.AI;
108

119
namespace Microsoft.Agents.AI;
@@ -116,10 +114,6 @@ public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptio
116114
public override object? GetService(Type serviceType, object? serviceKey = null) =>
117115
base.GetService(serviceType, serviceKey) ?? this.MessageStore?.GetService(serviceType, serviceKey);
118116

119-
/// <inheritdoc />
120-
protected internal override Task MessagesReceivedAsync(IEnumerable<ChatMessage> newMessages, CancellationToken cancellationToken = default)
121-
=> this.MessageStore.AddMessagesAsync(newMessages, cancellationToken);
122-
123117
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
124118
private string DebuggerDisplay => $"Count = {this.MessageStore.Count}";
125119

dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowThread.cs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,6 @@ public WorkflowThread(Workflow workflow, JsonElement serializedThread, IWorkflow
6868

6969
public CheckpointInfo? LastCheckpoint { get; set; }
7070

71-
protected override Task MessagesReceivedAsync(IEnumerable<ChatMessage> newMessages, CancellationToken cancellationToken = default)
72-
=> this.MessageStore.AddMessagesAsync(newMessages, cancellationToken);
73-
7471
public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptions = null)
7572
{
7673
JsonMarshaller marshaller = new(jsonSerializerOptions);

dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ public override async IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync
270270
this.UpdateThreadWithTypeAndConversationId(safeThread, chatResponse.ConversationId);
271271

272272
// To avoid inconsistent state we only notify the thread of the input messages if no error occurs after the initial request.
273-
await NotifyThreadOfNewMessagesAsync(safeThread, inputMessages.Concat(aiContextProviderMessages ?? []).Concat(chatResponse.Messages), cancellationToken).ConfigureAwait(false);
273+
await NotifyMessageStoreOfNewMessagesAsync(safeThread, inputMessages.Concat(aiContextProviderMessages ?? []).Concat(chatResponse.Messages), cancellationToken).ConfigureAwait(false);
274274

275275
// Notify the AIContextProvider of all new messages.
276276
await NotifyAIContextProviderOfSuccessAsync(safeThread, inputMessages, aiContextProviderMessages, chatResponse.Messages, cancellationToken).ConfigureAwait(false);
@@ -413,7 +413,7 @@ private async Task<TAgentRunResponse> RunCoreAsync<TAgentRunResponse, TChatClien
413413
}
414414

415415
// Only notify the thread of new messages if the chatResponse was successful to avoid inconsistent message state in the thread.
416-
await NotifyThreadOfNewMessagesAsync(safeThread, inputMessages.Concat(aiContextProviderMessages ?? []).Concat(chatResponse.Messages), cancellationToken).ConfigureAwait(false);
416+
await NotifyMessageStoreOfNewMessagesAsync(safeThread, inputMessages.Concat(aiContextProviderMessages ?? []).Concat(chatResponse.Messages), cancellationToken).ConfigureAwait(false);
417417

418418
// Notify the AIContextProvider of all new messages.
419419
await NotifyAIContextProviderOfSuccessAsync(safeThread, inputMessages, aiContextProviderMessages, chatResponse.Messages, cancellationToken).ConfigureAwait(false);
@@ -711,12 +711,26 @@ private void UpdateThreadWithTypeAndConversationId(ChatClientAgentThread thread,
711711
else
712712
{
713713
// If the service doesn't use service side thread storage (i.e. we got no id back from invocation), and
714-
// the thread has no MessageStore yet, and we have a custom messages store, we should update the thread
715-
// with the custom MessageStore so that it has somewhere to store the chat history.
716-
thread.MessageStore ??= this._agentOptions?.ChatMessageStoreFactory?.Invoke(new() { SerializedState = default, JsonSerializerOptions = null });
714+
// the thread has no MessageStore yet, we should update the thread with the custom MessageStore or
715+
// default InMemoryMessageStore so that it has somewhere to store the chat history.
716+
thread.MessageStore ??= this._agentOptions?.ChatMessageStoreFactory?.Invoke(new() { SerializedState = default, JsonSerializerOptions = null }) ?? new InMemoryChatMessageStore();
717717
}
718718
}
719719

720+
private static Task NotifyMessageStoreOfNewMessagesAsync(ChatClientAgentThread thread, IEnumerable<ChatMessage> newMessages, CancellationToken cancellationToken)
721+
{
722+
var messageStore = thread.MessageStore;
723+
724+
// Only notify the message store if we have one.
725+
// If we don't have one, it means that the chat history is service managed and the underlying service is responsible for storing messages.
726+
if (messageStore is not null)
727+
{
728+
return messageStore.AddMessagesAsync(newMessages, cancellationToken);
729+
}
730+
731+
return Task.CompletedTask;
732+
}
733+
720734
private string GetLoggingAgentName() => this.Name ?? "UnnamedAgent";
721735
#endregion
722736
}

dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentThread.cs

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

33
using System;
4-
using System.Collections.Generic;
54
using System.Diagnostics;
65
using System.Text.Json;
7-
using System.Threading;
8-
using System.Threading.Tasks;
9-
using Microsoft.Extensions.AI;
106
using Microsoft.Shared.Diagnostics;
117

128
namespace Microsoft.Agents.AI;
@@ -181,33 +177,6 @@ public override JsonElement Serialize(JsonSerializerOptions? jsonSerializerOptio
181177
?? this.AIContextProvider?.GetService(serviceType, serviceKey)
182178
?? this.MessageStore?.GetService(serviceType, serviceKey);
183179

184-
/// <inheritdoc />
185-
protected override async Task MessagesReceivedAsync(IEnumerable<ChatMessage> newMessages, CancellationToken cancellationToken = default)
186-
{
187-
switch (this)
188-
{
189-
case { ConversationId: not null }:
190-
// If the thread messages are stored in the service
191-
// there is nothing to do here, since invoking the
192-
// service should already update the thread.
193-
break;
194-
195-
case { MessageStore: null }:
196-
// If there is no conversation id, and no store we can createa a default in memory store and add messages to it.
197-
this._messageStore = new InMemoryChatMessageStore();
198-
await this._messageStore!.AddMessagesAsync(newMessages, cancellationToken).ConfigureAwait(false);
199-
break;
200-
201-
case { MessageStore: not null }:
202-
// If a store has been provided, we need to add the messages to the store.
203-
await this._messageStore!.AddMessagesAsync(newMessages, cancellationToken).ConfigureAwait(false);
204-
break;
205-
206-
default:
207-
throw new UnreachableException();
208-
}
209-
}
210-
211180
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
212181
private string DebuggerDisplay =>
213182
this.ConversationId is { } conversationId ? $"ConversationId = {conversationId}" :

dotnet/tests/Microsoft.Agents.AI.Abstractions.UnitTests/AIAgentTests.cs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
using System.Threading.Tasks;
99
using Microsoft.Extensions.AI;
1010
using Moq;
11-
using Moq.Protected;
1211

1312
namespace Microsoft.Agents.AI.Abstractions.UnitTests;
1413

@@ -222,21 +221,6 @@ public void ValidateAgentIDIsIdempotent()
222221
Assert.Equal(id, agent.Id);
223222
}
224223

225-
[Fact]
226-
public async Task NotifyThreadOfNewMessagesNotifiesThreadAsync()
227-
{
228-
var cancellationToken = default(CancellationToken);
229-
230-
var messages = new[] { new ChatMessage(ChatRole.User, "msg1"), new ChatMessage(ChatRole.User, "msg2") };
231-
232-
var threadMock = new Mock<TestAgentThread> { CallBase = true };
233-
threadMock.SetupAllProperties();
234-
235-
await MockAgent.NotifyThreadOfNewMessagesAsync(threadMock.Object, messages, cancellationToken);
236-
237-
threadMock.Protected().Verify("MessagesReceivedAsync", Times.Once(), messages, cancellationToken);
238-
}
239-
240224
#region GetService Method Tests
241225

242226
/// <summary>
@@ -360,9 +344,6 @@ public abstract class TestAgentThread : AgentThread;
360344

361345
private sealed class MockAgent : AIAgent
362346
{
363-
public static new Task NotifyThreadOfNewMessagesAsync(AgentThread thread, IEnumerable<ChatMessage> messages, CancellationToken cancellationToken) =>
364-
AIAgent.NotifyThreadOfNewMessagesAsync(thread, messages, cancellationToken);
365-
366347
public override AgentThread GetNewThread()
367348
=> throw new NotImplementedException();
368349

dotnet/tests/Microsoft.Agents.AI.Abstractions.UnitTests/AgentThreadTests.cs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
// Copyright (c) Microsoft. All rights reserved.
22

33
using System;
4-
using System.Collections.Generic;
5-
using Microsoft.Extensions.AI;
64

75
#pragma warning disable CA1861 // Avoid constant arrays as arguments
86

@@ -21,15 +19,6 @@ public void Serialize_ReturnsDefaultJsonElement()
2119
Assert.Equal(default, result);
2220
}
2321

24-
[Fact]
25-
public void MessagesReceivedAsync_ReturnsCompletedTask()
26-
{
27-
var thread = new TestAgentThread();
28-
var messages = new List<ChatMessage> { new(ChatRole.User, "hello") };
29-
var result = thread.MessagesReceivedAsync(messages);
30-
Assert.True(result.IsCompleted);
31-
}
32-
3322
#region GetService Method Tests
3423

3524
/// <summary>

dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatClientAgentThreadTests.cs

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Collections.Generic;
55
using System.Linq;
66
using System.Text.Json;
7-
using System.Threading;
87
using System.Threading.Tasks;
98
using Microsoft.Extensions.AI;
109
using Moq;
@@ -91,50 +90,6 @@ public void SetChatMessageStoreThrowsWhenConversationIdIsSet()
9190

9291
#endregion Constructor and Property Tests
9392

94-
#region OnNewMessagesAsync Tests
95-
96-
[Fact]
97-
public async Task OnNewMessagesAsyncDoesNothingWhenAgentServiceIdAsync()
98-
{
99-
// Arrange
100-
var thread = new ChatClientAgentThread { ConversationId = "thread-123" };
101-
var messages = new List<ChatMessage>
102-
{
103-
new(ChatRole.User, "Hello"),
104-
new(ChatRole.Assistant, "Hi there!")
105-
};
106-
var agent = new MessageSendingAgent();
107-
108-
// Act
109-
await agent.SendMessagesAsync(thread, messages, CancellationToken.None);
110-
Assert.Equal("thread-123", thread.ConversationId);
111-
Assert.Null(thread.MessageStore);
112-
}
113-
114-
[Fact]
115-
public async Task OnNewMessagesAsyncAddsMessagesToStoreAsync()
116-
{
117-
// Arrange
118-
var store = new InMemoryChatMessageStore();
119-
var thread = new ChatClientAgentThread { MessageStore = store };
120-
var messages = new List<ChatMessage>
121-
{
122-
new(ChatRole.User, "Hello"),
123-
new(ChatRole.Assistant, "Hi there!")
124-
};
125-
var agent = new MessageSendingAgent();
126-
127-
// Act
128-
await agent.SendMessagesAsync(thread, messages, CancellationToken.None);
129-
130-
// Assert
131-
Assert.Equal(2, store.Count);
132-
Assert.Equal("Hello", store[0].Text);
133-
Assert.Equal("Hi there!", store[1].Text);
134-
}
135-
136-
#endregion OnNewMessagesAsync Tests
137-
13893
#region Deserialize Tests
13994

14095
[Fact]
@@ -372,22 +327,4 @@ public void GetService_RequestingChatMessageStore_ReturnsChatMessageStore()
372327
}
373328

374329
#endregion
375-
376-
private sealed class MessageSendingAgent : AIAgent
377-
{
378-
public override AgentThread DeserializeThread(JsonElement serializedThread, JsonSerializerOptions? jsonSerializerOptions = null)
379-
=> throw new NotImplementedException();
380-
381-
public override AgentThread GetNewThread()
382-
=> throw new NotImplementedException();
383-
384-
public override Task<AgentRunResponse> RunAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
385-
=> throw new NotImplementedException();
386-
387-
public override IAsyncEnumerable<AgentRunResponseUpdate> RunStreamingAsync(IEnumerable<ChatMessage> messages, AgentThread? thread = null, AgentRunOptions? options = null, CancellationToken cancellationToken = default)
388-
=> throw new NotImplementedException();
389-
390-
public Task SendMessagesAsync(AgentThread thread, IEnumerable<ChatMessage> messages, CancellationToken cancellationToken = default)
391-
=> NotifyThreadOfNewMessagesAsync(thread, messages, cancellationToken);
392-
}
393330
}

0 commit comments

Comments
 (0)