@@ -44,11 +44,15 @@ type blockUpdateConsumer struct {
4444// 1) To establish and keep track of what the head block height of the blockchain is, so event streams know how far from the head they are
4545// 2) To feed new block information to any registered consumers
4646type blockListener struct {
47- ctx context.Context
48- c * ethConnector
49- backend rpcbackend.RPC
50- wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
51- listenLoopDone chan struct {}
47+ ctx context.Context
48+ c * ethConnector
49+ backend rpcbackend.RPC
50+ wsBackend rpcbackend.WebSocketRPCClient // if configured the getting the blockheight will not complete until WS connects, overrides backend once connected
51+ listenLoopDone chan struct {}
52+
53+ isStarted bool
54+ startDone chan struct {}
55+
5256 initialBlockHeightObtained chan struct {}
5357 newHeadsTap chan struct {}
5458 newHeadsSub rpcbackend.Subscription
@@ -73,6 +77,8 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
7377 ctx : log .WithLogField (ctx , "role" , "blocklistener" ),
7478 c : c ,
7579 backend : c .backend , // use the HTTP backend - might get overwritten by a connected websocket later
80+ isStarted : false ,
81+ startDone : make (chan struct {}),
7682 initialBlockHeightObtained : make (chan struct {}),
7783 newHeadsTap : make (chan struct {}),
7884 highestBlock : - 1 ,
@@ -92,6 +98,22 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
9298 return bl , nil
9399}
94100
101+ // setting block filter status updates that new block filter has been created
102+ func (bl * blockListener ) markStarted () {
103+ if ! bl .isStarted {
104+ bl .isStarted = true
105+ close (bl .startDone )
106+ }
107+ }
108+
109+ func (bl * blockListener ) waitUntilStarted (ctx context.Context ) {
110+ select {
111+ case <- bl .startDone :
112+ case <- bl .ctx .Done ():
113+ case <- ctx .Done ():
114+ }
115+ }
116+
95117func (bl * blockListener ) newHeadsSubListener () {
96118 for range bl .newHeadsSub .Notifications () {
97119 select {
@@ -106,7 +128,7 @@ func (bl *blockListener) newHeadsSubListener() {
106128// getBlockHeightWithRetry keeps retrying attempting to get the initial block height until successful
107129func (bl * blockListener ) establishBlockHeightWithRetry () error {
108130 wsConnected := false
109- return bl .c .retry .Do (bl .ctx , "get initial block height" , func (attempt int ) (retry bool , err error ) {
131+ return bl .c .retry .Do (bl .ctx , "get initial block height" , func (_ int ) (retry bool , err error ) {
110132
111133 // If we have a WebSocket backend, then we connect it and switch over to using it
112134 // (we accept an un-locked update here to backend, as the most important routine that's
@@ -136,16 +158,18 @@ func (bl *blockListener) establishBlockHeightWithRetry() error {
136158 bl .backend = bl .wsBackend
137159 }
138160
139- // Now get the block heiht
161+ // Now get the block height
140162 var hexBlockHeight ethtypes.HexInteger
141163 rpcErr := bl .backend .CallRPC (bl .ctx , & hexBlockHeight , "eth_blockNumber" )
142164 if rpcErr != nil {
143165 log .L (bl .ctx ).Warnf ("Block height could not be obtained: %s" , rpcErr .Message )
144166 return true , rpcErr .Error ()
145167 }
168+
146169 bl .mux .Lock ()
147170 bl .highestBlock = hexBlockHeight .BigInt ().Int64 ()
148171 bl .mux .Unlock ()
172+
149173 return false , nil
150174 })
151175}
@@ -162,6 +186,7 @@ func (bl *blockListener) listenLoop() {
162186 var filter string
163187 failCount := 0
164188 gapPotential := true
189+ firstIteration := true
165190 for {
166191 if failCount > 0 {
167192 if bl .c .doFailureDelay (bl .ctx , failCount ) {
@@ -170,12 +195,16 @@ func (bl *blockListener) listenLoop() {
170195 }
171196 } else {
172197 // Sleep for the polling interval, or until we're shoulder tapped by the newHeads listener
173- select {
174- case <- time .After (bl .blockPollingInterval ):
175- case <- bl .newHeadsTap :
176- case <- bl .ctx .Done ():
177- log .L (bl .ctx ).Debugf ("Block listener loop stopping" )
178- return
198+ if ! firstIteration {
199+ select {
200+ case <- time .After (bl .blockPollingInterval ):
201+ case <- bl .newHeadsTap :
202+ case <- bl .ctx .Done ():
203+ log .L (bl .ctx ).Debugf ("Block listener loop stopping" )
204+ return
205+ }
206+ } else {
207+ firstIteration = false
179208 }
180209 }
181210
@@ -186,6 +215,7 @@ func (bl *blockListener) listenLoop() {
186215 failCount ++
187216 continue
188217 }
218+ bl .markStarted ()
189219 }
190220
191221 var blockHashes []ethtypes.HexBytes0xPrefix
@@ -374,7 +404,7 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
374404 for {
375405 var bi * blockInfoJSONRPC
376406 var reason ffcapi.ErrorReason
377- err := bl .c .retry .Do (bl .ctx , "rebuild listener canonical chain" , func (attempt int ) (retry bool , err error ) {
407+ err := bl .c .retry .Do (bl .ctx , "rebuild listener canonical chain" , func (_ int ) (retry bool , err error ) {
378408 bi , reason , err = bl .getBlockInfoByNumber (bl .ctx , nextBlockNumber , false , "" )
379409 return reason != ffcapi .ErrorReasonNotFound , err
380410 })
@@ -428,7 +458,7 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf
428458 currentViewBlock := lastElem .Value .(* minimalBlockInfo )
429459 var freshBlockInfo * blockInfoJSONRPC
430460 var reason ffcapi.ErrorReason
431- err := bl .c .retry .Do (bl .ctx , "rebuild listener canonical chain" , func (attempt int ) (retry bool , err error ) {
461+ err := bl .c .retry .Do (bl .ctx , "rebuild listener canonical chain" , func (_ int ) (retry bool , err error ) {
432462 freshBlockInfo , reason , err = bl .getBlockInfoByNumber (bl .ctx , currentViewBlock .number , false , "" )
433463 return reason != ffcapi .ErrorReasonNotFound , err
434464 })
@@ -471,23 +501,28 @@ func (bl *blockListener) dispatchToConsumers(consumers []*blockUpdateConsumer, u
471501 }
472502}
473503
474- func (bl * blockListener ) checkStartedLocked () {
504+ func (bl * blockListener ) checkAndStartListenerLoop () {
505+ bl .mux .Lock ()
506+ defer bl .mux .Unlock ()
475507 if bl .listenLoopDone == nil {
476508 bl .listenLoopDone = make (chan struct {})
477509 go bl .listenLoop ()
478510 }
479511}
480512
481- func (bl * blockListener ) addConsumer (c * blockUpdateConsumer ) {
513+ func (bl * blockListener ) addConsumer (ctx context.Context , c * blockUpdateConsumer ) {
514+ bl .checkAndStartListenerLoop ()
515+ bl .waitUntilStarted (ctx ) // need to make sure the listener is started before adding any consumers
482516 bl .mux .Lock ()
483517 defer bl .mux .Unlock ()
484- bl .checkStartedLocked ()
485518 bl .consumers [* c .id ] = c
486519}
487520
488521func (bl * blockListener ) getHighestBlock (ctx context.Context ) (int64 , bool ) {
522+ bl .checkAndStartListenerLoop ()
523+ // block height will be established as the first step of listener startup process
524+ // so we don't need to wait for the entire startup process to finish to return the result
489525 bl .mux .Lock ()
490- bl .checkStartedLocked ()
491526 highestBlock := bl .highestBlock
492527 bl .mux .Unlock ()
493528 // if not yet initialized, wait to be initialized
@@ -515,6 +550,9 @@ func (bl *blockListener) waitClosed() {
515550 bl .wsBackend .Close ()
516551 }
517552 if listenLoopDone != nil {
518- <- listenLoopDone
553+ select {
554+ case <- listenLoopDone :
555+ case <- bl .ctx .Done ():
556+ }
519557 }
520558}
0 commit comments