Files @ 7fbf056e912a
Branch filter:

Location: DA/protocols/vldb-protocols.py

Hannes Muehleisen
first part new protocol
import os
import time
import sys
import csv
import re
import json
import subprocess

systems = [
	{'name':'hive-default', 'db':'hive'},
	{'name':'netcat-csv-lz4', 'db':'netcat', 'compress': 'lz4'},
	{'name':'netcat-csv-lz4-heavy', 'db':'netcat', 'compress': 'lz4-heavy'},
	{'name':'netcat-csv-gzip', 'db':'netcat', 'compress': 'gzip'},
	{'name':'netcat-csv-xz', 'db':'netcat', 'compress': 'xz'},
	{'name':'netcat-csv', 'db':'netcat'},
	{'name':'mariadb-compress', 'db':'mariadb', 'compress': True},
	{'name':'db2-default', 'db':'db2'}, 
	{'name':'oracle-default', 'db':'oracle'}, 
	{'name':'postgres-default', 'db':'postgres'}, 
	{'name':'mariadb-default', 'db':'mariadb'}, 
	{'name':'monetdb-default', 'db':'monetdb'}, 
	{'name':'hbase-default', 'db':'hbase'},
	{'name':'mongodb-default', 'db':'mongodb'}]

networks = [
	{'name':'unlimited', 'throughput': -1, 'latency':-1}]#, 
	#{'name':'gigabitethld', 'throughput': 1000, 'latency':0.3},
	#{'name':'10mbitethhd', 'throughput': 10, 'latency':150}]

#tuples = [1,100,1000,10000,100000,1000000,10000000]
tuples = [1,100,1000,10000]#,100000,1000000]

# netcat our protocol messages
# we're gonna have a lot of these, so we generate them
chunksizes = [100000, 1000000, 10000000, 100000000]
compression_methods = ['lz4', 'lz4-heavy', 'gzip', 'xz']
extensions = ['col', 'row']
for chunksize in chunksizes:
	for compression_method in compression_methods:
		for extension in extensions:
			systems.append({
				'name': 'netcat-prot-%s-chunk-%d-%s' % (extension, chunksize, compression_method), 
				'db': 'netcat', 
				'protocol': True,
				'fileext': extension, 
				'chunksize': chunksize,
				'compress': compression_method})

nruns = 5
timeout = "10m"

netcat_port = 4444

fnull = open(os.devnull, 'w')

def netcat_listener(compression):
	nclistener = subprocess.Popen(['nc', '-l', '-p', '%d' % netcat_port], stdout=subprocess.PIPE if compression != None else fnull, stderr=fnull)
	if compression == None:
		return nclistener
	else:
		if compression == 'lz4':
			uncompress_cmd = 'lz4 -dc'
		elif compression == 'lz4-heavy':
			uncompress_cmd = 'lz4 -dc -9'
		elif compression == 'gzip':
			uncompress_cmd = 'gunzip -f'
		elif compression == 'xz':
			uncompress_cmd = 'xz -d'
		return subprocess.Popen(uncompress_cmd.split(' '), stdin=nclistener.stdout, stdout = fnull)

def syscall(cmd):
	return os.system(cmd)

def rxbytes():
	return int(re.search("RX bytes:(\d+)", os.popen("ifconfig lo").read()).groups(0)[0])

def rxpackets():
	return int(re.search("RX packets:(\d+)", os.popen("ifconfig lo").read()).groups(0)[0])

