# Files @ e70ef62c68b1
# Branch filter:
#
# Location: DA/protocols/vldb-protocols.py
#
# Hannes Muehleisen
# import
import os
import time
import sys
import csv
import re
import json
import subprocess

# Systems under test.  'db' selects which client commands the main loop builds;
# the optional 'compress' key turns on compressed transfer (a codec name for the
# netcat baselines, True for MariaDB's --compress client flag).
systems = [
	# raw CSV file streamed over netcat, compressed with the given codec
	{'name': 'netcat-csv-lz4', 'db': 'netcat', 'compress': 'lz4'},
	{'name': 'netcat-csv-lz4-heavy', 'db': 'netcat', 'compress': 'lz4-heavy'},
	{'name': 'netcat-csv-gzip', 'db': 'netcat', 'compress': 'gzip'},
	{'name': 'netcat-csv-xz', 'db': 'netcat', 'compress': 'xz'},
	# raw CSV file streamed over netcat without compression
	{'name': 'netcat-csv', 'db': 'netcat'},
	# MariaDB client with protocol compression
	{'name': 'mariadb-compress', 'db': 'mariadb', 'compress': True},
	# database clients in their default configuration
	{'name': 'db2-default', 'db': 'db2'},
	{'name': 'oracle-default', 'db': 'oracle'},
	{'name': 'postgres-default', 'db': 'postgres'},
	{'name': 'mariadb-default', 'db': 'mariadb'},
	{'name': 'monetdb-default', 'db': 'monetdb'},
	{'name': 'hbase-default', 'db': 'hbase'},
	{'name': 'mongodb-default', 'db': 'mongodb'},
]

# Network conditions emulated on the loopback device with "tc netem".
# 'throughput' is used as Mbit/s and 'latency' as ms; a non-positive value
# means that limit is not applied.  Previously used (now disabled) profiles:
#   {'name': 'gigabitethld', 'throughput': 1000, 'latency': 0.3}
#   {'name': '10mbitethhd', 'throughput': 10, 'latency': 150}
networks = [
	{'name': 'unlimited', 'throughput': -1, 'latency': -1},
]

# Number of rows fetched per query.  Larger sizes (100000, 1000000, 10000000)
# are currently disabled.
tuples = [1, 100, 1000, 10000]

# Repetitions of the whole benchmark matrix, and the duration given to
# timeout(1) around each (non-netcat) client invocation.
nruns, timeout = 5, "10m"

# TCP port the netcat listener binds to for the netcat baselines.
netcat_port = 4444

# Shared write handle on the null device, used to discard subprocess output.
fnull = open(os.devnull, 'w')

def netcat_listener(compression):
	"""Start a netcat server on ``netcat_port`` that discards everything it receives.

	compression -- None for plain data, or one of 'lz4', 'lz4-heavy', 'gzip',
	               'xz' to pipe the received bytes through the matching
	               decompressor (whose output is discarded).

	Returns the process the caller should kill when the run is over: the
	``nc`` process itself when ``compression`` is None, otherwise the
	decompressor at the end of the pipeline.

	Raises ValueError for an unsupported ``compression`` value, before any
	process has been started.
	"""
	# Decompressor argv per codec.  '-9' for lz4-heavy is a compression level;
	# presumably ignored by lz4 when decompressing -- TODO confirm.
	uncompress_cmds = {
		'lz4': ['lz4', '-dc'],
		'lz4-heavy': ['lz4', '-dc', '-9'],
		'gzip': ['gunzip', '-f'],
		'xz': ['xz', '-d'],
	}
	# Validate first so a bad value does not leave an orphaned nc listener behind.
	if compression is not None and compression not in uncompress_cmds:
		raise ValueError('unsupported compression: %r' % (compression,))

	nclistener = subprocess.Popen(['nc', '-l', '-p', '%d' % netcat_port],
		stdout=fnull if compression is None else subprocess.PIPE, stderr=fnull)
	if compression is None:
		return nclistener

	decompressor = subprocess.Popen(uncompress_cmds[compression], stdin=nclistener.stdout, stdout=fnull)
	# Drop the parent's copy of the pipe: otherwise the fd leaks on every run,
	# and nc would never get SIGPIPE once the decompressor is killed.
	nclistener.stdout.close()
	return decompressor

