-
Notifications
You must be signed in to change notification settings - Fork 54
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Copyright 2024-2025 Broadcom | ||
# SPDX-License-Identifier: Apache-2.0 | ||
|
||
|
||
class DataQualityException(Exception): | ||
""" | ||
Exception raised for errors with the quality of the data. | ||
Attributes: | ||
checked_object -- Object that the quality checks are ran against | ||
target_table -- DWH table where target data is loaded | ||
source_view -- View from which the raw data is loaded from | ||
""" | ||
|
||
def __init__(self, checked_object, target_table, source_view): | ||
self.checked_object = checked_object | ||
self.target_table = target_table | ||
self.source_view = source_view | ||
self.message = f"""What happened: Error occurred while performing quality checks.\n | ||
Why it happened: Object: {checked_object} is not passing the quality checks.\n | ||
Consequences: The source view data will not be processed to the target table - {target_table}.\n | ||
Countermeasures: Check the source view: {source_view} what data is trying to be processed.""" | ||
super().__init__(self.message) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Copyright 2023-2024 Broadcom | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import logging | ||
|
||
from vdk.api.job_input import IJobInput | ||
from vdk.plugin.trino.trino_utils import TrinoTemplateQueries | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
def run(job_input: IJobInput): | ||
""" | ||
In this step we try to recover potentially unexistent target table from backup. | ||
In some cases the template might fail during the step where new data is written in target table | ||
(last step where tmp_target_table contents are moved to target_table). If this happens, the job fails and | ||
target table is no longer present. Fortunately it has a backup. | ||
So when the job is retried, this first step should recover the target (if the reason for the previous fail | ||
is no longer present). | ||
""" | ||
|
||
args = job_input.get_arguments() | ||
target_schema = args.get("target_schema") | ||
target_table = args.get("target_table") | ||
trino_queries = TrinoTemplateQueries(job_input) | ||
|
||
trino_queries.ensure_target_exists_step(db=target_schema, target_name=target_table) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
(SELECT * FROM "{source_schema}"."{source_view}" LIMIT 0) | ||
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/01-test-if-view-matches-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/01-test-if-view-matches-target.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/01-test-if-view-matches-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/01-test-if-view-matches-target.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/01-test-if-view-matches-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/01-test-if-view-matches-target.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/01-test-if-view-matches-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/01-test-if-view-matches-target.sql#L1
|
||
UNION ALL | ||
(SELECT * FROM "{target_schema}"."{target_table}" LIMIT 0) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
# Copyright 2023-2024 Broadcom | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import os | ||
import logging | ||
|
||
from vdk.api.job_input import IJobInput | ||
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException | ||
from vdk.plugin.trino.trino_utils import CommonUtilities | ||
|
||
|
||
SQL_FILES_FOLDER = ( | ||
os.path.dirname(os.path.abspath(__file__)) + "/02-requisite-sql-scripts" | ||
) | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
""" | ||
This step is intened to handle quality checks if such provided | ||
and stop the data from being populated into the target table if the check has negative outcome. | ||
Otherwise the data will be directly processed according to the used template type | ||
""" | ||
|
||
|
||
def run(job_input: IJobInput): | ||
job_arguments = job_input.get_arguments() | ||
|
||
check = job_arguments.get("check") | ||
if check: | ||
source_schema = job_arguments.get("source_schema") | ||
source_view = job_arguments.get("source_view") | ||
target_schema = job_arguments.get("target_schema") | ||
target_table = job_arguments.get("target_table") | ||
insert_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-insert-into-target.sql") | ||
create_table_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-create-clone-table.sql") | ||
create_view_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-create-consolidated-view.sql") | ||
drop_table_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-drop-table.sql") | ||
drop_view_query = CommonUtilities.get_file_content(SQL_FILES_FOLDER, "02-drop-view.sql") | ||
|
||
staging_schema = job_arguments.get("staging_schema", target_schema) | ||
staging_table_name = CommonUtilities.get_staging_table_name(target_schema, target_table) | ||
|
||
target_table_full_name = f"{target_schema}.{target_table}" | ||
|
||
#Drop table if already exists | ||
drop_staging_table = drop_table_query.format( | ||
target_schema=staging_schema, | ||
target_table=staging_table_name | ||
) | ||
job_input.execute_query(drop_staging_table) | ||
#create staging table with exact schema of target table | ||
create_staging_table = create_table_query.format( | ||
table_schema=staging_schema, | ||
table_name=staging_table_name, | ||
target_schema=target_schema, | ||
target_table=target_table | ||
) | ||
job_input.execute_query(create_staging_table) | ||
#insert all source_view data into staging table | ||
insert_into_staging = insert_query.format( | ||
target_schema=staging_schema, | ||
target_table=staging_table_name, | ||
source_schema=source_schema, | ||
source_view=source_view | ||
) | ||
job_input.execute_query(insert_into_staging) | ||
|
||
view_schema = staging_schema | ||
view_name = f"vw_{staging_table_name}" | ||
# Drop view if already exist | ||
drop_view = drop_view_query.format( | ||
view_schema=view_schema, | ||
view_name=view_name | ||
) | ||
job_input.execute_query(drop_view) | ||
|
||
#create consolidated view of staging and target table | ||
create_view = create_view_query.format( | ||
view_schema=view_schema, | ||
view_name=view_name, | ||
target_schema=target_schema, | ||
target_table=target_table, | ||
staging_schema=staging_schema, | ||
staging_table=staging_table_name | ||
) | ||
job_input.execute_query(create_view) | ||
|
||
view_full_name = f"{view_schema}.{view_name}" | ||
|
||
if check(view_full_name): | ||
job_input.execute_query(f"SELECT * FROM " | ||
f"information_schema.tables WHERE table_schema = '{staging_schema}' " | ||
f"AND table_name = '{staging_table_name}'") | ||
log.debug("Data View is created successfully.") | ||
else: | ||
raise DataQualityException( | ||
checked_object=view_full_name, | ||
source_view=f"{source_schema}.{source_view}", | ||
target_table=target_table_full_name, | ||
) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
CREATE TABLE IF NOT EXISTS "{table_schema}"."{table_name}"( | ||
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-clone-table.sql#L1
|
||
LIKE "{target_schema}"."{target_table}" | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
CREATE VIEW {view_schema}.{view_name} | ||
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-consolidated-view.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-consolidated-view.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-consolidated-view.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-create-consolidated-view.sql#L1
|
||
AS | ||
SELECT DISTINCT * | ||
FROM | ||
( | ||
SELECT * | ||
FROM {target_schema}.{target_table} | ||
UNION ALL | ||
SELECT * | ||
FROM {staging_schema}.{staging_table} | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
DROP TABLE IF EXISTS "{target_schema}"."{target_table}" | ||
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-table.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-table.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-table.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-table.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-table.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-table.sql#L1
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
DROP View IF EXISTS "{view_schema}"."{view_name}" | ||
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql#L1
Check warning on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-drop-view.sql#L1
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
INSERT INTO {target_schema}.{target_table} | ||
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-insert-into-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-insert-into-target.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-insert-into-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-insert-into-target.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-insert-into-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-requisite-sql-scripts/02-insert-into-target.sql#L1
|
||
( | ||
SELECT * | ||
FROM {source_schema}.{source_view} | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
DROP TABLE IF EXISTS "{target_schema}"."backup_{target_table}" | ||
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/03-drop-backup-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/03-drop-backup-target.sql#L1
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
DROP TABLE IF EXISTS "{target_schema}"."tmp_{target_table}" | ||
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/03-drop-tmp-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/03-drop-tmp-target.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/03-drop-tmp-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/03-drop-tmp-target.sql#L1
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
CREATE TABLE "{target_schema}"."tmp_{target_table}"( | ||
LIKE "{target_schema}"."{target_table}" | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
INSERT INTO "{target_schema}"."tmp_{target_table}" | ||
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/05-insert-into-tmp-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/05-insert-into-tmp-target.sql#L1
Check failure on line 1 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/05-insert-into-tmp-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/05-insert-into-tmp-target.sql#L1
|
||
( | ||
SELECT * | ||
Check failure on line 3 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/05-insert-into-tmp-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/05-insert-into-tmp-target.sql#L3
|
||
FROM "{target_schema}"."{target_table}" | ||
) | ||
UNION ALL | ||
( | ||
SELECT * | ||
FROM "{source_schema}"."{source_view}" | ||
) | ||
Check warning on line 10 in projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/05-insert-into-tmp-target.sql Codacy Production / Codacy Static Code Analysisprojects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/05-insert-into-tmp-target.sql#L10
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
# Copyright 2023-2024 Broadcom | ||
# SPDX-License-Identifier: Apache-2.0 | ||
import logging | ||
|
||
from vdk.api.job_input import IJobInput | ||
from vdk.plugin.trino.trino_utils import TrinoTemplateQueries | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
|
||
def run(job_input: IJobInput): | ||
""" | ||
In this step we try to move data from tmp_target_table (where we populated the result data in the previous step) | ||
to target table in the following way: | ||
1. Move data from target_table to a backup table | ||
2. Try to move data from tmp_target_table to target_table | ||
3. If 2 fails, try to restore target from backup | ||
4. If 3 succeeds, drop tmp target. The job fails. | ||
5. If 3 fails, target table is lost, its content are in the backup table. Next job retry will try to | ||
recover target on its first step. | ||
6. If 2 succeeds, drop backup, we are ready. | ||
Note: If there is no data in tmp_target_table, we are sure that the source table provided initially was empty, | ||
so we do nothing, target remains unchanged and we drop the empty tmp_target_table. | ||
""" | ||
|
||
args = job_input.get_arguments() | ||
target_schema = args.get("target_schema") | ||
source_view = args.get("source_view") | ||
target_table = args.get("target_table") | ||
tmp_target_table = "tmp_" + target_table | ||
trino_queries = TrinoTemplateQueries(job_input) | ||
|
||
log.debug("Check if tmp target has data.") | ||
res = job_input.execute_query( | ||
f""" | ||
SELECT COUNT(*) FROM {target_schema}.{tmp_target_table} | ||
""" | ||
) | ||
if res and res[0][0] > 0: | ||
log.debug( | ||
"Confirmed that tmp target has data, proceed with moving it to target." | ||
) | ||
trino_queries.perform_safe_move_data_to_table_step( | ||
from_db=target_schema, | ||
from_table_name=tmp_target_table, | ||
to_db=target_schema, | ||
to_table_name=target_table, | ||
) | ||
else: | ||
log.info( | ||
f"Target table {target_schema}.{target_table} remains unchanged " | ||
f"because source table {target_schema}.{source_view} was empty." | ||
) | ||
trino_queries.drop_table(target_schema, tmp_target_table) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
### Purpose: | ||
|
||
This template can be used to load raw data from Data Lake to target Table in Data Warehouse. In summary, it appends all records from the source view to the target table. Similar to all other SQL modeling templates there is also schema validation, table refresh and statistics are computed when necessary. | ||
|
||
### Template Name (template_name): | ||
|
||
- "insert" | ||
|
||
### Template Parameters (template_args): | ||
|
||
- target_schema - Data Warehouse schema, where target data is loaded | ||
- target_table - Data Warehouse table of DW type table, where target data is loaded | ||
- source_schema - Data Lake schema, where source raw data is loaded from | ||
- source_view - Data Lake view, where source raw data is loaded from | ||
- check - (Optional) Callback function responsible for checking the quality of the data. Takes in a table name as a parameter which will be used for data validation | ||
- staging_schema - (Optional) Schema where the checks will be executed. If not provided target_schema will be used as default | ||
|
||
### Prerequisites: | ||
|
||
In order to use this template you need to ensure the following: | ||
- {source_schema}.{source_view} exists | ||
- {target_schema}.{target_table} exists | ||
- {source_schema}.{source_view} has the exact same schema as {target_schema}.{target_table} | ||
|
||
### Sample Usage: | ||
|
||
The diagram below shows how the data from the source view is inserted into the target table, without checking for duplicate data. | ||
<img width="836" alt="insert_template_2" src="https://user-images.githubusercontent.com/21333266/204796027-0e77631d-dc12-497d-b0c6-df09bbc51a0e.png"> | ||
So you should be careful for ensuring no duplicate rows are introduced in the source view you are using. | ||
|
||
Say there is SDDC-related 'Snapshot Periodic Fact Table' called 'fact_vmc_utilization_cpu_mem_every5min_daily' in 'history' schema. | ||
Updating it with the latest raw data from a Data Lake (from source view called 'vw_fact_vmc_utilization_cpu_mem_every5min_daily' in 'default' schema) is done in the following manner: | ||
|
||
```python | ||
def run(job_input): | ||
# . . . | ||
template_args = { | ||
'source_schema': 'default', | ||
'source_view': 'vw_fact_vmc_utilization_cpu_mem_every5min_daily', | ||
'target_schema': 'history', | ||
'target_table': 'fact_vmc_utilization_cpu_mem_every5min_daily' | ||
} | ||
job_input.execute_template('insert', template_args) | ||
# . . . | ||
``` | ||
|
||
### Example | ||
|
||
See all templates in [our example documentation](https://github.com/vmware/versatile-data-kit/wiki/SQL-Data-Processing-templates-examples). |