Changeset - b01cb2bb9139
[Not reviewed]
0 1 1
Mark Raasveldt - 9 years ago 2016-08-02 15:00:51
mark.raasveldt@gmail.com
Add netcat transfer of our own protocol messages.
2 files changed with 106 insertions and 4 deletions:
0 comments (0 inline, 0 general)
genprotocol.py
Show inline comments
 
new file 100644
 
import numpy
 
import pandas
 
from datetime import datetime
 
import time
 

	
 

	
 
def datetoint(col, fmt='%Y-%m-%d'):
	"""Convert an iterable of date strings into a numpy int64 array of Unix
	timestamps.

	Each entry is parsed with `fmt` (default ISO date '%Y-%m-%d') and turned
	into seconds since the epoch via time.mktime, i.e. interpreted in the
	local timezone. Returns an empty int64 array for empty input.
	"""
	return numpy.array([time.mktime(datetime.strptime(x, fmt).timetuple()) for x in col], dtype=numpy.int64)
 

	
 
# Generate two protocol dumps of each TPC-H lineitem CSV -- a column-major
# '.col' file and a row-major '.row' file -- for every (tuple count, chunk
# size) combination. Each chunk starts with an int64 message length; the
# column format additionally carries an int64 row count. The files can then
# be replayed over the network with netcat.
#
# NOTE(review): the original wrote str objects to files opened in binary
# mode (Python 2 heritage); strings are now explicitly encoded, and the
# numpy.object alias (removed in modern numpy) is replaced by plain object.

chunkbyte_choices = [100000, 1000000, 10000000, 100000000]

tuples = [1, 100, 1000, 10000, 100000, 1000000]
filename = '/home/user/lineitem-%d.csv'

# TPC-H lineitem column names, in on-disk order.
LINEITEM_NAMES = ['l_orderkey', 'l_partkey', 'l_suppkey', 'l_linenumber',
		'l_quantity', 'l_extendedprice', 'l_discount', 'l_tax',
		'l_returnflag', 'l_linestatus', 'l_shipdate', 'l_commitdate',
		'l_receiptdate', 'l_shipinstruct', 'l_shipmode', 'l_comment']

# Room reserved at the head of every chunk for the two int64 header fields.
HEADER_BYTES = 16


def _load_lineitem(path):
	"""Read one lineitem CSV and convert its columns to the wire types."""
	frame = pandas.read_csv(path, sep='|', header=None, names=LINEITEM_NAMES)
	frame.l_orderkey = frame.l_orderkey.astype(numpy.int32)
	frame.l_partkey = frame.l_partkey.astype(numpy.int32)
	frame.l_suppkey = frame.l_suppkey.astype(numpy.int16)
	frame.l_linenumber = frame.l_linenumber.astype(numpy.int8)
	frame.l_quantity = frame.l_quantity.astype(numpy.int8)
	# dates travel as int64 unix timestamps
	frame.l_shipdate = datetoint(frame.l_shipdate)
	frame.l_commitdate = datetoint(frame.l_commitdate)
	frame.l_receiptdate = datetoint(frame.l_receiptdate)
	return frame


def _row_bytes(frame, rownum):
	"""Serialized size in bytes of absolute row `rownum`: fixed-width columns
	contribute their itemsize, string (object) columns their length plus one
	NUL terminator byte."""
	size = 0
	for name in frame.columns:
		col = frame[name]
		if col.dtype == object:
			size += len(col.iloc[rownum]) + 1
		else:
			size += col.dtype.itemsize
	return size


def _count_chunk_rows(frame, startrow, chunkbytes):
	"""Return (nrows, used): how many rows starting at `startrow` fit into a
	chunk of `chunkbytes` bytes, and how many bytes (headers included) they
	occupy.

	BUGFIX: the original sized string columns with the chunk-relative offset
	(`col[row]`) instead of the absolute row number, mis-measuring every
	chunk after the first; we measure the absolute row startrow + nrows.
	"""
	budget = chunkbytes - HEADER_BYTES
	nrows = 0
	while startrow + nrows < len(frame):
		need = _row_bytes(frame, startrow + nrows)
		if budget < need:
			break
		budget -= need
		nrows += 1
	if nrows == 0:
		# A single over-sized row used to make the caller loop forever;
		# fail loudly instead.
		raise RuntimeError('row %d does not fit in a %d-byte chunk' % (startrow, chunkbytes))
	# chunkbytes - budget == HEADER_BYTES + payload, matching the original
	# 'chunkbytes - tmp' message length.
	return nrows, chunkbytes - budget


def _write_column_chunk(out, frame, startrow, nrows, used):
	"""Write one column-major chunk: int64 message length, int64 row count,
	then each column's values for the row range, column after column."""
	numpy.array(used, dtype=numpy.int64).tofile(out)
	numpy.array(nrows, dtype=numpy.int64).tofile(out)
	for name in frame.columns:
		col = frame[name]
		if col.dtype == object:
			for o in range(startrow, startrow + nrows):
				out.write(col.iloc[o].encode())
				out.write(b'\0')
		else:
			numpy.array(col[startrow:startrow + nrows]).tofile(out)


