Skip to content

Commit ce274b0

Browse files
Extract ITestQueue<T> interface from BlockingQueue<T> (#5665)
Co-authored-by: Gregorius Soedharmo <[email protected]>
1 parent 22315d7 commit ce274b0

File tree

5 files changed

+247
-70
lines changed

5 files changed

+247
-70
lines changed

src/core/Akka.TestKit/Internal/BlockingCollectionTestActorQueue.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,26 @@
55
// </copyright>
66
//-----------------------------------------------------------------------
77

8+
using System;
89
using System.Collections.Generic;
910

1011
namespace Akka.TestKit.Internal
1112
{
1213
/// <summary>
1314
/// This class represents an implementation of <see cref="ITestActorQueue{T}"/>
14-
/// that uses a <see cref="BlockingQueue{T}"/> as its backing store.
15+
/// that uses a <see cref="ITestQueue{T}"/> as its backing store.
1516
/// <remarks>Note! Part of internal API. Breaking changes may occur without notice. Use at own risk.</remarks>
1617
/// </summary>
1718
/// <typeparam name="T">The type of item to store.</typeparam>
1819
public class BlockingCollectionTestActorQueue<T> : ITestActorQueue<T>
1920
{
20-
private readonly BlockingQueue<T> _queue;
21+
private readonly ITestQueue<T> _queue;
2122

2223
/// <summary>
2324
/// Initializes a new instance of the <see cref="BlockingCollectionTestActorQueue{T}"/> class.
2425
/// </summary>
2526
/// <param name="queue">The queue to use as the backing store.</param>
26-
public BlockingCollectionTestActorQueue(BlockingQueue<T> queue)
27+
public BlockingCollectionTestActorQueue(ITestQueue<T> queue)
2728
{
2829
_queue = queue;
2930
}

src/core/Akka.TestKit/Internal/BlockingQueue.cs

Lines changed: 75 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
using System.Collections.Generic;
1212
using System.Linq;
1313
using System.Threading;
14+
using System.Threading.Tasks;
1415

1516
namespace Akka.TestKit.Internal
1617
{
@@ -20,54 +21,46 @@ namespace Akka.TestKit.Internal
2021
/// <remarks>Note! Part of internal API. Breaking changes may occur without notice. Use at own risk.</remarks>
2122
/// </summary>
2223
/// <typeparam name="T">The type of item to store.</typeparam>
23-
public class BlockingQueue<T>
24+
public class BlockingQueue<T> : ITestQueue<T>
2425
{
2526
private readonly BlockingCollection<Positioned> _collection = new BlockingCollection<Positioned>(new QueueWithAddFirst());
2627

27-
/// <summary>
28-
/// The number of items that are currently in the queue.
29-
/// </summary>
28+
/// <inheritdoc cref="Count"/>
3029
public int Count { get { return _collection.Count; } }
3130

32-
/// <summary>
33-
/// Adds the specified item to the end of the queue.
34-
/// </summary>
35-
/// <param name="item">The item to add to the queue.</param>
31+
/// <inheritdoc cref="Enqueue"/>
3632
public void Enqueue(T item)
3733
{
3834
if (!_collection.TryAdd(new Positioned(item)))
3935
throw new InvalidOperationException("Failed to enqueue item into the queue.");
4036
}
4137

42-
/// <summary>
43-
/// Adds the specified item to the front of the queue.
44-
/// </summary>
45-
/// <param name="item">The item to add to the queue.</param>
38+
/// <inheritdoc cref="EnqueueAsync"/>
39+
public async ValueTask EnqueueAsync(T item)
40+
{
41+
Enqueue(item);
42+
}
43+
4644
[Obsolete("This method will be removed from the public API in the future")]
4745
public void AddFirst(T item)
4846
{
4947
if(!_collection.TryAdd(new Positioned(item, first:true)))
5048
throw new InvalidOperationException("Failed to enqueue item into the head of the queue.");
5149
}
5250

53-
/// <summary>
54-
/// Tries to add the specified item to the end of the queue within the specified time period.
55-
/// A token can be provided to cancel the operation if needed.
56-
/// </summary>
57-
/// <param name="item">The item to add to the queue.</param>
58-
/// <param name="millisecondsTimeout">The number of milliseconds to wait for the add to complete.</param>
59-
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
60-
/// <returns><c>true</c> if the add completed within the specified timeout; otherwise, <c>false</c>.</returns>
51+
/// <inheritdoc cref="TryEnqueue"/>
6152
public bool TryEnqueue(T item, int millisecondsTimeout, CancellationToken cancellationToken)
6253
{
6354
return _collection.TryAdd(new Positioned(item), millisecondsTimeout, cancellationToken);
6455
}
6556

66-
/// <summary>
67-
/// Tries to remove the specified item from the queue.
68-
/// </summary>
69-
/// <param name="item">The item to remove from the queue.</param>
70-
/// <returns><c>true</c> if the item was removed; otherwise, <c>false</c>.</returns>
57+
/// <inheritdoc cref="TryEnqueueAsync"/>
58+
public async ValueTask<bool> TryEnqueueAsync(T item, int millisecondsTimeout, CancellationToken cancellationToken)
59+
{
60+
return TryEnqueue(item, millisecondsTimeout, cancellationToken);
61+
}
62+
63+
/// <inheritdoc cref="TryTake(out T)"/>
7164
public bool TryTake(out T item)
7265
{
7366
if(_collection.TryTake(out var p))
@@ -79,14 +72,14 @@ public bool TryTake(out T item)
7972
return false;
8073
}
8174

82-
/// <summary>
83-
/// Tries to remove the specified item from the queue within the specified time period.
84-
/// A token can be provided to cancel the operation if needed.
85-
/// </summary>
86-
/// <param name="item">The item to remove from the queue.</param>
87-
/// <param name="millisecondsTimeout">The number of milliseconds to wait for the remove to complete.</param>
88-
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
89-
/// <returns><c>true</c> if the remove completed within the specified timeout; otherwise, <c>false</c>.</returns>
75+
/// <inheritdoc cref="TryTakeAsync()"/>
76+
public async ValueTask<(bool success, T item)> TryTakeAsync()
77+
{
78+
var result = TryTake(out var item);
79+
return (result, item);
80+
}
81+
82+
/// <inheritdoc cref="TryTake(out T, int, CancellationToken)"/>
9083
public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
9184
{
9285
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
@@ -98,27 +91,29 @@ public bool TryTake(out T item, int millisecondsTimeout, CancellationToken cance
9891
return false;
9992
}
10093

101-
/// <summary>
102-
/// Removes an item from the collection.
103-
/// </summary>
104-
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
105-
/// <exception cref="OperationCanceledException">
106-
/// This exception is thrown when the operation is canceled.
107-
/// </exception>
108-
/// <returns>The item removed from the collection.</returns>
94+
/// <inheritdoc cref="TryTakeAsync(int, CancellationToken)"/>
95+
public async ValueTask<(bool success, T item)> TryTakeAsync(int millisecondsTimeout, CancellationToken cancellationToken)
96+
{
97+
var result = TryTake(out var item, millisecondsTimeout, cancellationToken);
98+
return (result, item);
99+
}
100+
101+
/// <inheritdoc cref="Take"/>
109102
public T Take(CancellationToken cancellationToken)
110103
{
111104
var p = _collection.Take(cancellationToken);
112105
return p.Value;
113106
}
114107

108+
/// <inheritdoc cref="TakeAsync"/>
109+
public async ValueTask<T> TakeAsync(CancellationToken cancellationToken)
110+
{
111+
return _collection.Take(cancellationToken).Value;
112+
}
113+
115114
#region Peek methods
116115

117-
/// <summary>
118-
/// Tries to remove the specified item from the queue.
119-
/// </summary>
120-
/// <param name="item">The item to remove from the queue.</param>
121-
/// <returns><c>true</c> if the item was removed; otherwise, <c>false</c>.</returns>
116+
/// <inheritdoc cref="TryPeek(out T)"/>
122117
public bool TryPeek(out T item)
123118
{
124119
if(_collection.TryTake(out var p))
@@ -131,14 +126,19 @@ public bool TryPeek(out T item)
131126
return false;
132127
}
133128

134-
/// <summary>
135-
/// Tries to remove the specified item from the queue within the specified time period.
136-
/// A token can be provided to cancel the operation if needed.
137-
/// </summary>
138-
/// <param name="item">The item to remove from the queue.</param>
139-
/// <param name="millisecondsTimeout">The number of milliseconds to wait for the remove to complete.</param>
140-
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
141-
/// <returns><c>true</c> if the remove completed within the specified timeout; otherwise, <c>false</c>.</returns>
129+
/// <inheritdoc cref="TryPeekAsync()"/>
130+
public async ValueTask<(bool success, T item)> TryPeekAsync()
131+
{
132+
if(_collection.TryTake(out var p))
133+
{
134+
var item = p.Value;
135+
AddFirst(item);
136+
return (true, item);
137+
}
138+
return (false, default);
139+
}
140+
141+
/// <inheritdoc cref="TryPeek(out T, int, CancellationToken)"/>
142142
public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cancellationToken)
143143
{
144144
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
@@ -151,27 +151,36 @@ public bool TryPeek(out T item, int millisecondsTimeout, CancellationToken cance
151151
return false;
152152
}
153153

154-
/// <summary>
155-
/// Removes an item from the collection.
156-
/// </summary>
157-
/// <param name="cancellationToken">The cancellation token that can be used to cancel the operation.</param>
158-
/// <exception cref="OperationCanceledException">
159-
/// This exception is thrown when the operation is canceled.
160-
/// </exception>
161-
/// <returns>The item removed from the collection.</returns>
154+
/// <inheritdoc cref="TryPeekAsync(int, CancellationToken)"/>
155+
public async ValueTask<(bool success, T item)> TryPeekAsync(int millisecondsTimeout, CancellationToken cancellationToken)
156+
{
157+
if(_collection.TryTake(out var p, millisecondsTimeout, cancellationToken))
158+
{
159+
var item = p.Value;
160+
AddFirst(item);
161+
return (true, item);
162+
}
163+
return (false, default);
164+
}
165+
166+
/// <inheritdoc cref="Peek"/>
162167
public T Peek(CancellationToken cancellationToken)
163168
{
164169
var p = _collection.Take(cancellationToken);
165170
AddFirst(p.Value);
166171
return p.Value;
167172
}
168173

174+
/// <inheritdoc cref="PeekAsync"/>
175+
public async ValueTask<T> PeekAsync(CancellationToken cancellationToken)
176+
{
177+
var val = _collection.Take(cancellationToken).Value;
178+
AddFirst(val);
179+
return val;
180+
}
169181
#endregion
170182

171-
/// <summary>
172-
/// Copies the items from the <see cref="BlockingQueue{T}"/> instance into a new <see cref="List{T}"/>.
173-
/// </summary>
174-
/// <returns>A <see cref="List{T}"/> containing copies of the elements of the collection</returns>
183+
/// <inheritdoc cref="ToList"/>
175184
public List<T> ToList()
176185
{
177186
var positionArray = _collection.ToArray();

src/core/Akka.TestKit/Internal/ITestActorQueue.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
// </copyright>
66
//-----------------------------------------------------------------------
77

8+
using System;
89
using System.Collections.Generic;
910

1011
namespace Akka.TestKit.Internal
@@ -30,6 +31,7 @@ public interface ITestActorQueue<T> : ITestActorQueueProducer<T>
3031
/// Copies all the items from the <see cref="ITestActorQueue{T}"/> instance into a new <see cref="List{T}"/>
3132
/// </summary>
3233
/// <returns>TBD</returns>
34+
[Obsolete("This method will be removed in the future")]
3335
List<T> ToList();
3436

3537
/// <summary>

0 commit comments

Comments
 (0)