@@ -11,18 +11,18 @@ import (
1111 "net"
1212 "os"
1313 "strings"
14+ "sync"
1415 "time"
1516
17+ "github.com/ipfs/go-log"
1618 "github.com/libp2p/go-libp2p/core/peer"
1719 "github.com/mudler/LocalAI/pkg/utils"
20+ "github.com/mudler/edgevpn/pkg/config"
1821 "github.com/mudler/edgevpn/pkg/node"
1922 "github.com/mudler/edgevpn/pkg/protocol"
23+ "github.com/mudler/edgevpn/pkg/services"
2024 "github.com/mudler/edgevpn/pkg/types"
2125 "github.com/phayes/freeport"
22-
23- "github.com/ipfs/go-log"
24- "github.com/mudler/edgevpn/pkg/config"
25- "github.com/mudler/edgevpn/pkg/services"
2626 zlog "github.com/rs/zerolog/log"
2727
2828 "github.com/mudler/edgevpn/pkg/logger"
@@ -34,6 +34,11 @@ func GenerateToken() string {
3434 return newData .Base64 ()
3535}
3636
37+ func nodeID () string {
38+ hostname , _ := os .Hostname ()
39+ return hostname
40+ }
41+
3742func allocateLocalService (ctx context.Context , node * node.Node , listenAddr , service string ) error {
3843
3944 zlog .Info ().Msgf ("Allocating service '%s' on: %s" , service , listenAddr )
@@ -135,6 +140,15 @@ func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
135140 io .Copy (dst , src )
136141}
137142
143+ var availableNodes = []NodeData {}
144+ var mu sync.Mutex
145+
146+ func GetAvailableNodes () []NodeData {
147+ mu .Lock ()
148+ defer mu .Unlock ()
149+ return availableNodes
150+ }
151+
138152// This is the main of the server (which keeps the env variable updated)
139153// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
140154func LLamaCPPRPCServerDiscoverer (ctx context.Context , token string ) error {
@@ -151,19 +165,22 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
151165 zlog .Error ().Msg ("Discoverer stopped" )
152166 return
153167 case tunnel := <- tunnels :
154-
155- totalTunnels = append (totalTunnels , tunnel )
168+ totalTunnels = append (totalTunnels , tunnel .TunnelAddress )
156169 os .Setenv ("LLAMACPP_GRPC_SERVERS" , strings .Join (totalTunnels , "," ))
157170 zlog .Debug ().Msgf ("setting LLAMACPP_GRPC_SERVERS to %s" , strings .Join (totalTunnels , "," ))
171+ mu .Lock ()
172+ defer mu .Unlock ()
173+ availableNodes = append (availableNodes , tunnel )
174+ zlog .Info ().Msgf ("Node %s available" , tunnel .ID )
158175 }
159176 }
160177 }()
161178
162179 return nil
163180}
164181
165- func discoveryTunnels (ctx context.Context , token string ) (chan string , error ) {
166- tunnels := make (chan string )
182+ func discoveryTunnels (ctx context.Context , token string ) (chan NodeData , error ) {
183+ tunnels := make (chan NodeData )
167184
168185 nodeOpts , err := newNodeOpts (token )
169186 if err != nil {
@@ -196,18 +213,24 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
196213 zlog .Debug ().Msg ("Searching for workers" )
197214
198215 data := ledger .LastBlock ().Storage ["services_localai" ]
199- for k := range data {
216+ for k , v := range data {
200217 zlog .Info ().Msgf ("Found worker %s" , k )
201218 if _ , found := emitted [k ]; ! found {
202219 emitted [k ] = true
220+ nd := & NodeData {}
221+ if err := v .Unmarshal (nd ); err != nil {
222+ zlog .Error ().Msg ("cannot unmarshal node data" )
223+ continue
224+ }
203225 //discoveredPeers <- k
204226 port , err := freeport .GetFreePort ()
205227 if err != nil {
206228 fmt .Print (err )
207229 }
208230 tunnelAddress := fmt .Sprintf ("127.0.0.1:%d" , port )
209231 go allocateLocalService (ctx , n , tunnelAddress , k )
210- tunnels <- tunnelAddress
232+ nd .TunnelAddress = tunnelAddress
233+ tunnels <- * nd
211234 }
212235 }
213236 }
@@ -217,6 +240,12 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
217240 return tunnels , err
218241}
219242
243+ type NodeData struct {
244+ Name string
245+ ID string
246+ TunnelAddress string
247+ }
248+
220249// This is the P2P worker main
221250func BindLLamaCPPWorker (ctx context.Context , host , port , token string ) error {
222251 llger := logger .New (log .LevelFatal )
@@ -255,7 +284,10 @@ func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
255284 // If mismatch, update the blockchain
256285 if ! found {
257286 updatedMap := map [string ]interface {}{}
258- updatedMap [name ] = "p2p"
287+ updatedMap [name ] = & NodeData {
288+ Name : name ,
289+ ID : nodeID (),
290+ }
259291 ledger .Add ("services_localai" , updatedMap )
260292 }
261293 },
0 commit comments