Files @ 0818c337d503
Branch filter:

Location: DA/lsst_blog/lsst_schema_queries.py

Bart Scheers
Add blog post url
import sys, time, logging, subprocess
from os.path import basename

from pymonetdb.sql.connections import Connection as connect
from pymonetdb.exceptions import Error as DBError

def create_table(conn, csvfile, q):
    """General function to create either a source, forcedsource,
    object or objectfulloverlap table. The corresponding sql is
    executed.

    The table name is the basename of ``csvfile`` with its last four
    characters (e.g. ``.csv``) stripped, lower-cased. The part before
    the first underscore selects the template ``sql/create_<part>.sql``,
    which is filled in with ``%(tabname)s``-style interpolation; a
    literal ``%`` in the template therefore has to be written ``%%``.

    Args:
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
        csvfile: Path of the csv file the table is named after.
        q: Label printed in the log line (as ``Q<q>``).

    Raises:
        DBError: Re-raised after the reason and the query are printed.
    """

    tabname = basename(csvfile)[:-4].lower()
    fname = tabname.split("_")[0]
    sqlfile = "sql/create_" + fname + ".sql"
    with open(sqlfile, 'r') as f:
        qu = f.read()
    # NOTE(review): the table name is interpolated straight into the SQL
    # text (identifiers cannot be bound as parameters); this presumes the
    # csv file names come from a trusted source -- confirm.
    params = {'tabname': tabname}
    query = qu % (params)

    cursor = None
    try:
        cursor = conn.cursor()
        # Only the execute() call is timed; the commit is not included.
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# CT: (Q%s: %s ms) Created table: %s" % (q, t, tabname))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Release the cursor on the failure path as well.
        if cursor is not None:
            cursor.close()

def create_merge_table(conn, csvfile, q):
    """General function to create a merge table

    The (sub)table name is the basename of ``csvfile`` with its last
    four characters (e.g. ``.csv``) stripped, lower-cased; the merge
    table name is the part of it before the first underscore. Both are
    interpolated into the template ``sql/create_merge_tabname.sql`` as
    ``%(merge_tabname)s`` and ``%(tabname)s``; a literal ``%`` in the
    template therefore has to be written ``%%``.

    Args:
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
        csvfile: Path of the csv file the (sub)table is named after.
        q: Label printed in the log line (as ``Q<q>``).

    Raises:
        DBError: Re-raised after the reason and the query are printed.
    """

    tabname = basename(csvfile)[:-4].lower()
    merge_tabname = tabname.split("_")[0]
    with open('sql/create_merge_tabname.sql', 'r') as f:
        qu = f.read()
    params = {
        'merge_tabname': merge_tabname,
        'tabname': tabname,
    }
    query = qu % (params)

    cursor = None
    try:
        cursor = conn.cursor()
        # Only the execute() call is timed; the commit is not included.
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# CM: (Q%s: %s ms) Created merge table %s based upon: %s"
              % (q, t, merge_tabname, tabname))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Release the cursor on the failure path as well.
        if cursor is not None:
            cursor.close()

def alter_merge_table_add_table(conn, csvfile, q):
    """Add a (sub)table to the merge table

    The (sub)table name is the basename of ``csvfile`` with its last
    four characters (e.g. ``.csv``) stripped, lower-cased; the merge
    table name is the part of it before the first underscore. Both are
    interpolated into the template
    ``sql/alter_merge_table_add_table.sql`` as ``%(merge_tabname)s`` and
    ``%(tabname)s``; a literal ``%`` in the template therefore has to be
    written ``%%``.

    Args:
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
        csvfile: Path of the csv file the (sub)table is named after.
        q: Label printed in the log line (as ``Q<q>``).

    Raises:
        DBError: Re-raised after the reason and the query are printed.
    """

    tabname = basename(csvfile)[:-4].lower()
    merge_tabname = tabname.split("_")[0]
    with open('sql/alter_merge_table_add_table.sql', 'r') as f:
        qu = f.read()
    params = {
        'merge_tabname': merge_tabname,
        'tabname': tabname,
    }
    query = qu % (params)

    cursor = None
    try:
        cursor = conn.cursor()
        # Only the execute() call is timed; the commit is not included.
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# AM: (Q%s: %s ms) Alter merge table %s add table %s"
              % (q, t, merge_tabname, tabname))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Release the cursor on the failure path as well.
        if cursor is not None:
            cursor.close()

def alter_merge_table_add_pkey(conn, csvfile, q):
    """Add a primary key to the merge table

    The (sub)table name is the basename of ``csvfile`` with its last
    four characters (e.g. ``.csv``) stripped, lower-cased; the merge
    table name is the part of it before the first underscore. The key
    columns are chosen from the table-name prefix:

    * ``source_``            -> ``id``
    * ``forcedsource_``      -> ``deepSourceId,scienceCcdExposureId``
    * ``object_``            -> ``deepSourceId``
    * ``objectfulloverlap_`` -> no key is added; the function returns
      without touching the database.

    Args:
        conn: Open database connection providing ``cursor()`` and
            ``commit()``.
        csvfile: Path of the csv file the (sub)table is named after.
        q: Label printed in the log line (as ``Q<q>``).

    Raises:
        ValueError: If the table name has none of the prefixes above.
        DBError: Re-raised after the reason and the query are printed.
    """

    tabname = basename(csvfile)[:-4].lower()
    merge_tabname = tabname.split("_")[0]
    if tabname.startswith("objectfulloverlap_"):
        return
    if tabname.startswith("source_"):
        pkey = "id"
    elif tabname.startswith("forcedsource_"):
        pkey = "deepSourceId,scienceCcdExposureId"
    elif tabname.startswith("object_"):
        pkey = "deepSourceId"
    else:
        # Previously this fell through with `pkey` unbound and failed
        # further down with an UnboundLocalError; fail early and clearly.
        raise ValueError(
            "Cannot determine primary key for table %r (from file %r)"
            % (tabname, csvfile))
    # NOTE: this sql is being used in load csv part as well (lsst_queries.py)
    with open('sql/alter_tabname_add_pkey.sql', 'r') as f:
        qu = f.read()
    # The key is added to the merge table, so its name fills %(tabname)s.
    params = {
        'tabname': merge_tabname,
        'pkey': pkey,
    }
    query = qu % (params)

    cursor = None
    try:
        cursor = conn.cursor()
        # Only the execute() call is timed; the commit is not included.
        t0 = time.time()
        cursor.execute(query)
        t = round(1000 * (time.time() - t0), 3)
        conn.commit()
        print("# AM: (Q%s: %s ms) Alter merge table %s add primary key (%s)"
              % (q, t, merge_tabname, pkey))
    except DBError as dbe:
        print("Failed for reason %s on query %s" % (dbe, query))
        raise
    finally:
        # Release the cursor on the failure path as well.
        if cursor is not None:
            cursor.close()