Files @ e23e3482a0b7
Branch filter:

Location: DA/protocols/vldb-protocols.py - annotation

Hannes Muehleisen
more stuff
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
b01cb2bb9139
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
232ee0cc752e
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
c0e3a977dfe5
e70ef62c68b1
e70ef62c68b1
3d604507d726
e70ef62c68b1
3d604507d726
232ee0cc752e
232ee0cc752e
232ee0cc752e
232ee0cc752e
e70ef62c68b1
e70ef62c68b1
b01cb2bb9139
b01cb2bb9139
232ee0cc752e
232ee0cc752e
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
3d604507d726
3d604507d726
3d604507d726
3d604507d726
3d604507d726
3d604507d726
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
232ee0cc752e
232ee0cc752e
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
7fbf056e912a
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
7fbf056e912a
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
c0e3a977dfe5
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
c0e3a977dfe5
e70ef62c68b1
e70ef62c68b1
3d604507d726
3d604507d726
3d604507d726
3d604507d726
3d604507d726
3d604507d726
3d604507d726
3d604507d726
3d604507d726
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
c0e3a977dfe5
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
c0e3a977dfe5
e70ef62c68b1
e70ef62c68b1
3d604507d726
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
c0e3a977dfe5
c0e3a977dfe5
c0e3a977dfe5
c0e3a977dfe5
c0e3a977dfe5
e70ef62c68b1
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
b01cb2bb9139
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
232ee0cc752e
232ee0cc752e
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
5dbb8a045bb1
c0e3a977dfe5
c0e3a977dfe5
c0e3a977dfe5
c0e3a977dfe5
c0e3a977dfe5
5dbb8a045bb1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
e70ef62c68b1
import os
import time
import sys
import csv
import re
import json
import subprocess

# Benchmark matrix. Each entry of `systems` is one client/server configuration:
# 'name' labels the CSV rows, 'db' selects how the client command is built in
# the main loop, and the optional keys ('compress', 'params', 'protocol',
# 'fileext', 'chunksize') adjust that command.
systems = [
	{'name': 'hive-default', 'db': 'hive'},
	{'name': 'netcat-csv-lz4', 'db': 'netcat', 'compress': 'lz4'},
	{'name': 'netcat-csv-lz4-heavy', 'db': 'netcat', 'compress': 'lz4-heavy'},
	{'name': 'netcat-csv-gzip', 'db': 'netcat', 'compress': 'gzip'},
	{'name': 'netcat-csv-xz', 'db': 'netcat', 'compress': 'xz'},
	{'name': 'netcat-csv-snappy', 'db': 'netcat', 'compress': 'snappy'},
	{'name': 'netcat-csv', 'db': 'netcat'},
	{'name': 'mariadb-compress', 'db': 'mariadb', 'compress': True},
	{'name': 'db2-default', 'db': 'db2'},
	{'name': 'oracle-default', 'db': 'oracle'},
	{'name': 'postgres-default', 'db': 'postgres'},
	{'name': 'mariadb-default', 'db': 'mariadb'},
	{'name': 'monetdb-default', 'db': 'monetdb'},
	{'name': 'hbase-default', 'db': 'hbase'},
	{'name': 'mongodb-default', 'db': 'mongodb'},
]

# Simulated links for `tc ... netem`: throughput in mbit, latency in ms;
# a non-positive value means "leave this dimension unrestricted".
networks = [
	{'name': 'unlimited', 'throughput': -1, 'latency': -1},
	{'name': 'gigabitethld', 'throughput': 1000, 'latency': 0.3},
	{'name': '10mbitethhd', 'throughput': 10, 'latency': 150},
]

# Row counts requested from every system.
tuples = [1, 100, 1000, 10000, 100000, 1000000]


# Raw protocol messages shipped over netcat. The variants are the cross product
# of chunk size, compression method and on-disk layout, so they are generated
# rather than listed by hand.
chunksizes = [1000, 10000, 100000, 1000000, 10000000, 100000000]
compression_methods = ['lz4', 'snappy', 'gzip', 'xz', 'lz4-heavy', None]
extensions = ['col', 'row']
systems.extend(
	{
		'name': 'netcat-prot-%s-chunk-%d-%s' % (extension, chunksize, compression_method),
		'db': 'netcat',
		'protocol': True,
		'fileext': extension,
		'chunksize': chunksize,
		'compress': compression_method,
	}
	for chunksize in chunksizes
	for compression_method in compression_methods
	for extension in extensions
)

