Skip to content

Commit 04a98d3

Browse files
Implement nullability in streams Buffer (#7496)
* Implement nullability in streams Buffer * Update API approval list --------- Co-authored-by: Aaron Stannard <[email protected]>
1 parent 33daa3c commit 04a98d3

File tree

10 files changed

+278
-83
lines changed

10 files changed

+278
-83
lines changed

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,9 @@ namespace Akka.Streams
182182
public static Akka.Streams.Attributes CreateLogLevels(Akka.Event.LogLevel onElement = 0, Akka.Event.LogLevel onFinish = 0, Akka.Event.LogLevel onError = 3) { }
183183
public static Akka.Streams.Attributes CreateName(string name) { }
184184
public static string ExtractName(Akka.Streams.Implementation.IModule module, string defaultIfNotFound) { }
185-
public TAttr GetAttribute<TAttr>(TAttr defaultIfNotFound)
185+
[return: System.Diagnostics.CodeAnalysis.NotNullIfNotNullAttribute("defaultIfNotFound")]
186+
[return: System.Runtime.CompilerServices.NullableAttribute(2)]
187+
public TAttr GetAttribute<TAttr>([System.Runtime.CompilerServices.NullableAttribute(2)] TAttr defaultIfNotFound)
186188
where TAttr : class, Akka.Streams.Attributes.IAttribute { }
187189
public TAttr GetAttribute<TAttr>()
188190
where TAttr : class, Akka.Streams.Attributes.IAttribute { }
@@ -3504,7 +3506,13 @@ namespace Akka.Streams.Implementation
35043506
public override Akka.Streams.Implementation.IModule WithAttributes(Akka.Streams.Attributes attributes) { }
35053507
}
35063508
[Akka.Annotations.InternalApiAttribute()]
3507-
public sealed class QueueSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, Akka.Streams.ISinkQueue<T>>
3509+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
3510+
0,
3511+
1,
3512+
1,
3513+
1,
3514+
1})]
3515+
public sealed class QueueSink<[System.Runtime.CompilerServices.NullableAttribute(2)] T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, Akka.Streams.ISinkQueue<T>>
35083516
{
35093517
public readonly Akka.Streams.Inlet<T> In;
35103518
public QueueSink() { }
@@ -3514,16 +3522,26 @@ namespace Akka.Streams.Implementation
35143522
public override string ToString() { }
35153523
}
35163524
[Akka.Annotations.InternalApiAttribute()]
3517-
public sealed class QueueSource<TOut> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, Akka.Streams.ISourceQueueWithComplete<TOut>>
3525+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
3526+
0,
3527+
1,
3528+
1,
3529+
1,
3530+
1})]
3531+
public sealed class QueueSource<[System.Runtime.CompilerServices.NullableAttribute(2)] TOut> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, Akka.Streams.ISourceQueueWithComplete<TOut>>
35183532
{
35193533
public QueueSource(int maxBuffer, Akka.Streams.OverflowStrategy overflowStrategy) { }
35203534
public Akka.Streams.Outlet<TOut> Out { get; }
35213535
public override Akka.Streams.SourceShape<TOut> Shape { get; }
35223536
public override Akka.Streams.Stage.ILogicAndMaterializedValue<Akka.Streams.ISourceQueueWithComplete<TOut>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
3523-
public interface IInput<TOut> { }
3524-
public sealed class Materialized<TOut> : Akka.Streams.ISourceQueueWithComplete<TOut>, Akka.Streams.ISourceQueue<TOut>
3537+
public interface IInput<[System.Runtime.CompilerServices.NullableAttribute(2)] TOut> { }
3538+
[System.Runtime.CompilerServices.NullableAttribute(0)]
3539+
public sealed class Materialized<[System.Runtime.CompilerServices.NullableAttribute(2)] TOut> : Akka.Streams.ISourceQueueWithComplete<TOut>, Akka.Streams.ISourceQueue<TOut>
35253540
{
3526-
public Materialized(System.Action<Akka.Streams.Implementation.QueueSource<TOut>.IInput> invokeLogic, System.Threading.Tasks.TaskCompletionSource<object> completion) { }
3541+
public Materialized([System.Runtime.CompilerServices.NullableAttribute(new byte[] {
3542+
1,
3543+
1,
3544+
0})] System.Action<Akka.Streams.Implementation.QueueSource<TOut>.IInput> invokeLogic, System.Threading.Tasks.TaskCompletionSource<object> completion) { }
35273545
public void Complete() { }
35283546
public void Fail(System.Exception ex) { }
35293547
public System.Threading.Tasks.Task<Akka.Streams.IQueueOfferResult> OfferAsync(TOut element) { }
@@ -4094,7 +4112,10 @@ namespace Akka.Streams.Implementation.Fusing
40944112
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
40954113
}
40964114
[Akka.Annotations.InternalApiAttribute()]
4097-
public sealed class Buffer<T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
4115+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
4116+
0,
4117+
1})]
4118+
public sealed class Buffer<[System.Runtime.CompilerServices.NullableAttribute(2)] T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
40984119
{
40994120
public Buffer(int count, Akka.Streams.OverflowStrategy overflowStrategy) { }
41004121
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
@@ -4113,7 +4134,10 @@ namespace Akka.Streams.Implementation.Fusing
41134134
public override string ToString() { }
41144135
}
41154136
[Akka.Annotations.InternalApiAttribute()]
4116-
public sealed class Delay<T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
4137+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
4138+
0,
4139+
1})]
4140+
public sealed class Delay<[System.Runtime.CompilerServices.NullableAttribute(2)] T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
41174141
{
41184142
public Delay(System.TimeSpan delay, Akka.Streams.DelayOverflowStrategy strategy) { }
41194143
protected override Akka.Streams.Attributes InitialAttributes { get; }

src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,8 @@ namespace Akka.Streams
182182
public static Akka.Streams.Attributes CreateLogLevels(Akka.Event.LogLevel onElement = 0, Akka.Event.LogLevel onFinish = 0, Akka.Event.LogLevel onError = 3) { }
183183
public static Akka.Streams.Attributes CreateName(string name) { }
184184
public static string ExtractName(Akka.Streams.Implementation.IModule module, string defaultIfNotFound) { }
185-
public TAttr GetAttribute<TAttr>(TAttr defaultIfNotFound)
185+
[return: System.Runtime.CompilerServices.NullableAttribute(2)]
186+
public TAttr GetAttribute<TAttr>([System.Runtime.CompilerServices.NullableAttribute(2)] TAttr defaultIfNotFound)
186187
where TAttr : class, Akka.Streams.Attributes.IAttribute { }
187188
public TAttr GetAttribute<TAttr>()
188189
where TAttr : class, Akka.Streams.Attributes.IAttribute { }
@@ -3490,7 +3491,13 @@ namespace Akka.Streams.Implementation
34903491
public override Akka.Streams.Implementation.IModule WithAttributes(Akka.Streams.Attributes attributes) { }
34913492
}
34923493
[Akka.Annotations.InternalApiAttribute()]
3493-
public sealed class QueueSink<T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, Akka.Streams.ISinkQueue<T>>
3494+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
3495+
0,
3496+
1,
3497+
1,
3498+
1,
3499+
1})]
3500+
public sealed class QueueSink<[System.Runtime.CompilerServices.NullableAttribute(2)] T> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SinkShape<T>, Akka.Streams.ISinkQueue<T>>
34943501
{
34953502
public readonly Akka.Streams.Inlet<T> In;
34963503
public QueueSink() { }
@@ -3500,16 +3507,26 @@ namespace Akka.Streams.Implementation
35003507
public override string ToString() { }
35013508
}
35023509
[Akka.Annotations.InternalApiAttribute()]
3503-
public sealed class QueueSource<TOut> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, Akka.Streams.ISourceQueueWithComplete<TOut>>
3510+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
3511+
0,
3512+
1,
3513+
1,
3514+
1,
3515+
1})]
3516+
public sealed class QueueSource<[System.Runtime.CompilerServices.NullableAttribute(2)] TOut> : Akka.Streams.Stage.GraphStageWithMaterializedValue<Akka.Streams.SourceShape<TOut>, Akka.Streams.ISourceQueueWithComplete<TOut>>
35043517
{
35053518
public QueueSource(int maxBuffer, Akka.Streams.OverflowStrategy overflowStrategy) { }
35063519
public Akka.Streams.Outlet<TOut> Out { get; }
35073520
public override Akka.Streams.SourceShape<TOut> Shape { get; }
35083521
public override Akka.Streams.Stage.ILogicAndMaterializedValue<Akka.Streams.ISourceQueueWithComplete<TOut>> CreateLogicAndMaterializedValue(Akka.Streams.Attributes inheritedAttributes) { }
3509-
public interface IInput<TOut> { }
3510-
public sealed class Materialized<TOut> : Akka.Streams.ISourceQueueWithComplete<TOut>, Akka.Streams.ISourceQueue<TOut>
3522+
public interface IInput<[System.Runtime.CompilerServices.NullableAttribute(2)] TOut> { }
3523+
[System.Runtime.CompilerServices.NullableAttribute(0)]
3524+
public sealed class Materialized<[System.Runtime.CompilerServices.NullableAttribute(2)] TOut> : Akka.Streams.ISourceQueueWithComplete<TOut>, Akka.Streams.ISourceQueue<TOut>
35113525
{
3512-
public Materialized(System.Action<Akka.Streams.Implementation.QueueSource<TOut>.IInput> invokeLogic, System.Threading.Tasks.TaskCompletionSource<object> completion) { }
3526+
public Materialized([System.Runtime.CompilerServices.NullableAttribute(new byte[] {
3527+
1,
3528+
1,
3529+
0})] System.Action<Akka.Streams.Implementation.QueueSource<TOut>.IInput> invokeLogic, System.Threading.Tasks.TaskCompletionSource<object> completion) { }
35133530
public void Complete() { }
35143531
public void Fail(System.Exception ex) { }
35153532
public System.Threading.Tasks.Task<Akka.Streams.IQueueOfferResult> OfferAsync(TOut element) { }
@@ -4068,7 +4085,10 @@ namespace Akka.Streams.Implementation.Fusing
40684085
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
40694086
}
40704087
[Akka.Annotations.InternalApiAttribute()]
4071-
public sealed class Buffer<T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
4088+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
4089+
0,
4090+
1})]
4091+
public sealed class Buffer<[System.Runtime.CompilerServices.NullableAttribute(2)] T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
40724092
{
40734093
public Buffer(int count, Akka.Streams.OverflowStrategy overflowStrategy) { }
40744094
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
@@ -4087,7 +4107,10 @@ namespace Akka.Streams.Implementation.Fusing
40874107
public override string ToString() { }
40884108
}
40894109
[Akka.Annotations.InternalApiAttribute()]
4090-
public sealed class Delay<T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
4110+
[System.Runtime.CompilerServices.NullableAttribute(new byte[] {
4111+
0,
4112+
1})]
4113+
public sealed class Delay<[System.Runtime.CompilerServices.NullableAttribute(2)] T> : Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T>
40914114
{
40924115
public Delay(System.TimeSpan delay, Akka.Streams.DelayOverflowStrategy strategy) { }
40934116
protected override Akka.Streams.Attributes InitialAttributes { get; }

src/core/Akka.Streams/Attributes.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
using System;
99
using System.Collections.Generic;
10+
using System.Diagnostics.CodeAnalysis;
1011
using System.Linq;
1112
using System.Text;
1213
using Akka.Event;
@@ -325,8 +326,11 @@ public IEnumerable<TAttr> GetAttributeList<TAttr>() where TAttr : IAttribute
325326
/// <typeparam name="TAttr">TBD</typeparam>
326327
/// <param name="defaultIfNotFound">TBD</param>
327328
/// <returns>TBD</returns>
328-
public TAttr GetAttribute<TAttr>(TAttr defaultIfNotFound) where TAttr : class, IAttribute
329+
#nullable enable
330+
[return: NotNullIfNotNull("defaultIfNotFound")]
331+
public TAttr? GetAttribute<TAttr>(TAttr? defaultIfNotFound) where TAttr : class, IAttribute
329332
=> GetAttribute<TAttr>() ?? defaultIfNotFound;
333+
#nullable restore
330334

331335
/// <summary>
332336
/// Get the first (least specific) attribute of a given type or subtype thereof.

src/core/Akka.Streams/Implementation/ActorRefSourceActor.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Akka.Event;
1111
using Akka.Streams.Actors;
1212

13+
#nullable enable
1314
namespace Akka.Streams.Implementation
1415
{
1516
/// <summary>
@@ -40,7 +41,7 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
4041
/// <summary>
4142
/// TBD
4243
/// </summary>
43-
protected readonly IBuffer<T> Buffer;
44+
protected readonly IBuffer<T>? Buffer;
4445

4546
/// <summary>
4647
/// TBD
@@ -50,7 +51,6 @@ public static Props Props(int bufferSize, OverflowStrategy overflowStrategy, Act
5051
/// TBD
5152
/// </summary>
5253
public readonly OverflowStrategy OverflowStrategy;
53-
private ILoggingAdapter _log;
5454

5555
/// <summary>
5656
/// TBD
@@ -63,13 +63,13 @@ public ActorRefSourceActor(int bufferSize, OverflowStrategy overflowStrategy, in
6363
{
6464
BufferSize = bufferSize;
6565
OverflowStrategy = overflowStrategy;
66-
Buffer = bufferSize != 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
66+
Buffer = bufferSize > 0 ? Implementation.Buffer.Create<T>(bufferSize, maxFixedBufferSize) : null;
6767
}
6868

6969
/// <summary>
7070
/// TBD
7171
/// </summary>
72-
protected ILoggingAdapter Log => _log ??= Context.GetLogger();
72+
protected ILoggingAdapter Log { get; } = Context.GetLogger();
7373

7474
/// <summary>
7575
/// TBD
@@ -90,7 +90,7 @@ protected bool DefaultReceive(object message)
9090
Context.Stop(Self);
9191
else if (message is Status.Success)
9292
{
93-
if (BufferSize == 0 || Buffer.IsEmpty)
93+
if (Buffer is null || Buffer.IsEmpty)
9494
OnCompleteThenStop(); // will complete the stream successfully
9595
else
9696
Context.Become(DrainBufferThenComplete);
@@ -112,7 +112,7 @@ protected virtual bool RequestElement(object message)
112112
if (message is Request)
113113
{
114114
// totalDemand is tracked by base
115-
if (BufferSize != 0)
115+
if (Buffer is not null)
116116
while (TotalDemand > 0L && !Buffer.IsEmpty)
117117
OnNext(Buffer.Dequeue());
118118

@@ -133,7 +133,7 @@ protected virtual bool ReceiveElement(T message)
133133
{
134134
if (TotalDemand > 0L)
135135
OnNext(message);
136-
else if (BufferSize == 0)
136+
else if (Buffer is null)
137137
Log.Debug("Dropping element because there is no downstream demand: [{0}]", message);
138138
else if (!Buffer.IsFull)
139139
Buffer.Enqueue(message);
@@ -189,7 +189,7 @@ private bool DrainBufferThenComplete(object message)
189189
// even if previously valid completion was requested via Status.Success
190190
OnErrorThenStop(failure.Cause);
191191
}
192-
else if (message is Request)
192+
else if (message is Request && Buffer is not null)
193193
{
194194
// totalDemand is tracked by base
195195
while (TotalDemand > 0L && !Buffer.IsEmpty)
@@ -201,7 +201,7 @@ private bool DrainBufferThenComplete(object message)
201201
else if (IsActive)
202202
Log.Debug(
203203
"Dropping element because Status.Success received already, only draining already buffered elements: [{0}] (pending: [{1}])",
204-
message, Buffer.Used);
204+
message, Buffer?.Used ?? 0);
205205
else
206206
return false;
207207

src/core/Akka.Streams/Implementation/Buffers.cs

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using System.Runtime.CompilerServices;
1111
using Akka.Annotations;
1212

13+
#nullable enable
1314
namespace Akka.Streams.Implementation
1415
{
1516
/// <summary>
@@ -54,7 +55,7 @@ internal interface IBuffer<T>
5455
/// TBD
5556
/// </summary>
5657
/// <returns>TBD</returns>
57-
T Peek();
58+
T? Peek();
5859
/// <summary>
5960
/// TBD
6061
/// </summary>
@@ -161,7 +162,7 @@ internal abstract class FixedSizeBuffer<T> : IBuffer<T>
161162
/// </summary>
162163
protected long WriteIndex;
163164

164-
private readonly T[] _buffer;
165+
private readonly T?[] _buffer;
165166

166167
/// <summary>
167168
/// TBD
@@ -222,14 +223,20 @@ public void Enqueue(T element)
222223
/// <param name="element">TBD</param>
223224
/// <param name="maintenance">TBD</param>
224225
[MethodImpl(MethodImplOptions.NoInlining)]
225-
public void Put(long index, T element, bool maintenance) => _buffer[ToOffset(index, maintenance)] = element;
226+
public void Put(long index, T? element, bool maintenance) => _buffer[ToOffset(index, maintenance)] = element;
226227

227228
/// <summary>
228229
/// TBD
229230
/// </summary>
230231
/// <param name="index">TBD</param>
231232
/// <returns>TBD</returns>
232-
public T Get(long index) => _buffer[ToOffset(index, false)];
233+
public T Get(long index)
234+
{
235+
var elem = _buffer[ToOffset(index, false)];
236+
if(elem == null)
237+
throw new IndexOutOfRangeException($"Invalid buffer element at index {index}, element is null");
238+
return elem;
239+
}
233240

234241
/// <summary>
235242
/// TBD
@@ -354,7 +361,7 @@ private sealed class FixedQueue : IBuffer<T>
354361
private const int Size = 16;
355362
private const int Mask = 15;
356363

357-
private readonly T[] _queue = new T[Size];
364+
private readonly T?[] _queue = new T[Size];
358365
private readonly BoundedBuffer<T> _boundedBuffer;
359366
private int _head;
360367
private int _tail;
@@ -373,6 +380,9 @@ public FixedQueue(BoundedBuffer<T> boundedBuffer)
373380

374381
public void Enqueue(T element)
375382
{
383+
if(element is null)
384+
throw new ArgumentNullException(nameof(element));
385+
376386
if (_tail - _head == Size)
377387
{
378388
var queue = new DynamicQueue(Capacity);
@@ -392,12 +402,15 @@ public T Dequeue()
392402
{
393403
var pos = _head & Mask;
394404
var ret = _queue[pos];
405+
if(ret is null)
406+
throw new IndexOutOfRangeException();
407+
395408
_queue[pos] = default(T);
396409
_head += 1;
397410
return ret;
398411
}
399412

400-
public T Peek() => _tail == _head ? default(T) : _queue[_head & Mask];
413+
public T? Peek() => _tail == _head ? default(T) : _queue[_head & Mask];
401414

402415
public void Clear()
403416
{
@@ -431,12 +444,15 @@ public DynamicQueue(int capacity)
431444

432445
public T Dequeue()
433446
{
447+
if(First is null)
448+
throw new IndexOutOfRangeException();
449+
434450
var result = First.Value;
435451
RemoveFirst();
436452
return result;
437453
}
438454

439-
public T Peek() => First.Value;
455+
public T? Peek() => First is null ? default : First.Value;
440456

441457
public void DropHead() => RemoveFirst();
442458

@@ -498,7 +514,7 @@ public BoundedBuffer(int capacity)
498514
/// TBD
499515
/// </summary>
500516
/// <returns>TBD</returns>
501-
public T Peek() => _q.Peek();
517+
public T? Peek() => _q.Peek();
502518

503519
/// <summary>
504520
/// TBD

0 commit comments

Comments
 (0)