
Conversation

@ghanse (Contributor) commented Aug 29, 2025

Changes

This PR introduces summary metrics as outputs of quality checking methods. Summary metrics computation relies on Spark's Observation feature.
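
For context, this is roughly how the underlying Spark feature behaves, independent of DQX: an Observation is attached to a DataFrame together with named aggregate expressions, the metrics are collected when the first action runs, and subsequent actions do not update them. A minimal PySpark sketch:

from pyspark.sql import SparkSession, Observation
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.range(100)

obs = Observation("demo")
observed_df = df.observe(obs, F.count(F.lit(1)).alias("row_count"), F.sum("id").alias("id_sum"))

observed_df.count()  # any action triggers metric collection
print(obs.get)  # {'row_count': 100, 'id_sum': 4950}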

Basic usage

The DQObserver can be added to DQEngine to manage Spark observations and track summary metrics on datasets checked with DQX:

observer = DQObserver()
engine = DQEngine(ws, observer=observer)

Methods of DQEngine have been updated to optionally return the Spark observation associated with a given run:

checked_df, observation = engine.apply_checks(input_df, checks)
checked_df.count()  # or any other action like saving the dataframe to a table
metrics = observation.get
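
Once the action has run, observation.get returns the observed metrics as a plain dict. For illustration only (the metric names and values below are assumed from the default input/error/warning/valid counts, not taken from the final implementation):

print(metrics)
# e.g. {'input_count': 1000, 'error_count': 12, 'warning_count': 35, 'valid_count': 953}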

Writing summary metrics with checked data

When DQEngine methods write results to an output sink, metrics can also be written:

engine.apply_checks_and_save_in_table(
  checks=...,
  input_config=...,
  output_config=...,
  metrics_config=OutputConfig("main.dqx.summary_metrics")
)
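
The persisted metrics can then be queried like any other table (a sketch; the schema of the metrics table is not shown here):

spark.table("main.dqx.summary_metrics").show()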

Integration with installed workflows

I have also updated the quality checking and e2e workflows to allow users to specify an output table where metrics are stored.
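
As a sketch of how a run config could reference the metrics table (field names are inferred from the snippets and tests in this PR and may differ in the final code):

run_config = RunConfig(
  name="default",
  input_config=input_config,    # existing input settings
  output_config=output_config,  # existing output settings
  metrics_config=OutputConfig("main.dqx.summary_metrics"),  # new: destination for summary metrics
)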

TODO

  • Baseline functionality
  • Add handlers for streaming
  • Add handlers for custom callback functions
  • Update tests
  • Update docs
  • Update demos

Linked issues

Resolves #376

Tests

  • manually tested
  • added unit tests
  • added integration tests
  • added end-to-end tests

github-actions bot commented Aug 29, 2025

✅ 13/13 passed, 12 flaky, 10m0s total

Flaky tests:

  • 🤪 test_run_pii_detection_notebook (5.498s)
  • 🤪 test_run_dqx_manufacturing_demo (5.569s)
  • 🤪 test_run_dqx_demo_pii_detection (5.721s)
  • 🤪 test_run_dqx_streaming_demo_diy (7.671s)
  • 🤪 test_run_observable_metrics_notebook (7.833s)
  • 🤪 test_run_dqx_dlt_demo (10.023s)
  • 🤪 test_run_dqx_streaming_demo_native (10.016s)
  • 🤪 test_run_dqx_demo_library (10.022s)
  • 🤪 test_run_dqx_quick_start_demo_library (10.021s)
  • 🤪 test_run_dqx_demo_tool (10.017s)
  • 🤪 test_run_dqx_multi_table_demo (8.419s)
  • 🤪 test_run_dqx_demo_asset_bundle (7.178s)

Running from acceptance #2894

@Copilot (Copilot AI) left a comment

Pull Request Overview

This PR introduces summary metrics as outputs of quality checking methods, using Spark's Observation feature to track data quality metrics. The DQObserver class manages Spark observations and tracks both default metrics (input/error/warning/valid counts) and custom user-defined SQL expressions.

  • Adds DQObserver class for managing Spark observations and tracking summary metrics
  • Updates DQEngine methods to return tuples with both DataFrames and observations
  • Integrates metrics collection with existing workflows and configuration system

Reviewed Changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 2 comments.

Summary per file:

  • src/databricks/labs/dqx/observer.py: New DQObserver class for managing Spark observations and tracking metrics
  • src/databricks/labs/dqx/engine.py: Updated engine methods to support metrics collection and storage
  • src/databricks/labs/dqx/config.py: Added metrics configuration fields to RunConfig and WorkspaceConfig
  • tests/unit/test_observer.py: Unit tests for the DQObserver class functionality
  • tests/integration/test_summary_metrics.py: Integration tests for end-to-end metrics collection
  • tests/integration/test_metrics_workflow.py: Tests for workflow-based metrics collection


Comment on lines 621 to 622
save_dataframe_as_table(metrics_df, metrics_config)
save_dataframe_as_table(metrics_df, metrics_config)

Contributor commented:

agree

assert run_config.metrics_config is None

ctx.deployed_workflows.run_workflow("quality-checker", run_config.name)
assert not ws.tables.exists(run_config.metrics_config.location).table_exists

Copilot AI commented Aug 29, 2025:

This assertion will fail because run_config.metrics_config is None when metrics are disabled, causing a NoneType attribute access error. The assertion should check that run_config.metrics_config is None instead.

Suggested change
assert not ws.tables.exists(run_config.metrics_config.location).table_exists
# Cannot check for metrics table existence as metrics_config is None.


Contributor commented:

true

@mwojtyczka (Contributor) left a comment:

Going in the right direction

- `sample_seed`: seed for reproducible sampling.
- `limit`: maximum number of records to analyze.
- `extra_params`: (optional) extra parameters to pass to the jobs such as result column names and user_metadata
- `custom_metrics`: (optional) list of Spark SQL expressions for capturing custom summary metrics.

Contributor commented:

It would be worth adding that a set of default metrics is always used regardless.

@ghanse (author) replied:

Do you think this is clear?

By default, the number of input, warning, and error rows will be tracked. When custom metrics are defined, they will be tracked in addition to the default metrics.
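
As a sketch of what that could look like (the constructor argument name is an assumption based on the tests in this PR, not a confirmed API):

custom_metrics = ["avg(age) as avg_age", "sum(salary) as total_salary"]
observer = DQObserver(custom_metrics=custom_metrics)  # tracked in addition to the default counts
engine = DQEngine(ws, observer=observer)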

A list of Spark SQL expressions as strings
"""
result_columns = self.result_columns or {}
errors_column = result_columns.get(ColumnArguments.ERRORS.value, DefaultColumnNames.ERRORS.value)

Contributor commented:

It would be better to provide this directly to avoid repeating the engine implementation. I would set this in the engine. Otherwise, users would have to provide extra params twice, which would be error-prone.

@ghanse (author) replied:

We need a way to update the column names in SQL expressions used for default metrics. I added a _set_column_names method in the DQMetricsObserver that is called whenever DQEngine is initialized with an observer.
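
A rough sketch of that flow (everything except the _set_column_names name is assumed; this is not the actual implementation):

# Inside DQEngine.__init__ (sketch): propagate the configured result column names to the
# observer so users do not have to pass extra_params twice.
if observer is not None:
    observer._set_column_names(
        errors_column=result_columns.get("errors", "_errors"),       # assumed default names
        warnings_column=result_columns.get("warnings", "_warnings"),
    )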

"""
Spark `Observation` which can be attached to a `DataFrame` to track summary metrics. Metrics will be collected
when the 1st action is triggered on the attached `DataFrame`. Subsequent operations on the attached `DataFrame`
will not update the observed metrics. See: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Observation.html

Contributor commented:

Make this a proper link so that it renders nicely in the API docs.

@ghanse (author) replied:

Updated

assert run_config.metrics_config is None

ctx.deployed_workflows.run_workflow("quality-checker", run_config.name)
assert not ws.tables.exists(run_config.metrics_config.location).table_exists

Contributor commented:

true


def test_engine_with_observer_before_action(ws, spark):
    """Test that summary metrics are empty before running a Spark action."""
    custom_metrics = ["avg(age) as avg_age", "sum(salary) as total_salary"]

Contributor commented:

Do we need custom metrics here? It seems we can remove them for this test.

@ghanse (author) replied:

Updated



Development

Successfully merging this pull request may close these issues.

[FEATURE]: Add summary statistics as an additional output of quality checking
