diff --git a/vldb-protocols.py b/vldb-protocols.py new file mode 100644 index 0000000000000000000000000000000000000000..c5c2114e5aca61ae49ead94b4190d9f7929c67ee --- /dev/null +++ b/vldb-protocols.py @@ -0,0 +1,209 @@ +import os +import time +import sys +import csv +import re +import json +import subprocess + +systems = [ + {'name':'netcat-csv-lz4', 'db':'netcat', 'compress': 'lz4'}, + {'name':'netcat-csv-lz4-heavy', 'db':'netcat', 'compress': 'lz4-heavy'}, + {'name':'netcat-csv-gzip', 'db':'netcat', 'compress': 'gzip'}, + {'name':'netcat-csv-xz', 'db':'netcat', 'compress': 'xz'}, + {'name':'netcat-csv', 'db':'netcat'}, + {'name':'mariadb-compress', 'db':'mariadb', 'compress': True}, + {'name':'db2-default', 'db':'db2'}, + {'name':'oracle-default', 'db':'oracle'}, + {'name':'postgres-default', 'db':'postgres'}, + {'name':'mariadb-default', 'db':'mariadb'}, + {'name':'monetdb-default', 'db':'monetdb'}, + {'name':'hbase-default', 'db':'hbase'}, + {'name':'mongodb-default', 'db':'mongodb'}] + +networks = [ + {'name':'unlimited', 'throughput': -1, 'latency':-1}]#, + #{'name':'gigabitethld', 'throughput': 1000, 'latency':0.3}, + #{'name':'10mbitethhd', 'throughput': 10, 'latency':150}] + +#tuples = [1,100,1000,10000,100000,1000000,10000000] +tuples = [1,100,1000,10000]#,100000,1000000] + +nruns = 5 +timeout = "10m" + +netcat_port = 4444 + +fnull = open(os.devnull, 'w') + +def netcat_listener(compression): + nclistener = subprocess.Popen(['nc', '-l', '-p', '%d' % netcat_port], stdout=subprocess.PIPE if compression != None else fnull, stderr=fnull) + if compression == None: + return nclistener + else: + if compression == 'lz4': + uncompress_cmd = 'lz4 -dc' + elif compression == 'lz4-heavy': + uncompress_cmd = 'lz4 -dc -9' + elif compression == 'gzip': + uncompress_cmd = 'gunzip -f' + elif compression == 'xz': + uncompress_cmd = 'xz -d' + return subprocess.Popen(uncompress_cmd.split(' '), stdin=nclistener.stdout, stdout = fnull) + +def syscall(cmd): + return os.system(cmd) + +def rxbytes(): + return int(re.search("RX bytes:(\d+)", os.popen("ifconfig lo").read()).groups(0)[0]) + +def rxpackets(): + return int(re.search("RX packets:(\d+)", os.popen("ifconfig lo").read()).groups(0)[0]) + +def benchmark_command(cmd, system, protocol, network, tuple, r, dummy): + timeoutcmd = 'timeout --foreground -s KILL ' + timeout + if system['db'] == 'netcat': + listener = netcat_listener(system['compress'] if 'compress' in system else None) + timeoutcmd = '' + startbytes = rxbytes() + startpackets = rxpackets() + start = time.time() # + retcode = syscall('/usr/bin/time --format=\'{"io_page_faults": %F, "memory_max_kb": %M, "cpu_kernel_sec": %S, "cpu_user_sec": %U}\' --quiet --output=timing ' + timeoutcmd + ' ' + cmd) + duration = time.time() - start + transmittedbytes = rxbytes() - startbytes + transmittedpackets = rxpackets() - startpackets + if retcode != 0: + duration = -1 + stats = {'system': system['name'], 'db': system['db'], 'protocol': protocol, 'network': network['name'], 'throughput': network['throughput'], 'latency': network['latency'], 'tuple': tuple, 'run': r, 'time': duration, "bytes" : transmittedbytes, 'packets': transmittedpackets, 'timeout' : int(retcode != 0)} + stats.update(json.load(open('timing'))) + if not dummy: + w.writerow(stats) + sys.stdout.flush() + os.remove('timing') + + if system['db'] == 'netcat': + listener.kill() + + +oq = """set colsep '|' +set ARRAYSIZE 100 +SET LINESIZE 132 +SET PAGESIZE 6000 +set echo off +set feedback off +set linesize 1000 +set pagesize 0 +set sqlprompt '' +set trimspool on +set headsep off +SELECT * FROM lineitem where rownum < &1; +quit +""" + +oqfile = open("query-oracle.sql", "w") +oqfile.write(oq) +oqfile.close() + +w = csv.DictWriter(sys.stdout, ['system', 'db', 'protocol', 'network', 'throughput', 'latency', 'tuple', 'run', 'timeout', 'time', 'bytes', 'packets', 'cpu_kernel_sec', 'cpu_user_sec', 'io_page_faults', 'memory_max_kb']) + +w.writeheader() + +for r in range(nruns): + for system in systems: + for network in networks: + syscall("sudo tc qdisc del dev lo root netem 2>/dev/null") + syscall("sudo tc qdisc add dev lo root netem %s %s" % ('delay %fms' % network['latency'] if network['latency'] > 0 else '', 'rate %dmbit' % network['throughput'] if network['throughput'] > 0 else '')) + + for tuple in tuples: + query = "SELECT * FROM lineitem LIMIT %d" % tuple + + querycmd = "" + jdbcflags = '' + odbcdriver = '' + odbccmd = None + if system['db'] == 'postgres': + querycmd = 'psql %s --host 127.0.0.1 -w -t -A -c "%s" > /dev/null' % ('--set=sslcompression=1 --set=sslmode=require --set=keepalives=0' if 'compress' in system else '', query) + jdbcflags = '-u jdbc:postgresql://127.0.0.1/user -n user -d org.postgresql.Driver' + odbccmd = 'isql PostgreSQL -d, < query > /dev/null' + elif system['db'] == 'mariadb': + querycmd = 'mysql %s --host=127.0.0.1 user --skip-column-names --batch -e "%s" > /dev/null' % ('--compress' if 'compress' in system else '', query) + jdbcflags = '-u jdbc:mysql://127.0.0.1/user -n user -d org.mariadb.jdbc.Driver' + odbccmd = 'isql MySQL -d, < query > /dev/null' + elif system['db'] == 'monetdb': + querycmd = 'mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" > /dev/null' % query + jdbcflags = '-u jdbc:monetdb://127.0.0.1:50001/database -n monetdb -p monetdb -d nl.cwi.monetdb.jdbc.MonetDriver' + odbccmd = 'isql MonetDB -d, < query > /dev/null' + elif system['db'] == 'db2': + db2qfile = open("db2query", "w") + db2qfile.write("connect to remotedb user user using user; \n" + query + ";\n") + db2qfile.close() + querycmd = 'db2 -tf db2query > /dev/null;' + jdbcflags = '-u jdbc:db2://127.0.0.1:50000/db -d com.ibm.db2.jcc.DB2Driver -n user -p user' + os.environ['DB2INSTANCE'] = 'user' + odbccmd = 'isql DB2_SAMPLE -d, user user < query > /dev/null' + elif system['db'] == 'oracle': + os.environ['TNS_ADMIN'] = '/home/user/oracleconfig' + querycmd = 'sqlplus system/oracle@//127.0.0.1:49161/XE @query-oracle.sql %d > /dev/null' % tuple + jdbcflags = '-u jdbc:oracle:thin:@127.0.0.1:49161:XE -d oracle.jdbc.driver.OracleDriver -n system -p oracle' + odbccmd = 'isql Oracle -d, < query > /dev/null' + # for JDBC/ODBCV + query = "SELECT * FROM lineitem where rownum < %d;" % tuple + elif system['db'] == 'mongodb': + querycmd = 'mongoexport -d lineitem -c things --csv --fields "l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment" --limit %d > /dev/null 2> /dev/null' % tuple + jdbcflags = None + odbccmd = None + odbcdriver = None + elif system['db'] == 'hbase': + os.environ['HBASE_HEAPSIZE'] = '10g' + query = "scan 'lineitem',{LIMIT=>%d}" % tuple + querycmd = 'hbase shell < query > /dev/null 2> /dev/null' + jdbcflags = None + odbccmd = None + odbcdriver = None + elif system['db'] == 'netcat': + # open netcat for listening in a separate process + filename = '/home/user/lineitem-%d.csv' % tuple + compress_cmd = '' + if 'compress' in system: + if system['compress'] == 'lz4': + compress_cmd = 'lz4 -c - |' + elif system['compress'] == 'lz4-heavy': + compress_cmd = 'lz4 -9 -c - |' + elif system['compress'] == 'gzip': + compress_cmd = 'gzip |' + elif system['compress'] == 'xz': + compress_cmd = 'xz -z |' + querycmd = 'cat %s | %s nc 127.0.0.1 %d' % (filename, compress_cmd, netcat_port) + jdbcflags = None + odbccmd = None + odbcdriver = None + else: + exit("unknown db %s" % system['db']) + + qfile = open("query", "w") + qfile.write(query) + qfile.write("\n") + qfile.close() + + jdbccmd = 'java -Xmx10G -Djava.security.egd=file:/dev/./urandom -Djava.ext.dirs=/home/user/java/ -jar /home/user/java/sqlline.jar --fastConnect=true --outputformat=csv --isolation=TRANSACTION_SERIALIZABLE --silent=true --showHeader=false %s < query > /dev/null' % jdbcflags + + # getting caches hot + benchmark_command(querycmd, system, 'native', network, tuple, r, True) + + # native client + benchmark_command(querycmd, system, 'native', network, tuple, r, False) + if 'compress' not in system: + # odbc + if odbccmd is not None: + benchmark_command(odbccmd, system, 'odbc', network, tuple, r, False) + # jdbc + if jdbcflags is not None: + benchmark_command(jdbccmd, system, 'jdbc', network, tuple, r, False) + sys.stdout.flush() + os.remove('query') + time.sleep(1) + + + + syscall("sudo tc qdisc del dev lo root netem 2>/dev/null") + time.sleep(0.1)