Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions dbt/adapters/clickhouse/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,16 @@ def _render_event_time_filtered(self, event_time_filter: EventTimeFilter) -> str

return filter

def derivative(self, suffix: str, relation_type: Optional[str] = None) -> BaseRelation:
path = Path(schema=self.path.schema, database='', identifier=self.path.identifier + suffix)
def derivative(
self,
suffix: str,
relation_type: Optional[str] = None,
interpret_suffix_as_full_identifier: bool = False,
) -> BaseRelation:
new_identifier = (
suffix if interpret_suffix_as_full_identifier else self.path.identifier + suffix
)
path = Path(schema=self.path.schema, database='', identifier=new_identifier)
derivative_type = ClickHouseRelationType(relation_type) if relation_type else self.type
return ClickHouseRelation(
type=derivative_type, path=path, can_on_cluster=self.can_on_cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
{{ log('Searching for existing materialized views with the pattern of ' + target_relation.name) }}
{{ log('Views dictionary contents: ' + views | string) }}
{% set found_associated_mvs, expected_mv_tables = clickhouse__search_associated_mvs_to_target(existing_relation.schema, target_relation.name, views) %}
{% if found_associated_mvs is none %}
{% if not found_associated_mvs %}
{{ log('No existing mvs found matching the pattern. continuing..', info=True) }}
{% else %}
{% for table in found_associated_mvs %}
Expand All @@ -72,7 +72,7 @@
{% endfor %}
{% endif %}
{% if should_full_refresh() %}
{{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }}
{{ clickhouse__drop_mvs_by_suffixes(target_relation, cluster_clause, views) }}

{% call statement('main') -%}
{{ get_create_table_as_sql(False, backup_relation, sql) }}
Expand Down Expand Up @@ -173,9 +173,16 @@

{%- endmacro %}

{% macro clickhouse__drop_mvs(target_relation, cluster_clause, views) -%}
{% for view in views.keys() %}
{%- set mv_relation = target_relation.derivative('_' + view, 'materialized_view') -%}
{% macro clickhouse__drop_mvs_by_suffixes(target_relation, cluster_clause, views_suffixes) -%}
{% for suffix in views_suffixes.keys() %}
{%- set mv_relation = target_relation.derivative('_' + suffix, 'materialized_view') -%}
{{ clickhouse__drop_mv(mv_relation, cluster_clause) }};
{% endfor %}
{%- endmacro %}

{% macro clickhouse__drop_mvs_by_names(target_relation, cluster_clause, mvs_names) -%}
{% for mvs_name in mvs_names %}
{%- set mv_relation = target_relation.derivative(mvs_name, 'materialized_view', interpret_suffix_as_full_identifier=True) -%}
{{ clickhouse__drop_mv(mv_relation, cluster_clause) }};
{% endfor %}
{%- endmacro %}
Expand All @@ -195,19 +202,19 @@
and extract(create_table_query, 'TO\\s+([^\\s(]+)') = '{{ relation_schema }}.{{ relation_name }}'
{% endset %}

{% set mv_names = [] %}
{% set expected_mvs = [] %}
{% for suffix in mv_suffixes.keys() %}
{% do mv_names.append(relation_name ~ "_" ~ suffix) %}
{% do expected_mvs.append(relation_name ~ "_" ~ suffix) %}
{% endfor %}
{{ log('Model mvs to replace ' + mv_names | string) }}
{{ log('Model mvs to replace ' + expected_mvs | string) }}

{% set tables_result = run_query(tables_query) %}
{% if tables_result is not none and tables_result.columns %}
{% set tables = tables_result.columns[0].values() %}
{{ log('Current mvs found in ClickHouse are: ' + tables | join(', ')) }}
{{ return((tables, mv_names,)) }}
{% set mvs_found = run_query(tables_query) %}
{% if mvs_found is not none and mvs_found.columns %}
{% set mv_found_names = mvs_found.columns[0].values() %}
{{ log('Current mvs found in ClickHouse are: ' + mv_found_names | join(', ')) }}
{{ return((mv_found_names, expected_mvs,)) }}
{% else %}
{{ return((None, mv_names,)) }}
{{ return(([], expected_mvs,)) }}
{% endif %}
{%- endmacro %}

Expand All @@ -229,8 +236,13 @@
{% endfor %}
{% endif %}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}
{{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }}

{% set matching_mvs = [] %}
{% for mv in found_associated_mvs %}
{% if mv in expected_mv_tables %}
{% do matching_mvs.append(mv) %}
{% endif %}
{% endfor %}
{{ clickhouse__drop_mvs_by_names(target_relation, cluster_clause, matching_mvs) }}
{%- endmacro %}

