Skip to content

Commit 8e90774

Browse files
committed
provider status: internal metrics
1 parent 7c67b0f commit 8e90774

File tree

19 files changed

+2775
-225
lines changed

19 files changed

+2775
-225
lines changed

Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ mocks:
113113
mockery -case=underscore -dir app/market -output app/market/mocks -name Engine
114114
mockery -case=underscore -dir app/market -output app/market/mocks -name Facilitator
115115
mockery -case=underscore -dir marketplace -output marketplace/mocks -name Handler
116+
mockery -case=underscore -dir provider -output provider/mocks -name StatusClient
116117
mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Client
117118
mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Cluster
118119
mockery -case=underscore -dir provider/cluster -output provider/cluster/mocks -name Deployment

_run/kube/run.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,14 @@ case "$1" in
4343
manifest)
4444
akash deployment sendmani deployment.yml "$2" -k master
4545
;;
46+
status)
47+
akash provider status
48+
;;
4649
ping)
4750
curl -I "hello.$(minikube ip).nip.io"
4851
;;
4952
*)
50-
echo "USAGE: $0 <init|akashd|send|query|marketplace|provider|deploy|manifest|ping>" >&2
53+
echo "USAGE: $0 <init|akashd|send|query|marketplace|provider|deploy|manifest|status|ping>" >&2
5154
exit 1
5255
;;
5356
esac

