Skip to content

Commit 50d6eb8

Browse files
committed
Renames: nextIndex->next
1 parent 642d189 commit 50d6eb8

File tree

10 files changed

+38
-78
lines changed

10 files changed

+38
-78
lines changed

src/Propulsion.CosmosStore/CosmosStoreSink.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ module Internal =
6565

6666
let write (log: ILogger) (ctx: EventsContext) stream (span: Event[]) ct = task {
6767
let i = StreamSpan.index span
68-
let n = StreamSpan.nextIndex span
68+
let n = StreamSpan.next span
6969
span |> Seq.iter (fun x -> if x.IsUnfold then invalidOp "CosmosStore3 does not [yet] support ingesting unfolds")
7070
log.Debug("Writing {s}@{i}x{n}", stream, i, span.Length)
7171
let mapData = FsCodec.Core.EventData.Map StreamSpan.toNativeEventBody

src/Propulsion.Kafka/ProducerSinks.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ type StreamsProducerSink =
4545
| _ -> ()
4646
do! producer.Produce(key, message, ct = ct)
4747
| ValueNone -> ()
48-
return struct (outcome, Events.nextIndex span)
48+
return struct (outcome, Events.next span)
4949
}
5050
Sync.Factory.StartAsync
5151
( log, maxReadAhead, maxConcurrentStreams, handle,

src/Propulsion/Sinks.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type Codec<'E> = FsCodec.IEventCodec<'E, EventBody, unit>
1717
module Events =
1818

1919
/// The Index of the next event ordinarily expected on the next handler invocation (assuming this invocation handles all successfully)
20-
let nextIndex: Event[] -> int64 = Streams.StreamSpan.nextIndex
20+
let next: Event[] -> int64 = Streams.StreamSpan.next
2121
/// The Index of the first event as supplied to this handler
2222
let index: Event[] -> int64 = Streams.StreamSpan.index
2323

src/Propulsion/Streams.fs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,22 +103,21 @@ module StreamSpan =
103103
trimmed, metrics eventSize trimmed
104104

105105
let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
106-
let inline nextIndex (span: FsCodec.ITimelineEvent<'F>[]) =
106+
let inline next (span: FsCodec.ITimelineEvent<'F>[]) =
107107
let l = span[span.Length - 1]
108108
if l.IsUnfold then l.Index else l.Index + 1L
109-
let inline dropBeforeIndex min = function
109+
let inline dropBefore min = function
110110
| [||] as xs -> xs
111-
| xs when nextIndex xs < min -> Array.empty
111+
| xs when next xs < min -> Array.empty
112112
| xs ->
113113
match index xs with
114114
| xi when xi = min -> xs
115115
| xi -> xs |> Array.skip (min - xi |> int)
116-
117116
let merge min (spans: FsCodec.ITimelineEvent<_>[][]) =
118117
let candidates =
119118
[| for span in spans do
120119
if span <> null then
121-
match dropBeforeIndex min span with
120+
match dropBefore min span with
122121
| [||] -> ()
123122
| xs -> xs |]
124123
if candidates.Length = 0 then null
@@ -132,7 +131,7 @@ module StreamSpan =
132131
for i in 1 .. candidates.Length - 1 do
133132
let x = candidates[i]
134133
let xIndex = index x
135-
let accNext = nextIndex acc
134+
let accNext = next acc
136135
if xIndex > accNext then // Gap
137136
match acc |> Array.filter (_.IsUnfold >> not) with
138137
| [||] -> ()
@@ -141,8 +140,8 @@ module StreamSpan =
141140
buffer.Add eventsOnly
142141
acc <- x
143142
// Overlapping, join
144-
elif nextIndex x > accNext then
145-
match dropBeforeIndex accNext x with
143+
elif next x > accNext then
144+
match dropBefore accNext x with
146145
| [||] -> ()
147146
| news ->
148147
acc <- [| for x in acc do if not x.IsUnfold then x

tests/Propulsion.Kafka.Integration/ConsumersIntegration.fs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ module Helpers =
127127
do! handler (getConsumer()) (deserialize consumerId event)
128128
(log: ILogger).Information("BATCHED CONSUMER Handled {c} events in {l} streams", c, streams.Length)
129129
let ts = Stopwatch.elapsed ts
130-
return seq { for x in streams -> struct (Ok (Propulsion.Sinks.Events.nextIndex x.span), ts) } }
130+
return seq { for x in streams -> struct (Ok (Propulsion.Sinks.Events.next x.span), ts) } }
131131
let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.)
132132
let messageIndexes = StreamNameSequenceGenerator()
133133
let consumer =
@@ -165,7 +165,7 @@ module Helpers =
165165
let handle _ (span: Propulsion.Sinks.Event[]) = async {
166166
for event in span do
167167
do! handler (getConsumer()) (deserialize consumerId event)
168-
return (), Propulsion.Sinks.Events.nextIndex span }
168+
return (), Propulsion.Sinks.Events.next span }
169169
let stats = Stats(log, TimeSpan.FromSeconds 5.,TimeSpan.FromSeconds 5.)
170170
let messageIndexes = StreamNameSequenceGenerator()
171171
let consumer =