def syscall(cmd):
	"""Execute *cmd* through the shell and return the status from ``os.system``.

	Zero means the command succeeded; callers treat anything else as failure.
	"""
	status = os.system(cmd)
	return status

def rxbytes(ifconfig_output=None):
	"""Return the received-bytes counter of the loopback interface ``lo``.

	ifconfig_output -- optional pre-captured ``ifconfig`` text; when omitted,
	                   ``ifconfig lo`` is run and its output parsed.

	Accepts both the classic net-tools layout ("RX bytes:1234 (1.2 KB)") and
	the newer one ("RX packets 10  bytes 1234 (1.2 KB)").

	Raises ValueError when no RX byte counter is present in the output.
	"""
	if ifconfig_output is None:
		# Context manager closes the pipe instead of leaking it on every call.
		with os.popen("ifconfig lo") as pipe:
			ifconfig_output = pipe.read()
	match = re.search(r"RX (?:packets\s+\d+\s+)?bytes:?\s*(\d+)", ifconfig_output)
	if match is None:
		raise ValueError("no RX byte counter found in ifconfig output")
	return int(match.group(1))

def rxpackets(ifconfig_output=None):
	"""Return the received-packets counter of the loopback interface ``lo``.

	ifconfig_output -- optional pre-captured ``ifconfig`` text; when omitted,
	                   ``ifconfig lo`` is run and its output parsed.

	Accepts both the classic net-tools layout ("RX packets:10 errors:0 ...")
	and the newer one ("RX packets 10  bytes 1234 (1.2 KB)").

	Raises ValueError when no RX packet counter is present in the output.
	"""
	if ifconfig_output is None:
		# Context manager closes the pipe instead of leaking it on every call.
		with os.popen("ifconfig lo") as pipe:
			ifconfig_output = pipe.read()
	match = re.search(r"RX packets:?\s*(\d+)", ifconfig_output)
	if match is None:
		raise ValueError("no RX packet counter found in ifconfig output")
	return int(match.group(1))

def benchmark_command(cmd, system, protocol, network, tuple, r, dummy):
	"""Run *cmd* once under ``/usr/bin/time`` and emit one CSV result row.

	cmd      -- shell command line running the client
	system   -- entry of ``systems`` ('name', 'db', optional 'compress')
	protocol -- label stored in the 'protocol' column ('native', 'odbc', 'jdbc')
	network  -- entry of ``networks`` ('name', 'throughput', 'latency')
	tuple    -- requested row count (shadows the builtin; name kept for callers)
	r        -- run index stored in the 'run' column
	dummy    -- when true the command runs but no row is written (cache warm-up)

	Non-netcat commands are wrapped in ``timeout -s KILL``.  For netcat systems a
	listener is started first and always killed and reaped afterwards.  Bytes
	and packets received on ``lo`` during the run are recorded; a non-zero exit
	status sets 'time' to -1 and 'timeout' to 1.  Resource usage reported by
	/usr/bin/time is read from the temporary file ``timing``, which is removed.
	"""
	timeoutcmd = 'timeout --foreground -s KILL ' + timeout
	listener = None
	if system['db'] == 'netcat':
		listener = netcat_listener(system.get('compress'))
		# NOTE(review): no timeout wrapper here, so a netcat client that never
		# exits would hang the benchmark.
		timeoutcmd = ''
	try:
		startbytes = rxbytes()
		startpackets = rxpackets()
		start = time.time()
		retcode = syscall('/usr/bin/time --format=\'{"io_page_faults": %F, "memory_max_kb": %M, "cpu_kernel_sec": %S, "cpu_user_sec": %U}\' --quiet --output=timing ' + timeoutcmd + ' ' + cmd)
		duration = time.time() - start
		transmittedbytes = rxbytes() - startbytes
		transmittedpackets = rxpackets() - startpackets
		if retcode != 0:
			duration = -1
		stats = {'system': system['name'], 'db': system['db'], 'protocol': protocol, 'network': network['name'], 'throughput': network['throughput'], 'latency': network['latency'], 'tuple': tuple, 'run': r, 'time': duration, "bytes" : transmittedbytes, 'packets': transmittedpackets, 'timeout' : int(retcode != 0)}
		with open('timing') as timingfile:
			stats.update(json.load(timingfile))
		if not dummy:
			w.writerow(stats)
		sys.stdout.flush()
		os.remove('timing')
	finally:
		if listener is not None:
			listener.kill()
			# Reap the killed child so one zombie per netcat run does not pile up.
			listener.wait()


