前端开发之动态管理Nginx集群的方法( 三 )
function _M.init_worker()
local err
plugin_configs, err = core.config.new("/plugin_configs", {
automatic = true,
item_schema = core.schema.plugin_config,
checker = plugin_checker,
})
end
而在 config_etcd 的 new 函数中,则会循环注册_automatic_fetch 定时器:
function _M.new(key, opts)
ngx_timer_at(0, _automatic_fetch, obj)
end
_automatic_fetch 函数会反复执行 sync_data 函数(包装到 xpcall 之下是为了捕获异常):
local function _automatic_fetch(premature, self)
local ok, err = xpcall(function()
local ok, err = sync_data(self)
end, debug.traceback)
ngx_timer_at(0, _automatic_fetch, self)
end
sync_data 函数将通过 etcd 的 watch 机制获取更新,它的实现机制我们接下来会详细分析 。
总结下:
APISIX 在每个 Nginx Worker 进程的启动过程中,通过 ngx.timer.at 函数将_automatic_fetch 插入定时器 。_automatic_fetch 函数执行时会通过 sync_data 函数,基于 watch 机制接收 etcd 中的配置变更通知,这样,每个 Nginx 节点、每个 Worker 进程都将保持最新的配置 。如此设计还有 1 个明显的优点:etcd 中的配置直接写入 Nginx Worker 进程中,这样处理请求时就能直接使用新配置,无须在进程间同步配置,这要比启动 1 个 agent 进程更简单!
lua-resty-etcd 库的 HTTP/1.1 协议sync_data 函数到底是怎样获取 etcd 的配置变更消息的呢?先看下 sync_data 源码:
local etcd = require("resty.etcd")
etcd_cli, err = etcd.new(etcd_conf)
local function sync_data(self)
local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1, self.timeout)
end
local function waitdir(etcd_cli, key, modified_index, timeout)
local res_func, func_err, http_cli = etcd_cli:watchdir(key, opts)
if http_cli then
local res_cancel, err_cancel = etcd_cli:watchcancel(http_cli)
end
end
这里实际与 etcd 通讯的是lua-resty-etcd 库 。它提供的 watchdir 函数用于接收 etcd 发现 key 目录对应 value 变更后发出的通知 。
watchcancel 函数又是做什么的呢?这其实是 OpenResty 生态的缺憾导致的 。etcd v3 已经支持高效的 gRPC 协议(底层为 HTTP2 协议) 。你可能听说过,HTTP2 不但具备多路复用的能力,还支持服务器直接推送消息,从 HTTP3 协议对照理解 HTTP2:

文章插图
然而,**Lua 生态目前并不支持 HTTP2 协议!**所以 lua-resty-etcd 库实际是通过低效的 HTTP/1.1 协议与 etcd 通讯的,因此接收/watch 通知也是通过带有超时的/v3/watch 请求完成的 。这个现象其实是由 2 个原因造成的:
- Nginx 将自己定位为边缘负载均衡,因此上游必然是企业内网,时延低、带宽大,所以对上游协议不必支持 HTTP2 协议!
- 当 Nginx 的 upstream 不能提供 HTTP2 机制给 Lua 时,Lua 只能基于 cosocket 自己实现了 。HTTP2 协议非常复杂,目前还没有生产环境可用的 HTTP2 cosocket 库 。

文章插图
我们可以验证下 watchdir 函数的实现细节:
-- lib/resty/etcd/v3.lua文件
function _M.watchdir(self, key, opts)
return watch(self, key, attr)
end
local function watch(self, key, attr)
callback_fun, err, http_cli = request_chunk(self, 'POST', '/watch',
opts, attr.timeout or self.timeout)
return callback_fun
end
local function request_chunk(self, method, path, opts, timeout)
http_cli, err = utils.http.new()
-- 发起TCP连接
endpoint, err = http_request_chunk(self, http_cli)
-- 发送HTTP请求
res, err = http_cli:request({
method = method,
path = endpoint.api_prefix .. path,
body = body,
推荐阅读
- win7开机后无法进入系统的处理
- 如何更改电脑的开机密码
- 孩子多大分房睡比较好
- 怎么查询被开通手机号?
- 电脑系统win10忘记开机密码强制重置教程
- 电脑没声音怎么办?电脑没声音了怎么恢复?
- 5 个网站将您的前端技能提升 100 倍
- 红茶中的生物碱,历史上最早的红茶
- 西安全运会开始和结束时间是多久?
- 客服|南宁一公司全是美女,都开奔驰奥迪,警方一查真相不堪入目