Skip to content

Commit 2d2eb67

Browse files
authored
Merge pull request #718 from ripienaar/async_writes
Support async write mode
2 parents dfb5a5e + a2993f6 commit 2d2eb67

16 files changed

+265
-2
lines changed

api/streams.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,77 @@ type JSApiStreamLeaderStepDownResponse struct {
299299
Success bool `json:"success,omitempty"`
300300
}
301301

302+
type PersistModeType int
303+
304+
const (
305+
// DefaultPersistMode specifies the default persist mode. Writes to the stream will immediately be flushed.
306+
// The publish acknowledgement will be sent after the persisting completes.
307+
DefaultPersistMode = PersistModeType(iota)
308+
// AsyncPersistMode specifies writes to the stream will be flushed asynchronously.
309+
// The publish acknowledgement may be sent before the persisting completes.
310+
// This means writes could be lost if they weren't flushed prior to a hard kill of the server.
311+
AsyncPersistMode
312+
)
313+
314+
func (wc PersistModeType) String() string {
315+
switch wc {
316+
case DefaultPersistMode:
317+
return "Default"
318+
case AsyncPersistMode:
319+
return "Async"
320+
default:
321+
return "Unknown Persist Mode Type"
322+
}
323+
}
324+
325+
func (wc PersistModeType) MarshalJSON() ([]byte, error) {
326+
switch wc {
327+
case DefaultPersistMode:
328+
return json.Marshal("default")
329+
case AsyncPersistMode:
330+
return json.Marshal("async")
331+
default:
332+
return nil, fmt.Errorf("can not marshal %v", wc)
333+
}
334+
}
335+
336+
func (wc PersistModeType) MarshalYAML() (any, error) {
337+
switch wc {
338+
case DefaultPersistMode:
339+
return "default", nil
340+
case AsyncPersistMode:
341+
return "async", nil
342+
default:
343+
return nil, fmt.Errorf("can not marshal %v", wc)
344+
}
345+
}
346+
347+
func (wc *PersistModeType) UnmarshalJSON(data []byte) error {
348+
switch string(data) {
349+
case jsonString("default"), jsonString(""):
350+
*wc = DefaultPersistMode
351+
case jsonString("async"):
352+
*wc = AsyncPersistMode
353+
default:
354+
return fmt.Errorf("can not unmarshal %q", data)
355+
}
356+
357+
return nil
358+
}
359+
360+
func (wc *PersistModeType) UnmarshalYAML(data *yaml.Node) error {
361+
switch data.Value {
362+
case "", "default":
363+
*wc = DefaultPersistMode
364+
case "async":
365+
*wc = AsyncPersistMode
366+
default:
367+
return fmt.Errorf("can not unmarshal %q", data.Value)
368+
}
369+
370+
return nil
371+
}
372+
302373
type DiscardPolicy int
303374

