MySQL Forums
Forum List  »  NDB clusters

Re: How can I increase one thread's throughput
Posted by: Tom Shen
Date: July 06, 2016 09:28PM

My code is below; it requires a C++11-compatible compiler.
-----------------------------------------
#include <cstdlib>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>

using namespace std;

// Prints an error report (source file, line, numeric code and message) to
// standard output. Arguments are parenthesized so that arbitrary expressions
// can be passed safely.
#define PRINT_ERROR(code,msg) \
  std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
            << ", code: " << (code) \
            << ", msg: " << (msg) << "." << std::endl

// Reports the last error recorded on the given MYSQL handle and terminates
// the process with exit(-1). Wrapped in do { } while (0) so the macro acts as
// a single statement and composes correctly with if/else.
#define MYSQLERROR(mysql) do { \
    PRINT_ERROR(mysql_errno(&(mysql)),mysql_error(&(mysql))); \
    exit(-1); } while (0)

// Reports an error object exposing .code and .message members and terminates
// the process with exit(-1). Same single-statement wrapping as MYSQLERROR.
#define APIERROR(error) do { \
    PRINT_ERROR((error).code,(error).message); \
    exit(-1); } while (0)

// Forward declarations of the file-local helpers defined below main().
// create_table: (re)creates the api_simple table through the MySQL connection.
static void create_table(MYSQL &);
// do_insert: thread body; arguments are the cluster connection, the first key
// value to insert and the number of rows to insert.
static void do_insert(Ndb_cluster_connection &, int, int);
// run_application: prepares the schema, runs the insert threads and prints
// the elapsed time; the two ints are thread count and rows per thread.
static void run_application(MYSQL &, Ndb_cluster_connection &, const int, const int);

int main(int argc, char **argv)
{
if (argc < 3) {
cout << "Arguments are <socket mysqld> <connect_string cluster> [insert number per thread] [thread number] .\n";
exit(EXIT_FAILURE);
}
char * mysqld_sock = argv[1];
const char *connectstring = argv[2];
int number_per_thread = 10000;
if (argc >= 4) number_per_thread = atoi(argv[3]);
int thread_number = 1;
if (argc >= 5) thread_number = atoi(argv[4]);

ndb_init();

{
Ndb_cluster_connection cluster_connection(connectstring);

if (cluster_connection.connect(4, 5, 1))
{
std::cout << "Cluster management server was not ready within 30 secs.\n";
exit(EXIT_FAILURE);
}

if (cluster_connection.wait_until_ready(30,0) < 0)
{
std::cout << "Cluster was not ready within 30 secs.\n";
exit(EXIT_FAILURE);
}

MYSQL mysql;
if ( !mysql_init(&mysql) ) {
std::cout << "mysql_init failed\n";
exit(EXIT_FAILURE);
}
if ( !mysql_real_connect(&mysql, "localhost", "root", "", "", 0, mysqld_sock, 0) )
MYSQLERROR(mysql);

run_application(mysql, cluster_connection, thread_number, number_per_thread);
}

ndb_end(0);

return 0;
}


/**
 * Prepares the ndb_examples database and the api_simple table, then runs
 * `thread_number` concurrent do_insert() workers and prints how long they
 * took in total.
 *
 * @param mysql               connected MySQL handle used for the DDL
 * @param cluster_connection  cluster connection shared by all workers
 * @param thread_number       number of worker threads to start
 * @param number_per_thread   number of rows each worker inserts; worker i
 *                            starts at key i * number_per_thread
 */
