N-MySQL connections shared between M-threads
Posted by: Ruslan Laishev
Date: April 21, 2016 03:50AM

Hello All!

I have a some task where I'm need to implement a "DB connection pool" is shared between multiple threads.

A number of DB-connections is in range 10-20, and a number of threads is 10-100k.

So, in main () I made DB-connection pool with :


...

ASC stmts [] = {
{$ASCINI("SELECT uai_chkauth(?, ?, ?)")},
{$ASCINI("SELECT uai_getstream2(?, ?)")}, /* {$ASCINI("CALL uai_getstream(?, ?, ?)")}
*/
{$ASCINI("SELECT uai_signreq(?, ?)")},
{$ASCINI("SELECT uai_acctng(?, ?, ?, ?)")}, /* rc = (uid, cid, sid, <type>);
type = 1: Start, 2: Stop; */
};
#define QR_ENDOFLIST sizeof(stmts)/sizeof(stmts[0])

typedef struct _db_ctx
{
ENTRY entry; /**< Entry header */
MYSQL * mysql; /** MySQL session context */

MYSQL_STMT * stmt[QR_ENDOFLIST];/* An array of pointer to prepared SQL statments area */
} DB_CTX;

...


/**
* @brief db_init - Initialize a pool of DB-connections context pool
* @param user - DB user name
* @param pass - DB user's password
* @param addr - RDBMS host address
* @param port - RDBMS IP port number
* @param dbname- A DB name
* @param threads - a number of DB-connections to be open
* @return
*/
int __db_init (
char *user,
char *pass,
char *addr,
unsigned port,
char *dbname,
unsigned threads
)
{
unsigned status, on = 1, i, i2, opts = CLIENT_COMPRESS | CLIENT_SSL | CLIENT_REMEMBER_OPTIONS; // | CLIENT_MULTI_RESULTS;

$LOG(STS$K_INFO, "Client version: %s, thread safe = %d",
mysql_get_client_info(), mysql_thread_safe ());


/* Validate size of the DB-connections pool */
threads = $RANGE(threads, 1, MAXDBTHREADS);

if ( mysql_library_init(0, NULL, NULL) )
return $LOG(STS$K_ERROR, "Could not initialize MySQL library");

/* Initialize DB-connection context */
for (i = 0 ; i < threads; i++)
{
if ( !(dbctx.mysql = mysql_init(NULL)) )
break;
if ( !i )
$LOG(STS$K_INFO, "Server version: %d", mysql_get_server_version(dbctx.mysql));

mysql_options(dbctx.mysql, MYSQL_OPT_RECONNECT, &on);

/* Open session to the MySQL */
if ( !mysql_real_connect(dbctx.mysql, addr, user, pass, dbname, port, NULL, opts) )
{
$LOG(STS$K_ERROR, "#%u connect(): %s", i, mysql_error(dbctx.mysql));
break;
}

/* Prepare statements */
for (i2 = 0; i2 < QR_ENDOFLIST; i2++)
{
if ( !(dbctx.stmt[i2] = mysql_stmt_init(dbctx.mysql)) )
return $LOG(STS$K_ERROR, "#%d/%d stmt_init() : %s", i, i2, mysql_error(dbctx.mysql));

if ( mysql_stmt_prepare(dbctx.stmt[i2], $ASCPTR(&stmts[i2]), $ASCLEN(&stmts[i2])) )
return $LOG(STS$K_ERROR, "#%d/%d stmt_prepare('%.*s') : %s", i, i2, $ASC(&stmts[i2]), mysql_error(dbctx.mysql));

$TRACE("#%d/%d , dbctx:%08X, mysql:%08X, stmt = %08X, '%.*s'", i, i2, &dbctx, dbctx.mysql, dbctx.stmt[i2], $ASC(&stmts[i2]));
}
}

$TRACE("%u DB-connections has been requested, %u is opened", threads, i);
threads = i;

/*
* Put has been initialized DB-contexts into the free pool
*/
for (i = 0 ; i < threads; i++)
$INSQTAIL(&free_dbctx, &dbctx, &opts);

if ( dbctxs = threads )
status = $LOG(STS$K_SUCCESS, "%u DB-connection contexts has been initialized", threads);
else status = $LOG(STS$K_FATAL, "No DB-connection contexts has been initialized!!!");

return status;
}

