-
Notifications
You must be signed in to change notification settings - Fork 998
Open
Description
假设在我的场景是,用二阶段消息更新redis缓存,假设是用户金额
在生产者直接把最新的数据发送到消费者,这样消费者不用做加减操作,消费者本身就是幂等性的了
但是需要在生产者传递一个version给消费者,因为消费者可能会乱序执行,导致旧数据覆盖新的数据,所以redis更新的时候需要乐观锁
//生产者,创建消息的地方
func RedisUpdateMoneyFullData() gin.HandlerFunc {{
return func(c *gin.Context) {
//我如果先查询version,并发情况下,这些version获取到的都是相同的
oldData:=gorm.find(select servion from xxxx where id=10)
//那么高并发下,2次查询的version可能都是1
//此时就会有问题,再更新到数据库还是相同的version
//所以这种方法并不行,除非在这里加一个锁
oldData.version=oldData.version+1
//创建消息
msg := dtmcli.NewMsg(DtmServer, gid).
Add(baseUrl+"/redis/updateRedis", &oldData)
//一进去到,DoAndSubmit里面,里面的prepare方法会立即发送消息给dtm
//dtm会把要发送的数据,也就是上面的oldData 存储到数据库中
err := msg.DoAndSubmit(baseUrl+"/QueryPrepare", func(bb *dtmcli.BranchBarrier) error {
begin := receiver.GormDb.Begin()
sourceTx := begin.Statement.ConnPool.(*sql.Tx)
return bb.Call(sourceTx, func(tx *sql.Tx) error {
//更新版本
updateSql := "update dtm_bank_a.user_account set balance = balance + ?,version= ?,where id = ?"
err := begin.Exec(updateSql, args.Amount,arg.Version args.Uid).Error
if err != nil {
return err
}
** //而理论上,应该是在这里把最新的version放到消息体里**
** //这样就是最新的version,但是因为**DoAndSubmi**t开始就需要msg消息体**
** //所以这里修改payloads是没用的,在源码通过唯一键判断,
//如果出错了,依旧会取到已经保存到数据库的payload
**
return nil
})
})
return err
}
}
//消费者,(消费者可能会乱序执行)
func RedisUpdateMoneyFullData() gin.HandlerFunc {{
return func(c *gin.Context) {
//在redis lua脚本里
//从消息体里获取到nowVersion,和redis里的oldVersion判断
//如果nowVersion>oldVersion,那么才允许更新
}}
我的解决办法
解决方案1
加锁
//生产者,创建消息的地方
func RedisUpdateMoneyFullData() gin.HandlerFunc {{
return func(c *gin.Context) {
//加锁,粒度为用户id
//但是这样性能损耗比较大
lock.lock(uid)
defer lock.unlock(uid)
oldData:=gorm.find(select servion from xxxx where id=10)
oldData.version=oldData.version+1
//创建消息
msg := dtmcli.NewMsg(DtmServer, gid).
Add(baseUrl+"/redis/updateRedis", &oldData)
err := msg.DoAndSubmit(baseUrl+"/QueryPrepare", func(bb *dtmcli.BranchBarrier) error {
begin := receiver.GormDb.Begin()
sourceTx := begin.Statement.ConnPool.(*sql.Tx)
return bb.Call(sourceTx, func(tx *sql.Tx) error {
updateSql := "update dtm_bank_a.user_account set balance = balance + ?,version= ?,where id = ?"
err := begin.Exec(updateSql, args.Amount,arg.Version args.Uid).Error
if err != nil {
return err
}
return nil
})
})
}解决方案2
把事务先开启,
方案2 目前测试是可行的,但是我并不知道会有什么隐式问题,我看了一下源码,貌似不会有问题,但是这种写法,我也感觉有点变扭,不知道是否可行
var request model.RequestArgsHTTP
err := c.BindJSON(&request)
if err != nil {
c.JSON(http.StatusBadRequest, err.Error())
return
}
gid := dtmcli.MustGenGid(DtmServer)
//先开启事务
begin := receiver.GormDb.Begin()
//直接更新
updateSql := "update dtm_bank_a.user_account set balance = balance + ?,version=version+1 where id = ?"
err := begin.Exec(updateSql, args.Amount, args.Uid).Error
if err != nil {
return err
}
if err != nil {
//如果在DoAndSubmit之前发生的错误,我们就需要手动回滚了,因为还没被屏障方法接管
begin.Rollback()
return
}
//获取最新version和金额
var oldAccount model.UserAccount
tx := begin.Where("id = ?", request.Uid).Find(&oldAccount)
if tx.Error != nil {
return
}
//此时就可以把最新的数据放到消息体里了
msg := dtmcli.NewMsg(DtmServer, gid).
Add(baseUrl+"/redis/updateRedisForFullData", &oldAccount)
//而DoAndSubmit只用来提交事务,之前是把事务放到里面执行,但是现在我们放外面执行先,但是不提交
// 只要不提交,那就不算数
err = msg.DoAndSubmit(baseUrl+"/QueryPrepare", func(bb *dtmcli.BranchBarrier) error {
sourceTx := begin.Statement.ConnPool.(*sql.Tx)
return bb.Call(sourceTx, func(tx1 *sql.Tx) error {
return nil
})
})
c.JSON(http.StatusOK, gin.H{
"msg": "成功",
"gid": gid,
})Metadata
Metadata
Assignees
Labels
No labels