Commit 453f550

Add data quality checks (#11)
* add data quality checks
* improve assertions

Parent: 5772d20

File tree: 14 files changed (+122 / -50 lines)

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ default_language_version:
   python: python3
 repos:
   - repo: https://github.com/astral-sh/ruff-pre-commit
-    rev: v0.12.0
+    rev: v0.12.3
     hooks:
       - id: ruff
         args:

Pipfile

Lines changed: 1 addition & 0 deletions
@@ -17,6 +17,7 @@ pytest = "==8.3.2"
 jinja2 = "==3.1.4"
 pyspark = "==3.5.5"
 pytest-cov = "==5.0.0"
+databricks-labs-dqx = "==0.7.0"
 packages = "*"
 
 [dev-packages]

README.md

Lines changed: 16 additions & 6 deletions
@@ -1,5 +1,5 @@
 
-# Template project with medallion architecture, Python packaging, unit tests, integration tests, CI/CD automation, and Databricks Asset Bundles.
+# Template project with medallion architecture, Python packaging, unit tests, integration tests, CI/CD automation, Databricks Asset Bundles, and DQX data quality framework.
 
 This project template provides a structured approach to enhance productivity when delivering ETL pipelines on Databricks. Feel free to customize it based on your project's specific nuances and the audience you are targeting.
 
@@ -11,7 +11,8 @@ This project template demonstrates how to:
 - structure PySpark code inside classes/packages.
 - structure unit tests for the data transformations and set up VS Code to run them on your local machine.
 - structure integration tests to be executed on different environments / catalogs.
-- package code and deploy it to different environments (dev, staging, prod) using a CI/CD pipeline with [Github Actions](https://docs.github.com/en/actions).
+- utilize [Databricks DQX](https://databrickslabs.github.io/dqx/) to define and enforce data quality rules, such as null checks, uniqueness, thresholds, and schema validation.
+- package and deploy code to different environments (dev, staging, prod) using a CI/CD pipeline with [Github Actions](https://docs.github.com/en/actions).
 - isolate "dev" environments / catalogs to avoid concurrency issues between developers testing jobs.
 - configure the workflow to run in different environments with different parameters with [jinja package](https://pypi.org/project/jinja2/).
 - configure the workflow to run tasks selectively.
@@ -25,7 +26,7 @@ This project template demonstrates how to:
 - utilize [Databricks CLI](https://docs.databricks.com/en/dev-tools/cli/index.html) and [Databricks Asset Bundles](https://docs.databricks.com/en/dev-tools/bundles/index.html) to package/deploy/run a Python wheel package on Databricks.
 - utilize [Databricks SDK for Python](https://docs.databricks.com/en/dev-tools/sdk-python.html) to manage workspaces and accounts. The sample script enables metastore system tables with [relevant data about billing, usage, lineage, prices, and access](https://www.youtube.com/watch?v=LcRWHzk8Wm4).
 - utilize [Databricks Unity Catalog](https://www.databricks.com/product/unity-catalog) and get data lineage for your tables and columns and a simplified permission model for your data.
-- utilize [Databricks Workflows](https://docs.databricks.com/en/workflows/index.html) to execute a DAG and [task parameters](https://docs.databricks.com/en/workflows/jobs/parameter-value-references.html) to share context information between tasks (see [Task Parameters section](#task-parameters)). Yes, you don't need Airflow to manage your DAGs here!!!
+- utilize [Databricks Lakeflow Jobs](https://docs.databricks.com/en/workflows/index.html) to execute a DAG and [task parameters](https://docs.databricks.com/en/workflows/jobs/parameter-value-references.html) to share context information between tasks (see [Task Parameters section](#task-parameters)). Yes, you don't need Airflow to manage your DAGs here!!!
 - utilize [Databricks job clusters](https://docs.databricks.com/en/workflows/jobs/use-compute.html#use-databricks-compute-with-your-jobs) to reduce costs.
 - define clusters on AWS and Azure.
 
@@ -39,7 +40,7 @@ Sessions on Databricks Asset Bundles, CI/CD, and Software Development Life Cycle
 - [Deploying Databricks Asset Bundles (DABs) at Scale](https://www.youtube.com/watch?v=mMwprgB-sIU)
 - [A Prescription for Success: Leveraging DABs for Faster Deployment and Better Patient Outcomes](https://www.youtube.com/watch?v=01JHTM2UP-U)
 
-### DAG
+### DAGs
 
 <br>
 
@@ -51,18 +52,27 @@ Sessions on Databricks Asset Bundles, CI/CD, and Software Development Life Cycle
 
 <br>
 
-<img src="docs/task output.png">
+<img src="docs/task_output.png">
 
 
 
 ### Data Lineage (Catalog Explorer)
 
 <br>
 
-<img src="docs/data lineage.png">
+<img src="docs/data_lineage.png">
 
 <br>
 
+### Data Quality (generated by Databricks DQX)
+
+<br>
+
+<img src="docs/data_quality.png">
+
+<br>
+
+
 
 ### CI/CD pipeline
 
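The new DQX bullet and the "Data Quality" section above describe declarative checks (null checks, uniqueness, thresholds, schema validation). Below is a minimal sketch of what such checks can look like with databricks-labs-dqx; it is not code from this commit, the table and column names are invented, and check-function and argument names (for example `column` vs. the older `col_name`) vary between DQX releases, so confirm against the DQX docs for the pinned 0.7.0 version.

```python
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.table("bronze.customers")  # hypothetical input table

# Metadata-style checks: each entry picks a check function, its arguments,
# and a criticality ("error" rows get quarantined, "warn" rows are only flagged).
checks = [
    {
        "criticality": "error",
        "check": {"function": "is_not_null", "arguments": {"column": "customer_id"}},
    },
    {
        "criticality": "warn",
        "check": {
            "function": "is_in_range",
            "arguments": {"column": "amount", "min_limit": 0, "max_limit": 10000},
        },
    },
]

dq_engine = DQEngine(WorkspaceClient())

# Rows that pass every check vs. rows routed to a quarantine DataFrame
# annotated with which checks failed.
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(df, checks)
```

The split variant fits a medallion flow: write `valid_df` to the silver table and route `quarantined_df` to a reject table for inspection.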

docs/data_quality.png

Binary file added (85.3 KB).

docs/task output.png

Binary file removed (34.6 KB).

docs/task_output.png

Binary file added (49.9 KB).

setup.py

Lines changed: 1 addition & 0 deletions
@@ -36,5 +36,6 @@
         "setuptools",
         "funcy",
         "databricks-sdk",
+        "databricks-labs-dqx",
     ],
 )

src/template/baseTask.py

Lines changed: 0 additions & 1 deletion
@@ -2,4 +2,3 @@ class BaseTask:
     def __init__(self, config):
         self.config = config
         self.spark = config.get_spark()
-        self.dbutils = config.get_dbutils()
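With dbutils gone, a task built on this template receives just the shared config object and the SparkSession; after this commit the base class amounts to:

```python
class BaseTask:
    def __init__(self, config):
        self.config = config             # exposes params and the DQX engine (config.dq_engine)
        self.spark = config.get_spark()  # shared SparkSession created in Config
```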

src/template/config.py

Lines changed: 8 additions & 39 deletions
@@ -1,4 +1,6 @@
 import pyspark.sql.functions as F
+from databricks.labs.dqx.engine import DQEngine
+from databricks.sdk import WorkspaceClient
 from pyspark.sql import SparkSession
 
 
@@ -16,29 +18,7 @@ def __init__(self, args):
 
         self.spark = SparkSession.builder.appName(args.task).getOrCreate()
 
-        try:
-            from pyspark.dbutils import DBUtils
-
-            self.dbutils = DBUtils(self.spark)
-
-            # TODO cannot access context on serverless
-            # context_tags = self.dbutils.notebook.entry_point.getDbutils().notebook().getContext().tags()
-            # print(context_tags)
-
-            # username = context_tags.get("user")
-
-            # if username.isDefined():
-            # actual_value = username.get()
-            # python_string = str(actual_value)
-            # self.params.update({"workspace_user": python_string})
-            # print("workspace user: " + python_string)
-            # else:
-            # print("workspace user empty")
-
-        except ModuleNotFoundError:
-            self.dbutils = self._mock_dbutils(self.spark)
-
-        if self.params["env"] != "local":
+        if args.env != "local":
             # if running in Databricks, set default catalog and schema
 
             if args.env == "dev":
@@ -57,29 +37,18 @@ def __init__(self, args):
 
         self.spark.sql(f"CREATE SCHEMA IF NOT EXISTS {args.schema}")
 
-    def _mock_dbutils(self, spark):
-        class DBUtils:
-            def __init__(self, spark):
-                self.fs = self.FileSystem()
-
-            class FileSystem:
-                def mount(self, source, mount_point):
-                    print(f"Mounting {source} to {mount_point}")
+            ws = WorkspaceClient()
 
-                def unmount(self, mount_point):
-                    print(f"Unmounting {mount_point}")
+        else:
+            from unittest.mock import MagicMock
 
-                def mounts(self):
-                    return []
+            ws = MagicMock(spec=WorkspaceClient, **{"current_user.me.return_value": None})
 
-        return DBUtils(spark)
+        self.dq_engine = DQEngine(ws)
 
     def get_spark(self):
         return self.spark
 
-    def get_dbutils(self):
-        return self.dbutils
-
     def get_value(self, key):
         return self.params[key]
 
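The net effect of this change: on Databricks the DQEngine is backed by a real WorkspaceClient, while locally a MagicMock with spec=WorkspaceClient stands in so Config (and unit tests) can be constructed without a workspace. Below is a hypothetical sketch of a task reaching that engine through the config it already holds; the class name, run method, import path, tables, and columns are illustrative, not taken from this repository, and the DQX argument names may differ by release.

```python
from template.baseTask import BaseTask  # assumed import path for src/template/baseTask.py


class CleanCustomersTask(BaseTask):
    def run(self):
        df = self.spark.table("bronze.customers")  # hypothetical source table

        checks = [
            {
                "criticality": "error",
                "check": {"function": "is_not_null", "arguments": {"column": "customer_id"}},
            },
        ]

        # Unlike the split variant, apply_checks_by_metadata keeps every row and
        # appends DQX result columns recording any failed checks, so downstream
        # steps can filter or report on them.
        checked_df = self.config.dq_engine.apply_checks_by_metadata(df, checks)
        checked_df.write.mode("overwrite").saveAsTable("silver.customers")
```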