# NOTE: this rebinding discards every system defined above, so only these
# MonetDB protocol variants are benchmarked. Remove it to run the full matrix.
systems = [
	{'name': 'monetdb-colprot-nocomp', 'db': 'monetdb', 'params': '--protocol=prot10'},
	{'name': 'monetdb-colprot-compsnappy', 'db': 'monetdb', 'params': '--protocol=prot10compressed --compression=snappy'},
	{'name': 'monetdb-colprot-complz4', 'db': 'monetdb', 'params': '--protocol=prot10compressed --compression=lz4'},
	{'name': 'monetdb-prot9', 'db': 'monetdb', 'params': '--protocol=prot9'},
]


# Repetitions of the whole matrix, and the per-command kill timeout passed to `timeout`.
nruns = 5
timeout = "10m"

# Local TCP port used by the netcat listener/sender pair.
netcat_port = 4444

# Shared sink for discarded child-process output.
fnull = open(os.devnull, 'w')

def netcat_listener(compression):
	"""Start a netcat server on ``netcat_port`` that receives and discards one transfer.

	When ``compression`` names a method, the received stream is piped through
	the matching decompressor before being discarded.

	:param compression: one of ``'lz4'``, ``'lz4-heavy'``, ``'gzip'``, ``'xz'``,
	    ``'snappy'``, or ``None`` for an uncompressed stream.
	:returns: the ``Popen`` of the last process in the receiving pipeline: the
	    ``nc`` process itself when uncompressed, otherwise the decompressor.
	:raises ValueError: if ``compression`` is not a known method. Nothing is
	    spawned in that case.
	"""
	uncompress_cmds = {
		'lz4': 'lz4 -dc',
		'lz4-heavy': 'lz4 -dc -9',
		'gzip': 'gunzip -f',
		'xz': 'xz -d',
		'snappy': 'snzip -d',
	}
	# Validate before spawning nc so a bad method cannot leave a stray listener behind.
	if compression is not None and compression not in uncompress_cmds:
		raise ValueError("unknown compression method %r" % (compression,))

	nclistener = subprocess.Popen(['nc', '-l', '-p', '%d' % netcat_port], stdout=subprocess.PIPE if compression is not None else fnull, stderr=fnull)
	if compression is None:
		return nclistener

	decompressor = subprocess.Popen(uncompress_cmds[compression].split(' '), stdin=nclistener.stdout, stdout=fnull)
	# Drop the parent's copy of the pipe's read end. This avoids leaking a descriptor
	# per call, and lets nc get SIGPIPE once the decompressor is gone, since then
	# no reader remains.
	nclistener.stdout.close()
	return decompressor

def syscall(cmd):
	"""Run ``cmd`` through the shell and return ``os.system``'s raw status (0 on success)."""
	status = os.system(cmd)
	return status

# Loopback receive counters as printed by `ifconfig lo`. Older net-tools print
# "RX packets:45 ..." and "RX bytes:123 (...)"; newer releases print
# "RX packets 45  bytes 123 (...)". Both layouts are accepted.
_RX_BYTES_PATTERNS = (r"RX bytes:(\d+)", r"RX packets \d+\s+bytes (\d+)")
_RX_PACKETS_PATTERNS = (r"RX packets:(\d+)", r"RX packets (\d+)")


def _lo_counter(patterns, ifconfig_output):
	"""Return the integer captured by the first of ``patterns`` that matches.

	``ifconfig_output`` is parsed as given; when it is ``None``, ``ifconfig lo``
	is run now and its output is used instead.

	:raises RuntimeError: if no pattern matches the output.
	"""
	if ifconfig_output is None:
		with os.popen("ifconfig lo") as pipe:
			ifconfig_output = pipe.read()
	for pattern in patterns:
		match = re.search(pattern, ifconfig_output)
		if match is not None:
			return int(match.group(1))
	raise RuntimeError("could not find %r in 'ifconfig lo' output" % (patterns,))


