Skip to content

Commit 837e751

Browse files
dmancpakim249CAL
authored andcommitted
feat: Add reservations expiration tracking to DataAPI (#1733)
* Initialize subgraph * Initialize subgraph * Initialize subgraph * Save payment subgraph state * fix * Add reservation collector to dataapi * Add query examples + more test coverage * Remove docker compose * Add scripts for updating abi and adds a CI check when subgraph tests run * fix flags * fix lint error * Fix rebase issues * Inject the registry * wrap errors * expose more info from reservation data * fmt * rename updateCounts to updateMetrics * Document subgraph.NewApi in ejections tool
1 parent e7a1115 commit 837e751

File tree

15 files changed

+272
-55
lines changed

15 files changed

+272
-55
lines changed

disperser/cmd/dataapi/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Config struct {
2828
PrometheusApiAddr string
2929
SubgraphApiBatchMetadataAddr string
3030
SubgraphApiOperatorStateAddr string
31+
SubgraphApiPaymentsAddr string
3132
ServerMode string
3233
AllowOrigins []string
3334

@@ -63,6 +64,7 @@ func NewConfig(ctx *cli.Context) (Config, error) {
6364
SocketAddr: ctx.GlobalString(flags.SocketAddrFlag.Name),
6465
SubgraphApiBatchMetadataAddr: ctx.GlobalString(flags.SubgraphApiBatchMetadataAddrFlag.Name),
6566
SubgraphApiOperatorStateAddr: ctx.GlobalString(flags.SubgraphApiOperatorStateAddrFlag.Name),
67+
SubgraphApiPaymentsAddr: ctx.GlobalString(flags.SubgraphApiPaymentsAddrFlag.Name),
6668
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
6769
EigenDAServiceManagerAddr: ctx.GlobalString(flags.EigenDAServiceManagerFlag.Name),
6870
EigenDADirectory: ctx.GlobalString(flags.EigenDADirectoryFlag.Name),

disperser/cmd/dataapi/flags/flags.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ var (
5959
}
6060
SubgraphApiBatchMetadataAddrFlag = cli.StringFlag{
6161
Name: common.PrefixFlag(FlagPrefix, "sub-batch-metadata-socket-addr"),
62-
//We need the socket address of the subgraph batch metadata api to pull the subgraph data from.
63-
Usage: "the socket address of the subgraph batch metadata api",
62+
//We need the URL of the subgraph batch metadata api to pull the subgraph data from.
63+
Usage: "the URL of the subgraph batch metadata api",
6464
EnvVar: common.PrefixEnvVar(envVarPrefix, "SUBGRAPH_BATCH_METADATA_API_SOCKET_ADDR"),
6565
Required: true,
6666
}
6767
SubgraphApiOperatorStateAddrFlag = cli.StringFlag{
6868
Name: common.PrefixFlag(FlagPrefix, "sub-op-state-socket-addr"),
69-
//We need the socket address of the subgraph operator state api to pull the subgraph data from.
70-
Usage: "the socket address of the subgraph operator state api",
69+
//We need the URL of the subgraph operator state api to pull the subgraph data from.
70+
Usage: "the URL of the subgraph operator state api",
7171
EnvVar: common.PrefixEnvVar(envVarPrefix, "SUBGRAPH_OPERATOR_STATE_API_SOCKET_ADDR"),
7272
Required: true,
7373
}
@@ -77,6 +77,13 @@ var (
7777
Required: false,
7878
EnvVar: common.PrefixEnvVar(envVarPrefix, "EIGENDA_DIRECTORY"),
7979
}
80+
SubgraphApiPaymentsAddrFlag = cli.StringFlag{
81+
Name: common.PrefixFlag(FlagPrefix, "sub-payments-socket-addr"),
82+
//We need the URL of the subgraph payments api to pull the subgraph data from.
83+
Usage: "the URL of the subgraph payments api",
84+
EnvVar: common.PrefixEnvVar(envVarPrefix, "SUBGRAPH_PAYMENTS_API_SOCKET_ADDR"),
85+
Required: true,
86+
}
8087
BlsOperatorStateRetrieverFlag = cli.StringFlag{
8188
Name: common.PrefixFlag(FlagPrefix, "bls-operator-state-retriever"),
8289
Usage: "[Deprecated: use EigenDADirectory instead] Address of the BLS operator state Retriever",
@@ -153,6 +160,7 @@ var requiredFlags = []cli.Flag{
153160
S3BucketNameFlag,
154161
SubgraphApiBatchMetadataAddrFlag,
155162
SubgraphApiOperatorStateAddrFlag,
163+
SubgraphApiPaymentsAddrFlag,
156164
PrometheusServerURLFlag,
157165
PrometheusServerUsernameFlag,
158166
PrometheusServerSecretFlag,

disperser/cmd/dataapi/main.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func RunDataApi(ctx *cli.Context) error {
9595
var (
9696
reg = prometheus.NewRegistry()
9797
promClient = dataapi.NewPrometheusClient(promApi, config.PrometheusConfig.Cluster)
98-
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr)
98+
subgraphApi = subgraph.NewApi(config.SubgraphApiBatchMetadataAddr, config.SubgraphApiOperatorStateAddr, config.SubgraphApiPaymentsAddr)
9999
subgraphClient = dataapi.NewSubgraphClient(subgraphApi, logger)
100100
chainState = coreeth.NewChainState(tx, client)
101101
indexedChainState = thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger)
@@ -108,6 +108,11 @@ func RunDataApi(ctx *cli.Context) error {
108108
Registry: reg,
109109
Backend: blobstorev2.BackendDynamoDB,
110110
})
111+
112+
// Register reservation collector
113+
reservationCollector := serverv2.NewReservationExpirationCollector(subgraphClient, logger)
114+
reg.MustRegister(reservationCollector)
115+
111116
metrics := dataapi.NewMetrics(config.ServerVersion, reg, blobMetadataStorev2, config.MetricsConfig.HTTPPort, logger)
112117
serverv2, err := serverv2.NewServerV2(
113118
dataapi.Config{

disperser/dataapi/Makefile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,7 @@ generate-swagger-v2:
1515
swag fmt --dir ./v2
1616

1717
generate-swagger: generate-swagger-v1 generate-swagger-v2
18+
19+
run: build
20+
@echo " > Running dataapi..."
21+
cd .. && ./bin/dataapi

disperser/dataapi/metrics.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ type MetricsConfig struct {
2525
}
2626

2727
type Metrics struct {
28-
registry *prometheus.Registry
29-
3028
NumRequests *prometheus.CounterVec
3129
CacheHitsTotal *prometheus.CounterVec
3230
Latency *prometheus.SummaryVec
@@ -40,14 +38,15 @@ type Metrics struct {
4038
SemversStakePctQuorum1 *prometheus.GaugeVec
4139
SemversStakePctQuorum2 *prometheus.GaugeVec
4240

41+
registry *prometheus.Registry
4342
httpPort string
4443
logger logging.Logger
4544
}
4645

4746
func NewMetrics(serverVersion uint, reg *prometheus.Registry, blobMetadataStore interface{}, httpPort string, logger logging.Logger) *Metrics {
4847
namespace := "eigenda_dataapi"
4948
if reg == nil {
50-
reg = prometheus.NewRegistry()
49+
panic("registry must not be nil")
5150
}
5251

5352
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))

disperser/dataapi/server_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/consensys/gnark-crypto/ecc/bn254/fp"
2929
gethcommon "github.com/ethereum/go-ethereum/common"
3030
"github.com/gin-gonic/gin"
31+
"github.com/prometheus/client_golang/prometheus"
3132
"github.com/prometheus/common/model"
3233
"github.com/stretchr/testify/assert"
3334
"github.com/stretchr/testify/mock"
@@ -53,7 +54,7 @@ var (
5354

5455
serverVersion = uint(1)
5556
mockTx = &coremock.MockWriter{}
56-
metrics = dataapi.NewMetrics(serverVersion, nil, nil, "9001", mockLogger)
57+
metrics = dataapi.NewMetrics(serverVersion, prometheus.NewRegistry(), nil, "9001", mockLogger)
5758
opId0, _ = core.OperatorIDFromHex("e22dae12a0074f20b8fc96a0489376db34075e545ef60c4845d264a732568311")
5859
opId1, _ = core.OperatorIDFromHex("e23cae12a0074f20b8fc96a0489376db34075e545ef60c4845d264b732568312")
5960
mockChainState, _ = coremock.NewChainDataMock(map[uint8]map[core.OperatorID]int{
@@ -73,7 +74,7 @@ var (
7374
})
7475
_ = mockTx.On("GetCurrentBlockNumber").Return(uint32(1), nil)
7576
_ = mockTx.On("GetQuorumCount").Return(uint8(2), nil)
76-
testDataApiServer, _ = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockIndexedChainState, mockLogger, dataapi.NewMetrics(serverVersion, nil, nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil)
77+
testDataApiServer, _ = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, mockIndexedChainState, mockLogger, dataapi.NewMetrics(serverVersion, prometheus.NewRegistry(), nil, "9001", mockLogger), &MockGRPCConnection{}, nil, nil)
7778
expectedRequestedAt = uint64(5567830000000000000)
7879
expectedDataLength = 32
7980
expectedBatchId = uint32(99)

disperser/dataapi/subgraph/api.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,27 @@ type (
2929
QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, endBlock uint32) ([]*OperatorQuorum, error)
3030
QueryOperatorEjectionsGteBlockTimestampByOperatorId(ctx context.Context, blockTimestamp uint64, operatorId string, first uint, skip uint) ([]*OperatorEjection, error)
3131
QueryOperatorEjectionsGteBlockTimestamp(ctx context.Context, blockTimestamp uint64, first uint, skip uint) ([]*OperatorEjection, error)
32+
QueryReservations(ctx context.Context, currentTimestamp uint64, first, skip int) ([]*Reservation, error)
3233
}
3334

3435
api struct {
3536
uiMonitoringGql *graphql.Client
3637
operatorStateGql *graphql.Client
38+
paymentsGql *graphql.Client
3739
}
3840
)
3941

4042
var _ Api = (*api)(nil)
4143

42-
func NewApi(uiMonitoringSocketAddr string, operatorStateSocketAddr string) *api {
44+
func NewApi(uiMonitoringSocketAddr string, operatorStateSocketAddr string, paymentsSocketAddr string) *api {
4345
once.Do(func() {
4446
uiMonitoringGql := graphql.NewClient(uiMonitoringSocketAddr, nil)
4547
operatorStateGql := graphql.NewClient(operatorStateSocketAddr, nil)
48+
paymentsGql := graphql.NewClient(paymentsSocketAddr, nil)
4649
instance = &api{
4750
uiMonitoringGql: uiMonitoringGql,
4851
operatorStateGql: operatorStateGql,
52+
paymentsGql: paymentsGql,
4953
}
5054
})
5155
return instance
@@ -301,3 +305,17 @@ func (a *api) QueryOperatorRemovedFromQuorum(ctx context.Context, startBlock, en
301305
}
302306
return removedFromQuorums, nil
303307
}
308+
309+
func (a *api) QueryReservations(ctx context.Context, currentTimestamp uint64, first, skip int) ([]*Reservation, error) {
310+
variables := map[string]any{
311+
"currentTimestamp": graphql.Int(currentTimestamp),
312+
"first": graphql.Int(first),
313+
"skip": graphql.Int(skip),
314+
}
315+
result := new(queryReservations)
316+
err := a.paymentsGql.Query(ctx, result, variables)
317+
if err != nil {
318+
return nil, err
319+
}
320+
return result.Reservations, nil
321+
}

disperser/dataapi/subgraph/mock/api.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,3 +206,14 @@ func (m *MockSubgraphApi) QueryOperatorEjectionsGteBlockTimestampByOperatorId(ct
206206

207207
return value, args.Error(1)
208208
}
209+
210+
func (m *MockSubgraphApi) QueryReservations(ctx context.Context, currentTimestamp uint64, first, skip int) ([]*subgraph.Reservation, error) {
211+
args := m.Called()
212+
213+
var value []*subgraph.Reservation
214+
if args.Get(0) != nil {
215+
value = args.Get(0).([]*subgraph.Reservation)
216+
}
217+
218+
return value, args.Error(1)
219+
}

disperser/dataapi/subgraph/queries.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,14 @@ type (
8181
BlockNumber uint32
8282
Metadata *Operator
8383
}
84-
84+
Reservation struct {
85+
Account graphql.String
86+
SymbolsPerSecond graphql.String
87+
QuorumNumbers graphql.String
88+
QuorumSplits graphql.String
89+
StartTimestamp graphql.String
90+
EndTimestamp graphql.String
91+
}
8592
queryBatches struct {
8693
Batches []*Batches `graphql:"batches(orderDirection: $orderDirection, orderBy: $orderBy, first: $first, skip: $skip)"`
8794
}
@@ -118,4 +125,7 @@ type (
118125
queryOperatorEjectedsByOperatorID struct {
119126
OperatorEjections []*OperatorEjection `graphql:"operatorEjecteds(orderBy: blockTimestamp, where: {and: [{blockTimestamp_gte: $blockTimestamp_gte}, {operatorId: $operatorId}]}, first: $first, skip: $skip)"`
120127
}
128+
queryReservations struct {
129+
Reservations []*Reservation `graphql:"reservations(where: {startTimestamp_lte: $currentTimestamp, endTimestamp_gte: $currentTimestamp}, orderBy: startTimestamp, orderDirection: asc, first: $first, skip: $skip)"`
130+
}
121131
)

0 commit comments

Comments
 (0)