Changeset - 3d604507d726
[Not reviewed]
0 1 1
Mark Raasveldt - 9 years ago 2016-08-12 14:48:43
mark.raasveldt@gmail.com
Add pmodbc.c and add new protocol tests to vldb-protocols.py.
2 files changed with 153 insertions and 5 deletions:
0 comments (0 inline, 0 general)
pmodbc.c
Show inline comments
 
new file 100644
 

	
 
#include <stdio.h>
 
#include <stdlib.h>
 
#include <sql.h>
 
#include <sqlext.h>
 
#include <string.h>
 
#include <assert.h>
 

	
 
// compilation: gcc -g pmodbc.c -lodbc -o pmodbc
 

	
 
/* Print each data source returned by SQLDataSources as "name - description". */
static void list_drivers();
/* Run `query` against the data source `dsn`; a non-zero `csv` prints the rows as CSV. */
static void query_db(char* dsn, char* query, int csv);
/* Print the ODBC diagnostic records attached to `handle` (whose handle type
 * is `type`), labelled with the operation name `fn`. */
void extract_error(char *fn, SQLHANDLE handle, SQLSMALLINT type);
 

	
 
/*
 * Entry point.
 *
 * Usage: pmodbc [dsn] [query] [csv={0,1}]
 *
 * With fewer than three arguments the usage text and the list of data sources
 * are printed and the process reports failure, so that a script checking the
 * exit status notices the bad invocation. Otherwise the query is run against
 * the given DSN; rows are printed as CSV when the third argument starts with
 * the character '1'.
 */
int main(int argc, char** argv) {
    if (argc < 4) {
        printf("Incorrect number of arguments.\n\n");
        printf("Arguments:\n");
        printf("pmodbc [dsn] [query] [csv={0,1}]\n\n");
        printf("DSN List:\n");
        list_drivers();
        return EXIT_FAILURE;
    }

    query_db(argv[1], argv[2], *argv[3] == '1');
    return EXIT_SUCCESS;
}
 

	
 

	
 
static void list_drivers() {
 
    SQLHENV env;
 
    char dsn[256];
 
    char desc[256];
 
    SQLSMALLINT dsn_ret;
 
    SQLSMALLINT desc_ret;
 
    SQLUSMALLINT direction;
 
    SQLRETURN ret;
 

	
 
    SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
 
    SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (void *) SQL_OV_ODBC3, 0);
 

	
 
    direction = SQL_FETCH_FIRST;
 
    while(SQL_SUCCEEDED(ret = SQLDataSources(env, direction, dsn, sizeof(dsn), &dsn_ret, desc, sizeof(desc), &desc_ret))) {
 
        direction = SQL_FETCH_NEXT;
 
        printf("%s - %s\n", dsn, desc);
 
        if (ret == SQL_SUCCESS_WITH_INFO) printf("\tdata truncation\n");
 
    }
 
}
 

	
 

	
 
