rw-splitting + ro-balance
Posted by:
sharif uddin ()
Date: July 12, 2011 05:15AM
Hi,
I have managed to combine these to and it was working for a little while until i changed something i cannot remember or it just stopped. currently the read function is not picking up any backends or anything from the pool which i cannot understand why.
script is as follows:
require("proxy.auto-config")
local commands = require("proxy.commands")
if not proxy.global.config.ro_balance then
proxy.global.config.ro_balance = {
max_seconds_lag = 1, -- 10 seconds
max_bytes_lag = 1 * 1024, -- 10k
check_timeout = 1,
min_recheck_time = 20, -- 60 seconds
is_debug = true
}
end
if not proxy.global.lag then
proxy.global.lag = { }
end
local config = proxy.global.config.ro_balance
local backend_lag = proxy.global.lag
-- connection pool
if not proxy.global.config.rwsplit then
proxy.global.config.rwsplit = {
min_idle_connections = 0,
max_idle_connections = 8,
is_debug = true
}
end
local tokenizer = require("proxy.tokenizer")
local lb = require("proxy.balance")
local auto_config = require("proxy.auto-config")
-- read/write splitting sends all non-transactional SELECTs to the slaves
--
-- is_in_transaction tracks the state of the transactions
local is_in_transaction = false
-- if this was a SELECT SQL_CALC_FOUND_ROWS ... stay on the same connections
local is_in_select_calc_found_rows = false
local file = assert(io.open("/var/log/mysql_proxy.log", "a"))
---
-- pick a backend server
--
-- we take the lag into account
-- * ignore backends which aren't configured
-- * ignore backends which don't have a io-thread running
-- * ignore backends that are too far behind
function connect_server()
local fallback_ndx
local slave_ndx
local unknown_slave_ndx
local slave_bytes_lag
local now = os.time()
if is_debug then
file:write("\n")
file:write("[connect_server] " .. proxy.connection.client.src.name .. "\n")
end
local rw_ndx = 0
for b_ndx = 1, #proxy.global.backends do
local backend = proxy.global.backends[b_ndx]
local pool = backend.pool -- we don't have a username yet, try to find a connections which is idling
local cur_idle = pool.users[""].cur_idle_connections
pool.min_idle_connections = proxy.global.config.rwsplit.min_idle_connections
pool.max_idle_connections = proxy.global.config.rwsplit.max_idle_connections
if is_debug then
file:write(" [".. b_ndx .."].connected_clients = " .. backend.connected_clients .. "\n")
file:write(" [".. b_ndx .."].pool.cur_idle = " .. cur_idle .. "\n")
file:write(" [".. b_ndx .."].pool.max_idle = " .. pool.max_idle_connections .. "\n")
file:write(" [".. b_ndx .."].pool.min_idle = " .. pool.min_idle_connections .. "\n")
file:write(" [".. b_ndx .."].type = " .. backend.type .. "\n")
file:write(" [".. b_ndx .."].state = " .. backend.state .. "\n")
end
if backend.state ~= proxy.BACKEND_STATE_DOWN then
-- if the backend is not flagged as running and the recheck timeout expired, treat backend state as unknown
if backend_lag[backend.dst.name] and backend_lag[backend.dst.name].state ~= "running"
and now - backend_lag[backend.dst.name].check_ts > config.min_recheck_time then
file:write("backend " .. backend.dst.name .. " in state " .. backend_lag[backend.dst.name].state .. " and timeout expired, rechecking.\n")
backend_lag[backend.dst.name] = nil
end
file:write("current idle: " .. cur_idle .. "\n")
file:write("pool min: " .. pool.min_idle_connections .. "\n")
if backend.type == proxy.BACKEND_TYPE_RW and
cur_idle < pool.min_idle_connections then
-- the fallback
fallback_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
if is_debug then
file:write("setting fallback to be " .. backend.dst.name .. " # " .. b_ndx .. "\n")
file:write("proxy.connection.backend_ndx: " .. proxy.connection.backend_ndx .. "\n")
end
-- rw_ndx = b_ndx
break
-- elseif backend.type == proxy.BACKEND_TYPE_RW and
-- rw_ndx == 0 then
--file:write("2nd\n")
-- rw_ndx = b_ndx
else
file:write("3rd\n")
-- connect to the backends first we don't know yet
if not backend_lag[backend.dst.name] then
if is_debug then
file:write("setting unknown slave to " .. backend.dst.name .. " # " .. b_ndx .. "\n")
end
unknown_slave_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
file:write("unknown_slave_ndx" .. unknown_slave_ndx .. "\n")
elseif backend_lag[backend.dst.name].state == "running" then
if not slave_bytes_lag then
if is_debug then
file:write("setting known slave to " .. backend.dst.name .. " # " .. b_ndx .. "\n")
end
slave_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
-- rw_ndx = b_ndx
slave_bytes_lag = backend_lag[backend.dst.name].slave_bytes_lag
elseif backend_lag[backend.dst.name].slave_bytes_lag < slave_bytes_lag then
file:write("setting known slave to " .. backend.dst.name .. " # " .. b_ndx .. "\n")
slave_ndx = b_ndx
-- rw_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
slave_bytes_lag = backend_lag[backend.dst.name].slave_bytes_lag
end
end
end
else
if not backend_lag[backend.dst.name] then
backend_lag[backend.dst.name].state = "down"
backend_lag[backend.dst.name].check_ts = now
file:write("backend " .. backend.dst.name .. " detected DOWN, setting status and timestamp.\n")
elseif now - backend_lag[backend.dst.name].check_ts > config.min_recheck_time then
file:write("backend " .. backend.dst.name .. " DOWN and timeout expired, rechecking.\n")
recheck_slave_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
end
end
end
-- proxy.connection.backend_ndx = recheck_slave_ndx or unknown_slave_ndx or slave_ndx
if proxy.connection.backend_ndx == 0 then
if is_debug then
file:write(" [" .. rw_ndx .. "] taking master as default\n")
end
proxy.connection.backend_ndx = rw_ndx
else
proxy.connection.backend_ndx = recheck_slave_ndx or unknown_slave_ndx or slave_ndx or fallback_ndx
file:write("not taking master: " .. proxy.connection.backend_ndx .. "\n")
end
if proxy.connection.server then
if is_debug then
file:write(" using pooled connection from: " .. proxy.connection.backend_ndx .. "\n")
end
-- stay with it
return proxy.PROXY_IGNORE_RESULT
end
if is_debug then
file:write(" [" .. proxy.connection.backend_ndx .. "] idle-conns below min-idle\n")
file:write("(connect-server) There are " .. #proxy.global.backends .. " backends.\n")
file:write("(connect-server) using backend: " .. proxy.connection.backend_ndx .. ".\n")
file:write("(connect-server) backend: " .. proxy.global.backends[proxy.connection.backend_ndx].dst.name .. "\n")
end
end
---
-- put the successfully authed connection into the connection pool
--
-- @param auth the context information for the auth
--
-- auth.packet is the packet
function read_auth_result( auth )
-- if is_debug then
file:write("[read_auth_result] " .. proxy.connection.client.src.name .. "\n")
-- end
if auth.packet:byte() == proxy.MYSQLD_PACKET_OK then
-- auth was fine, disconnect from the server
proxy.connection.backend_ndx = 0
file:write("(read_auth_result) ... conection added\n");
elseif auth.packet:byte() == proxy.MYSQLD_PACKET_EOF then
-- we received either a
--
-- * MYSQLD_PACKET_ERR and the auth failed or
-- * MYSQLD_PACKET_EOF which means a OLD PASSWORD (4.0) was sent
file:write("(read_auth_result) ... not ok yet\n");
elseif auth.packet:byte() == proxy.MYSQLD_PACKET_ERR then
-- auth failed
file:write("(read_auth_result) ... auth failed\n");
end
end
function read_query(packet)
local cmd = commands.parse(packet)
local ret = proxy.global.config:handle(cmd)
if ret then return ret end
local c = proxy.connection.client
local tokens
local norm_query
-- print("[read_query] " .. proxy.connection.client.src.name)
file:write(" current backend = " .. proxy.connection.backend_ndx .. "\n")
if cmd.type == proxy.COM_QUIT then
-- don't send COM_QUIT to the backend. We manage the connection
-- in all aspects.
proxy.response = {
type = proxy.MYSQLD_PACKET_OK,
}
return proxy.PROXY_SEND_RESULT
end
-- read/write splitting
--
-- send all non-transactional SELECTs to a slave
if not is_in_transaction and
cmd.type == proxy.COM_QUERY then
file:write("found select" .. backend_ndx .. "\n")
tokens = tokens or assert(tokenizer.tokenize(cmd.query))
local stmt = tokenizer.first_stmt_token(tokens)
if stmt.token_name == "TK_SQL_SELECT" then
is_in_select_calc_found_rows = false
local is_insert_id = false
for i = 1, #tokens do
local token = tokens
-- SQL_CALC_FOUND_ROWS + FOUND_ROWS() have to be executed
-- on the same connection
-- print("token: " .. token.token_name)
-- print(" val: " .. token.text)
if not is_in_select_calc_found_rows and (token.token_name == "TK_SQL_SQL_CALC_FOUND_ROWS" or (token.token_name == "TK_FUNCTION" and token.text:upper() == "FOUND_ROWS")) then
is_in_select_calc_found_rows = true
elseif not is_insert_id and token.token_name == "TK_FUNCTION" and token.text:upper() == "LAST_INSERT_ID" then
is_insert_id = true
elseif not is_insert_id and token.token_name == "TK_LITERAL" then
local utext = token.text:upper()
if utext == "@@INSERT_ID" then
is_insert_id = true
end
end
-- we found the two special token, we can't find more
if is_insert_id and is_in_select_calc_found_rows then
break
end
end
-- if we ask for the last-insert-id we have to ask it on the original
-- connection
if is_insert_id then
file:write(" found a SELECT LAST_INSERT_ID(), going to master\n")
elseif is_in_select_calc_found_rows then
file:write(" need to calculate the rows, going to master\n")
else -- neither of these two, pick an idle slave
local backend_ndx = lb.idle_ro()
if backend_ndx > 0 then
-- proxy.connection.backend_ndx = backend_ndx
end
end
end
end
file:write("failsafe: " .. lb.idle_failsafe_rw() .. "\n")
-- no backend selected yet, pick a master
if proxy.connection.backend_ndx == 0 then
file:write("we don't have a backend right now, let's pick a master as a good default\n")
proxy.connection.backend_ndx = lb.idle_failsafe_rw()
end
-- by now we should have a backend
--
-- in case the master is down, we have to close the client connections
-- otherwise we can go on
if proxy.connection.backend_ndx == 0 then
file:write("I have no server backend, closing connection\n")
return proxy.PROXY_SEND_QUERY
end
file:write(proxy.global.backends[proxy.connection.backend_ndx].dst.name .. "\n")
--file:write(proxy.connection.backend_ndx .. "\n")
--
-- check the backend periodicly for its lag
--
-- translate the backend_ndx into its address
local backend_addr = proxy.global.backends[proxy.connection.backend_ndx].dst.name
backend_lag[backend_addr] = backend_lag[backend_addr] or {
state = "unchecked"
}
local now = os.time()
if config.is_debug then
-- file:write("(read_query) we are on backend: ".. backend_addr .. " in state: " .. backend_lag[backend_addr].state .. "\n")
end
if backend_lag[backend_addr].state == "unchecked" or
(backend_lag[backend_addr].state == "running" and
now - backend_lag[backend_addr].check_ts > config.check_timeout) then
if config.is_debug then
file:write("(read-query) unchecked, injecting a SHOW SLAVE STATUS\n")
end
proxy.queries:append(2, string.char(proxy.COM_QUERY) .. "SHOW SLAVE STATUS", { resultset_is_needed = true })
proxy.queries:append(1, packet, { resultset_is_needed = true })
return proxy.PROXY_SEND_QUERY
elseif proxy.global.backends[proxy.connection.backend_ndx].type == proxy.BACKEND_TYPE_RW or -- master
backend_lag[backend_addr].state == "running" then -- good slave
-- pass through
return
else
-- looks like this is a bad backend
-- let's get the client to connect to another backend
--
-- ... by closing the connection
proxy.response = {
type = proxy.MYSQLD_PACKET_ERR,
errmsg = "slave-state is " .. backend_lag[backend_addr].state,
sqlstate = "08S01"
}
proxy.queries:reset()
-- and now we have to tell the proxy to close the connection
--
proxy.connection.connection_close = true
return proxy.PROXY_SEND_RESULT
end
end
function read_query_result(inj)
local res
local fields
local show_slave_status = {}
-- pass through the client query
if inj.id == 1 then
res = assert(inj.resultset)
flags = res.flags
is_in_transaction = flags.in_trans
local have_last_insert_id = (res.insert_id and (res.insert_id > 0))
if not is_in_transaction and
not is_in_select_calc_found_rows and
not have_last_insert_id then
-- release the backend
proxy.connection.backend_ndx = 0
end
-- return
elseif inj.id == 2 then
res = inj.resultset
fields = res.fields
-- turn the resultset into local hash
for row in res.rows do
for field_id, field in pairs(row) do
show_slave_status[fields[field_id].name] = tostring(field)
if config.is_debug then
file:write( ("[%d] '%s' = '%s'"):format(field_id, fields[field_id].name, tostring(field)) .. "\n")
end
end
end
---
-- parse the SHOW SLAVE STATUS
--
-- what can happen ?
-- * no permissions (ERR)
-- * if not slave, result is empty
file:write("inj: " .. inj.id .. "\n")
local backend_addr = proxy.global.backends[proxy.connection.backend_ndx].dst.name
backend_lag[backend_addr].check_ts = os.time()
backend_lag[backend_addr].state = nil
if not show_slave_status["Master_Host"] then
-- this backend is not a slave
backend_lag[backend_addr].state = "noslave"
return proxy.PROXY_IGNORE_RESULT
end
if show_slave_status["Master_Log_File"] == show_slave_status["Relay_Master_Log_File"] then
-- ok, we use the same relay-log for reading and writing
backend_lag[backend_addr].slave_bytes_lag = tonumber(show_slave_status["Exec_Master_Log_Pos"]) - tonumber(show_slave_status["Read_Master_Log_Pos"])
else
backend_lag[backend_addr].slave_bytes_lag = nil
end
if show_slave_status["Seconds_Behind_Master"] then
backend_lag[backend_addr].seconds_lag = tonumber(show_slave_status["Seconds_Behind_Master"])
else
backend_lag[backend_addr].seconds_lag = nil
end
if show_slave_status["Slave_IO_Running"] == "No" then
backend_lag[backend_addr].state = "noiothread"
end
backend_lag[backend_addr].state = backend_lag[backend_addr].state or "running"
return proxy.PROXY_IGNORE_RESULT
end
end
---
-- close the connections if we have enough connections in the pool
--
-- @return nil - close connection
-- IGNORE_RESULT - store connection in the pool
function disconnect_client()
file:write("[disconnect_client] " .. proxy.connection.client.src.name .. "\n")
-- make sure we are disconnection from the connection
-- to move the connection into the pool
proxy.connection.backend_ndx = 0
end