Skip to content

Commit 148e221

Browse files
authored
Use ToChatCompletion / ToStreamingChatCompletionUpdates in CachingChatClient (#5616)
* Use ToChatCompletion / ToStreamingChatCompletionUpdates in CachingChatClient Adds a ToStreamingChatCompletionUpdates method that's the counterpart to the recently added ToChatCompletion. Then uses both from CachingChatClient instead of its now bespoke coalescing implementation. When coalescing is enabled (the default), CachingChatClient caches everything as a ChatCompletion, rather than distinguishing streaming and non-streaming. * Address PR feedback
1 parent c163960 commit 148e221

File tree

7 files changed

+281
-131
lines changed

7 files changed

+281
-131
lines changed

src/Libraries/Microsoft.Extensions.AI.Abstractions/ChatCompletion/ChatCompletion.cs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,53 @@ public ChatMessage Message
8787
/// <inheritdoc />
8888
public override string ToString() =>
8989
Choices is { Count: > 0 } choices ? string.Join(Environment.NewLine, choices) : string.Empty;
90+
91+
/// <summary>Creates an array of <see cref="StreamingChatCompletionUpdate" /> instances that represent this <see cref="ChatCompletion" />.</summary>
92+
/// <returns>An array of <see cref="StreamingChatCompletionUpdate" /> instances that may be used to represent this <see cref="ChatCompletion" />.</returns>
93+
public StreamingChatCompletionUpdate[] ToStreamingChatCompletionUpdates()
94+
{
95+
StreamingChatCompletionUpdate? extra = null;
96+
if (AdditionalProperties is not null || Usage is not null)
97+
{
98+
extra = new StreamingChatCompletionUpdate
99+
{
100+
AdditionalProperties = AdditionalProperties
101+
};
102+
103+
if (Usage is { } usage)
104+
{
105+
extra.Contents.Add(new UsageContent(usage));
106+
}
107+
}
108+
109+
int choicesCount = Choices.Count;
110+
var updates = new StreamingChatCompletionUpdate[choicesCount + (extra is null ? 0 : 1)];
111+
112+
for (int choiceIndex = 0; choiceIndex < choicesCount; choiceIndex++)
113+
{
114+
ChatMessage choice = Choices[choiceIndex];
115+
updates[choiceIndex] = new StreamingChatCompletionUpdate
116+
{
117+
ChoiceIndex = choiceIndex,
118+
119+
AdditionalProperties = choice.AdditionalProperties,
120+
AuthorName = choice.AuthorName,
121+
Contents = choice.Contents,
122+
RawRepresentation = choice.RawRepresentation,
123+
Role = choice.Role,
124+
125+
CompletionId = CompletionId,
126+
CreatedAt = CreatedAt,
127+
FinishReason = FinishReason,
128+
ModelId = ModelId
129+
};
130+
}
131+
132+
if (extra is not null)
133+
{
134+
updates[choicesCount] = extra;
135+
}
136+
137+
return updates;
138+
}
90139
}

src/Libraries/Microsoft.Extensions.AI.Abstractions/ChatCompletion/StreamingChatCompletionUpdate.cs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,35 @@
99

1010
namespace Microsoft.Extensions.AI;
1111