static void query_db(char* dsn_str, char* query, int csv) {
 
    SQLHENV env;
 
    SQLHDBC dbc;
 
    SQLHSTMT stmt;
 
    SQLRETURN ret; /* ODBC API return status */
 
    SQLSMALLINT columns; /* number of columns in result-set */
 
    int row = 0;
 
    char dsn[1000];
 
    snprintf(dsn, 1000, "DSN=%s;", dsn_str);
 

	
 
    /* Allocate an environment handle */
 
    SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &env);
 
    /* We want ODBC 3 support */
 
    SQLSetEnvAttr(env, SQL_ATTR_ODBC_VERSION, (void *) SQL_OV_ODBC3, 0);
 
    /* Allocate a connection handle */
 
    ret = SQLAllocHandle(SQL_HANDLE_DBC, env, &dbc);
 
    if (!SQL_SUCCEEDED(ret)) {
 
      extract_error("SQLAllocHandle for dbc", env, SQL_HANDLE_ENV);
 
      exit(1);
 
    }
 
    /* Connect to the DSN mydsn */
 
    /* You will need to change mydsn to one you have created and tested */
 
    SQLDriverConnect(dbc, NULL, dsn, SQL_NTS, NULL, 0, NULL, SQL_DRIVER_COMPLETE);
 
    /* Allocate a statement handle */
 
    SQLAllocHandle(SQL_HANDLE_STMT, dbc, &stmt);
 
    /* Retrieve a list of tables */
 
    //SQLTables(stmt, NULL, 0, NULL, 0, NULL, 0, "TABLE", SQL_NTS);
 
    ret = SQLExecDirect(stmt, query, SQL_NTS);
 
    if (!SQL_SUCCEEDED(ret)) {
 
        extract_error("SQLExecuteDirect for dbc", stmt, SQL_HANDLE_STMT);
 
        exit(1);
 
    }
 
    /* How many columns are there */
 
    SQLNumResultCols(stmt, &columns);
 
    if (csv == 0) {
 
        while (SQL_SUCCEEDED(ret = SQLFetch(stmt))) {
 
        }
 
    } else {
 
        char csvbuf[1000];
 
        /* Loop through the rows in the result-set */
 
        while (SQL_SUCCEEDED(ret = SQLFetch(stmt))) {
 
            SQLUSMALLINT i;
 
            SQLLEN curlen = 0;
 
            SQLLEN indicator;
 
            /* Loop through the columns */
 
            for (i = 1; i <= columns; i++) {
 
                /* retrieve column data as a string */
 
                ret = SQLGetData(stmt, i, SQL_C_CHAR, csvbuf + curlen, sizeof(csvbuf) - curlen, &indicator);
 
                if (!SQL_SUCCEEDED(ret)) {
 
                    extract_error("SQLExecuteDirect for dbc", stmt, SQL_HANDLE_STMT);
 
                    exit(1);    
 
                }
 
                curlen += indicator;
 
                if (i != columns) 
 
                    csvbuf[curlen++] = ',';
 
            }
 
            puts(csvbuf);
 
        }
 
    }
 
    extract_error("SQLStmt for dbc", stmt, SQL_HANDLE_STMT);
 
    exit(1);
 
}
 

	
 
/*
 * Print every diagnostic record attached to an ODBC handle.
 *
 * fn     - label of the operation being reported; echoed in the header line.
 * handle - handle whose diagnostics are read with SQLGetDiagRec.
 * type   - the SQL_HANDLE_* type of `handle`.
 *
 * The header and all records go to stderr so they never mix with result data
 * written to stdout. Each record is printed as
 * "state:record-number:native-error:message". Records are read until
 * SQLGetDiagRec stops succeeding; a record whose message was truncated
 * (SQL_SUCCESS_WITH_INFO) is printed and does not end the listing.
 */
void extract_error(char *fn, SQLHANDLE handle, SQLSMALLINT type)
{
    SQLSMALLINT  rec = 0;
    SQLINTEGER   native = 0;
    SQLCHAR      state[ 7 ];
    SQLCHAR      text[256];
    SQLSMALLINT  len = 0;

    fprintf(stderr,
            "\n"
            "The driver reported the following diagnostics whilst running "
            "%s\n\n",
            fn);

    while (SQL_SUCCEEDED(SQLGetDiagRec(type, handle, ++rec, state, &native,
                                       text, sizeof(text), &len))) {
        /* SQLINTEGER's width differs between platforms; print via long. */
        fprintf(stderr, "%s:%ld:%ld:%s\n", (const char *) state, (long) rec,
                (long) native, (const char *) text);
    }
}
vldb-protocols.py
Show inline comments
 
import os
 
import time
 
import sys
 
import csv
 
import re
 
import json
 
import subprocess
 

	
 
