Files
@ 232ee0cc752e
Branch filter:
Location: DA/protocols/vldb-protocols.py
232ee0cc752e
9.7 KiB
text/x-python
snappy it is?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 | import os
import time
import sys
import csv
import re
import json
import subprocess
# Baseline systems under test. 'name' labels the CSV result rows, 'db' selects
# which client command is built; the optional 'compress' picks the compressed
# variant (a codec name for netcat, True for the mariadb --compress flag).
systems = [
    dict(name='hive-default', db='hive'),
    dict(name='netcat-csv-lz4', db='netcat', compress='lz4'),
    dict(name='netcat-csv-lz4-heavy', db='netcat', compress='lz4-heavy'),
    dict(name='netcat-csv-gzip', db='netcat', compress='gzip'),
    dict(name='netcat-csv-xz', db='netcat', compress='xz'),
    dict(name='netcat-csv-snappy', db='netcat', compress='snappy'),
    dict(name='netcat-csv', db='netcat'),
    dict(name='mariadb-compress', db='mariadb', compress=True),
    dict(name='db2-default', db='db2'),
    dict(name='oracle-default', db='oracle'),
    dict(name='postgres-default', db='postgres'),
    dict(name='mariadb-default', db='mariadb'),
    dict(name='monetdb-default', db='monetdb'),
    dict(name='hbase-default', db='hbase'),
    dict(name='mongodb-default', db='mongodb'),
]
# Emulated links applied to the loopback device with `tc netem` by the main loop.
# 'throughput' is in Mbit/s and 'latency' in ms (they are formatted as
# 'rate %dmbit' / 'delay %fms'); a value <= 0 leaves that dimension unconstrained.
# Fix: the first entry used to end in '}],', closing the list early and leaving
# the final ']' unmatched (SyntaxError).
networks = [
    {'name': 'unlimited', 'throughput': -1, 'latency': -1},
    {'name': 'gigabitethld', 'throughput': 1000, 'latency': 0.3},
    {'name': '10mbitethhd', 'throughput': 10, 'latency': 150},
]
# Requested result-set sizes (rows) for every system/network combination:
# a single row, then every power of ten from 100 up to one million.
tuples = [1] + [10 ** exponent for exponent in range(2, 7)]
# Netcat runs that ship our pre-serialized protocol files instead of CSV.
# Every chunk size x codec x orientation ('col'/'row', reported as
# bin_orientation) combination becomes its own system entry, so they are
# generated rather than listed. A codec of None means "send uncompressed";
# the 'compress' key is still present in that case.
chunksizes = [1000, 10000, 100000, 1000000, 10000000, 100000000]
compression_methods = ['lz4', 'snappy', 'gzip', 'xz', 'lz4-heavy', None]
extensions = ['col', 'row']
systems.extend(
    {
        'name': 'netcat-prot-%s-chunk-%d-%s' % (ext, chunk, codec),
        'db': 'netcat',
        'protocol': True,
        'fileext': ext,
        'chunksize': chunk,
        'compress': codec,
    }
    for chunk in chunksizes
    for codec in compression_methods
    for ext in extensions
)
# Runner settings:
#   nruns       -- repetitions of the whole system x network x tuple sweep
#   timeout     -- duration handed to `timeout` for non-netcat client commands
#   netcat_port -- TCP port shared by the netcat listener and sender
nruns, timeout, netcat_port = 5, "10m", 4444
# Write handle on the null device, used to discard child-process output.
fnull = open(os.devnull, 'w')
def netcat_listener(compression):
nclistener = subprocess.Popen(['nc', '-l', '-p', '%d' % netcat_port], stdout=subprocess.PIPE if compression != None else fnull, stderr=fnull)
if compression == None:
return nclistener
else:
if compression == 'lz4':
uncompress_cmd = 'lz4 -dc'
elif compression == 'lz4-heavy':
uncompress_cmd = 'lz4 -dc -9'
elif compression == 'gzip':
uncompress_cmd = 'gunzip -f'
elif compression == 'xz':
uncompress_cmd = 'xz -d'
elif compression == 'snappy':
uncompress_cmd = 'snzip -d'
return subprocess.Popen(uncompress_cmd.split(' '), stdin=nclistener.stdout, stdout = fnull)
def syscall(cmd):
    """Execute *cmd* through the shell and return os.system's raw status.

    A status of 0 means success; callers treat any other value as failure.
    """
    status = os.system(cmd)
    return status
