-
Notifications
You must be signed in to change notification settings - Fork 917
[azidentity] Implement support for custom token endpoint mode in WorkloadIdentityCredential with CA data support #25057
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 10 commits
94a696b
c857416
153a718
b693895
aa168cc
6ad38c9
7153143
f7ac5a7
0642c65
0ec7258
67144d7
2997901
bc8788d
9e50159
d4c9718
bdac69f
69166e1
e788d20
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,9 +7,16 @@ | |
package azidentity | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"crypto/tls" | ||
"crypto/x509" | ||
"errors" | ||
"fmt" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -87,14 +94,22 @@ func NewWorkloadIdentityCredential(options *WorkloadIdentityCredentialOptions) ( | |
return nil, errors.New("no tenant ID specified. Check pod configuration or set TenantID in the options") | ||
} | ||
} | ||
|
||
w := WorkloadIdentityCredential{file: file, mtx: &sync.RWMutex{}} | ||
caco := ClientAssertionCredentialOptions{ | ||
caco := &ClientAssertionCredentialOptions{ | ||
AdditionallyAllowedTenants: options.AdditionallyAllowedTenants, | ||
Cache: options.Cache, | ||
ClientOptions: options.ClientOptions, | ||
DisableInstanceDiscovery: options.DisableInstanceDiscovery, | ||
} | ||
cred, err := NewClientAssertionCredential(tenantID, clientID, w.getAssertion, &caco) | ||
|
||
// configure identity binding if environment variables are present. | ||
// In identity binding enabled mode, a dedicated transport will be used for proxying token requests to a dedicated endpoint. | ||
if err := w.configureIdentityBinding(caco); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The SDK should not mention any AKS specific things. Identity bindings is an AKS specific API. See #24442 (review) for my last review in this repo. |
||
return nil, err | ||
} | ||
|
||
cred, err := NewClientAssertionCredential(tenantID, clientID, w.getAssertion, caco) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
@@ -139,3 +154,205 @@ func (w *WorkloadIdentityCredential) getAssertion(context.Context) (string, erro | |
} | ||
return w.assertion, nil | ||
} | ||
|
||
// configureIdentityBinding configures identity binding mode if the required environment variables are present | ||
func (w *WorkloadIdentityCredential) configureIdentityBinding(caco *ClientAssertionCredentialOptions) error { | ||
bcho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// check for identity binding mode environment variables | ||
kubernetesTokenEndpointStr := os.Getenv(azureKubernetesTokenEndpoint) | ||
kubernetesSNIName := os.Getenv(azureKubernetesSNIName) | ||
kubernetesCAFile := os.Getenv(azureKubernetesCAFile) | ||
|
||
if kubernetesTokenEndpointStr == "" && kubernetesSNIName == "" && kubernetesCAFile == "" { | ||
// identity binding is not set | ||
return nil | ||
} | ||
|
||
// All three variables must be present for identity binding mode | ||
if kubernetesTokenEndpointStr == "" || kubernetesSNIName == "" || kubernetesCAFile == "" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only
bcho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return errors.New("identity binding mode requires all three environment variables: AZURE_KUBERNETES_TOKEN_ENDPOINT, AZURE_KUBERNETES_SNI_NAME, and AZURE_KUBERNETES_CA_FILE") | ||
} | ||
|
||
transporter, err := newIdentityBindingTransport( | ||
bcho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
kubernetesCAFile, kubernetesSNIName, kubernetesTokenEndpointStr, | ||
caco.Transport, | ||
) | ||
if err != nil { | ||
return err | ||
} | ||
caco.Transport = transporter | ||
return nil | ||
} | ||
|
||
const ( | ||
tokenEndpointSuffix = "/oauth2/v2.0/token" | ||
caReloadInterval = 10 * time.Minute | ||
bcho marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should only be hitting the token endpoint when we need a new token, so why do we need caching at this layer at all? |
||
) | ||
|
||
// identityBindingTransport is a custom HTTP transport that redirects token requests | ||
// to the Kubernetes token endpoint when in identity binding mode | ||
type identityBindingTransport struct { | ||
bcho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
caFile string | ||
bcho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
sniName string | ||
tokenEndpoint *url.URL | ||
fallbackTransporter policy.Transporter | ||
|
||
mtx *sync.RWMutex | ||
|
||
nextRead time.Time | ||
currentCA []byte | ||
transport *http.Transport | ||
} | ||
|
||
func newIdentityBindingTransport( | ||
bcho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
caFile, sniName, tokenEndpointStr string, | ||
fallbackTransporter policy.Transporter, | ||
) (*identityBindingTransport, error) { | ||
tokenEndpoint, err := url.Parse(tokenEndpointStr) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Require this to use |
||
if err != nil { | ||
return nil, fmt.Errorf("failed to parse token endpoint URL %q: %w", tokenEndpointStr, err) | ||
} | ||
bcho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
if fallbackTransporter == nil { | ||
// FIXME: can we callback to the defaultHTTPClient from azcore/runtime? | ||
fallbackTransporter = http.DefaultClient | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah this looks wrong to me. |
||
} | ||
|
||
initialTransport := func() *http.Transport { | ||
// try reusing the user provided transport if available | ||
if httpClient, ok := fallbackTransporter.(*http.Client); ok { | ||
if transport, ok := httpClient.Transport.(*http.Transport); ok { | ||
return transport.Clone() | ||
} | ||
} | ||
|
||
// if the user did not provide a policy.Transporter or it's not a *http.Client, | ||
// we fall back to the default one. | ||
// FIXME: can we callback to the defaultHTTPClient from azcore/runtime? | ||
if transport, ok := http.DefaultTransport.(*http.Transport); ok { | ||
return transport.Clone() | ||
} | ||
|
||
// this should not happen, but if the user mutates the net/http.DefaultTransport | ||
// to something else, we fall back to a sane default | ||
return &http.Transport{ | ||
Comment on lines
+207
to
+209
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Require the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see a good way to declare such thing because
WDYT? |
||
ForceAttemptHTTP2: true, | ||
MaxIdleConns: 100, | ||
IdleConnTimeout: 90 * time.Second, | ||
TLSHandshakeTimeout: 10 * time.Second, | ||
} | ||
}() | ||
|
||
tr := &identityBindingTransport{ | ||
caFile: caFile, | ||
sniName: sniName, | ||
tokenEndpoint: tokenEndpoint, | ||
fallbackTransporter: fallbackTransporter, | ||
mtx: &sync.RWMutex{}, | ||
transport: initialTransport, | ||
} | ||
|
||
// perform an initial load to surface any issues with the CA file and transport settings. | ||
// Lock is not held here as this is called in the constructor | ||
if err := tr.reloadCA(); err != nil { | ||
return nil, err | ||
} | ||
|
||
return tr, nil | ||
} | ||
|
||
func (i *identityBindingTransport) Do(req *http.Request) (*http.Response, error) { | ||
if !strings.HasSuffix(req.URL.Path, tokenEndpointSuffix) { | ||
// not a token request, fallback to the original transporter | ||
return i.fallbackTransporter.Do(req) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the client does discovery and figures out the actual token endpoint at runtime, so to me it would be safer to check for that exact value instead of hard coding something like this. Entra is free to change its token endpoint at any time. |
||
} | ||
|
||
tr, err := i.getTokenTransporter() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
newReq := req.Clone(req.Context()) | ||
newReq.URL.Scheme = i.tokenEndpoint.Scheme // this will always be https | ||
newReq.URL.Host = i.tokenEndpoint.Host | ||
newReq.URL.Path = "" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use the path from the token endpoint as well.
bcho marked this conversation as resolved.
Show resolved
Hide resolved
|
||
newReq.Host = i.tokenEndpoint.Host | ||
|
||
return tr.RoundTrip(newReq) | ||
} | ||
|
||
func (i *identityBindingTransport) getTokenTransporter() (*http.Transport, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect this to just make a new transport every time it is called with the assumption that that will only happen when the AD access token expires. |
||
i.mtx.RLock() | ||
if i.nextRead.Before(time.Now()) { | ||
i.mtx.RUnlock() | ||
i.mtx.Lock() | ||
defer i.mtx.Unlock() | ||
// double check on the read time | ||
if now := time.Now(); i.nextRead.Before(now) { | ||
if err := i.reloadCA(); err != nil { | ||
// we return error if any attempt of reloading CA fails | ||
// This should surface in the token calls and we expect the caller to | ||
// have proper error handling / rate limit so we don't fall into deadloop here | ||
// due to scenario like broken CA file. | ||
return nil, err | ||
} | ||
} | ||
} else { | ||
defer i.mtx.RUnlock() | ||
} | ||
return i.transport, nil | ||
} | ||
|
||
func (i *identityBindingTransport) createTransportWithCAPool( | ||
fromTransport *http.Transport, | ||
caPool *x509.CertPool, | ||
) *http.Transport { | ||
transport := fromTransport.Clone() | ||
if transport.TLSClientConfig == nil { | ||
transport.TLSClientConfig = &tls.Config{} | ||
} | ||
transport.TLSClientConfig.ServerName = i.sniName | ||
transport.TLSClientConfig.RootCAs = caPool | ||
return transport | ||
} | ||
|
||
// reloadCA attempts to read the latest CA from the CA file and updates the transport if the content has changed. | ||
// If a new CA is discovered, the existing transport will be replaced with a new one that uses the new CA. | ||
// It expects the caller to hold the write lock on i.mtx to ensure thread safety. | ||
func (i *identityBindingTransport) reloadCA() error { | ||
newCA, err := os.ReadFile(i.caFile) | ||
if err != nil { | ||
return fmt.Errorf("read CA file %q: %w", i.caFile, err) | ||
} | ||
|
||
if len(newCA) == 0 { | ||
// the CA file might be in the middle of rotation without the content written. | ||
// We return nil and rely on next check. | ||
return nil | ||
} | ||
|
||
if bytes.Equal(i.currentCA, newCA) { | ||
// no change in CA content, no need to replace | ||
i.nextRead = time.Now().Add(caReloadInterval) | ||
return nil | ||
} | ||
|
||
newCAPool := x509.NewCertPool() | ||
if !newCAPool.AppendCertsFromPEM(newCA) { | ||
return fmt.Errorf("parse CA file %q: no valid certificates found", i.caFile) | ||
} | ||
|
||
newTransport := i.createTransportWithCAPool(i.transport, newCAPool) | ||
oldTransport := i.transport | ||
|
||
i.transport = newTransport | ||
i.currentCA = newCA | ||
i.nextRead = time.Now().Add(caReloadInterval) | ||
|
||
if oldTransport != nil { | ||
// drop any idle connections from previous transport so new requests can be | ||
// moved to the new transport | ||
oldTransport.CloseIdleConnections() | ||
} | ||
|
||
return nil | ||
} |
Uh oh!
There was an error while loading. Please reload this page.