Skip to content

Commit 60f1718

Browse files
committed
[CHANGED] Remove no headers support (#1939)
Signed-off-by: Piotr Piotrowski <[email protected]>
1 parent ec9b58f commit 60f1718

File tree

4 files changed

+10
-129
lines changed

4 files changed

+10
-129
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ _testmain.go
2121

2222
*.exe
2323

24+
# Git backup files
25+
*.orig
26+
2427
# Emacs
2528
*~
2629
\#*\#

jetstream/jetstream_options.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -124,25 +124,6 @@ func WithGetMsgSubject(subject string) GetMsgOpt {
124124
}
125125
}
126126

127-
// WithGetMsgNoHeaders sets whether the message headers should be returned
128-
// in the response. If set to true, the headers will not be returned.
129-
// This is useful for performance reasons when headers are not needed.
130-
func WithGetMsgNoHeaders() GetMsgOpt {
131-
return func(req *apiMsgGetRequest) error {
132-
req.NoHeaders = true
133-
return nil
134-
}
135-
}
136-
137-
// WithGetLastForSubjectNoHeaders sets whether the message headers should be
138-
// returned in the response for the last message on a subject.
139-
func WithGetLastForSubjectNoHeaders() GetLastForSubjectOpt {
140-
return func(req *apiMsgGetRequest) error {
141-
req.NoHeaders = true
142-
return nil
143-
}
144-
}
145-
146127
// PullMaxMessages limits the number of messages to be buffered in the client.
147128
// If not provided, a default of 500 messages will be used.
148129
// This option is exclusive with PullMaxBytes.

jetstream/stream.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -225,10 +225,9 @@ type (
225225
GetLastForSubjectOpt func(*apiMsgGetRequest) error
226226

227227
apiMsgGetRequest struct {
228-
Seq uint64 `json:"seq,omitempty"`
229-
LastFor string `json:"last_by_subj,omitempty"`
230-
NextFor string `json:"next_by_subj,omitempty"`
231-
NoHeaders bool `json:"no_hdr,omitempty"`
228+
Seq uint64 `json:"seq,omitempty"`
229+
LastFor string `json:"last_by_subj,omitempty"`
230+
NextFor string `json:"next_by_subj,omitempty"`
232231
}
233232

234233
// apiMsgGetResponse is the response for a Stream get request.
@@ -555,14 +554,14 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream
555554
if err != nil {
556555
return nil, err
557556
}
558-
return convertDirectGetMsgResponseToMsg(r.msg, mreq.NoHeaders)
557+
return convertDirectGetMsgResponseToMsg(r.msg)
559558
}
560559
gmSubj = fmt.Sprintf(apiDirectMsgGetT, s.name)
561560
r, err := s.js.apiRequest(ctx, gmSubj, req)
562561
if err != nil {
563562
return nil, err
564563
}
565-
return convertDirectGetMsgResponseToMsg(r.msg, mreq.NoHeaders)
564+
return convertDirectGetMsgResponseToMsg(r.msg)
566565
}
567566
req, err := json.Marshal(mreq)
568567
if err != nil {
@@ -602,7 +601,7 @@ func (s *stream) getMsg(ctx context.Context, mreq *apiMsgGetRequest) (*RawStream
602601
}, nil
603602
}
604603