def rxbytes(ifconfig_output=None):
	"""Return the loopback interface's cumulative received-byte counter.

	:param ifconfig_output: optional captured ``ifconfig lo`` text; by default
	    the command is run now.
	"""
	return _lo_counter(_RX_BYTES_PATTERNS, ifconfig_output)

def rxpackets(ifconfig_output=None):
	"""Return the loopback interface's cumulative received-packet counter.

	:param ifconfig_output: optional captured ``ifconfig lo`` text; by default
	    the command is run now.
	"""
	return _lo_counter(_RX_PACKETS_PATTERNS, ifconfig_output)

def benchmark_command(cmd, system, protocol, network, tuple, r, dummy):
	"""Run one client command under /usr/bin/time and emit one CSV row for it.

	For netcat systems a receiving listener is started first, and the kill
	timeout is not applied. Otherwise the command is wrapped in
	``timeout --foreground -s KILL <timeout>``.

	Recorded per run:
	  * wall-clock duration (-1 if the command exits non-zero, e.g. after being killed);
	  * the loopback RX byte and packet deltas;
	  * the fields /usr/bin/time writes to the ``timing`` file.

	:param cmd: shell command for the client under test.
	:param system: entry of ``systems``. Its 'name', 'db' and optional 'fileext',
	    'chunksize' and 'compress' keys are copied into the row.
	:param protocol: label for the 'protocol' column (e.g. 'native', 'odbc', 'jdbc').
	:param network: entry of ``networks``. Its name, throughput and latency go into the row.
	:param tuple: requested row count. The name shadows the builtin but is kept for
	    interface compatibility.
	:param r: run index.
	:param dummy: if true, the command runs (e.g. to warm caches) but no row is written.
	"""
	timeoutcmd = 'timeout --foreground -s KILL ' + timeout
	listener = None
	if system['db'] == 'netcat':
		listener = netcat_listener(system.get('compress'))
		timeoutcmd = ''
	try:
		startbytes = rxbytes()
		startpackets = rxpackets()
		start = time.time()
		retcode = syscall('/usr/bin/time --format=\'{"io_page_faults": %F, "memory_max_kb": %M, "cpu_kernel_sec": %S, "cpu_user_sec": %U}\' --quiet --output=timing ' + timeoutcmd + ' ' + cmd)
		duration = time.time() - start
		transmittedbytes = rxbytes() - startbytes
		transmittedpackets = rxpackets() - startpackets
		if retcode != 0:
			duration = -1
		stats = {'system': system['name'], 'db': system['db'], 'protocol': protocol, 'network': network['name'], 'throughput': network['throughput'], 'latency': network['latency'], 'tuple': tuple, 'run': r, 'time': duration, "bytes" : transmittedbytes, 'packets': transmittedpackets, 'timeout' : int(retcode != 0), 'bin_orientation' : system.get('fileext', ''), 'bin_chunksize': system.get('chunksize', ''), 'bin_compress': system.get('compress', '')}
		with open('timing') as timingfile:
			stats.update(json.load(timingfile))
		if not dummy:
			w.writerow(stats)
		sys.stdout.flush()
		os.remove('timing')
	finally:
		# Always tear the listener down, and reap it so port netcat_port is free
		# before the next listener binds it.
		if listener is not None:
			listener.kill()
			listener.wait()


# SQL*Plus script for the Oracle native client. The SET commands configure the
# output format (column separator '|', no echo, feedback or prompt). The row
# bound arrives as the script's first argument (&1). Note that "rownum < &1"
# returns at most &1 - 1 rows.
oq = """set colsep '|'
set ARRAYSIZE 100
SET LINESIZE 132
SET PAGESIZE 6000
set echo off
set feedback off
set linesize 1000
set pagesize 0
set sqlprompt ''
set trimspool on
set headsep off
SELECT * FROM lineitem where rownum < &1;
quit
"""

with open("query-oracle.sql", "w") as oqfile:
	oqfile.write(oq)

