@@ -11,18 +11,18 @@ import (
1111 "net"
1212 "os"
1313 "strings"
14+ "sync"
1415 "time"
1516
17+ "github.com/ipfs/go-log"
1618 "github.com/libp2p/go-libp2p/core/peer"
1719 "github.com/mudler/LocalAI/pkg/utils"
20+ "github.com/mudler/edgevpn/pkg/config"
1821 "github.com/mudler/edgevpn/pkg/node"
1922 "github.com/mudler/edgevpn/pkg/protocol"
23+ "github.com/mudler/edgevpn/pkg/services"
2024 "github.com/mudler/edgevpn/pkg/types"
2125 "github.com/phayes/freeport"
22-
23- "github.com/ipfs/go-log"
24- "github.com/mudler/edgevpn/pkg/config"
25- "github.com/mudler/edgevpn/pkg/services"
2626 zlog "github.com/rs/zerolog/log"
2727
2828 "github.com/mudler/edgevpn/pkg/logger"
@@ -34,6 +34,15 @@ func GenerateToken() string {
3434 return newData .Base64 ()
3535}
3636
37+ func IsP2PEnabled () bool {
38+ return true
39+ }
40+
41+ func nodeID () string {
42+ hostname , _ := os .Hostname ()
43+ return hostname
44+ }
45+
3746func allocateLocalService (ctx context.Context , node * node.Node , listenAddr , service string ) error {
3847
3948 zlog .Info ().Msgf ("Allocating service '%s' on: %s" , service , listenAddr )
@@ -135,14 +144,27 @@ func copyStream(closer chan struct{}, dst io.Writer, src io.Reader) {
135144 io .Copy (dst , src )
136145}
137146
147+ var availableNodes = []NodeData {}
148+ var mu sync.Mutex
149+
150+ func GetAvailableNodes () []NodeData {
151+ mu .Lock ()
152+ defer mu .Unlock ()
153+ return availableNodes
154+ }
155+
138156// This is the main of the server (which keeps the env variable updated)
139157// This starts a goroutine that keeps LLAMACPP_GRPC_SERVERS updated with the discovered services
140158func LLamaCPPRPCServerDiscoverer (ctx context.Context , token string ) error {
141159 tunnels , err := discoveryTunnels (ctx , token )
142160 if err != nil {
143161 return err
144162 }
145-
163+ // TODO: discoveryTunnels should return all the nodes that are available?
164+ // In this way we updated availableNodes here instead of appending
165+ // e.g. we have a LastSeen field in NodeData that is updated in discoveryTunnels
166+ // each time the node is seen
167+ // In this case the below function should be idempotent and just keep track of the nodes
146168 go func () {
147169 totalTunnels := []string {}
148170 for {
@@ -151,19 +173,22 @@ func LLamaCPPRPCServerDiscoverer(ctx context.Context, token string) error {
151173 zlog .Error ().Msg ("Discoverer stopped" )
152174 return
153175 case tunnel := <- tunnels :
154-
155- totalTunnels = append (totalTunnels , tunnel )
176+ totalTunnels = append (totalTunnels , tunnel .TunnelAddress )
156177 os .Setenv ("LLAMACPP_GRPC_SERVERS" , strings .Join (totalTunnels , "," ))
157178 zlog .Debug ().Msgf ("setting LLAMACPP_GRPC_SERVERS to %s" , strings .Join (totalTunnels , "," ))
179+ mu .Lock ()
180+ defer mu .Unlock ()
181+ availableNodes = append (availableNodes , tunnel )
182+ zlog .Info ().Msgf ("Node %s available" , tunnel .ID )
158183 }
159184 }
160185 }()
161186
162187 return nil
163188}
164189
165- func discoveryTunnels (ctx context.Context , token string ) (chan string , error ) {
166- tunnels := make (chan string )
190+ func discoveryTunnels (ctx context.Context , token string ) (chan NodeData , error ) {
191+ tunnels := make (chan NodeData )
167192
168193 nodeOpts , err := newNodeOpts (token )
169194 if err != nil {
@@ -196,18 +221,24 @@ func discoveryTunnels(ctx context.Context, token string) (chan string, error) {
196221 zlog .Debug ().Msg ("Searching for workers" )
197222
198223 data := ledger .LastBlock ().Storage ["services_localai" ]
199- for k := range data {
224+ for k , v := range data {
200225 zlog .Info ().Msgf ("Found worker %s" , k )
201226 if _ , found := emitted [k ]; ! found {
202227 emitted [k ] = true
228+ nd := & NodeData {}
229+ if err := v .Unmarshal (nd ); err != nil {
230+ zlog .Error ().Msg ("cannot unmarshal node data" )
231+ continue
232+ }
203233 //discoveredPeers <- k
204234 port , err := freeport .GetFreePort ()
205235 if err != nil {
206236 fmt .Print (err )
207237 }
208238 tunnelAddress := fmt .Sprintf ("127.0.0.1:%d" , port )
209239 go allocateLocalService (ctx , n , tunnelAddress , k )
210- tunnels <- tunnelAddress
240+ nd .TunnelAddress = tunnelAddress
241+ tunnels <- * nd
211242 }
212243 }
213244 }
@@ -255,7 +286,10 @@ func BindLLamaCPPWorker(ctx context.Context, host, port, token string) error {
255286 // If mismatch, update the blockchain
256287 if ! found {
257288 updatedMap := map [string ]interface {}{}
258- updatedMap [name ] = "p2p"
289+ updatedMap [name ] = & NodeData {
290+ Name : name ,
291+ ID : nodeID (),
292+ }
259293 ledger .Add ("services_localai" , updatedMap )
260294 }
261295 },
0 commit comments