Skip to content

refactor(go-examples): align with Rust, simplify #324

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions examples/go/hello-client/cmd/hello-client-nats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ func run() (err error) {
}
}
}()

for _, prefix := range os.Args[1:] {
prefixes := os.Args[1:]
if len(prefixes) == 0 {
prefixes = []string{"go"}
}
for _, prefix := range prefixes {
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix(prefix))
greeting, err := handler.Hello(context.Background(), client)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions examples/go/streams-client/cmd/streams-client-nats/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ func run() (err error) {
}
}
}()

for _, prefix := range os.Args[1:] {
prefixes := os.Args[1:]
if len(prefixes) == 0 {
prefixes = []string{"go"}
}
for _, prefix := range prefixes {
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix(prefix))
numbers, bytes, errCh, err := handler.Echo(context.Background(), client, &handler.Req{
Numbers: &ThrottleStream[uint64]{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"errors"
"fmt"
"log"
"log/slog"
Expand All @@ -27,36 +28,60 @@ func run() (err error) {
}
}
}()

for _, prefix := range os.Args[1:] {
prefixes := os.Args[1:]
if len(prefixes) == 0 {
prefixes = []string{"go"}
}
for _, prefix := range prefixes {
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix(prefix))
result, err := store.Open(context.Background(), client, "example")
if err != nil || result.Err != nil {
return fmt.Errorf("failed to call `wrpc-examples:keyvalue/store.open`: %w", err)
open, err := store.Open(context.Background(), client, "example")
if err != nil {
return fmt.Errorf("failed to invoke `wrpc-examples:keyvalue/store.open`: %w", err)
}
if open.Err != nil {
return fmt.Errorf("failed to open `example` bucket: %w", open.Err)
}
bucket := result.Ok
bucket := open.Ok

_, err = store.Bucket_Set(context.Background(), client, bucket.Borrow(), "foo", []byte("bar"))
set, err := store.Bucket_Set(context.Background(), client, bucket.Borrow(), "foo", []byte("bar"))
if err != nil {
return fmt.Errorf("failed to call `wrpc-examples:keyvalue/store.bucket.set`: %w", err)
return fmt.Errorf("failed to invoke `wrpc-examples:keyvalue/store.bucket.set`: %w", err)
}
exist, err := store.Bucket_Exists(context.Background(), client, bucket.Borrow(), "foo")
if set.Err != nil {
return fmt.Errorf("failed to set `foo`: %w", set.Err)
}

exists, err := store.Bucket_Exists(context.Background(), client, bucket.Borrow(), "foo")
if err != nil {
return fmt.Errorf("failed to call `wrpc-examples:keyvalue/store.bucket.exists`: %w", err)
} else {
fmt.Printf("%s exists: %t\n", prefix, *exist.Ok)
return fmt.Errorf("failed to invoke `wrpc-examples:keyvalue/store.bucket.exists`: %w", err)
}
if exists.Err != nil {
return fmt.Errorf("failed to check if `foo` exists: %w", exists.Err)
}
value, err := store.Bucket_Get(context.Background(), client, bucket.Borrow(), "foo")
if !*exists.Ok {
return errors.New("key `foo` does not exist in bucket")
}

get, err := store.Bucket_Get(context.Background(), client, bucket.Borrow(), "foo")
if err != nil {
return fmt.Errorf("failed to call `wrpc-examples:keyvalue/store.bucket.get`: %w", err)
} else {
fmt.Printf("%s get: %s\n", prefix, *value.Ok)
return fmt.Errorf("failed to invoke `wrpc-examples:keyvalue/store.bucket.get`: %w", err)
}
if get.Err != nil {
return fmt.Errorf("failed to get `foo`: %w", get.Err)
}
if string(*get.Ok) != "bar" {
return errors.New("key `foo` value is not `bar`")
}
keys, err := store.Bucket_ListKeys(context.Background(), client, bucket.Borrow(), nil)

listKeys, err := store.Bucket_ListKeys(context.Background(), client, bucket.Borrow(), nil)
if err != nil {
return fmt.Errorf("failed to call `wrpc-examples:keyvalue/store.bucket.list-keys`: %w", err)
} else {
fmt.Printf("%s keys: %v\n", prefix, (*keys.Ok).Keys)
return fmt.Errorf("failed to invoke `wrpc-examples:keyvalue/store.bucket.list-keys`: %w", err)
}
if listKeys.Err != nil {
return fmt.Errorf("failed to list keys: %w", listKeys.Err)
}
for _, key := range listKeys.Ok.Keys {
fmt.Printf("%s key: %s\n", prefix, key)
}
}
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,99 +17,119 @@ import (
wrpcnats "wrpc.io/go/nats"
)

var _ store.Handler = &Handler{}
var (
errNoSuchStore = store.NewErrorNoSuchStore()
errInvalidDataType = store.NewErrorOther("invalid data type stored in map")
)

type Handler struct {
data map[Bucket]map[string][]uint8
lock sync.Mutex
sync.Map
}

type Bucket string
func Ok[T any](v T) *wrpc.Result[T, store.Error] {
return wrpc.Ok[store.Error](v)
}

func (h *Handler) Open(ctx context.Context, identifier string) (*wrpc.Result[wrpc.Own[store.Bucket], store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()
slog.InfoContext(ctx, "handling `wasi:keyvalue/store.open`", "identifier", identifier)
h.LoadOrStore(string(identifier), &sync.Map{})
return Ok(wrpc.Own[store.Bucket](identifier)), nil
}

if h.data == nil {
h.data = make(map[Bucket]map[string][]uint8)
func (h *Handler) Bucket_Get(ctx context.Context, bucket wrpc.Borrow[store.Bucket], key string) (*wrpc.Result[[]byte, store.Error], error) {
slog.InfoContext(ctx, "handling `wasi:keyvalue/store.bucket.get`", "bucket", bucket, "key", key)
v, ok := h.Load(string(bucket))
if !ok {
return wrpc.Err[[]byte](*errNoSuchStore), nil
}
bucket := (Bucket)(identifier)
if _, ok := h.data[bucket]; !ok {
h.data[bucket] = make(map[string][]uint8)
b, ok := v.(*sync.Map)
if !ok {
return wrpc.Err[[]byte](*errInvalidDataType), nil
}
return wrpc.Ok[store.Error, wrpc.Own[store.Bucket]](wrpc.Own[store.Bucket](bucket)), nil
}

func (h *Handler) Bucket_Get(ctx__ context.Context, self wrpc.Borrow[store.Bucket], key string) (*wrpc.Result[[]uint8, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[[]uint8, store.Error](*err), err
v, ok = b.Load(key)
if !ok {
return Ok([]byte(nil)), nil
}
value, ok := h.data[bucket][key]
buf, ok := v.([]byte)
if !ok {
return wrpc.Ok[store.Error, []uint8](nil), nil
return wrpc.Err[[]byte](*errInvalidDataType), nil
}
return wrpc.Ok[store.Error, []uint8](value), nil
return Ok(buf), nil
}

func (h *Handler) Bucket_Set(ctx__ context.Context, self wrpc.Borrow[store.Bucket], key string, value []uint8) (*wrpc.Result[struct{}, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[struct{}, store.Error](*err), err
func (h *Handler) Bucket_Set(ctx context.Context, bucket wrpc.Borrow[store.Bucket], key string, value []byte) (*wrpc.Result[struct{}, store.Error], error) {
slog.InfoContext(ctx, "handling `wrpc:keyvalue/store.bucket.set`", "bucket", bucket, "key", key, "value", value)
v, ok := h.Load(string(bucket))
if !ok {
return wrpc.Err[struct{}](*errNoSuchStore), nil
}
h.data[bucket][key] = value
return wrpc.Ok[store.Error, struct{}](struct{}{}), nil
b, ok := v.(*sync.Map)
if !ok {
return wrpc.Err[struct{}](*errInvalidDataType), nil
}
b.Store(key, value)
return Ok(struct{}{}), nil
}

func (h *Handler) Bucket_Delete(ctx__ context.Context, self wrpc.Borrow[store.Bucket], key string) (*wrpc.Result[struct{}, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[struct{}, store.Error](*err), err
func (h *Handler) Bucket_Delete(ctx context.Context, bucket wrpc.Borrow[store.Bucket], key string) (*wrpc.Result[struct{}, store.Error], error) {
slog.InfoContext(ctx, "handling `wrpc:keyvalue/store.bucket.delete`", "bucket", bucket, "key", key)
v, ok := h.Load(string(bucket))
if !ok {
return wrpc.Err[struct{}](*errNoSuchStore), nil
}
delete(h.data[bucket], key)

return wrpc.Ok[store.Error, struct{}](struct{}{}), nil
b, ok := v.(*sync.Map)
if !ok {
return wrpc.Err[struct{}](*errInvalidDataType), nil
}
b.Delete(key)
return Ok(struct{}{}), nil
}

func (h *Handler) Bucket_Exists(ctx__ context.Context, self wrpc.Borrow[store.Bucket], key string) (*wrpc.Result[bool, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[bool, store.Error](*err), err
func (h *Handler) Bucket_Exists(ctx context.Context, bucket wrpc.Borrow[store.Bucket], key string) (*wrpc.Result[bool, store.Error], error) {
slog.InfoContext(ctx, "handling `wrpc:keyvalue/store.bucket.exists`", "bucket", bucket, "key", key)
v, ok := h.Load(string(bucket))
if !ok {
return wrpc.Err[bool](*errNoSuchStore), nil
}
b, ok := v.(*sync.Map)
if !ok {
return wrpc.Err[bool](*errInvalidDataType), nil
}
_, ok := h.data[bucket][key]
return wrpc.Ok[store.Error, bool](ok), nil
_, ok = b.Load(key)
return Ok(ok), nil
}

func (h *Handler) Bucket_ListKeys(ctx__ context.Context, self wrpc.Borrow[store.Bucket], cursor *uint64) (*wrpc.Result[store.KeyResponse, store.Error], error) {
h.lock.Lock()
defer h.lock.Unlock()

bucket := (Bucket)(self)
if _, ok := h.data[bucket]; !ok {
err := store.NewErrorNoSuchStore()
return wrpc.Err[store.KeyResponse, store.Error](*err), err
func (h *Handler) Bucket_ListKeys(ctx context.Context, bucket wrpc.Borrow[store.Bucket], cursor *uint64) (*wrpc.Result[store.KeyResponse, store.Error], error) {
slog.InfoContext(ctx, "handling `wrpc:keyvalue/store.bucket.list-keys`", "bucket", bucket, "cursor", cursor)
if cursor != nil {
return wrpc.Err[store.KeyResponse](*store.NewErrorOther("cursors are not supported")), nil
}
v, ok := h.Load(string(bucket))
if !ok {
return wrpc.Err[store.KeyResponse](*errNoSuchStore), nil
}
keyResponse := store.KeyResponse{Keys: []string{}}
for k := range h.data[bucket] {
keyResponse.Keys = append(keyResponse.Keys, k)
b, ok := v.(*sync.Map)
if !ok {
return wrpc.Err[store.KeyResponse](*errInvalidDataType), nil
}
var keys []string
var err *store.Error
b.Range(func(k, _ any) bool {
s, ok := k.(string)
if !ok {
err = errInvalidDataType
return false
}
keys = append(keys, s)
return true
})
if err != nil {
return wrpc.Err[store.KeyResponse](*err), nil
}
return wrpc.Ok[store.Error, store.KeyResponse](keyResponse), nil
return Ok(store.KeyResponse{
Keys: keys,
Cursor: nil,
}), nil
}

func run() error {
Expand All @@ -131,15 +151,15 @@ func run() error {
client := wrpcnats.NewClient(nc, wrpcnats.WithPrefix("go"))
stop, err := server.Serve(client, &Handler{})
if err != nil {
return fmt.Errorf("failed to serve `keyvalue` world: %w", err)
return fmt.Errorf("failed to serve `server` world: %w", err)
}

signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, syscall.SIGINT)
<-signalCh

if err = stop(); err != nil {
return fmt.Errorf("failed to stop `keyvalue` world: %w", err)
return fmt.Errorf("failed to stop `server` world: %w", err)
}
return nil
}
Expand Down