Skip to content

Commit 2fe44be

Browse files
authored
feat: llm bridge server communicates with zipper in memory way (#996)
# Description 1. Implement `frame.Listener` using a golang channel. 2. Zipper supports managing multiple `frame.Listener` instances. 3. The LLM bridge server communicates with Zipper using the golang channel implementation of `frame.Listener`.
1 parent 6152a12 commit 2fe44be

File tree

12 files changed

+502
-66
lines changed

12 files changed

+502
-66
lines changed

cli/serve.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ import (
2222

2323
"github.com/spf13/cobra"
2424
"github.com/yomorun/yomo"
25+
"github.com/yomorun/yomo/core/auth"
2526
"github.com/yomorun/yomo/core/ylog"
2627
pkgconfig "github.com/yomorun/yomo/pkg/config"
28+
"github.com/yomorun/yomo/pkg/listener/mem"
2729
"github.com/yomorun/yomo/pkg/log"
2830
"github.com/yomorun/yomo/pkg/trace"
2931

@@ -68,15 +70,18 @@ var serveCmd = &cobra.Command{
6870
// listening address.
6971
listenAddr := fmt.Sprintf("%s:%d", conf.Host, conf.Port)
7072

73+
// memory listener
74+
var listener *mem.Listener
75+
7176
options := []yomo.ZipperOption{}
7277
tokenString := ""
7378
if _, ok := conf.Auth["type"]; ok {
7479
if tokenString, ok = conf.Auth["token"]; ok {
7580
options = append(options, yomo.WithAuth("token", tokenString))
7681
}
7782
}
78-
// check llm bridge server config
79-
// parse the llm bridge config
83+
84+
// check and parse the llm bridge server config
8085
bridgeConf := conf.Bridge
8186
aiConfig, err := ai.ParseConfig(bridgeConf)
8287
if err != nil {
@@ -88,8 +93,10 @@ var serveCmd = &cobra.Command{
8893
}
8994
}
9095
if aiConfig != nil {
96+
listener = mem.Listen()
9197
// add AI connection middleware
9298
options = append(options, yomo.WithZipperConnMiddleware(ai.RegisterFunctionMW()))
99+
options = append(options, yomo.WithFrameListener(listener))
93100
}
94101
// new zipper
95102
zipper, err := yomo.NewZipper(
@@ -108,7 +115,13 @@ var serveCmd = &cobra.Command{
108115
registerAIProvider(aiConfig)
109116
// start the llm api server
110117
go func() {
111-
err := ai.Serve(aiConfig, listenAddr, fmt.Sprintf("token:%s", tokenString), ylog.Default())
118+
conn, _ := listener.Dial()
119+
source := ai.NewSource(conn, auth.NewCredential(fmt.Sprintf("token:%s", tokenString)))
120+
121+
conn2, _ := listener.Dial()
122+
reducer := ai.NewReducer(conn2, auth.NewCredential(fmt.Sprintf("token:%s", tokenString)))
123+
124+
err := ai.Serve(aiConfig, ylog.Default(), source, reducer)
112125
if err != nil {
113126
log.FailureStatusEvent(os.Stdout, err.Error())
114127
return

cli/serverless/golang/serverless.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,6 @@ func (s *GolangServerless) Init(opts *serverless.Options) error {
127127

128128
// Build compiles the serverless to executable
129129
func (s *GolangServerless) Build(clean bool) error {
130-
log.PendingStatusEvent(os.Stdout, "Building YoMo Stream Function instance...")
131130
// check if the file exists
132131
appPath := s.source
133132
if _, err := os.Stat(appPath); os.IsNotExist(err) {
@@ -203,7 +202,6 @@ func (s *GolangServerless) Build(clean bool) error {
203202
if clean {
204203
file.Remove(s.tempDir)
205204
}
206-
log.SuccessStatusEvent(os.Stdout, "YoMo Stream Function build successful!")
207205
return nil
208206
}
209207

core/server.go

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -143,20 +143,32 @@ func (s *Server) Serve(ctx context.Context, conn net.PacketConn) error {
143143

144144
defer closeServer(s.downstreams, s.connector, s.listener, s.router)
145145

146-
errCount := 0
147-
for {
148-
fconn, err := s.listener.Accept(s.ctx)
149-
if err != nil {
150-
if err == s.ctx.Err() {
151-
return ErrServerClosed
146+
listeners := append(s.opts.listeners, s.listener)
147+
148+
var wg sync.WaitGroup
149+
for _, l := range listeners {
150+
wg.Add(1)
151+
go func(l frame.Listener) {
152+
errCount := 0
153+
for {
154+
fconn, err := l.Accept(s.ctx)
155+
if err != nil {
156+
if err == s.ctx.Err() {
157+
wg.Done()
158+
return
159+
}
160+
errCount++
161+
s.logger.Error("accepted an error when accepting a connection", "err", err, "err_count", errCount)
162+
continue
163+
}
164+
165+
go s.handleFrameConn(fconn, s.logger)
152166
}
153-
errCount++
154-
s.logger.Error("accepted an error when accepting a connection", "err", err, "err_count", errCount)
155-
continue
156-
}
157-
158-
go s.handleFrameConn(fconn, s.logger)
167+
}(l)
159168
}
169+
170+
wg.Wait()
171+
return ErrServerClosed
160172
}
161173

162174
func (s *Server) handleFrameConn(fconn frame.Conn, logger *slog.Logger) {
@@ -380,7 +392,11 @@ func (s *Server) routingDataFrame(c *Context) error {
380392

381393
// dispatch every DataFrames to all downstreams
382394
func (s *Server) dispatchToDownstreams(c *Context) error {
383-
dataFrame := c.Frame
395+
dataFrame := &frame.DataFrame{
396+
Tag: c.Frame.Tag,
397+
Payload: c.Frame.Payload,
398+
Metadata: c.Frame.Metadata,
399+
}
384400
if c.Connection.ClientType() == ClientTypeUpstreamZipper {
385401
c.Logger.Debug("ignored client", "client_type", c.Connection.ClientType().String())
386402
// loop protection

core/server_options.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/quic-go/quic-go"
99
"github.com/yomorun/yomo/core/auth"
10+
"github.com/yomorun/yomo/core/frame"
1011
"github.com/yomorun/yomo/core/router"
1112
"github.com/yomorun/yomo/core/ylog"
1213
)
@@ -38,6 +39,7 @@ type serverOptions struct {
3839
router router.Router
3940
connMiddlewares []ConnMiddleware
4041
frameMiddlewares []FrameMiddleware
42+
listeners []frame.Listener
4143
}
4244

4345
func defaultServerOptions() *serverOptions {
@@ -120,3 +122,10 @@ func WithConnMiddleware(mws ...ConnMiddleware) ServerOption {
120122
o.connMiddlewares = append(o.connMiddlewares, mws...)
121123
}
122124
}
125+
126+
// WithFrameListener adds a Listener other than a quic.Listener.
127+
func WithFrameListener(l ...frame.Listener) ServerOption {
128+
return func(o *serverOptions) {
129+
o.listeners = append(o.listeners, l...)
130+
}
131+
}

options.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/quic-go/quic-go"
88
"github.com/yomorun/yomo/core"
9+
"github.com/yomorun/yomo/core/frame"
910
"github.com/yomorun/yomo/core/router"
1011
)
1112

@@ -147,4 +148,11 @@ var (
147148
o.serverOption = append(o.serverOption, core.WithFrameMiddleware(mw...))
148149
}
149150
}
151+
152+
// WithFrameListener adds a Listener other than a quic.Listener.
153+
WithFrameListener = func(l ...frame.Listener) ZipperOption {
154+
return func(o *zipperOptions) {
155+
o.serverOption = append(o.serverOption, core.WithFrameListener(l...))
156+
}
157+
}
150158
)

pkg/bridge/ai/api_server.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
openai "github.com/sashabaranov/go-openai"
16+
"github.com/yomorun/yomo"
1617
"github.com/yomorun/yomo/ai"
1718
"github.com/yomorun/yomo/pkg/bridge/ai/provider"
1819
"github.com/yomorun/yomo/pkg/bridge/ai/register"
@@ -34,23 +35,20 @@ const (
3435

3536
// BasicAPIServer provides restful service for end user
3637
type BasicAPIServer struct {
37-
zipperAddr string
38-
credential string
3938
httpHandler http.Handler
4039
}
4140

4241
// Serve starts the Basic API Server
43-
func Serve(config *Config, zipperListenAddr string, credential string, logger *slog.Logger) error {
42+
func Serve(config *Config, logger *slog.Logger, source yomo.Source, reducer yomo.StreamFunction) error {
4443
provider, err := provider.GetProvider(config.Server.Provider)
4544
if err != nil {
4645
return err
4746
}
48-
srv, err := NewBasicAPIServer(config, zipperListenAddr, credential, provider, logger)
47+
srv, err := NewBasicAPIServer(config, provider, source, reducer, logger)
4948
if err != nil {
5049
return err
5150
}
5251

53-
logger.Info("start AI Bridge service", "addr", config.Server.Addr, "provider", provider.Name())
5452
return http.ListenAndServe(config.Server.Addr, srv.httpHandler)
5553
}
5654

@@ -80,24 +78,23 @@ func DecorateHandler(h http.Handler, decorates ...func(handler http.Handler) htt
8078
}
8179

8280
// NewBasicAPIServer creates a new restful service
83-
func NewBasicAPIServer(config *Config, zipperAddr, credential string, provider provider.LLMProvider, logger *slog.Logger) (*BasicAPIServer, error) {
84-
zipperAddr = parseZipperAddr(zipperAddr)
85-
81+
func NewBasicAPIServer(config *Config, provider provider.LLMProvider, source yomo.Source, reducer yomo.StreamFunction, logger *slog.Logger) (*BasicAPIServer, error) {
8682
logger = logger.With("service", "llm-bridge")
8783

88-
service := NewService(zipperAddr, provider, &ServiceOptions{
84+
opts := &ServiceOptions{
8985
Logger: logger,
90-
CredentialFunc: func(r *http.Request) (string, error) { return credential, nil },
91-
})
86+
SourceBuilder: func() yomo.Source { return source },
87+
ReducerBuilder: func() yomo.StreamFunction { return reducer },
88+
}
89+
service := NewService(provider, opts)
9290

9391
mux := NewServeMux(service)
9492

9593
server := &BasicAPIServer{
96-
zipperAddr: zipperAddr,
97-
credential: credential,
9894
httpHandler: DecorateHandler(mux, decorateReqContext(service, logger)),
9995
}
10096

97+
logger.Info("start AI Bridge service", "addr", config.Server.Addr, "provider", provider.Name())
10198
return server, nil
10299
}
103100

pkg/bridge/ai/api_server_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ func TestServer(t *testing.T) {
4646
return mockCaller(nil), err
4747
}
4848

49-
service := newService("fake_zipper_addr", pd, newCaller, &ServiceOptions{
50-
SourceBuilder: func(_, _ string) yomo.Source { return flow },
51-
ReducerBuilder: func(_, _ string) yomo.StreamFunction { return flow },
49+
service := newService(pd, newCaller, &ServiceOptions{
50+
SourceBuilder: func() yomo.Source { return flow },
51+
ReducerBuilder: func() yomo.StreamFunction { return flow },
5252
MetadataExchanger: func(_ string) (metadata.M, error) { return metadata.M{"hello": "llm bridge"}, nil },
5353
})
5454

pkg/bridge/ai/reducer.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package ai
2+
3+
import (
4+
"github.com/yomorun/yomo"
5+
"github.com/yomorun/yomo/core"
6+
"github.com/yomorun/yomo/core/auth"
7+
"github.com/yomorun/yomo/core/frame"
8+
"github.com/yomorun/yomo/core/metadata"
9+
"github.com/yomorun/yomo/core/serverless"
10+
"github.com/yomorun/yomo/pkg/id"
11+
"github.com/yomorun/yomo/pkg/listener/mem"
12+
)
13+
14+
var _ yomo.Source = &memSource{}
15+
16+
type memSource struct {
17+
cred *auth.Credential
18+
conn *mem.FrameConn
19+
}
20+
21+
func NewSource(conn *mem.FrameConn, cred *auth.Credential) yomo.Source {
22+
return &memSource{
23+
conn: conn,
24+
cred: cred,
25+
}
26+
}
27+
28+
func (m *memSource) Connect() error {
29+
hf := &frame.HandshakeFrame{
30+
Name: "fc-source",
31+
ID: id.New(),
32+
ClientType: byte(core.ClientTypeSource),
33+
AuthName: m.cred.Name(),
34+
AuthPayload: m.cred.Payload(),
35+
Version: core.Version,
36+
}
37+
38+
return m.conn.Handshake(hf)
39+
}
40+
41+
func (m *memSource) Write(tag uint32, data []byte) error {
42+
df := &frame.DataFrame{
43+
Tag: tag,
44+
Payload: data,
45+
}
46+
return m.conn.WriteFrame(df)
47+
}
48+
49+
func (m *memSource) Close() error { return nil }
50+
func (m *memSource) SetErrorHandler(_ func(_ error)) {}
51+
func (m *memSource) WriteWithTarget(_ uint32, _ []byte, _ string) error { return nil }
52+
53+
type memStreamFunction struct {
54+
observedTags []uint32
55+
handler core.AsyncHandler
56+
cred *auth.Credential
57+
conn *mem.FrameConn
58+
}
59+
60+
// NewReducer creates a new instance of memory StreamFunction.
61+
func NewReducer(conn *mem.FrameConn, cred *auth.Credential) yomo.StreamFunction {
62+
return &memStreamFunction{
63+
conn: conn,
64+
cred: cred,
65+
}
66+
}
67+
68+
func (m *memStreamFunction) Close() error {
69+
return nil
70+
}
71+
72+
func (m *memStreamFunction) Connect() error {
73+
hf := &frame.HandshakeFrame{
74+
Name: "fc-reducer",
75+
ID: id.New(),
76+
ClientType: byte(core.ClientTypeStreamFunction),
77+
AuthName: m.cred.Name(),
78+
AuthPayload: m.cred.Payload(),
79+
ObserveDataTags: m.observedTags,
80+
Version: core.Version,
81+
}
82+
83+
if err := m.conn.Handshake(hf); err != nil {
84+
return nil
85+
}
86+
87+
go func() {
88+
for {
89+
f, err := m.conn.ReadFrame()
90+
if err != nil {
91+
return
92+
}
93+
94+
switch ff := f.(type) {
95+
case *frame.DataFrame:
96+
go m.onDataFrame(ff)
97+
default:
98+
return
99+
}
100+
}
101+
}()
102+
103+
return nil
104+
}
105+
106+
func (m *memStreamFunction) onDataFrame(dataFrame *frame.DataFrame) {
107+
md, err := metadata.Decode(dataFrame.Metadata)
108+
if err != nil {
109+
return
110+
}
111+
112+
serverlessCtx := serverless.NewContext(m.conn, dataFrame.Tag, md, dataFrame.Payload)
113+
m.handler(serverlessCtx)
114+
}
115+
116+
func (m *memStreamFunction) SetHandler(fn core.AsyncHandler) error {
117+
m.handler = fn
118+
return nil
119+
}
120+
121+
func (m *memStreamFunction) Init(_ func() error) error { return nil }
122+
func (m *memStreamFunction) SetCronHandler(_ string, _ core.CronHandler) error { return nil }
123+
func (m *memStreamFunction) SetErrorHandler(_ func(err error)) {}
124+
func (m *memStreamFunction) SetObserveDataTags(tags ...uint32) { m.observedTags = tags }
125+
func (m *memStreamFunction) SetPipeHandler(fn core.PipeHandler) error { return nil }
126+
func (m *memStreamFunction) SetWantedTarget(string) {}
127+
func (m *memStreamFunction) Wait() {}
128+
129+
var _ yomo.StreamFunction = &memStreamFunction{}

0 commit comments

Comments
 (0)