Files
@ 72890999e24d
Branch filter:
Location: DA/lsst_blog/lsst_queries.py - annotation
72890999e24d
2.9 KiB
text/x-python
Running LSST baseline queries in MonetDB
72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d 72890999e24d | import time, logging
from os.path import basename
from pymonetdb.exceptions import Error as DBError
def loadcsv_drop_pkey(conn, csvfile, q):
    """Drop the primary-key constraint of the table named after a csv file.

    The table name is the csv file's base name with its last four
    characters (expected to be the ".csv" extension) removed, lower-cased.
    The constraint name is that table name followed by "_id_pkey".  Both
    are substituted by name into the SQL template read from
    'loadcsv_drop_pkey.sql'; the resulting statement is executed, timed
    and committed.

    Args:
        conn: open database connection providing cursor() and commit().
        csvfile: path of the csv file the table is named after.
        q: query number, used only in the printed and logged messages.

    Raises:
        DBError: re-raised after the failure has been printed.
    """
    tabname = basename(csvfile)[:-4].lower()
    pkey = tabname + "_id_pkey"

    # NOTE(review): the other loaders in this file read their templates from
    # the 'sql/' directory; this one reads from the working directory --
    # confirm which location is intended.
    with open('loadcsv_drop_pkey.sql', 'r') as f:
        template = f.read()
    query = template % {'tabname': tabname, 'pkey': pkey}

    cursor = None
    try:
        cursor = conn.cursor()
        t0 = time.time()
        cursor.execute(query)
        # Elapsed wall-clock time of the statement in milliseconds.
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# AL: (Q%s: %s ms) Altered table %s dropped pkey %s" % (q, t, tabname, pkey))
        logging.info("L;I?;Q%s;T%s;N0" % (q, t))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Release the cursor on failure as well as on success.
        if cursor is not None:
            cursor.close()
def alter_tabname_add_pkey(conn, csvfile, q):
    """Set a primary key for the table named after a csv file.

    The table name is the csv file's base name with its last four
    characters (expected to be the ".csv" extension) removed, lower-cased.
    The key column(s) are chosen from the table-name prefix:

        source_*             -> id
        forcedsource_*       -> deepSourceId,scienceCcdExposureId
        object_*             -> deepSourceId
        objectfulloverlap_*  -> no key is added; the function returns

    Table name and key columns are substituted by name into the SQL
    template read from 'sql/alter_tabname_add_pkey.sql'; the resulting
    statement is executed, timed and committed.

    Args:
        conn: open database connection providing cursor() and commit().
        csvfile: path of the csv file the table is named after.
        q: query number, used only in the printed and logged messages.

    Raises:
        ValueError: if the table name matches none of the known prefixes.
        DBError: re-raised after the failure has been printed.
    """
    tabname = basename(csvfile)[:-4].lower()
    if tabname.startswith("source_"):
        pkey = "id"
    elif tabname.startswith("forcedsource_"):
        pkey = "deepSourceId,scienceCcdExposureId"
    elif tabname.startswith("object_"):
        pkey = "deepSourceId"
    elif tabname.startswith("objectfulloverlap_"):
        return
    else:
        # Without this branch an unknown table would fail further down with
        # an UnboundLocalError on 'pkey', which hides the real cause.
        raise ValueError("No primary key known for table %s (from %s)"
                         % (tabname, csvfile))

    with open('sql/alter_tabname_add_pkey.sql', 'r') as f:
        template = f.read()
    query = template % {'tabname': tabname, 'pkey': pkey}

    cursor = None
    try:
        cursor = conn.cursor()
        t0 = time.time()
        cursor.execute(query)
        # Elapsed wall-clock time of the statement in milliseconds.
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# AT: (Q%s: %s ms) Altered table %s added pkey" % (q, t, tabname))
        logging.info("L;I0;Q%s;T%s;N0" % (q, t))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Release the cursor on failure as well as on success.
        if cursor is not None:
            cursor.close()
def load_csv_copyinto(conn, csvfile, q):
    """Load a csv file into the table named after it.

    The table name is the csv file's base name with its last four
    characters (expected to be the ".csv" extension) removed, lower-cased.
    Table name and csv path are substituted by name into the SQL template
    read from 'sql/loadcsv_copyinto.sql' (presumably a COPY INTO statement,
    going by the template name); the resulting statement is executed, timed
    and committed.

    Args:
        conn: open database connection providing cursor() and commit().
        csvfile: path of the csv file to load; also determines the table.
        q: query number, used only in the printed and logged messages.

    Raises:
        DBError: re-raised after the failure has been printed.
    """
    tabname = basename(csvfile)[:-4].lower()
    with open('sql/loadcsv_copyinto.sql', 'r') as f:
        template = f.read()
    query = template % {'tabname': tabname, 'csvfile': csvfile}

    cursor = None
    try:
        cursor = conn.cursor()
        t0 = time.time()
        # The value returned by execute() is reported as the record count.
        n = cursor.execute(query)
        # Elapsed wall-clock time of the statement in milliseconds.
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# CI: (Q%s: %s ms) Copied %s records into table %s" % (q, t, n, tabname))
        logging.info("L;I0;Q%s;T%s;N%s" % (q, t, n))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Release the cursor on failure as well as on success.
        if cursor is not None:
            cursor.close()
def load_csv(conn, csvfile, q):
    """Load the csv file into its table, then add the table's primary key.

    Args:
        conn: open database connection, passed through to both steps.
        csvfile: path of the csv file to load.
        q: query number reported for the load step; the primary-key step
            is reported as q + 1.
    """
    # Dropping the primary key before loading (loadcsv_drop_pkey) is
    # currently disabled.
    load_csv_copyinto(conn, csvfile, q)
    pkey_query_number = q + 1
    alter_tabname_add_pkey(conn, csvfile, pkey_query_number)
|