605-
func convertDirectGetMsgResponseToMsg(r *nats.Msg, noHeaders bool) (*RawStreamMsg, error) {
604+
func convertDirectGetMsgResponseToMsg(r *nats.Msg) (*RawStreamMsg, error) {
606605
// Check for 404/408. We would get a no-payload message and a "Status" header
607606
if len(r.Data) == 0 {
608607
val := r.Header.Get(statusHdr)
@@ -619,11 +618,7 @@ func convertDirectGetMsgResponseToMsg(r *nats.Msg, noHeaders bool) (*RawStreamMs
619618
}
620619
}
621620
}
622-
if noHeaders {
623-
return &RawStreamMsg{
624-
Data: r.Data,
625-
}, nil
626-
}
621+
627622
// Check for headers that give us the required information to
628623
// reconstruct the message.
629624
if len(r.Header) == 0 {

jetstream/test/stream_test.go

Lines changed: 0 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,104 +1032,6 @@ func TestGetMsg(t *testing.T) {
10321032
}
10331033
}
10341034

1035-
func TestGetMsgNoHeaders(t *testing.T) {
1036-
srv := RunBasicJetStreamServer()
1037-
defer shutdownJSServerAndRemoveStorage(t, srv)
1038-
nc, err := nats.Connect(srv.ClientURL())
1039-
if err != nil {
1040-
t.Fatalf("Unexpected error: %v", err)
1041-
}
1042-
1043-
js, err := jetstream.New(nc)
1044-
if err != nil {
1045-
t.Fatalf("Unexpected error: %v", err)
1046-
}
1047-
defer nc.Close()
1048-
1049-
sNonDirect, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "NON_DIRECT", Subjects: []string{"FOO"}})
1050-
if err != nil {
1051-
t.Fatalf("Unexpected error: %v", err)
1052-
}
1053-
sDirect, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "DIRECT", Subjects: []string{"BAR"}})
1054-
if err != nil {
1055-
t.Fatalf("Unexpected error: %v", err)
1056-
}
1057-
1058-
for _, s := range []jetstream.Stream{sNonDirect, sDirect} {
1059-
if _, err := js.PublishMsg(context.Background(), &nats.Msg{
1060-
Data: []byte("msg without headers"),
1061-
Subject: s.CachedInfo().Config.Subjects[0],
1062-
Header: nats.Header{
1063-
"X-Nats-Test-Data": {"test_data"},
1064-
"X-Nats-Key": {"123"},
1065-
},
1066-
}); err != nil {
1067-
t.Fatalf("Unexpected error: %v", err)
1068-
}
1069-
1070-
msg, err := s.GetMsg(context.Background(), 1, jetstream.WithGetMsgNoHeaders())
1071-
if err != nil {
1072-
t.Fatalf("Unexpected error: %v", err)
1073-
}
1074-
if string(msg.Data) != "msg without headers" {
1075-
t.Fatalf("Invalid message data; want: msg without headers; got: %s", string(msg.Data))
1076-
}
1077-
if len(msg.Header) > 0 {
1078-
t.Fatalf("Expected no headers; got: %v", msg.Header)
1079-
}
1080-
}
1081-
}
1082-
1083-
func TestGetLastMsgForSubjectNoHeaders(t *testing.T) {
1084-
srv := RunBasicJetStreamServer()
1085-
defer shutdownJSServerAndRemoveStorage(t, srv)
1086-
nc, err := nats.Connect(srv.ClientURL())
1087-
if err != nil {
1088-
t.Fatalf("Unexpected error: %v", err)
1089-
}
1090-
1091-
js, err := jetstream.New(nc)
1092-
if err != nil {
1093-
t.Fatalf("Unexpected error: %v", err)
1094-
}
1095-
defer nc.Close()
1096-
1097-
sNonDirect, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "NON_DIRECT", Subjects: []string{"FOO"}})
1098-
if err != nil {
1099-
t.Fatalf("Unexpected error: %v", err)
1100-
}
1101-
sDirect, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "DIRECT", Subjects: []string{"BAR"}, AllowDirect: true})
1102-
if err != nil {
1103-
t.Fatalf("Unexpected error: %v", err)
1104-
}
1105-
1106-
for _, s := range []jetstream.Stream{sNonDirect, sDirect} {
1107-
t.Run("stream "+s.CachedInfo().Config.Name, func(t *testing.T) {
1108-
if _, err := js.PublishMsg(context.Background(), &nats.Msg{
1109-
Data: []byte("msg without headers"),
1110-
Subject: s.CachedInfo().Config.Subjects[0],
1111-
Header: nats.Header{
1112-
"X-Nats-Test-Data": {"test_data"},
1113-
"X-Nats-Key": {"123"},
1114-
},
1115-
}); err != nil {
1116-
t.Fatalf("Unexpected error: %v", err)
1117-
}
1118-
1119-
msg, err := s.GetLastMsgForSubject(context.Background(), s.CachedInfo().Config.Subjects[0], jetstream.WithGetLastForSubjectNoHeaders())
1120-
if err != nil {
1121-
t.Fatalf("Unexpected error: %v", err)
1122-
}
1123-
if string(msg.Data) != "msg without headers" {
1124-
t.Fatalf("Invalid message data; want: msg without headers; got: %s", string(msg.Data))
1125-
}
1126-
if len(msg.Header) > 0 {
1127-
t.Fatalf("Expected no headers; got: %v", msg.Header)
1128-
}
1129-
})
1130-
}
1131-
}
1132-
11331035
func TestGetLastMsgForSubject(t *testing.T) {
11341036
tests := []struct {
11351037
name string

0 commit comments

Comments
 (0)