tests/Propulsion.MessageDb.Integration/Tests.fs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ let ``It processes events for a category`` () = task {
8484
test <@ Array.chooseV Simple.codec.Decode events |> Array.forall ((=) (Simple.Hello { name = "world" })) @>
8585
if handled.Count >= 2000 then
8686
stop ()
87-
return struct ((), Propulsion.Sinks.Events.nextIndex events) }
87+
return struct ((), Propulsion.Sinks.Events.next events) }
8888
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
8989
let source = MessageDbSource(
9090
log, TimeSpan.FromMinutes 1,
@@ -132,7 +132,7 @@ let ``It doesn't read the tail event again`` () = task {
132132
let stats = stats log
133133

134134
let handle _ events _ = task {
135-
return struct ((), Propulsion.Sinks.Events.nextIndex events) }
135+
return struct ((), Propulsion.Sinks.Events.next events) }
136136
use sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 1, 1, handle, stats)
137137
let batchSize = 10
138138
let source = MessageDbSource(

tests/Propulsion.Tests/SinkHealthTests.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Scenario(testOutput) =
3030
return failwith "transient"
3131
else
3232
do! Async.Sleep (TimeSpan.FromSeconds 1)
33-
return (), Propulsion.Sinks.Events.nextIndex events }
33+
return (), Propulsion.Sinks.Events.next events }
3434
let sink = Propulsion.Sinks.Factory.StartConcurrent(log, 2, 2, handle, stats)
3535
let dispose () =
3636
sink.Stop()

tests/Propulsion.Tests/SourceTests.fs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type Scenario(testOutput) =
1414
let stats = { new Propulsion.Streams.Stats<_>(log, TimeSpan.FromMinutes 1, TimeSpan.FromMinutes 1)
1515
with member _.HandleOk x = ()
1616
member _.HandleExn(log, x) = () }
17-
let handle _ events _ = task { return struct ((), Propulsion.Sinks.Events.nextIndex events) }
17+
let handle _ events _ = task { return struct ((), Propulsion.Sinks.Events.next events) }
1818
let sink = Propulsion.Sinks.Factory.StartConcurrentAsync(log, 2, 2, handle, stats)
1919
let dispose () =
2020
sink.Stop()

tests/Propulsion.Tests/StreamStateTests.fs

Lines changed: 20 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -5,123 +5,84 @@ open Propulsion.Streams
55
open Swensen.Unquote
66
open Xunit
77

