Skip to content

Commit 925df2c

Browse files
authored
[Handler] Integrate the watcher's cache into request handler (#78)
### Description This PR integrates the cache into the handler in order to resolve the request target based on the new cache. ### Changes Made - Add the `IngressHostCacheReader` (the cache read-only access) to the handler - Add the logic for the new resolving (i.e. - try to resolve based on the cache, and if failed - resolve by the existing headers' logic) ### Testing - Added a flow test - for the entire request handling - Added unit tests for the resolving per case (i.e. - based on headers and based on ingress) ### References - Jira ticket link: https://iguazio.atlassian.net/browse/NUC-523 - External link: this PR is based on this branch - #76 ### Additional Notes - I left a **TODO** question inside the test file — it seems that when parsing the `targetURL`, the handler appends the path twice. This fix is relatively minor and has been tested manually, but since the current code still works, it's worth deciding whether we want to address it now.
1 parent 59f2bc3 commit 925df2c

File tree

5 files changed

+535
-166
lines changed

5 files changed

+535
-166
lines changed

pkg/dlx/dlx.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ func NewDLX(parentLogger logger.Logger,
7171
options.TargetNameHeader,
7272
options.TargetPathHeader,
7373
options.TargetPort,
74-
options.MultiTargetStrategy)
74+
options.MultiTargetStrategy,
75+
watcher.GetIngressHostCacheReader())
7576
if err != nil {
7677
return nil, errors.Wrap(err, "Failed to create handler")
7778
}

pkg/dlx/handler.go

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"sync"
3131
"time"
3232

33+
"github.com/v3io/scaler/pkg/ingresscache"
3334
"github.com/v3io/scaler/pkg/scalertypes"
3435

3536
"github.com/nuclio/errors"
@@ -49,6 +50,7 @@ type Handler struct {
4950
targetURLCache *cache.LRUExpireCache
5051
proxyLock sync.Locker
5152
lastProxyErrorTime time.Time
53+
ingressCache ingresscache.IngressHostCacheReader
5254
}
5355

