Skip to content
Merged
21 changes: 21 additions & 0 deletions src/Confluent.SchemaRegistry/AuthCredentialsSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,25 @@ public enum AuthCredentialsSource
/// </summary>
SaslInherit
}

/// <summary>
/// Bearer auth credentials source.
/// </summary>
public enum BearerAuthCredentialsSource
{
/// <summary>
/// Credentials are specified via the `schema.registry.bearer.auth.token` config property.
/// </summary>
StaticToken,

/// <summary>
/// Credentials are specified via the `schema.registry.oauthbearer.auth.credentials.source` config property.
/// </summary>
OAuthBearer,

/// <summary>
/// User provides a custom implementation of IAuthenticationHeaderValueProvider.
/// </summary>
Custom
}
}
117 changes: 102 additions & 15 deletions src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
using System.Threading.Tasks;
using System.Linq;
using System;
using System.Net.Http;
using System.Collections.Concurrent;
using System.Net;
using System.Threading;
Expand Down Expand Up @@ -329,6 +330,12 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf

username = userPass[0];
password = userPass[1];
if (authenticationHeaderValueProvider != null)
{
throw new ArgumentException(
$"Invalid authentication header value provider configuration: Cannot specify both custom provider and username/password");
}
authenticationHeaderValueProvider = new BasicAuthenticationHeaderValueProvider(username, password);
}
}
else if (basicAuthSource == "SASL_INHERIT")
Expand All @@ -355,40 +362,112 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf

username = saslUsername.Value;
password = saslPassword.Value;
if (authenticationHeaderValueProvider != null)
{
throw new ArgumentException(
$"Invalid authentication header value provider configuration: Cannot specify both custom provider and username/password");
}
authenticationHeaderValueProvider = new BasicAuthenticationHeaderValueProvider(username, password);
}
else
{
throw new ArgumentException(
$"Invalid value '{basicAuthSource}' specified for property '{SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource}'");
}

if (authenticationHeaderValueProvider != null)
var bearerAuthSource = config.FirstOrDefault(prop =>
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource).Value ?? "";

if (bearerAuthSource != "" && basicAuthSource != "")
{
if (username != null || password != null)
{
throw new ArgumentException(
$"Invalid authentication header value provider configuration: Cannot specify both custom provider and username/password");
}
throw new ArgumentException(
$"Invalid authentication header value provider configuration: Cannot specify both basic and bearer authentication");
}
else

string logicalCluster = null;
string identityPoolId = null;
string bearerToken = null;
string clientId = null;
string clientSecret = null;
string scope = null;
string tokenEndpointUrl = null;

if (bearerAuthSource == "STATIC_TOKEN" || bearerAuthSource == "OAUTHBEARER")
{
if (username != null && password == null)
if (authenticationHeaderValueProvider != null)
{
throw new ArgumentException(
$"Invalid authentication header value provider configuration: Basic authentication username specified, but password not specified");
$"Invalid authentication header value provider configuration: Cannot specify both custom provider and bearer authentication");
}
logicalCluster = config.FirstOrDefault(prop =>
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster).Value;

if (username == null && password != null)
identityPoolId = config.FirstOrDefault(prop =>
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId).Value;
if (logicalCluster == null || identityPoolId == null)
{
throw new ArgumentException(
$"Invalid authentication header value provider configuration: Basic authentication password specified, but username not specified");
}
else if (username != null && password != null)
{
authenticationHeaderValueProvider = new BasicAuthenticationHeaderValueProvider(username, password);
$"Invalid bearer authentication provider configuration: Logical cluster and identity pool ID must be specified");
}
}

switch (bearerAuthSource)
{
case "STATIC_TOKEN":
bearerToken = config.FirstOrDefault(prop =>
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken).Value;

if (bearerToken == null)
{
throw new ArgumentException(
$"Invalid authentication header value provider configuration: Bearer authentication token not specified");
}
authenticationHeaderValueProvider = new StaticBearerAuthenticationHeaderValueProvider(bearerToken, logicalCluster, identityPoolId);
break;

case "OAUTHBEARER":
clientId = config.FirstOrDefault(prop =>
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId).Value;

clientSecret = config.FirstOrDefault(prop =>
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret).Value;

scope = config.FirstOrDefault(prop =>
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope).Value;

tokenEndpointUrl = config.FirstOrDefault(prop =>
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl).Value;

if (tokenEndpointUrl == null || clientId == null || clientSecret == null || scope == null)
{
throw new ArgumentException(
$"Invalid bearer authentication provider configuration: Token endpoint URL, client ID, client secret, and scope must be specified");
}
authenticationHeaderValueProvider = new BearerAuthenticationHeaderValueProvider(
new HttpClient(), clientId, clientSecret, scope, tokenEndpointUrl, logicalCluster, identityPoolId, maxRetries, retriesWaitMs, retriesMaxWaitMs);
break;

case "CUSTOM":
if (authenticationHeaderValueProvider == null)
{
throw new ArgumentException(
$"Invalid authentication header value provider configuration: Custom authentication provider must be specified");
}
if(!(authenticationHeaderValueProvider is IAuthenticationBearerHeaderValueProvider))
{
throw new ArgumentException(
$"Invalid authentication header value provider configuration: Custom authentication provider must implement IAuthenticationBearerHeaderValueProvider");
}
break;

case "":
break;

default:
throw new ArgumentException(
$"Invalid value '{bearerAuthSource}' specified for property '{SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource}'");
}