# Results are streamed to stdout as CSV, one row per measured command. The
# columns cover the benchmark configuration, the timing/traffic measurements,
# and the fields /usr/bin/time emits as JSON.
csv_columns = [
	'system', 'db', 'protocol', 'network', 'throughput', 'latency', 'tuple', 'run',
	'timeout', 'time', 'bytes', 'packets',
	'cpu_kernel_sec', 'cpu_user_sec', 'io_page_faults', 'memory_max_kb',
	'bin_orientation', 'bin_chunksize', 'bin_compress',
]
w = csv.DictWriter(sys.stdout, csv_columns)
w.writeheader()

def _client_commands(system, ntuples):
	"""Build the client invocations that fetch ``ntuples`` rows from ``system``.

	Returns ``(query, querycmd, jdbcflags, odbccmd)``:
	  * query     -- query text. The caller writes it to the file ``query``, which
	                 the ODBC (``isql ... < query``) and HBase shell commands read.
	  * querycmd  -- shell command for the native client, or None when the
	                 system is only driven through JDBC (Hive).
	  * jdbcflags -- "driver-class url user password" for the Pmjc JDBC client,
	                 or None when JDBC is not benchmarked for this system.
	  * odbccmd   -- shell command for the ODBC client, or None.

	Side effects:
	  * writes the ``db2query`` file for DB2;
	  * sets the DB2INSTANCE, TNS_ADMIN and HBASE_HEAPSIZE environment variables;
	  * exits the script on an unknown ``system['db']``.
	"""
	db = system['db']
	query = "SELECT * FROM lineitem LIMIT %d" % ntuples
	if db == 'postgres':
		querycmd = 'psql %s --host 127.0.0.1 -w -t -A -c "%s" > /dev/null' % ('--set=sslcompression=1 --set=sslmode=require --set=keepalives=0' if 'compress' in system else '', query)
		jdbcflags = 'org.postgresql.Driver jdbc:postgresql://127.0.0.1/user user user'
		odbccmd = 'isql PostgreSQL -d, < query > /dev/null'
	elif db == 'mariadb':
		querycmd = 'mysql %s --host=127.0.0.1 user --skip-column-names --batch -e "%s" > /dev/null' % ('--compress' if 'compress' in system else '', query)
		jdbcflags = 'org.mariadb.jdbc.Driver jdbc:mysql://127.0.0.1/user user null'
		odbccmd = 'isql MySQL -d, < query > /dev/null'
	elif db == 'monetdb':
		if 'params' in system:
			# Variants with 'params' use the mclient under /home/user/monetdb-install
			# plus those flags, and are benchmarked natively only.
			querycmd = '/home/user/monetdb-install/bin/mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" %s > /dev/null' % (query, system['params'])
			jdbcflags = None
			odbccmd = None
		else:
			querycmd = 'mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" > /dev/null' % query
			jdbcflags = 'nl.cwi.monetdb.jdbc.MonetDriver jdbc:monetdb://127.0.0.1:50001/database monetdb monetdb'
			odbccmd = 'isql MonetDB -d, < query > /dev/null'
	elif db == 'db2':
		with open("db2query", "w") as db2qfile:
			db2qfile.write("connect to remotedb user user using user; \n" + query + ";\n")
		querycmd = 'db2 -tf db2query > /dev/null;'
		jdbcflags = 'com.ibm.db2.jcc.DB2Driver jdbc:db2://127.0.0.1:50000/db user user'
		os.environ['DB2INSTANCE'] = 'user'
		odbccmd = 'isql DB2_SAMPLE -d, user user < query > /dev/null'
	elif db == 'oracle':
		os.environ['TNS_ADMIN'] = '/home/user/oracleconfig'
		# query-oracle.sql filters on "rownum < &1", which returns at most &1 - 1
		# rows. Pass ntuples + 1 so Oracle returns ntuples rows, matching LIMIT.
		querycmd = 'sqlplus system/oracle@//127.0.0.1:49161/XE @query-oracle.sql %d > /dev/null' % (ntuples + 1)
		jdbcflags = 'oracle.jdbc.driver.OracleDriver jdbc:oracle:thin:@127.0.0.1:49161:XE system oracle'
		odbccmd = 'isql Oracle -d, < query > /dev/null'
		# Oracle has no LIMIT; this is the JDBC/ODBC query. "<=" keeps the row count at ntuples.
		query = "SELECT * FROM lineitem where rownum <= %d" % ntuples
	elif db == 'mongodb':
		querycmd = 'mongoexport -d lineitem  -c things --csv --fields "l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment" --limit %d > /dev/null 2> /dev/null' % ntuples
		jdbcflags = None
		odbccmd = None
	elif db == 'hbase':
		os.environ['HBASE_HEAPSIZE'] = '10g'
		# The HBase shell reads this scan command from the `query` file.
		query = "scan 'lineitem',{LIMIT=>%d}" % ntuples
		querycmd = 'hbase shell < query > /dev/null 2> /dev/null'
		jdbcflags = None
		odbccmd = None
	elif db == 'hive':
		# No native client: the caller substitutes the JDBC command for querycmd.
		querycmd = None
		jdbcflags = 'org.apache.hive.jdbc.HiveDriver jdbc:hive2://localhost:10000 user null'
		odbccmd = None
	elif db == 'netcat':
		if 'protocol' not in system:
			filename = '/home/user/lineitem-%d.csv' % ntuples
		else:
			filename = '/home/user/lineitem-%dtpl-%dchunksize.%s' % (ntuples, system['chunksize'], system['fileext'])
		compress_cmds = {
			'lz4': 'lz4 -c - |',
			'lz4-heavy': 'lz4 -9 -c - |',
			'gzip': 'gzip |',
			'xz': 'xz -z |',
			'snappy': 'snzip -c |',
		}
		# A missing or None 'compress' key sends the file uncompressed.
		compress_cmd = compress_cmds.get(system.get('compress'), '')
		querycmd = 'cat %s | %s nc 127.0.0.1 %d' % (filename, compress_cmd, netcat_port)
		jdbcflags = None
		odbccmd = None
	else:
		sys.exit("unknown db %s" % db)
	return query, querycmd, jdbcflags, odbccmd