static void run_application(MYSQL &mysql, Ndb_cluster_connection &cluster_connection, const int thread_number, const int number_per_thread)
{
  // Best effort: if the database cannot be created, the USE below reports it.
  mysql_query(&mysql, "CREATE DATABASE IF NOT EXISTS ndb_examples");
  if (mysql_query(&mysql, "USE ndb_examples") != 0) { MYSQLERROR(mysql); }
  create_table(mysql);

  std::vector<std::thread> workers;
  if (thread_number > 0)
    workers.reserve(static_cast<std::size_t>(thread_number));

  // steady_clock is monotonic, so the measured interval cannot be distorted
  // by wall-clock adjustments while the benchmark runs.
  const auto test_start = std::chrono::steady_clock::now();
  for (int i = 0; i < thread_number; ++i)
    workers.emplace_back(do_insert, std::ref(cluster_connection), i * number_per_thread, number_per_thread);
  for (auto &worker : workers)
    worker.join();
  const auto elapsed = std::chrono::steady_clock::now() - test_start;
  const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed);

  // Widen before multiplying so the reported total cannot overflow int.
  const long long total_records = static_cast<long long>(thread_number) * number_per_thread;
  std::cout << "It takes " << ms.count() << " milliseconds for " << thread_number << " threads insert "
            << total_records << " records" << std::endl;
}

/**************************************************************************
 * Create a fresh NDB table named api_simple. If a table with that name   *
 * already exists it is dropped and the creation is retried. Any other    *
 * failure, including a failed DROP, is reported and terminates the       *
 * program through MYSQLERROR.                                            *
 **************************************************************************/
static void create_table(MYSQL &mysql)
{
  while (mysql_query(&mysql,
                     "CREATE TABLE"
                     " api_simple"
                     " (ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY,"
                     " ATTR2 INT UNSIGNED NOT NULL)"
                     " ENGINE=NDB"))
  {
    // Only "table already exists" is recoverable; everything else is fatal.
    if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR) { MYSQLERROR(mysql); }

    std::cout << "MySQL Cluster already has example table: api_simple. "
              << "Dropping it..." << std::endl;

    // A failed DROP must abort: otherwise the CREATE above would keep failing
    // with the same error and this loop would never terminate.
    if (mysql_query(&mysql, "DROP TABLE api_simple") != 0) { MYSQLERROR(mysql); }
  }
}

/**************************************************************************
 * Thread body: insert `num` tuples (k,k) for k = start .. start+num-1    *
 * into table api_simple, one committed transaction per tuple.            *
 *                                                                        *
 * Each call creates its own Ndb object on the shared cluster connection. *
 * Any NDB API failure is reported and terminates the program through     *
 * APIERROR.                                                              *
 **************************************************************************/
static void do_insert(Ndb_cluster_connection &cluster_connection, int start, int num)
{
  Ndb myNdb(&cluster_connection, "ndb_examples" );
  if (myNdb.init()) { APIERROR(myNdb.getNdbError()); }

  const NdbDictionary::Dictionary *myDict = myNdb.getDictionary();
  const NdbDictionary::Table *myTable = myDict->getTable("api_simple");
  if (myTable == nullptr) { APIERROR(myDict->getNdbError()); }

  for (int i = 0; i < num; i++) {
    const int key = start + i;

    NdbTransaction *myTransaction = myNdb.startTransaction();
    if (myTransaction == nullptr) { APIERROR(myNdb.getNdbError()); }

    NdbOperation *myOperation = myTransaction->getNdbOperation(myTable);
    if (myOperation == nullptr) { APIERROR(myTransaction->getNdbError()); }

    // Defining the operation can fail (for example on an unknown column
    // name); check each step instead of finding out only at execute().
    if (myOperation->insertTuple() == -1 ||
        myOperation->equal("ATTR1", key) == -1 ||
        myOperation->setValue("ATTR2", key) == -1) {
      APIERROR(myTransaction->getNdbError());
    }

    if (myTransaction->execute( NdbTransaction::Commit ) == -1) {
      APIERROR(myTransaction->getNdbError());
    }

    myNdb.closeTransaction(myTransaction);
  }
}

Options: ReplyQuote


Subject
Views
Written By
Posted
Re: How can I increase one thread's throughput
1043
July 06, 2016 09:28PM


Sorry, you can't reply to this topic. It has been closed.

Content reproduced on this site is the property of the respective copyright holders. It is not reviewed in advance by Oracle and does not necessarily represent the opinion of Oracle or any other party.