ETL

SQLSorcery is also useful for simple script based ETL actions.

Note

Keep in mind memory constraints when attempting bulk insertions. You can also improve performance by batching inserts using the chunksize param. A sane default is batches of 1000.

Insert csv to table

Insert ~1 million IMDB ratings into a MySQL table.

1
2
3
4
5
6
from sqlsorcery import MySQL
import pandas as pd

sql = MySQL()
df = pd.read_csv("title.ratings.tsv", sep="\t")
sql.insert_into("ratings", df)

Copy table between databases

Copy the contents of a query in one database to another:

1
2
3
4
5
6
7
from sqlsorcery import MSSQL, PostgreSQL

ms = MSSQL()
pg = PostgreSQL()

df = pg.query("SELECT * FROM tablename")
ms.insert_into("new_table", df)

Query API endpoint and load into table

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
import requests
import pandas as pd
from sqlsorcery import SQLite

sql = SQLite(path="example.db")

response = requests.get("https://swapi.co/api/people/").json()
next_page = response["next"]

while next_page:
    response = requests.get(next_page).json()
    results = response["results"]
    next_page = response["next"]

    df = pd.DataFrame(results)
    df["film_appearances"] = len(df["films"])
    df = df[["name", "gender", "film_appearances"]]
    sql.insert_into("star_wars", df)

Update table values

It is often necessary to modify existing records in a table after loading. There are several ways to accomplish this in SQLSorcery depending on your use case including issuing raw commands or embedding within a stored procedure.

Via SQLAlchemy

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import datetime
from sqlsorcery import MSSQL
import pandas as pd


sql = MSSQL()
df = pd.read_csv("daily_ratings.csv")
sql.insert_into("ratings_cache", df)
table = sql.table("ratings_cache")
# Adds today's date as the datestamp to all records
table.update().values(datestamp=datetime.date.today())

OR you could specify an additional WHERE clause

# If you wanted to override a specific rating
table.update().where(table.c.name=="Top Gun").values(avgRating="10")

Via pandas

With this scenario you would just modify the dataframe in memory before inserting into the database. This has trade-offs for performance as well as traceability.

1
2
3
4
5
6
7
8
9
import datetime
from sqlsorcery import MSSQL
import pandas as pd


sql = MSSQL()
df = pd.read_csv("daily_ratings.csv")
df["datestamp"] = datetime.date.today()
sql.insert_into("ratings_cache", df)

Via command

1
2
3
4
5
6
7
8
from sqlsorcery import MSSQL
import pandas as pd


sql = MSSQL()
df = pd.read_csv("daily_ratings.csv")
sql.insert_into("ratings_cache", df)
sql.exec_cmd("UPDATE ratings_cache SET datestamp = GETDATE()")

Truncate a table

It is often desirable to empty a table’s contents before loading additional records during an ETL process. This is commonly used in conjuntion with a cache table which will be further transformed after the raw data is loaded into the database.

There are several ways to accomplish this in SQLSorcery depending on your use case.

Drop and replace during insert

from sqlsorcery import MSSQL
import pandas as pd

sql = MSSQL()
df = pd.read_csv("daily_ratings.csv")
sql.insert_into("ratings_cache", df, if_exists="replace")

Truncate all records

Most databases support TRUNCATE TABLE statements which differ from DELETE FROM statements in how logging and diskspace is handled. A truncate will also reset any identity column on the table.

1
2
3
4
5
6
7
from sqlsorcery import MSSQL
import pandas as pd

sql = MSSQL()
sql.truncate("ratings_cache")
df = pd.read_csv("daily_ratings.csv")
sql.insert_into("ratings_cache", df)

Delete all records

This will flush the table’s contents, but will not reset the values in the identity column (such as an id or primary key). This is useful if you will want the insert to fail if the schema has changed.

1
2
3
4
5
6
7
from sqlsorcery import MSSQL
import pandas as pd