def _write_row_chunk(out, frame, startrow, nrows, used):
	"""Write one row-major chunk: int64 message length, then every row's
	values field by field. (The length header mirrors the column format, so
	it also accounts for 16 header bytes, exactly as the original did.)"""
	numpy.array(used, dtype=numpy.int64).tofile(out)
	for r in range(startrow, startrow + nrows):
		for name in frame.columns:
			col = frame[name]
			if col.dtype == object:
				out.write(col.iloc[r].encode())
				out.write(b'\0')
			else:
				numpy.array(col.iloc[r]).tofile(out)


for chunkbytes in chunkbyte_choices:
	for ntuples in tuples:  # renamed: 'tuple' shadowed the builtin
		frame = _load_lineitem(filename % ntuples)
		col_name = 'lineitem-%dtpl-%dchunksize.col' % (ntuples, chunkbytes)
		row_name = 'lineitem-%dtpl-%dchunksize.row' % (ntuples, chunkbytes)
		with open(col_name, 'wb+') as out_col, open(row_name, 'wb+') as out_row:
			startrow = 0
			while startrow < len(frame):
				print(startrow)
				nrows, used = _count_chunk_rows(frame, startrow, chunkbytes)
				_write_column_chunk(out_col, frame, startrow, nrows, used)
				_write_row_chunk(out_row, frame, startrow, nrows, used)
				startrow += nrows
vldb-protocols.py
Show inline comments
 
@@ -4,13 +4,13 @@ import sys
 
import csv
 
import re
 
import json
 
import subprocess
 

	
 
systems = [
 
	{'name':'hive-default', 'db':'hive'}, 
 
	{'name':'hive-default', 'db':'hive'},
 
	{'name':'netcat-csv-lz4', 'db':'netcat', 'compress': 'lz4'},
 
	{'name':'netcat-csv-lz4-heavy', 'db':'netcat', 'compress': 'lz4-heavy'},
 
	{'name':'netcat-csv-gzip', 'db':'netcat', 'compress': 'gzip'},
 
	{'name':'netcat-csv-xz', 'db':'netcat', 'compress': 'xz'},
 
	{'name':'netcat-csv', 'db':'netcat'},
 
	{'name':'mariadb-compress', 'db':'mariadb', 'compress': True},
 
@@ -27,12 +27,28 @@ networks = [
 
	#{'name':'gigabitethld', 'throughput': 1000, 'latency':0.3},
 
	#{'name':'10mbitethhd', 'throughput': 10, 'latency':150}]
 

	
 
# Tuple counts (rows per lineitem input file) to benchmark; the largest
# sizes are commented out, presumably to keep total runtime manageable.
#tuples = [1,100,1000,10000,100000,1000000,10000000]
tuples = [1,100,1000,10000]#,100000,1000000]
 

	
 
# Netcat transfers of our own protocol dumps: one benchmark entry for every
# (chunk size, compression method, file layout) combination. There are too
# many to list by hand, so they are generated here.
chunksizes = [100000, 1000000, 10000000, 100000000]
compression_methods = ['lz4', 'lz4-heavy', 'gzip', 'xz']
extensions = ['col', 'row']
systems.extend(
	{'name': 'netcat-prot-%s-chunk-%d-%s' % (ext, size, method),
	 'db': 'netcat',
	 'protocol': True,
	 'fileext': ext,
	 'chunksize': size,
	 'compress': method}
	for size in chunksizes
	for method in compression_methods
	for ext in extensions)
 

	
 
# Number of repetitions of each benchmark configuration.
nruns = 5
# Time limit string; "10m" suggests ten minutes. NOTE(review): confirm at
# the use site (likely a timeout(1) wrapper) -- not visible in this chunk.
timeout = "10m"

# TCP port the netcat listener/sender pair communicates over.
netcat_port = 4444

# Sink for child-process output we do not want on the console.
fnull = open(os.devnull, 'w')
 
@@ -164,14 +180,16 @@ for r in range(nruns):
 
				elif system['db'] == 'hive':
 
					querycmd = None
 
					jdbcflags = 'org.apache.hive.jdbc.HiveDriver jdbc:hive2://localhost:10000 user null'
 
					odbccmd = None
 
					odbcdriver = None
 
				elif system['db'] == 'netcat':
 
					# open netcat for listening in a separate process
 
					filename = '/home/user/lineitem-%d.csv' % tuple
 
					if 'protocol' not in system:
 
						filename = '/home/user/lineitem-%d.csv' % tuple
 
					else:
 
						filename = '/home/user/lineitem-%dtpl-%dchunksize.%s' % (tuple, system['chunksize'], system['fileext'])
 
					compress_cmd = ''
 
					if 'compress' in system:
 
						if system['compress'] == 'lz4': 
 
							compress_cmd = 'lz4 -c - |'
 
						elif system['compress'] == 'lz4-heavy': 
 
							compress_cmd = 'lz4 -9 -c - |'
 
@@ -210,12 +228,11 @@ for r in range(nruns):
 
						benchmark_command(odbccmd, system, 'odbc', network, tuple, r, False)
 
					# jdbc
 
					if jdbcflags is not None:
 
						benchmark_command(jdbccmd, system, 'jdbc', network, tuple, r, False)
 
				sys.stdout.flush()
 
				os.remove('query')
 
				time.sleep(1)
 

	
 

	
 

	
 
			syscall("sudo tc qdisc del dev lo root netem 2>/dev/null")
 
			time.sleep(0.1)
0 comments (0 inline, 0 general)