def benchmark_command(cmd, system, protocol, network, tuple, r, dummy):
	timeoutcmd = 'timeout --foreground -s KILL ' +  timeout
	if system['db'] == 'netcat':
		listener = netcat_listener(system['compress'] if 'compress' in system else None)
		timeoutcmd = ''
	startbytes = rxbytes()
	startpackets = rxpackets()
	start = time.time() # 
	retcode = syscall('/usr/bin/time --format=\'{"io_page_faults": %F, "memory_max_kb": %M, "cpu_kernel_sec": %S, "cpu_user_sec": %U}\' --quiet --output=timing ' + timeoutcmd + ' ' + cmd)
	duration = time.time() - start
	transmittedbytes = rxbytes() - startbytes
	transmittedpackets = rxpackets() - startpackets
	if retcode != 0:
		duration = -1
	stats = {'system': system['name'], 'db': system['db'], 'protocol': protocol, 'network': network['name'], 'throughput': network['throughput'], 'latency': network['latency'], 'tuple': tuple, 'run': r, 'time': duration, "bytes" : transmittedbytes, 'packets': transmittedpackets, 'timeout' : int(retcode != 0), 'bin_orientation' : system['fileext'] if 'fileext' in system else '', 'bin_chunksize': system['chunksize'] if 'chunksize' in system else '', 'bin_compress':system['compress'] if 'compress' in system else ''}
	stats.update(json.load(open('timing')))
	if not dummy:
		w.writerow(stats)
	sys.stdout.flush()
	os.remove('timing')

	if system['db'] == 'netcat':
		listener.kill()


oq = """set colsep '|'
set ARRAYSIZE 100
SET LINESIZE 132
SET PAGESIZE 6000
set echo off
set feedback off
set linesize 1000
set pagesize 0
set sqlprompt ''
set trimspool on
set headsep off
SELECT * FROM lineitem where rownum < &1;
quit
"""

oqfile = open("query-oracle.sql", "w")
oqfile.write(oq)
oqfile.close()

w = csv.DictWriter(sys.stdout, ['system', 'db', 'protocol', 'network', 'throughput', 'latency', 'tuple', 'run', 'timeout', 'time', 'bytes', 'packets', 'cpu_kernel_sec', 'cpu_user_sec',  'io_page_faults', 'memory_max_kb', 'bin_orientation', 'bin_chunksize', 'bin_compress'])

w.writeheader()