# System configurations the benchmark can drive. 'name' labels the result
# rows, 'db' selects how the command line is built, and the optional
# 'compress' entry selects compression for that system.
systems = [
	dict(name='hive-default', db='hive'),
	dict(name='netcat-csv-lz4', db='netcat', compress='lz4'),
	dict(name='netcat-csv-lz4-heavy', db='netcat', compress='lz4-heavy'),
	dict(name='netcat-csv-gzip', db='netcat', compress='gzip'),
	dict(name='netcat-csv-xz', db='netcat', compress='xz'),
	dict(name='netcat-csv-snappy', db='netcat', compress='snappy'),
	dict(name='netcat-csv', db='netcat'),
	dict(name='mariadb-compress', db='mariadb', compress=True),
	dict(name='db2-default', db='db2'),
	dict(name='oracle-default', db='oracle'),
	dict(name='postgres-default', db='postgres'),
	dict(name='mariadb-default', db='mariadb'),
	dict(name='monetdb-default', db='monetdb'),
	dict(name='hbase-default', db='hbase'),
	dict(name='mongodb-default', db='mongodb'),
]
 

	
 

	
 
# Network conditions applied to the loopback device for each measurement.
# 'throughput' is a rate in mbit and 'latency' a delay in ms; a value that is
# not greater than zero means that limit is left out of the netem command.
networks = [
	{'name':'unlimited', 'throughput': -1, 'latency':-1},
	{'name':'gigabitethld', 'throughput': 1000, 'latency':0.3},
	{'name':'10mbitethhd', 'throughput': 10, 'latency':150}]
 

	
 
# Result sizes to benchmark; each value is used as the row limit of the query.
tuples = [1,100,1000,10000,100000,1000000]
 

	
 

	
 
# Register the netcat runs that ship pre-serialized protocol files: one entry
# for every combination of chunk size, compression method and file layout.
# There are too many combinations to list by hand, so they are generated.
chunksizes = [1000, 10000, 100000, 1000000, 10000000, 100000000]
compression_methods = ['lz4', 'snappy', 'gzip', 'xz', 'lz4-heavy', None]
extensions = ['col', 'row']
for chunksize in chunksizes:
	for compression_method in compression_methods:
		for extension in extensions:
			entry_name = 'netcat-prot-%s-chunk-%d-%s' % (extension, chunksize, compression_method)
			systems.append(dict(
				name=entry_name,
				db='netcat',
				protocol=True,
				fileext=extension,
				chunksize=chunksize,
				compress=compression_method))
 

	
 
# Rebinds `systems`, so only these MonetDB protocol variants are benchmarked;
# everything appended to the earlier list is discarded. 'params' holds extra
# command-line flags that are added to the mclient invocation.
systems = [{'name':'monetdb-colprot-nocomp', 'db':'monetdb', 'params': '--protocol=prot10'},
		   {'name':'monetdb-colprot-compsnappy', 'db':'monetdb', 'params': '--protocol=prot10compressed --compression=snappy'},
		   {'name':'monetdb-colprot-complz4', 'db':'monetdb', 'params': '--protocol=prot10compressed --compression=lz4'},
		   {'name':'monetdb-prot9', 'db':'monetdb', 'params': '--protocol=prot9'}]
 

	
 

	
 
# Number of times the whole benchmark matrix is repeated.
nruns = 5
# Duration argument given to the `timeout` command that wraps each benchmark command.
timeout = "10m"

# TCP port shared by the netcat listener and the netcat sender.
netcat_port = 4444

# Sink for subprocess output that should be discarded.
fnull = open(os.devnull, 'w')
 

	
 