5456
func NewHandler(parentLogger logger.Logger,
@@ -57,7 +59,8 @@ func NewHandler(parentLogger logger.Logger,
5759
targetNameHeader string,
5860
targetPathHeader string,
5961
targetPort int,
60-
multiTargetStrategy scalertypes.MultiTargetStrategy) (Handler, error) {
62+
multiTargetStrategy scalertypes.MultiTargetStrategy,
63+
ingressCache ingresscache.IngressHostCacheReader) (Handler, error) {
6164
h := Handler{
6265
logger: parentLogger.GetChild("handler"),
6366
resourceStarter: resourceStarter,
@@ -69,17 +72,17 @@ func NewHandler(parentLogger logger.Logger,
6972
targetURLCache: cache.NewLRUExpireCache(100),
7073
proxyLock: &sync.Mutex{},
7174
lastProxyErrorTime: time.Now(),
75+
ingressCache: ingressCache,
7276
}
7377
h.HandleFunc = h.handleRequest
7478
return h, nil
7579
}
7680

7781
func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
82+
var path string
83+
var err error
7884
var resourceNames []string
7985

80-
responseChannel := make(chan ResourceStatusResult, 1)
81-
defer close(responseChannel)
82-
8386
// first try to see if our request came from ingress controller
8487
forwardedHost := req.Header.Get("X-Forwarded-Host")
8588
forwardedPort := req.Header.Get("X-Forwarded-Port")
@@ -97,15 +100,15 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
97100
resourceNames = append(resourceNames, resourceName)
98101
resourceTargetURLMap[resourceName] = targetURL
99102
} else {
100-
targetNameHeaderValue := req.Header.Get(h.targetNameHeader)
101-
path := req.Header.Get(h.targetPathHeader)
102-
if targetNameHeaderValue == "" {
103-
h.logger.WarnWith("When ingress not set, must pass header value",
104-
"missingHeader", h.targetNameHeader)
103+
path, resourceNames, err = h.getPathAndResourceNames(req)
104+
if err != nil {
105+
h.logger.WarnWith("Failed to get resource names and path from request",
106+
"error", err.Error(),
107+
"host", req.Host,
108+
"path", h.getRequestURLPath(req))
105109
res.WriteHeader(http.StatusBadRequest)
106110
return
107111
}
108-
resourceNames = strings.Split(targetNameHeaderValue, ",")
109112
for _, resourceName := range resourceNames {
110113
targetURL, status := h.parseTargetURL(resourceName, path)
111114
if targetURL == nil {
@@ -163,6 +166,43 @@ func (h *Handler) handleRequest(res http.ResponseWriter, req *http.Request) {
163166
proxy.ServeHTTP(res, req)
164167
}
165168

169+
func (h *Handler) getPathAndResourceNames(req *http.Request) (string, []string, error) {
170+
// first try to get the resource names and path from the ingress cache
171+
path, resourceNames, err := h.getValuesFromCache(req)
172+
if err == nil {
173+
return path, resourceNames, nil
174+
}
175+
176+
h.logger.DebugWith("Failed to get resource names from ingress cache, trying to extract from the request headers",
177+
"host", req.Host,
178+
"path", h.getRequestURLPath(req),
179+
"error", err.Error())
180+
181+
// old implementation for backward compatibility
182+
targetNameHeaderValue := req.Header.Get(h.targetNameHeader)
183+
path = req.Header.Get(h.targetPathHeader)
184+
if targetNameHeaderValue == "" {
185+
return "", nil, errors.New("No target name header found")
186+
}
187+
resourceNames = strings.Split(targetNameHeaderValue, ",")
188+
return path, resourceNames, nil
189+
}
190+
191+
func (h *Handler) getValuesFromCache(req *http.Request) (string, []string, error) {
192+
host := req.Host
193+
path := h.getRequestURLPath(req)
194+
resourceNames, err := h.ingressCache.Get(host, path)
195+
if err != nil {
196+
return "", nil, errors.New("Failed to get resource names from ingress cache")
197+
}
198+
199+
if len(resourceNames) == 0 {
200+
return "", nil, errors.New("No resources found in ingress cache")
201+
}
202+
203+
return path, resourceNames, nil
204+
}
205+
166206
func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) {
167207
serviceName, err := h.resourceScaler.ResolveServiceName(scalertypes.Resource{Name: resourceName})
168208
if err != nil {
@@ -178,17 +218,17 @@ func (h *Handler) parseTargetURL(resourceName, path string) (*url.URL, int) {
178218
}
179219

180220
func (h *Handler) startResources(resourceNames []string) *ResourceStatusResult {
181-
responseChannel := make(chan ResourceStatusResult, len(resourceNames))
182-
defer close(responseChannel)
221+
responseChan := make(chan ResourceStatusResult, len(resourceNames))
222+
defer close(responseChan)
183223

184224
// Start all resources in separate go routines
185225
for _, resourceName := range resourceNames {
186-
go h.resourceStarter.handleResourceStart(resourceName, responseChannel)
226+
go h.resourceStarter.handleResourceStart(resourceName, responseChan)
187227
}
188228

189229
// Wait for all resources to finish starting
190230
for range resourceNames {
191-
statusResult := <-responseChannel
231+
statusResult := <-responseChan
192232

193233
if statusResult.Error != nil {
194234
h.logger.WarnWith("Failed to start resource",
@@ -230,3 +270,10 @@ func (h *Handler) URLBadParse(resourceName string, err error) int {
230270
"err", errors.GetErrorStackString(err, 10))
231271
return http.StatusBadRequest
232272
}
273+
274+
func (h *Handler) getRequestURLPath(req *http.Request) string {
275+
if req.URL != nil {
276+
return req.URL.Path
277+
}
278+
return ""
279+
}

0 commit comments

Comments
 (0)