Re: How can I increase one thread's throughput
Posted by:
Tom Shen
Date: July 06, 2016 09:28PM
My code is below; it requires a C++11-compatible compiler.
-----------------------------------------
#include <cstdlib>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>
#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>
using namespace std;
/*
 * PRINT_ERROR(code, msg): writes one diagnostic line to std::cout containing
 * the source file, the line number, the error code and the error message.
 * Arguments are parenthesized so that arbitrary expressions can be passed.
 */
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << std::endl

/*
 * MYSQLERROR(mysql): reports the last error recorded on the MYSQL handle
 * `mysql` (passed as an object, not a pointer) and terminates the process
 * with exit(-1).
 * Wrapped in do { } while (0) so the macro behaves like a single statement
 * and can be used safely in an if/else without braces.
 */
#define MYSQLERROR(mysql) do { \
  PRINT_ERROR(mysql_errno(&(mysql)), mysql_error(&(mysql))); \
  exit(-1); } while (0)

/*
 * APIERROR(error): reports an error object exposing `.code` and `.message`
 * members and terminates the process with exit(-1).
 * Wrapped in do { } while (0) for the same reason as MYSQLERROR.
 */
#define APIERROR(error) do { \
  PRINT_ERROR((error).code, (error).message); \
  exit(-1); } while (0)
// Creates the api_simple table through the given MySQL connection,
// dropping an already existing table first.
static void create_table(MYSQL &);
// Thread worker: (cluster connection, first key value, number of rows to insert).
static void do_insert(Ndb_cluster_connection &, int, int);
// Prepares the schema and runs the timed multi-threaded insert:
// (MySQL connection, cluster connection, thread count, rows per thread).
static void run_application(MYSQL &, Ndb_cluster_connection &, const int, const int);
int main(int argc, char **argv)
{
if (argc < 3) {
cout << "Arguments are <socket mysqld> <connect_string cluster> [insert number per thread] [thread number] .\n";
exit(EXIT_FAILURE);
}
char * mysqld_sock = argv[1];
const char *connectstring = argv[2];
int number_per_thread = 10000;
if (argc >= 4) number_per_thread = atoi(argv[3]);
int thread_number = 1;
if (argc >= 5) thread_number = atoi(argv[4]);
ndb_init();
{
Ndb_cluster_connection cluster_connection(connectstring);
if (cluster_connection.connect(4, 5, 1))
{
std::cout << "Cluster management server was not ready within 30 secs.\n";
exit(EXIT_FAILURE);
}
if (cluster_connection.wait_until_ready(30,0) < 0)
{
std::cout << "Cluster was not ready within 30 secs.\n";
exit(EXIT_FAILURE);
}
MYSQL mysql;
if ( !mysql_init(&mysql) ) {
std::cout << "mysql_init failed\n";
exit(EXIT_FAILURE);
}
if ( !mysql_real_connect(&mysql, "localhost", "root", "", "", 0, mysqld_sock, 0) )
MYSQLERROR(mysql);
run_application(mysql, cluster_connection, thread_number, number_per_thread);
}
ndb_end(0);
return 0;
}
/**
 * Prepares the ndb_examples database and the api_simple table, then starts
 * `thread_number` threads that each insert `number_per_thread` rows through
 * do_insert(), waits for all of them and prints the elapsed wall time.
 *
 * @param mysql               connected MySQL handle used for the DDL statements
 * @param cluster_connection  cluster connection shared by all worker threads
 * @param thread_number       number of insert threads to start
 * @param number_per_thread   number of rows each thread inserts; thread i
 *                            starts at key i * number_per_thread
 */