def rxbytes():
return int(re.search("RX bytes:(\d+)", os.popen("ifconfig lo").read()).groups(0)[0])
def rxpackets():
return int(re.search("RX packets:(\d+)", os.popen("ifconfig lo").read()).groups(0)[0])
def benchmark_command(cmd, system, protocol, network, tuple, r, dummy):
    """Run one client command under /usr/bin/time and emit one CSV result row.

    cmd      -- shell command to execute (wrapped in `timeout` except for netcat)
    system   -- entry from ``systems``; reads 'db', 'name' and the optional
                'compress' / 'fileext' / 'chunksize' keys
    protocol -- access-path label written to the row ('native', 'odbc', 'jdbc')
    network  -- entry from ``networks``; its fields are copied into the row
    tuple    -- requested row count (shadows the builtin; name kept for callers)
    r        -- run index
    dummy    -- when true, execute but do not write the row (cache warm-up)

    'bytes' / 'packets' are loopback RX counter deltas across the call. 'time'
    is -1 and 'timeout' is 1 when the command exits non-zero. The netcat
    listener and the 'timing' file are cleaned up even if something raises.
    """
    listener = None
    timeoutcmd = 'timeout --foreground -s KILL ' + timeout
    if system['db'] == 'netcat':
        # The netcat sender needs a receiver; netcat runs are not wrapped in timeout.
        # NOTE(review): nothing waits for nc to be listening before cmd connects;
        # confirm the listener reliably starts first.
        listener = netcat_listener(system.get('compress'))
        timeoutcmd = ''
    try:
        startbytes = rxbytes()
        startpackets = rxpackets()
        start = time.time()
        retcode = syscall('/usr/bin/time --format=\'{"io_page_faults": %F, "memory_max_kb": %M, "cpu_kernel_sec": %S, "cpu_user_sec": %U}\' --quiet --output=timing ' + timeoutcmd + ' ' + cmd)
        duration = time.time() - start
        transmittedbytes = rxbytes() - startbytes
        transmittedpackets = rxpackets() - startpackets
        if retcode != 0:
            duration = -1
        stats = {
            'system': system['name'],
            'db': system['db'],
            'protocol': protocol,
            'network': network['name'],
            'throughput': network['throughput'],
            'latency': network['latency'],
            'tuple': tuple,
            'run': r,
            'time': duration,
            'bytes': transmittedbytes,
            'packets': transmittedpackets,
            'timeout': int(retcode != 0),
            'bin_orientation': system.get('fileext', ''),
            'bin_chunksize': system.get('chunksize', ''),
            'bin_compress': system.get('compress', ''),
        }
        # /usr/bin/time wrote its counters as a JSON object (see --format above);
        # read it with `with` so the handle is closed.
        with open('timing') as timing_file:
            stats.update(json.load(timing_file))
        if not dummy:
            w.writerow(stats)
            sys.stdout.flush()
    finally:
        if os.path.exists('timing'):
            os.remove('timing')
        if listener is not None:
            listener.kill()
            # Reap the killed process so it does not linger as a zombie.
            listener.wait()
# SQL*Plus script used by the Oracle native client. &1 is filled from the
# tuple count passed after "@query-oracle.sql" on the sqlplus command line.
# The later lower-case linesize/pagesize settings override the earlier
# upper-case ones.
# Fix: ROWNUM starts at 1, so "rownum < N" returned N-1 rows (none for N=1),
# unlike LIMIT N used for the other systems; "<=" returns exactly N rows.
oq = """set colsep '|'
set ARRAYSIZE 100
SET LINESIZE 132
SET PAGESIZE 6000
set echo off
set feedback off
set linesize 1000
set pagesize 0
set sqlprompt ''
set trimspool on
set headsep off
SELECT * FROM lineitem where rownum <= &1;
quit
"""
with open("query-oracle.sql", "w") as oqfile:
    oqfile.write(oq)