for r in range(nruns):
	for system in systems:
		for network in networks:
			syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
			syscall("sudo tc qdisc add dev lo root netem %s %s" % ('delay %fms' % network['latency'] if network['latency'] > 0 else '', 'rate %dmbit' % network['throughput'] if network['throughput'] > 0 else ''))

			for tuple in tuples:
				query = "SELECT * FROM lineitem LIMIT %d" % tuple

				querycmd = ""
				jdbcflags = ''
				odbcdriver = ''
				odbccmd = None
				if system['db'] == 'postgres':
					querycmd = 'psql %s --host 127.0.0.1 -w -t -A -c "%s" > /dev/null' % ('--set=sslcompression=1 --set=sslmode=require --set=keepalives=0' if 'compress' in system else '', query)
					jdbcflags = 'org.postgresql.Driver jdbc:postgresql://127.0.0.1/user user user'
					odbccmd = 'isql PostgreSQL -d, < query > /dev/null'
				elif system['db'] == 'mariadb':
					querycmd = 'mysql %s --host=127.0.0.1 user --skip-column-names --batch -e "%s" > /dev/null'  % ('--compress' if 'compress' in system else '', query)
					jdbcflags = 'org.mariadb.jdbc.Driver jdbc:mysql://127.0.0.1/user user null'
					odbccmd = 'isql MySQL -d, < query > /dev/null'
				elif system['db'] == 'monetdb':
					querycmd = 'mclient -h 127.0.0.1 -p 50001 -fcsv -s "%s" > /dev/null' % query
					jdbcflags = 'nl.cwi.monetdb.jdbc.MonetDriver jdbc:monetdb://127.0.0.1:50001/database monetdb monetdb'
					odbccmd = 'isql MonetDB -d, < query > /dev/null'
				elif system['db'] == 'db2':
					db2qfile = open("db2query", "w")
					db2qfile.write("connect to remotedb user user using user; \n" + query + ";\n")
					db2qfile.close()
					querycmd = 'db2 -tf db2query > /dev/null;'
					jdbcflags = 'com.ibm.db2.jcc.DB2Driver jdbc:db2://127.0.0.1:50000/db user user'
					os.environ['DB2INSTANCE'] = 'user'
					odbccmd = 'isql DB2_SAMPLE -d, user user < query > /dev/null'
				elif system['db'] == 'oracle':
					os.environ['TNS_ADMIN'] = '/home/user/oracleconfig'
					querycmd = 'sqlplus system/oracle@//127.0.0.1:49161/XE @query-oracle.sql %d > /dev/null' % tuple
					jdbcflags = 'oracle.jdbc.driver.OracleDriver jdbc:oracle:thin:@127.0.0.1:49161:XE system oracle'
					odbccmd = 'isql Oracle -d, < query > /dev/null'
					# for JDBC/ODBCV
					query = "SELECT * FROM lineitem where rownum < %d;" % tuple
				elif system['db'] == 'mongodb':
					querycmd = 'mongoexport -d lineitem  -c things --csv --fields "l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment" --limit %d > /dev/null 2> /dev/null' % tuple
					jdbcflags = None
					odbccmd = None
					odbcdriver = None
				elif system['db'] == 'hbase':
					os.environ['HBASE_HEAPSIZE'] = '10g'
					query = "scan 'lineitem',{LIMIT=>%d}" % tuple
					querycmd = 'hbase shell < query > /dev/null 2> /dev/null'
					jdbcflags = None
					odbccmd = None
					odbcdriver = None
				elif system['db'] == 'hive':
					querycmd = None
					jdbcflags = 'org.apache.hive.jdbc.HiveDriver jdbc:hive2://localhost:10000 user null'
					odbccmd = None
					odbcdriver = None
				elif system['db'] == 'netcat':
					if 'protocol' not in system:
						filename = '/home/user/lineitem-%d.csv' % tuple
					else:
						filename = '/home/user/lineitem-%dtpl-%dchunksize.%s' % (tuple, system['chunksize'], system['fileext'])
					compress_cmd = ''
					if 'compress' in system:
						if system['compress'] == 'lz4': 
							compress_cmd = 'lz4 -c - |'
						elif system['compress'] == 'lz4-heavy': 
							compress_cmd = 'lz4 -9 -c - |'
						elif system['compress'] == 'gzip': 
							compress_cmd = 'gzip |'
						elif system['compress'] == 'xz': 
							compress_cmd = 'xz -z |'
					querycmd = 'cat %s | %s nc 127.0.0.1 %d' % (filename, compress_cmd, netcat_port)
					jdbcflags = None
					odbccmd = None
					odbcdriver = None
				else:
					exit("unknown db %s" % system['db'])

				qfile = open("query", "w")
				qfile.write(query)
				qfile.write("\n")
				qfile.close()

				jdbccmd = 'java -Xmx10G -Djava.security.egd=file:/dev/./urandom -cp /home/user/java/pmjc.jar:/home/user/java/db2jcc4.jar:/home/user/java/monetdb-jdbc-2.23.jar:/home/user/java/mariadb-java-client-1.4.6.jar:/home/user/java/ojdbc6_g.jar:/home/user/java/postgresql-9.4.1209.jar:/home/user/java/hive-jdbc-2.1.0-standalone.jar:/home/user/java/hadoop-common-2.6.4.jar nl.cwi.da.pmjc.Pmjc %s "%s" 1000 > /dev/null 2>/dev/null' % (jdbcflags, query)
				
				# special case for hive
				if querycmd is None:
					querycmd = jdbccmd
					jdbccmd = None
					jdbcflags = None

				# getting caches hot
				benchmark_command(querycmd, system, 'native', network, tuple, r, True)

				# native client
				benchmark_command(querycmd, system, 'native', network, tuple, r, False)
				if 'compress' not in system:
					# odbc
					if odbccmd is not None:
						benchmark_command(odbccmd, system, 'odbc', network, tuple, r, False)
					# jdbc
					if jdbcflags is not None:
						benchmark_command(jdbccmd, system, 'jdbc', network, tuple, r, False)
				sys.stdout.flush()
				os.remove('query')



			syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
			time.sleep(0.1)