foreach (var property in config)
{
if (!property.Key.StartsWith("schema.registry."))
Expand All @@ -405,6 +484,14 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryLatestCacheTtlSecs &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthCredentialsSource &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBasicAuthUserInfo &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthCredentialsSource &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthToken &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientId &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthClientSecret &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthScope &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthTokenEndpointUrl &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthLogicalCluster &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryBearerAuthIdentityPoolId &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryKeySubjectNameStrategy &&
property.Key != SchemaRegistryConfig.PropertyNames.SchemaRegistryValueSubjectNameStrategy &&
property.Key != SchemaRegistryConfig.PropertyNames.SslCaLocation &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using System.Collections.Generic;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;

namespace Confluent.SchemaRegistry
{
class BearerToken
{
[JsonProperty("access_token")]
public string AccessToken { get; set; }
[JsonProperty("token_type")]
public string TokenType { get; set; }
[JsonProperty("expires_in")]
public int ExpiresIn { get; set; }
[JsonProperty("scope")]
public string Scope { get; set; }
[JsonIgnore]
public double ExpiryTime { get; set; }
}

public class BearerAuthenticationHeaderValueProvider : IAuthenticationBearerHeaderValueProvider, IDisposable
{
private readonly string clientId;
private readonly string clientSecret;
private readonly string scope;
private readonly string tokenEndpoint;
private readonly string logicalCluster;
private readonly string identityPool;
private readonly int maxRetries;
private readonly int retriesWaitMs;
private readonly int retriesMaxWaitMs;
private readonly HttpClient httpClient;
private volatile BearerToken token;
private const float tokenExpiryThreshold = 0.8f;

public BearerAuthenticationHeaderValueProvider(
HttpClient httpClient,
string clientId,
string clientSecret,
string scope,
string tokenEndpoint,
string logicalCluster,
string identityPool,
int maxRetries,
int retriesWaitMs,
int retriesMaxWaitMs)
{
this.httpClient = httpClient;
this.clientId = clientId;
this.clientSecret = clientSecret;
this.scope = scope;
this.tokenEndpoint = tokenEndpoint;
this.logicalCluster = logicalCluster;
this.identityPool = identityPool;
this.maxRetries = maxRetries;
this.retriesWaitMs = retriesWaitMs;
this.retriesMaxWaitMs = retriesMaxWaitMs;
}

public async Task InitOrRefreshAsync()
{
await GenerateToken();
}

public bool NeedsInitOrRefresh()
{
return token == null || DateTimeOffset.UtcNow.ToUnixTimeSeconds() >= token.ExpiryTime;
}

private HttpRequestMessage CreateTokenRequest()
{
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, tokenEndpoint);

request.Content = new FormUrlEncodedContent(new[]
{
new KeyValuePair<string, string>("grant_type", "client_credentials"),
new KeyValuePair<string, string>("client_id", clientId),
new KeyValuePair<string, string>("client_secret", clientSecret),
new KeyValuePair<string, string>("scope", scope)
});

return request;
}

private async Task GenerateToken()
{
var request = CreateTokenRequest();

for (int i = 0; i < maxRetries + 1; i++){
try
{
var response = await httpClient.SendAsync(request).ConfigureAwait(continueOnCapturedContext: false);
response.EnsureSuccessStatusCode();
var tokenResponse = await response.Content.ReadAsStringAsync();
token = JObject.Parse(tokenResponse).ToObject<BearerToken>(JsonSerializer.Create());
token.ExpiryTime = DateTimeOffset.UtcNow.ToUnixTimeSeconds() + (int)(token.ExpiresIn * tokenExpiryThreshold);
return;
}
catch (Exception e)
{
if (i == maxRetries)
{
throw new Exception("Failed to fetch token from server: " + e.Message);
}
await Task.Delay(RetryUtility.CalculateRetryDelay(retriesWaitMs, retriesMaxWaitMs, i));
}
}
}

public AuthenticationHeaderValue GetAuthenticationHeader()
{
if (this.token == null)
{
throw new InvalidOperationException("Token not initialized");
}

return new AuthenticationHeaderValue("Bearer", this.token.AccessToken);
}

public string GetLogicalCluster() => this.logicalCluster;

public string GetIdentityPool() => this.identityPool;

public void Dispose()
{
this.token = null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2025 Confluent Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Refer to LICENSE for more information.

using System.Net.Http.Headers;
using System.Threading.Tasks;
namespace Confluent.SchemaRegistry
{
/// <summary>
/// An interface defining HTTP client authentication header values.
/// </summary>
public interface IAuthenticationBearerHeaderValueProvider : IAuthenticationHeaderValueProvider
{
public Task InitOrRefreshAsync();

public bool NeedsInitOrRefresh();

/// <summary>
/// Get the authentication header for HTTP requests
/// </summary>
/// <returns>
/// The authentication header for HTTP request messages
/// </returns>
///

AuthenticationHeaderValue GetAuthenticationHeader();

/// <summary>
/// Get the logical cluster for HTTP requests
/// </summary>
/// <returns>
/// The logical cluster for HTTP request messages
/// </returns>
string GetLogicalCluster();
/// <summary>
/// Get the identity pool for HTTP requests
/// </summary>
/// <returns>
/// The identity pool for HTTP request messages
/// </returns>
string GetIdentityPool();
}
}
Loading