Skip to content

Commit 24005ca

Browse files
committed
Reorder Source/FlowWithContext type parameters
1 parent 79594a3 commit 24005ca

File tree

9 files changed

+209
-128
lines changed

9 files changed

+209
-128
lines changed

src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.approved.txt

Lines changed: 35 additions & 32 deletions
Large diffs are not rendered by default.
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
//-----------------------------------------------------------------------
2+
// <copyright file="FlowWithContextSpec.cs" company="Akka.NET Project">
3+
// Copyright (C) 2009-2021 Lightbend Inc. <http://www.lightbend.com>
4+
// Copyright (C) 2013-2021 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+
// </copyright>
6+
//-----------------------------------------------------------------------
7+
8+
using System;
9+
using System.Linq;
10+
using Akka.Streams.Dsl;
11+
using Akka.Streams.TestKit;
12+
using Akka.TestKit;
13+
using Xunit;
14+
using Xunit.Abstractions;
15+
16+
namespace Akka.Streams.Tests.Dsl
17+
{
18+
public class FlowWithContextSpec : AkkaSpec
19+
{
20+
private ActorMaterializer Materializer { get; }
21+
22+
public FlowWithContextSpec(ITestOutputHelper helper) : base(helper)
23+
{
24+
var settings = ActorMaterializerSettings.Create(Sys);
25+
Materializer = ActorMaterializer.Create(Sys, settings);
26+
}
27+
28+
[Fact]
29+
public void A_FlowWithContext_must_get_created_from_FlowAsFlowWithContext()
30+
{
31+
var flow = Flow.Create<Message>().Select(m => m.Copy(data: m.Data + "z"));
32+
var flowWithContext = flow.AsFlowWithContext<Message, long, Message, long, NotUsed, Message>((m, o) => new Message(m.Data, o), m => m.Offset);
33+
34+
Source.From(new[] { new Message("a", 1L) })
35+
.AsSourceWithContext(m => m.Offset)
36+
.Via(flowWithContext)
37+
.AsSource()
38+
.RunWith(this.SinkProbe<(Message, long)>(), Materializer)
39+
.Request(1)
40+
.ExpectNext((new Message("az", 1L), 1L))
41+
.ExpectComplete();
42+
}
43+
}
44+
45+
sealed class Message : IEquatable<Message>
46+
{
47+
public string Data { get; }
48+
public long Offset { get; }
49+
50+
public Message(string data, long offset)
51+
{
52+
Data = data;
53+
Offset = offset;
54+
}
55+
56+
public Message Copy(string data = null, long? offset = null) => new Message(data ?? Data, offset ?? Offset);
57+
58+
public bool Equals(Message other)
59+
{
60+
if (other is null) return false;
61+
if (ReferenceEquals(this, other)) return true;
62+
return string.Equals(Data, other.Data) && Offset == other.Offset;
63+
}
64+
65+
public override bool Equals(object obj)
66+
{
67+
if (obj is null) return false;
68+
if (ReferenceEquals(this, obj)) return true;
69+
return obj is Message other && Equals(other);
70+
}
71+
72+
public override int GetHashCode()
73+
{
74+
unchecked
75+
{
76+
return ((Data != null ? Data.GetHashCode() : 0) * 397) ^ Offset.GetHashCode();
77+
}
78+
}
79+
}
80+
}

src/core/Akka.Streams.Tests/Dsl/SourceWithContextSpec.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -126,17 +126,14 @@ public void SourceWithContext_must_pass_through_context_using_Select_and_Where()
126126
[Fact]
127127
public void SourceWithContext_must_pass_through_context_using_FlowWithContext()
128128
{
129-
var flowWithContext = FlowWithContext.Create<long, string>();
130-
131-
var msg = new Message("a", 1);
129+
var flowWithContext = FlowWithContext.Create<string, long>();
132130

133131
var sink = this.CreateSubscriberProbe<(string, long)>();
134132

135-
Source.From(new[] { msg })
133+
Source.From(new[] { new Message("a", 1L) })
136134
.AsSourceWithContext(x => x.Offset)
137135
.Select(x => x.Data)
138136
.Via(flowWithContext.Select(s => s + "b"))
139-
.AsSource()
140137
.RunWith(Sink.FromSubscriber(sink), Materializer);
141138

142139
var sub = sink.ExpectSubscription();

src/core/Akka.Streams.Tests/Dsl/WithContextUsageSpec.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ private static CommittableMessage<Record>[] GenInput(int start, int end) =>
214214
new CommittableMessage<Record>(new Record(GenKey(i), GenValue(i)), new CommittableOffsetImpl(i)))
215215
.ToArray();
216216

