import os import time import sys import csv import re import json import subprocess systems = [ {'name':'hive-default', 'db':'hive'}, {'name':'netcat-csv-lz4', 'db':'netcat', 'compress': 'lz4'}, {'name':'netcat-csv-lz4-heavy', 'db':'netcat', 'compress': 'lz4-heavy'}, {'name':'netcat-csv-gzip', 'db':'netcat', 'compress': 'gzip'}, {'name':'netcat-csv-xz', 'db':'netcat', 'compress': 'xz'}, {'name':'netcat-csv-snappy', 'db':'netcat', 'compress': 'snappy'}, {'name':'netcat-csv', 'db':'netcat'}, {'name':'mariadb-compress', 'db':'mariadb', 'compress': True}, {'name':'db2-default', 'db':'db2'}, {'name':'oracle-default', 'db':'oracle'}, {'name':'postgres-default', 'db':'postgres'}, {'name':'mariadb-default', 'db':'mariadb'}, {'name':'monetdb-default', 'db':'monetdb'}, {'name':'hbase-default', 'db':'hbase'}, {'name':'mongodb-default', 'db':'mongodb'}] networks = [ {'name':'unlimited', 'throughput': -1, 'latency':-1}, {'name':'gigabitethld', 'throughput': 1000, 'latency':0.3}, {'name':'10mbitethhd', 'throughput': 10, 'latency':150}] tuples = [1,100,1000,10000,100000,1000000] # netcat our protocol messages # we're gonna have a lot of these, so we generate them chunksizes = [1000, 10000, 100000, 1000000, 10000000, 100000000] compression_methods = ['lz4', 'snappy', 'gzip', 'xz', 'lz4-heavy', None] extensions = ['col', 'row'] for chunksize in chunksizes: for compression_method in compression_methods: for extension in extensions: systems.append({ 'name': 'netcat-prot-%s-chunk-%d-%s' % (extension, chunksize, compression_method), 'db': 'netcat', 'protocol': True, 'fileext': extension, 'chunksize': chunksize, 'compress': compression_method}) systems = [{'name':'monetdb-colprot-nocomp', 'db':'monetdb', 'params': '--protocol=prot10'}, {'name':'monetdb-colprot-compsnappy', 'db':'monetdb', 'params': '--protocol=prot10compressed --compression=snappy'}, {'name':'monetdb-colprot-complz4', 'db':'monetdb', 'params': '--protocol=prot10compressed --compression=lz4'}, {'name':'monetdb-prot9', 'db':'monetdb', 'params': '--protocol=prot9'}] nruns = 5 timeout = "10m" netcat_port = 4444 fnull = open(os.devnull, 'w') def netcat_listener(compression): nclistener = subprocess.Popen(['nc', '-l', '-p', '%d' % netcat_port], stdout=subprocess.PIPE if compression != None else fnull, stderr=fnull) if compression == None: return nclistener else: if compression == 'lz4': uncompress_cmd = 'lz4 -dc' elif compression == 'lz4-heavy': uncompress_cmd = 'lz4 -dc -9' elif compression == 'gzip': uncompress_cmd = 'gunzip -f' elif compression == 'xz': uncompress_cmd = 'xz -d' elif compression == 'snappy': uncompress_cmd = 'snzip -d' return subprocess.Popen(uncompress_cmd.split(' '), stdin=nclistener.stdout, stdout = fnull) def syscall(cmd): return os.system(cmd) def rxbytes(): return int(re.search("RX bytes:(\d+)", os.popen("ifconfig lo").read()).groups(0)[0]) def rxpackets(): return int(re.search("RX packets:(\d+)", os.popen("ifconfig lo").read()).groups(0)[0]) def benchmark_command(cmd, system, protocol, network, tuple, r, dummy): timeoutcmd = 'timeout --foreground -s KILL ' + timeout if system['db'] == 'netcat': listener = netcat_listener(system['compress'] if 'compress' in system else None) timeoutcmd = '' startbytes = rxbytes() startpackets = rxpackets() start = time.time() # retcode = syscall('/usr/bin/time --format=\'{"io_page_faults": %F, "memory_max_kb": %M, "cpu_kernel_sec": %S, "cpu_user_sec": %U}\' --quiet --output=timing ' + timeoutcmd + ' ' + cmd) duration = time.time() - start transmittedbytes = rxbytes() - startbytes transmittedpackets = rxpackets() - startpackets if retcode != 0: duration = -1 stats = {'system': system['name'], 'db': system['db'], 'protocol': protocol, 'network': network['name'], 'throughput': network['throughput'], 'latency': network['latency'], 'tuple': tuple, 'run': r, 'time': duration, "bytes" : transmittedbytes, 'packets': transmittedpackets, 'timeout' : int(retcode != 0), 'bin_orientation' : system['fileext'] if 'fileext' in system else '', 'bin_chunksize': system['chunksize'] if 'chunksize' in system else '', 'bin_compress':system['compress'] if 'compress' in system else ''} stats.update(json.load(open('timing'))) if not dummy: w.writerow(stats) sys.stdout.flush() os.remove('timing') if system['db'] == 'netcat': listener.kill() oq = """set colsep '|' set ARRAYSIZE 100 SET LINESIZE 132 SET PAGESIZE 6000 set echo off set feedback off set linesize 1000 set pagesize 0 set sqlprompt '' set trimspool on set headsep off SELECT * FROM lineitem where rownum < &1; quit """ oqfile = open("query-oracle.sql", "w") oqfile.write(oq) oqfile.close() w = csv.DictWriter(sys.stdout, ['system', 'db', 'protocol', 'network', 'throughput', 'latency', 'tuple', 'run', 'timeout', 'time', 'bytes', 'packets', 'cpu_kernel_sec', 'cpu_user_sec', 'io_page_faults', 'memory_max_kb', 'bin_orientation', 'bin_chunksize', 'bin_compress']) w.writeheader() for r in range(nruns): for system in systems: for network in networks: syscall("sudo tc qdisc del dev lo root netem 2>/dev/null") syscall("sudo tc qdisc add dev lo root netem %s %s" % ('delay %fms' % network['latency'] if network['latency'] > 0 else '', 'rate %dmbit' % network['throughput'] if network['throughput'] > 0 else '')) for tuple in tuples: query = "SELECT * FROM lineitem LIMIT %d" % tuple querycmd = "" jdbcflags = '' odbcdriver = '' odbccmd = None if system['db'] == 'postgres': querycmd = 'psql %s --host 127.0.0.1 -w -t -A -c "%s" > /dev/null' % ('--set=sslcompression=1 --set=sslmode=require --set=keepalives=0' if 'compress' in system else '', query) jdbcflags = 'org.postgresql.Driver jdbc:postgresql://127.0.0.1/user user user' odbccmd = 'isql PostgreSQL -d, < query > /dev/null' elif system['db'] == 'mariadb': querycmd = 'mysql %s --host=127.0.0.1 user --skip-column-names --batch -e "%s" > /dev/null' % ('--compress' if 'compress' in system else '', query) jdbcflags = 'org.mariadb.jdbc.Driver jdbc:mysql://127.0.0.1/user user null' odbccmd = 'isql MySQL -d, < query > /dev/null' elif system['db'] == 'monetdb': if 'params' in system: querycmd = '/home/user/monetdb-install/bin/mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" %s > /dev/null' % (query, system['params']) jdbcflags = None odbccmd = None odbcdriver = None else: querycmd = 'mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" > /dev/null' % query jdbcflags = 'nl.cwi.monetdb.jdbc.MonetDriver jdbc:monetdb://127.0.0.1:50001/database monetdb monetdb' odbccmd = 'isql MonetDB -d, < query > /dev/null' elif system['db'] == 'db2': db2qfile = open("db2query", "w") db2qfile.write("connect to remotedb user user using user; \n" + query + ";\n") db2qfile.close() querycmd = 'db2 -tf db2query > /dev/null;' jdbcflags = 'com.ibm.db2.jcc.DB2Driver jdbc:db2://127.0.0.1:50000/db user user' os.environ['DB2INSTANCE'] = 'user' odbccmd = 'isql DB2_SAMPLE -d, user user < query > /dev/null' elif system['db'] == 'oracle': os.environ['TNS_ADMIN'] = '/home/user/oracleconfig' querycmd = 'sqlplus system/oracle@//127.0.0.1:49161/XE @query-oracle.sql %d > /dev/null' % tuple jdbcflags = 'oracle.jdbc.driver.OracleDriver jdbc:oracle:thin:@127.0.0.1:49161:XE system oracle' odbccmd = 'isql Oracle -d, < query > /dev/null' # for JDBC/ODBCV query = "SELECT * FROM lineitem where rownum < %d" % tuple elif system['db'] == 'mongodb': querycmd = 'mongoexport -d lineitem -c things --csv --fields "l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment" --limit %d > /dev/null 2> /dev/null' % tuple jdbcflags = None odbccmd = None odbcdriver = None elif system['db'] == 'hbase': os.environ['HBASE_HEAPSIZE'] = '10g' query = "scan 'lineitem',{LIMIT=>%d}" % tuple querycmd = 'hbase shell < query > /dev/null 2> /dev/null' jdbcflags = None odbccmd = None odbcdriver = None elif system['db'] == 'hive': querycmd = None jdbcflags = 'org.apache.hive.jdbc.HiveDriver jdbc:hive2://localhost:10000 user null' odbccmd = None odbcdriver = None elif system['db'] == 'netcat': if 'protocol' not in system: filename = '/home/user/lineitem-%d.csv' % tuple else: filename = '/home/user/lineitem-%dtpl-%dchunksize.%s' % (tuple, system['chunksize'], system['fileext']) compress_cmd = '' if 'compress' in system: if system['compress'] == 'lz4': compress_cmd = 'lz4 -c - |' elif system['compress'] == 'lz4-heavy': compress_cmd = 'lz4 -9 -c - |' elif system['compress'] == 'gzip': compress_cmd = 'gzip |' elif system['compress'] == 'xz': compress_cmd = 'xz -z |' elif system['compress'] == 'snappy': compress_cmd = 'snzip -c |' querycmd = 'cat %s | %s nc 127.0.0.1 %d' % (filename, compress_cmd, netcat_port) jdbcflags = None odbccmd = None odbcdriver = None else: exit("unknown db %s" % system['db']) qfile = open("query", "w") qfile.write(query) qfile.write("\n") qfile.close() jdbccmd = 'java -Xmx10G -Djava.security.egd=file:/dev/./urandom -cp /home/user/java/pmjc.jar:/home/user/java/db2jcc4.jar:/home/user/java/monetdb-jdbc-2.23.jar:/home/user/java/mariadb-java-client-1.4.6.jar:/home/user/java/ojdbc6_g.jar:/home/user/java/postgresql-9.4.1209.jar:/home/user/java/hive-jdbc-2.1.0-standalone.jar:/home/user/java/hadoop-common-2.6.4.jar nl.cwi.da.pmjc.Pmjc %s "%s" 1000 > /dev/null 2>/dev/null' % (jdbcflags, query) # special case for hive if querycmd is None: querycmd = jdbccmd jdbccmd = None jdbcflags = None # getting caches hot benchmark_command(querycmd, system, 'native', network, tuple, r, True) # native client benchmark_command(querycmd, system, 'native', network, tuple, r, False) if 'compress' not in system: # odbc if odbccmd is not None: benchmark_command(odbccmd, system, 'odbc', network, tuple, r, False) # jdbc if jdbcflags is not None: benchmark_command(jdbccmd, system, 'jdbc', network, tuple, r, False) sys.stdout.flush() os.remove('query') syscall("sudo tc qdisc del dev lo root netem 2>/dev/null") time.sleep(0.1)