# Main benchmark: for every run, system and simulated network, fetch each row
# count through every available client (native, ODBC, JDBC).
for r in range(nruns):
	for system in systems:
		for network in networks:
			# Reset, then shape the loopback interface for this network profile.
			syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
			syscall("sudo tc qdisc add dev lo root netem %s %s" % ('delay %fms' % network['latency'] if network['latency'] > 0 else '', 'rate %dmbit' % network['throughput'] if network['throughput'] > 0 else ''))

			for ntuples in tuples:
				query, querycmd, jdbcflags, odbccmd = _client_commands(system, ntuples)

				with open("query", "w") as qfile:
					qfile.write(query)
					qfile.write("\n")

				jdbccmd = 'java -Xmx10G -Djava.security.egd=file:/dev/./urandom -cp /home/user/java/pmjc.jar:/home/user/java/db2jcc4.jar:/home/user/java/monetdb-jdbc-2.23.jar:/home/user/java/mariadb-java-client-1.4.6.jar:/home/user/java/ojdbc6_g.jar:/home/user/java/postgresql-9.4.1209.jar:/home/user/java/hive-jdbc-2.1.0-standalone.jar:/home/user/java/hadoop-common-2.6.4.jar nl.cwi.da.pmjc.Pmjc %s "%s" 1000 > /dev/null 2>/dev/null' % (jdbcflags, query)

				# Hive has no native client: its JDBC run is reported as 'native'.
				if querycmd is None:
					querycmd = jdbccmd
					jdbccmd = None
					jdbcflags = None

				# Unrecorded warm-up run to get caches hot.
				benchmark_command(querycmd, system, 'native', network, ntuples, r, True)

				# native client
				benchmark_command(querycmd, system, 'native', network, ntuples, r, False)
				# ODBC/JDBC are only measured for uncompressed configurations.
				if 'compress' not in system:
					if odbccmd is not None:
						benchmark_command(odbccmd, system, 'odbc', network, ntuples, r, False)
					if jdbcflags is not None:
						benchmark_command(jdbccmd, system, 'jdbc', network, ntuples, r, False)
				sys.stdout.flush()
				os.remove('query')

			syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
			time.sleep(0.1)