304375
const (
@@ -637,6 +708,8 @@ type StreamConfig struct {
637708
AllowMsgCounter bool `json:"allow_msg_counter,omitempty" yaml:"allow_msg_counter" api_level:"2"`
638709
// AllowMsgSchedules allows the scheduling of messages.
639710
AllowMsgSchedules bool `json:"allow_msg_schedules,omitempty" yaml:"allow_msg_schedules" api_level:"2"`
711+
// PersistMode allows to opt-in to different persistence mode settings.
712+
PersistMode PersistModeType `json:"persist_mode,omitempty" yaml:"persist_mode" api_level:"2"`
640713
}
641714

642715
// StreamConsumerLimits describes limits and defaults for consumers created on a stream

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ require (
1010
github.com/jedib0t/go-pretty/v6 v6.6.8
1111
github.com/klauspost/compress v1.18.0
1212
github.com/nats-io/jwt/v2 v2.8.0
13-
github.com/nats-io/nats-server/v2 v2.12.0-preview.2.0.20250908093425-e80bea299ef9
13+
github.com/nats-io/nats-server/v2 v2.12.0-preview.2.0.20250917115845-b2e3354fc02f
1414
github.com/nats-io/nats.go v1.45.0
1515
github.com/nats-io/nkeys v0.4.11
1616
github.com/nats-io/nuid v1.0.1
@@ -37,7 +37,7 @@ require (
3737
github.com/rogpeppe/go-internal v1.12.0 // indirect
3838
go.uber.org/automaxprocs v1.6.0 // indirect
3939
go.yaml.in/yaml/v2 v2.4.2 // indirect
40-
golang.org/x/crypto v0.41.0 // indirect
40+
golang.org/x/crypto v0.42.0 // indirect
4141
golang.org/x/sys v0.36.0 // indirect
4242
golang.org/x/time v0.13.0 // indirect
4343
google.golang.org/protobuf v1.36.8 // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,14 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
3333
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
3434
github.com/nats-io/jwt/v2 v2.8.0 h1:K7uzyz50+yGZDO5o772eRE7atlcSEENpL7P+b74JV1g=
3535
github.com/nats-io/jwt/v2 v2.8.0/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
36+
github.com/nats-io/nats-server/v2 v2.12.0-RC.4 h1:buc4i5XBV4peg8RKK0Mvgz3g0SE2KxisQE3Y8dUSMjA=
37+
github.com/nats-io/nats-server/v2 v2.12.0-RC.4/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww=
3638
github.com/nats-io/nats-server/v2 v2.12.0-preview.2.0.20250907125124-d09bb5b0bff9 h1:z+9mzFaoWjjpGY+D0xSdgfU2EIyw661xghAm8HwgvEA=
3739
github.com/nats-io/nats-server/v2 v2.12.0-preview.2.0.20250907125124-d09bb5b0bff9/go.mod h1:v2/Zzi22k8vpHjUNubYNTLnuVsSBqsMnzsym6u+opVQ=
3840
github.com/nats-io/nats-server/v2 v2.12.0-preview.2.0.20250908093425-e80bea299ef9 h1:fA4bM++LjZyq61SvlDzAdAh6H5laQIEAQxgTUNUvwUE=
3941
github.com/nats-io/nats-server/v2 v2.12.0-preview.2.0.20250908093425-e80bea299ef9/go.mod h1:KV8jyty6TP3Mtd+Cb/gZAiBcRKTuN9YFL0JO0lJgKv4=
42+
github.com/nats-io/nats-server/v2 v2.12.0-preview.2.0.20250917115845-b2e3354fc02f h1:4uzVTq8yy2fB6H5jpYhc+Mq/OihZFYY0IkIbcqhHk1M=
43+
github.com/nats-io/nats-server/v2 v2.12.0-preview.2.0.20250917115845-b2e3354fc02f/go.mod h1:nr8dhzqkP5E/lDwmn+A2CvQPMd1yDKXQI7iGg3lAvww=
4044
github.com/nats-io/nats.go v1.45.0 h1:/wGPbnYXDM0pLKFjZTX+2JOw9TQPoIgTFrUaH97giwA=
4145
github.com/nats-io/nats.go v1.45.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
4246
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
@@ -68,6 +72,8 @@ go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI=
6872
go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU=
6973
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
7074
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
75+
golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI=
76+
golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8=
7177
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b h1:DXr+pvt3nC887026GRP39Ej11UATqWDmWuS99x26cD0=
7278
golang.org/x/exp v0.0.0-20250819193227-8b4c13bb791b/go.mod h1:4QTo5u+SEIbbKW1RacMZq1YEfOBqeXa19JeshGi+zc4=
7379
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=

schema_source/jetstream/api/v1/definitions.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,6 +1277,12 @@
12771277
"description": "Enables and sets a duration for adding server markers for delete, purge and max age limits",
12781278
"$ref": "#/definitions/golang_duration_nanos",
12791279
"minimum": 0
1280+
},
1281+
"persist_mode": {
1282+
"description": "Sets a specific persistence mode for writing to the Stream",
1283+
"type": "string",
1284+
"enum": ["", "default", "async"],
1285+
"default": ""
12801286
}
12811287
}
12821288
},

schemas/jetstream/api/v1/stream_configuration.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,16 @@
455455
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
456456
"type": "integer",
457457
"maximum": 9223372036854775807
458+
},
459+
"persist_mode": {
460+
"description": "Sets a specific persistence mode for writing to the Stream",
461+
"type": "string",
462+
"enum": [
463+
"",
464+
"default",
465+
"async"
466+
],
467+
"default": ""
458468
}
459469
}
460470
}

schemas/jetstream/api/v1/stream_create_request.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,16 @@
458458
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
459459
"type": "integer",
460460
"maximum": 9223372036854775807
461+
},
462+
"persist_mode": {
463+
"description": "Sets a specific persistence mode for writing to the Stream",
464+
"type": "string",
465+
"enum": [
466+
"",
467+
"default",
468+
"async"
469+
],
470+
"default": ""
461471
}
462472
}
463473
},

schemas/jetstream/api/v1/stream_create_response.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,16 @@
473473
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
474474
"type": "integer",
475475
"maximum": 9223372036854775807
476+
},
477+
"persist_mode": {
478+
"description": "Sets a specific persistence mode for writing to the Stream",
479+
"type": "string",
480+
"enum": [
481+
"",
482+
"default",
483+
"async"
484+
],
485+
"default": ""
476486
}
477487
}
478488
}

schemas/jetstream/api/v1/stream_info_response.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,16 @@
473473
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
474474
"type": "integer",
475475
"maximum": 9223372036854775807
476+
},
477+
"persist_mode": {
478+
"description": "Sets a specific persistence mode for writing to the Stream",
479+
"type": "string",
480+
"enum": [
481+
"",
482+
"default",
483+
"async"
484+
],
485+
"default": ""
476486
}
477487
}
478488
}

schemas/jetstream/api/v1/stream_list_response.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,16 @@
538538
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
539539
"type": "integer",
540540
"maximum": 9223372036854775807
541+
},
542+
"persist_mode": {
543+
"description": "Sets a specific persistence mode for writing to the Stream",
544+
"type": "string",
545+
"enum": [
546+
"",
547+
"default",
548+
"async"
549+
],
550+
"default": ""
541551
}
542552
}
543553
}

schemas/jetstream/api/v1/stream_restore_request.json

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,16 @@
462462
"$comment": "nanoseconds depicting a duration in time, signed 64 bit integer",
463463
"type": "integer",
464464
"maximum": 9223372036854775807
465+
},
466+
"persist_mode": {
467+
"description": "Sets a specific persistence mode for writing to the Stream",
468+
"type": "string",
469+
"enum": [
470+
"",
471+
"default",
472+
"async"
473+
],
474+
"default": ""
465475
}
466476
}
467477
},

0 commit comments

Comments
 (0)