-
Notifications
You must be signed in to change notification settings - Fork 767
Description
[toc]
主从数据库(读写分离)设计
需求分析
- 将单一数据库的部属方式更新为主从(master-slave)结构的部属方式,将读写分离,解决单一数据库处理能力不足的问题
- 主从数据库之间通过数据库自身的支持进行数据单向(主 --> 从)同步
- 当一个操作存在CUD(新增、更新、删除)的(事务)操作时,走主数据库
- 当一个操作只存在读取操作,并且不能忍受数据延迟时,走主数据库
- 当一个操作只存在读取操作,并且可以容忍一定的数据延迟时,可以走从数据库(大多数前台数据都是此类型)
- 是否走从数据库,应当可配置可控
- 是否走从数据库,对于业务实现应该是无感的
- 实现一些从数据库的选择决策算法(随机,顺序,权重等)
需求实现
主从数据库配置选项
从数据库配置选项
/// <summary>
/// 从数据库选项
/// </summary>
public class SlaveDatabaseOptions : DataErrorInfoBase
{
/// <summary>
/// 获取或设置 权重
/// </summary>
public int Weight { get; set; }
/// <summary>
/// 获取或设置 数据库连接串
/// </summary>
public string ConnectionString { get; set; }
}
在数据上下文配置中使用,数据上下文配置中,配置了主库连接字符串ConnectionString
,从数据库选项集合Slaves
,从数据库选择策略名SlaveSelectorName
/// <summary>
/// 数据上下文配置节点
/// </summary>
public class OsharpDbContextOptions
{
// 其他属性
/// <summary>
/// 获取或设置 主数据库连接字符串
/// </summary>
public string ConnectionString { get; set; }
/// <summary>
/// 获取或设置 从数据库轮询策略
/// </summary>
public string SlaveSelectorName { get; set; }
/// <summary>
/// 获取或设置 从数据库选项集合
/// </summary>
public SlaveDatabaseOptions[] Slaves { get; set; }
// 其他属性
}
相应的appsetting.json
中的JSON配置节点
{
"OSharp": {
"DbContexts": {
"SqlServer": {
"DbContextTypeName": "OSharp.Entity.DefaultDbContext,OSharp.EntityFrameworkCore",
"ConnectionString": "Server=localhost;Database=osharpns-dev-api;User Id=sa;Password=Abc123456!;MultipleActiveResultSets=true",
"Slaves": [
{
"Name": "Slave01",
"Weight": 2,
"ConnectionString": "Server=localhost;Database=osharpns-dev-api-slave01;User Id=sa;Password=Abc123456!;MultipleActiveResultSets=true"
},
{
"Name": "Slave02",
"Weight": 5,
"ConnectionString": "Server=localhost;Database=osharpns-dev-api-slave02;User Id=sa;Password=Abc123456!;MultipleActiveResultSets=true"
}
],
"SlaveSelectorName": "Weight",
"DatabaseType": "SqlServer",
"AuditEntityEnabled": true,
"AutoMigrationEnabled": true
}
}
}
}
主从分离数据存储实现
实现思路:
构建数据上下文
DbContext
构建数据上下文选项构建者
DbContextOptionsBuilder
应用具体的数据库驱动平台数据库连接字符串提供者
IConnectionStringProvider
,动态获取数据库连接字符串数据库主从分离策略
IMasterSlaveSplitPolicy
,决定走主数据库还是走从数据库返回主数据库连接字符串
从数据库选择器
ISlaveDatabaseSelector
,从多个从数据库中选择一个执行数据读取从数据库随机选择器
RandomSlaveDatabaseSelector
从数据库顺序选择器
SequenceSlaveDatabaseSelector
从数据库滑动加权选择器
WeightSlaveDatabaseSelector
返回从数据库连接字符串
流程图:
具体实现:
业务实现的时候从主从分离的数据库中进行数据存取操作,用哪个数据库,主要就是取决于数据库连接字符串的选择,因此,只要在创建数据上下文DbContext
实例之前决定好数据库连接字符串,就能动态选择不同的数据库。
OSharp 框架是在构建DbContextOptionsBuilder
的时候使用数据库连接字符串的,定义了一个**数据库连接字符串提供者IConnectionStringProvider
**接口来动态提供要连接的数据库连接字符串
/// <summary>
/// 构建<see cref="DbContextOptionsBuilder"/>,附加必要的扩展属性
/// </summary>
public static DbContextOptionsBuilder BuildDbContextOptionsBuilder<TDbContext>(this IServiceProvider provider, DbContextOptionsBuilder builder) where TDbContext : DbContext
{
...
IDbContextOptionsBuilderDriveHandler driveHandler = provider.GetServices<IDbContextOptionsBuilderDriveHandler>()
.LastOrDefault(m => m.Type == databaseType);
if (driveHandler == null)
{
throw new OsharpException($"无法解析类型为 {databaseType} 的 {typeof(IDbContextOptionsBuilderDriveHandler).DisplayName()} 实例");
}
/*
//旧代码
ScopedDictionary scopedDictionary = provider.GetService<ScopedDictionary>();
string key = $"DbConnection_{osharpDbContextOptions.ConnectionString}";
DbConnection existingDbConnection = scopedDictionary.GetValue<DbConnection>(key);
builder = driveHandler.Handle(builder, osharpDbContextOptions.ConnectionString, existingDbConnection);
*/
//新代码
IConnectionStringProvider connectionStringProvider = provider.GetRequiredService<IConnectionStringProvider>();
string connectionString = connectionStringProvider.GetConnectionString(typeof(TDbContext));
ScopedDictionary scopedDictionary = provider.GetService<ScopedDictionary>();
string key = $"DbConnection_{connectionString}";
DbConnection existingDbConnection = scopedDictionary.GetValue<DbConnection>(key);
builder = driveHandler.Handle(builder, connectionString, existingDbConnection);
...
}
数据库连接字符串提供者IConnectionStringProvider
/// <summary>
/// 数据库连接字符串提供器
/// </summary>
public interface IConnectionStringProvider
{
/// <summary>
/// 获取指定数据上下文类型的数据库连接字符串
/// </summary>
/// <param name="dbContextType">数据上下文类型</param>
/// <returns></returns>
string GetConnectionString(Type dbContextType);
}
IConnectionStringProvider
实现类ConnectionStringProvider
/// <summary>
/// 数据库连接字符串提供者
/// </summary>
public class ConnectionStringProvider : IConnectionStringProvider
{
private readonly IDictionary<string, OsharpDbContextOptions> _dbContexts;
private readonly ISlaveDatabaseSelector[] _slaveDatabaseSelectors;
private readonly IMasterSlaveSplitPolicy _masterSlavePolicy;
/// <summary>
/// 初始化一个<see cref="ConnectionStringProvider"/>类型的新实例
/// </summary>
public ConnectionStringProvider(IServiceProvider provider)
{
_dbContexts = provider.GetOSharpOptions().DbContexts;
_masterSlavePolicy = provider.GetService<IMasterSlaveSplitPolicy>();
_slaveDatabaseSelectors = provider.GetServices<ISlaveDatabaseSelector>().ToArray();
}
/// <summary>
/// 获取指定数据上下文类型的数据库连接字符串
/// </summary>
/// <param name="dbContextType">数据上下文类型</param>
/// <returns></returns>
public virtual string GetConnectionString(Type dbContextType)
{
OsharpDbContextOptions dbContextOptions = _dbContexts.Values.FirstOrDefault(m => m.DbContextType == dbContextType);
if (dbContextOptions == null)
{
throw new OsharpException($"数据上下文“{dbContextType}”的数据上下文配置信息不存在");
}
bool isSlave = _masterSlavePolicy.IsToSlaveDatabase(dbContextOptions);
if (!isSlave)
{
return dbContextOptions.ConnectionString;
}
SlaveDatabaseOptions[] slaves = dbContextOptions.Slaves;
ISlaveDatabaseSelector slaveDatabaseSelector = _slaveDatabaseSelectors.LastOrDefault(m => m.Name == dbContextOptions.SlaveSelectorName)
?? _slaveDatabaseSelectors.First(m => m.Name == "Weight");
SlaveDatabaseOptions slave = slaveDatabaseSelector.Select(slaves);
return slave.ConnectionString;
}
}
数据库主从分离策略IMasterSlaveSplitPolicy
/// <summary>
/// 定义数据库主从分离策略
/// </summary>
public interface IMasterSlaveSplitPolicy
{
/// <summary>
/// 是否前往从数据库
/// </summary>
/// <param name="options">数据上下文选项</param>
/// <returns></returns>
bool IsToSlaveDatabase(OsharpDbContextOptions options);
}
IMasterSlaveSplitPolicy
实现类MasterSlaveSplitPolicy
主从分离策略,默认流程如下:
上下文配置中从数据库配置不存在
工作单元启用了事务
当前执行Function为空或者Function上没有设置走从库
走主数据库
Function设置走从库
走从数据库
作为业务功能描述的Function
信息中,有一个选项配置Function.IsSlaveDatabase
是否走从库
只有只读业务,并且能忍受一定的数据延迟的,才应配置为走从库
如果业务涉及新增、更新、删除操作,默认策略将忽略
IsSlaveDatabase
配置
/// <summary>
/// 定义功能信息
/// </summary>
public interface IFunction : IEntity<Guid>, ILockable, IEntityHash
{
//...
/// <summary>
/// 获取或设置 是否从库读取数据
/// </summary>
bool IsSlaveDatabase { get; set; }
//...
}
默认主从分离策略实现MasterSlaveSplitPolicy
/// <summary>
/// 主从分离策略
/// </summary>
internal class MasterSlaveSplitPolicy : IMasterSlaveSplitPolicy
{
private readonly IUnitOfWork _unitOfWork;
private readonly ScopedDictionary _scopedDict;
/// <summary>
/// 初始化一个<see cref="MasterSlaveSplitPolicy"/>类型的新实例
/// </summary>
public MasterSlaveSplitPolicy(IServiceProvider provider)
{
_unitOfWork = provider.GetUnitOfWork(false);
_scopedDict = provider.GetRequiredService<ScopedDictionary>();
}
/// <summary>
/// 是否前往从数据库
/// </summary>
/// <param name="options">数据上下文选项</param>
/// <returns></returns>
public bool IsToSlaveDatabase(OsharpDbContextOptions options)
{
SlaveDatabaseOptions[] slaves = options.Slaves;
if (slaves.IsNullOrEmpty())
{
return false;
}
//允许工作单元事务,走主库
if (_unitOfWork.IsEnabledTransaction)
{
return false;
}
// 在Function显式配置走从库,才走从库
IFunction function = _scopedDict.Function;
if (function == null || !function.IsSlaveDatabase)
{
return false;
}
return true;
}
}
从数据库选择器ISlaveDatabaseSelector
/// <summary>
/// 定义从数据库选择功能
/// </summary>
[MultipleDependency]
public interface ISlaveDatabaseSelector
{
/// <summary>
/// 获取 名称
/// </summary>
string Name { get; }
/// <summary>
/// 从所有从数据库中返回一个
/// </summary>
/// <param name="slaves">所有从数据库</param>
/// <returns></returns>
SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves);
}
从数据库随机选择器RandomSlaveDatabaseSelector
/// <summary>
/// 随机从数据库选择器
/// </summary>
public sealed class RandomSlaveDatabaseSelector : ISlaveDatabaseSelector
{
private static readonly Random Random = new Random();
private readonly ILogger _logger;
/// <summary>
/// 初始化一个<see cref="RandomSlaveDatabaseSelector"/>类型的新实例
/// </summary>
public RandomSlaveDatabaseSelector(IServiceProvider provider)
{
_logger = provider.GetLogger(GetType());
}
/// <summary>
/// 获取 名称
/// </summary>
public string Name => "Random";
/// <summary>
/// 从所有从数据库中返回一个
/// </summary>
/// <param name="slaves">所有从数据库</param>
/// <returns></returns>
public SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves)
{
SlaveDatabaseOptions slave = Random.NextItem(slaves);
_logger.LogDebug($"随机选取了“{slave.Name}”的从数据库");
return slave;
}
}
从数据库顺序选择器SequenceSlaveDatabaseSelector
/// <summary>
/// 顺序轮询从数据库选择器
/// </summary>
public sealed class SequenceSlaveDatabaseSelector : ISlaveDatabaseSelector
{
private static readonly object LockObj = new object();
private int _sequenceIndex;
private readonly ILogger _logger;
/// <summary>
/// 初始化一个<see cref="SequenceSlaveDatabaseSelector"/>类型的新实例
/// </summary>
public SequenceSlaveDatabaseSelector(IServiceProvider provider)
{
_logger = provider.GetLogger(GetType());
}
/// <summary>
/// 获取 名称
/// </summary>
public string Name => "Sequence";
/// <summary>
/// 从所有从数据库中返回一个
/// </summary>
/// <param name="slaves">所有从数据库</param>
/// <returns></returns>
public SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves)
{
lock (LockObj)
{
if (_sequenceIndex > slaves.Length - 1)
{
_sequenceIndex = 0;
}
SlaveDatabaseOptions slave = slaves[_sequenceIndex];
_logger.LogDebug($"顺序选取了“{slave.Name}”的从数据库,顺序号:{_sequenceIndex}");
_sequenceIndex++;
return slave;
}
}
}
从数据库平滑加权选择器WeightSlaveDatabaseSelector
/// <summary>
/// 平滑权重从数据库选择器
/// </summary>
public sealed class WeightSlaveDatabaseSelector : ISlaveDatabaseSelector
{
private static readonly object LockObj = new object();
private readonly ILogger _logger;
private Queue<int> _queue = new Queue<int>();
/// <summary>
/// 初始化一个<see cref="WeightSlaveDatabaseSelector"/>类型的新实例
/// </summary>
public WeightSlaveDatabaseSelector(IServiceProvider provider)
{
_logger = provider.GetLogger(GetType());
}
/// <summary>
/// 获取 名称
/// </summary>
public string Name => "Weight";
/// <summary>
/// 从所有从数据库中返回一个
/// </summary>
/// <param name="slaves">所有从数据库</param>
/// <returns></returns>
public SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves)
{
lock (LockObj)
{
if (_queue.Count == 0)
{
_queue = GetIndexes(slaves);
}
int index = _queue.Dequeue();
SlaveDatabaseOptions slave = slaves[index];
_logger.LogDebug($"平滑权重选取了“{slave.Name}”的从数据库,权重:{slave.Weight}");
return slave;
}
}
private static Queue<int> GetIndexes(SlaveDatabaseOptions[] slaves)
{
SlaveDatabaseOptionsWrap[] wraps = slaves.Select(m => new SlaveDatabaseOptionsWrap(m)).ToArray();
int sum = wraps.Sum(m => m.Weight);
Queue<int> queue = new Queue<int>();
for (int i = 0; i < sum; i++)
{
int index = NextIndex(wraps);
queue.Enqueue(index);
}
return queue;
}
private static int NextIndex(SlaveDatabaseOptionsWrap[] wraps)
{
int index = -1, total = 0;
for (int i = 0; i < wraps.Length; i++)
{
SlaveDatabaseOptionsWrap wrap = wraps[i];
wrap.Current += wrap.Weight;
total += wrap.Weight;
if (index == -1 || wraps[index].Current < wrap.Current)
{
index = i;
}
}
wraps[index].Current -= total;
return index;
}
private class SlaveDatabaseOptionsWrap : SlaveDatabaseOptions
{
public SlaveDatabaseOptionsWrap(SlaveDatabaseOptions slave)
{
Weight = slave.Weight;
}
/// <summary>
/// 获取或设置 当前权重
/// </summary>
public int Current { get; set; }
}
}