Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
using System.Threading.Tasks;
using System.Linq;
using System;
using System.ComponentModel;
using System.Net;
using System.Threading;
using System.Security.Cryptography.X509Certificates;

Expand All @@ -31,7 +31,7 @@ public record DekId(string KekName, string Subject, int? Version, DekFormat? Dek
/// <summary>
/// A caching DEK Registry client.
/// </summary>
public class CachedDekRegistryClient : IDekRegistryClient, IDisposable
public class CachedDekRegistryClient : IDekRegistryClient
{
private DekRestService restService;

Expand Down Expand Up @@ -71,12 +71,16 @@ public int MaxCachedKeys
/// <param name="authenticationHeaderValueProvider">
/// The authentication header value provider
/// </param>
/// <param name="proxy">
/// The proxy server to use for connections
/// </param>
public CachedDekRegistryClient(IEnumerable<KeyValuePair<string, string>> config,
IAuthenticationHeaderValueProvider authenticationHeaderValueProvider)
IAuthenticationHeaderValueProvider authenticationHeaderValueProvider,
IWebProxy proxy = null)
{
if (config == null)
{
throw new ArgumentNullException("config properties must be specified.");
throw new ArgumentNullException("config");
}
var schemaRegistryUrisMaybe = config.FirstOrDefault(prop =>
prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SchemaRegistryUrl);
Expand Down Expand Up @@ -236,8 +240,9 @@ public CachedDekRegistryClient(IEnumerable<KeyValuePair<string, string>> config,
$"Configured value for {SchemaRegistryConfig.PropertyNames.EnableSslCertificateVerification} must be a bool.");
}

this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider,
SetSslConfig(config), sslVerify);
var sslCaLocation = config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation).Value;
var sslCaCertificate = string.IsNullOrEmpty(sslCaLocation) ? null : new X509Certificate2(sslCaLocation);
this.restService = new DekRestService(schemaRegistryUris, timeoutMs, authenticationHeaderValueProvider, SetSslConfig(config), sslVerify, sslCaCertificate, proxy);
}

/// <summary>
Expand Down Expand Up @@ -291,14 +296,6 @@ private List<X509Certificate2> SetSslConfig(IEnumerable<KeyValuePair<string, str
certificates.Add(new X509Certificate2(certificateLocation, certificatePassword));
}

var caLocation =
config.FirstOrDefault(prop => prop.Key.ToLower() == SchemaRegistryConfig.PropertyNames.SslCaLocation)
.Value ?? "";
if (!String.IsNullOrEmpty(caLocation))
{
certificates.Add(new X509Certificate2(caLocation));
}

return certificates;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,20 @@ public FieldEncryptionExecutor(IDekRegistryClient client, IClock clock)
Clock = clock ?? new Clock();
}

