From 8f64846921bdd08d39b82aa20ba08b5c2f4d1e1a Mon Sep 17 00:00:00 2001 From: Baoyuan Date: Wed, 19 Nov 2025 09:29:00 +0800 Subject: [PATCH 1/3] fix: change consul event to shared dict --- apisix/cli/ops.lua | 10 ++ apisix/discovery/consul/init.lua | 180 +++++++++++++++++++++++------ conf/config.yaml.example | 4 + docs/en/latest/discovery/consul.md | 14 +++ 4 files changed, 171 insertions(+), 37 deletions(-) diff --git a/apisix/cli/ops.lua b/apisix/cli/ops.lua index 0bb64b68977a..b33c21bc52f7 100644 --- a/apisix/cli/ops.lua +++ b/apisix/cli/ops.lua @@ -745,6 +745,16 @@ Please modify "admin_key" in conf/config.yaml . end + if enabled_discoveries["consul"] then + if not sys_conf["discovery_shared_dicts"] then + sys_conf["discovery_shared_dicts"] = {} + end + + local consul_conf = yaml_conf.discovery["consul"] or {} + local shared_size = consul_conf.shared_dict_size or "10m" + sys_conf["discovery_shared_dicts"]["apisix_discovery_consul"] = shared_size + end + -- fix up lua path sys_conf["extra_lua_path"] = get_lua_path(yaml_conf.apisix.extra_lua_path) sys_conf["extra_lua_cpath"] = get_lua_path(yaml_conf.apisix.extra_lua_cpath) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 4d3c0e46be25..55d6d4c4028d 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -41,6 +41,8 @@ local pcall = pcall local null = ngx.null local type = type local next = next +local ngx_shared = ngx.shared +local ngx_config = ngx.config local all_services = core.table.new(0, 5) local default_service @@ -49,9 +51,22 @@ local sort_type local skip_service_map = core.table.new(0, 1) local dump_params -local events -local events_list local consul_services +local shared_dict_poll_interval +local consul_dict +local subsystem = ngx_config and ngx_config.subsystem or "http" +local consul_dict_name = "apisix_discovery_consul" +if subsystem == "stream" then + consul_dict_name = consul_dict_name .. "-stream" +end +if ngx_shared then + consul_dict = ngx_shared[consul_dict_name] +end + +local consul_dict_services_key = "services" +local consul_dict_version_key = "version" +local local_shm_version = 0 +local default_shared_dict_poll_interval = 1 local default_skip_services = {"consul"} local default_random_range = 5 @@ -65,21 +80,16 @@ local _M = { version = 0.3, } - -local function discovery_consul_callback(data, event, source, pid) - all_services = data - log.notice("update local variable all_services, event is: ", event, - "source: ", source, "server pid:", pid, - ", all services: ", json_delay_encode(all_services, true)) -end - - function _M.all_nodes() return all_services end function _M.nodes(service_name) + if not all_services then + sync_all_services_from_shm(true) + end + if not all_services then log.error("all_services is nil, failed to fetch nodes for : ", service_name) return @@ -113,6 +123,8 @@ local function update_all_services(consul_server_url, up_services) consul_services[consul_server_url] = up_services log.info("update all services: ", json_delay_encode(all_services, true)) + + persist_all_services_to_shm() end @@ -151,6 +163,10 @@ local function read_dump_services() all_services = entity.services log.info("load dump file into memory success") + + if ngx_worker_id and ngx_worker_id() == 0 then + persist_all_services_to_shm() + end end @@ -168,6 +184,80 @@ local function write_dump_services() end +local function persist_all_services_to_shm() + if not consul_dict then + return + end + + local data, err = core.json.encode(all_services) + if not data then + log.error("failed to encode consul services for shared dict: ", err) + return + end + + local ok, set_err = consul_dict:set(consul_dict_services_key, data) + if not ok then + log.error("failed to store consul services in shared dict: ", set_err) + return + end + + local new_version, incr_err = consul_dict:incr(consul_dict_version_key, 1, 0) + if not new_version then + log.error("failed to bump consul shared dict version: ", incr_err) + return + end + + local_shm_version = new_version + log.notice("persist consul services to shared dict, version: ", new_version) +end + + +local function sync_all_services_from_shm(force_log) + if not consul_dict then + return + end + + local version = consul_dict:get(consul_dict_version_key) + if not version then + if force_log then + log.info("consul shared dict version not found") + end + return + end + + if version == local_shm_version and not force_log then + return + end + + local data = consul_dict:get(consul_dict_services_key) + if not data then + if force_log then + log.info("consul shared dict services empty") + end + return + end + + local decoded, err = core.json.decode(data) + if not decoded then + log.error("failed to decode consul services from shared dict: ", err) + return + end + + all_services = decoded + local_shm_version = version + log.notice("load consul services from shared dict, version: ", version) +end + + +local function shared_dict_sync_timer(premature) + if premature then + return + end + + sync_all_services_from_shm(false) +end + + local function show_dump_file() if not dump_params then return 503, "dump params is nil" @@ -556,14 +646,6 @@ function _M.connect(premature, consul_server, retry_delay) update_all_services(consul_server.consul_server_url, up_services) - --update events - local post_ok, post_err = events:post(events_list._source, - events_list.updating, all_services) - if not post_ok then - log.error("post_event failure with ", events_list._source, - ", update all services error: ", post_err) - end - if dump_params then ngx_timer_at(0, write_dump_services) end @@ -610,7 +692,22 @@ end function _M.init_worker() - local consul_conf = local_conf.discovery.consul + local discovery_conf = local_conf.discovery + if not discovery_conf or not discovery_conf.consul then + return + end + + local consul_conf = discovery_conf.consul + + if not consul_dict then + error("lua_shared_dict \"" .. consul_dict_name .. "\" not configured") + end + + shared_dict_poll_interval = consul_conf.shared_dict_poll_interval + or default_shared_dict_poll_interval + if shared_dict_poll_interval <= 0 then + shared_dict_poll_interval = default_shared_dict_poll_interval + end if consul_conf.dump then local dump = consul_conf.dump @@ -621,25 +718,27 @@ function _M.init_worker() end end - events = require("apisix.events") - events_list = events:event_list( - "discovery_consul_update_all_services", - "updating" - ) + sync_all_services_from_shm(true) - if 0 ~= ngx_worker_id() then - events:register(discovery_consul_callback, events_list._source, events_list.updating) - return + local ok, err = ngx_timer_every(shared_dict_poll_interval, shared_dict_sync_timer) + if not ok then + log.error("failed to create consul shared dict sync timer: ", err) end - log.notice("consul_conf: ", json_delay_encode(consul_conf, true)) default_weight = consul_conf.weight sort_type = consul_conf.sort_type - -- set default service, used when the server node cannot be found if consul_conf.default_service then default_service = consul_conf.default_service default_service.weight = default_weight + else + default_service = nil + end + + if 0 ~= ngx_worker_id() then + return end + + log.notice("consul_conf: ", json_delay_encode(consul_conf, true)) if consul_conf.skip_services then skip_service_map = core.table.new(0, #consul_conf.skip_services) for _, v in ipairs(consul_conf.skip_services) do @@ -651,18 +750,18 @@ function _M.init_worker() skip_service_map[v] = true end - local consul_servers_list, err = format_consul_params(consul_conf) - if err then - error("format consul config got error: " .. err) + local consul_servers_list, format_err = format_consul_params(consul_conf) + if format_err then + error("format consul config got error: " .. format_err) end log.info("consul_server_list: ", json_delay_encode(consul_servers_list, true)) consul_services = core.table.new(0, 1) -- success or failure for _, server in ipairs(consul_servers_list) do - local ok, err = ngx_timer_at(0, _M.connect, server) - if not ok then - error("create consul got error: " .. err) + local timer_ok, timer_err = ngx_timer_at(0, _M.connect, server) + if not timer_ok then + error("create consul got error: " .. timer_err) end if server.keepalive == false then @@ -673,7 +772,14 @@ end function _M.dump_data() - return {config = local_conf.discovery.consul, services = all_services } + if not all_services then + sync_all_services_from_shm(false) + end + + local discovery_conf = local_conf.discovery + local consul_conf = discovery_conf and discovery_conf.consul + + return {config = consul_conf, services = all_services } end diff --git a/conf/config.yaml.example b/conf/config.yaml.example index 139e30edc367..103969d7aa22 100644 --- a/conf/config.yaml.example +++ b/conf/config.yaml.example @@ -202,6 +202,7 @@ nginx_config: # Config for render the template to generate n worker-events-stream: 10m tars-stream: 1m upstream-healthcheck-stream: 10m + apisix_discovery_consul: 10m # Add other custom Nginx configurations. # Users are responsible for validating the custom configurations @@ -294,6 +295,7 @@ nginx_config: # Config for render the template to generate n tracing_buffer: 10m plugin-api-breaker: 10m etcd-cluster-health-check: 10m + apisix_discovery_consul: 10m discovery: 1m jwks: 1m introspection: 10m @@ -376,6 +378,8 @@ nginx_config: # Config for render the template to generate n # weight: 1 # Default 1 # fetch_interval: 3 # Default 3s. Effective only when keepalive is false. # keepalive: true # Default to true. Use long pull to query Consul. +# shared_dict_poll_interval: 1 # Default 1s. Interval for workers to sync services from shared dict. +# shared_dict_size: 10m # Size reserved for lua_shared_dict "apisix_discovery_consul". # default_service: # Define the default service to route traffic to. # host: "127.0.0.1" # port: 20999 diff --git a/docs/en/latest/discovery/consul.md b/docs/en/latest/discovery/consul.md index fd6758bc9abb..d0b504eb3b78 100644 --- a/docs/en/latest/discovery/consul.md +++ b/docs/en/latest/discovery/consul.md @@ -47,6 +47,8 @@ discovery: weight: 1 # default 1 fetch_interval: 5 # default 3 sec, only take effect for keepalive: false way keepalive: true # default true, use the long pull way to query consul servers + shared_dict_size: 10m # optional, default 10m, size of lua_shared_dict "apisix_discovery_consul" + shared_dict_poll_interval: 1 # optional, default 1 sec, workers poll the shared dict at this interval sort_type: "origin" # default origin default_service: # you can define default service when missing hit host: "127.0.0.1" @@ -81,6 +83,18 @@ The `sort_type` has four optional values: - `port_sort`, sort by port - `combine_sort`, with the precondition that hosts are ordered, ports are also ordered. +- `shared_dict_size` controls how much memory is reserved for `lua_shared_dict apisix_discovery_consul`. + Increase it if you expect many services or large node lists. Default: `10m`. +- `shared_dict_poll_interval` controls how frequently (in seconds) non-sync workers refresh service + data from the shared dict. The default value is `1` and the value must be greater than `0`. + +:::note +Consul discovery now persists the full service snapshot in `lua_shared_dict apisix_discovery_consul`. +Declare this shared dict in the HTTP block (and `lua_shared_dict apisix_discovery_consul-stream` +under the stream block if you enable stream proxy) in your `nginx.conf`/`conf/config.yaml` +before starting APISIX; otherwise the module will refuse to start. +::: + #### Dump Data When we need reload `apisix` online, as the `consul` module maybe loads data from CONSUL slower than load routes from ETCD, and would get the log at the moment before load successfully from consul: From f511a470475910b782a2e2ba1e5ef4415984bc81 Mon Sep 17 00:00:00 2001 From: Baoyuan Date: Fri, 28 Nov 2025 17:15:52 +0800 Subject: [PATCH 2/3] fix: dict --- apisix/cli/ngx_tpl.lua | 1 + apisix/cli/ops.lua | 10 -------- apisix/discovery/consul/init.lua | 41 ++---------------------------- apisix/discovery/consul/schema.lua | 1 + conf/config.yaml.example | 3 --- docs/en/latest/discovery/consul.md | 11 -------- t/APISIX.pm | 1 + 7 files changed, 5 insertions(+), 63 deletions(-) diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua index bfca1ccba5a2..7ec585b6924d 100644 --- a/apisix/cli/ngx_tpl.lua +++ b/apisix/cli/ngx_tpl.lua @@ -77,6 +77,7 @@ lua { lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *}; {% end %} lua_shared_dict nacos 10m; + lua_shared_dict consul 10m; } {% if enabled_stream_plugins["prometheus"] and not enable_http then %} diff --git a/apisix/cli/ops.lua b/apisix/cli/ops.lua index b33c21bc52f7..0bb64b68977a 100644 --- a/apisix/cli/ops.lua +++ b/apisix/cli/ops.lua @@ -745,16 +745,6 @@ Please modify "admin_key" in conf/config.yaml . end - if enabled_discoveries["consul"] then - if not sys_conf["discovery_shared_dicts"] then - sys_conf["discovery_shared_dicts"] = {} - end - - local consul_conf = yaml_conf.discovery["consul"] or {} - local shared_size = consul_conf.shared_dict_size or "10m" - sys_conf["discovery_shared_dicts"]["apisix_discovery_consul"] = shared_size - end - -- fix up lua path sys_conf["extra_lua_path"] = get_lua_path(yaml_conf.apisix.extra_lua_path) sys_conf["extra_lua_cpath"] = get_lua_path(yaml_conf.apisix.extra_lua_cpath) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index 55d6d4c4028d..f13eb19587f3 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -53,20 +53,10 @@ local dump_params local consul_services local shared_dict_poll_interval -local consul_dict -local subsystem = ngx_config and ngx_config.subsystem or "http" -local consul_dict_name = "apisix_discovery_consul" -if subsystem == "stream" then - consul_dict_name = consul_dict_name .. "-stream" -end -if ngx_shared then - consul_dict = ngx_shared[consul_dict_name] -end + +local consul_dict = ngx.shared.consul local consul_dict_services_key = "services" -local consul_dict_version_key = "version" -local local_shm_version = 0 -local default_shared_dict_poll_interval = 1 local default_skip_services = {"consul"} local default_random_range = 5 @@ -200,15 +190,6 @@ local function persist_all_services_to_shm() log.error("failed to store consul services in shared dict: ", set_err) return end - - local new_version, incr_err = consul_dict:incr(consul_dict_version_key, 1, 0) - if not new_version then - log.error("failed to bump consul shared dict version: ", incr_err) - return - end - - local_shm_version = new_version - log.notice("persist consul services to shared dict, version: ", new_version) end @@ -217,18 +198,6 @@ local function sync_all_services_from_shm(force_log) return end - local version = consul_dict:get(consul_dict_version_key) - if not version then - if force_log then - log.info("consul shared dict version not found") - end - return - end - - if version == local_shm_version and not force_log then - return - end - local data = consul_dict:get(consul_dict_services_key) if not data then if force_log then @@ -244,8 +213,6 @@ local function sync_all_services_from_shm(force_log) end all_services = decoded - local_shm_version = version - log.notice("load consul services from shared dict, version: ", version) end @@ -704,10 +671,6 @@ function _M.init_worker() end shared_dict_poll_interval = consul_conf.shared_dict_poll_interval - or default_shared_dict_poll_interval - if shared_dict_poll_interval <= 0 then - shared_dict_poll_interval = default_shared_dict_poll_interval - end if consul_conf.dump then local dump = consul_conf.dump diff --git a/apisix/discovery/consul/schema.lua b/apisix/discovery/consul/schema.lua index 5d6fc641e85b..554abaa0bb5c 100644 --- a/apisix/discovery/consul/schema.lua +++ b/apisix/discovery/consul/schema.lua @@ -26,6 +26,7 @@ return { }, token = {type = "string", default = ""}, fetch_interval = {type = "integer", minimum = 1, default = 3}, + shared_dict_poll_interval = {type = "integer", minimum = 1, default = 1}, keepalive = { type = "boolean", default = true diff --git a/conf/config.yaml.example b/conf/config.yaml.example index 103969d7aa22..2985be84f4dd 100644 --- a/conf/config.yaml.example +++ b/conf/config.yaml.example @@ -202,7 +202,6 @@ nginx_config: # Config for render the template to generate n worker-events-stream: 10m tars-stream: 1m upstream-healthcheck-stream: 10m - apisix_discovery_consul: 10m # Add other custom Nginx configurations. # Users are responsible for validating the custom configurations @@ -295,7 +294,6 @@ nginx_config: # Config for render the template to generate n tracing_buffer: 10m plugin-api-breaker: 10m etcd-cluster-health-check: 10m - apisix_discovery_consul: 10m discovery: 1m jwks: 1m introspection: 10m @@ -379,7 +377,6 @@ nginx_config: # Config for render the template to generate n # fetch_interval: 3 # Default 3s. Effective only when keepalive is false. # keepalive: true # Default to true. Use long pull to query Consul. # shared_dict_poll_interval: 1 # Default 1s. Interval for workers to sync services from shared dict. -# shared_dict_size: 10m # Size reserved for lua_shared_dict "apisix_discovery_consul". # default_service: # Define the default service to route traffic to. # host: "127.0.0.1" # port: 20999 diff --git a/docs/en/latest/discovery/consul.md b/docs/en/latest/discovery/consul.md index d0b504eb3b78..33d3426b71f7 100644 --- a/docs/en/latest/discovery/consul.md +++ b/docs/en/latest/discovery/consul.md @@ -47,7 +47,6 @@ discovery: weight: 1 # default 1 fetch_interval: 5 # default 3 sec, only take effect for keepalive: false way keepalive: true # default true, use the long pull way to query consul servers - shared_dict_size: 10m # optional, default 10m, size of lua_shared_dict "apisix_discovery_consul" shared_dict_poll_interval: 1 # optional, default 1 sec, workers poll the shared dict at this interval sort_type: "origin" # default origin default_service: # you can define default service when missing hit @@ -82,19 +81,9 @@ The `sort_type` has four optional values: - `host_sort`, sort by host - `port_sort`, sort by port - `combine_sort`, with the precondition that hosts are ordered, ports are also ordered. - -- `shared_dict_size` controls how much memory is reserved for `lua_shared_dict apisix_discovery_consul`. - Increase it if you expect many services or large node lists. Default: `10m`. - `shared_dict_poll_interval` controls how frequently (in seconds) non-sync workers refresh service data from the shared dict. The default value is `1` and the value must be greater than `0`. -:::note -Consul discovery now persists the full service snapshot in `lua_shared_dict apisix_discovery_consul`. -Declare this shared dict in the HTTP block (and `lua_shared_dict apisix_discovery_consul-stream` -under the stream block if you enable stream proxy) in your `nginx.conf`/`conf/config.yaml` -before starting APISIX; otherwise the module will refuse to start. -::: - #### Dump Data When we need reload `apisix` online, as the `consul` module maybe loads data from CONSUL slower than load routes from ETCD, and would get the log at the moment before load successfully from consul: diff --git a/t/APISIX.pm b/t/APISIX.pm index 4ef30e506e71..069f1111b7f7 100644 --- a/t/APISIX.pm +++ b/t/APISIX.pm @@ -296,6 +296,7 @@ lua { lua_shared_dict standalone-config 10m; lua_shared_dict status-report 1m; lua_shared_dict nacos 10m; + lua_shared_dict consul 10m; } _EOC_ } From f15b23a88ba6439f309b7d5ff3c49a1e0ecc27e0 Mon Sep 17 00:00:00 2001 From: Baoyuan Date: Fri, 28 Nov 2025 17:35:58 +0800 Subject: [PATCH 3/3] fix: lint error --- apisix/discovery/consul/init.lua | 87 ++++++++++++++++---------------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/apisix/discovery/consul/init.lua b/apisix/discovery/consul/init.lua index f13eb19587f3..f69785c1aa62 100644 --- a/apisix/discovery/consul/init.lua +++ b/apisix/discovery/consul/init.lua @@ -41,8 +41,6 @@ local pcall = pcall local null = ngx.null local type = type local next = next -local ngx_shared = ngx.shared -local ngx_config = ngx.config local all_services = core.table.new(0, 5) local default_service @@ -54,6 +52,7 @@ local dump_params local consul_services local shared_dict_poll_interval +local consul_dict_name = "consul" local consul_dict = ngx.shared.consul local consul_dict_services_key = "services" @@ -66,6 +65,48 @@ local watch_type_catalog = 1 local watch_type_health = 2 local max_retry_time = 256 +local function persist_all_services_to_shm() + if not consul_dict then + return + end + + local data, err = core.json.encode(all_services) + if not data then + log.error("failed to encode consul services for shared dict: ", err) + return + end + + local ok, set_err = consul_dict:set(consul_dict_services_key, data) + if not ok then + log.error("failed to store consul services in shared dict: ", set_err) + return + end +end + + +local function sync_all_services_from_shm(force_log) + if not consul_dict then + return + end + + local data = consul_dict:get(consul_dict_services_key) + if not data then + if force_log then + log.info("consul shared dict services empty") + end + return + end + + local decoded, err = core.json.decode(data) + if not decoded then + log.error("failed to decode consul services from shared dict: ", err) + return + end + + all_services = decoded +end + + local _M = { version = 0.3, } @@ -174,48 +215,6 @@ local function write_dump_services() end -local function persist_all_services_to_shm() - if not consul_dict then - return - end - - local data, err = core.json.encode(all_services) - if not data then - log.error("failed to encode consul services for shared dict: ", err) - return - end - - local ok, set_err = consul_dict:set(consul_dict_services_key, data) - if not ok then - log.error("failed to store consul services in shared dict: ", set_err) - return - end -end - - -local function sync_all_services_from_shm(force_log) - if not consul_dict then - return - end - - local data = consul_dict:get(consul_dict_services_key) - if not data then - if force_log then - log.info("consul shared dict services empty") - end - return - end - - local decoded, err = core.json.decode(data) - if not decoded then - log.error("failed to decode consul services from shared dict: ", err) - return - end - - all_services = decoded -end - - local function shared_dict_sync_timer(premature) if premature then return