Skip to content

Commit 7732b36

Browse files
Add MapMaterializedValue for Source/Flow WithContext (#5711)
Co-authored-by: Gregorius Soedharmo <[email protected]>
1 parent 19811a1 commit 7732b36

File tree

5 files changed

+71
-19
lines changed

5 files changed

+71
-19
lines changed

src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1437,6 +1437,7 @@ namespace Akka.Streams.Dsl
14371437
public sealed class FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat> : Akka.Streams.GraphDelegate<Akka.Streams.FlowShape<System.ValueTuple<TIn, TCtxIn>, System.ValueTuple<TOut, TCtxOut>>, TMat>
14381438
{
14391439
public Akka.Streams.Dsl.Flow<System.ValueTuple<TIn, TCtxIn>, System.ValueTuple<TOut, TCtxOut>, TMat> AsFlow() { }
1440+
public Akka.Streams.Dsl.FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> combine) { }
14401441
public Akka.Streams.Dsl.FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat> Via<TOut2, TCtx2, TMat2>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TOut, TCtxOut>, System.ValueTuple<TOut2, TCtx2>>, TMat2> viaFlow) { }
14411442
public Akka.Streams.Dsl.FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat3> ViaMaterialized<TOut2, TCtx2, TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TOut, TCtxOut>, System.ValueTuple<TOut2, TCtx2>>, TMat2> viaFlow, System.Func<TMat, TMat2, TMat3> combine) { }
14421443
}
@@ -2106,7 +2107,10 @@ namespace Akka.Streams.Dsl
21062107
{
21072108
public SourceWithContext(Akka.Streams.Dsl.Source<System.ValueTuple<TOut, TCtx>, TMat> source) { }
21082109
public Akka.Streams.Dsl.Source<System.ValueTuple<TOut, TCtx>, TMat> AsSource() { }
2110+
public Akka.Streams.Dsl.SourceWithContext<TOut, TCtx, TMat2> MapMaterializedValue<TMat2>(System.Func<TMat, TMat2> combine) { }
21092111
public TMat2 RunWith<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<System.ValueTuple<TOut, TCtx>>, TMat2> sink, Akka.Streams.IMaterializer materializer) { }
2112+
public Akka.Streams.Dsl.IRunnableGraph<TMat> To<TMat2>(Akka.Streams.IGraph<Akka.Streams.SinkShape<System.ValueTuple<TOut, TCtx>>, TMat2> sink) { }
2113+
public Akka.Streams.Dsl.IRunnableGraph<TMat3> ToMaterialized<TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.SinkShape<System.ValueTuple<TOut, TCtx>>, TMat2> sink, System.Func<TMat, TMat2, TMat3> combine) { }
21102114
public Akka.Streams.Dsl.SourceWithContext<TOut2, TCtx2, TMat> Via<TOut2, TCtx2, TMat2>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TOut, TCtx>, System.ValueTuple<TOut2, TCtx2>>, TMat2> viaFlow) { }
21112115
public Akka.Streams.Dsl.SourceWithContext<TOut2, TCtx2, TMat3> ViaMaterialized<TOut2, TCtx2, TMat2, TMat3>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TOut, TCtx>, System.ValueTuple<TOut2, TCtx2>>, TMat2> viaFlow, System.Func<TMat, TMat2, TMat3> combine) { }
21122116
}

src/core/Akka.Streams.Tests/Dsl/FlowWithContextSpec.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,23 @@ public void A_FlowWithContext_must_get_created_from_FlowAsFlowWithContext()
4040
.ExpectNext((new Message("az", 1L), 1L))
4141
.ExpectComplete();
4242
}
43+
44+
[Fact]
45+
public void A_FlowWithContext_must_be_able_to_map_materialized_value_via_FlowWithContext_MapMaterializedValue()
46+
{
47+
var materializedValue = "MatedValue";
48+
var mapMaterializedValueFlow = FlowWithContext.Create<Message, long>().MapMaterializedValue(_ => materializedValue);
49+
50+
var (matValue, probe) = Source.From(new[] { new Message("a", 1L) })
51+
.MapMaterializedValue(_ => 42)
52+
.AsSourceWithContext(m => m.Offset)
53+
.ViaMaterialized(mapMaterializedValueFlow, Keep.Both)
54+
.ToMaterialized(this.SinkProbe<(Message, long)>(), Keep.Both)
55+
.Run(Materializer);
56+
57+
matValue.ShouldBe((42, materializedValue));
58+
probe.Request(1).ExpectNext((new Message("a", 1L), 1L)).ExpectComplete();
59+
}
4360
}
4461

4562
sealed class Message : IEquatable<Message>

src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,13 @@ public void SourceWithContext_must_get_created_from_AsSourceWithContext()
6565
{
6666
var msg = new Message("a", 1);
6767

68-
var sink = this.CreateSubscriberProbe<(Message, long)>();
69-
7068
Source.From(new[] { msg })
7169
.AsSourceWithContext(x => x.Offset)
72-
.AsSource()
73-
.RunWith(Sink.FromSubscriber(sink), Materializer);
74-
75-
var sub = sink.ExpectSubscription();
76-
sub.Request(1);
77-
sink.ExpectNext((msg, 1L));
78-
sink.ExpectComplete();
70+
.ToMaterialized(this.SinkProbe<(Message, long)>(), Keep.Right)
71+
.Run(Materializer)
72+
.Request(1)
73+
.ExpectNext((msg, 1L))
74+
.ExpectComplete();
7975
}
8076

