From b01cb2bb9139ca2306fc01a0d259182837699c2a 2016-08-02 15:00:51
From: Mark Raasveldt
Date: 2016-08-02 15:00:51
Subject: [PATCH] Add netcat transfer of our own protocol messages.

---
diff --git a/genprotocol.py b/genprotocol.py
new file mode 100644
index 0000000000000000000000000000000000000000..868bda532597d669c3437ec596b602b21b8ee08c
--- /dev/null
+++ b/genprotocol.py
@@ -0,0 +1,85 @@
+import numpy
+import pandas
+from datetime import datetime
+import time
+
+
+def datetoint(col):
+    return numpy.array([time.mktime(datetime.strptime(x, '%Y-%m-%d').timetuple()) for x in col], dtype=numpy.int64)
+
+chunkbyte_choices = [100000, 1000000, 10000000, 100000000]
+chunkbytes = 1000000
+
+tuples = [1,100,1000,10000,100000,1000000]
+filename = '/home/user/lineitem-%d.csv'
+
+for chunkbytes in chunkbyte_choices:
+    for tuple in tuples:
+        cfile = pandas.read_csv(filename % tuple, sep='|', header=None, names=['l_orderkey','l_partkey','l_suppkey','l_linenumber','l_quantity','l_extendedprice','l_discount','l_tax','l_returnflag','l_linestatus','l_shipdate','l_commitdate','l_receiptdate','l_shipinstruct','l_shipmode','l_comment'])
+
+        cfile.l_orderkey = cfile.l_orderkey.astype(numpy.int32)
+        cfile.l_partkey = cfile.l_partkey.astype(numpy.int32)
+        cfile.l_suppkey = cfile.l_suppkey.astype(numpy.int16)
+        cfile.l_linenumber = cfile.l_linenumber.astype(numpy.int8)
+        cfile.l_quantity = cfile.l_quantity.astype(numpy.int8)
+        cfile.l_shipdate = datetoint(cfile.l_shipdate)
+        cfile.l_commitdate = datetoint(cfile.l_commitdate)
+        cfile.l_receiptdate = datetoint(cfile.l_receiptdate)
+
+        out_col = open('lineitem-%dtpl-%dchunksize.col' % (tuple, chunkbytes), 'wb+')
+        out_row = open('lineitem-%dtpl-%dchunksize.row' % (tuple, chunkbytes), 'wb+')
+
+        startrow = 0
+
+        while startrow < len(cfile):
+            print(startrow)
+            tmp = chunkbytes - 16
+            row = 0
+            rowsize = 0
+            # for the current chunk, compute the amount of rows to transmit
+            while startrow + row < len(cfile):
+                rowsize = 0
+                for i in range(len(cfile.columns)):
+                    col = cfile[cfile.columns[i]]
+                    if col.dtype == numpy.object:
+                        rowsize += len(col[startrow + row]) + 1
+                    else:
+                        rowsize += col.dtype.itemsize
+                if (tmp < rowsize):
+                    break
+                tmp -= rowsize
+                row += 1
+
+            # write column-protocol
+
+            # message length
+            numpy.array(chunkbytes - tmp, dtype=numpy.int64).tofile(out_col)
+            # row count
+            numpy.array(row, dtype=numpy.int64).tofile(out_col)
+
+            for i in range(len(cfile.columns)):
+                col = cfile[cfile.columns[i]]
+                if col.dtype == numpy.object:
+                    for o in range(startrow, (startrow + row)):
+                        out_col.write(col[o])
+                        out_col.write('\0')
+                else:
+                    numpy.array(col[startrow:(startrow + row)]).tofile(out_col)
+
+            # write row-protocol
+
+            # message length
+            numpy.array(chunkbytes - tmp, dtype=numpy.int64).tofile(out_row)
+            for r in range(startrow, startrow+row):
+                for i in range(len(cfile.columns)):
+                    col = cfile[cfile.columns[i]]
+                    if col.dtype == numpy.object:
+                        out_row.write(col[r])
+                        out_row.write('\0')
+                    else:
+                        numpy.array(col[r]).tofile(out_row)
+
+            startrow += row
+
+        out_col.close()
+        out_row.close()
diff --git a/vldb-protocols.py b/vldb-protocols.py
index d3b59911a1424938f301860b0fa36d865420eae9..ba872178139c8b123ac60f187833f2b64bd18921 100644
--- a/vldb-protocols.py
+++ b/vldb-protocols.py
@@ -7,7 +7,7 @@ import json
 import subprocess
 
 systems = [
-    {'name':'hive-default', 'db':'hive'},
+    {'name':'hive-default', 'db':'hive'},
     {'name':'netcat-csv-lz4', 'db':'netcat', 'compress': 'lz4'},
     {'name':'netcat-csv-lz4-heavy', 'db':'netcat', 'compress': 'lz4-heavy'},
     {'name':'netcat-csv-gzip', 'db':'netcat', 'compress': 'gzip'},
@@ -30,6 +30,22 @@ networks = [
 #tuples = [1,100,1000,10000,100000,1000000,10000000]
 tuples = [1,100,1000,10000]#,100000,1000000]
 
+# netcat our protocol messages
+# we're gonna have a lot of these, so we generate them
+chunksizes = [100000, 1000000, 10000000, 100000000]
+compression_methods = ['lz4', 'lz4-heavy', 'gzip', 'xz']
+extensions = ['col', 'row']
+for chunksize in chunksizes:
+    for compression_method in compression_methods:
+        for extension in extensions:
+            systems.append({
+                'name': 'netcat-prot-%s-chunk-%d-%s' % (extension, chunksize, compression_method),
+                'db': 'netcat',
+                'protocol': True,
+                'fileext': extension,
+                'chunksize': chunksize,
+                'compress': compression_method})
+
 nruns = 5
 timeout = "10m"
 
@@ -167,8 +183,10 @@ for r in range(nruns):
             odbccmd = None
             odbcdriver = None
         elif system['db'] == 'netcat':
-            # open netcat for listening in a separate process
-            filename = '/home/user/lineitem-%d.csv' % tuple
+            if 'protocol' not in system:
+                filename = '/home/user/lineitem-%d.csv' % tuple
+            else:
+                filename = '/home/user/lineitem-%dtpl-%dchunksize.%s' % (tuple, system['chunksize'], system['fileext'])
             compress_cmd = ''
             if 'compress' in system:
                 if system['compress'] == 'lz4':
@@ -213,7 +231,6 @@ for r in range(nruns):
             benchmark_command(jdbccmd, system, 'jdbc', network, tuple, r, False)
             sys.stdout.flush()
             os.remove('query')
-            time.sleep(1)
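
Note on the chunk layout written by genprotocol.py: each chunk in a '.col' file starts with an 8-byte message length (which includes the 16-byte header) and an 8-byte row count, followed by the columns in table order; string columns are written as NUL-terminated values, every other column as raw fixed-width binary in the dtype assigned above. The sketch below is a minimal reader for that layout and is not part of the patch; the LINEITEM_SCHEMA list is an assumption that mirrors the casts in genprotocol.py, assuming pandas.read_csv leaves the price/discount/tax columns as float64 and the flag/instruction/comment columns as strings.

import numpy

# (column name, dtype) in file order; 'str' marks NUL-terminated string columns.
LINEITEM_SCHEMA = [
    ('l_orderkey', numpy.int32), ('l_partkey', numpy.int32),
    ('l_suppkey', numpy.int16), ('l_linenumber', numpy.int8),
    ('l_quantity', numpy.int8), ('l_extendedprice', numpy.float64),
    ('l_discount', numpy.float64), ('l_tax', numpy.float64),
    ('l_returnflag', 'str'), ('l_linestatus', 'str'),
    ('l_shipdate', numpy.int64), ('l_commitdate', numpy.int64),
    ('l_receiptdate', numpy.int64), ('l_shipinstruct', 'str'),
    ('l_shipmode', 'str'), ('l_comment', 'str'),
]

def read_col_chunk(f, schema=LINEITEM_SCHEMA):
    # 16-byte header: message length (including the header itself) and row count,
    # both written with numpy int64 tofile, i.e. native byte order.
    header = f.read(16)
    if len(header) < 16:
        return None  # end of file
    msglen, nrows = numpy.frombuffer(header, dtype=numpy.int64)
    # msglen is redundant here: parsing by schema already tells us the chunk end.
    columns = {}
    for name, dtype in schema:
        if dtype == 'str':
            # string column: nrows values, each terminated by a NUL byte
            values = []
            for _ in range(nrows):
                chars = bytearray()
                while True:
                    c = f.read(1)
                    if c in (b'', b'\0'):
                        break
                    chars.extend(c)
                values.append(chars.decode())
            columns[name] = values
        else:
            # fixed-width column: nrows values of the column's dtype
            itemsize = numpy.dtype(dtype).itemsize
            columns[name] = numpy.frombuffer(f.read(int(nrows) * itemsize), dtype=dtype)
    return columns

# hypothetical usage:
# with open('lineitem-1000tpl-1000000chunksize.col', 'rb') as f:
#     chunk = read_col_chunk(f)
#     print(len(chunk['l_orderkey']))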
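
Note on the driver change: the pre-generated protocol files are streamed through netcat the same way as the CSV files, so the compression method never appears in the file name; only the tuple count, the chunk size, and the col/row extension do. genprotocol.py writes its output into the working directory, so it is presumably run from /home/user, where vldb-protocols.py expects the files. The following stand-alone check is not part of the patch; it verifies that every file required by the generated 'netcat-prot-*' configurations exists before a run, with the lists mirroring the values used in the diff.

import os

chunksizes = [100000, 1000000, 10000000, 100000000]
extensions = ['col', 'row']
tuples = [1, 100, 1000, 10000]
filename_pattern = '/home/user/lineitem-%dtpl-%dchunksize.%s'

missing = []
for chunksize in chunksizes:
    for extension in extensions:
        for tuple_count in tuples:
            path = filename_pattern % (tuple_count, chunksize, extension)
            if not os.path.exists(path):
                missing.append(path)

if missing:
    print('%d chunk files still need to be generated:' % len(missing))
    for path in missing:
        print('  ' + path)
else:
    print('all protocol chunk files are present')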