8-
module FsCodec301 = // Not yet merged, https://github.com/jet/FsCodec/pull/123
9-
open FsCodec
10-
open System
11-
/// <summary>An Event or Unfold that's been read from a Store and hence has a defined <c>Index</c> on the Event Timeline.</summary>
12-
[<NoComparison; NoEquality>]
13-
type TimelineEvent2<'Format>(index, eventType, data, meta, eventId, correlationId, causationId, timestamp, isUnfold, context, size) =
14-
15-
static member Create(index, eventType, data, ?meta, ?eventId, ?correlationId, ?causationId, ?timestamp, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> =
16-
let isUnfold = defaultArg isUnfold false
17-
let meta = match meta with Some x -> x | None -> Unchecked.defaultof<_>
18-
let eventId = match eventId with Some x -> x | None -> Guid.Empty
19-
let ts = match timestamp with Some ts -> ts | None -> DateTimeOffset.UtcNow
20-
let size = defaultArg size 0
21-
TimelineEvent2(index, eventType, data, meta, eventId, Option.toObj correlationId, Option.toObj causationId, ts, isUnfold, Option.toObj context, size) :> _
22-
23-
static member Create(index, inner: IEventData<'Format>, ?isUnfold, ?context, ?size): ITimelineEvent<'Format> =
24-
let isUnfold = defaultArg isUnfold false
25-
let size = defaultArg size 0
26-
TimelineEvent2(index, inner.EventType, inner.Data, inner.Meta, inner.EventId, inner.CorrelationId, inner.CausationId, inner.Timestamp, isUnfold, Option.toObj context, size) :> _
27-
28-
override _.ToString() =
29-
let t = if isUnfold then "Unfold" else "Event"
30-
$"{t} {eventType} @{index} {context}"
31-
interface ITimelineEvent<'Format> with
32-
member _.Index = index
33-
member _.IsUnfold = isUnfold
34-
member _.Context = context
35-
member _.Size = size
36-
member _.EventType = eventType
37-
member _.Data = data
38-
member _.Meta = meta
39-
member _.EventId = eventId
40-
member _.CorrelationId = correlationId
41-
member _.CausationId = causationId
42-
member _.Timestamp = timestamp
43-
open FsCodec301
44-
458
let canonicalTime = System.DateTimeOffset.UtcNow
469

4710
let mk_ p c seg uc: FsCodec.ITimelineEvent<string>[] =
48-
let mk id et isUnfold = TimelineEvent2.Create(id, et, null, timestamp = canonicalTime, isUnfold = isUnfold, context = seg)
11+
let mk id et isUnfold = FsCodec.Core.TimelineEvent.Create(id, et, null, timestamp = canonicalTime, isUnfold = isUnfold, context = seg)
4912
[| for x in 0..c-1 -> mk (p + int64 x) (p + int64 x |> string) false
5013
for u in 0..uc-1 -> mk (p + int64 c) $"{p+int64 c}u{u}" true |]
5114
let mk p c = mk_ p c 0 0
52-
let merge = StreamSpan.merge
5315
let isSame = LanguagePrimitives.PhysicalEquality
54-
let dropBeforeIndex = StreamSpan.dropBeforeIndex
5516
let is (xs: FsCodec.ITimelineEvent<string>[][]) (res: FsCodec.ITimelineEvent<string>[][]) =
5617
(xs, res) ||> Seq.forall2 (fun x y -> (Array.isEmpty x && Array.isEmpty y)
5718
|| x[0].Index = y[0].Index && (x, y) ||> Seq.forall2 (fun x y -> x.EventType = y.EventType))
5819

5920
let [<Fact>] nothing () =
60-
let r = merge 0L [| mk 0L 0; mk 0L 0 |]
21+
let r = StreamSpan.merge 0L [| mk 0L 0; mk 0L 0 |]
6122
test <@ isSame null r @>
6223

6324
let [<Fact>] synced () =
64-
let r = merge 1L [| mk 0L 1; mk 0L 0 |]
25+
let r = StreamSpan.merge 1L [| mk 0L 1; mk 0L 0 |]
6526
test <@ isSame null r @>
6627

6728
let [<Fact>] ``no overlap`` () =
68-
let r = merge 0L [| mk 0L 1; mk 2L 2 |]
29+
let r = StreamSpan.merge 0L [| mk 0L 1; mk 2L 2 |]
6930
test <@ r |> is [| mk 0L 1; mk 2L 2 |] @>
7031

7132
let [<Fact>] overlap () =
72-
let r = merge 0L [| mk 0L 1; mk 0L 2 |]
33+
let r = StreamSpan.merge 0L [| mk 0L 1; mk 0L 2 |]
7334
test <@ r |> is [| mk 0L 2 |] @>
7435

7536
let [<Fact>] ``remove nulls`` () =
76-
let r = merge 1L [| mk 0L 1; mk 0L 2 |]
37+
let r = StreamSpan.merge 1L [| mk 0L 1; mk 0L 2 |]
7738
test <@ r |> is [| mk 1L 1 |] @>
7839

7940
let [<Fact>] adjacent () =
80-
let r = merge 0L [| mk 0L 1; mk 1L 2 |]
41+
let r = StreamSpan.merge 0L [| mk 0L 1; mk 1L 2 |]
8142
test <@ r |> is [| mk 0L 3 |] @>
8243

8344
let [<Fact>] ``adjacent to min`` () =
84-
let r = Array.map (dropBeforeIndex 2L) [| mk 0L 1; mk 1L 2 |]
45+
let r = Array.map (StreamSpan.dropBefore 2L) [| mk 0L 1; mk 1L 2 |]
8546
test <@ r |> is [| [||]; mk 2L 1 |] @>
8647

8748
let [<Fact>] ``adjacent to min merge`` () =
88-
let r = merge 2L [| mk 0L 1; mk 1L 2 |]
49+
let r = StreamSpan.merge 2L [| mk 0L 1; mk 1L 2 |]
8950
test <@ r |> is [| mk 2L 1 |] @>
9051

9152
let [<Fact>] ``adjacent to min no overlap`` () =
92-
let r = merge 2L [| mk 0L 1; mk 2L 1 |]
53+
let r = StreamSpan.merge 2L [| mk 0L 1; mk 2L 1 |]
9354
test <@ r |> is [| mk 2L 1|] @>
9455

9556
let [<Fact>] ``adjacent trim`` () =
96-
let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2 |]
57+
let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 2L 2 |]
9758
test <@ r |> is [| mk 1L 1; mk 2L 2 |] @>
9859

