rw-splitting + ro-balance
Posted by: sharif uddin
Date: July 12, 2011 05:15AM
Date: July 12, 2011 05:15AM
Hi,
I have managed to combine these to and it was working for a little while until i changed something i cannot remember or it just stopped. currently the read function is not picking up any backends or anything from the pool which i cannot understand why.
script is as follows:
require("proxy.auto-config")
local commands = require("proxy.commands")
if not proxy.global.config.ro_balance then
proxy.global.config.ro_balance = {
max_seconds_lag = 1, -- 10 seconds
max_bytes_lag = 1 * 1024, -- 10k
check_timeout = 1,
min_recheck_time = 20, -- 60 seconds
is_debug = true
}
end
if not proxy.global.lag then
proxy.global.lag = { }
end
local config = proxy.global.config.ro_balance
local backend_lag = proxy.global.lag
-- connection pool
if not proxy.global.config.rwsplit then
proxy.global.config.rwsplit = {
min_idle_connections = 0,
max_idle_connections = 8,
is_debug = true
}
end
local tokenizer = require("proxy.tokenizer")
local lb = require("proxy.balance")
local auto_config = require("proxy.auto-config")
-- read/write splitting sends all non-transactional SELECTs to the slaves
--
-- is_in_transaction tracks the state of the transactions
local is_in_transaction = false
-- if this was a SELECT SQL_CALC_FOUND_ROWS ... stay on the same connections
local is_in_select_calc_found_rows = false
local file = assert(io.open("/var/log/mysql_proxy.log", "a"))
---
-- pick a backend server
--
-- we take the lag into account
-- * ignore backends which aren't configured
-- * ignore backends which don't have a io-thread running
-- * ignore backends that are too far behind
function connect_server()
local fallback_ndx
local slave_ndx
local unknown_slave_ndx
local slave_bytes_lag
local now = os.time()
if is_debug then
file:write("\n")
file:write("[connect_server] " .. proxy.connection.client.src.name .. "\n")
end
local rw_ndx = 0
for b_ndx = 1, #proxy.global.backends do
local backend = proxy.global.backends[b_ndx]
local pool = backend.pool -- we don't have a username yet, try to find a connections which is idling
local cur_idle = pool.users[""].cur_idle_connections
pool.min_idle_connections = proxy.global.config.rwsplit.min_idle_connections
pool.max_idle_connections = proxy.global.config.rwsplit.max_idle_connections
if is_debug then
file:write(" [".. b_ndx .."].connected_clients = " .. backend.connected_clients .. "\n")
file:write(" [".. b_ndx .."].pool.cur_idle = " .. cur_idle .. "\n")
file:write(" [".. b_ndx .."].pool.max_idle = " .. pool.max_idle_connections .. "\n")
file:write(" [".. b_ndx .."].pool.min_idle = " .. pool.min_idle_connections .. "\n")
file:write(" [".. b_ndx .."].type = " .. backend.type .. "\n")
file:write(" [".. b_ndx .."].state = " .. backend.state .. "\n")
end
if backend.state ~= proxy.BACKEND_STATE_DOWN then
-- if the backend is not flagged as running and the recheck timeout expired, treat backend state as unknown
if backend_lag[backend.dst.name] and backend_lag[backend.dst.name].state ~= "running"
and now - backend_lag[backend.dst.name].check_ts > config.min_recheck_time then
file:write("backend " .. backend.dst.name .. " in state " .. backend_lag[backend.dst.name].state .. " and timeout expired, rechecking.\n")
backend_lag[backend.dst.name] = nil
end
file:write("current idle: " .. cur_idle .. "\n")
file:write("pool min: " .. pool.min_idle_connections .. "\n")
if backend.type == proxy.BACKEND_TYPE_RW and
cur_idle < pool.min_idle_connections then
-- the fallback
fallback_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
if is_debug then
file:write("setting fallback to be " .. backend.dst.name .. " # " .. b_ndx .. "\n")
file:write("proxy.connection.backend_ndx: " .. proxy.connection.backend_ndx .. "\n")
end
-- rw_ndx = b_ndx
break
-- elseif backend.type == proxy.BACKEND_TYPE_RW and
-- rw_ndx == 0 then
--file:write("2nd\n")
-- rw_ndx = b_ndx
else
file:write("3rd\n")
-- connect to the backends first we don't know yet
if not backend_lag[backend.dst.name] then
if is_debug then
file:write("setting unknown slave to " .. backend.dst.name .. " # " .. b_ndx .. "\n")
end
unknown_slave_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
file:write("unknown_slave_ndx" .. unknown_slave_ndx .. "\n")
elseif backend_lag[backend.dst.name].state == "running" then
if not slave_bytes_lag then
if is_debug then
file:write("setting known slave to " .. backend.dst.name .. " # " .. b_ndx .. "\n")
end
slave_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
-- rw_ndx = b_ndx
slave_bytes_lag = backend_lag[backend.dst.name].slave_bytes_lag
elseif backend_lag[backend.dst.name].slave_bytes_lag < slave_bytes_lag then
file:write("setting known slave to " .. backend.dst.name .. " # " .. b_ndx .. "\n")
slave_ndx = b_ndx
-- rw_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
slave_bytes_lag = backend_lag[backend.dst.name].slave_bytes_lag
end
end
end
else
if not backend_lag[backend.dst.name] then
backend_lag[backend.dst.name].state = "down"
backend_lag[backend.dst.name].check_ts = now
file:write("backend " .. backend.dst.name .. " detected DOWN, setting status and timestamp.\n")
elseif now - backend_lag[backend.dst.name].check_ts > config.min_recheck_time then
file:write("backend " .. backend.dst.name .. " DOWN and timeout expired, rechecking.\n")
recheck_slave_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
end
end
end
-- proxy.connection.backend_ndx = recheck_slave_ndx or unknown_slave_ndx or slave_ndx
if proxy.connection.backend_ndx == 0 then
if is_debug then
file:write(" [" .. rw_ndx .. "] taking master as default\n")
end
proxy.connection.backend_ndx = rw_ndx
else
proxy.connection.backend_ndx = recheck_slave_ndx or unknown_slave_ndx or slave_ndx or fallback_ndx
file:write("not taking master: " .. proxy.connection.backend_ndx .. "\n")
end
if proxy.connection.server then
if is_debug then
file:write(" using pooled connection from: " .. proxy.connection.backend_ndx .. "\n")
end
-- stay with it
return proxy.PROXY_IGNORE_RESULT
end
if is_debug then
file:write(" [" .. proxy.connection.backend_ndx .. "] idle-conns below min-idle\n")
file:write("(connect-server) There are " .. #proxy.global.backends .. " backends.\n")
file:write("(connect-server) using backend: " .. proxy.connection.backend_ndx .. ".\n")
file:write("(connect-server) backend: " .. proxy.global.backends[proxy.connection.backend_ndx].dst.name .. "\n")
end
end
---
-- put the successfully authed connection into the connection pool
--
-- @param auth the context information for the auth
--
-- auth.packet is the packet
function read_auth_result( auth )
-- if is_debug then
file:write("[read_auth_result] " .. proxy.connection.client.src.name .. "\n")
-- end
if auth.packet:byte() == proxy.MYSQLD_PACKET_OK then
-- auth was fine, disconnect from the server
proxy.connection.backend_ndx = 0
file:write("(read_auth_result) ... conection added\n");
elseif auth.packet:byte() == proxy.MYSQLD_PACKET_EOF then
-- we received either a
--
-- * MYSQLD_PACKET_ERR and the auth failed or
-- * MYSQLD_PACKET_EOF which means a OLD PASSWORD (4.0) was sent
file:write("(read_auth_result) ... not ok yet\n");
elseif auth.packet:byte() == proxy.MYSQLD_PACKET_ERR then
-- auth failed
file:write("(read_auth_result) ... auth failed\n");
end
end
function read_query(packet)
local cmd = commands.parse(packet)
local ret = proxy.global.config:handle(cmd)
if ret then return ret end
local c = proxy.connection.client
local tokens
local norm_query
-- print("[read_query] " .. proxy.connection.client.src.name)
file:write(" current backend = " .. proxy.connection.backend_ndx .. "\n")
if cmd.type == proxy.COM_QUIT then
-- don't send COM_QUIT to the backend. We manage the connection
-- in all aspects.
proxy.response = {
type = proxy.MYSQLD_PACKET_OK,
}
return proxy.PROXY_SEND_RESULT
end
-- read/write splitting
--
-- send all non-transactional SELECTs to a slave
if not is_in_transaction and
cmd.type == proxy.COM_QUERY then
file:write("found select" .. backend_ndx .. "\n")
tokens = tokens or assert(tokenizer.tokenize(cmd.query))
local stmt = tokenizer.first_stmt_token(tokens)
if stmt.token_name == "TK_SQL_SELECT" then
is_in_select_calc_found_rows = false
local is_insert_id = false
for i = 1, #tokens do
local token = tokens
-- SQL_CALC_FOUND_ROWS + FOUND_ROWS() have to be executed
-- on the same connection
-- print("token: " .. token.token_name)
-- print(" val: " .. token.text)
if not is_in_select_calc_found_rows and (token.token_name == "TK_SQL_SQL_CALC_FOUND_ROWS" or (token.token_name == "TK_FUNCTION" and token.text:upper() == "FOUND_ROWS")) then
is_in_select_calc_found_rows = true
elseif not is_insert_id and token.token_name == "TK_FUNCTION" and token.text:upper() == "LAST_INSERT_ID" then
is_insert_id = true
elseif not is_insert_id and token.token_name == "TK_LITERAL" then
local utext = token.text:upper()
if utext == "@@INSERT_ID" then
is_insert_id = true
end
end
-- we found the two special token, we can't find more
if is_insert_id and is_in_select_calc_found_rows then
break
end
end
-- if we ask for the last-insert-id we have to ask it on the original
-- connection
if is_insert_id then
file:write(" found a SELECT LAST_INSERT_ID(), going to master\n")
elseif is_in_select_calc_found_rows then
file:write(" need to calculate the rows, going to master\n")
else -- neither of these two, pick an idle slave
local backend_ndx = lb.idle_ro()
if backend_ndx > 0 then
-- proxy.connection.backend_ndx = backend_ndx
end
end
end
end
file:write("failsafe: " .. lb.idle_failsafe_rw() .. "\n")
-- no backend selected yet, pick a master
if proxy.connection.backend_ndx == 0 then
file:write("we don't have a backend right now, let's pick a master as a good default\n")
proxy.connection.backend_ndx = lb.idle_failsafe_rw()
end
-- by now we should have a backend
--
-- in case the master is down, we have to close the client connections
-- otherwise we can go on
if proxy.connection.backend_ndx == 0 then
file:write("I have no server backend, closing connection\n")
return proxy.PROXY_SEND_QUERY
end
file:write(proxy.global.backends[proxy.connection.backend_ndx].dst.name .. "\n")
--file:write(proxy.connection.backend_ndx .. "\n")
--
-- check the backend periodicly for its lag
--
-- translate the backend_ndx into its address
local backend_addr = proxy.global.backends[proxy.connection.backend_ndx].dst.name
backend_lag[backend_addr] = backend_lag[backend_addr] or {
state = "unchecked"
}
local now = os.time()
if config.is_debug then
-- file:write("(read_query) we are on backend: ".. backend_addr .. " in state: " .. backend_lag[backend_addr].state .. "\n")
end
if backend_lag[backend_addr].state == "unchecked" or
(backend_lag[backend_addr].state == "running" and
now - backend_lag[backend_addr].check_ts > config.check_timeout) then
if config.is_debug then
file:write("(read-query) unchecked, injecting a SHOW SLAVE STATUS\n")
end
proxy.queries:append(2, string.char(proxy.COM_QUERY) .. "SHOW SLAVE STATUS", { resultset_is_needed = true })
proxy.queries:append(1, packet, { resultset_is_needed = true })
return proxy.PROXY_SEND_QUERY
elseif proxy.global.backends[proxy.connection.backend_ndx].type == proxy.BACKEND_TYPE_RW or -- master
backend_lag[backend_addr].state == "running" then -- good slave
-- pass through
return
else
-- looks like this is a bad backend
-- let's get the client to connect to another backend
--
-- ... by closing the connection
proxy.response = {
type = proxy.MYSQLD_PACKET_ERR,
errmsg = "slave-state is " .. backend_lag[backend_addr].state,
sqlstate = "08S01"
}
proxy.queries:reset()
-- and now we have to tell the proxy to close the connection
--
proxy.connection.connection_close = true
return proxy.PROXY_SEND_RESULT
end
end
function read_query_result(inj)
local res
local fields
local show_slave_status = {}
-- pass through the client query
if inj.id == 1 then
res = assert(inj.resultset)
flags = res.flags
is_in_transaction = flags.in_trans
local have_last_insert_id = (res.insert_id and (res.insert_id > 0))
if not is_in_transaction and
not is_in_select_calc_found_rows and
not have_last_insert_id then
-- release the backend
proxy.connection.backend_ndx = 0
end
-- return
elseif inj.id == 2 then
res = inj.resultset
fields = res.fields
-- turn the resultset into local hash
for row in res.rows do
for field_id, field in pairs(row) do
show_slave_status[fields[field_id].name] = tostring(field)
if config.is_debug then
file:write( ("[%d] '%s' = '%s'"):format(field_id, fields[field_id].name, tostring(field)) .. "\n")
end
end
end
---
-- parse the SHOW SLAVE STATUS
--
-- what can happen ?
-- * no permissions (ERR)
-- * if not slave, result is empty
file:write("inj: " .. inj.id .. "\n")
local backend_addr = proxy.global.backends[proxy.connection.backend_ndx].dst.name
backend_lag[backend_addr].check_ts = os.time()
backend_lag[backend_addr].state = nil
if not show_slave_status["Master_Host"] then
-- this backend is not a slave
backend_lag[backend_addr].state = "noslave"
return proxy.PROXY_IGNORE_RESULT
end
if show_slave_status["Master_Log_File"] == show_slave_status["Relay_Master_Log_File"] then
-- ok, we use the same relay-log for reading and writing
backend_lag[backend_addr].slave_bytes_lag = tonumber(show_slave_status["Exec_Master_Log_Pos"]) - tonumber(show_slave_status["Read_Master_Log_Pos"])
else
backend_lag[backend_addr].slave_bytes_lag = nil
end
if show_slave_status["Seconds_Behind_Master"] then
backend_lag[backend_addr].seconds_lag = tonumber(show_slave_status["Seconds_Behind_Master"])
else
backend_lag[backend_addr].seconds_lag = nil
end
if show_slave_status["Slave_IO_Running"] == "No" then
backend_lag[backend_addr].state = "noiothread"
end
backend_lag[backend_addr].state = backend_lag[backend_addr].state or "running"
return proxy.PROXY_IGNORE_RESULT
end
end
---
-- close the connections if we have enough connections in the pool
--
-- @return nil - close connection
-- IGNORE_RESULT - store connection in the pool
function disconnect_client()
file:write("[disconnect_client] " .. proxy.connection.client.src.name .. "\n")
-- make sure we are disconnection from the connection
-- to move the connection into the pool
proxy.connection.backend_ndx = 0
end
I have managed to combine these to and it was working for a little while until i changed something i cannot remember or it just stopped. currently the read function is not picking up any backends or anything from the pool which i cannot understand why.
script is as follows:
require("proxy.auto-config")
local commands = require("proxy.commands")
if not proxy.global.config.ro_balance then
proxy.global.config.ro_balance = {
max_seconds_lag = 1, -- 10 seconds
max_bytes_lag = 1 * 1024, -- 10k
check_timeout = 1,
min_recheck_time = 20, -- 60 seconds
is_debug = true
}
end
if not proxy.global.lag then
proxy.global.lag = { }
end
local config = proxy.global.config.ro_balance
local backend_lag = proxy.global.lag
-- connection pool
if not proxy.global.config.rwsplit then
proxy.global.config.rwsplit = {
min_idle_connections = 0,
max_idle_connections = 8,
is_debug = true
}
end
local tokenizer = require("proxy.tokenizer")
local lb = require("proxy.balance")
local auto_config = require("proxy.auto-config")
-- read/write splitting sends all non-transactional SELECTs to the slaves
--
-- is_in_transaction tracks the state of the transactions
local is_in_transaction = false
-- if this was a SELECT SQL_CALC_FOUND_ROWS ... stay on the same connections
local is_in_select_calc_found_rows = false
local file = assert(io.open("/var/log/mysql_proxy.log", "a"))
---
-- pick a backend server
--
-- we take the lag into account
-- * ignore backends which aren't configured
-- * ignore backends which don't have a io-thread running
-- * ignore backends that are too far behind
function connect_server()
local fallback_ndx
local slave_ndx
local unknown_slave_ndx
local slave_bytes_lag
local now = os.time()
if is_debug then
file:write("\n")
file:write("[connect_server] " .. proxy.connection.client.src.name .. "\n")
end
local rw_ndx = 0
for b_ndx = 1, #proxy.global.backends do
local backend = proxy.global.backends[b_ndx]
local pool = backend.pool -- we don't have a username yet, try to find a connections which is idling
local cur_idle = pool.users[""].cur_idle_connections
pool.min_idle_connections = proxy.global.config.rwsplit.min_idle_connections
pool.max_idle_connections = proxy.global.config.rwsplit.max_idle_connections
if is_debug then
file:write(" [".. b_ndx .."].connected_clients = " .. backend.connected_clients .. "\n")
file:write(" [".. b_ndx .."].pool.cur_idle = " .. cur_idle .. "\n")
file:write(" [".. b_ndx .."].pool.max_idle = " .. pool.max_idle_connections .. "\n")
file:write(" [".. b_ndx .."].pool.min_idle = " .. pool.min_idle_connections .. "\n")
file:write(" [".. b_ndx .."].type = " .. backend.type .. "\n")
file:write(" [".. b_ndx .."].state = " .. backend.state .. "\n")
end
if backend.state ~= proxy.BACKEND_STATE_DOWN then
-- if the backend is not flagged as running and the recheck timeout expired, treat backend state as unknown
if backend_lag[backend.dst.name] and backend_lag[backend.dst.name].state ~= "running"
and now - backend_lag[backend.dst.name].check_ts > config.min_recheck_time then
file:write("backend " .. backend.dst.name .. " in state " .. backend_lag[backend.dst.name].state .. " and timeout expired, rechecking.\n")
backend_lag[backend.dst.name] = nil
end
file:write("current idle: " .. cur_idle .. "\n")
file:write("pool min: " .. pool.min_idle_connections .. "\n")
if backend.type == proxy.BACKEND_TYPE_RW and
cur_idle < pool.min_idle_connections then
-- the fallback
fallback_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
if is_debug then
file:write("setting fallback to be " .. backend.dst.name .. " # " .. b_ndx .. "\n")
file:write("proxy.connection.backend_ndx: " .. proxy.connection.backend_ndx .. "\n")
end
-- rw_ndx = b_ndx
break
-- elseif backend.type == proxy.BACKEND_TYPE_RW and
-- rw_ndx == 0 then
--file:write("2nd\n")
-- rw_ndx = b_ndx
else
file:write("3rd\n")
-- connect to the backends first we don't know yet
if not backend_lag[backend.dst.name] then
if is_debug then
file:write("setting unknown slave to " .. backend.dst.name .. " # " .. b_ndx .. "\n")
end
unknown_slave_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
file:write("unknown_slave_ndx" .. unknown_slave_ndx .. "\n")
elseif backend_lag[backend.dst.name].state == "running" then
if not slave_bytes_lag then
if is_debug then
file:write("setting known slave to " .. backend.dst.name .. " # " .. b_ndx .. "\n")
end
slave_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
-- rw_ndx = b_ndx
slave_bytes_lag = backend_lag[backend.dst.name].slave_bytes_lag
elseif backend_lag[backend.dst.name].slave_bytes_lag < slave_bytes_lag then
file:write("setting known slave to " .. backend.dst.name .. " # " .. b_ndx .. "\n")
slave_ndx = b_ndx
-- rw_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
slave_bytes_lag = backend_lag[backend.dst.name].slave_bytes_lag
end
end
end
else
if not backend_lag[backend.dst.name] then
backend_lag[backend.dst.name].state = "down"
backend_lag[backend.dst.name].check_ts = now
file:write("backend " .. backend.dst.name .. " detected DOWN, setting status and timestamp.\n")
elseif now - backend_lag[backend.dst.name].check_ts > config.min_recheck_time then
file:write("backend " .. backend.dst.name .. " DOWN and timeout expired, rechecking.\n")
recheck_slave_ndx = b_ndx
proxy.connection.backend_ndx = b_ndx
end
end
end
-- proxy.connection.backend_ndx = recheck_slave_ndx or unknown_slave_ndx or slave_ndx
if proxy.connection.backend_ndx == 0 then
if is_debug then
file:write(" [" .. rw_ndx .. "] taking master as default\n")
end
proxy.connection.backend_ndx = rw_ndx
else
proxy.connection.backend_ndx = recheck_slave_ndx or unknown_slave_ndx or slave_ndx or fallback_ndx
file:write("not taking master: " .. proxy.connection.backend_ndx .. "\n")
end
if proxy.connection.server then
if is_debug then
file:write(" using pooled connection from: " .. proxy.connection.backend_ndx .. "\n")
end
-- stay with it
return proxy.PROXY_IGNORE_RESULT
end
if is_debug then
file:write(" [" .. proxy.connection.backend_ndx .. "] idle-conns below min-idle\n")
file:write("(connect-server) There are " .. #proxy.global.backends .. " backends.\n")
file:write("(connect-server) using backend: " .. proxy.connection.backend_ndx .. ".\n")
file:write("(connect-server) backend: " .. proxy.global.backends[proxy.connection.backend_ndx].dst.name .. "\n")
end
end
---
-- put the successfully authed connection into the connection pool
--
-- @param auth the context information for the auth
--
-- auth.packet is the packet
function read_auth_result( auth )
-- if is_debug then
file:write("[read_auth_result] " .. proxy.connection.client.src.name .. "\n")
-- end
if auth.packet:byte() == proxy.MYSQLD_PACKET_OK then
-- auth was fine, disconnect from the server
proxy.connection.backend_ndx = 0
file:write("(read_auth_result) ... conection added\n");
elseif auth.packet:byte() == proxy.MYSQLD_PACKET_EOF then
-- we received either a
--
-- * MYSQLD_PACKET_ERR and the auth failed or
-- * MYSQLD_PACKET_EOF which means a OLD PASSWORD (4.0) was sent
file:write("(read_auth_result) ... not ok yet\n");
elseif auth.packet:byte() == proxy.MYSQLD_PACKET_ERR then
-- auth failed
file:write("(read_auth_result) ... auth failed\n");
end
end
function read_query(packet)
local cmd = commands.parse(packet)
local ret = proxy.global.config:handle(cmd)
if ret then return ret end
local c = proxy.connection.client
local tokens
local norm_query
-- print("[read_query] " .. proxy.connection.client.src.name)
file:write(" current backend = " .. proxy.connection.backend_ndx .. "\n")
if cmd.type == proxy.COM_QUIT then
-- don't send COM_QUIT to the backend. We manage the connection
-- in all aspects.
proxy.response = {
type = proxy.MYSQLD_PACKET_OK,
}
return proxy.PROXY_SEND_RESULT
end
-- read/write splitting
--
-- send all non-transactional SELECTs to a slave
if not is_in_transaction and
cmd.type == proxy.COM_QUERY then
file:write("found select" .. backend_ndx .. "\n")
tokens = tokens or assert(tokenizer.tokenize(cmd.query))
local stmt = tokenizer.first_stmt_token(tokens)
if stmt.token_name == "TK_SQL_SELECT" then
is_in_select_calc_found_rows = false
local is_insert_id = false
for i = 1, #tokens do
local token = tokens
-- SQL_CALC_FOUND_ROWS + FOUND_ROWS() have to be executed
-- on the same connection
-- print("token: " .. token.token_name)
-- print(" val: " .. token.text)
if not is_in_select_calc_found_rows and (token.token_name == "TK_SQL_SQL_CALC_FOUND_ROWS" or (token.token_name == "TK_FUNCTION" and token.text:upper() == "FOUND_ROWS")) then
is_in_select_calc_found_rows = true
elseif not is_insert_id and token.token_name == "TK_FUNCTION" and token.text:upper() == "LAST_INSERT_ID" then
is_insert_id = true
elseif not is_insert_id and token.token_name == "TK_LITERAL" then
local utext = token.text:upper()
if utext == "@@INSERT_ID" then
is_insert_id = true
end
end
-- we found the two special token, we can't find more
if is_insert_id and is_in_select_calc_found_rows then
break
end
end
-- if we ask for the last-insert-id we have to ask it on the original
-- connection
if is_insert_id then
file:write(" found a SELECT LAST_INSERT_ID(), going to master\n")
elseif is_in_select_calc_found_rows then
file:write(" need to calculate the rows, going to master\n")
else -- neither of these two, pick an idle slave
local backend_ndx = lb.idle_ro()
if backend_ndx > 0 then
-- proxy.connection.backend_ndx = backend_ndx
end
end
end
end
file:write("failsafe: " .. lb.idle_failsafe_rw() .. "\n")
-- no backend selected yet, pick a master
if proxy.connection.backend_ndx == 0 then
file:write("we don't have a backend right now, let's pick a master as a good default\n")
proxy.connection.backend_ndx = lb.idle_failsafe_rw()
end
-- by now we should have a backend
--
-- in case the master is down, we have to close the client connections
-- otherwise we can go on
if proxy.connection.backend_ndx == 0 then
file:write("I have no server backend, closing connection\n")
return proxy.PROXY_SEND_QUERY
end
file:write(proxy.global.backends[proxy.connection.backend_ndx].dst.name .. "\n")
--file:write(proxy.connection.backend_ndx .. "\n")
--
-- check the backend periodicly for its lag
--
-- translate the backend_ndx into its address
local backend_addr = proxy.global.backends[proxy.connection.backend_ndx].dst.name
backend_lag[backend_addr] = backend_lag[backend_addr] or {
state = "unchecked"
}
local now = os.time()
if config.is_debug then
-- file:write("(read_query) we are on backend: ".. backend_addr .. " in state: " .. backend_lag[backend_addr].state .. "\n")
end
if backend_lag[backend_addr].state == "unchecked" or
(backend_lag[backend_addr].state == "running" and
now - backend_lag[backend_addr].check_ts > config.check_timeout) then
if config.is_debug then
file:write("(read-query) unchecked, injecting a SHOW SLAVE STATUS\n")
end
proxy.queries:append(2, string.char(proxy.COM_QUERY) .. "SHOW SLAVE STATUS", { resultset_is_needed = true })
proxy.queries:append(1, packet, { resultset_is_needed = true })
return proxy.PROXY_SEND_QUERY
elseif proxy.global.backends[proxy.connection.backend_ndx].type == proxy.BACKEND_TYPE_RW or -- master
backend_lag[backend_addr].state == "running" then -- good slave
-- pass through
return
else
-- looks like this is a bad backend
-- let's get the client to connect to another backend
--
-- ... by closing the connection
proxy.response = {
type = proxy.MYSQLD_PACKET_ERR,
errmsg = "slave-state is " .. backend_lag[backend_addr].state,
sqlstate = "08S01"
}
proxy.queries:reset()
-- and now we have to tell the proxy to close the connection
--
proxy.connection.connection_close = true
return proxy.PROXY_SEND_RESULT
end
end
function read_query_result(inj)
local res
local fields
local show_slave_status = {}
-- pass through the client query
if inj.id == 1 then
res = assert(inj.resultset)
flags = res.flags
is_in_transaction = flags.in_trans
local have_last_insert_id = (res.insert_id and (res.insert_id > 0))
if not is_in_transaction and
not is_in_select_calc_found_rows and
not have_last_insert_id then
-- release the backend
proxy.connection.backend_ndx = 0
end
-- return
elseif inj.id == 2 then
res = inj.resultset
fields = res.fields
-- turn the resultset into local hash
for row in res.rows do
for field_id, field in pairs(row) do
show_slave_status[fields[field_id].name] = tostring(field)
if config.is_debug then
file:write( ("[%d] '%s' = '%s'"):format(field_id, fields[field_id].name, tostring(field)) .. "\n")
end
end
end
---
-- parse the SHOW SLAVE STATUS
--
-- what can happen ?
-- * no permissions (ERR)
-- * if not slave, result is empty
file:write("inj: " .. inj.id .. "\n")
local backend_addr = proxy.global.backends[proxy.connection.backend_ndx].dst.name
backend_lag[backend_addr].check_ts = os.time()
backend_lag[backend_addr].state = nil
if not show_slave_status["Master_Host"] then
-- this backend is not a slave
backend_lag[backend_addr].state = "noslave"
return proxy.PROXY_IGNORE_RESULT
end
if show_slave_status["Master_Log_File"] == show_slave_status["Relay_Master_Log_File"] then
-- ok, we use the same relay-log for reading and writing
backend_lag[backend_addr].slave_bytes_lag = tonumber(show_slave_status["Exec_Master_Log_Pos"]) - tonumber(show_slave_status["Read_Master_Log_Pos"])
else
backend_lag[backend_addr].slave_bytes_lag = nil
end
if show_slave_status["Seconds_Behind_Master"] then
backend_lag[backend_addr].seconds_lag = tonumber(show_slave_status["Seconds_Behind_Master"])
else
backend_lag[backend_addr].seconds_lag = nil
end
if show_slave_status["Slave_IO_Running"] == "No" then
backend_lag[backend_addr].state = "noiothread"
end
backend_lag[backend_addr].state = backend_lag[backend_addr].state or "running"
return proxy.PROXY_IGNORE_RESULT
end
end
---
-- close the connections if we have enough connections in the pool
--
-- @return nil - close connection
-- IGNORE_RESULT - store connection in the pool
function disconnect_client()
file:write("[disconnect_client] " .. proxy.connection.client.src.name .. "\n")
-- make sure we are disconnection from the connection
-- to move the connection into the pool
proxy.connection.backend_ndx = 0
end
Subject
Views
Written By
Posted
Sorry, you can't reply to this topic. It has been closed.
Content reproduced on this site is the property of the respective copyright holders. It is not reviewed in advance by Oracle and does not necessarily represent the opinion of Oracle or any other party.