There is two routines to allocate/release DB-connection from pool:

int __db_getctx (DB_CTX **dbctx)
{
int status, count;
struct timespec stime = {0, 0}, etime = {0, 0}, delta = {3, 0};

/* Allocate DB context */
if ( 1 & $REMQHEAD(&free_dbctx, dbctx, &count) )
return SS$_NORMAL;

$TRACE("Cannot allocate DB context, start waiting %d seconds ...", delta);

if ( status = clock_gettime(CLOCK_REALTIME, &stime) )
$LOG(STS$K_ERROR, "clock_gettime -> %d", status);

__starlet$add_time (&stime, &delta, &etime);

pthread_mutex_lock(&dbmtx);

while ( !(1 & (status = $REMQHEAD(&free_dbctx, dbctx, &count))) )
{
status = pthread_cond_timedwait(&dbcond, &dbmtx, &etime);

if ( (status) && (status != ETIMEDOUT) )
status = $LOG( STS$K_ERROR, "pthread_cond_timedwait() -> %d, errno = %d", status, errno);
else if ( status == ETIMEDOUT )
{
status = $LOG( STS$K_ERROR, "Cannot allocate a DB context in %d second!", delta);
break;
}
}

pthread_mutex_unlock(&dbmtx);

return status;
}

int __db_retctx (DB_CTX *dbctx)
{
int status, count;

assert( dbctx );

/* Allocate DB context */
status = $INSQHEAD(&free_dbctx, dbctx, &count);

status = pthread_cond_signal (&dbcond);
status = status ? $LOG(STS$K_ERROR, "pthread_cond_timedwait() -> %d, errno = %d", status, errno)
: SS$_NORMAL;

return status;
}


...

/**
* @brief __db_getstream - Rertrive an information for the stream with a given UID and CID;
* @param uid - UID string, longword unsigned
* @param cid - Content ID, longword unsigned
* @param auth - Authorization URL template, counted string
* @return - condition code: SS$_NORMAL - on successful checking of the credentials
*/
int __db_getstream (
unsigned uid,
unsigned cid,
ASC * auth
)
{
unsigned status = SS$_NORMAL, count;
DB_CTX * dbctx;
MYSQL_BIND in[2] = {0}, out = {0};
MYSQL_RES * res;
MYSQL_STMT * stmt;
char url [ 256 ];

/* Declare input & output parameters ... */
in[0].buffer_type = in[1].buffer_type = MYSQL_TYPE_LONG;
in[0].buffer = &uid; in[1].buffer = &cid;
in[0].is_unsigned = in[1].is_unsigned = 1;
in[0].buffer_length = in[1].buffer_length = 0;

out.buffer_type = MYSQL_TYPE_STRING;
out.buffer_length = sizeof(url);
out.buffer = url;
out.length = &count;

/* Allocate DB context */
if ( !(1 & __db_getctx(&dbctx)) )
return $LOG(STS$K_ERROR, "No free DB context");

#if PREPARED_SQL_STATEMENT
stmt = dbctx->stmt[QR_GETSTREAM];
#else
if ( !(stmt = mysql_stmt_init(dbctx->mysql)) )
return $LOG(STS$K_ERROR, "stmt_init() : %s", mysql_error(dbctx->mysql));

if ( mysql_stmt_prepare(stmt, $ASCPTR(&stmts[QR_GETSTREAM]), $ASCLEN(&stmts[QR_GETSTREAM])) )
return $LOG(STS$K_ERROR, "stmt_prepare('%.*s') : %s", $ASC(&stmts[QR_GETSTREAM]), mysql_error(dbctx->mysql));
#endif
//$TRACE("dbctx:%08X, mysql:%08X, stmt:%08X, UID = %d, CID = %d", dbctx, dbctx->mysql, stmt, uid, cid);

/*
* Bind local variable to input argument of the SQL' routine
*/
if ( mysql_stmt_bind_param(stmt, in) )
status = $LOG(STS$K_ERROR, "dbctx:%08X : %s", dbctx, mysql_stmt_error (stmt));

/*
* Execute preparsed SQL statement, check condition
*/
if ( mysql_stmt_execute(stmt) )
status = $LOG(STS$K_ERROR, "execute(dbctx:%08X, stmt:%08X) : %s", dbctx, stmt, mysql_stmt_error (stmt));
else if ( res = mysql_stmt_result_metadata(stmt) )
{
/*
* We got some result - performs a binding OUTput parameters
* to local variables and fetch it
*/
if ( mysql_stmt_bind_result(stmt, &out) )
status = $LOG(STS$K_ERROR, "bind_result(dbctx:%08X, stmt:%08X) : %s", dbctx, stmt, mysql_stmt_error (stmt));

if ( mysql_stmt_fetch(stmt) )
status = $LOG(STS$K_ERROR, "fetch(dbctx:%08X, stmt:%08X) : %s", dbctx, stmt, mysql_stmt_error (stmt));
else {
status = SS$_NORMAL;
$ASCLEN(auth) = (unsigned char) count;
memcpy($ASCPTR(auth), url, count);
}

mysql_free_result(res);
}
else status = $LOG(STS$K_ERROR, "No data is returned for UID=%d, execute(dbctx:%08X, stmt:%08X)", uid, dbctx, stmt);

#if PREPARED_SQL_STATEMENT
if ( mysql_stmt_reset(stmt) )
#else
if ( mysql_stmt_close(stmt) )
#endif
$LOG(STS$K_ERROR, "stmt_reset/close(dbctx:%08X, stmt:%08X) : %s", dbctx, stmt, mysql_stmt_error (stmt));


/* Release DB context */
if ( !(1 & __db_retctx(dbctx)) )
return $LOG(STS$K_FATAL, "dbctx:%08X : Cannot release!", dbctx);

return status;
}



