Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apisix/cli/ngx_tpl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ lua {
lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *};
{% end %}
lua_shared_dict nacos 10m;
lua_shared_dict consul 10m;
}

{% if enabled_stream_plugins["prometheus"] and not enable_http then %}
Expand Down
140 changes: 104 additions & 36 deletions apisix/discovery/consul/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ local sort_type
local skip_service_map = core.table.new(0, 1)
local dump_params

local events
local events_list
local consul_services
local shared_dict_poll_interval

local consul_dict_name = "consul"
local consul_dict = ngx.shared.consul

local consul_dict_services_key = "services"

local default_skip_services = {"consul"}
local default_random_range = 5
Expand All @@ -61,25 +65,62 @@ local watch_type_catalog = 1
local watch_type_health = 2
local max_retry_time = 256

local _M = {
version = 0.3,
}
local function persist_all_services_to_shm()
if not consul_dict then
return
end

local data, err = core.json.encode(all_services)
if not data then
log.error("failed to encode consul services for shared dict: ", err)
return
end

local function discovery_consul_callback(data, event, source, pid)
all_services = data
log.notice("update local variable all_services, event is: ", event,
"source: ", source, "server pid:", pid,
", all services: ", json_delay_encode(all_services, true))
local ok, set_err = consul_dict:set(consul_dict_services_key, data)
if not ok then
log.error("failed to store consul services in shared dict: ", set_err)
return
end
end


local function sync_all_services_from_shm(force_log)
if not consul_dict then
return
end

local data = consul_dict:get(consul_dict_services_key)
if not data then
if force_log then
log.info("consul shared dict services empty")
end
return
end

local decoded, err = core.json.decode(data)
if not decoded then
log.error("failed to decode consul services from shared dict: ", err)
return
end

all_services = decoded
end


local _M = {
version = 0.3,
}

function _M.all_nodes()
return all_services
end


function _M.nodes(service_name)
if not all_services then
sync_all_services_from_shm(true)
end

if not all_services then
log.error("all_services is nil, failed to fetch nodes for : ", service_name)
return
Expand Down Expand Up @@ -113,6 +154,8 @@ local function update_all_services(consul_server_url, up_services)
consul_services[consul_server_url] = up_services

log.info("update all services: ", json_delay_encode(all_services, true))

persist_all_services_to_shm()
end


Expand Down Expand Up @@ -151,6 +194,10 @@ local function read_dump_services()

all_services = entity.services
log.info("load dump file into memory success")

if ngx_worker_id and ngx_worker_id() == 0 then
persist_all_services_to_shm()
end
end


Expand All @@ -168,6 +215,15 @@ local function write_dump_services()
end


local function shared_dict_sync_timer(premature)
if premature then
return
end

sync_all_services_from_shm(false)
end


local function show_dump_file()
if not dump_params then
return 503, "dump params is nil"
Expand Down Expand Up @@ -556,14 +612,6 @@ function _M.connect(premature, consul_server, retry_delay)

update_all_services(consul_server.consul_server_url, up_services)

--update events
local post_ok, post_err = events:post(events_list._source,
events_list.updating, all_services)
if not post_ok then
log.error("post_event failure with ", events_list._source,
", update all services error: ", post_err)
end

if dump_params then
ngx_timer_at(0, write_dump_services)
end
Expand Down Expand Up @@ -610,7 +658,18 @@ end


function _M.init_worker()
local consul_conf = local_conf.discovery.consul
local discovery_conf = local_conf.discovery
if not discovery_conf or not discovery_conf.consul then
return
end

local consul_conf = discovery_conf.consul

if not consul_dict then
error("lua_shared_dict \"" .. consul_dict_name .. "\" not configured")
end

shared_dict_poll_interval = consul_conf.shared_dict_poll_interval

if consul_conf.dump then
local dump = consul_conf.dump
Expand All @@ -621,25 +680,27 @@ function _M.init_worker()
end
end

events = require("apisix.events")
events_list = events:event_list(
"discovery_consul_update_all_services",
"updating"
)
sync_all_services_from_shm(true)