sql = MSSQL()
sql.delete("ratings_cache")
df = pd.read_csv("daily_ratings.csv")
sql.insert_into("ratings_cache", df)

Delete specific records

You might also find it necessary to only delete a subset of records. To do so you can drop down into SQLAlchemy to pass a WHERE clause.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import datetime
from sqlsorcery import MSSQL
import pandas as pd


sql = MSSQL()
table = sql.table("ratings_cache")
table.delete().where(table.c.datestamp == datetime.date.today())
df = pd.read_csv("daily_ratings.csv")
sql.insert_into("ratings_cache", df)

Execute a stored procedure

The following command will execute a stored procedure called sproc_upsert_ratings which merges data from a daily cache table of movie ratings into longitudinal table which stores all the daily results over time.

1
2
3
4
5
6
7
from sqlsorcery import MSSQL
import pandas as pd

sql = MSSQL()
df = pd.read_csv("daily_ratings.csv")
sql.insert_into("ratings_cache", df, if_exists="replace")
sql.exec_sproc("sproc_upsert_ratings")

The content of this stored procedure might look like:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
IF OBJECT_ID('sproc_upsert_ratings') IS NULL
    EXEC('CREATE PROCEDURE sproc_upsert_ratings AS SET NOCOUNT ON;')
GO

ALTER PROCEDURE dbo.sproc_upsert_ratings AS
BEGIN
    SET NOCOUNT ON;

    MERGE dbo.factRatings AS target
    USING dbo.ratings_cache AS source
    ON (target.id = source.id)
    WHEN MATCHED THEN
        UPDATE SET name = source.Name
            ,avgRating = source.avgRating
            ,numVotes = source.numVotes
    WHEN NOT MATCHED THEN
        INSERT (id, name, avgRating, numVotes)
        VALUES (source.id, source.name, source.avgRating, source.numVotes)
END;

Note

If your stored procedure does not return a result, you can/should pass the autocommit=True param. For more information on autocommit see SQLAlchemy’s documentation <https://docs.sqlalchemy.org/en/13/core/connections.html#understanding-autocommit>

Execute any arbitrary command

Any valid SQL command can be passed raw to be executed. This is a catch all for things like function calls, create, or drop commands, etc.

Create a table from SQL command string

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
from sqlsorcery import MSSQL

sql = MSSQL()

table = """
    CREATE TABLE star_wars (
        name VARCHAR(100) NULL,
        gender VARCHAR(25) NULL,
        film_appearances INT NULL
    )
"""
sql.exec_cmd(table)

Create a table from a .sql file

Assuming you have a .sql file named table.auth_user.sql:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
CREATE TABLE IF NOT EXISTS auth_user (
    id SERIAL NOT NULL CONSTRAINT auth_user_pkey PRIMARY KEY,
    password VARCHAR(128) NOT NULL,
    last_login TIMESTAMP WITH TIME ZONE,
    is_superuser BOOLEAN NOT NULL,
    username VARCHAR(150)NOT NULL CONSTRAINT auth_user_username_key UNIQUE,
    first_name VARCHAR(30) NOT NULL,
    last_name VARCHAR(150) NOT NULL,
    email VARCHAR(254) NOT NULL,
    is_staff BOOLEAN NOT NULL,
    is_active BOOLEAN NOT NULL,
    date_joined TIMESTAMP WITH TIME ZONE NOT NULL
);

ALTER TABLE auth_user OWNER TO admin;

CREATE INDEX IF NOT EXISTS auth_user_username_idx ON auth_user (username);

You can execute it like so:

1
2
3
4
from sqlsorcery import MSSQL

sql = MSSQL()
sql.exec_cmd_from_file("table.auth_user.sql")

Drop a table from SQL command string

1
2
3
4
from sqlsorcery import MSSQL

sql = MSSQL()
sql.exec_cmd("DROP TABLE star_wars")

Note

Keep in mind this is merely an example of the types of commands that can be sent through raw. A cleaner way to drop a table is sql.table('star_wars').drop().