mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
refactoring of metrics, add timer phase on BW and add METRICS_MAX_BLOCKED_REQUESTS setting
This commit is contained in:
parent
a7a9055b8c
commit
659e7f4ae5
5 changed files with 155 additions and 31 deletions
|
|
@ -786,6 +786,7 @@ utils.get_phases = function()
|
|||
"preread",
|
||||
"log_stream",
|
||||
"log_default",
|
||||
"timer"
|
||||
}
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -13,10 +13,15 @@ init_worker_by_lua_block {
|
|||
local ngx = ngx
|
||||
local INFO = ngx.INFO
|
||||
local ERR = ngx.ERR
|
||||
local WARN = ngx.WARN
|
||||
local NOTICE = ngx.NOTICE
|
||||
local worker = ngx.worker
|
||||
local worker_id = worker.id
|
||||
local timer_at = ngx.timer.at
|
||||
local require_plugin = helpers.require_plugin
|
||||
local new_plugin = helpers.new_plugin
|
||||
local call_plugin = helpers.call_plugin
|
||||
local tostring = tostring
|
||||
|
||||
-- Don't go further we are in loading state
|
||||
local is_loading, err = require "bunkerweb.utils".get_variable("IS_LOADING", false)
|
||||
|
|
@ -27,6 +32,71 @@ init_worker_by_lua_block {
|
|||
return
|
||||
end
|
||||
|
||||
-- Recurrent timer
|
||||
local recurrent_timer
|
||||
recurrent_timer = function(premature)
|
||||
|
||||
local worker_id = tostring(worker.id())
|
||||
local logger = require "bunkerweb.logger":new("TIMER-" .. worker_id)
|
||||
|
||||
-- Worker exiting
|
||||
if premature then
|
||||
logger:log(WARN, "worker is exiting, disabling timer")
|
||||
return
|
||||
end
|
||||
logger:log(INFO, "timer started")
|
||||
|
||||
-- Get plugins order
|
||||
local order, err = datastore:get("plugins_order", true)
|
||||
if not order then
|
||||
logger:log(ERR, "can't get plugins order from datastore : " .. err)
|
||||
return
|
||||
end
|
||||
|
||||
-- Call timer() methods
|
||||
logger:log(INFO, "calling timer() methods of plugins ...")
|
||||
for i, plugin_id in ipairs(order.timer) do
|
||||
-- Require call
|
||||
local plugin_lua, err = require_plugin(plugin_id)
|
||||
if plugin_lua == false then
|
||||
logger:log(ERR, err)
|
||||
elseif plugin_lua == nil then
|
||||
logger:log(INFO, err)
|
||||
else
|
||||
-- Check if plugin has timer method
|
||||
if plugin_lua.timer ~= nil then
|
||||
-- New call
|
||||
local ok, plugin_obj = new_plugin(plugin_lua)
|
||||
if not ok then
|
||||
logger:log(ERR, plugin_obj)
|
||||
else
|
||||
local ok, ret = call_plugin(plugin_obj, "timer")
|
||||
if not ok then
|
||||
logger:log(ERR, ret)
|
||||
elseif not ret.ret then
|
||||
logger:log(ERR, plugin_id .. ":timer() call failed : " .. ret.msg)
|
||||
else
|
||||
logger:log(INFO, plugin_id .. ":timer() call successful : " .. ret.msg)
|
||||
end
|
||||
end
|
||||
else
|
||||
logger:log(INFO, "skipped execution of " .. plugin_id .. " because method timer() is not defined")
|
||||
end
|
||||
end
|
||||
end
|
||||
logger:log(INFO, "called timer() methods of plugins")
|
||||
local hdl
|
||||
hdl, err = timer_at(5, recurrent_timer)
|
||||
if not hdl then
|
||||
logger:log(ERR, "can't create timer : " .. err)
|
||||
end
|
||||
end
|
||||
local hdl
|
||||
hdl, err = timer_at(0, recurrent_timer)
|
||||
if not hdl then
|
||||
logger:log(ERR, "can't create timer : " .. err)
|
||||
end
|
||||
|
||||
-- Instantiate lock
|
||||
local lock = require "resty.lock":new("worker_lock", { timeout = 10 })
|
||||
if not lock then
|
||||
|
|
|
|||
|
|
@ -3,23 +3,36 @@ local class = require "middleclass"
|
|||
local datastore = require "bunkerweb.datastore"
|
||||
local plugin = require "bunkerweb.plugin"
|
||||
local utils = require "bunkerweb.utils"
|
||||
local lrucache = require "resty.lrucache"
|
||||
|
||||
local metrics = class("metrics", plugin)
|
||||
|
||||
local lru, err_lru = lrucache.new(100000)
|
||||
if not lru then
|
||||
require "bunkerweb.logger":new("METRICS"):log(ERR, "failed to instantiate LRU cache : " .. err_lru)
|
||||
end
|
||||
|
||||
local ngx = ngx
|
||||
local shared = ngx.shared
|
||||
local subsystem = ngx.config.subsystem
|
||||
local ERR = ngx.ERR
|
||||
local HTTP_INTERNAL_SERVER_ERROR = ngx.HTTP_INTERNAL_SERVER_ERROR
|
||||
local HTTP_OK = ngx.HTTP_OK
|
||||
local worker = ngx.worker
|
||||
local worker_id = worker.id
|
||||
|
||||
local get_reason = utils.get_reason
|
||||
local get_country = utils.get_country
|
||||
local has_variable = utils.has_variable
|
||||
local encode = cjson.encode
|
||||
local decode = cjson.decode
|
||||
|
||||
local match = string.match
|
||||
local time = os.time
|
||||
local tonumber = tonumber
|
||||
local tostring = tostring
|
||||
local table_insert = table.insert
|
||||
local table_remove = table.remove
|
||||
|
||||
function metrics:initialize(ctx)
|
||||
-- Call parent initialize
|
||||
|
|
@ -33,9 +46,9 @@ function metrics:initialize(ctx)
|
|||
self.metrics_datastore = datastore:new(dict)
|
||||
end
|
||||
|
||||
function metrics:log()
|
||||
function metrics:log(bypass_checks)
|
||||
-- Don't go further if metrics is not enabled
|
||||
if self.variables["USE_METRICS"] == "no" then
|
||||
if not bypass_checks and self.variables["USE_METRICS"] == "no" then
|
||||
return self:ret(true, "metrics are disabled")
|
||||
end
|
||||
-- Store blocked requests
|
||||
|
|
@ -57,21 +70,60 @@ function metrics:log()
|
|||
method = self.ctx.bw.request_method,
|
||||
url = self.ctx.bw.request_uri,
|
||||
status = ngx.status,
|
||||
["user-agent"] = self.ctx.bw.http_user_agent or "",
|
||||
user_agent = self.ctx.bw.http_user_agent or "",
|
||||
reason = reason,
|
||||
data = data,
|
||||
}
|
||||
local ok
|
||||
ok, err = self.metrics_datastore:safe_rpush("metrics_requests", encode(request))
|
||||
if not ok then
|
||||
self.logger:log(ERR, "can't save request to datastore : " .. err)
|
||||
-- Get current requests
|
||||
local requests = lru:get("requests")
|
||||
if not requests then
|
||||
requests = {}
|
||||
end
|
||||
-- Add current request
|
||||
table_insert(requests, request)
|
||||
-- Remove old requests
|
||||
local nb_delete = #requests - tonumber(self.variables["METRICS_MAX_BLOCKED_REQUESTS"])
|
||||
while nb_delete > 0 do
|
||||
table_remove(requests, 1)
|
||||
nb_delete = nb_delete - 1
|
||||
end
|
||||
-- Update worker cache
|
||||
lru:set("requests", requests)
|
||||
end
|
||||
return self:ret(true, "success")
|
||||
end
|
||||
|
||||
function metrics:log_default()
|
||||
return self:log()
|
||||
local is_needed, err = has_variable("USE_METRICS", "yes")
|
||||
if is_needed == nil then
|
||||
return self:ret(false, "can't check USE_METRICS variable : " .. err)
|
||||
end
|
||||
if is_needed then
|
||||
return self:log(true)
|
||||
end
|
||||
return self:ret(true, "metrics not used")
|
||||
end
|
||||
|
||||
function metrics:timer()
|
||||
-- Check if metrics is used
|
||||
local is_needed, err = has_variable("USE_METRICS", "yes")
|
||||
if is_needed == nil then
|
||||
return self:ret(false, "can't check USE_METRICS variable : " .. err)
|
||||
end
|
||||
if not is_needed then
|
||||
return self:ret(true, "metrics not used")
|
||||
end
|
||||
-- Get worker requests
|
||||
local requests = lru:get("requests")
|
||||
if not requests then
|
||||
requests = {}
|
||||
end
|
||||
-- Push to dict
|
||||
local ok, err = self.metrics_datastore:set("requests_" .. tostring(worker_id()), encode(requests))
|
||||
if not ok then
|
||||
return self:ret(false, "can't update requests : " .. err)
|
||||
end
|
||||
return self:ret(true, "metrics updated")
|
||||
end
|
||||
|
||||
function metrics:api()
|
||||
|
|
@ -79,30 +131,21 @@ function metrics:api()
|
|||
if not match(self.ctx.bw.uri, "^/metrics/requests$") or self.ctx.bw.request_method ~= "GET" then
|
||||
return self:ret(false, "success")
|
||||
end
|
||||
|
||||
-- Get requests metrics
|
||||
local len, err = self.metrics_datastore:llen("metrics_requests")
|
||||
if not len then
|
||||
return self:ret(true, "error while getting length of metrics_requests : " .. err, HTTP_INTERNAL_SERVER_ERROR)
|
||||
end
|
||||
local i = 0
|
||||
local data = {}
|
||||
while i < len do
|
||||
local request
|
||||
request, err = self.metrics_datastore:lpop("metrics_requests")
|
||||
if request then
|
||||
table.insert(data, decode(request))
|
||||
else
|
||||
return self:ret(true, "error while getting metrics_requests : " .. err, HTTP_INTERNAL_SERVER_ERROR)
|
||||
local keys = self.metrics_datastore:keys()
|
||||
local requests = {}
|
||||
for _, key in ipairs(keys) do
|
||||
if key:match("^requests_") then
|
||||
local data, err = self.metrics_datastore:get(key)
|
||||
if not data then
|
||||
return self:ret(true, "error while fetching requests : " .. err, HTTP_INTERNAL_SERVER_ERROR)
|
||||
end
|
||||
for _, request in ipairs(decode(data)) do
|
||||
table_insert(requests, request)
|
||||
end
|
||||
end
|
||||
local ok
|
||||
ok, err = self.metrics_datastore:safe_rpush("metrics_requests", request)
|
||||
if not ok then
|
||||
self.logger:log(ERR, "can't save request to datastore : " .. err)
|
||||
end
|
||||
i = i + 1
|
||||
end
|
||||
return self:ret(true, data, HTTP_OK)
|
||||
return self:ret(true, requests, HTTP_OK)
|
||||
end
|
||||
|
||||
return metrics
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@
|
|||
"stream": "partial",
|
||||
"settings": {
|
||||
"USE_METRICS": {
|
||||
"context": "global",
|
||||
"context": "multisite",
|
||||
"default": "yes",
|
||||
"help": "Enable collection and retrieval of internal metrics.",
|
||||
"id": "use-metrics",
|
||||
|
|
@ -22,6 +22,15 @@
|
|||
"label": "Metrics memory size",
|
||||
"regex": "^\\d+[kKmMgG]?$",
|
||||
"type": "text"
|
||||
},
|
||||
"METRICS_MAX_BLOCKED_REQUESTS": {
|
||||
"context": "global",
|
||||
"default": "100",
|
||||
"help": "Maximum number of blocked requests to store (per worker).",
|
||||
"id": "metrics-max-blocked-requests",
|
||||
"label": "Metrics max blocked requests",
|
||||
"regex": "^\\d+$",
|
||||
"type": "text"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -42,5 +42,6 @@
|
|||
"reversescan"
|
||||
],
|
||||
"log_stream": ["badbehavior", "bunkernet"],
|
||||
"log_default": ["badbehavior", "bunkernet", "metrics"]
|
||||
"log_default": ["badbehavior", "bunkernet", "metrics"],
|
||||
"timer": ["metrics"]
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue