Skip to content

Commit 3a824a8

Browse files
BrennanConroyRuihan-Yin
authored andcommitted
Periodically flush when using PipeWriter in Json (dotnet#102541)
1 parent 3bd48c0 commit 3a824a8

23 files changed

+302
-147
lines changed

src/libraries/Common/src/System/Text/Json/PooledByteBufferWriter.cs

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,22 @@
55
using System.Diagnostics;
66
using System.Diagnostics.CodeAnalysis;
77
using System.IO;
8+
using System.IO.Pipelines;
89
using System.Runtime.CompilerServices;
910
using System.Threading;
1011
using System.Threading.Tasks;
1112

1213
namespace System.Text.Json
1314
{
14-
internal sealed class PooledByteBufferWriter : IBufferWriter<byte>, IDisposable
15+
internal sealed class PooledByteBufferWriter : PipeWriter, IDisposable
1516
{
1617
// This class allows two possible configurations: if rentedBuffer is not null then
1718
// it can be used as an IBufferWriter and holds a buffer that should eventually be
1819
// returned to the shared pool. If rentedBuffer is null, then the instance is in a
1920
// cleared/disposed state and it must re-rent a buffer before it can be used again.
2021
private byte[]? _rentedBuffer;
2122
private int _index;
23+
private readonly Stream? _stream;
2224

2325
private const int MinimumBufferSize = 256;
2426

@@ -41,6 +43,11 @@ public PooledByteBufferWriter(int initialCapacity) : this()
4143
_index = 0;
4244
}
4345

46+
public PooledByteBufferWriter(int initialCapacity, Stream stream) : this(initialCapacity)
47+
{
48+
_stream = stream;
49+
}
50+
4451
public ReadOnlyMemory<byte> WrittenMemory
4552
{
4653
get
@@ -127,43 +134,32 @@ public void InitializeEmptyInstance(int initialCapacity)
127134

128135
public static PooledByteBufferWriter CreateEmptyInstanceForCaching() => new PooledByteBufferWriter();
129136

130-
public void Advance(int count)
137+
public override void Advance(int count)
131138
{
132139
Debug.Assert(_rentedBuffer != null);
133140
Debug.Assert(count >= 0);
134141
Debug.Assert(_index <= _rentedBuffer.Length - count);
135142
_index += count;
136143
}
137144

138-
public Memory<byte> GetMemory(int sizeHint = MinimumBufferSize)
145+
public override Memory<byte> GetMemory(int sizeHint = MinimumBufferSize)
139146
{
140147
CheckAndResizeBuffer(sizeHint);
141148
return _rentedBuffer.AsMemory(_index);
142149
}
143150

144-
public Span<byte> GetSpan(int sizeHint = MinimumBufferSize)
151+
public override Span<byte> GetSpan(int sizeHint = MinimumBufferSize)
145152
{
146153
CheckAndResizeBuffer(sizeHint);
147154
return _rentedBuffer.AsSpan(_index);
148155
}
149156

150157
#if NET
151-
internal ValueTask WriteToStreamAsync(Stream destination, CancellationToken cancellationToken)
152-
{
153-
return destination.WriteAsync(WrittenMemory, cancellationToken);
154-
}
155-
156158
internal void WriteToStream(Stream destination)
157159
{
158160
destination.Write(WrittenMemory.Span);
159161
}
160162
#else
161-
internal Task WriteToStreamAsync(Stream destination, CancellationToken cancellationToken)
162-
{
163-
Debug.Assert(_rentedBuffer != null);
164-
return destination.WriteAsync(_rentedBuffer, 0, _index, cancellationToken);
165-
}
166-
167163
internal void WriteToStream(Stream destination)
168164
{
169165
Debug.Assert(_rentedBuffer != null);
@@ -217,6 +213,28 @@ private void CheckAndResizeBuffer(int sizeHint)
217213
Debug.Assert(_rentedBuffer.Length - _index > 0);
218214
Debug.Assert(_rentedBuffer.Length - _index >= sizeHint);
219215
}
216+
217+
public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
218+
{
219+
Debug.Assert(_stream is not null);
220+
#if NET
221+
await _stream.WriteAsync(WrittenMemory, cancellationToken).ConfigureAwait(false);
222+
#else
223+
Debug.Assert(_rentedBuffer != null);
224+
await _stream.WriteAsync(_rentedBuffer, 0, _index, cancellationToken).ConfigureAwait(false);
225+
#endif
226+
Clear();
227+
228+
return new FlushResult(isCanceled: false, isCompleted: false);
229+
}
230+
231+
public override bool CanGetUnflushedBytes => true;
232+
public override long UnflushedBytes => _index;
233+
234+
// This type is used internally in JsonSerializer to help buffer and flush bytes to the underlying Stream.
235+
// It's only pretending to be a PipeWriter and doesn't need Complete or CancelPendingFlush for the internal usage.
236+
public override void CancelPendingFlush() => throw new NotImplementedException();
237+
public override void Complete(Exception? exception = null) => throw new NotImplementedException();
220238
}
221239

222240
internal static partial class ThrowHelper

src/libraries/System.Text.Json/src/Resources/Strings.resx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -734,8 +734,8 @@
734734
<data name="PipeWriterCanceled" xml:space="preserve">
735735
<value>PipeWriter.FlushAsync was canceled.</value>
736736
</data>
737-
<data name="PipeWriterCompleted" xml:space="preserve">
738-
<value>PipeWriter has been completed, nothing more can be written to it.</value>
737+
<data name="PipeWriter_DoesNotImplementUnflushedBytes" xml:space="preserve">
738+
<value>The PipeWriter '{0}' does not implement PipeWriter.UnflushedBytes.</value>
739739
</data>
740740
<data name="InvalidNewLine" xml:space="preserve">
741741
<value>New line can be only "\n" or "\r\n".</value>

src/libraries/System.Text.Json/src/System.Text.Json.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ The System.Text.Json library is built-in as part of the shared framework in .NET
102102
<Compile Include="System\Text\Json\Reader\Utf8JsonReader.TryGet.cs" />
103103
<Compile Include="System\Text\Json\Serialization\Arguments.cs" />
104104
<Compile Include="System\Text\Json\Serialization\ArgumentState.cs" />
105-
<Compile Include="System\Text\Json\Serialization\AsyncSerializationBufferWriterContext.cs" />
106105
<Compile Include="System\Text\Json\Serialization\Attributes\JsonObjectCreationHandlingAttribute.cs" />
107106
<Compile Include="System\Text\Json\Serialization\Attributes\JsonConstructorAttribute.cs" />
108107
<Compile Include="System\Text\Json\Serialization\Attributes\JsonConverterAttribute.cs" />

src/libraries/System.Text.Json/src/System/Text/Json/Serialization/AsyncSerializationBufferWriterContext.cs

Lines changed: 0 additions & 85 deletions
This file was deleted.

src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/ArrayConverter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TElement[] array, J
5656

5757
state.Current.EndCollectionElement();
5858

59-
if (ShouldFlush(writer, ref state))
59+
if (ShouldFlush(ref state))
6060
{
6161
state.Current.EnumeratorIndex = ++index;
6262
return false;

src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/DictionaryDefaultConverter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ protected internal override bool OnWriteResume(
4545

4646
do
4747
{
48-
if (ShouldFlush(writer, ref state))
48+
if (ShouldFlush(ref state))
4949
{
5050
state.Current.CollectionEnumerator = enumerator;
5151
return false;

src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/DictionaryOfTKeyTValueConverter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ protected internal override bool OnWriteResume(
6161
{
6262
do
6363
{
64-
if (ShouldFlush(writer, ref state))
64+
if (ShouldFlush(ref state))
6565
{
6666
state.Current.CollectionEnumerator = enumerator;
6767
return false;

src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IAsyncEnumerableOfTConverter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va
107107
return true;
108108
}
109109

110-
if (ShouldFlush(writer, ref state))
110+
if (ShouldFlush(ref state))
111111
{
112112
return false;
113113
}

src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IDictionaryConverter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ protected internal override bool OnWriteResume(Utf8JsonWriter writer, TDictionar
6060

6161
do
6262
{
63-
if (ShouldFlush(writer, ref state))
63+
if (ShouldFlush(ref state))
6464
{
6565
state.Current.CollectionEnumerator = enumerator;
6666
return false;

src/libraries/System.Text.Json/src/System/Text/Json/Serialization/Converters/Collection/IEnumerableConverter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ protected override bool OnWriteResume(
5959
JsonConverter<object?> converter = GetElementConverter(ref state);
6060
do
6161
{
62-
if (ShouldFlush(writer, ref state))
62+
if (ShouldFlush(ref state))
6363
{
6464
state.Current.CollectionEnumerator = enumerator;
6565
return false;

0 commit comments

Comments
 (0)