def netcat_listener(compression):
	"""Start a netcat listener on `netcat_port` that discards what it receives.

	Args:
		compression: None for a plain listener, or one of 'lz4', 'lz4-heavy',
			'gzip', 'xz', 'snappy' to pipe the received stream through the
			matching decompressor before discarding it.

	Returns:
		The subprocess.Popen object of the netcat process when `compression`
		is None, otherwise the Popen object of the decompressor reading from
		netcat's stdout.

	Raises:
		ValueError: if `compression` is not a known method. This is checked
			before any process is started so no listener is left behind.
	"""
	uncompress_cmds = {
		'lz4': ['lz4', '-dc'],
		'lz4-heavy': ['lz4', '-dc', '-9'],
		'gzip': ['gunzip', '-f'],
		'xz': ['xz', '-d'],
		'snappy': ['snzip', '-d'],
	}
	if compression is not None and compression not in uncompress_cmds:
		raise ValueError("unknown compression method %r" % (compression,))

	# netcat only needs a pipe on stdout when a decompressor will read from it.
	nclistener = subprocess.Popen(['nc', '-l', '-p', '%d' % netcat_port], stdout=subprocess.PIPE if compression is not None else fnull, stderr=fnull)
	if compression is None:
		return nclistener
	return subprocess.Popen(uncompress_cmds[compression], stdin=nclistener.stdout, stdout=fnull)
 

	
 
def syscall(cmd):
	"""Run `cmd` through os.system and hand back its return value unchanged."""
	status = os.system(cmd)
	return status
 

	
 
def rxbytes():
	"""Return the received-bytes counter of the `lo` interface as an int.

	The value is parsed from the output of `ifconfig lo`, which must contain
	a field of the form "RX bytes:<digits>".

	Raises:
		RuntimeError: if the output contains no such field.
	"""
	pipe = os.popen("ifconfig lo")
	try:
		output = pipe.read()
	finally:
		pipe.close()
	# Raw string so that \d reaches the regex engine as a regex escape.
	match = re.search(r"RX bytes:(\d+)", output)
	if match is None:
		raise RuntimeError('no "RX bytes:" field found in the output of "ifconfig lo"')
	return int(match.group(1))
 

	
 
def rxpackets():
	"""Return the received-packets counter of the `lo` interface as an int.

	The value is parsed from the output of `ifconfig lo`, which must contain
	a field of the form "RX packets:<digits>".

	Raises:
		RuntimeError: if the output contains no such field.
	"""
	pipe = os.popen("ifconfig lo")
	try:
		output = pipe.read()
	finally:
		pipe.close()
	# Raw string so that \d reaches the regex engine as a regex escape.
	match = re.search(r"RX packets:(\d+)", output)
	if match is None:
		raise RuntimeError('no "RX packets:" field found in the output of "ifconfig lo"')
	return int(match.group(1))
 

	
 
def benchmark_command(cmd, system, protocol, network, tuple, r, dummy):
 
	timeoutcmd = 'timeout --foreground -s KILL ' +  timeout
 
	if system['db'] == 'netcat':
 
		listener = netcat_listener(system['compress'] if 'compress' in system else None)
 
		timeoutcmd = ''
 
	startbytes = rxbytes()
 
	startpackets = rxpackets()
 
	start = time.time() # 
 
	retcode = syscall('/usr/bin/time --format=\'{"io_page_faults": %F, "memory_max_kb": %M, "cpu_kernel_sec": %S, "cpu_user_sec": %U}\' --quiet --output=timing ' + timeoutcmd + ' ' + cmd)
 
	duration = time.time() - start
 
	transmittedbytes = rxbytes() - startbytes
 
	transmittedpackets = rxpackets() - startpackets
 
	if retcode != 0:
 
		duration = -1
 
	stats = {'system': system['name'], 'db': system['db'], 'protocol': protocol, 'network': network['name'], 'throughput': network['throughput'], 'latency': network['latency'], 'tuple': tuple, 'run': r, 'time': duration, "bytes" : transmittedbytes, 'packets': transmittedpackets, 'timeout' : int(retcode != 0), 'bin_orientation' : system['fileext'] if 'fileext' in system else '', 'bin_chunksize': system['chunksize'] if 'chunksize' in system else '', 'bin_compress':system['compress'] if 'compress' in system else ''}
 