{% macro clickhouse__update_mvs(target_relation, cluster_clause, refreshable_clause, views) -%}
Expand All @@ -244,7 +256,7 @@
{# drop existing materialized view while we recreate the target table #}
{%- set cluster_clause = on_cluster_clause(target_relation) -%}
{%- set refreshable_clause = refreshable_mv_clause() -%}
{{ clickhouse__drop_mvs(target_relation, cluster_clause, views) }}
{{ clickhouse__drop_mvs_by_suffixes(target_relation, cluster_clause, views) }}

{# recreate the target table #}
{% call statement('main') -%}
Expand Down
121 changes: 102 additions & 19 deletions tests/integration/adapter/materialized_view/test_materialized_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,36 @@ def test_disabled_catchup(self, project):
assert result[0][0] == 1


# View model for testing view + MV coexistence
VIEW_MODEL_HACKERS = """
{{ config(
materialized='view',
schema='custom_schema'
) }}

select
id,
name,
case
when name like 'Dade' then 'crash_override'
when name like 'Kate' then 'acid burn'
else 'N/A'
end as hacker_alias
from {{ source('raw', 'people') }}
where department = 'engineering'
"""


def query_table_type(project, schema, table):
table_type = project.run_sql(
f"""
select engine from system.tables where database = '{schema}' and name = '{table}'
""",
fetch="all",
)
return table_type[0][0] if len(table_type) > 0 else None


class TestUpdateMV:
@pytest.fixture(scope="class")
def seeds(self):
Expand All @@ -187,7 +217,8 @@ def seeds(self):
@pytest.fixture(scope="class")
def models(self):
return {
"hackers.sql": MV_MODEL,
"hackers_mv.sql": MV_MODEL,
"hackers.sql": VIEW_MODEL_HACKERS,
}

def test_update_incremental(self, project):
Expand All @@ -209,7 +240,8 @@ def test_update_incremental(self, project):

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
f"select distinct hacker_alias from {schema}.hackers_mv where name = 'Dade'",
fetch="all",
)
assert len(result) == 2

Expand All @@ -232,7 +264,8 @@ def test_update_full_refresh(self, project):

# assert that we now have both of Dade's aliases in our hackers table
result = project.run_sql(
f"select distinct hacker_alias from {schema}.hackers where name = 'Dade'", fetch="all"
f"select distinct hacker_alias from {schema}.hackers_mv where name = 'Dade'",
fetch="all",
)
assert len(result) == 2

Expand All @@ -243,7 +276,6 @@ def test_mv_is_dropped_on_full_refresh(self, project):
3. change the model to be a view and run with full refresh
4. assert that the target table is now a view and the internal MV (_mv) no longer exists
"""
schema = quote_identifier(project.test_schema + "_custom_schema")
schema_unquoted = project.test_schema + "_custom_schema"

# Step 1: Create base table via dbt seed
Expand All @@ -252,29 +284,80 @@ def test_mv_is_dropped_on_full_refresh(self, project):

# Step 2: Create the model as a materialized view
results = run_dbt()
assert len(results) == 1

def query_table_type(table_name):
table_type = project.run_sql(
f"""
select engine from system.tables where database = '{schema_unquoted}' and name = '{table_name}'
""",
fetch="all",
)
return table_type[0][0] if len(table_type) > 0 else None
assert len(results) == 2 # will include also a view for the other test.

# Verify both tables were created correctly
assert query_table_type('hackers') == "MergeTree"
assert query_table_type('hackers_mv') == "MaterializedView"
assert query_table_type(project, schema_unquoted, 'hackers_mv') == "MergeTree"
assert query_table_type(project, schema_unquoted, 'hackers_mv_mv') == "MaterializedView"

# Step 3: Change model to view materialization and run with full refresh
run_vars = {"run_type": "view_conversion"}
results = run_dbt(
["run", "--full-refresh", "--log-level", "debug", "--vars", json.dumps(run_vars)]
)
assert len(results) == 1
assert len(results) == 2 # will include also a view for the other test.

# Step 4: Assert that target table is now a view and internal MV no longer exists
assert query_table_type('hackers') == "View"
assert query_table_type(project, schema_unquoted, 'hackers_mv') == "View"
# Verify that the internal materialized view (_mv) no longer exists
assert query_table_type('hackers_mv') is None
assert query_table_type(project, schema_unquoted, 'hackers_mv_mv') is None

def test_view_full_refresh_does_not_affect_existing_mv_with_mv_suffix(self, project):
"""
1. create a base table via dbt seed
2. create a regular view (hackers) and a materialized view (hackers_mv) with the same query
4. force a full refresh on hackers (the view)
5. verify that hackers still works and hackers_mv and hackers_mv_mv are still present
"""
schema_unquoted = project.test_schema + "_custom_schema"

# Step 1: Create base table via dbt seed
results = run_dbt(["seed"])
assert len(results) == 1

# Step 2: Create both models (view and materialized view)
results = run_dbt()
assert len(results) == 2

# Verify both models were created correctly
assert query_table_type(project, schema_unquoted, 'hackers') == "View"
assert query_table_type(project, schema_unquoted, 'hackers_mv') == "MergeTree"
assert query_table_type(project, schema_unquoted, 'hackers_mv_mv') == "MaterializedView"

# Verify data is present in both
result = project.run_sql(f"select count(*) from {schema_unquoted}.hackers", fetch="all")
assert result[0][0] == 3 # 3 engineering people in seed data

result = project.run_sql(f"select count(*) from {schema_unquoted}.hackers_mv", fetch="all")
assert result[0][0] == 3

# Step 3: Force a full refresh on hackers (the view) only
results = run_dbt(["run", "--full-refresh", "--select", "hackers"])
assert len(results) == 1

# Step 4: Verify that hackers still works
assert query_table_type(project, schema_unquoted, 'hackers') == "View"
result = project.run_sql(f"select count(*) from {schema_unquoted}.hackers", fetch="all")
assert result[0][0] == 3

# Verify that hackers_mv and hackers_mv_mv are still present and working
assert query_table_type(project, schema_unquoted, 'hackers_mv') == "MergeTree"
assert query_table_type(project, schema_unquoted, 'hackers_mv_mv') == "MaterializedView"

result = project.run_sql(f"select count(*) from {schema_unquoted}.hackers_mv", fetch="all")
assert result[0][0] == 3

# Insert new data and verify materialized view still captures it
project.run_sql(
f"""
insert into {quote_identifier(project.test_schema)}.people ("id", "name", "age", "department")
values (7777,'Neo',30,'engineering');
"""
)

# Verify the new data appears in both view and materialized view
result = project.run_sql(f"select count(*) from {schema_unquoted}.hackers", fetch="all")
assert result[0][0] == 4

result = project.run_sql(f"select count(*) from {schema_unquoted}.hackers_mv", fetch="all")
assert result[0][0] == 4