# Result sink: one CSV row per recorded benchmark_command call, written to stdout.
result_columns = [
    'system', 'db', 'protocol', 'network', 'throughput', 'latency',
    'tuple', 'run', 'timeout', 'time', 'bytes', 'packets',
    'cpu_kernel_sec', 'cpu_user_sec', 'io_page_faults', 'memory_max_kb',
    'bin_orientation', 'bin_chunksize', 'bin_compress',
]
w = csv.DictWriter(sys.stdout, fieldnames=result_columns)
w.writeheader()
# Shell command template for the generic JDBC client (pmjc); the two %s are the
# driver/url/user/password flags and the query text.
_JDBC_CMD_TEMPLATE = (
    'java -Xmx10G -Djava.security.egd=file:/dev/./urandom -cp '
    '/home/user/java/pmjc.jar:'
    '/home/user/java/db2jcc4.jar:'
    '/home/user/java/monetdb-jdbc-2.23.jar:'
    '/home/user/java/mariadb-java-client-1.4.6.jar:'
    '/home/user/java/ojdbc6_g.jar:'
    '/home/user/java/postgresql-9.4.1209.jar:'
    '/home/user/java/hive-jdbc-2.1.0-standalone.jar:'
    '/home/user/java/hadoop-common-2.6.4.jar '
    'nl.cwi.da.pmjc.Pmjc %s "%s" 1000 > /dev/null 2>/dev/null'
)

# Sender-side compressor stage for netcat runs, keyed by the 'compress' codec.
# Uncompressed runs (no key, or None) get no stage.
_NETCAT_COMPRESS_CMDS = {
    'lz4': 'lz4 -c - |',
    'lz4-heavy': 'lz4 -9 -c - |',
    'gzip': 'gzip |',
    'xz': 'xz -z |',
    'snappy': 'snzip -c |',
}


def _netem_options(network):
    """Return the `tc netem` arguments for *network*; values <= 0 mean no limit."""
    delay = 'delay %fms' % network['latency'] if network['latency'] > 0 else ''
    rate = 'rate %dmbit' % network['throughput'] if network['throughput'] > 0 else ''
    return '%s %s' % (delay, rate)


def _write_file(path, text):
    """Replace the contents of *path* with *text*, closing the file immediately."""
    with open(path, 'w') as handle:
        handle.write(text)


