# (stray git-diff metadata "new file 100644" removed — it was not valid Python)
|
|
|
import sys, time, logging, subprocess
|
|
|
from os.path import basename
|
|
|
|
|
|
from pymonetdb.sql.connections import Connection as connect
|
|
|
from pymonetdb.exceptions import Error as DBError
|
|
|
|
|
|
def create_table(conn, csvfile, q):
    """Create a source, forcedsource, object or objectfulloverlap table.

    The CSV base name (extension stripped, lower-cased) becomes the table
    name; the part before the first underscore selects the
    ``sql/create_<family>.sql`` template, which is rendered with
    ``%(tabname)s`` and executed.

    Args:
        conn: open database connection (pymonetdb Connection).
        csvfile: path to the CSV file the new table will hold.
        q: query sequence number, used only in the log line.

    Raises:
        DBError: re-raised after logging when the CREATE fails.
    """
    tabname = basename(csvfile)[:-4].lower()
    fname = tabname.split("_")[0]
    sqlfile = "sql/create_" + fname + ".sql"
    with open(sqlfile, 'r') as f:
        qu = f.read()
    query = qu % {'tabname': tabname}

    cursor = conn.cursor()
    try:
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# CT: (Q%s: %s ms) Created table: %s" % (q, t, tabname))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Always release the cursor — the original leaked it when
        # execute() raised.
        cursor.close()
|
|
|
|
|
|
def create_merge_table(conn, csvfile, q):
    """Create the merge table that the csvfile's partition belongs to.

    The CSV base name yields the partition table name; the prefix before
    the first underscore is the merge table name.  Both are substituted
    into ``sql/create_merge_tabname.sql`` and the result is executed.

    Args:
        conn: open database connection (pymonetdb Connection).
        csvfile: path to the partition CSV file.
        q: query sequence number, used only in the log line.

    Raises:
        DBError: re-raised after logging when the CREATE fails.
    """
    tabname = basename(csvfile)[:-4].lower()
    merge_tabname = tabname.split("_")[0]
    with open('sql/create_merge_tabname.sql', 'r') as f:
        qu = f.read()
    query = qu % {'merge_tabname': merge_tabname,
                  'tabname': tabname}

    cursor = conn.cursor()
    try:
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# CM: (Q%s: %s ms) Created merge table %s based upon: %s"
              % (q, t, merge_tabname, tabname))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Always release the cursor, also on the error path.
        cursor.close()
|
|
|
|
|
|
def alter_merge_table_add_table(conn, csvfile, q):
    """Attach the csvfile's partition table to its merge table.

    Renders ``sql/alter_merge_table_add_table.sql`` with the merge table
    name (file-name prefix) and the partition table name, then executes
    it.

    Args:
        conn: open database connection (pymonetdb Connection).
        csvfile: path to the partition CSV file.
        q: query sequence number, used only in the log line.

    Raises:
        DBError: re-raised after logging when the ALTER fails.
    """
    tabname = basename(csvfile)[:-4].lower()
    merge_tabname = tabname.split("_")[0]
    with open('sql/alter_merge_table_add_table.sql', 'r') as f:
        qu = f.read()
    query = qu % {'merge_tabname': merge_tabname,
                  'tabname': tabname}

    cursor = conn.cursor()
    try:
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# AM: (Q%s: %s ms) Alter merge table %s add table %s"
              % (q, t, merge_tabname, tabname))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Always release the cursor, also on the error path.
        cursor.close()
|
|
|
|
|
|
def alter_merge_table_add_pkey(conn, csvfile, q):
    """Add the family-specific primary key to the merge table.

    The key column list depends on the table family encoded in the file
    name prefix: ``source`` -> id, ``forcedsource`` ->
    (deepSourceId, scienceCcdExposureId), ``object`` -> deepSourceId.
    ``objectfulloverlap`` tables deliberately get no primary key.

    Args:
        conn: open database connection (pymonetdb Connection).
        csvfile: path to the partition CSV file; its base name selects
            the merge table and the key columns.
        q: query sequence number, used only in the log line.

    Raises:
        ValueError: when the file name prefix matches no known family
            (the original code fell through here and died with a
            NameError on ``pkey``).
        DBError: re-raised after logging when the ALTER fails.
    """
    tabname = basename(csvfile)[:-4].lower()
    merge_tabname = tabname.split("_")[0]
    if tabname.startswith("source_"):
        pkey = "id"
    elif tabname.startswith("forcedsource_"):
        pkey = "deepSourceId,scienceCcdExposureId"
    elif tabname.startswith("object_"):
        pkey = "deepSourceId"
    elif tabname.startswith("objectfulloverlap_"):
        # No primary key for full-overlap tables by design.
        return
    else:
        # Make the failure explicit instead of the NameError the
        # original raised when 'pkey' was never assigned.
        raise ValueError("unknown table family for %s" % tabname)

    # NOTE: this sql is being used in load csv part as well (lsst_queries.py)
    with open('sql/alter_tabname_add_pkey.sql', 'r') as f:
        qu = f.read()
    query = qu % {'tabname': merge_tabname,
                  'pkey': pkey}

    cursor = conn.cursor()
    try:
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# AM: (Q%s: %s ms) Alter merge table %s add primary key (%s)"
              % (q, t, merge_tabname, pkey))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Always release the cursor, also on the error path.
        cursor.close()
|
|
|
|