VPP-Agent 支持数据库回滚功能
需求背景
现在前端发送请求数据到 upf 的流程架构如下图。前端请求 vpp-agent 的 api , vpp-agent 将数据保存到 redis 中; watcher 监控到数据的变化,将数据发送给 upf 。
在此实现方式中,前端收到 vpp-agent 的应答并非数据执行到 upf 中的结果,而是数据存入 vpp-agent 的数据库的结果;当 upf 插入数据失败时, watcher 可以接收到 upf 错误返回,但无法将错误信息报告到前端;当 upf 插入数据失败时, watcher 接收到 upf 错误返回,但没有对 redis 中的数据进行回滚,导致 vpp-agent 数据库的信息记录与 upf 实际运行状态不同步。
为优化以上问题,此次开发 vpp-agent 数据库回滚功能,保证 vpp-agent 数据库信息与 upf 实际运行状态能同步,同时使前端使用 restful api 时可以得到 upf 实时返回的结果。
设计方案
为使前端能得到 upf 实时返回的结果,而不是 vpp-agent 中间数据库的操作结果,现将前端的请求经 rest_api 模块处理后直接请求 upf ,获取结果后返回给前端;
为保留 vpp-agent 作为 upf 的数据库功能,当 upf 返回确定操作后,依然将记录存放到数据库中;
在原有 vpp-agent 数据库自动同步的基础上,取消已发送 upf 的数据的同步,避免重复发送数据到 upf 。
为此,(1)在 rest_api 模块将数据发送 upf 后、将数据插入数据库前,增加 cache 的操作,该操作将无需再同步 upf 的数据的唯一标识存放到 cache 中;(2)修改同步 upf 的 watcher 操作,该操作将先判断数据是否已经存放 cache ,有则删除 cache 且不需要发送 upf ,没有则发送 upf 同步数据。
上述唯一标识为 “ 增/删/改某条规则 ”,故标识设计为 rule id + post/delete/put 的形式。
代码实现
修改思路
1、修改 restful handler 增加 cache 调用
func GetMP2CacheTotally(key string) []string {
item := Mp2Cache.Get(key)
if item == nil {
return nil
} else {
return item.([]string)
}
}
func IsMp2CacheRecordExit(key string) bool {
var keys []string
sxKey := upf_model.Mp2Prefix
item := Mp2Cache.Get(sxKey)
if item != nil {
keys = item.([]string)
for _, k := range keys {
if k == key {
return true
}
}
}
return false
}
func AddMp2Cache(key string) {
var keys []string
sxKey := upf_model.Mp2Prefix
item := Mp2Cache.Get(sxKey)
if item != nil {
keys = item.([]string)
for _, k := range keys {
if k == key {
return
}
}
}
keys = append(keys, key)
Mp2Cache.Put(sxKey, keys)
}
func DelMp2Cache(key string) {
var keys []string
sxKey := upf_model.Mp2Prefix
item := Mp2Cache.Get(sxKey)
if item == nil {
return
}
keys = item.([]string)
for i, k := range keys {
if k == key {
keys[i] = keys[len(keys)-1]
keys[len(keys)-1] = ""
keys = keys[:len(keys)-1]
break
}
}
PfcpCache.Put(sxKey, keys)
}
2、修改 restful handler 增加 upf api 调用 (以 traffic rule post 请求为例)
// send to upf
mp2Id, err = p.CreateUUIDIndex(mp2TrafficRuleReqInfo.TrafficRuleId)
err = p.upfHandler.VppUpdateMP2TrafficRule(&mp2TrafficRuleReqInfo, mp2Id)
// add cache
cacheKey := mp2TrafficRuleReqInfo.TrafficRuleId
cache.AddMp2Cache(cacheKey)
3、修改 watcher 增加 cache 调用(以 traffic rule post 请求为例)
// the data in cache means it has send to upf
cacheKey := value.TrafficRuleId + "POST"
if cache.IsMp2CacheRecordExit(cacheKey) {
cache.DelMp2Cache(cacheKey)
d.log.Debugf("creating traffic rule information has been send to upf %+v", value)
return nil, nil
}
此次修改涉及接口
# mep 涉及接口
traffic_rule
dns_redirect_rule
acl_rule
bandwidth_rule
# mp2 涉及接口
traffic_rule
dns_redirect_rule
dns_rule
acl_rule
bandwidth_rule
测试步骤
1、 curl 请求
2、查看日志,确认同步过程绕开已发送 upf 的数据
3、如果 upf 发送不成功,确认数据库没有插入冗余数据
4、重启 vpp-agent 不影响数据同步。
相关知识
1、 vpp-agent Database Plugin
2、 VPP Connection in GoVPPMux Plugin
vpp-agent 与 vpp 的连接依赖 GoVppMux plugin 实现,主要有三种请求方式:同步请求、异步请求、多路请求。
三种请求方式都需要 vpp 提供 binary api 给 vpp-agent 使用。
同步请求方式:使用 sendRequest(requestData).RecevieReply(replyData)
方法f发送请求并获取应答, vpp-agent 发出请求会等待获取应答后再执行其他流程。
异步请求方式:使用 sendRequest
方法发送请求,使用RecevieReply
方法获取应答, vpp-agent 发出请求后可执行其他操作。
多路请求方式:使用SendMultiRequest
方法发送请求,使用ReceiveReply
方法获取多个应答。此方法适用于单个请求但有多个应答的场景中。
3、事务回滚的一般方法
1、事务表
2、消息队列
3、补偿机制(执行/回滚)
4、 TCC 模式(预占/确认/取消)
5、 Sagas 模式(拆分事务+补偿机制)
Reference
1、了解回滚的一般方法:https://blog.csdn.net/weixin_48182198/article/details/107968481
2、vpp-agent 数据库原理: https://docs.ligato.io/en/latest/plugins/db-plugins/
3、vpp-agent 连接 vpp 的原理: https://docs.ligato.io/en/latest/tutorials/07_vpp-connection/