static void run_application(MYSQL &mysql, Ndb_cluster_connection &cluster_connection, const int thread_number, const int number_per_thread)
{
  // The result of CREATE DATABASE is deliberately not checked: if the
  // database is unavailable, the USE statement below reports the error.
  mysql_query(&mysql, "CREATE DATABASE IF NOT EXISTS ndb_examples");
  if (mysql_query(&mysql, "USE ndb_examples") != 0) MYSQLERROR(mysql);
  create_table(mysql);

  std::vector<std::thread> workers;
  if (thread_number > 0)
    workers.reserve(static_cast<std::size_t>(thread_number));

  // steady_clock is monotonic, so the measured interval cannot be distorted
  // by adjustments of the system time while the benchmark is running.
  const auto test_start = std::chrono::steady_clock::now();
  for (int i = 0; i < thread_number; ++i)
    workers.emplace_back(do_insert, std::ref(cluster_connection),
                         i * number_per_thread, number_per_thread);
  for (auto &worker : workers)
    worker.join();
  const auto elapsed = std::chrono::steady_clock::now() - test_start;

  const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);
  // Widen before multiplying so the reported total cannot overflow int.
  const long long total_records =
      static_cast<long long>(thread_number) * number_per_thread;
  std::cout << "It takes " << ms.count() << " milliseconds for " << thread_number << " threads insert "
            << total_records << " records" << std::endl;
}
/**************************************************************
 * Create the table api_simple; if a table with that name     *
 * already exists it is dropped and created again.            *
 * Any other error, or a failing DROP, terminates the process *
 * through MYSQLERROR.                                        *
 **************************************************************/
static void create_table(MYSQL &mysql)
{
  while (mysql_query(&mysql,
                     "CREATE TABLE"
                     " api_simple"
                     " (ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY,"
                     " ATTR2 INT UNSIGNED NOT NULL)"
                     " ENGINE=NDB"))
  {
    // Only "table already exists" is recoverable; everything else is fatal.
    if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
      MYSQLERROR(mysql);

    std::cout << "MySQL Cluster already has example table: api_simple. "
              << "Dropping it..." << std::endl;

    // A failing DROP must stop the program: otherwise the CREATE above would
    // fail with the same error again and this loop would never terminate.
    if (mysql_query(&mysql, "DROP TABLE api_simple") != 0)
      MYSQLERROR(mysql);
  }
}
/**************************************************************************
 * Insert `num` tuples (k,k) for k = start .. start+num-1 into api_simple, *
 * one tuple per transaction, each committed individually.                *
 *                                                                        *
 * cluster_connection: connection used to create this call's own Ndb      *
 *                     object                                             *
 * start:              first key value to insert                          *
 * num:                number of tuples to insert                         *
 *                                                                        *
 * Any NDB API error is reported through APIERROR, which terminates the   *
 * process.                                                               *
 **************************************************************************/
static void do_insert(Ndb_cluster_connection &cluster_connection, int start, int num)
{
  Ndb myNdb(&cluster_connection, "ndb_examples");
  if (myNdb.init()) APIERROR(myNdb.getNdbError());

  const NdbDictionary::Dictionary *myDict = myNdb.getDictionary();
  const NdbDictionary::Table *myTable = myDict->getTable("api_simple");
  if (myTable == nullptr)
    APIERROR(myDict->getNdbError());

  for (int i = 0; i < num; i++) {
    const int value = start + i;

    NdbTransaction *myTransaction = myNdb.startTransaction();
    if (myTransaction == nullptr) APIERROR(myNdb.getNdbError());

    NdbOperation *myOperation = myTransaction->getNdbOperation(myTable);
    if (myOperation == nullptr) APIERROR(myTransaction->getNdbError());

    // Check every step of the operation definition instead of relying on
    // execute() to surface an earlier failure.
    if (myOperation->insertTuple() == -1 ||
        myOperation->equal("ATTR1", value) == -1 ||
        myOperation->setValue("ATTR2", value) == -1)
      APIERROR(myTransaction->getNdbError());

    if (myTransaction->execute(NdbTransaction::Commit) == -1)
      APIERROR(myTransaction->getNdbError());

    myNdb.closeTransaction(myTransaction);
  }
}