@@ -3,37 +3,47 @@ package explorer
33import (
44 "context"
55 "fmt"
6+ "strings"
7+ "sync"
68 "time"
79
810 "github.com/mudler/LocalAI/core/p2p"
911 "github.com/mudler/edgevpn/pkg/blockchain"
1012)
1113
1214type DiscoveryServer struct {
15+ sync.Mutex
1316 database * Database
1417 networkState * NetworkState
1518}
1619
1720type NetworkState struct {
18- Nodes map [string ]map [string ]p2p.NodeData
21+ Networks map [string ]Network
22+ }
23+
24+ func (s * DiscoveryServer ) NetworkState () * NetworkState {
25+ s .Lock ()
26+ defer s .Unlock ()
27+ return s .networkState
1928}
2029
2130func NewDiscoveryServer (db * Database ) * DiscoveryServer {
2231 return & DiscoveryServer {
2332 database : db ,
2433 networkState : & NetworkState {
25- Nodes : map [string ]map [ string ]p2p. NodeData {},
34+ Networks : map [string ]Network {},
2635 },
2736 }
2837}
2938
39+ type Network struct {
40+ Clusters []ClusterData
41+ }
42+
3043func (s * DiscoveryServer ) runBackground () {
3144 for _ , token := range s .database .TokenList () {
32-
33- fmt .Println ("Checking token" , token )
3445 c , cancel := context .WithTimeout (context .Background (), 50 * time .Second )
3546 defer cancel ()
36- fmt .Println ("Starting node" , token )
3747
3848 // Connect to the network
3949 // Get the number of nodes
@@ -45,39 +55,38 @@ func (s *DiscoveryServer) runBackground() {
4555 continue
4656 }
4757
48- fmt .Println ("Starting network" , token )
4958 err = n .Start (c )
5059 if err != nil {
5160 fmt .Println (err )
5261 continue
5362 }
54- fmt .Println ("ledger" , token )
5563
5664 ledger , err := n .Ledger ()
5765 if err != nil {
5866 fmt .Println (err )
5967 continue
6068 }
6169
62- ledgerKeys := make (chan string )
63- go s .getLedgerKeys (c , ledger , ledgerKeys )
70+ networkData := make (chan ClusterData )
6471
65- ledgerK := []string {}
66- fmt .Println ("waiting for ledger keys" , token )
72+ // get the network data - it takes the whole timeout
73+ // as we might not be connected to the network yet,
74+ // and few attempts would have to be made before bailing out
75+ go s .retrieveNetworkData (c , ledger , networkData )
6776
68- LOOP:
69- for {
70- select {
71- case <- c .Done ():
72- fmt .Println ("Context exhausted" )
73- break LOOP
74- case key := <- ledgerKeys :
75- ledgerK = append (ledgerK , key )
76- }
77+ ledgerK := []ClusterData {}
78+ for key := range networkData {
79+ ledgerK = append (ledgerK , key )
7780 }
7881
7982 fmt .Println ("Token network" , token )
80- fmt .Println ("Found the following ledger keys in the network" , ledgerK )
83+ fmt .Println ("Found the following workers in the network" , ledgerK )
84+
85+ s .Lock ()
86+ s .networkState .Networks [token ] = Network {
87+ Clusters : ledgerK ,
88+ }
89+ s .Unlock ()
8190 // get new services, allocate and return to the channel
8291
8392 // TODO:
@@ -89,26 +98,69 @@ func (s *DiscoveryServer) runBackground() {
8998 }
9099}
91100
92- func (s * DiscoveryServer ) getLedgerKeys (c context.Context , ledger * blockchain.Ledger , ledgerKeys chan string ) {
93- keys := map [string ]struct {}{}
101+ type ClusterData struct {
102+ Workers []string
103+ Type string
104+ }
105+
106+ func (s * DiscoveryServer ) retrieveNetworkData (c context.Context , ledger * blockchain.Ledger , networkData chan ClusterData ) {
107+ clusters := map [string ]ClusterData {}
108+
109+ defer func () {
110+ fmt .Println ("Defer clusters" , clusters )
111+
112+ for _ , n := range clusters {
113+ networkData <- n
114+ }
115+ close (networkData )
116+ }()
94117
95118 for {
96119 select {
97120 case <- c .Done ():
121+ fmt .Println ("Closing with ccluster" )
122+ fmt .Println (clusters )
98123 return
99124 default :
100125 time .Sleep (5 * time .Second )
101126
102127 data := ledger .LastBlock ().Storage
103- for k , _ := range data {
104- if _ , ok := keys [k ]; ! ok {
105- keys [k ] = struct {}{}
106- ledgerKeys <- k
128+ LEDGER:
129+ for d := range data {
130+ toScanForWorkers := false
131+ cd := ClusterData {}
132+ isWorkerCluster := d == p2p .WorkerID || (strings .Contains (d , "_" ) && strings .Contains (d , p2p .WorkerID ))
133+ isFederatedCluster := d == p2p .FederatedID || (strings .Contains (d , "_" ) && strings .Contains (d , p2p .FederatedID ))
134+ switch {
135+ case isWorkerCluster :
136+ toScanForWorkers = true
137+ cd .Type = "worker"
138+ case isFederatedCluster :
139+ toScanForWorkers = true
140+ cd .Type = "federated"
141+
142+ }
143+
144+ if ! toScanForWorkers {
145+ continue LEDGER
107146 }
147+
148+ DATA:
149+ for _ , v := range data [d ] {
150+ nd := & p2p.NodeData {}
151+ if err := v .Unmarshal (nd ); err != nil {
152+ continue DATA
153+ }
154+
155+ if nd .IsOnline () {
156+ (& cd ).Workers = append (cd .Workers , nd .ID )
157+ }
158+ }
159+
160+ clusters [d ] = cd
108161 }
109162 }
110163 }
111-
112164}
113165
114166// Start the discovery server. This is meant to be run in to a goroutine.
0 commit comments