217-
private static SourceWithContext<Offset, Record, NotUsed> CreateSourceWithContext(
217+
private static SourceWithContext<Record, Offset, NotUsed> CreateSourceWithContext(
218218
params CommittableMessage<Record>[] messages) =>
219219
CommittableConsumer.CommittableSource(messages)
220220
.AsSourceWithContext(m => new Offset(m.Offset.Offset))

src/core/Akka.Streams/Dsl/FlowOperations.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2399,7 +2399,7 @@ public static Flow<T, T, TMat> Watch<T, TMat>(this Flow<T, T, TMat> flow, IActor
23992399
/// <typeparam name="TCtxOut">Resulting context type</typeparam>
24002400
/// <typeparam name="TMat">Materialized value type</typeparam>
24012401
/// <typeparam name="TIn2">Type of passed flow elements</typeparam>
2402-
public static FlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat> AsFlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat, TIn2>(
2402+
public static FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat> AsFlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat, TIn2>(
24032403
this Flow<TIn2, TOut, TMat> flow,
24042404
Func<TIn, TCtxIn, TIn2> collapseContext,
24052405
Func<TOut, TCtxOut> extractContext)

src/core/Akka.Streams/Dsl/FlowWithContext.cs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
using System;
99
using System.Runtime.CompilerServices;
10+
using Akka.Annotations;
1011

1112
namespace Akka.Streams.Dsl
1213
{
@@ -17,28 +18,27 @@ namespace Akka.Streams.Dsl
1718
/// operations.
1819
///
1920
/// An "empty" flow can be created by calling <see cref="FlowWithContext.Create{TCtx,TIn}"/>.
20-
///
21-
/// API MAY CHANGE
22-
///</summary>
23-
public sealed class FlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat>
21+
///</summary>
22+
[ApiMayChange]
23+
public sealed class FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat>
2424
: GraphDelegate<FlowShape<(TIn, TCtxIn), (TOut, TCtxOut)>, TMat>
2525
{
26-
internal FlowWithContext(Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow)
26+
internal FlowWithContext(Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow)
2727
: base(flow)
2828
{
2929
}
30-
30+
3131
///<summary>
3232
/// Transform this flow by the regular flow. The given flow must support manual context propagation by
3333
/// taking and producing tuples of (data, context).
3434
///
3535
/// This can be used as an escape hatch for operations that are not (yet) provided with automatic
3636
/// context propagation here.
3737
///</summary>
38-
public FlowWithContext<TCtxIn, TIn, TCtx2, TOut2, TMat> Via<TCtx2, TOut2, TMat2>(
38+
public FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat> Via<TOut2, TCtx2, TMat2>(
3939
IGraph<FlowShape<(TOut, TCtxOut), (TOut2, TCtx2)>, TMat2> viaFlow) =>
4040
FlowWithContext.From(Flow.FromGraph(Inner).Via(viaFlow));
41-
41+
4242
///<summary>
4343
/// Transform this flow by the regular flow. The given flow must support manual context propagation by
4444
/// taking and producing tuples of (data, context).
@@ -49,7 +49,7 @@ public FlowWithContext<TCtxIn, TIn, TCtx2, TOut2, TMat> Via<TCtx2, TOut2, TMat2>
4949
/// The <paramref name="combine"/> function is used to compose the materialized values of this flow and that
5050
/// flow into the materialized value of the resulting Flow.
5151
///</summary>
52-
public FlowWithContext<TCtxIn, TIn, TCtx2, TOut2, TMat3> ViaMaterialized<TCtx2, TOut2, TMat2, TMat3>(
52+
public FlowWithContext<TIn, TCtxIn, TOut2, TCtx2, TMat3> ViaMaterialized<TOut2, TCtx2, TMat2, TMat3>(
5353
IGraph<FlowShape<(TOut, TCtxOut), (TOut2, TCtx2)>, TMat2> viaFlow, Func<TMat, TMat2, TMat3> combine) =>
5454
FlowWithContext.From(Flow.FromGraph(Inner).ViaMaterialized(viaFlow, combine));
5555

@@ -60,17 +60,17 @@ public FlowWithContext<TCtxIn, TIn, TCtx2, TOut2, TMat3> ViaMaterialized<TCtx2,
6060
public static class FlowWithContext
6161
{
6262
/// <summary>
63-
/// Creates an "empty" <see cref="FlowWithContext{TCtxIn,TIn,TCtxOut,TOut,TMat}"/> that passes elements through with their context unchanged.
63+
/// Creates an "empty" <see cref="FlowWithContext{TIn,TCtxIn,TOut,TCtxOut,TMat}"/> that passes elements through with their context unchanged.
6464
/// </summary>
65-
/// <typeparam name="TCtx"></typeparam>
6665
/// <typeparam name="TIn"></typeparam>
66+
/// <typeparam name="TCtx"></typeparam>
6767
/// <returns></returns>
68-
public static FlowWithContext<TCtx, TIn, TCtx, TIn, NotUsed> Create<TCtx, TIn>()
68+
public static FlowWithContext<TIn, TCtx, TIn, TCtx, NotUsed> Create<TIn, TCtx>()
6969
{
7070
var under = Flow.Create<(TIn, TCtx), NotUsed>();
71-
return new FlowWithContext<TCtx, TIn, TCtx, TIn, NotUsed>(under);
71+
return new FlowWithContext<TIn, TCtx, TIn, TCtx, NotUsed>(under);
7272
}
73-
73+
7474
/// <summary>
7575
/// Creates a FlowWithContext from a regular flow that operates on a pair of `(data, context)` elements.
7676
/// </summary>
@@ -81,8 +81,8 @@ public static FlowWithContext<TCtx, TIn, TCtx, TIn, NotUsed> Create<TCtx, TIn>()
8181
/// <typeparam name="TOut"></typeparam>
8282
/// <typeparam name="TMat"></typeparam>
8383
/// <returns></returns>
84-
public static FlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat> From<TCtxIn, TIn, TCtxOut, TOut, TMat>(
85-
Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow) =>
86-
new FlowWithContext<TCtxIn, TIn, TCtxOut, TOut, TMat>(flow);
84+
public static FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat> From<TIn, TCtxIn, TOut, TCtxOut, TMat>(
85+
Flow<(TIn, TCtxIn), (TOut, TCtxOut), TMat> flow) =>
86+
new FlowWithContext<TIn, TCtxIn, TOut, TCtxOut, TMat>(flow);
8787
}
8888
}

0 commit comments

Comments
 (0)