9960
let [<Fact>] ``adjacent trim merge`` () =
100-
let r = merge 1L [| mk 0L 2; mk 2L 2 |]
61+
let r = StreamSpan.merge 1L [| mk 0L 2; mk 2L 2 |]
10162
test <@ r |> is [| mk 1L 3 |] @>
10263

10364
let [<Fact>] ``adjacent trim append`` () =
104-
let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 2L 2; mk 5L 1 |]
65+
let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 2L 2; mk 5L 1 |]
10566
test <@ r |> is [| mk 1L 1; mk 2L 2; mk 5L 1 |] @>
10667

10768
let [<Fact>] ``adjacent trim append merge`` () =
108-
let r = merge 1L [| mk 0L 2; mk 2L 2; mk 5L 1|]
69+
let r = StreamSpan.merge 1L [| mk 0L 2; mk 2L 2; mk 5L 1|]
10970
test <@ r |> is [| mk 1L 3; mk 5L 1 |] @>
11071

11172
let [<Fact>] ``mixed adjacent trim append`` () =
112-
let r = Array.map (dropBeforeIndex 1L) [| mk 0L 2; mk 5L 1; mk 2L 2 |]
73+
let r = Array.map (StreamSpan.dropBefore 1L) [| mk 0L 2; mk 5L 1; mk 2L 2 |]
11374
test <@ r |> is [| mk 1L 1; mk 5L 1; mk 2L 2 |] @>
11475

11576
let [<Fact>] ``mixed adjacent trim append merge`` () =
116-
let r = merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2|]
77+
let r = StreamSpan.merge 1L [| mk 0L 2; mk 5L 1; mk 2L 2|]
11778
test <@ r |> is [| mk 1L 3; mk 5L 1 |] @>
11879

11980
let [<Fact>] fail () =
120-
let r = merge 11614L [| null; mk 11614L 1 |]
81+
let r = StreamSpan.merge 11614L [| null; mk 11614L 1 |]
12182
test <@ r |> is [| mk 11614L 1 |] @>
12283

12384
let [<Fact>] ``fail 2`` () =
124-
let r = merge 11613L [| mk 11614L 1; null |]
85+
let r = StreamSpan.merge 11613L [| mk 11614L 1; null |]
12586
test <@ r |> is [| mk 11614L 1 |] @>
12687

12788
let (===) (xs: 't seq) (ys: 't seq) = (xs, ys) ||> Seq.forall2 isSame
@@ -137,7 +98,7 @@ let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds,
13798
yield mk_ pos events seg unfolds
13899
pos <- pos + int64 events
139100
seg <- seg + 1 |]
140-
let res = merge 0L input
101+
let res = StreamSpan.merge 0L input
141102
// The only way to end up with a null output is by sending either no spans, or all empties
142103
if res = null then
143104
test <@ input |> Array.forall Array.isEmpty @>
@@ -165,7 +126,7 @@ let [<FsCheck.Xunit.Property(MaxTest = 1000)>] ``merges retain freshest unfolds,
165126
| _ -> true @>
166127

167128
// resulting span sequence must be monotonic, with a gap of at least 1 in the Index ranges per span
168-
test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.nextIndex x < StreamSpan.index y) @>
129+
test <@ res |> Seq.pairwise |> Seq.forall (fun (x, y) -> StreamSpan.next x < StreamSpan.index y) @>
169130

170131
let others = res |> Array.take (res.Length - 1)
171132
// Only the last span can have unfolds

tools/Propulsion.Tool/Sync.fs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ type Stats(log: ILogger, statsInterval, stateInterval, logExternalStats) =
206206

207207
let private handle isValidEvent stream (events: Propulsion.Sinks.Event[]): Async<Outcome * int64> = async {
208208
let ham, spam = events |> Array.partition isValidEvent
209-
return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.nextIndex events }
209+
return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.next events }
210210

211211
let eofSignalException = System.Threading.Tasks.TaskCanceledException "Stopping; FeedMonitor wait completed"
212212
let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
@@ -249,7 +249,7 @@ let run appName (c: Args.Configuration, p: ParseResults<Parameters>) = async {
249249
let json = Propulsion.Codec.NewtonsoftJson.RenderedSpan.ofStreamSpan stream events
250250
|> Propulsion.Codec.NewtonsoftJson.Serdes.Serialize
251251
do! producer.ProduceAsync(FsCodec.StreamName.toString stream, json) |> Async.Ignore
252-
return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.nextIndex events }
252+
return Outcome.render_ stream ham spam 0, Propulsion.Sinks.Events.next events }
253253
Propulsion.Sinks.Factory.StartConcurrent(Log.Logger, maxReadAhead, maxConcurrentProcessors, handle a.Filters.EventFilter, stats,
254254
requireAll = requireAll)
255255
| SubCommand.Sync sa ->

0 commit comments

Comments
 (0)