Skip to content

Commit 47ec6c9

Browse files
committed
address shouldRespondDelta todo
Signed-off-by: Joe McGuire <[email protected]>
1 parent 72c5f95 commit 47ec6c9

File tree

1 file changed

+27
-29
lines changed
  • internal/kgateway/agentgatewaysyncer/krtxds

1 file changed

+27
-29
lines changed

internal/kgateway/agentgatewaysyncer/krtxds/xds.go

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -470,34 +470,7 @@ func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryReque
470470
stype := v3.GetShortType(req.TypeUrl)
471471
log.Debug("ADS: REQ resources", "type", stype, "connection", con.ID(), "subscribe", len(req.ResourceNamesSubscribe), "unsubscribe", len(req.ResourceNamesUnsubscribe), "nonce", req.ResponseNonce)
472472

473-
// TODO: this code should probably be moved to shouldRespondDelta, and pass in the nack handler as a parameter.
474-
// if we need to.
475-
if req.ErrorDetail != nil && s.nackHandler != nil {
476-
gateway := kgwxds.AgentgatewayID(con.node)
477-
nackEvent := nack.NackEvent{
478-
Gateway: gateway,
479-
TypeUrl: req.TypeUrl,
480-
ErrorMsg: req.ErrorDetail.GetMessage(),
481-
Timestamp: time.Now(),
482-
}
483-
s.nackHandler.HandleNack(&nackEvent)
484-
}
485-
// Check for ACK that resolves a previous NACK before processing the request
486-
if req.ErrorDetail == nil && s.nackHandler != nil && req.ResponseNonce != "" {
487-
// Check if there was a previous error for this type
488-
previousInfo := con.proxy.GetWatchedResource(req.TypeUrl)
489-
if previousInfo != nil && previousInfo.LastError != "" {
490-
gateway := kgwxds.AgentgatewayID(con.node)
491-
ackEvent := nack.AckEvent{
492-
Gateway: gateway,
493-
TypeUrl: req.TypeUrl,
494-
Timestamp: time.Now(),
495-
}
496-
s.nackHandler.HandleAck(&ackEvent)
497-
}
498-
}
499-
500-
shouldRespond := shouldRespondDelta(con, req)
473+
shouldRespond := shouldRespondDelta(con, req, s.nackHandler)
501474
if !shouldRespond {
502475
log.Debug("no response needed")
503476
return nil
@@ -522,7 +495,7 @@ func (s *DiscoveryServer) processDeltaRequest(req *discovery.DeltaDiscoveryReque
522495

523496
// shouldRespondDelta determines whether this request needs to be responded back. It applies the ack/nack rules as per xds protocol
524497
// using WatchedResource for previous state and discovery request for the current state.
525-
func shouldRespondDelta(con *Connection, request *discovery.DeltaDiscoveryRequest) bool {
498+
func shouldRespondDelta(con *Connection, request *discovery.DeltaDiscoveryRequest, nackHandler *nack.NackHandler) bool {
526499
stype := v3.GetShortType(request.TypeUrl)
527500

528501
// If there is an error in request that means previous response is erroneous.
@@ -537,6 +510,17 @@ func shouldRespondDelta(con *Connection, request *discovery.DeltaDiscoveryReques
537510
wr.LastError = request.ErrorDetail.GetMessage()
538511
return wr
539512
})
513+
514+
if nackHandler != nil {
515+
gateway := kgwxds.AgentgatewayID(con.node)
516+
nackEvent := nack.NackEvent{
517+
Gateway: gateway,
518+
TypeUrl: request.TypeUrl,
519+
ErrorMsg: request.ErrorDetail.GetMessage(),
520+
Timestamp: time.Now(),
521+
}
522+
nackHandler.HandleNack(&nackEvent)
523+
}
540524
return false
541525
}
542526

@@ -589,11 +573,14 @@ func shouldRespondDelta(con *Connection, request *discovery.DeltaDiscoveryReques
589573

590574
var alwaysRespond bool
591575
var subChanged bool
576+
var hadPreviousError bool
592577

593578
// Update resource names, and record ACK if required.
594579
con.proxy.UpdateWatchedResource(request.TypeUrl, func(wr *model.WatchedResource) *model.WatchedResource {
595580
wr.ResourceNames, _, subChanged = deltaWatchedResources(wr.ResourceNames, request)
596581
if !spontaneousReq {
582+
// Check if there was a previous error before clearing it
583+
hadPreviousError = wr.LastError != ""
597584
// Clear last error, we got an ACK.
598585
// Otherwise, this is just a change in resource subscription, so leave the last ACK info in place.
599586
wr.LastError = ""
@@ -604,6 +591,17 @@ func shouldRespondDelta(con *Connection, request *discovery.DeltaDiscoveryReques
604591
return wr
605592
})
606593

594+
// Handle ACK event if we cleared a previous error
595+
if !spontaneousReq && hadPreviousError && nackHandler != nil && request.ResponseNonce != "" {
596+
gateway := kgwxds.AgentgatewayID(con.node)
597+
ackEvent := nack.AckEvent{
598+
Gateway: gateway,
599+
TypeUrl: request.TypeUrl,
600+
Timestamp: time.Now(),
601+
}
602+
nackHandler.HandleAck(&ackEvent)
603+
}
604+
607605
// It is invalid in the below two cases:
608606
// 1. no subscribed resources change from spontaneous delta request.
609607
// 2. subscribed resources changes from ACK.

0 commit comments

Comments
 (0)