@@ -105,114 +112,120 @@ def benchmark_command(cmd, system, protocol, network, tuple, r, dummy):
 
		listener.kill()
 

	
 

	
 
oq = """set colsep '|'
 
set ARRAYSIZE 100
 
SET LINESIZE 132
 
SET PAGESIZE 6000
 
set echo off
 
set feedback off
 
set linesize 1000
 
set pagesize 0
 
set sqlprompt ''
 
set trimspool on
 
set headsep off
 
SELECT * FROM lineitem where rownum < &1;
 
quit
 
"""
 

	
 
# Write the SQL*Plus script to disk. The context manager closes (and thereby
# flushes) the file even when the write raises.
with open("query-oracle.sql", "w") as oqfile:
	oqfile.write(oq)
 

	
 
# Results are emitted as CSV on stdout; the list fixes the column order.
w = csv.DictWriter(sys.stdout, ['system', 'db', 'protocol', 'network', 'throughput', 'latency', 'tuple', 'run', 'timeout', 'time', 'bytes', 'packets', 'cpu_kernel_sec', 'cpu_user_sec',  'io_page_faults', 'memory_max_kb', 'bin_orientation', 'bin_chunksize', 'bin_compress'])

# Write the header row once, before any result row.
w.writeheader()
 

	
 
