# VPP-Agent 支持数据库回滚功能 ## 需求背景 现在前端发送请求数据到 upf 的流程架构如下图。前端请求 vpp-agent 的 api , vpp-agent 将数据保存到 redis 中; watcher 监控到数据的变化,将数据发送给 upf 。 在此实现方式中,前端收到 vpp-agent 的应答并非数据执行到 upf 中的结果,而是数据存入 vpp-agent 的数据库的结果;当 upf 插入数据失败时, watcher 可以接收到 upf 错误返回,但无法将错误信息报告到前端;当 upf 插入数据失败时, watcher 接收到 upf 错误返回,但没有对 redis 中的数据进行回滚,导致 vpp-agent 数据库的信息记录与 upf 实际运行状态不同步。 为优化以上问题,此次开发 vpp-agent 数据库回滚功能,保证 vpp-agent 数据库信息与 upf 实际运行状态能同步,同时使前端使用 restful api 时可以得到 upf 实时返回的结果。 ![image-before1](../../../../_static/vpp_agent_db_rollback_before1.png) ## 设计方案 1. 为使前端能得到 upf 实时返回的结果,而不是 vpp-agent 中间数据库的操作结果,现将前端的请求经 rest_api 模块处理后直接请求 upf ,获取结果后返回给前端; 2. 为保留 vpp-agent 作为 upf 的数据库功能,当 upf 返回确定操作后,依然将记录存放到数据库中; 3. 在原有 vpp-agent 数据库自动同步的基础上,取消已发送 upf 的数据的同步,避免重复发送数据到 upf 。 为此,(1)在 rest_api 模块将数据发送 upf 后、将数据插入数据库前,增加 cache 的操作,该操作将无需再同步 upf 的数据的唯一标识存放到 cache 中;(2)修改同步 upf 的 watcher 操作,该操作将先判断数据是否已经存放 cache ,有则删除 cache 且不需要发送 upf ,没有则发送 upf 同步数据。 上述唯一标识为 “ 增/删/改某条规则 ”,故标识设计为 rule id + post/delete/put 的形式。 ![image](../../../../_static/vpp_agent_db_rollback_increase.png) ![image](../../../../_static/vpp_agent_db_rollback_change.png) ![image](../../../../_static/vpp_agent_db_rollback_decrease.png) ![image](../../../../_static/vpp_agent_db_rollback_restart.png) ## 代码实现 ### 修改思路 1、修改 restful handler 增加 cache 调用 ```go func GetMP2CacheTotally(key string) []string { item := Mp2Cache.Get(key) if item == nil { return nil } else { return item.([]string) } } func IsMp2CacheRecordExit(key string) bool { var keys []string sxKey := upf_model.Mp2Prefix item := Mp2Cache.Get(sxKey) if item != nil { keys = item.([]string) for _, k := range keys { if k == key { return true } } } return false } func AddMp2Cache(key string) { var keys []string sxKey := upf_model.Mp2Prefix item := Mp2Cache.Get(sxKey) if item != nil { keys = item.([]string) for _, k := range keys { if k == key { return } } } keys = append(keys, key) Mp2Cache.Put(sxKey, keys) } func DelMp2Cache(key string) { var keys []string sxKey := upf_model.Mp2Prefix item := Mp2Cache.Get(sxKey) if item == nil { return } keys = item.([]string) for i, k := range keys { if k == key { keys[i] = keys[len(keys)-1] keys[len(keys)-1] = "" keys = keys[:len(keys)-1] break } } PfcpCache.Put(sxKey, keys) } ``` 2、修改 restful handler 增加 upf api 调用 (以 traffic rule post 请求为例) ```go // send to upf mp2Id, err = p.CreateUUIDIndex(mp2TrafficRuleReqInfo.TrafficRuleId) err = p.upfHandler.VppUpdateMP2TrafficRule(&mp2TrafficRuleReqInfo, mp2Id) // add cache cacheKey := mp2TrafficRuleReqInfo.TrafficRuleId cache.AddMp2Cache(cacheKey) ``` 3、修改 watcher 增加 cache 调用(以 traffic rule post 请求为例) ```go // the data in cache means it has send to upf cacheKey := value.TrafficRuleId + "POST" if cache.IsMp2CacheRecordExit(cacheKey) { cache.DelMp2Cache(cacheKey) d.log.Debugf("creating traffic rule information has been send to upf %+v", value) return nil, nil } ``` ### 此次修改涉及接口 ```bash # mep 涉及接口 traffic_rule dns_redirect_rule acl_rule bandwidth_rule # mp2 涉及接口 traffic_rule dns_redirect_rule dns_rule acl_rule bandwidth_rule ``` ## 测试步骤 1、 [curl 请求](./VPP_Agent_restful_example.md) 2、查看日志,确认同步过程绕开已发送 upf 的数据 ![image-20211130155127314](../../../../_static/vpp_agent_db_rollback_log_page.png) 3、如果 upf 发送不成功,确认数据库没有插入冗余数据 4、重启 vpp-agent 不影响数据同步。 ## 相关知识 ### 1、 vpp-agent Database Plugin ![image](../../../../_static/vpp_agent_db_rollback_datasync_watch.png) ### 2、 VPP Connection in GoVPPMux Plugin vpp-agent 与 vpp 的连接依赖 GoVppMux plugin 实现,主要有三种请求方式:同步请求、异步请求、多路请求。 三种请求方式都需要 vpp 提供 binary api 给 vpp-agent 使用。 同步请求方式:使用 `sendRequest(requestData).RecevieReply(replyData)` 方法f发送请求并获取应答, vpp-agent 发出请求会等待获取应答后再执行其他流程。 异步请求方式:使用 `sendRequest`方法发送请求,使用`RecevieReply`方法获取应答, vpp-agent 发出请求后可执行其他操作。 多路请求方式:使用`SendMultiRequest`方法发送请求,使用`ReceiveReply`方法获取多个应答。此方法适用于单个请求但有多个应答的场景中。 ### 3、事务回滚的一般方法 1、事务表 2、消息队列 3、补偿机制(执行/回滚) 4、 TCC 模式(预占/确认/取消) 5、 Sagas 模式(拆分事务+补偿机制) ## Reference 1、了解回滚的一般方法:https://blog.csdn.net/weixin_48182198/article/details/107968481 2、vpp-agent 数据库原理: https://docs.ligato.io/en/latest/plugins/db-plugins/ 3、vpp-agent 连接 vpp 的原理: https://docs.ligato.io/en/latest/tutorials/07_vpp-connection/