VPP-Agent 支持数据库回滚功能

需求背景

现在前端发送请求数据到 upf 的流程架构如下图。前端请求 vpp-agent 的 api , vpp-agent 将数据保存到 redis 中; watcher 监控到数据的变化,将数据发送给 upf 。

在此实现方式中,前端收到 vpp-agent 的应答并非数据执行到 upf 中的结果,而是数据存入 vpp-agent 的数据库的结果;当 upf 插入数据失败时, watcher 可以接收到 upf 错误返回,但无法将错误信息报告到前端;当 upf 插入数据失败时, watcher 接收到 upf 错误返回,但没有对 redis 中的数据进行回滚,导致 vpp-agent 数据库的信息记录与 upf 实际运行状态不同步。

为优化以上问题,此次开发 vpp-agent 数据库回滚功能,保证 vpp-agent 数据库信息与 upf 实际运行状态能同步,同时使前端使用 restful api 时可以得到 upf 实时返回的结果。

image-before1

设计方案

  1. 为使前端能得到 upf 实时返回的结果,而不是 vpp-agent 中间数据库的操作结果,现将前端的请求经 rest_api 模块处理后直接请求 upf ,获取结果后返回给前端;

  2. 为保留 vpp-agent 作为 upf 的数据库功能,当 upf 返回确定操作后,依然将记录存放到数据库中;

  3. 在原有 vpp-agent 数据库自动同步的基础上,取消已发送 upf 的数据的同步,避免重复发送数据到 upf 。

    为此,(1)在 rest_api 模块将数据发送 upf 后、将数据插入数据库前,增加 cache 的操作,该操作将无需再同步 upf 的数据的唯一标识存放到 cache 中;(2)修改同步 upf 的 watcher 操作,该操作将先判断数据是否已经存放 cache ,有则删除 cache 且不需要发送 upf ,没有则发送 upf 同步数据。

    上述唯一标识为 “ 增/删/改某条规则 ”,故标识设计为 rule id + post/delete/put 的形式。

image

image

image

image

代码实现

修改思路

1、修改 restful handler 增加 cache 调用

func GetMP2CacheTotally(key string) []string {
	item := Mp2Cache.Get(key)
	if item == nil {
		return nil
	} else {
		return item.([]string)
	}
}

func IsMp2CacheRecordExit(key string) bool {
	var keys []string
	sxKey := upf_model.Mp2Prefix
	item := Mp2Cache.Get(sxKey)
	if item != nil {
		keys = item.([]string)
		for _, k := range keys {
			if k == key {
				return true
			}
		}
	}
	return false
}

func AddMp2Cache(key string) {
	var keys []string
	sxKey := upf_model.Mp2Prefix
	item := Mp2Cache.Get(sxKey)
	if item != nil {
		keys = item.([]string)
		for _, k := range keys {
			if k == key {
				return
			}
		}
	}
	keys = append(keys, key)
	Mp2Cache.Put(sxKey, keys)
}

func DelMp2Cache(key string) {
	var keys []string
	sxKey := upf_model.Mp2Prefix
	item := Mp2Cache.Get(sxKey)
	if item == nil {
		return
	}
	keys = item.([]string)
	for i, k := range keys {
		if k == key {
			keys[i] = keys[len(keys)-1]
			keys[len(keys)-1] = ""
			keys = keys[:len(keys)-1]
			break
		}
	}
	PfcpCache.Put(sxKey, keys)
}

2、修改 restful handler 增加 upf api 调用 (以 traffic rule post 请求为例)

// send to upf
mp2Id, err = p.CreateUUIDIndex(mp2TrafficRuleReqInfo.TrafficRuleId)
err = p.upfHandler.VppUpdateMP2TrafficRule(&mp2TrafficRuleReqInfo, mp2Id)
// add cache
cacheKey := mp2TrafficRuleReqInfo.TrafficRuleId
cache.AddMp2Cache(cacheKey)

3、修改 watcher 增加 cache 调用(以 traffic rule post 请求为例)

// the data in cache means it has send to upf
cacheKey := value.TrafficRuleId + "POST"
if cache.IsMp2CacheRecordExit(cacheKey) {
	cache.DelMp2Cache(cacheKey)
	d.log.Debugf("creating traffic rule information has been send to upf %+v", value)
	return nil, nil
}

此次修改涉及接口

# mep 涉及接口
traffic_rule
dns_redirect_rule
acl_rule
bandwidth_rule
# mp2 涉及接口
traffic_rule
dns_redirect_rule
dns_rule
acl_rule
bandwidth_rule

测试步骤

1、 curl 请求

2、查看日志,确认同步过程绕开已发送 upf 的数据

image-20211130155127314

3、如果 upf 发送不成功,确认数据库没有插入冗余数据

4、重启 vpp-agent 不影响数据同步。

相关知识

1、 vpp-agent Database Plugin

image

2、 VPP Connection in GoVPPMux Plugin

vpp-agent 与 vpp 的连接依赖 GoVppMux plugin 实现,主要有三种请求方式:同步请求、异步请求、多路请求。

三种请求方式都需要 vpp 提供 binary api 给 vpp-agent 使用。

同步请求方式:使用 sendRequest(requestData).RecevieReply(replyData) 方法f发送请求并获取应答, vpp-agent 发出请求会等待获取应答后再执行其他流程。

异步请求方式:使用 sendRequest方法发送请求,使用RecevieReply方法获取应答, vpp-agent 发出请求后可执行其他操作。

多路请求方式:使用SendMultiRequest方法发送请求,使用ReceiveReply方法获取多个应答。此方法适用于单个请求但有多个应答的场景中。

3、事务回滚的一般方法

1、事务表

2、消息队列

3、补偿机制(执行/回滚)

4、 TCC 模式(预占/确认/取消)

5、 Sagas 模式(拆分事务+补偿机制)

Reference

1、了解回滚的一般方法:https://blog.csdn.net/weixin_48182198/article/details/107968481

2、vpp-agent 数据库原理: https://docs.ligato.io/en/latest/plugins/db-plugins/

3、vpp-agent 连接 vpp 的原理: https://docs.ligato.io/en/latest/tutorials/07_vpp-connection/