12-
// Conceptually this combines the roles of ChatCompletion and ChatMessage in streaming output.
13-
// For ease of consumption, it also flattens the nested structure you see on streaming chunks in
14-
// the OpenAI/Gemini APIs, so instead of a dictionary of choices, each update represents a single
15-
// choice (and hence has its own role, choice ID, etc.).
16-
1712
/// <summary>
18-
/// Represents a single response chunk from an <see cref="IChatClient"/>.
13+
/// Represents a single streaming response chunk from an <see cref="IChatClient"/>.
1914
/// </summary>
15+
/// <remarks>
16+
/// <para>
17+
/// Conceptually, this combines the roles of <see cref="ChatCompletion"/> and <see cref="ChatMessage"/>
18+
/// in streaming output. For ease of consumption, it also flattens the nested structure you see on
19+
/// streaming chunks in some AI service, so instead of a dictionary of choices, each update represents a
20+
/// single choice (and hence has its own role, choice ID, etc.).
21+
/// </para>
22+
/// <para>
23+
/// <see cref="StreamingChatCompletionUpdate"/> is so named because it represents streaming updates
24+
/// to a single chat completion. As such, it is considered erroneous for multiple updates that are part
25+
/// of the same completion to contain competing values. For example, some updates that are part of
26+
/// the same completion may have a <see langword="null"/> <see cref="StreamingChatCompletionUpdate.Role"/>
27+
/// value, and others may have a non-<see langword="null"/> value, but all of those with a non-<see langword="null"/>
28+
/// value must have the same value (e.g. <see cref="ChatRole.Assistant"/>. It should never be the case, for example,
29+
/// that one <see cref="StreamingChatCompletionUpdate"/> in a completion has a role of <see cref="ChatRole.Assistant"/>
30+
/// while another has a role of "AI".
31+
/// </para>
32+
/// <para>
33+
/// The relationship between <see cref="ChatCompletion"/> and <see cref="StreamingChatCompletionUpdate"/> is
34+
/// codified in the <see cref="StreamingChatCompletionUpdateExtensions.ToChatCompletionAsync"/> and
35+
/// <see cref="ChatCompletion.ToStreamingChatCompletionUpdates"/>, which enable bidirectional conversions
36+
/// between the two. Note, however, that the conversion may be slightly lossy, for example if multiple updates
37+
/// all have different <see cref="StreamingChatCompletionUpdate.RawRepresentation"/> objects whereas there's
38+
/// only one slot for such an object available in <see cref="ChatCompletion.RawRepresentation"/>.
39+
/// </para>
40+
/// </remarks>
2041
public class StreamingChatCompletionUpdate
2142
{
2243
/// <summary>The completion update content items.</summary>

src/Libraries/Microsoft.Extensions.AI.Abstractions/ChatCompletion/StreamingChatCompletionUpdateExtensions.cs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System.Collections.Generic;
5+
using System.Linq;
56
#if NET
67
using System.Runtime.InteropServices;
78
#endif
@@ -133,7 +134,22 @@ private static void ProcessUpdate(StreamingChatCompletionUpdate update, Dictiona
133134
/// <param name="coalesceContent">The corresponding option value provided to <see cref="ToChatCompletion"/> or <see cref="ToChatCompletionAsync"/>.</param>
134135
private static void AddMessagesToCompletion(Dictionary<int, ChatMessage> messages, ChatCompletion completion, bool coalesceContent)
135136
{
136-
foreach (var entry in messages)
137+
if (messages.Count <= 1)
138+
{
139+
foreach (var entry in messages)
140+
{
141+
AddMessage(completion, coalesceContent, entry);
142+
}
143+
}
144+
else
145+
{
146+
foreach (var entry in messages.OrderBy(entry => entry.Key))
147+
{
148+
AddMessage(completion, coalesceContent, entry);
149+
}
150+
}
151+
152+
static void AddMessage(ChatCompletion completion, bool coalesceContent, KeyValuePair<int, ChatMessage> entry)
137153
{
138154
if (entry.Value.Role == default)
139155
{
@@ -154,6 +170,8 @@ private static void AddMessagesToCompletion(Dictionary<int, ChatMessage> message
154170
if (content is UsageContent c)
155171
{
156172
completion.Usage = c.Details;
173+
entry.Value.Contents = entry.Value.Contents.ToList();
174+
_ = entry.Value.Contents.Remove(c);
157175
break;
158176
}
159177
}

src/Libraries/Microsoft.Extensions.AI/ChatCompletion/CachingChatClient.cs

Lines changed: 43 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
using System.Collections.Generic;
55
using System.Runtime.CompilerServices;
6-
using System.Text;
76
using System.Threading;
87
using System.Threading.Tasks;
98
using Microsoft.Shared.Diagnostics;
@@ -48,13 +47,12 @@ public override async Task<ChatCompletion> CompleteAsync(IList<ChatMessage> chat
4847
// concurrent callers might trigger duplicate requests, but that's acceptable.
4948
var cacheKey = GetCacheKey(false, chatMessages, options);
5049

51-
if (await ReadCacheAsync(cacheKey, cancellationToken).ConfigureAwait(false) is ChatCompletion existing)
50+
if (await ReadCacheAsync(cacheKey, cancellationToken).ConfigureAwait(false) is not { } result)
5251
{
53-
return existing;
52+
result = await base.CompleteAsync(chatMessages, options, cancellationToken).ConfigureAwait(false);
53+
await WriteCacheAsync(cacheKey, result, cancellationToken).ConfigureAwait(false);
5454
}
5555

56-
var result = await base.CompleteAsync(chatMessages, options, cancellationToken).ConfigureAwait(false);
57-
await WriteCacheAsync(cacheKey, result, cancellationToken).ConfigureAwait(false);
5856
return result;
5957
}
6058

@@ -64,127 +62,59 @@ public override async IAsyncEnumerable<StreamingChatCompletionUpdate> CompleteSt
6462
{
6563
_ = Throw.IfNull(chatMessages);
6664

67-
var cacheKey = GetCacheKey(true, chatMessages, options);
68-
if (await ReadCacheStreamingAsync(cacheKey, cancellationToken).ConfigureAwait(false) is { } existingChunks)
65+
if (CoalesceStreamingUpdates)
6966
{
70-
// Yield all of the cached items.
71-
foreach (var chunk in existingChunks)
67+
// When coalescing updates, we cache non-streaming results coalesced from streaming ones. That means
68+
// we make a streaming request, yielding those results, but then convert those into a non-streaming
69+
// result and cache it. When we get a cache hit, we yield the non-streaming result as a streaming one.
70+
71+
var cacheKey = GetCacheKey(true, chatMessages, options);
72+
if (await ReadCacheAsync(cacheKey, cancellationToken).ConfigureAwait(false) is { } chatCompletion)
7273
{
73-
yield return chunk;
74+
// Yield all of the cached items.
75+
foreach (var chunk in chatCompletion.ToStreamingChatCompletionUpdates())
76+
{
77+
yield return chunk;
78+
}
79+
}
80+
else
81+
{
82+
// Yield and store all of the items.
83+
List<StreamingChatCompletionUpdate> capturedItems = [];
84+
await foreach (var chunk in base.CompleteStreamingAsync(chatMessages, options, cancellationToken).ConfigureAwait(false))
85+
{
86+
capturedItems.Add(chunk);
87+
yield return chunk;
88+
}
89+
90+
// Write the captured items to the cache as a non-streaming result.
91+
await WriteCacheAsync(cacheKey, capturedItems.ToChatCompletion(), cancellationToken).ConfigureAwait(false);
7492
}
7593
}
7694
else
7795
{
78-
// Yield and store all of the items.
79-
List<StreamingChatCompletionUpdate> capturedItems = [];
80-
await foreach (var chunk in base.CompleteStreamingAsync(chatMessages, options, cancellationToken).ConfigureAwait(false))
96+
var cacheKey = GetCacheKey(true, chatMessages, options);
97+
if (await ReadCacheStreamingAsync(cacheKey, cancellationToken).ConfigureAwait(false) is { } existingChunks)
8198
{
82-
capturedItems.Add(chunk);
83-
yield return chunk;
99+
// Yield all of the cached items.
100+
foreach (var chunk in existingChunks)
101+
{
102+
yield return chunk;
103+
}
84104
}
85-
86-
// If the caching client is configured to coalesce streaming updates, do so now within the capturedItems list.
87-
if (CoalesceStreamingUpdates)
105+
else
88106
{
89-
StringBuilder coalescedText = new();
90-
91-
// Iterate through all of the items in the list looking for contiguous items that can be coalesced.
92-
for (int startInclusive = 0; startInclusive < capturedItems.Count; startInclusive++)
107+
// Yield and store all of the items.
108+
List<StreamingChatCompletionUpdate> capturedItems = [];
109+
await foreach (var chunk in base.CompleteStreamingAsync(chatMessages, options, cancellationToken).ConfigureAwait(false))
93110
{
94-
// If an item isn't generally coalescable, skip it.
95-
StreamingChatCompletionUpdate update = capturedItems[startInclusive];
96-
if (update.ChoiceIndex != 0 ||
97-
update.Contents.Count != 1 ||
98-
update.Contents[0] is not TextContent textContent)
99-
{
100-
continue;
101-
}
102-
103-
// We found a coalescable item. Look for more contiguous items that are also coalescable with it.
104-
int endExclusive = startInclusive + 1;
105-
for (; endExclusive < capturedItems.Count; endExclusive++)
106-
{
107-
StreamingChatCompletionUpdate next = capturedItems[endExclusive];
108-
if (next.ChoiceIndex != 0 ||
109-
next.Contents.Count != 1 ||
110-
next.Contents[0] is not TextContent ||
111-
112-
// changing role or author would be really strange, but check anyway
113-
(update.Role is not null && next.Role is not null && update.Role != next.Role) ||
114-
(update.AuthorName is not null && next.AuthorName is not null && update.AuthorName != next.AuthorName))
115-
{
116-
break;
117-
}
118-
}
119-
120-
// If we couldn't find anything to coalesce, there's nothing to do.
121-
if (endExclusive - startInclusive <= 1)
122-
{
123-
continue;
124-
}
125-
126-
// We found a coalescable run of items. Create a new node to represent the run. We create a new one
127-
// rather than reappropriating one of the existing ones so as not to mutate an item already yielded.
128-
_ = coalescedText.Clear().Append(capturedItems[startInclusive].Text);
129-
130-
TextContent coalescedContent = new(null) // will patch the text after examining all items in the run
131-
{
132-
AdditionalProperties = textContent.AdditionalProperties?.Clone(),
133-
};
134-
135-
StreamingChatCompletionUpdate coalesced = new()
136-
{
137-
AdditionalProperties = update.AdditionalProperties?.Clone(),
138-
AuthorName = update.AuthorName,
139-
CompletionId = update.CompletionId,
140-
Contents = [coalescedContent],
141-
CreatedAt = update.CreatedAt,
142-
FinishReason = update.FinishReason,
143-
ModelId = update.ModelId,
144-
Role = update.Role,
145-
146-
// Explicitly don't include RawRepresentation. It's not applicable if one update ends up being used
147-
// to represent multiple, and it won't be serialized anyway.
148-
};
149-
150-
// Replace the starting node with the coalesced node.
151-
capturedItems[startInclusive] = coalesced;
152-
153-
// Now iterate through all the rest of the updates in the run, updating the coalesced node with relevant properties,
154-
// and nulling out the nodes along the way. We do this rather than removing the entry in order to avoid an O(N^2) operation.
155-
// We'll remove all the null entries at the end of the loop, using RemoveAll to do so, which can remove all of
156-
// the nulls in a single O(N) pass.
157-
for (int i = startInclusive + 1; i < endExclusive; i++)
158-
{
159-
// Grab the next item.
160-
StreamingChatCompletionUpdate next = capturedItems[i];
161-
capturedItems[i] = null!;
162-
163-
var nextContent = (TextContent)next.Contents[0];
164-
_ = coalescedText.Append(nextContent.Text);
165-
166-
coalesced.AuthorName ??= next.AuthorName;
167-
coalesced.CompletionId ??= next.CompletionId;
168-
coalesced.CreatedAt ??= next.CreatedAt;
169-
coalesced.FinishReason ??= next.FinishReason;
170-
coalesced.ModelId ??= next.ModelId;
171-
coalesced.Role ??= next.Role;
172-
}
173-
174-
// Complete the coalescing by patching the text of the coalesced node.
175-
coalesced.Text = coalescedText.ToString();
176-
177-
// Jump to the last update in the run, so that when we loop around and bump ahead,
178-
// we're at the next update just after the run.
179-
startInclusive = endExclusive - 1;
111+
capturedItems.Add(chunk);
112+
yield return chunk;
180113
}
181114

182-
// Remove all of the null slots left over from the coalescing process.
183-
_ = capturedItems.RemoveAll(u => u is null);
115+
// Write the captured items to the cache.
116+
await WriteCacheStreamingAsync(cacheKey, capturedItems, cancellationToken).ConfigureAwait(false);
184117
}
185-
186-
// Write the captured items to the cache.
187-
await WriteCacheStreamingAsync(cacheKey, capturedItems, cancellationToken).ConfigureAwait(false);
188118
}
189119
}
190120

0 commit comments

Comments
 (0)