public override void Configure(IEnumerable<KeyValuePair<string, string>> config)
public override void Configure(IEnumerable<KeyValuePair<string, string>> config,
ISchemaRegistryClient client = null)
{
Configs = config;
if (Client == null)
{
Client = new CachedDekRegistryClient(Configs);
if (client != null)
{
Client = new CachedDekRegistryClient(Configs, client.AuthHeaderProvider, client.Proxy);
}
else
{
Client = new CachedDekRegistryClient(Configs);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using System.Security.Cryptography.X509Certificates;
Expand All @@ -29,9 +30,9 @@ public class DekRestService : RestService
/// </summary>
public DekRestService(string schemaRegistryUrl, int timeoutMs,
IAuthenticationHeaderValueProvider authenticationHeaderValueProvider, List<X509Certificate2> certificates,
bool enableSslCertificateVerification) :
bool enableSslCertificateVerification, X509Certificate2 sslCaCertificate = null, IWebProxy proxy = null) :
base(schemaRegistryUrl, timeoutMs, authenticationHeaderValueProvider, certificates,
enableSslCertificateVerification)
enableSslCertificateVerification, sslCaCertificate, proxy)
{
}

Expand Down
3 changes: 2 additions & 1 deletion src/Confluent.SchemaRegistry.Rules/CelExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public CelExecutor()
{
}

public void Configure(IEnumerable<KeyValuePair<string, string>> config)
public void Configure(IEnumerable<KeyValuePair<string, string>> config,
ISchemaRegistryClient client = null)
{
}

Expand Down
3 changes: 2 additions & 1 deletion src/Confluent.SchemaRegistry.Rules/CelFieldExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ public CelFieldExecutor()
public override string Type() => RuleType;


public override void Configure(IEnumerable<KeyValuePair<string, string>> config)
public override void Configure(IEnumerable<KeyValuePair<string, string>> config,
ISchemaRegistryClient client = null)
{
}

Expand Down
3 changes: 2 additions & 1 deletion src/Confluent.SchemaRegistry.Rules/JsonataExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ public JsonataExecutor()
{
}

public void Configure(IEnumerable<KeyValuePair<string, string>> config)
public void Configure(IEnumerable<KeyValuePair<string, string>> config,
ISchemaRegistryClient client = null)
{
}

Expand Down
3 changes: 2 additions & 1 deletion src/Confluent.SchemaRegistry/AsyncSerde.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected AsyncSerde(ISchemaRegistryClient schemaRegistryClient, SerdeConfig con

foreach (IRuleExecutor executor in this.ruleRegistry.GetExecutors())
{
executor.Configure(ruleConfigs);
executor.Configure(ruleConfigs, schemaRegistryClient);
}
}

Expand Down Expand Up @@ -297,6 +297,7 @@ protected async Task<object> ExecuteMigrations(
/// <param name="source"></param>
/// <param name="target"></param>
/// <param name="message"></param>
/// <param name="fieldTransformer"></param>
/// <returns></returns>
/// <exception cref="RuleConditionException"></exception>
/// <exception cref="ArgumentException"></exception>
Expand Down
21 changes: 18 additions & 3 deletions src/Confluent.SchemaRegistry/CachedSchemaRegistryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace Confluent.SchemaRegistry
/// - <see cref="CachedSchemaRegistryClient.GetSchemaBySubjectAndIdAsync(string, int, string)" />
/// - <see cref="CachedSchemaRegistryClient.RegisterSchemaAsync(string, Schema, bool)" />
/// - <see cref="CachedSchemaRegistryClient.RegisterSchemaAsync(string, string, bool)" />
/// - <see cref="CachedSchemaRegistryClient.GetRegisteredSchemaAsync(string, int)" />
/// - <see cref="CachedSchemaRegistryClient.GetRegisteredSchemaAsync(string, int, bool)" />
///
/// The following method calls do NOT cache results:
/// - <see cref="CachedSchemaRegistryClient.LookupSchemaAsync(string, Schema, bool, bool)" />
Expand All @@ -54,11 +54,13 @@ namespace Confluent.SchemaRegistry
/// - <see cref="CachedSchemaRegistryClient.GetCompatibilityAsync(string)" />
/// - <see cref="CachedSchemaRegistryClient.UpdateCompatibilityAsync(Compatibility, string)" />
/// </summary>
public class CachedSchemaRegistryClient : ISchemaRegistryClient, IDisposable
public class CachedSchemaRegistryClient : ISchemaRegistryClient
{
private readonly List<SchemaReference> EmptyReferencesList = new List<SchemaReference>();

private IEnumerable<KeyValuePair<string, string>> config;
private IAuthenticationHeaderValueProvider authHeaderProvider;
private IWebProxy proxy;

private IRestService restService;
private int identityMapCapacity;
Expand Down Expand Up @@ -117,6 +119,16 @@ public IEnumerable<KeyValuePair<string, string>> Config
=> config;


/// <inheritdoc />
public IAuthenticationHeaderValueProvider AuthHeaderProvider
=> authHeaderProvider;


/// <inheritdoc />
public IWebProxy Proxy
=> proxy;


/// <inheritdoc />
public int MaxCachedSchemas
=> identityMapCapacity;
Expand Down Expand Up @@ -176,10 +188,12 @@ public CachedSchemaRegistryClient(IEnumerable<KeyValuePair<string, string>> conf
{
if (config == null)
{
throw new ArgumentNullException("config properties must be specified.");
throw new ArgumentNullException("config");
}

this.config = config;
this.authHeaderProvider = authenticationHeaderValueProvider;
this.proxy = proxy;

keySubjectNameStrategy = GetKeySubjectNameStrategy(config);
valueSubjectNameStrategy = GetValueSubjectNameStrategy(config);
Expand Down Expand Up @@ -663,6 +677,7 @@ public async Task<RegisteredSchema> GetLatestSchemaAsync(string subject)
return schema;
}

/// <inheritdoc/>
public async Task<RegisteredSchema> GetLatestWithMetadataAsync(string subject,
IDictionary<string, string> metadata, bool ignoreDeletedSchemas)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Confluent.SchemaRegistry/ErrorAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ public class ErrorAction : IRuleAction
{
public static readonly string ActionType = "ERROR";

public void Configure(IEnumerable<KeyValuePair<string, string>> config)
public void Configure(IEnumerable<KeyValuePair<string, string>> config,
ISchemaRegistryClient client = null)
{
}

Expand Down
3 changes: 2 additions & 1 deletion src/Confluent.SchemaRegistry/FieldRuleExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ namespace Confluent.SchemaRegistry
{
public abstract class FieldRuleExecutor : IRuleExecutor
{
public abstract void Configure(IEnumerable<KeyValuePair<string, string>> config);
public abstract void Configure(IEnumerable<KeyValuePair<string, string>> config,
ISchemaRegistryClient client = null);

public abstract string Type();

Expand Down
4 changes: 3 additions & 1 deletion src/Confluent.SchemaRegistry/IRuleBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public interface IRuleBase : IDisposable
/// Configure the rule executor or action
/// </summary>
/// <param name="config"></param>
void Configure(IEnumerable<KeyValuePair<string, string>> config);
/// <param name="client"></param>
void Configure(IEnumerable<KeyValuePair<string, string>> config,
ISchemaRegistryClient client = null);

/// <summary>
/// The type of rule executor or action
Expand Down
21 changes: 20 additions & 1 deletion src/Confluent.SchemaRegistry/ISchemaRegistryClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;


Expand All @@ -32,6 +33,18 @@ public interface ISchemaRegistryClient : IDisposable
IEnumerable<KeyValuePair<string, string>> Config { get; }


/// <summary>
/// The authentication header provider.
/// </summary>
IAuthenticationHeaderValueProvider AuthHeaderProvider { get; }


/// <summary>
/// The web proxy.
/// </summary>
IWebProxy Proxy { get; }


/// <summary>
/// The maximum capacity of the local schema cache.
/// </summary>
Expand Down Expand Up @@ -141,7 +154,7 @@ public interface ISchemaRegistryClient : IDisposable


/// <summary>
/// Gets the schema uniquely identified by <paramref name="subject"> and <paramref name="id" />.
/// Gets the schema uniquely identified by <paramref name="subject" /> and <paramref name="id" />.
/// </summary>
/// <param name="subject">
/// The subject.
Expand Down Expand Up @@ -238,6 +251,12 @@ public interface ISchemaRegistryClient : IDisposable
/// <param name="subject">
/// The subject to get the latest associated schema for.
/// </param>
/// <param name="metadata">
/// The metadata to search for.
/// </param>
/// <param name="ignoreDeletedSchemas">
/// Whether to ignore deleted schemas.
/// </param>
/// <returns>
/// The latest schema with the given metadata registered against <paramref name="subject" />.
/// </returns>
Expand Down
3 changes: 2 additions & 1 deletion src/Confluent.SchemaRegistry/NoneAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ public class NoneAction : IRuleAction
{
public static readonly string ActionType = "NONE";

public void Configure(IEnumerable<KeyValuePair<string, string>> config)
public void Configure(IEnumerable<KeyValuePair<string, string>> config,
ISchemaRegistryClient client = null)
{
}

Expand Down
1 change: 0 additions & 1 deletion src/Confluent.SchemaRegistry/Rest/RestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
using System.Threading.Tasks;
using X509Certificate2 = System.Security.Cryptography.X509Certificates.X509Certificate2;

using System.Security.Cryptography.X509Certificates;
using System.Net.Security;

namespace Confluent.SchemaRegistry
Expand Down