if 0 ~= ngx_worker_id() then
events:register(discovery_consul_callback, events_list._source, events_list.updating)
return
local ok, err = ngx_timer_every(shared_dict_poll_interval, shared_dict_sync_timer)
if not ok then
log.error("failed to create consul shared dict sync timer: ", err)
end

log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
default_weight = consul_conf.weight
sort_type = consul_conf.sort_type
-- set default service, used when the server node cannot be found
if consul_conf.default_service then
default_service = consul_conf.default_service
default_service.weight = default_weight
else
default_service = nil
end

if 0 ~= ngx_worker_id() then
return
end

log.notice("consul_conf: ", json_delay_encode(consul_conf, true))
if consul_conf.skip_services then
skip_service_map = core.table.new(0, #consul_conf.skip_services)
for _, v in ipairs(consul_conf.skip_services) do
Expand All @@ -651,18 +712,18 @@ function _M.init_worker()
skip_service_map[v] = true
end

local consul_servers_list, err = format_consul_params(consul_conf)
if err then
error("format consul config got error: " .. err)
local consul_servers_list, format_err = format_consul_params(consul_conf)
if format_err then
error("format consul config got error: " .. format_err)
end
log.info("consul_server_list: ", json_delay_encode(consul_servers_list, true))

consul_services = core.table.new(0, 1)
-- success or failure
for _, server in ipairs(consul_servers_list) do
local ok, err = ngx_timer_at(0, _M.connect, server)
if not ok then
error("create consul got error: " .. err)
local timer_ok, timer_err = ngx_timer_at(0, _M.connect, server)
if not timer_ok then
error("create consul got error: " .. timer_err)
end

if server.keepalive == false then
Expand All @@ -673,7 +734,14 @@ end


function _M.dump_data()
return {config = local_conf.discovery.consul, services = all_services }
if not all_services then
sync_all_services_from_shm(false)
end

local discovery_conf = local_conf.discovery
local consul_conf = discovery_conf and discovery_conf.consul

return {config = consul_conf, services = all_services }
end


Expand Down
1 change: 1 addition & 0 deletions apisix/discovery/consul/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ return {
},
token = {type = "string", default = ""},
fetch_interval = {type = "integer", minimum = 1, default = 3},
shared_dict_poll_interval = {type = "integer", minimum = 1, default = 1},
keepalive = {
type = "boolean",
default = true
Expand Down
1 change: 1 addition & 0 deletions conf/config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ nginx_config: # Config for render the template to generate n
# weight: 1 # Default 1
# fetch_interval: 3 # Default 3s. Effective only when keepalive is false.
# keepalive: true # Default to true. Use long pull to query Consul.
# shared_dict_poll_interval: 1 # Default 1s. Interval for workers to sync services from shared dict.
# default_service: # Define the default service to route traffic to.
# host: "127.0.0.1"
# port: 20999
Expand Down
3 changes: 3 additions & 0 deletions docs/en/latest/discovery/consul.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ discovery:
weight: 1 # default 1
fetch_interval: 5 # default 3 sec, only take effect for keepalive: false way
keepalive: true # default true, use the long pull way to query consul servers
shared_dict_poll_interval: 1 # optional, default 1 sec, workers poll the shared dict at this interval
sort_type: "origin" # default origin
default_service: # you can define default service when missing hit
host: "127.0.0.1"
Expand Down Expand Up @@ -80,6 +81,8 @@ The `sort_type` has four optional values:
- `host_sort`, sort by host
- `port_sort`, sort by port
- `combine_sort`, with the precondition that hosts are ordered, ports are also ordered.
- `shared_dict_poll_interval` controls how frequently (in seconds) non-sync workers refresh service
data from the shared dict. The default value is `1` and the value must be greater than `0`.

#### Dump Data

Expand Down
1 change: 1 addition & 0 deletions t/APISIX.pm
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ lua {
lua_shared_dict standalone-config 10m;
lua_shared_dict status-report 1m;
lua_shared_dict nacos 10m;
lua_shared_dict consul 10m;
}
_EOC_
}
Expand Down
Loading