def _client_commands(system, ntuples):
    """Build the query text and client commands that fetch *ntuples* rows.

    Returns ``(query, querycmd, jdbcflags, odbccmd)``:
      query     -- text written to the 'query' file (read by isql / hbase shell)
                   and passed to the JDBC client
      querycmd  -- native-client shell command; None for hive, which is driven
                   through JDBC instead
      jdbcflags -- pmjc driver/url/credential flags, or None to skip JDBC
      odbccmd   -- isql command, or None to skip ODBC
    Some branches also set environment variables or write 'db2query'.
    Exits the script for an unknown 'db'.
    """
    db = system['db']
    query = "SELECT * FROM lineitem LIMIT %d" % ntuples
    querycmd = ""
    jdbcflags = ''
    odbccmd = None
    if db == 'postgres':
        querycmd = 'psql %s --host 127.0.0.1 -w -t -A -c "%s" > /dev/null' % ('--set=sslcompression=1 --set=sslmode=require --set=keepalives=0' if 'compress' in system else '', query)
        jdbcflags = 'org.postgresql.Driver jdbc:postgresql://127.0.0.1/user user user'
        odbccmd = 'isql PostgreSQL -d, < query > /dev/null'
    elif db == 'mariadb':
        querycmd = 'mysql %s --host=127.0.0.1 user --skip-column-names --batch -e "%s" > /dev/null' % ('--compress' if 'compress' in system else '', query)
        jdbcflags = 'org.mariadb.jdbc.Driver jdbc:mysql://127.0.0.1/user user null'
        odbccmd = 'isql MySQL -d, < query > /dev/null'
    elif db == 'monetdb':
        querycmd = 'mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" > /dev/null' % query
        jdbcflags = 'nl.cwi.monetdb.jdbc.MonetDriver jdbc:monetdb://127.0.0.1:50001/database monetdb monetdb'
        odbccmd = 'isql MonetDB -d, < query > /dev/null'
    elif db == 'db2':
        _write_file("db2query", "connect to remotedb user user using user; \n" + query + ";\n")
        querycmd = 'db2 -tf db2query > /dev/null;'
        jdbcflags = 'com.ibm.db2.jcc.DB2Driver jdbc:db2://127.0.0.1:50000/db user user'
        os.environ['DB2INSTANCE'] = 'user'
        odbccmd = 'isql DB2_SAMPLE -d, user user < query > /dev/null'
    elif db == 'oracle':
        os.environ['TNS_ADMIN'] = '/home/user/oracleconfig'
        querycmd = 'sqlplus system/oracle@//127.0.0.1:49161/XE @query-oracle.sql %d > /dev/null' % ntuples
        jdbcflags = 'oracle.jdbc.driver.OracleDriver jdbc:oracle:thin:@127.0.0.1:49161:XE system oracle'
        odbccmd = 'isql Oracle -d, < query > /dev/null'
        # Query text for JDBC/ODBC. Oracle has no LIMIT, and ROWNUM starts at 1,
        # so "<=" yields exactly ntuples rows ("<" returned one row too few).
        # NOTE(review): Oracle JDBC commonly rejects a trailing ';' (ORA-00911);
        # confirm the pmjc client strips it.
        query = "SELECT * FROM lineitem where rownum <= %d;" % ntuples
    elif db == 'mongodb':
        querycmd = 'mongoexport -d lineitem -c things --csv --fields "l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment" --limit %d > /dev/null 2> /dev/null' % ntuples
        jdbcflags = None
    elif db == 'hbase':
        os.environ['HBASE_HEAPSIZE'] = '10g'
        query = "scan 'lineitem',{LIMIT=>%d}" % ntuples
        querycmd = 'hbase shell < query > /dev/null 2> /dev/null'
        jdbcflags = None
    elif db == 'hive':
        querycmd = None
        jdbcflags = 'org.apache.hive.jdbc.HiveDriver jdbc:hive2://localhost:10000 user null'
    elif db == 'netcat':
        if 'protocol' not in system:
            filename = '/home/user/lineitem-%d.csv' % ntuples
        else:
            filename = '/home/user/lineitem-%dtpl-%dchunksize.%s' % (ntuples, system['chunksize'], system['fileext'])
        compress_cmd = _NETCAT_COMPRESS_CMDS.get(system.get('compress'), '')
        querycmd = 'cat %s | %s nc 127.0.0.1 %d' % (filename, compress_cmd, netcat_port)
        jdbcflags = None
    else:
        sys.exit("unknown db %s" % db)
    return query, querycmd, jdbcflags, odbccmd


# Main sweep: run x system x emulated network x result size.
# NOTE(review): the original indentation was lost. The final qdisc removal and
# sleep are reconstructed as end-of-script cleanup. They now sit in `finally`
# so the loopback device is never left throttled after a crash or Ctrl-C.
try:
    for r in range(nruns):
        for system in systems:
            for network in networks:
                syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
                syscall("sudo tc qdisc add dev lo root netem %s" % _netem_options(network))
                for ntuples in tuples:
                    query, querycmd, jdbcflags, odbccmd = _client_commands(system, ntuples)
                    _write_file("query", query + "\n")
                    jdbccmd = _JDBC_CMD_TEMPLATE % (jdbcflags, query)
                    # Special case for hive: its "native" measurement is the JDBC
                    # client, and no separate JDBC run is made.
                    if querycmd is None:
                        querycmd = jdbccmd
                        jdbccmd = None
                        jdbcflags = None
                    # Warm-up run to get caches hot (result not recorded).
                    benchmark_command(querycmd, system, 'native', network, ntuples, r, True)
                    # Native client.
                    benchmark_command(querycmd, system, 'native', network, ntuples, r, False)
                    # Compressed variants are only measured through the native client.
                    if 'compress' not in system:
                        if odbccmd is not None:
                            benchmark_command(odbccmd, system, 'odbc', network, ntuples, r, False)
                        if jdbcflags is not None:
                            benchmark_command(jdbccmd, system, 'jdbc', network, ntuples, r, False)
                    sys.stdout.flush()
                    os.remove('query')
finally:
    syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
    time.sleep(0.1)
|