Skip to content

Commit 2e75e26

Browse files
committed
feat(go): complete wrpc:http/outgoing-handler.handle serving support
Signed-off-by: Roman Volosatovs <[email protected]>
1 parent b901683 commit 2e75e26

File tree

14 files changed

+1416
-358
lines changed

14 files changed

+1416
-358
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/transport/src/lib.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1787,10 +1787,12 @@ impl<'a> Receive<'a> for Bytes {
17871787
where
17881788
T: Stream<Item = anyhow::Result<Bytes>> + Send + Sync + 'static,
17891789
{
1790+
trace!("receive byte list header");
17901791
let (len, payload) = receive_list_header(payload, rx).await?;
17911792
let cap = len
17921793
.try_into()
17931794
.context("list length does not fit in usize")?;
1795+
trace!(cap, "receive byte list contents");
17941796
let mut payload = receive_at_least(payload, rx, cap).await?;
17951797
Ok((payload.copy_to_bytes(cap), payload))
17961798
}
@@ -2027,21 +2029,24 @@ impl<'a> Receive<'a> for IncomingInputStream {
20272029
else {
20282030
bail!("stream subscription type mismatch")
20292031
};
2030-
trace!("decode stream");
2032+
trace!("decode byte stream");
20312033
let mut payload = receive_at_least(payload, rx, 1).await?;
20322034
match payload.get_u8() {
20332035
0 => {
2036+
trace!("decode pending byte stream");
20342037
let (items_tx, items_rx) = mpsc::channel(1);
20352038
let producer = spawn(async move {
20362039
let mut payload: Box<dyn Buf + Send + 'a> = Box::new(Bytes::new());
20372040
let mut subscriber = pin!(subscriber);
20382041
loop {
2042+
trace!("decode pending byte stream chunk");
20392043
match Bytes::receive_sync(payload, &mut subscriber).await {
20402044
Ok((vs, _)) if vs.is_empty() => {
20412045
trace!("stream end received, close stream");
20422046
return;
20432047
}
20442048
Ok((vs, buf)) => {
2049+
trace!(?vs, "decoded pending byte stream chunk");
20452050
payload = buf;
20462051
if let Err(err) = items_tx.send(Ok(vs)).await {
20472052
trace!(?err, "item receiver closed");
@@ -2067,6 +2072,7 @@ impl<'a> Receive<'a> for IncomingInputStream {
20672072
))
20682073
}
20692074
1 => {
2075+
trace!("decode ready byte stream");
20702076
let (buf, payload) = Bytes::receive_sync(payload, rx)
20712077
.await
20722078
.context("failed to receive bytes")?;

examples/go/http-outgoing-nats-server/main.go

Lines changed: 154 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88
"errors"
99
"fmt"
10+
"io"
1011
"log/slog"
1112
"net/http"
1213
"os"
@@ -20,49 +21,177 @@ import (
2021
wrpcnats "github.com/wrpc/wrpc/go/nats"
2122
)
2223

24+
type incomingBody struct {
25+
body io.Reader
26+
trailer http.Header
27+
trailerRx wrpc.Receiver[[]*wrpc.Tuple2[string, [][]byte]]
28+
}
29+
30+
func (r *incomingBody) Read(b []byte) (int, error) {
31+
n, err := r.body.Read(b)
32+
if err == io.EOF {
33+
trailers, err := r.trailerRx.Receive()
34+
if err != nil {
35+
return 0, fmt.Errorf("failed to receive trailers: %w", err)
36+
}
37+
for _, header := range trailers {
38+
for _, value := range header.V1 {
39+
r.trailer.Add(header.V0, string(value))
40+
}
41+
}
42+
return n, io.EOF
43+
}
44+
return n, err
45+
}
46+
47+
type outgoingBody struct {
48+
body io.ReadCloser
49+
trailer http.Header
50+
trailerCh chan<- []*wrpc.Tuple2[string, [][]byte]
51+
}
52+
53+
func (r *outgoingBody) Read(b []byte) (int, error) {
54+
slog.Debug("reading HTTP body chunk", "len", len(b))
55+
n, err := r.body.Read(b)
56+
slog.Debug("read HTTP body chunk", "len", n)
57+
if err == io.EOF {
58+
slog.Debug("HTTP body reached EOF, reading trailers")
59+
trailers := make([]*wrpc.Tuple2[string, [][]byte], 0, len(r.trailer))
60+
for header, values := range r.trailer {
61+
bs := make([][]byte, len(values))
62+
for i, value := range values {
63+
bs[i] = []byte(value)
64+
}
65+
trailers = append(trailers, &wrpc.Tuple2[string, [][]byte]{V0: header, V1: bs})
66+
}
67+
slog.Debug("sending trailers")
68+
r.trailerCh <- trailers
69+
close(r.trailerCh)
70+
return n, io.EOF
71+
}
72+
return n, err
73+
}
74+
75+
func (r *outgoingBody) Close() error {
76+
return r.body.Close()
77+
}
78+
79+
type trailerReceiver <-chan []*wrpc.Tuple2[string, [][]byte]
80+
81+
func (r trailerReceiver) Receive() ([]*wrpc.Tuple2[string, [][]byte], error) {
82+
trailers, ok := <-r
83+
if !ok {
84+
return nil, errors.New("trailer receiver channel closed")
85+
}
86+
return trailers, nil
87+
}
88+
89+
func (r trailerReceiver) Ready() bool {
90+
return false
91+
}
92+
2393
func run() error {
2494
nc, err := nats.Connect(nats.DefaultURL)
2595
if err != nil {
2696
return fmt.Errorf("failed to connect to NATS.io: %w", err)
2797
}
2898
defer nc.Close()
2999

30-
stop, err := outgoing_handler.ServeHandle(wrpcnats.NewClient(nc, "go"), func(ctx context.Context, request *types.RecordRequest, opts *types.RecordRequestOptions) (*types.RecordResponse, error) {
31-
scheme := request.Scheme.String()
32-
100+
stop, err := outgoing_handler.ServeHandle(wrpcnats.NewClient(nc, "go"), func(ctx context.Context, request *types.RequestRecord, opts *types.RequestOptionsRecord) (*wrpc.Result[types.ResponseRecord, types.ErrorCodeVariant], func(), error) {
101+
var scheme string
102+
switch disc := request.Scheme.Discriminant(); disc {
103+
case types.SchemeDiscriminant_Http:
104+
scheme = "http"
105+
case types.SchemeDiscriminant_Https:
106+
scheme = "https"
107+
case types.SchemeDiscriminant_Other:
108+
var ok bool
109+
scheme, ok = request.Scheme.GetOther()
110+
if !ok {
111+
return nil, nil, errors.New("invalid scheme")
112+
}
113+
default:
114+
return nil, nil, fmt.Errorf("invalid scheme discriminant %v", disc)
115+
}
33116
authority := ""
34117
if request.Authority != nil {
35118
authority = *request.Authority
36119
}
37-
38120
pathWithQuery := ""
39121
if request.PathWithQuery != nil {
40122
pathWithQuery = *request.PathWithQuery
41123
}
124+
url := fmt.Sprintf("%s://%s/%s", scheme, authority, pathWithQuery)
42125

43-
switch request.Method.Discriminant() {
44-
case types.DiscriminantMethod_Get:
45-
resp, err := http.Get(fmt.Sprintf("%s://%s/%s", scheme, authority, pathWithQuery))
46-
if err != nil {
47-
return nil, fmt.Errorf("request failed: %w", err)
48-
}
49-
headers := make([]*wrpc.Tuple2[string, [][]byte], 0, len(resp.Header))
50-
for header, values := range resp.Header {
51-
bs := make([][]byte, len(values))
52-
for i, value := range values {
53-
bs[i] = []byte(value)
54-
}
55-
headers = append(headers, &wrpc.Tuple2[string, [][]byte]{V0: header, V1: bs})
126+
var method string
127+
switch disc := request.Method.Discriminant(); disc {
128+
case types.MethodDiscriminant_Get:
129+
method = "get"
130+
case types.MethodDiscriminant_Head:
131+
method = "head"
132+
case types.MethodDiscriminant_Post:
133+
method = "post"
134+
case types.MethodDiscriminant_Put:
135+
method = "put"
136+
case types.MethodDiscriminant_Delete:
137+
method = "delete"
138+
case types.MethodDiscriminant_Connect:
139+
method = "connect"
140+
case types.MethodDiscriminant_Options:
141+
method = "options"
142+
case types.MethodDiscriminant_Trace:
143+
method = "trace"
144+
case types.MethodDiscriminant_Patch:
145+
method = "patch"
146+
case types.MethodDiscriminant_Other:
147+
var ok bool
148+
method, ok = request.Method.GetOther()
149+
if !ok {
150+
return nil, nil, errors.New("invalid HTTP method")
56151
}
57-
return &types.RecordResponse{
58-
Body: wrpc.NewPendingByteReader(bufio.NewReader(resp.Body)),
59-
Trailers: nil, // Trailers wrpc.ReadyReceiver[[]*wrpc.Tuple2[string, [][]byte]]
60-
Status: uint16(resp.StatusCode),
61-
Headers: headers,
62-
}, nil
63152
default:
64-
return nil, errors.New("only GET currently supported")
153+
return nil, nil, fmt.Errorf("invalid HTTP method discriminant %v", disc)
65154
}
155+
156+
var trailer http.Header
157+
req, err := http.NewRequest(method, url, &incomingBody{body: request.Body, trailer: trailer, trailerRx: request.Trailers})
158+
if err != nil {
159+
return nil, nil, fmt.Errorf("failed to construct a new HTTP request: %w", err)
160+
}
161+
req.Trailer = trailer
162+
for _, header := range request.Headers {
163+
for _, value := range header.V1 {
164+
req.Header.Add(header.V0, string(value))
165+
}
166+
}
167+
resp, err := http.DefaultClient.Do(req)
168+
if err != nil {
169+
return nil, nil, fmt.Errorf("request failed: %w", err)
170+
}
171+
172+
headers := make([]*wrpc.Tuple2[string, [][]byte], 0, len(resp.Header))
173+
for header, values := range resp.Header {
174+
bs := make([][]byte, len(values))
175+
for i, value := range values {
176+
bs[i] = []byte(value)
177+
}
178+
headers = append(headers, &wrpc.Tuple2[string, [][]byte]{V0: header, V1: bs})
179+
}
180+
181+
trailerCh := make(chan []*wrpc.Tuple2[string, [][]byte], 1)
182+
body := &outgoingBody{body: resp.Body, trailer: resp.Trailer, trailerCh: trailerCh}
183+
return &wrpc.Result[types.ResponseRecord, types.ErrorCodeVariant]{
184+
Ok: &types.ResponseRecord{
185+
Body: wrpc.NewPendingByteReader(bufio.NewReader(body)),
186+
Trailers: trailerReceiver(trailerCh),
187+
Status: uint16(resp.StatusCode),
188+
Headers: headers,
189+
},
190+
}, func() {
191+
if err := body.Close(); err != nil {
192+
slog.Warn("failed to close GET response body", "err", err)
193+
}
194+
}, nil
66195
})
67196
if err != nil {
68197
return err
@@ -82,7 +211,7 @@ func run() error {
82211

83212
func init() {
84213
slog.SetDefault(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
85-
AddSource: true, Level: slog.LevelDebug, ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
214+
Level: slog.LevelDebug, ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
86215
if a.Key == slog.TimeKey {
87216
return slog.Attr{}
88217
}

examples/rust/http-outgoing-nats-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ clap = { workspace = true, features = [
2323
futures = { workspace = true }
2424
http = { workspace = true, features = ["std"] }
2525
tokio = { workspace = true, features = ["io-std", "rt-multi-thread", "signal"] }
26+
tracing = { workspace = true, features = ["attributes"] }
2627
tracing-subscriber = { workspace = true, features = [
2728
"ansi",
2829
"env-filter",

examples/rust/http-outgoing-nats-client/src/main.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use http::uri::Uri;
77
use tokio::io::{stdout, AsyncWriteExt as _};
88
use tokio::sync::mpsc;
99
use tokio::try_join;
10+
use tracing::debug;
1011
use tracing_subscriber::layer::SubscriberExt as _;
1112
use tracing_subscriber::util::SubscriberInitExt as _;
1213
use wrpc_interface_http::{OutgoingHandler as _, Request, Response};
@@ -93,6 +94,7 @@ async fn main() -> anyhow::Result<()> {
9394
try_join!(
9495
async { tx.await.context("failed to transmit request") },
9596
async {
97+
debug!("receiving body");
9698
body.try_for_each(|chunk| async move {
9799
stdout()
98100
.write_all(&chunk)
@@ -102,9 +104,11 @@ async fn main() -> anyhow::Result<()> {
102104
})
103105
.await
104106
.context("failed to receive response body")?;
107+
debug!("received body");
105108

106109
println!();
107110

111+
debug!("receiving trailers");
108112
let trailers = trailers.await.context("failed to receive trailers")?;
109113
if let Some(trailers) = trailers {
110114
for (trailer, values) in trailers {
@@ -116,6 +120,7 @@ async fn main() -> anyhow::Result<()> {
116120
}
117121
}
118122
}
123+
debug!("received trailers");
119124
Ok(())
120125
}
121126
)?;

go/client.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,6 @@ package wrpc
22

33
import "context"
44

5-
type Transmitter interface {
6-
Transmit(context.Context, []uint32, []byte) error
7-
}
8-
9-
type Subscriber interface {
10-
Subscribe(func(context.Context, []byte)) (func() error, error)
11-
SubscribePath([]uint32, func(context.Context, []byte)) (func() error, error)
12-
}
13-
145
type ErrorSubscriber interface {
156
SubscribeError(func(context.Context, []byte)) (func() error, error)
167
}

0 commit comments

Comments
 (0)