for r in range(nruns):
 
	for system in systems:
 
		for network in networks:
 
			syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
 
			syscall("sudo tc qdisc add dev lo root netem %s %s" % ('delay %fms' % network['latency'] if network['latency'] > 0 else '', 'rate %dmbit' % network['throughput'] if network['throughput'] > 0 else ''))
 

	
 
			for tuple in tuples:
 
				query = "SELECT * FROM lineitem LIMIT %d" % tuple
 

	
 
				querycmd = ""
 
				jdbcflags = ''
 
				odbcdriver = ''
 
				odbccmd = None
 
				if system['db'] == 'postgres':
 
					querycmd = 'psql %s --host 127.0.0.1 -w -t -A -c "%s" > /dev/null' % ('--set=sslcompression=1 --set=sslmode=require --set=keepalives=0' if 'compress' in system else '', query)
 
					jdbcflags = 'org.postgresql.Driver jdbc:postgresql://127.0.0.1/user user user'
 
					odbccmd = 'isql PostgreSQL -d, < query > /dev/null'
 
				elif system['db'] == 'mariadb':
 
					querycmd = 'mysql %s --host=127.0.0.1 user --skip-column-names --batch -e "%s" > /dev/null'  % ('--compress' if 'compress' in system else '', query)
 
					jdbcflags = 'org.mariadb.jdbc.Driver jdbc:mysql://127.0.0.1/user user null'
 
					odbccmd = 'isql MySQL -d, < query > /dev/null'
 
				elif system['db'] == 'monetdb':
 
					querycmd = 'mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" > /dev/null' % query
 
					jdbcflags = 'nl.cwi.monetdb.jdbc.MonetDriver jdbc:monetdb://127.0.0.1:50001/database monetdb monetdb'
 
					odbccmd = 'isql MonetDB -d, < query > /dev/null'
 
					if 'params' in system:
 
						querycmd = '/home/user/monetdb-install/bin/mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" %s > /dev/null' % (query, system['params'])
 
						jdbcflags = None
 
						odbccmd = None
 
						odbcdriver = None
 
					else:
 
						querycmd = 'mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" > /dev/null' % query
 
						jdbcflags = 'nl.cwi.monetdb.jdbc.MonetDriver jdbc:monetdb://127.0.0.1:50001/database monetdb monetdb'
 
						odbccmd = 'isql MonetDB -d, < query > /dev/null'
 
				elif system['db'] == 'db2':
 
					db2qfile = open("db2query", "w")
 
					db2qfile.write("connect to remotedb user user using user; \n" + query + ";\n")
 
					db2qfile.close()
 
					querycmd = 'db2 -tf db2query > /dev/null;'
 
					jdbcflags = 'com.ibm.db2.jcc.DB2Driver jdbc:db2://127.0.0.1:50000/db user user'
 
					os.environ['DB2INSTANCE'] = 'user'
 
					odbccmd = 'isql DB2_SAMPLE -d, user user < query > /dev/null'
 
				elif system['db'] == 'oracle':
 
					os.environ['TNS_ADMIN'] = '/home/user/oracleconfig'
 
					querycmd = 'sqlplus system/oracle@//127.0.0.1:49161/XE @query-oracle.sql %d > /dev/null' % tuple
 
					jdbcflags = 'oracle.jdbc.driver.OracleDriver jdbc:oracle:thin:@127.0.0.1:49161:XE system oracle'
 
					odbccmd = 'isql Oracle -d, < query > /dev/null'
 
					# for JDBC/ODBCV
 
					query = "SELECT * FROM lineitem where rownum < %d;" % tuple
 
					query = "SELECT * FROM lineitem where rownum < %d" % tuple
 
				elif system['db'] == 'mongodb':
 
					querycmd = 'mongoexport -d lineitem  -c things --csv --fields "l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment" --limit %d > /dev/null 2> /dev/null' % tuple
 
					jdbcflags = None
 
					odbccmd = None
 
					odbcdriver = None
 
				elif system['db'] == 'hbase':
 
					os.environ['HBASE_HEAPSIZE'] = '10g'
 
					query = "scan 'lineitem',{LIMIT=>%d}" % tuple
 
					querycmd = 'hbase shell < query > /dev/null 2> /dev/null'
 
					jdbcflags = None
 
					odbccmd = None
 
					odbcdriver = None
 
				elif system['db'] == 'hive':
 
					querycmd = None
 
					jdbcflags = 'org.apache.hive.jdbc.HiveDriver jdbc:hive2://localhost:10000 user null'
 
					odbccmd = None
 
					odbcdriver = None
 
				elif system['db'] == 'netcat':
 
					if 'protocol' not in system:
 
						filename = '/home/user/lineitem-%d.csv' % tuple
 
					else:
 
						filename = '/home/user/lineitem-%dtpl-%dchunksize.%s' % (tuple, system['chunksize'], system['fileext'])
 
					compress_cmd = ''
 
					if 'compress' in system:
 
						if system['compress'] == 'lz4': 
 
							compress_cmd = 'lz4 -c - |'
 
						elif system['compress'] == 'lz4-heavy': 
 
							compress_cmd = 'lz4 -9 -c - |'
 
						elif system['compress'] == 'gzip': 
 
							compress_cmd = 'gzip |'
 
						elif system['compress'] == 'xz': 
 
							compress_cmd = 'xz -z |'
 
						elif system['compress'] == 'snappy': 
 
							compress_cmd = 'snzip -c |'
 
					querycmd = 'cat %s | %s nc 127.0.0.1 %d' % (filename, compress_cmd, netcat_port)
 
					jdbcflags = None
 
					odbccmd = None
 
					odbcdriver = None
 
				else:
 
					exit("unknown db %s" % system['db'])
 

	
 
				qfile = open("query", "w")
 
				qfile.write(query)
 
				qfile.write("\n")
 
				qfile.close()
 

	
 
				jdbccmd = 'java -Xmx10G -Djava.security.egd=file:/dev/./urandom -cp /home/user/java/pmjc.jar:/home/user/java/db2jcc4.jar:/home/user/java/monetdb-jdbc-2.23.jar:/home/user/java/mariadb-java-client-1.4.6.jar:/home/user/java/ojdbc6_g.jar:/home/user/java/postgresql-9.4.1209.jar:/home/user/java/hive-jdbc-2.1.0-standalone.jar:/home/user/java/hadoop-common-2.6.4.jar nl.cwi.da.pmjc.Pmjc %s "%s" 1000 > /dev/null 2>/dev/null' % (jdbcflags, query)
 
				
0 comments (0 inline, 0 general)