
VDK: SQL and Data Flow Improvements


Current state

SQL files

SQL files can only contain a single query. For longer scripts with multiple queries, you have to use Python.

from vdk.api.job_input import IJobInput

def run(job_input: IJobInput):
    users = job_input.execute_query("SELECT * from Users")
    job_input.execute_query("ALTER TABLE Users ADD Email varchar(255);")
    # something else here

SQL and Python code are not decoupled. This makes it hard to maintain different versions of the data job: checking diffs between versions in source control becomes more challenging, and it goes against the intended modularity of VDK data jobs. You can't pluck out an SQL step and replace it with something else, for example. For longer scripts, your SQL code leaks into your Python code unless you split it into multiple SQL files with one query each, as shown below. VDK users should have the option to keep SQL code separate from Python code.
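The current workaround looks like the two single-query files below. The step names are illustrative; the queries are the same ones from the Python example above. Note that the result of the SELECT step is not available to any later step, which is exactly the data-flow limitation discussed further down.

10_select_users.sql

SELECT * from Users;

20_add_email_column.sql

ALTER TABLE Users ADD Email varchar(255);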

Pulling data from one database and ingesting into another

Let's say we have an SQLite database and an Oracle database, and we'd like to pull data from SQLite into Oracle. We configure the VDK data job to ingest into Oracle:

[vdk]
ingest_method_default=ORACLE
db_default_type=oracle
oracle_use_secrets=True
oracle_user=username
oracle_connection_string=(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=myhost.hostme.com)(PORT=1521))(CONNECT_DATA=(SID=mysid)))

We then write the following step:

import sqlite3

from vdk.api.job_input import IJobInput

def run(job_input: IJobInput):
    con = sqlite3.connect("company.db")
    cur = con.cursor()
    qres = cur.execute("SELECT * FROM Users")
    rows = qres.fetchall()
    # do something fancy with the data
    job_input.send_tabular_data_for_ingestion(rows=rows, column_names=["id", "username", "email"], destination_table="Users")

This works fine. The only problem is that VDK already has an SQLite plugin, which contains code very similar to the example step above. We're asking users to re-implement part of VDK every time they want to connect to a different database. They should be able to use the existing SQLite plugin instead.
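As a rough sketch, reusing the existing SQLite plugin could look like the following. The driver argument to execute_query does not exist today and is only illustrative of the idea; a concrete variant of it is proposed under "Use available database plugins for fetching data" below.

from vdk.api.job_input import IJobInput

def run(job_input: IJobInput):
    # Hypothetical: let the installed SQLite plugin manage the connection
    # instead of opening it manually via the sqlite3 module.
    rows = job_input.execute_query("SELECT * FROM Users", driver="sqlite")
    # do something fancy with the data
    job_input.send_tabular_data_for_ingestion(rows=rows, column_names=["id", "username", "email"], destination_table="Users")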

Passing data between steps

Can't front-load risky or expensive operations

The example below prepares the database for ingestion in the first step. The second step makes a GET request that is likely to fail because it fetches a large amount of data. If the request fails, the job fails, and the SQL step was executed for nothing. We'd like to run the risky request before the SQL step, but we can't, because its result would not be available to later steps. It would be nice if we could front-load risky or expensive operations and use their results in later steps. That way, the job fails fast and no unnecessary steps run when something goes wrong.

10_complicated_query.sql

-- something complex that prepares the db for ingestion

20_fetch_data.py

import requests

from vdk.api.job_input import IJobInput

def run(job_input: IJobInput):
    # takes a long time to execute
    data = []
    response = requests.get("https://api.oldserver.com/fetchall")
    # processing logic for response.json() fills `data`
    job_input.send_tabular_data_for_ingestion(rows=data, column_names=["id", "something", "something_else"], destination_table="stuff")

Can't re-use data that was already fetched

In the example below, we fetch data from an API and want to cross-reference it with data fetched from a database. We can't split this into two different steps, because we can't pass the data fetched from the API to a different step.

10_process_data.py

import sqlite3

import requests
from vdk.api.job_input import IJobInput

def run(job_input: IJobInput):
    data = []
    response = requests.get("https://api.someapi.com/users/active")
    data = response.json()
    data = process_data(data)
    # Want to further enrich the data with data I have in a db
    # Can't do it in a separate step, has to be this one again
    # The other option is to store it in a temp table and select it later
    con = sqlite3.connect("company.db")
    cur = con.cursor()
    qres = cur.execute("SELECT * FROM Users WHERE Status='ACTIVE'")
    sup_data = qres.fetchall()
    data = cross_reference(data, sup_data)
    # Send for ingestion

Problem statement

  1. SQL support in VDK is not fully featured
  2. Users do not have full control of the data flow in VDK data jobs

Proposed solutions

Execute whole SQL scripts as VDK steps

Overview

Executing SQL scripts as separate steps gives users the option to separate their SQL code from their Python code. This way, it's much clearer what each step does, e.g. SQL steps always query a database.

Example

10_backup_data.sql

CREATE TABLE Products_Backup (
    id int,
    ProductID int,
    ProductName varchar(255),
    CategoryName varchar(255),
    Count int
);

CREATE TABLE Users_Backup (
    id int,
    Name varchar(255),
    Location varchar(255),
    Email varchar(255)
);

INSERT INTO Users_Backup SELECT * FROM Users;
INSERT INTO Products_Backup SELECT * FROM Products;

20_python_processing.py

def run(job_input: IJobInput):
    log.info("Doing some fancy stuff to the original tables")
    # Do the fancy stuff

30_clean_up.sql

DROP TABLE Products_Backup;
DROP TABLE Users_Backup;

Pro-Con Analysis

PRO: Running SQL becomes a full feature. We can have more complex SQL-only jobs, for example.
PRO: In mixed jobs, SQL that is not interpolated can be separated into its own steps.
CON: Implementing this efficiently might not be trivial for some Python SQL libraries. Some libraries might not support executing whole scripts in one go (see the sketch below).
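A minimal sketch of how a plugin might work around that limitation, assuming the third-party sqlparse library is available for statement splitting; the helper name and the fallback behaviour are illustrative, not an agreed design:

import sqlparse  # assumed available for statement splitting

def execute_sql_script(connection, script: str) -> None:
    # Some drivers (e.g. Python's built-in sqlite3) can run whole scripts natively.
    if hasattr(connection, "executescript"):
        connection.executescript(script)
        return
    # Fallback: split the script into individual statements and run them one by one.
    cursor = connection.cursor()
    for statement in sqlparse.split(script):
        statement = statement.strip().rstrip(";")
        if statement:
            cursor.execute(statement)
    connection.commit()

The per-statement fallback is also where the efficiency concern comes in, since each statement incurs a separate round trip to the database.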

Use available database plugins for fetching data

We should have a way to specify, in config.ini, which database driver to use for a specific SQL file. When executing SQL from Python, we should be able to specify the driver in the function call. This way, we can configure multiple databases as data sources and query all of them, provided that the correct plugins are installed.

Overview

Example SQL Steps

10_select_all_users.sql

SELECT * from Users;

20_select_all_products.sql

SELECT * from Products;

30_select_all_suppliers.sql

SELECT * from Suppliers;

config.ini

[vdk]
sql_step_drivers={"impala": ["10_select_all_users.sql", "30_select_all_suppliers.sql"], "oracle": ["20_select_all_products.sql"]}

Example SQL in Python Steps

10_run_queries_from_python.py

from vdk.api.job_input import IJobInput

def run(job_input: IJobInput):
    users = job_input.execute_query("SELECT * from Users", driver="impala")
    products = job_input.execute_query("SELECT * from Products", driver="impala")
    suppliers = job_input.execute_query("SELECT * from Suppliers", driver="oracle")

Pro-Con Analysis

PRO: Users don't have to write extra code for fetching data from non-default databases.
CON: Increased job complexity, e.g. we can have 10 SQL steps, each with a different data source.

Pass data between steps

Overview

If an SQL step ends with a SELECT query, the result of that query should be available to other steps. If a Python step returns a value, that value should be available to other steps. Python steps should be able to declare which data they need from which previous steps and access it through a shared object, e.g. a data cache available in each step.

Example SQL to Python

10_run_sql.sql

CREATE TABLE Products (
    id int NOT NULL AUTO_INCREMENT PRIMARY KEY,
    ProductID int,
    ProductName varchar(255),
    CategoryID int,
    CategoryName varchar(255),
    Count int DEFAULT 0
);

SELECT Products.ProductID, Products.ProductName, Categories.CategoryName
FROM Products
INNER JOIN Categories ON Products.CategoryID = Categories.CategoryID;

20_run_python.py

import logging

import requests
from vdk.api.job_input import IJobInput

log = logging.getLogger(__name__)

@use_data(steps=["10_run_sql.sql"])
def run(job_input: IJobInput, data_cache: IDataCache):
    products = data_cache["10_run_sql.sql"]
    for product in products:
        prod_id = product["ProductID"]
        response = requests.get(f'https://api.productcount.com/inv/{prod_id}')
        inv_count = response.json().get("count", 0)
        product["Count"] = inv_count
        job_input.send_object_for_ingestion(payload=product, destination_table="Products", method="oracle")

Example Python to Python

10_process_data.py

import requests
from vdk.api.job_input import IJobInput

def run(job_input: IJobInput):
    data = []
    response = requests.get("https://api.someapi.com/users/active")
    data = response.json()
    data = process_data(data)
    return data

20_some_queries.sql

-- Do something in sql between the two python steps
SELECT * from Products;

30_more_processing.py

import sqlite3

from vdk.api.job_input import IJobInput

@use_data(steps=["10_process_data.py"])
def run(job_input: IJobInput, data_cache: IDataCache):
    data = data_cache["10_process_data.py"]
    con = sqlite3.connect("company.db")
    cur = con.cursor()
    qres = cur.execute("SELECT * FROM Users WHERE Status='ACTIVE'")
    sup_data = qres.fetchall()
    data = cross_reference(data, sup_data)
    # Send for ingestion
    return data

40_something_else.py

@use_data(steps=["20_some_queries.sql", "30_more_processing.py"])
def run(job_input: IJobInput, data_cache: IDataCache):
    data = data_cache["20_some_queries.sql"]
    other_data = data_cache["30_more_processing.py"]

Pro-Con Analysis

PRO: Users can front-load expensive operations.
PRO: Different types of data processing can be decoupled.
PRO: Easier to understand where data is coming from and how it flows through the system.
PRO: We could go one step further and model the job as a dependency graph, so that steps that don't depend on each other can run in parallel.
CON: Needs pre-processing of the data job files to figure out which data to cache and which to discard (see the sketch after this list).
CON: Memory constraints.
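A minimal sketch of how the @use_data decorator and the shared data cache could fit together; use_data, IDataCache, and the pre-processing pass are illustrative assumptions, not an existing VDK API:

from typing import Any, Callable, Dict, List

class IDataCache(dict):
    """Cache keyed by step name. A real implementation could spill to disk to ease the memory constraints noted above."""

def use_data(steps: List[str]) -> Callable:
    """Record which previous steps a run() function depends on."""
    def decorator(func: Callable) -> Callable:
        func.__vdk_required_steps__ = steps  # read during the pre-processing pass
        return func
    return decorator

def execute_job(ordered_steps: Dict[str, Callable], job_input: Any) -> None:
    """Run steps in order, caching only the results that a later step declared it needs.
    SQL steps are assumed to be wrapped in a callable that returns the result of their final SELECT."""
    # Pre-processing pass: collect every step name referenced by a @use_data decorator,
    # so results nobody asks for can be discarded immediately.
    needed = {
        name
        for func in ordered_steps.values()
        for name in getattr(func, "__vdk_required_steps__", [])
    }
    cache = IDataCache()
    for name, func in ordered_steps.items():
        if hasattr(func, "__vdk_required_steps__"):
            result = func(job_input, cache)
        else:
            result = func(job_input)
        if name in needed:
            cache[name] = result

The same dependency metadata would also be enough to build the dependency graph mentioned in the PROs, since steps whose declared inputs are already in the cache and that don't depend on each other could be scheduled in parallel.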