cmd/akash/provider.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func doProviderRunCommand(session session.Session, cmd *cobra.Command, args []st
209209

210210
go func() {
211211
defer cancel()
212-
errch <- grpc.RunServer(ctx, session.Log(), "tcp", "9090", service.ManifestHandler(), cclient)
212+
errch <- grpc.RunServer(ctx, session.Log(), "tcp", "9090", service.ManifestHandler(), cclient, service)
213213
}()
214214

215215
go func() {
@@ -266,19 +266,18 @@ func doProviderStatusCommand(session session.Session, cmd *cobra.Command, args [
266266

267267
type outputItem struct {
268268
Provider *types.Provider
269-
Status *types.ServerStatus
270-
Error error `json:",omitempty"`
269+
Status *types.ServerStatusParseable
270+
Error string `json:",omitempty"`
271271
}
272272

273273
output := []outputItem{}
274274

275275
for _, provider := range providers {
276276
status, err := http.Status(session.Ctx(), &provider)
277277
if err != nil {
278-
output = append(output, outputItem{Provider: &provider, Error: err})
278+
output = append(output, outputItem{Provider: &provider, Error: err.Error()})
279279
continue
280280
}
281-
282281
output = append(output, outputItem{Provider: &provider, Status: status})
283282
}
284283

provider/bidengine/service.go

Lines changed: 44 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bidengine
22

33
import (
44
"context"
5+
"errors"
56

67
lifecycle "github.com/boz/go-lifecycle"
78
"github.com/ovrclk/akash/provider/cluster"
@@ -10,7 +11,14 @@ import (
1011
"github.com/ovrclk/akash/types"
1112
)
1213

14+
// ErrNotRunning is returned by Status when the service's lifecycle has already finished.
var ErrNotRunning = errors.New("not running")
15+
16+
// StatusClient is the read-only view of the bid engine: it exposes only the
// status query, returning a *types.ProviderBidengineStatus.
type StatusClient interface {
17+
Status(context.Context) (*types.ProviderBidengineStatus, error)
18+
}
19+
1320
// Service is the bid engine's public interface: the embedded StatusClient
// plus Close and Done.
type Service interface {
21+
StatusClient
1422
Close() error // presumably initiates shutdown; the implementation is not shown here (TODO confirm)
1523
Done() <-chan struct{} // the implementation returns the lifecycle's Done channel
1624
}
@@ -34,13 +42,14 @@ func NewService(ctx context.Context, session session.Session, cluster cluster.Cl
3442
session.Log().Info("found orders", "count", len(existingOrders))
3543

3644
s := &service{
37-
session: session,
38-
cluster: cluster,
39-
bus: bus,
40-
sub: sub,
41-
orders: make(map[string]*order),
42-
drainch: make(chan *order),
43-
lc: lifecycle.New(),
45+
session: session,
46+
cluster: cluster,
47+
bus: bus,
48+
sub: sub,
49+
statusch: make(chan chan<- *types.ProviderBidengineStatus),
50+
orders: make(map[string]*order),
51+
drainch: make(chan *order),
52+
lc: lifecycle.New(),
4453
}
4554

4655
go s.lc.WatchContext(ctx)
@@ -56,8 +65,9 @@ type service struct {
5665
bus event.Bus
5766
sub event.Subscriber
5867

59-
orders map[string]*order
60-
drainch chan *order
68+
statusch chan chan<- *types.ProviderBidengineStatus
69+
orders map[string]*order
70+
drainch chan *order
6171

6272
lc lifecycle.Lifecycle
6373
}
@@ -71,6 +81,27 @@ func (s *service) Done() <-chan struct{} {
7181
return s.lc.Done()
7282
}
7383

84+
// Status requests a snapshot of the bid engine's state from the goroutine
// that receives on s.statusch, and returns that goroutine's reply.
//
// The request has two steps: hand a reply channel to s.statusch, then wait on
// that reply channel. At either step the call gives up with ErrNotRunning once
// s.lc.Done() fires, or with ctx.Err() once ctx is done.
func (s *service) Status(ctx context.Context) (*types.ProviderBidengineStatus, error) {
85+
ch := make(chan *types.ProviderBidengineStatus, 1) // capacity 1: a single reply can be sent without a waiting receiver
86+
87+
select { // step 1: submit the request
88+
case <-s.lc.Done():
89+
return nil, ErrNotRunning
90+
case <-ctx.Done():
91+
return nil, ctx.Err()
92+
case s.statusch <- ch:
93+
}
94+
95+
select { // step 2: wait for the reply
96+
case <-s.lc.Done():
97+
return nil, ErrNotRunning
98+
case <-ctx.Done():
99+
return nil, ctx.Err()
100+
case result := <-ch:
101+
return result, nil
102+
}
103+
}
104+
74105
func (s *service) run(existingOrders []existingOrder) {
75106
defer s.lc.ShutdownCompleted()
76107
defer s.sub.Close()
@@ -117,6 +148,10 @@ loop:
117148
s.orders[key] = order
118149

119150
}
151+
case ch := <-s.statusch:
152+
ch <- &types.ProviderBidengineStatus{
153+
Orders: uint32(len(s.orders)),
154+
}
120155
case order := <-s.drainch:
121156
// child done
122157
delete(s.orders, order.order.Path())

provider/bidengine/service_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ func TestService(t *testing.T) {
8484

8585
testutil.SleepForThreadStart(t)
8686

87+
status, err := service.Status(ctx)
88+
assert.NoError(t, err)
89+
assert.NotNil(t, status)
90+
8791
assert.NoError(t, service.Close())
8892

8993
mock.AssertExpectationsForObjects(t, qclient, txclient, creso, cluster)

provider/cluster/inventory.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cluster
22

33
import (
4+
"context"
45
"errors"
56
"time"
67

@@ -21,6 +22,7 @@ type inventoryService struct {
2122
client Client
2223
sub event.Subscriber
2324

25+
statusch chan chan<- *types.ProviderInventoryStatus
2426
lookupch chan inventoryRequest
2527
reservech chan inventoryRequest
2628
unreservech chan inventoryRequest
@@ -49,6 +51,7 @@ func newInventoryService(
4951
config: config,
5052
client: client,
5153
sub: sub,
54+
statusch: make(chan chan<- *types.ProviderInventoryStatus),
5255
lookupch: make(chan inventoryRequest),
5356
reservech: make(chan inventoryRequest),
5457
unreservech: make(chan inventoryRequest),
@@ -128,6 +131,27 @@ func (is *inventoryService) unreserve(order types.OrderID, resources types.Resou
128131
}
129132
}
130133

134+
// status requests a snapshot of inventory state from the goroutine that
// receives on is.statusch, and returns that goroutine's reply.
//
// It first hands a reply channel to is.statusch, then waits on that channel.
// At either step it returns ErrNotRunning once is.lc.Done() fires, or
// ctx.Err() once ctx is done.
func (is *inventoryService) status(ctx context.Context) (*types.ProviderInventoryStatus, error) {
135+
ch := make(chan *types.ProviderInventoryStatus, 1) // capacity 1: a single reply can be sent without a waiting receiver
136+
137+
select { // step 1: submit the request
138+
case <-is.lc.Done():
139+
return nil, ErrNotRunning
140+
case <-ctx.Done():
141+
return nil, ctx.Err()
142+
case is.statusch <- ch:
143+
}
144+
145+
select { // step 2: wait for the reply
146+
case <-is.lc.Done():
147+
return nil, ErrNotRunning
148+
case <-ctx.Done():
149+
return nil, ctx.Err()
150+
case result := <-ch:
151+
return result, nil
152+
}
153+
}
154+
131155
type inventoryRequest struct {
132156
order types.OrderID
133157
resources types.ResourceList
@@ -237,6 +261,10 @@ loop:
237261

238262
req.ch <- inventoryResponse{err: errNotFound}
239263

264+
case ch := <-is.statusch:
265+
266+
ch <- is.getStatus(inventory, reservations)
267+
240268
case <-t.C:
241269
// run cluster inventory check
242270

@@ -287,6 +315,40 @@ func (is *inventoryService) runCheck() <-chan runner.Result {
287315
})
288316
}
289317

318+
// getStatus builds a ProviderInventoryStatus from the given node inventory
// and reservation list.
//
// Each reservation contributes one ResourceUnit holding the summed CPU,
// Memory and Disk of its resources; it is appended to Reservations.Active
// when reservation.allocated is true and to Reservations.Pending otherwise.
// Each node contributes one ResourceUnit copied from node.Available(),
// appended to Available. The method reads no fields of is.
func (is *inventoryService) getStatus(
319+
inventory []Node, reservations []*reservation) *types.ProviderInventoryStatus {
320+
321+
status := &types.ProviderInventoryStatus{
322+
Reservations: &types.ProviderInventoryStatus_Reservations{},
323+
}
324+
325+
for _, reservation := range reservations {
326+
total := &types.ResourceUnit{}
327+
328+
// NOTE(review): Unit values are added once per resource entry; if an entry
// also carries a replica count it is not applied here — confirm intended.
for _, resource := range reservation.Resources().GetResources() {
329+
total.CPU += resource.Unit.CPU
330+
total.Memory += resource.Unit.Memory
331+
total.Disk += resource.Unit.Disk
332+
}
333+
334+
if reservation.allocated {
335+
status.Reservations.Active = append(status.Reservations.Active, total)
336+
} else {
337+
status.Reservations.Pending = append(status.Reservations.Pending, total)
338+
}
339+
}
340+
341+
for _, node := range inventory {
342+
status.Available = append(status.Available, &types.ResourceUnit{
343+
CPU: node.Available().CPU,
344+
Memory: node.Available().Memory,
345+
Disk: node.Available().Disk,
346+
})
347+
}
348+
349+
return status
350+
}
351+
290352
func reservationAllocateable(inventory []Node, reservations []*reservation, newReservation *reservation) bool {
291353

292354
// 1. for each unallocated reservation, subtract its resources

provider/cluster/service.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,13 @@ type Cluster interface {
2020
Unreserve(types.OrderID, types.ResourceList) error
2121
}
2222

23+
// StatusClient is the read-only view of the cluster service: it exposes only
// the status query, returning a *types.ProviderClusterStatus.
type StatusClient interface {
24+
Status(context.Context) (*types.ProviderClusterStatus, error)
25+
}
26+
2327
// Manage compute cluster for the provider. Will eventually integrate with kubernetes, etc...
2428
type Service interface {
29+
StatusClient
2530
Cluster
2631
Close() error
2732
Ready() <-chan struct{}
@@ -68,6 +73,7 @@ func NewService(ctx context.Context, session session.Session, bus event.Bus, cli
6873
bus: bus,
6974
sub: sub,
7075
inventory: inventory,
76+
statusch: make(chan chan<- *types.ProviderClusterStatus),
7177
managers: make(map[string]*deploymentManager),
7278
managerch: make(chan *deploymentManager),
7379
log: log,
@@ -88,6 +94,7 @@ type service struct {
8894

8995
inventory *inventoryService
9096

97+
statusch chan chan<- *types.ProviderClusterStatus
9198
managers map[string]*deploymentManager
9299
managerch chan *deploymentManager
93100

@@ -117,6 +124,35 @@ func (s *service) Unreserve(order types.OrderID, resources types.ResourceList) e
117124
return err
118125
}
119126

127+
// Status returns the cluster service's status with the inventory status
// attached.
//
// It first queries s.inventory.status(ctx) and returns that error unchanged on
// failure. It then hands a reply channel to s.statusch and waits on it; at
// either of those two steps it returns ErrNotRunning once s.lc.Done() fires,
// or ctx.Err() once ctx is done. On success the reply's Inventory field is set
// to the inventory status obtained at the start.
func (s *service) Status(ctx context.Context) (*types.ProviderClusterStatus, error) {
128+
129+
istatus, err := s.inventory.status(ctx)
130+
if err != nil {
131+
return nil, err
132+
}
133+
134+
ch := make(chan *types.ProviderClusterStatus, 1) // capacity 1: a single reply can be sent without a waiting receiver
135+
136+
select { // submit the request
137+
case <-s.lc.Done():
138+
return nil, ErrNotRunning
139+
case <-ctx.Done():
140+
return nil, ctx.Err()
141+
case s.statusch <- ch:
142+
}
143+
144+
select { // wait for the reply
145+
case <-s.lc.Done():
146+
return nil, ErrNotRunning
147+
case <-ctx.Done():
148+
return nil, ctx.Err()
149+
case result := <-ch:
150+
result.Inventory = istatus // the two snapshots are taken at different moments
151+
return result, nil
152+
}
153+
154+
}
155+
120156
func (s *service) run(deployments []Deployment) {
121157
defer s.lc.ShutdownCompleted()
122158
defer s.sub.Close()
@@ -181,6 +217,12 @@ loop:
181217

182218
}
183219

220+
case ch := <-s.statusch:
221+
222+
ch <- &types.ProviderClusterStatus{
223+
Leases: uint32(len(s.managers)),
224+
}
225+
184226
case dm := <-s.managerch:
185227

186228
s.log.Debug("manager done", "lease", dm.lease)

provider/cluster/service_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ func TestService_Reserve(t *testing.T) {
3939
assert.Equal(t, order.OrderID, reservation.OrderID())
4040
assert.Equal(t, group, reservation.Resources())
4141

42+
status, err := c.Status(ctx)
43+
assert.NoError(t, err)
44+
assert.NotNil(t, status)
45+
4246
require.NoError(t, c.Close())
4347

4448
_, err = c.Reserve(order.OrderID, group)

provider/grpc/client_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ovrclk/akash/manifest"
1010
kmocks "github.com/ovrclk/akash/provider/cluster/kube/mocks"
1111
"github.com/ovrclk/akash/provider/manifest/mocks"
12+
pmocks "github.com/ovrclk/akash/provider/mocks"
1213
"github.com/ovrclk/akash/sdl"
1314
"github.com/ovrclk/akash/testutil"
1415
"github.com/stretchr/testify/assert"
@@ -35,12 +36,14 @@ func TestSendManifest(t *testing.T) {
3536
req, _, err := manifest.SignManifest(mani, signer, deployment)
3637
assert.NoError(t, err)
3738

39+
sclient := &pmocks.StatusClient{}
40+
3841
handler := &mocks.Handler{}
3942
handler.On("HandleManifest", mock.Anything, mock.Anything).Return(nil)
4043

4144
client := &kmocks.Client{}
4245

43-
server := newServer(log.NewTMLogger(os.Stdout), "tcp", ":3001", handler, client)
46+
server := newServer(log.NewTMLogger(os.Stdout), "tcp", ":3001", handler, client, sclient)
4447
go func() {
4548
err := server.listenAndServe()
4649
require.NoError(t, err)

0 commit comments

Comments
 (0)