8177
[Fact]
@@ -100,8 +96,6 @@ public void SourceWithContext_must_be_able_to_get_turned_back_into_a_normal_sour
10096
[Fact]
10197
public void SourceWithContext_must_pass_through_context_using_Select_and_Where()
10298
{
103-
var sink = this.CreateSubscriberProbe<(string, long)>();
104-
10599
Source.From(new[]
106100
{
107101
new Message("A", 1),
@@ -113,14 +107,12 @@ public void SourceWithContext_must_pass_through_context_using_Select_and_Where()
113107
.Select(m => m.Data.ToLower())
114108
.Where(x => x != "b")
115109
.WhereNot(x => x == "d")
116-
.AsSource()
117-
.RunWith(Sink.FromSubscriber(sink), Materializer);
118-
119-
var sub = sink.ExpectSubscription();
120-
sub.Request(2);
121-
sink.ExpectNext(("a", 1L));
122-
sink.ExpectNext(("c", 4L));
123-
sink.ExpectComplete();
110+
.ToMaterialized(this.SinkProbe<(string, long)>(), Keep.Right)
111+
.Run(Materializer)
112+
.Request(2)
113+
.ExpectNext(("a", 1L))
114+
.ExpectNext(("c", 4L))
115+
.ExpectComplete();
124116
}
125117

126118
[Fact]
@@ -191,5 +183,18 @@ public void SourceWithContext_must_pass_through_sequence_of_context_per_element_
191183

192184
sink.ExpectComplete();
193185
}
186+
187+
[Fact]
188+
public void SourceWithContext_must_be_able_to_change_materialized_value_via_MapMaterializedValue()
189+
{
190+
var materializedValue = "MatedValue";
191+
192+
Source.Empty<Message>()
193+
.AsSourceWithContext(m => m.Offset)
194+
.MapMaterializedValue(_ => materializedValue)
195+
.To(Sink.Ignore<(Message, long)>())
196+
.Run(Materializer)
197+
.ShouldBe(materializedValue);
198+
}
194199
}
195200
}

src/core/Akka.Streams/Dsl/FlowWithContext.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ public FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat3> ViaMaterialized<TOut2,
5353
IGraph<FlowShape<(TOut, TCtxOut), (TOut2, TCtx2)>, TMat2> viaFlow, Func<TMat, TMat2, TMat3> combine) =>
5454
FlowWithContext.From(Flow.FromGraph(Inner).ViaMaterialized(viaFlow, combine));
5555

56+
/// <summary>
57+
/// Context-preserving variant of <see cref="Flow{TIn, TOut, TMat2}.MapMaterializedValue{TMat2}(Func{TMat2, TMat2})"/>.
58+
/// </summary>
59+
public FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat2> MapMaterializedValue<TMat2>(Func<TMat, TMat2> combine) =>
60+
FlowWithContext.From(Flow.FromGraph(Inner).MapMaterializedValue(combine));
61+
5662
[MethodImpl(MethodImplOptions.AggressiveInlining)]
5763
public Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> AsFlow() => Flow.FromGraph(Inner);
5864
}

src/core/Akka.Streams/Dsl/SourceWithContext.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,26 @@ public SourceWithContext<TOut2, TCtx2, TMat3> ViaMaterialized<TOut2, TCtx2, TMat
4848
IGraph<FlowShape<(TOut, TCtx), (TOut2, TCtx2)>, TMat2> viaFlow, Func<TMat, TMat2, TMat3> combine) =>
4949
new SourceWithContext<TOut2, TCtx2, TMat3>(Source.FromGraph(Inner).ViaMaterialized(viaFlow, combine));
5050

51+
/// <summary>
52+
/// Connect this <see cref="SourceWithContext{TOut, TCtx, TMat2}"/> to a <see cref="Sink"/>,
53+
/// concatenating the processing steps of both.
54+
/// </summary>
55+
public IRunnableGraph<TMat> To<TMat2>(IGraph<SinkShape<(TOut, TCtx)>, TMat2> sink) =>
56+
Source.FromGraph(Inner).ToMaterialized(sink, Keep.Left);
57+
58+
/// <summary>
59+
/// Connect this <see cref="SourceWithContext{TOut, TCtx, TMat2}"/> to a <see cref="Sink"/>,
60+
/// concatenating the processing steps of both.
61+
/// </summary>
62+
public IRunnableGraph<TMat3> ToMaterialized<TMat2, TMat3>(IGraph<SinkShape<(TOut, TCtx)>, TMat2> sink, Func<TMat, TMat2, TMat3> combine) =>
63+
Source.FromGraph(Inner).ToMaterialized(sink, combine);
64+
65+
/// <summary>
66+
/// Context-preserving variant of <see cref="Source{TOut, TMat2}.MapMaterializedValue{TMat2}(Func{TMat2, TMat2})"/>.
67+
/// </summary>
68+
public SourceWithContext<TOut, TCtx, TMat2> MapMaterializedValue<TMat2>(Func<TMat, TMat2> combine) =>
69+
new SourceWithContext<TOut, TCtx, TMat2>(Source.FromGraph(Inner).MapMaterializedValue(combine));
70+
5171
/// <summary>
5272
/// Connect this <see cref="SourceWithContext{TOut,TCtx,TMat}"/> to a Sink and run it. The returned value is the materialized value of the Sink.
5373
/// Note that the ActorSystem can be used as the implicit materializer parameter to use the SystemMaterializer for running the stream.

0 commit comments

Comments
 (0)