Skip to content

Commit ff66784

Browse files
fix: Improve BQ point-in-time joining scalability (#3429)
fix: improve BQ point-in-time joining scalability Signed-off-by: Danny Chiao <[email protected]>
1 parent 473f8d9 commit ff66784

File tree

1 file changed

+104
-90
lines changed

1 file changed

+104
-90
lines changed

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 104 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -488,10 +488,24 @@ def to_bigquery(
488488
return str(job_config.destination)
489489

490490
with self._query_generator() as query:
491-
self._execute_query(query, job_config, timeout)
491+
dest = job_config.destination
492+
# because setting destination for scripts is not valid
493+
# remove destination attribute if provided
494+
job_config.destination = None
495+
bq_job = self._execute_query(query, job_config, timeout)
492496

493-
print(f"Done writing to '{job_config.destination}'.")
494-
return str(job_config.destination)
497+
if not job_config.dry_run:
498+
config = bq_job.to_api_repr()["configuration"]
499+
# get temp table created by BQ
500+
tmp_dest = config["query"]["destinationTable"]
501+
temp_dest_table = f"{tmp_dest['projectId']}.{tmp_dest['datasetId']}.{tmp_dest['tableId']}"
502+
503+
# persist temp table
504+
sql = f"CREATE TABLE {dest} AS SELECT * FROM {temp_dest_table}"
505+
self._execute_query(sql, timeout=timeout)
506+
507+
print(f"Done writing to '{dest}'.")
508+
return str(dest)
495509

496510
def _to_arrow_internal(self) -> pyarrow.Table:
497511
with self._query_generator() as query:
@@ -777,7 +791,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
777791
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
778792
all the logic as the field to GROUP BY the data
779793
*/
780-
WITH entity_dataframe AS (
794+
CREATE TEMP TABLE entity_dataframe AS (
781795
SELECT *,
782796
{{entity_df_event_timestamp_col}} AS entity_timestamp
783797
{% for featureview in featureviews %}
@@ -793,95 +807,95 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
793807
{% endif %}
794808
{% endfor %}
795809
FROM `{{ left_table_query_string }}`
796-
),
810+
);
797811
798812
{% for featureview in featureviews %}
799-
800-
{{ featureview.name }}__entity_dataframe AS (
801-
SELECT
802-
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
803-
entity_timestamp,
804-
{{featureview.name}}__entity_row_unique_id
805-
FROM entity_dataframe
806-
GROUP BY
807-
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
808-
entity_timestamp,
809-
{{featureview.name}}__entity_row_unique_id
810-
),
811-
812-
/*
813-
This query template performs the point-in-time correctness join for a single feature set table
814-
to the provided entity table.
815-
816-
1. We first join the current feature_view to the entity dataframe that has been passed.
817-
This JOIN has the following logic:
818-
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
819-
is less than the one provided in the entity dataframe
820-
- If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field`
821-
is higher than the one provided minus the TTL
822-
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
823-
computed previously
824-
825-
The output of this CTE will contain all the necessary information and already filtered out most
826-
of the data that is not relevant.
827-
*/
828-
829-
{{ featureview.name }}__subquery AS (
830-
SELECT
831-
{{ featureview.timestamp_field }} as event_timestamp,
832-
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
833-
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
834-
{% for feature in featureview.features %}
835-
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
836-
{% endfor %}
837-
FROM {{ featureview.table_subquery }}
838-
WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
839-
{% if featureview.ttl == 0 %}{% else %}
840-
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
841-
{% endif %}
842-
),
843-
844-
{{ featureview.name }}__base AS (
845-
SELECT
846-
subquery.*,
847-
entity_dataframe.entity_timestamp,
848-
entity_dataframe.{{featureview.name}}__entity_row_unique_id
849-
FROM {{ featureview.name }}__subquery AS subquery
850-
INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe
851-
ON TRUE
852-
AND subquery.event_timestamp <= entity_dataframe.entity_timestamp
853-
813+
CREATE TEMP TABLE {{ featureview.name }}__cleaned AS (
814+
WITH {{ featureview.name }}__entity_dataframe AS (
815+
SELECT
816+
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
817+
entity_timestamp,
818+
{{featureview.name}}__entity_row_unique_id
819+
FROM entity_dataframe
820+
GROUP BY
821+
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
822+
entity_timestamp,
823+
{{featureview.name}}__entity_row_unique_id
824+
),
825+
826+
/*
827+
This query template performs the point-in-time correctness join for a single feature set table
828+
to the provided entity table.
829+
830+
1. We first join the current feature_view to the entity dataframe that has been passed.
831+
This JOIN has the following logic:
832+
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
833+
is less than the one provided in the entity dataframe
834+
- If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field`
835+
is higher than the one provided minus the TTL
836+
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
837+
computed previously
838+
839+
The output of this CTE will contain all the necessary information and already filtered out most
840+
of the data that is not relevant.
841+
*/
842+
843+
{{ featureview.name }}__subquery AS (
844+
SELECT
845+
{{ featureview.timestamp_field }} as event_timestamp,
846+
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
847+
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
848+
{% for feature in featureview.features %}
849+
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
850+
{% endfor %}
851+
FROM {{ featureview.table_subquery }}
852+
WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
854853
{% if featureview.ttl == 0 %}{% else %}
855-
AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.entity_timestamp, interval {{ featureview.ttl }} second)
854+
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
856855
{% endif %}
856+
),
857+
858+
{{ featureview.name }}__base AS (
859+
SELECT
860+
subquery.*,
861+
entity_dataframe.entity_timestamp,
862+
entity_dataframe.{{featureview.name}}__entity_row_unique_id
863+
FROM {{ featureview.name }}__subquery AS subquery
864+
INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe
865+
ON TRUE
866+
AND subquery.event_timestamp <= entity_dataframe.entity_timestamp
867+
868+
{% if featureview.ttl == 0 %}{% else %}
869+
AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.entity_timestamp, interval {{ featureview.ttl }} second)
870+
{% endif %}
857871
858-
{% for entity in featureview.entities %}
859-
AND subquery.{{ entity }} = entity_dataframe.{{ entity }}
860-
{% endfor %}
861-
),
862-
863-
/*
864-
2. If the `created_timestamp_column` has been set, we need to
865-
deduplicate the data first. This is done by calculating the
866-
`MAX(created_at_timestamp)` for each event_timestamp.
867-
We then join the data on the next CTE
868-
*/
869-
{% if featureview.created_timestamp_column %}
870-
{{ featureview.name }}__dedup AS (
871-
SELECT
872-
{{featureview.name}}__entity_row_unique_id,
873-
event_timestamp,
874-
MAX(created_timestamp) as created_timestamp
875-
FROM {{ featureview.name }}__base
876-
GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp
877-
),
878-
{% endif %}
872+
{% for entity in featureview.entities %}
873+
AND subquery.{{ entity }} = entity_dataframe.{{ entity }}
874+
{% endfor %}
875+
),
876+
877+
/*
878+
2. If the `created_timestamp_column` has been set, we need to
879+
deduplicate the data first. This is done by calculating the
880+
`MAX(created_at_timestamp)` for each event_timestamp.
881+
We then join the data on the next CTE
882+
*/
883+
{% if featureview.created_timestamp_column %}
884+
{{ featureview.name }}__dedup AS (
885+
SELECT
886+
{{featureview.name}}__entity_row_unique_id,
887+
event_timestamp,
888+
MAX(created_timestamp) as created_timestamp
889+
FROM {{ featureview.name }}__base
890+
GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp
891+
),
892+
{% endif %}
879893
880-
/*
881-
3. The data has been filtered during the first CTE "*__base"
882-
Thus we only need to compute the latest timestamp of each feature.
883-
*/
884-
{{ featureview.name }}__latest AS (
894+
/*
895+
3. The data has been filtered during the first CTE "*__base"
896+
Thus we only need to compute the latest timestamp of each feature.
897+
*/
898+
{{ featureview.name }}__latest AS (
885899
SELECT
886900
event_timestamp,
887901
{% if featureview.created_timestamp_column %}created_timestamp,{% endif %}
@@ -900,13 +914,13 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
900914
{% endif %}
901915
)
902916
WHERE row_number = 1
903-
),
917+
)
904918
905919
/*
906920
4. Once we know the latest value of each feature for a given timestamp,
907921
we can join again the data back to the original "base" dataset
908922
*/
909-
{{ featureview.name }}__cleaned AS (
923+
910924
SELECT base.*
911925
FROM {{ featureview.name }}__base as base
912926
INNER JOIN {{ featureview.name }}__latest
@@ -917,7 +931,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
917931
,created_timestamp
918932
{% endif %}
919933
)
920-
){% if loop.last %}{% else %}, {% endif %}
934+
);
921935
922936
923937
{% endfor %}

0 commit comments

Comments
 (0)