Files @ 72890999e24d
Branch filter:

Location: DA/lsst_blog/lsst_schema_queries.py

Bart Scheers
Running LSST baseline queries in MonetDB
import sys, time, logging, subprocess
from os.path import basename

from pymonetdb.sql.connections import Connection as connect
from pymonetdb.exceptions import Error as DBError

def create_table(conn, csvfile, q):
    """General function to create either a source, forcedsource,
    object or objectfulloverlap table. The corresponding sql is
    executed.

    The table name is the lower-cased basename of ``csvfile`` with its
    last four characters stripped (assumed to be a ".csv" extension).
    The first "_"-separated component of that name selects the DDL
    template ``sql/create_<component>.sql`` (relative to the current
    working directory), which is %-formatted with ``{'tabname': ...}``.

    :param conn: connection providing ``cursor()`` and ``commit()``
        (e.g. a pymonetdb Connection).
    :param csvfile: path of the CSV file the table is named after.
    :param q: query label, only used in the printed timing line.
    :raises DBError: re-raised after printing the failing query.
    """

    tabname = basename(csvfile)[:-4].lower()
    fname = tabname.split("_")[0]
    sqlfile = "sql/create_" + fname + ".sql"
    with open(sqlfile, 'r') as f:
        qu = f.read()
    params = {'tabname': tabname}
    query = qu % (params)

    cursor = None
    try:
        cursor = conn.cursor()
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("# CT: (Q%s: %s ms) Created table: %s" % (q, t, tabname))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Always release the cursor, including when execute()/commit() raise.
        if cursor is not None:
            cursor.close()

def create_merge_table(conn, csvfile, q):
    """General function to create a merge table

    The member table name is the lower-cased basename of ``csvfile`` with
    its last four characters stripped (assumed to be a ".csv" extension).
    The merge table name is the first "_"-separated component of it.
    Both are substituted into the template
    ``sql/create_merge_tabname.sql`` (relative to the current working
    directory) via the keys ``merge_tabname`` and ``tabname``.

    :param conn: connection providing ``cursor()`` and ``commit()``
        (e.g. a pymonetdb Connection).
    :param csvfile: path of the CSV file the tables are named after.
    :param q: query label, only used in the printed timing line.
    :raises DBError: re-raised after printing the failing query.
    """

    tabname = basename(csvfile)[:-4].lower()
    merge_tabname = tabname.split("_")[0]
    with open('sql/create_merge_tabname.sql', 'r') as f:
        qu = f.read()
    params = {'merge_tabname': merge_tabname
             ,'tabname': tabname}
    query = qu % (params)

    cursor = None
    try:
        cursor = conn.cursor()
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("# CM: (Q%s: %s ms) Created merge table %s based upon: %s"
              % (q, t, merge_tabname, tabname))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Always release the cursor, including when execute()/commit() raise.
        if cursor is not None:
            cursor.close()

def alter_merge_table_add_table(conn, csvfile, q):
    """Add a (sub)table to the merge table

    The member table name is the lower-cased basename of ``csvfile`` with
    its last four characters stripped (assumed to be a ".csv" extension).
    The merge table name is the first "_"-separated component of it.
    Both are substituted into the template
    ``sql/alter_merge_table_add_table.sql`` (relative to the current
    working directory) via the keys ``merge_tabname`` and ``tabname``.

    :param conn: connection providing ``cursor()`` and ``commit()``
        (e.g. a pymonetdb Connection).
    :param csvfile: path of the CSV file the tables are named after.
    :param q: query label, only used in the printed timing line.
    :raises DBError: re-raised after printing the failing query.
    """

    tabname = basename(csvfile)[:-4].lower()
    merge_tabname = tabname.split("_")[0]
    with open('sql/alter_merge_table_add_table.sql', 'r') as f:
        qu = f.read()
    params = {'merge_tabname': merge_tabname
             ,'tabname': tabname}
    query = qu % (params)

    cursor = None
    try:
        cursor = conn.cursor()
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("# AM: (Q%s: %s ms) Alter merge table %s add table %s"
              % (q, t, merge_tabname, tabname))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Always release the cursor, including when execute()/commit() raise.
        if cursor is not None:
            cursor.close()

def alter_merge_table_add_pkey(conn, csvfile, q):
    """Add a primary key to the merge table

    The member table name is the lower-cased basename of ``csvfile`` with
    its last four characters stripped (assumed to be a ".csv" extension).
    The merge table name is the first "_"-separated component of it. The
    primary-key column list is chosen from the member table's prefix:

    - ``source_``            -> ``id``
    - ``forcedsource_``      -> ``deepSourceId,scienceCcdExposureId``
    - ``object_``            -> ``deepSourceId``
    - ``objectfulloverlap_`` -> no key is added; returns immediately.

    The template ``sql/alter_tabname_add_pkey.sql`` (relative to the
    current working directory) is formatted with the keys ``tabname``
    (set to the merge table name) and ``pkey``.

    :param conn: connection providing ``cursor()`` and ``commit()``
        (e.g. a pymonetdb Connection).
    :param csvfile: path of the CSV file the tables are named after.
    :param q: query label, only used in the printed timing line.
    :raises ValueError: if the table name has none of the prefixes above.
    :raises DBError: re-raised after printing the failing query.
    """

    tabname = basename(csvfile)[:-4].lower()
    merge_tabname = tabname.split("_")[0]
    if tabname.startswith("source_"):
        pkey = "id"
    elif tabname.startswith("forcedsource_"):
        pkey = "deepSourceId,scienceCcdExposureId"
    elif tabname.startswith("object_"):
        pkey = "deepSourceId"
    elif tabname.startswith("objectfulloverlap_"):
        return
    else:
        # Without this, an unknown prefix left `pkey` unbound and surfaced
        # later as an opaque UnboundLocalError.
        raise ValueError("Cannot determine primary key for table %r (from %r)"
                         % (tabname, csvfile))
    # NOTE: this sql is being used in load csv part as well (lsst_queries.py)
    with open('sql/alter_tabname_add_pkey.sql', 'r') as f:
        qu = f.read()
    params = {'tabname': merge_tabname
             ,'pkey': pkey}
    query = qu % (params)

    cursor = None
    try:
        cursor = conn.cursor()
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print("# AM: (Q%s: %s ms) Alter merge table %s add primary key (%s)"
              % (q, t, merge_tabname, pkey))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Always release the cursor, including when execute()/commit() raise.
        if cursor is not None:
            cursor.close()