# SQL*Plus script for the native Oracle client, invoked as
# "sqlplus ... @query-oracle.sql N" so that &1 is substituted with N, the
# number of rows to fetch.  The SET commands suppress echo, feedback, headers
# and the prompt; the later "set linesize"/"set pagesize" lines override the
# earlier LINESIZE/PAGESIZE ones.  ROWNUM starts at 1, so "<=" returns exactly
# N rows, matching "LIMIT N" on the other systems.
oq = """set colsep '|'
set ARRAYSIZE 100
SET LINESIZE 132
SET PAGESIZE 6000
set echo off
set feedback off
set linesize 1000
set pagesize 0
set sqlprompt ''
set trimspool on
set headsep off
SELECT * FROM lineitem where rownum <= &1;
quit
"""

with open("query-oracle.sql", "w") as oqfile:
	oqfile.write(oq)

# Results: one CSV row per measured client run, written to stdout.
w = csv.DictWriter(sys.stdout, ['system', 'db', 'protocol', 'network', 'throughput', 'latency', 'tuple', 'run', 'timeout', 'time', 'bytes', 'packets', 'cpu_kernel_sec', 'cpu_user_sec',  'io_page_faults', 'memory_max_kb'])

w.writeheader()

# Compression stage placed in front of "nc" for compressed netcat runs.
_NETCAT_COMPRESS_CMDS = {
	'lz4': 'lz4 -c - |',
	'lz4-heavy': 'lz4 -9 -c - |',
	'gzip': 'gzip |',
	'xz': 'xz -z |',
}

# sqlline (JDBC) invocation reading the query file; %s receives the per-system connection flags.
_JDBC_CMD = 'java -Xmx10G -Djava.security.egd=file:/dev/./urandom -Djava.ext.dirs=/home/user/java/ -jar /home/user/java/sqlline.jar --fastConnect=true --outputformat=csv  --isolation=TRANSACTION_SERIALIZABLE --silent=true --showHeader=false %s < query > /dev/null'


def _netem_args(network):
	"""Return the "tc netem" parameters emulating *network*.

	A non-positive latency/throughput leaves the corresponding parameter out.
	"""
	delay = 'delay %fms' % network['latency'] if network['latency'] > 0 else ''
	rate = 'rate %dmbit' % network['throughput'] if network['throughput'] > 0 else ''
	return '%s %s' % (delay, rate)


