2323using System . Threading . Tasks ;
2424using System . Linq ;
2525using System ;
26+ using System . Net . Http ;
2627using System . Collections . Concurrent ;
2728using System . Net ;
2829using System . Threading ;
@@ -329,6 +330,12 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
329330
330331 username = userPass [ 0 ] ;
331332 password = userPass [ 1 ] ;
333+ if ( authenticationHeaderValueProvider != null )
334+ {
335+ throw new ArgumentException (
336+ $ "Invalid authentication header value provider configuration: Cannot specify both custom provider and username/password") ;
337+ }
338+ authenticationHeaderValueProvider = new BasicAuthenticationHeaderValueProvider ( username , password ) ;
332339 }
333340 }
334341 else if ( basicAuthSource == "SASL_INHERIT" )
@@ -355,40 +362,112 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
355362
356363 username = saslUsername . Value ;
357364 password = saslPassword . Value ;
365+ if ( authenticationHeaderValueProvider != null )
366+ {
367+ throw new ArgumentException (
368+ $ "Invalid authentication header value provider configuration: Cannot specify both custom provider and username/password") ;
369+ }
370+ authenticationHeaderValueProvider = new BasicAuthenticationHeaderValueProvider ( username , password ) ;
358371 }
359372 else
360373 {
361374 throw new ArgumentException (
362375 $ "Invalid value '{ basicAuthSource } ' specified for property '{ SchemaRegistryConfig . PropertyNames . SchemaRegistryBasicAuthCredentialsSource } '") ;
363376 }
364377
365- if ( authenticationHeaderValueProvider != null )
378+ var bearerAuthSource = config . FirstOrDefault ( prop =>
379+ prop . Key . ToLower ( ) == SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthCredentialsSource ) . Value ?? "" ;
380+
381+ if ( bearerAuthSource != "" && basicAuthSource != "" )
366382 {
367- if ( username != null || password != null )
368- {
369- throw new ArgumentException (
370- $ "Invalid authentication header value provider configuration: Cannot specify both custom provider and username/password") ;
371- }
383+ throw new ArgumentException (
384+ $ "Invalid authentication header value provider configuration: Cannot specify both basic and bearer authentication") ;
372385 }
373- else
386+
387+ string logicalCluster = null ;
388+ string identityPoolId = null ;
389+ string bearerToken = null ;
390+ string clientId = null ;
391+ string clientSecret = null ;
392+ string scope = null ;
393+ string tokenEndpointUrl = null ;
394+
395+ if ( bearerAuthSource == "STATIC_TOKEN" || bearerAuthSource == "OAUTHBEARER" )
374396 {
375- if ( username != null && password = = null )
397+ if ( authenticationHeaderValueProvider ! = null )
376398 {
377399 throw new ArgumentException (
378- $ "Invalid authentication header value provider configuration: Basic authentication username specified, but password not specified ") ;
400+ $ "Invalid authentication header value provider configuration: Cannot specify both custom provider and bearer authentication ") ;
379401 }
402+ logicalCluster = config . FirstOrDefault ( prop =>
403+ prop . Key . ToLower ( ) == SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthLogicalCluster ) . Value ;
380404
381- if ( username == null && password != null )
405+ identityPoolId = config . FirstOrDefault ( prop =>
406+ prop . Key . ToLower ( ) == SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthIdentityPoolId ) . Value ;
407+ if ( logicalCluster == null || identityPoolId == null )
382408 {
383409 throw new ArgumentException (
384- $ "Invalid authentication header value provider configuration: Basic authentication password specified, but username not specified") ;
385- }
386- else if ( username != null && password != null )
387- {
388- authenticationHeaderValueProvider = new BasicAuthenticationHeaderValueProvider ( username , password ) ;
410+ $ "Invalid bearer authentication provider configuration: Logical cluster and identity pool ID must be specified") ;
389411 }
390412 }
391413
414+ switch ( bearerAuthSource )
415+ {
416+ case "STATIC_TOKEN" :
417+ bearerToken = config . FirstOrDefault ( prop =>
418+ prop . Key . ToLower ( ) == SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthToken ) . Value ;
419+
420+ if ( bearerToken == null )
421+ {
422+ throw new ArgumentException (
423+ $ "Invalid authentication header value provider configuration: Bearer authentication token not specified") ;
424+ }
425+ authenticationHeaderValueProvider = new StaticBearerAuthenticationHeaderValueProvider ( bearerToken , logicalCluster , identityPoolId ) ;
426+ break ;
427+
428+ case "OAUTHBEARER" :
429+ clientId = config . FirstOrDefault ( prop =>
430+ prop . Key . ToLower ( ) == SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthClientId ) . Value ;
431+
432+ clientSecret = config . FirstOrDefault ( prop =>
433+ prop . Key . ToLower ( ) == SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthClientSecret ) . Value ;
434+
435+ scope = config . FirstOrDefault ( prop =>
436+ prop . Key . ToLower ( ) == SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthScope ) . Value ;
437+
438+ tokenEndpointUrl = config . FirstOrDefault ( prop =>
439+ prop . Key . ToLower ( ) == SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthTokenEndpointUrl ) . Value ;
440+
441+ if ( tokenEndpointUrl == null || clientId == null || clientSecret == null || scope == null )
442+ {
443+ throw new ArgumentException (
444+ $ "Invalid bearer authentication provider configuration: Token endpoint URL, client ID, client secret, and scope must be specified") ;
445+ }
446+ authenticationHeaderValueProvider = new BearerAuthenticationHeaderValueProvider (
447+ new HttpClient ( ) , clientId , clientSecret , scope , tokenEndpointUrl , logicalCluster , identityPoolId , maxRetries , retriesWaitMs , retriesMaxWaitMs ) ;
448+ break ;
449+
450+ case "CUSTOM" :
451+ if ( authenticationHeaderValueProvider == null )
452+ {
453+ throw new ArgumentException (
454+ $ "Invalid authentication header value provider configuration: Custom authentication provider must be specified") ;
455+ }
456+ if ( ! ( authenticationHeaderValueProvider is IAuthenticationBearerHeaderValueProvider ) )
457+ {
458+ throw new ArgumentException (
459+ $ "Invalid authentication header value provider configuration: Custom authentication provider must implement IAuthenticationBearerHeaderValueProvider") ;
460+ }
461+ break ;
462+
463+ case "" :
464+ break ;
465+
466+ default :
467+ throw new ArgumentException (
468+ $ "Invalid value '{ bearerAuthSource } ' specified for property '{ SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthCredentialsSource } '") ;
469+ }
470+
392471 foreach ( var property in config )
393472 {
394473 if ( ! property . Key . StartsWith ( "schema.registry." ) )
@@ -405,6 +484,14 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
405484 property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryLatestCacheTtlSecs &&
406485 property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBasicAuthCredentialsSource &&
407486 property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBasicAuthUserInfo &&
487+ property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthCredentialsSource &&
488+ property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthToken &&
489+ property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthClientId &&
490+ property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthClientSecret &&
491+ property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthScope &&
492+ property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthTokenEndpointUrl &&
493+ property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthLogicalCluster &&
494+ property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryBearerAuthIdentityPoolId &&
408495 property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryKeySubjectNameStrategy &&
409496 property . Key != SchemaRegistryConfig . PropertyNames . SchemaRegistryValueSubjectNameStrategy &&
410497 property . Key != SchemaRegistryConfig . PropertyNames . SslCaLocation &&
0 commit comments