... Run listing ...


(gdb) run -config=QInfo.cfg
Starting program: /home/sysman/STM-QInfo -config=QInfo.cfg
warning: no loadable sections found in added symbol-file system-supplied DSO at 0x7ffff7ffd000
21-04-2016 12:43:20.049 [00c908c0 STM-QINFO-SVC\main\001387] %STMQRSVC-I:Starting ... (built at Apr 21 2016 11:36:03 with CC 4.8.4)

...

21-04-2016 12:43:20.050 [00c908c0 STM-QINFO-DB\__db_init\000155] %STMQRSVC-I:Client version: 6.1.6, thread safe = 1
21-04-2016 12:43:20.050 [00c908c0 STM-QINFO-DB\__db_init\000170] %STMQRSVC-I:Server version: 0
21-04-2016 12:43:20.264 [00c908c0 STM-QINFO-DB\__db_init\000190] #0/0 , dbctx:00C8D940, mysql:00C940D0, stmt = 00CB0950, 'SELECT uai_chkauth(?, ?, ?)'
21-04-2016 12:43:20.317 [00c908c0 STM-QINFO-DB\__db_init\000190] #0/1 , dbctx:00C8D940, mysql:00C940D0, stmt = 00CB2530, 'SELECT uai_getstream2(?, ?)'
21-04-2016 12:43:20.370 [00c908c0 STM-QINFO-DB\__db_init\000190] #0/2 , dbctx:00C8D940, mysql:00C940D0, stmt = 00CB4110, 'SELECT uai_signreq(?, ?)'
21-04-2016 12:43:20.424 [00c908c0 STM-QINFO-DB\__db_init\000190] #0/3 , dbctx:00C8D940, mysql:00C940D0, stmt = 00CB5CF0, 'SELECT uai_acctng(?, ?, ?, ?)'
21-04-2016 12:43:20.654 [00c908c0 STM-QINFO-DB\__db_init\000190] #1/0 , dbctx:00C8D978, mysql:00CB78D0, stmt = 00CC0320, 'SELECT uai_chkauth(?, ?, ?)'
21-04-2016 12:43:20.712 [00c908c0 STM-QINFO-DB\__db_init\000190] #1/1 , dbctx:00C8D978, mysql:00CB78D0, stmt = 00CC1F00, 'SELECT uai_getstream2(?, ?)'
21-04-2016 12:43:20.770 [00c908c0 STM-QINFO-DB\__db_init\000190] #1/2 , dbctx:00C8D978, mysql:00CB78D0, stmt = 00CC3AE0, 'SELECT uai_signreq(?, ?)'
21-04-2016 12:43:20.828 [00c908c0 STM-QINFO-DB\__db_init\000190] #1/3 , dbctx:00C8D978, mysql:00CB78D0, stmt = 00CC56C0, 'SELECT uai_acctng(?, ?, ?, ?)'
21-04-2016 12:43:21.056 [00c908c0 STM-QINFO-DB\__db_init\000190] #2/0 , dbctx:00C8D9B0, mysql:00CC72A0, stmt = 00CCFCF0, 'SELECT uai_chkauth(?, ?, ?)'
21-04-2016 12:43:21.113 [00c908c0 STM-QINFO-DB\__db_init\000190] #2/1 , dbctx:00C8D9B0, mysql:00CC72A0, stmt = 00CD18D0, 'SELECT uai_getstream2(?, ?)'
21-04-2016 12:43:21.171 [00c908c0 STM-QINFO-DB\__db_init\000190] #2/2 , dbctx:00C8D9B0, mysql:00CC72A0, stmt = 00CD34B0, 'SELECT uai_signreq(?, ?)'
21-04-2016 12:43:21.228 [00c908c0 STM-QINFO-DB\__db_init\000190] #2/3 , dbctx:00C8D9B0, mysql:00CC72A0, stmt = 00CD5090, 'SELECT uai_acctng(?, ?, ?, ?)'
21-04-2016 12:43:21.228 [00c908c0 STM-QINFO-DB\__db_init\000194] 3 DB-connections has been requested, 3 is opened
21-04-2016 12:43:21.228 [00c908c0 STM-QINFO-DB\__db_init\000204] %STMQRSVC-S:3 DB-connection contexts has been initialized
21-04-2016 12:43:21.228 [00c908c0 STM-QINFO-SVC\main\001412] Allocated 8192 bytes for index area for 512 streams
21-04-2016 12:43:21.228 [f7ffb700 STM-QINFO-SVC\listener\001015] Starting listener sd = 12 ...
21-04-2016 12:43:21.633 [f7ffb700 STM-QINFO-SVC\listener\001068] %STMQRSVC-I:Open connection from 127.0.0.1:20945, sd = 13.
21-04-2016 12:43:21.633 [f77fa700 STM-QINFO-SVC\processor\000801] [#13] Starting session. Sent initial attributes for authentication ...
21-04-2016 12:43:21.740 [f77fa700 STM-QINFO-SVC\chkauth\000627] %STMQRSVC-S:[#13] UID=316 is authenticated.
21-04-2016 12:43:21.740 [f77fa700 STM-QINFO-SVC\processor\000863] %STMQRSVC-S:[#13] Open session for UID=316.
STM-QInfo: ../STM-QInfo/STM-QInfo-DB.c:121: __db_retctx: Assertion `dbctx' failed.
[New LWP 12658]

Program received signal SIGABRT, Aborted.
[Switching to LWP 12658]
0x0000000000617079 in raise ()
(gdb) back
#0 0x0000000000617079 in raise ()
#1 0x00000000005a1e98 in abort ()
#2 0x000000000059c7e4 in __assert_fail_base ()
#3 0x000000000059c83e in __assert_fail ()
#4 0x000000000040a45f in __db_retctx ()
#5 0x000000000040b18b in __db_getstream ()
#6 0x0000000000407447 in regcid ()
#7 0x000000000040905c in processor ()
#8 0x000000000040cfd2 in start_thread ()
#9 0x00000000005f4749 in clone ()
(gdb)



So, I'm puzzled for now: may be I made something wrong at initializition time or at MySQL API routines ?


Thanks in advance.

Options: ReplyQuote


Subject
Views
Written By
Posted
N-MySQL connections shared between M-threads
2215
April 21, 2016 03:50AM


Sorry, you can't reply to this topic. It has been closed.

Content reproduced on this site is the property of the respective copyright holders. It is not reviewed in advance by Oracle and does not necessarily represent the opinion of Oracle or any other party.