def _client_commands(system, ntuples):
	"""Prepare the client invocations fetching *ntuples* rows from *system*.

	Returns (query, querycmd, jdbcflags, odbccmd):
	  query     -- text for the "query" file read by the ODBC/JDBC clients
	               (and by the hbase shell)
	  querycmd  -- shell command for the system's native client
	  jdbcflags -- sqlline connection flags, or None when JDBC is not measured
	  odbccmd   -- isql command line, or None when ODBC is not measured

	Side effects: sets DB2INSTANCE / TNS_ADMIN / HBASE_HEAPSIZE in os.environ
	for db2 / oracle / hbase, and writes the "db2query" script for db2.
	Exits the program for an unknown system['db'].
	"""
	db = system['db']
	query = "SELECT * FROM lineitem LIMIT %d" % ntuples
	jdbcflags = None
	odbccmd = None
	if db == 'postgres':
		sslflags = '--set=sslcompression=1 --set=sslmode=require --set=keepalives=0' if 'compress' in system else ''
		querycmd = 'psql %s --host 127.0.0.1 -w -t -A -c "%s" > /dev/null' % (sslflags, query)
		jdbcflags = '-u jdbc:postgresql://127.0.0.1/user -n user -d org.postgresql.Driver'
		odbccmd = 'isql PostgreSQL -d, < query > /dev/null'
	elif db == 'mariadb':
		compressflag = '--compress' if 'compress' in system else ''
		querycmd = 'mysql %s --host=127.0.0.1 user --skip-column-names --batch -e "%s" > /dev/null' % (compressflag, query)
		jdbcflags = '-u jdbc:mysql://127.0.0.1/user -n user -d org.mariadb.jdbc.Driver'
		odbccmd = 'isql MySQL -d, < query > /dev/null'
	elif db == 'monetdb':
		querycmd = 'mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" > /dev/null' % query
		jdbcflags = '-u jdbc:monetdb://127.0.0.1:50001/database -n monetdb -p monetdb -d nl.cwi.monetdb.jdbc.MonetDriver'
		odbccmd = 'isql MonetDB -d, < query > /dev/null'
	elif db == 'db2':
		with open("db2query", "w") as db2qfile:
			db2qfile.write("connect to remotedb user user using user; \n" + query + ";\n")
		querycmd = 'db2 -tf db2query > /dev/null;'
		jdbcflags = '-u jdbc:db2://127.0.0.1:50000/db -d com.ibm.db2.jcc.DB2Driver -n user -p user'
		os.environ['DB2INSTANCE'] = 'user'
		odbccmd = 'isql DB2_SAMPLE -d, user user < query > /dev/null'
	elif db == 'oracle':
		os.environ['TNS_ADMIN'] = '/home/user/oracleconfig'
		querycmd = 'sqlplus system/oracle@//127.0.0.1:49161/XE @query-oracle.sql %d > /dev/null' % ntuples
		jdbcflags = '-u jdbc:oracle:thin:@127.0.0.1:49161:XE -d oracle.jdbc.driver.OracleDriver -n system -p oracle'
		odbccmd = 'isql Oracle -d, < query > /dev/null'
		# For JDBC/ODBC: no LIMIT in Oracle; ROWNUM starts at 1, so "<=" yields exactly ntuples rows.
		query = "SELECT * FROM lineitem where rownum <= %d;" % ntuples
	elif db == 'mongodb':
		querycmd = 'mongoexport -d lineitem  -c things --csv --fields "l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment" --limit %d > /dev/null 2> /dev/null' % ntuples
	elif db == 'hbase':
		os.environ['HBASE_HEAPSIZE'] = '10g'
		query = "scan 'lineitem',{LIMIT=>%d}" % ntuples
		querycmd = 'hbase shell < query > /dev/null 2> /dev/null'
	elif db == 'netcat':
		# Stream a pre-exported CSV file to the listener started by benchmark_command.
		filename = '/home/user/lineitem-%d.csv' % ntuples
		compress_cmd = _NETCAT_COMPRESS_CMDS.get(system['compress'], '') if 'compress' in system else ''
		querycmd = 'cat %s | %s nc 127.0.0.1 %d' % (filename, compress_cmd, netcat_port)
	else:
		sys.exit("unknown db %s" % db)
	return query, querycmd, jdbcflags, odbccmd


for r in range(nruns):
	for system in systems:
		for network in networks:
			syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
			syscall("sudo tc qdisc add dev lo root netem %s" % _netem_args(network))
			# Always remove the netem qdisc again, even when a run fails or the
			# script exits, so the loopback device is not left throttled.
			try:
				for ntuples in tuples:
					query, querycmd, jdbcflags, odbccmd = _client_commands(system, ntuples)

					with open("query", "w") as qfile:
						qfile.write(query + "\n")

					# getting caches hot (unrecorded run)
					benchmark_command(querycmd, system, 'native', network, ntuples, r, True)

					# native client
					benchmark_command(querycmd, system, 'native', network, ntuples, r, False)
					# ODBC and JDBC are only measured for uncompressed configurations.
					if 'compress' not in system:
						if odbccmd is not None:
							benchmark_command(odbccmd, system, 'odbc', network, ntuples, r, False)
						if jdbcflags is not None:
							benchmark_command(_JDBC_CMD % jdbcflags, system, 'jdbc', network, ntuples, r, False)
					sys.stdout.flush()
					os.remove('query')
					time.sleep(1)
			finally:
				syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
			time.sleep(0.1)