Chore: Create CI Team in teams.py for Canary Hub Testing ZIP-956 #1086
Walkthrough

Adds many GCP CI canary artifacts: new GroupBys, Joins, StagingQueries, team metadata, compiled manifests, and a script branch to select canary vs. standard hub backfill. All artifacts include execution, conf, and env specs.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant PubSub as Pub/Sub / Kafka
    participant Staging as StagingExports
    participant GB as GroupBy jobs
    participant Join as Join jobs
    participant StagingQ as StagingQueries
    PubSub->>Staging: ingest/export to staging tables
    Staging->>GB: source tables for group-by queries
    GB->>Join: right-side aggregated features
    Join->>StagingQ: assemble training/demo outputs
    StagingQ->>Staging: write staging outputs (Iceberg/BQ)
```
```mermaid
sequenceDiagram
    participant Events as item_events source
    participant Actions as actions_pubsub_v2 GroupBy
    participant Purch as purchases GroupBy
    participant Join as item_event_join
    Events->>Actions: map event_type -> signals, aggregate per-listing
    Purch->>Join: provide per-user purchase features
    Actions->>Join: provide per-listing action features
    Join->>StagingQ: produce joined training/demo rows
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 30
🧹 Nitpick comments (55)
api/python/test/canary/compiled/staging_queries/gcp_ci/exports.dim_merchants__0 (1)
Lines 76-99: Env inconsistency: dev artifact prefix in a CI artifact.

ARTIFACT_PREFIX points to dev; consider a canary-specific prefix for CI isolation.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

The same tweak may apply under modeEnvironments.upload. If these files are generated, align the generator (teams metadata) instead of hand-editing.
Also applies to: 97-115
api/python/test/canary/compiled/staging_queries/gcp_ci/exports.dim_users__0 (1)
Lines 76-99: Env inconsistency: dev artifact prefix in a CI artifact.

Mirror the canary artifact prefix for CI.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

If compiled, adjust the source config so regenerated artifacts are correct.
Also applies to: 97-115
api/python/test/canary/staging_queries/gcp_ci/exports.py (1)
Lines 10-11: Make the partition filter robust across TIMESTAMP/DATE partition columns.

Casting to TIMESTAMP avoids type mismatches (e.g., ds vs _PARTITIONTIME).
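The concern can be illustrated outside BigQuery by rendering the predicate in plain Python (a sketch; `partition_filter` is a hypothetical helper, and the `{{ start_date }}`/`{{ end_date }}` placeholders stand in for the StagingQuery templating):

```python
# Sketch: build the BETWEEN predicate so it works whether the partition
# column is a DATE-like column (e.g. "ds") or a TIMESTAMP pseudo-column
# (e.g. "_PARTITIONTIME"). The TIMESTAMP(...) cast makes the argument to
# TIMESTAMP_TRUNC well-typed in both cases.
def partition_filter(partition_column: str) -> str:
    return (
        f"TIMESTAMP_TRUNC(TIMESTAMP({partition_column}), DAY) "
        "BETWEEN {{ start_date }} AND {{ end_date }}"
    )

print(partition_filter("ds"))
print(partition_filter("_PARTITIONTIME"))
```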
```diff
- TIMESTAMP_TRUNC({partition_column}, DAY) BETWEEN {{ start_date }} AND {{ end_date }}
+ TIMESTAMP_TRUNC(TIMESTAMP({partition_column}), DAY) BETWEEN {{ start_date }} AND {{ end_date }}
```

api/python/test/canary/compiled/staging_queries/gcp_ci/exports.checkouts__0 (2)
Line 77: Align ARTIFACT_PREFIX with CI artifacts.

Env points to the dev bucket while cluster metadata uses canary. Use a CI/canary bucket to avoid cross-env contamination.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 98
Line 81: Double-check CUSTOMER_ID == "dev" for gcp_ci.

If CUSTOMER_ID feeds routing/tenancy, consider "ci" or "canary" for isolation.

Also applies to: 102
api/python/test/canary/compiled/staging_queries/gcp_ci/exports.dim_listings__0 (2)
Line 77: Align ARTIFACT_PREFIX with CI artifacts.

Same mismatch as the other gcp_ci artifacts; prefer a CI/canary bucket.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 98
Line 81: Confirm the CUSTOMER_ID value for CI.

Ensure "dev" is intentional in CI; otherwise switch to "ci"/"canary".

Also applies to: 102
api/python/test/canary/compiled/group_bys/gcp_ci/item_event_canary.actions_v1__0 (2)
Lines 132-149: Env inconsistencies with partitioning and artifacts.

- PARTITION_COLUMN in env is "ds" but the conf uses "_DATE". Align them to avoid surprises.
- ARTIFACT_PREFIX points to dev; prefer the CI/canary bucket.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
  ...
- "PARTITION_COLUMN": "ds",
+ "PARTITION_COLUMN": "_DATE",
```

Also applies to: 152-171
Lines 136-146: Verify CUSTOMER_ID for CI.

Confirm "dev" is expected for gcp_ci; switch if needed.

Also applies to: 157-167
api/python/test/canary/compiled/group_bys/gcp_ci/dim_listings.v1__0 (2)
Lines 97-115: Use the CI artifact bucket instead of dev.

Keep gcp_ci isolated from dev.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 117-136
Lines 101-109: Confirm CUSTOMER_ID.

Ensure "dev" is intended for CI.

Also applies to: 121-129
api/python/test/canary/group_bys/gcp_ci/purchases.py (4)
Lines 23-49: DRY up the duplicated aggregation specs.

Define the purchase aggregations once and reuse them in all GroupBys:

```diff
+purchase_aggs = [
+    Aggregation(input_column="purchase_price", operation=Operation.SUM, windows=window_sizes),
+    Aggregation(input_column="purchase_price", operation=Operation.COUNT, windows=window_sizes),
+    Aggregation(input_column="purchase_price", operation=Operation.AVERAGE, windows=window_sizes),
+    Aggregation(input_column="purchase_price", operation=Operation.LAST_K(10)),
+]
@@
-    aggregations=[Aggregation(
-        input_column="purchase_price",
-        operation=Operation.SUM,
-        windows=window_sizes
-    ),  # The sum of purchases prices in various windows
-    Aggregation(
-        input_column="purchase_price",
-        operation=Operation.COUNT,
-        windows=window_sizes
-    ),  # The count of purchases in various windows
-    Aggregation(
-        input_column="purchase_price",
-        operation=Operation.AVERAGE,
-        windows=window_sizes
-    ),  # The average purchases by user in various windows
-    Aggregation(
-        input_column="purchase_price",
-        operation=Operation.LAST_K(10),
-    ),
-    ],
+    aggregations=purchase_aggs,
```
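The payoff of the shared list is easy to see with stand-ins (plain tuples replace the real Aggregation objects here, and the window labels are made up for illustration):

```python
# Stand-in for the Chronon Aggregation spec: (input_column, operation, windows).
window_sizes = ["3d", "14d", "30d"]  # hypothetical window labels

purchase_aggs = [
    ("purchase_price", "SUM", window_sizes),
    ("purchase_price", "COUNT", window_sizes),
    ("purchase_price", "AVERAGE", window_sizes),
    ("purchase_price", "LAST_K(10)", None),
]

# Every GroupBy variant reuses the same list instead of re-declaring it.
v1_dev = {"name": "purchases.v1_dev", "aggregations": purchase_aggs}
v1_test = {"name": "purchases.v1_test", "aggregations": purchase_aggs}

# A change to the shared spec now propagates to all variants automatically.
assert v1_dev["aggregations"] is v1_test["aggregations"]
```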
Lines 51-77: Apply the same DRY refactor to v1_test.

```diff
-    aggregations=[  # same duplicated SUM/COUNT/AVERAGE/LAST_K specs as above
-        ...
-    ],
+    aggregations=purchase_aggs,
```
Lines 92-118: DRY the notds variants too.

```diff
-    aggregations=[  # same duplicated SUM/COUNT/AVERAGE/LAST_K specs as above
-        ...
-    ],
+    aggregations=purchase_aggs,
```

Also applies to: 120-146
Lines 33-47: Tiny comment nits.

"purchases prices" → "purchase prices"; "average purchases by user" → "average purchase price by user".

```diff
-    ),  # The sum of purchases prices in various windows
+    ),  # The sum of purchase prices in various windows
@@
-    ),  # The average purchases by user in various windows
+    ),  # The average purchase price by user in various windows
```

Also applies to: 61-75, 103-115, 135-143
api/python/test/canary/group_bys/gcp_ci/item_event_canary.py (2)
Lines 1-4: Prefer the helper constructor over direct ttypes for clarity and defaults.

Use the ai.chronon.source.EventSource helper; it returns the correct ttypes.Source and keeps parameter names consistent (isCumulative, etc.), which keeps the code idiomatic with the rest of the repo.

Apply:

```diff
-from ai.chronon.api.ttypes import EventSource, Source
+from ai.chronon.api.ttypes import Source
+from ai.chronon.source import EventSource as mk_event_source
```
Lines 16-34: Simplify Source construction via the helper.

This avoids manual ttypes wiring and mismatched field names.

```diff
-    return Source(
-        events=EventSource(
-            # This source table contains a custom struct ('attributes') that enables
-            # attributes['key'] style access pattern in a BQ native table.
-            table="data.item_events_parquet_compat_partitioned",
-            topic=topic,
-            query=Query(
-                selects=selects(
-                    listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
-                    add_cart="IF(event_type = 'backend_add_to_cart', 1, 0)",
-                    view="IF(event_type = 'view_listing', 1, 0)",
-                    purchase="IF(event_type = 'backend_cart_payment', 1, 0)",
-                    favorite="IF(event_type = 'backend_favorite_item2', 1, 0)",
-                ),
-                wheres=[_action_events_filter],
-                time_column="timestamp",
-            ),
-        )
-    )
+    return mk_event_source(
+        # This source table contains a custom struct ('attributes') that enables
+        # attributes['key'] style access pattern in a BQ native table.
+        table="data.item_events_parquet_compat_partitioned",
+        topic=topic,
+        query=Query(
+            selects=selects(
+                listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
+                add_cart="IF(event_type = 'backend_add_to_cart', 1, 0)",
+                view="IF(event_type = 'view_listing', 1, 0)",
+                purchase="IF(event_type = 'backend_cart_payment', 1, 0)",
+                favorite="IF(event_type = 'backend_favorite_item2', 1, 0)",
+            ),
+            wheres=[_action_events_filter],
+            time_column="timestamp",
+        ),
+    )
```

scripts/distribution/run_gcp_hub_quickstart.sh (1)
Lines 196-200: DRY up conf path selection in run_gcp_hub_quickstart.sh.

Compute CONF_PATH based on $ENVIRONMENT and call zipline hub backfill once:

```bash
# pick conf file
if [[ "$ENVIRONMENT" == "canary" ]]; then
  CONF_PATH="compiled/staging_queries/gcp_ci/sample_staging_query.v1_hub__0"
else
  CONF_PATH="compiled/staging_queries/gcp/sample_staging_query.v1_hub__0"
fi

zipline hub backfill \
  --repo=$CHRONON_ROOT \
  --conf=$CONF_PATH \
  --start-ds 2023-12-01 \
  --end-ds 2023-12-01 \
  | tee tmp_backfill.out
```

api/python/test/canary/group_bys/gcp_ci/dim_merchants.py (2)
Lines 8-12: The docstring references dim_listings; update it to dim_merchants.

Avoid confusion.

```diff
-This GroupBy creates a simple passthrough transformation on the dim_listings table.
+This GroupBy creates a simple passthrough transformation on the dim_merchants table.
```
Lines 31-32: Prefer an empty list over None for aggregations.

Some consumers iterate aggregations without None checks.
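A generic sketch (not the actual Chronon consumer code) shows why `[]` is the safer default:

```python
def count_aggregations(aggregations):
    # A typical consumer loop that assumes the field is iterable.
    return sum(1 for _ in aggregations)

# A passthrough GroupBy with aggregations=[] works unchanged:
assert count_aggregations([]) == 0

# aggregations=None forces every consumer to add an explicit guard:
try:
    count_aggregations(None)
except TypeError as exc:
    print("None requires a guard:", exc)
```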
```diff
-    aggregations=None,  # No aggregations - this is a simple passthrough
+    aggregations=[],  # No aggregations - this is a simple passthrough
```

api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_bigquery_import__0 (1)
Lines 76-95: Env uses the dev artifact bucket; CI likely should use canary.

Align ARTIFACT_PREFIX with CI (canary). Prefer fixing teams.py and regenerating artifacts rather than hand-editing compiled JSON.

I can update teams.py and recompile the manifests if you want.
api/python/test/canary/joins/gcp_ci/demo.py (1)
Lines 10-18: Docstring nit: mention merchants.

The right parts include dim_merchants but the docstring omits it.

```diff
 Right parts:
 - User behavioral aggregations (keyed by user_id)
 - Listing dimension attributes (keyed by listing_id)
+- Merchant dimension attributes (keyed by merchant_id, prefixed)
```
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.payment__0 (1)
Lines 76-83: Artifact prefix inconsistent with canary CI.

The env ARTIFACT_PREFIX uses dev while cluster metadata uses canary. Unify on canary.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Optionally consider setting CUSTOMER_ID to "canary" (or "ci") for clarity.

Also applies to: 97-115
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test_notds__0 (1)
Lines 220-238: Use the canary artifact prefix for CI.

Same ARTIFACT_PREFIX consistency issue.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 240-259
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0 (1)
Lines 205-223: Streaming bootstrap missing from the join-level env.

The GroupBy metaData sets CHRONON_ONLINE_ARGS with -Zbootstrap; the top-level env lacks it. If the join runs streaming, add the bootstrap:

```diff
- "CHRONON_ONLINE_ARGS": " -Ztasks=4",
+ "CHRONON_ONLINE_ARGS": " -Ztasks=4 -Zbootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",
```
- "CHRONON_ONLINE_ARGS": " -Ztasks=4", + "CHRONON_ONLINE_ARGS": " -Ztasks=4 -Zbootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092",api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_hub__0 (1)
Lines 218-236: Align ARTIFACT_PREFIX with canary CI.

Use the canary bucket for CI artifacts.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 238-257
api/python/test/canary/joins/gcp_ci/item_event_join.py (1)
Lines 23-24: Prefer the list form for row_ids for consistency.

Keeps API usage uniform with the combined join.

```diff
-    row_ids="user_id",
+    row_ids=["user_id"],
@@
-    row_ids="listing_id",
+    row_ids=["listing_id"],
```

Also applies to: 34-35
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.purchases_labels__0 (2)
Line 129: Avoid nondeterministic labels in CI.

rand() will make results flaky; prefer a deterministic hash-based label (e.g., pmod(abs(hash(user_id, ds)), 2)).
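A deterministic label can be sketched in Python; the Spark SQL analogue would be the pmod(abs(hash(user_id, ds)), 2) expression mentioned above (the column names are assumptions):

```python
import zlib

def deterministic_label(user_id: str, ds: str, num_classes: int = 2) -> int:
    # crc32 is stable across runs and processes (unlike Python's builtin
    # hash(), which is salted per process), mirroring pmod(abs(hash(...)), n).
    return zlib.crc32(f"{user_id}|{ds}".encode()) % num_classes

# Same inputs always give the same label, so CI assertions stay stable.
assert deterministic_label("u1", "2023-12-01") == deterministic_label("u1", "2023-12-01")
assert deterministic_label("u1", "2023-12-01") in (0, 1)
```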
Lines 80-93: Nit: consider CUSTOMER_ID="ci" for clarity.

Since this is the gcp_ci team, "ci" is less ambiguous than "dev".

```diff
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
```

Also applies to: 101-114
api/python/test/canary/compiled/staging_queries/gcp_ci/exports.user_activities__0 (1)
Line 127: Improve BigQuery partition pruning.

Use _PARTITIONDATE (or _PARTITIONTIME directly) to enable pruning; wrapping it in TIMESTAMP_TRUNC may scan more data.

```diff
- "query": "\n  SELECT \n  * \n  FROM demo.`user-activities`\n  WHERE \n  TIMESTAMP_TRUNC(_PARTITIONTIME, DAY) BETWEEN {{ start_date }} AND {{ end_date }}\n  ",
+ "query": "\n  SELECT \n  * \n  FROM demo.`user-activities`\n  WHERE \n  _PARTITIONDATE BETWEEN {{ start_date }} AND {{ end_date }}\n  ",
```

api/python/test/canary/joins/gcp_ci/training_set.py (1)
Lines 23-30: Normalize row_ids to a list for consistency.

The API accepts a string or a list; using a list across all joins avoids surprises.
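Normalizing at the API boundary removes the ambiguity entirely; a generic sketch (not the actual Join implementation):

```python
from typing import List, Union

def normalize_row_ids(row_ids: Union[str, List[str]]) -> List[str]:
    # Accept the scalar shorthand, but always store a list internally so
    # downstream code never has to branch on the type.
    return [row_ids] if isinstance(row_ids, str) else list(row_ids)

assert normalize_row_ids("user_id") == ["user_id"]
assert normalize_row_ids(["user_id", "listing_id"]) == ["user_id", "listing_id"]
```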
```diff
-    row_ids="user_id",
+    row_ids=["user_id"],
```

Also applies to: 32-39, 41-48
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_combined_v1__0 (1)
Lines 371-381: Specify engineType explicitly to avoid engine drift.

The compiled canary config doesn't include an engineType field; adding one makes the runtime engine unambiguous.

• File: api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_combined_v1__0 (around line 371)
• Suggestion: insert an engineType entry (e.g. "spark", "flink", etc.):

```diff
 {
+  "engineType": "<desired_engine>",
   "scheduleCron": "@daily"
   …
 }
```
api/python/test/canary/compiled/group_bys/gcp_ci/purchases.v1_dev_notds__0 (2)
Line 92: Bucket mismatch: dataproc.config uses the canary bucket while the env block uses dev.

Align the artifact buckets to avoid split writes/reads: keep dataproc.config's artifact_prefix ("gs://zipline-artifacts-canary") and the env ARTIFACT_PREFIX pointing at the same bucket.
Lines 158-177: Use canary artifacts in the CI env.

Switch ARTIFACT_PREFIX (and optionally CUSTOMER_ID) to CI values.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
```

Also applies to: 180-198
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test__0 (1)
Lines 218-236: CI env still uses the dev artifact bucket.

Align to canary.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
```

Also applies to: 239-257
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.order__0 (1)
Lines 76-94: Use the canary artifact bucket for CI.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
```

Also applies to: 97-115
api/python/test/canary/compiled/group_bys/gcp_ci/dim_merchants.v1__0 (1)
Lines 83-101: CI env uses the dev artifact bucket.

Align to canary for consistency.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
```

Also applies to: 104-122
api/python/test/canary/compiled/group_bys/gcp_ci/purchases.v1_test__0 (1)
Lines 158-177: CI env still points to dev artifacts.

Switch to the canary bucket (and CUSTOMER_ID=ci if applicable).

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
```

Also applies to: 180-198
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.cart__0 (2)
Line 80: Confirm CUSTOMER_ID=dev for CI.

If CI isolation is desired, consider CUSTOMER_ID=ci or canary.

Also applies to: 101
Line 9: Drop unnecessary Dataproc optional components to cut CI cost.

If unused, remove FLINK and JUPYTER from optionalComponents in the generator (not necessarily here).
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev__0 (1)
Line 223: Confirm CUSTOMER_ID=dev for CI.

Switch to ci/canary if you need isolation.

Also applies to: 243
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.user__0 (1)
Line 80: Confirm CUSTOMER_ID=dev for CI.

Use a CI-specific value if isolation is required.

Also applies to: 101
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.terminal__0 (1)
Line 80: Confirm CUSTOMER_ID=dev for CI.

Consider ci/canary for isolation.

Also applies to: 101
api/python/test/canary/compiled/teams_metadata/gcp_ci/gcp_ci_team_metadata (1)
Lines 81-91: Confirm CUSTOMER_ID=dev and the optional components.

- CUSTOMER_ID=dev: OK if intended; otherwise use ci/canary.
- Consider removing FLINK/JUPYTER from the Dataproc optionalComponents in the generator to cut CI cost.

Also applies to: 103-113
api/python/test/canary/compiled/group_bys/gcp_ci/item_event_canary.actions_pubsub_v2__0 (3)
Line 132: Use the canary artifact bucket for CI (currently points to dev).

Align env with the dataproc.config artifact_prefix.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 153
Line 133: Kafka bootstrap arg + Pub/Sub = mixed signals.

CHRONON_ONLINE_ARGS includes a Kafka bootstrap while ENABLE_PUBSUB=true. Confirm the intended runtime and drop the irrelevant arg.

Also applies to: 154
Line 136: CUSTOMER_ID=dev in a CI config.

Consider setting a CI-specific value (e.g., "ci") to avoid mixing environments.

Also applies to: 157
api/python/test/canary/compiled/group_bys/gcp_ci/user_activities.v1__0 (1)
Line 464: Use the canary artifact bucket for CI (currently points to dev).

Keep env consistent with the canary GCS paths.

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 485
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.shipping__0 (1)
Line 76: Use the canary artifact bucket for CI (currently points to dev).

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 97
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.item__0 (1)
Line 76: Use the canary artifact bucket for CI (currently points to dev).

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 97
api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0 (2)
Line 630: Duplicate Airflow dependency (user_activities listed twice).

Drop the duplicate entry.

```diff
- "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_data_gcp_exports_user_activities__0_with_offset_0\", \"spec\": \"data.gcp_exports_user_activities__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_exports_user_activities__0_with_offset_0\", \"spec\": \"data.gcp_exports_user_activities__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_exports_dim_listings__0_with_offset_0\", \"spec\": \"data.gcp_exports_dim_listings__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_exports_dim_merchants__0_with_offset_0\", \"spec\": \"data.gcp_exports_dim_merchants__0/ds={{ macros.ds_add(ds, 0) }}\"}], \"label_join\": false}",
+ "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_data_gcp_exports_user_activities__0_with_offset_0\", \"spec\": \"data.gcp_exports_user_activities__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_exports_dim_listings__0_with_offset_0\", \"spec\": \"data.gcp_exports_dim_listings__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_exports_dim_merchants__0_with_offset_0\", \"spec\": \"data.gcp_exports_dim_merchants__0/ds={{ macros.ds_add(ds, 0) }}\"}], \"label_join\": false}",
```
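If the duplicate originates in the generator, deduplicating dependencies by name before serializing customJson prevents a recurrence; a hedged sketch of the idea (`dedupe_dependencies` is hypothetical, not the actual compiler code):

```python
import json

def dedupe_dependencies(deps):
    # Keep the first occurrence of each dependency name, preserving order.
    seen, out = set(), []
    for dep in deps:
        if dep["name"] not in seen:
            seen.add(dep["name"])
            out.append(dep)
    return out

deps = [
    {"name": "wf_user_activities", "spec": "data.user_activities/ds={{ ds }}"},
    {"name": "wf_user_activities", "spec": "data.user_activities/ds={{ ds }}"},
    {"name": "wf_dim_listings", "spec": "data.dim_listings/ds={{ ds }}"},
]
custom_json = json.dumps({"airflowDependencies": dedupe_dependencies(deps)})
assert len(dedupe_dependencies(deps)) == 2
```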
Line 703: Use the canary artifact bucket for CI (currently points to dev).

```diff
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
```

Also applies to: 724
api/python/test/canary/staging_queries/gcp_ci/sample_staging_query.py (3)
Line 18: DRY up the repeated table_properties.

Use a shared constant to avoid duplication.

```diff
-    table_properties={"sample_config_json": """{"sample_key": "sample value"}"""},
+    table_properties=SHARED_TABLE_PROPS,
```

Add once near the top of the file:

```python
SHARED_TABLE_PROPS = {"sample_config_json": '{"sample_key": "sample value"}'}
```

Also applies to: 43, 64, 82
Lines 33-38: Avoid SELECT * in UNION ALL.

Schema drift will silently break or reorder columns. Prefer an explicit column list (+ category_name) to lock the schema.
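One way to lock the schema is to generate both UNION ALL branches from a single explicit column list (a sketch; the column and table names are assumptions):

```python
# Hypothetical explicit column list shared by every branch of the union.
COLUMNS = ["purchase_id", "user_id", "purchase_price", "ds", "category_name"]

def union_query(tables):
    # Both branches project the same columns in the same order, so a new
    # column in one source table cannot silently shift the output schema.
    select_list = ",\n    ".join(COLUMNS)
    branches = [f"SELECT\n    {select_list}\nFROM {t}" for t in tables]
    return "\nUNION ALL\n".join(branches)

sql = union_query(["data.purchases_a", "data.purchases_b"])
print(sql)
```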
Lines 56-63: Make labels deterministic if they are used in tests.

rand() will cause flaky assertions. Consider a hash/mod-based label derived from a stable key (e.g., user_id).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (46)
- api/python/test/canary/compiled/group_bys/gcp_ci/dim_listings.v1__0 (1 hunks)
- api/python/test/canary/compiled/group_bys/gcp_ci/dim_merchants.v1__0 (1 hunks)
- api/python/test/canary/compiled/group_bys/gcp_ci/item_event_canary.actions_pubsub_v2__0 (1 hunks)
- api/python/test/canary/compiled/group_bys/gcp_ci/item_event_canary.actions_v1__0 (1 hunks)
- api/python/test/canary/compiled/group_bys/gcp_ci/purchases.v1_dev__0 (1 hunks)
- api/python/test/canary/compiled/group_bys/gcp_ci/purchases.v1_dev_notds__0 (1 hunks)
- api/python/test/canary/compiled/group_bys/gcp_ci/purchases.v1_test__0 (1 hunks)
- api/python/test/canary/compiled/group_bys/gcp_ci/purchases.v1_test_notds__0 (1 hunks)
- api/python/test/canary/compiled/group_bys/gcp_ci/user_activities.v1__0 (1 hunks)
- api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0 (1 hunks)
- api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_batch_v1__0 (1 hunks)
- api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_combined_v1__0 (1 hunks)
- api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0 (1 hunks)
- api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev__0 (1 hunks)
- api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev_notds__0 (1 hunks)
- api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_hub__0 (1 hunks)
- api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test__0 (1 hunks)
- api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test_notds__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/exports.checkouts__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/exports.dim_listings__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/exports.dim_merchants__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/exports.dim_users__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/exports.user_activities__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.cart__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.item__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.order__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.payment__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.purchases_labels__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.shipping__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.terminal__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.user__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_bigquery_import__0 (1 hunks)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_hub__0 (1 hunks)
- api/python/test/canary/compiled/teams_metadata/gcp_ci/gcp_ci_team_metadata (1 hunks)
- api/python/test/canary/group_bys/gcp_ci/dim_listings.py (1 hunks)
- api/python/test/canary/group_bys/gcp_ci/dim_merchants.py (1 hunks)
- api/python/test/canary/group_bys/gcp_ci/item_event_canary.py (1 hunks)
- api/python/test/canary/group_bys/gcp_ci/purchases.py (1 hunks)
- api/python/test/canary/group_bys/gcp_ci/user_activities.py (1 hunks)
- api/python/test/canary/joins/gcp_ci/demo.py (1 hunks)
- api/python/test/canary/joins/gcp_ci/item_event_join.py (1 hunks)
- api/python/test/canary/joins/gcp_ci/training_set.py (1 hunks)
- api/python/test/canary/staging_queries/gcp_ci/exports.py (1 hunks)
- api/python/test/canary/staging_queries/gcp_ci/sample_staging_query.py (1 hunks)
- api/python/test/canary/teams.py (1 hunks)
- scripts/distribution/run_gcp_hub_quickstart.sh (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (12)
scripts/distribution/run_gcp_hub_quickstart.sh (2)
- api/python/ai/chronon/repo/zipline.py (1): zipline (43-45)
- api/python/ai/chronon/repo/hub_runner.py (1): hub (15-16)

api/python/test/canary/group_bys/gcp_ci/user_activities.py (4)
- api/python/ai/chronon/source.py (1): EventSource (8-35)
- api/python/ai/chronon/group_by.py (2): TimeUnit (181-183), Window (247-248)
- api/python/ai/chronon/query.py (1): selects (103-126)
- api/python/ai/chronon/utils.py (1): get_staging_query_output_table_name (313-318)

api/python/test/canary/joins/gcp_ci/training_set.py (2)
- api/python/ai/chronon/source.py (1): EventSource (8-35)
- api/python/ai/chronon/query.py (1): selects (103-126)

api/python/test/canary/group_bys/gcp_ci/dim_listings.py (3)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1): query (396-404)
- api/python/ai/chronon/query.py (1): selects (103-126)
- api/python/ai/chronon/utils.py (1): get_staging_query_output_table_name (313-318)

api/python/test/canary/group_bys/gcp_ci/dim_merchants.py (3)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1): query (396-404)
- api/python/ai/chronon/query.py (1): selects (103-126)
- api/python/ai/chronon/utils.py (1): get_staging_query_output_table_name (313-318)

api/python/test/canary/staging_queries/gcp_ci/exports.py (1)
- api/python/ai/chronon/staging_query.py (2): EngineType (14-16), TableDependency (19-41)

api/python/test/canary/joins/gcp_ci/item_event_join.py (2)
- api/python/ai/chronon/source.py (1): EventSource (8-35)
- api/python/ai/chronon/query.py (1): selects (103-126)

api/python/test/canary/joins/gcp_ci/demo.py (4)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1): query (396-404)
- api/python/ai/chronon/query.py (1): selects (103-126)
- api/python/ai/chronon/source.py (1): EventSource (8-35)
- api/python/ai/chronon/utils.py (1): get_staging_query_output_table_name (313-318)

api/python/test/canary/group_bys/gcp_ci/item_event_canary.py (3)
- api/python/ai/chronon/source.py (1): EventSource (8-35)
- api/python/ai/chronon/group_by.py (1): Operation (60-149)
- api/python/ai/chronon/query.py (1): selects (103-126)

api/python/test/canary/group_bys/gcp_ci/purchases.py (3)
- api/python/ai/chronon/source.py (1): EventSource (8-35)
- api/python/ai/chronon/group_by.py (3): Operation (60-149), TimeUnit (181-183), Window (247-248)
- api/python/ai/chronon/query.py (1): selects (103-126)

api/python/test/canary/staging_queries/gcp_ci/sample_staging_query.py (3)
- api/python/ai/chronon/staging_query.py (3): EngineType (14-16), Import (43-65), TableDependency (19-41)
- api/python/ai/chronon/utils.py (2): get_join_output_table_name (326-341), get_staging_query_output_table_name (313-318)
- api/src/main/scala/ai/chronon/api/Extensions.scala (1): query (396-404)

api/python/test/canary/teams.py (2)
- api/python/ai/chronon/repo/constants.py (1): RunMode (4-31)
- api/python/ai/chronon/repo/cluster.py (1): generate_dataproc_cluster_config (4-65)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (17)
- GitHub Check: service_tests
- GitHub Check: service_commons_tests
- GitHub Check: api_tests
- GitHub Check: cloud_aws_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: aggregator_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: analyzer_tests
- GitHub Check: streaming_tests
- GitHub Check: groupby_tests
- GitHub Check: join_tests
- GitHub Check: fetcher_tests
- GitHub Check: batch_tests
- GitHub Check: python_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: spark_tests
🔇 Additional comments (31)
api/python/test/canary/group_bys/gcp_ci/dim_listings.py (1)
32-33: Canary test is GCP-only; BigQuery IF() is fine
The file lives under api/python/test/canary/group_bys/gcp_ci; no Spark variant exists. The canary runs on BigQuery, so IF() syntax is supported. No change needed.
api/python/test/canary/group_bys/gcp_ci/user_activities.py (2)
58-60: No changes needed: Operation.AVERAGE is correct
Verified that the Python binding (api/python/ai/chronon/group_by.py) defines Operation.AVERAGE and that there is no Operation.AVG anywhere in the codebase. This use is consistent with existing tests and implementations; no update required.
24-35: Ignore SQL dialect refactor for GCP CI
This file targets BigQuery in the GCP CI tests, where IF() and STRUCT(... AS timestamp) are valid syntax. No changes needed. Likely an incorrect or invalid review comment.
api/python/test/canary/compiled/group_bys/gcp_ci/item_event_canary.actions_v1__0 (1)
2-47: Aggregations look correct.
Four 1d SUMs over the indicator columns keyed by listing_id are consistent and straightforward.
api/python/test/canary/compiled/group_bys/gcp_ci/dim_listings.v1__0 (1)
149-176: Source and selects are sensible.
Snapshot source, startPartition, and derived flags (is_expensive/is_in_stock) look good.
api/python/test/canary/group_bys/gcp_ci/item_event_canary.py (1)
30-32: Confirm time column exists.
Ensure the source has a timestamp field named exactly timestamp.
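A tiny pre-flight check makes this "named exactly timestamp" expectation concrete; the helper below is hypothetical (plain Python, not a Chronon API) and the column list is an assumed shape for the canary table:

```python
# Hypothetical helper, not part of Chronon: confirm the time column a GroupBy
# source declares is actually present among the table's columns.
def has_time_column(available_columns, time_column="timestamp"):
    """Return True when `time_column` appears in `available_columns`."""
    return time_column in set(available_columns)

# Assumed column list for the canary item-events table.
cols = ["listing_id", "event_type", "timestamp"]
print(has_time_column(cols))        # True: field named exactly "timestamp"
print(has_time_column(cols, "ts"))  # False: a renamed field would break the GroupBy
```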
api/python/test/canary/teams.py (2)
182-191: Cluster config already uses canary artifact prefix.
Good alignment with canary infra.
124-141: ARTIFACT_PREFIX must point to canary bucket
- In api/python/test/canary/teams.py (lines ~69 and ~135), update:
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
- Regenerate the compiled fixtures under api/python/test/canary/compiled to reflect this change.
- Also review whether CUSTOMER_ID should be "canary" for gcp_ci.
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test_notds__0 (2)
93-98: Verify team of nested groupBy.
Nested metaData shows team "gcp". If CI isolation is desired, this should reference gcp_ci artifacts.
158-165: Partition column 'ds' vs sources using 'notds'.
conf uses spark.chronon.partition.column=ds while sources partition on notds. Confirm intended output partitioning to avoid write-time failures.
Also applies to: 188-196
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0 (1)
79-83: Confirm nested team.
Nested groupBy metaData has team "gcp"; should this be gcp_ci for CI isolation?
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_hub__0 (1)
93-98: Verify nested team is gcp intentionally.
If CI should be fully isolated, consider pointing to gcp_ci groupBys.
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_batch_v1__0 (1)
219-226: Verify CI canary env values
I couldn't locate any other gcp_ci configs, but this still uses dev. Please confirm intent or update:
• api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_batch_v1__0 (219-226):
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
...
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
Ensure no other gcp_ci canary configs remain on dev.
api/python/test/canary/compiled/group_bys/gcp_ci/purchases.v1_test_notds__0 (1)
158-176: CI config still carries "dev" env values. Confirm/update.
If this artifact is for CI, consider CUSTOMER_ID=ci and a CI/canary artifact prefix.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
...
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev_notds__0 (1)
219-238: CI env still "dev". Verify or switch.
Same concern as other artifacts (CUSTOMER_ID, ARTIFACT_PREFIX).
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
...
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
api/python/test/canary/compiled/group_bys/gcp_ci/purchases.v1_dev__0 (1)
159-176: Dev-stamped env in CI artifact. Confirm.
Consider setting CUSTOMER_ID=ci and a CI/canary artifact prefix if these run in CI.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
...
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.purchases_labels__0 (1)
1-4: Confirm default engineType for staging queries
EngineType is only set in exports and the BigQuery import query but omitted here. Verify that the scheduler defaults to the correct engine or add an explicit "engineType": 1.
• File: api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.purchases_labels__0 (lines 1-4)
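One way to act on this review finding is a lint pass over the compiled JSON manifests; the sketch below is hypothetical (the "engineType" key name comes from the artifacts in this PR, everything else is illustrative):

```python
import json

# Hypothetical lint: flag compiled staging-query manifests that omit
# "engineType" and therefore rely on the scheduler's default engine.
def missing_engine_type(manifest_json):
    return "engineType" not in json.loads(manifest_json)

explicit = json.dumps({"query": "SELECT 1", "engineType": 1})
implicit = json.dumps({"query": "SELECT 1"})
print(missing_engine_type(explicit))  # False
print(missing_engine_type(implicit))  # True: engine choice is left implicit
```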
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_hub__0 (1)
3-11: Offset (+1) matches timeUnit (1); compiled upstream present
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_hub__0 (lines 3, 146)
- api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_bigquery_import__0 (lines 4, 144)
api/python/test/canary/joins/gcp_ci/training_set.py (2)
1-1: Confirm cross-team dependency is intentional.
This gcp_ci join imports group_bys from group_bys.gcp.purchases. If isolation is desired, consider group_bys.gcp_ci instead.
32-37: Double-check v1_hub uses the intended purchases variant.
You reference purchases.v1_test in v1_hub; ensure this is not meant to be a hub-specific or dev variant.
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_combined_v1__0 (1)
223-234: Verify left/source tables are correct (compat vs compat_partitioned).
The left side uses data.item_events_parquet_compat, while the group-by reads compat_partitioned; ensure both are intended, that dependencies cover both (they do), and that partition pruning is acceptable.
Also applies to: 256-263
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test__0 (1)
5-70: Join aggregations: looks good.
Windows and ops are consistent with purchases GBs.
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.order__0 (1)
141-147: Table dependency should track CI table.
- "table": "data.gcp_training_set_v1_test__0"
+ "table": "data.gcp_ci_training_set_v1_test__0"
If the canonical naming omits team, ignore; otherwise update compiled sources accordingly. I can generate a PR-wide check to confirm all references if you want.
api/python/test/canary/compiled/group_bys/gcp_ci/purchases.v1_test__0 (1)
2-66: Purchases aggregations: looks good.
Windows/ops consistent and sane.
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev__0 (1)
93-98: Mixed team identifiers (gcp vs gcp_ci).
Inner groupBy metaData uses team/name under gcp while the top-level uses gcp_ci. Verify this is intentional to reference a shared gcp artifact; otherwise recompile under gcp_ci.
Also applies to: 262-269
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.terminal__0 (1)
129-129: Manual schema check required for UNION ALL
- No category_name alias found in compiled queries.
- Please confirm all six source tables expose the category_name column and identical ds partitioning to avoid runtime failures.
api/python/test/canary/compiled/group_bys/gcp_ci/item_event_canary.actions_pubsub_v2__0 (1)
73-75: Partition column mismatch (_DATE vs ds).
Conf uses _DATE while env uses ds. Ensure downstream tooling expects the correct partition key.
Also applies to: 147-149
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.shipping__0 (1)
132-139: Verify dependency window offsets.
startOffset=1 and endOffset=1 is unusual for daily partitions; confirm this is intentional (often endOffset=0).
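The offset arithmetic in question can be sketched in plain Python, assuming daily ds partitions and the common convention that offset N means "N days before the run's partition date":

```python
from datetime import date, timedelta

# Sketch of dependency-offset semantics (assumption: daily "ds" partitions,
# offset N = N days before the run's partition date).
def dependency_partition(ds, offset):
    return (date.fromisoformat(ds) - timedelta(days=offset)).isoformat()

# With startOffset=1 and endOffset=1 the window collapses to one partition a
# day back; endOffset=0 would extend it up to the run date itself.
print(dependency_partition("2024-05-10", 1))  # 2024-05-09
print(dependency_partition("2024-05-10", 0))  # 2024-05-10
```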
api/python/test/canary/staging_queries/gcp_ci/sample_staging_query.py (3)
7-23: LGTM: Jinja braces correctly escaped in f-strings.
Query construction and dependency shape look sane.
20-21: Mixed dependency offsets (0 vs 1); confirm intent.
You use offset=1 for most staging deps, but offset=0 for purchases_labels and the BigQuery import. Validate these align with data availability/SLA.
Also applies to: 46-51, 67-68, 84-85, 101-101
96-105: BIGQUERY Import usage looks correct.
Exactly one dependency with a partition column; engine specified.
"keyColumns": [
    "listing_id"
],
Key column mismatch: listing_id mapped from merchant_id.
Likely wrong primary key; use merchant_id consistently.
- "keyColumns": [
- "listing_id"
- ],
+ "keyColumns": [
+ "merchant_id"
+ ],
...
- "listing_id": "merchant_id",
+ "merchant_id": "merchant_id",
Follow-up: regenerate hashes if schema changes.
Also applies to: 139-142
🤖 Prompt for AI Agents
In api/python/test/canary/compiled/group_bys/gcp_ci/dim_merchants.v1__0 around
lines 2-4 (and also lines 139-142) the keyColumns array incorrectly lists
"listing_id" while the schema/field mappings use "merchant_id"; replace
"listing_id" with "merchant_id" so the primary key is consistent across
mappings, then regenerate any dependent hashes/artifacts to reflect the schema
change.
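This class of bug is mechanical enough to catch with a consistency check over the compiled manifest; the checker below is a hypothetical sketch (plain Python, field names taken from the artifact above):

```python
# Hypothetical check: every keyColumns entry must also be an output name in
# selects, otherwise the compiled GroupBy declares a key it never produces.
def key_columns_consistent(manifest):
    return set(manifest["keyColumns"]) <= set(manifest["selects"])

broken = {"keyColumns": ["listing_id"],
          "selects": {"merchant_id": "merchant_id", "primary_category": "primary_category"}}
fixed = {"keyColumns": ["merchant_id"],
         "selects": {"merchant_id": "merchant_id", "primary_category": "primary_category"}}
print(key_columns_consistent(broken))  # False: listing_id is not a selected output
print(key_columns_consistent(fixed))   # True
```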
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_batch_v1__0
selects=selects(
    user_id="user_id",
    listing_id="listing_id",
    row_id="event_id"
),
time_column="event_time_ms",
),
Align row_ids with selected column name.
You select row_id="event_id" but set row_ids=["event_id"]. Join will not find event_id on the left.
- selects=selects(
- user_id="user_id",
- listing_id="listing_id",
- row_id="event_id"
- ),
+ selects=selects(
+ user_id="user_id",
+ listing_id="listing_id",
+ event_id="event_id",
+ ),
time_column="event_time_ms",
...
- row_ids=["event_id"], # TODO -- kill this once the SPJ API change goes through
+ row_ids=["event_id"], # TODO -- kill this once the SPJ API change goes through
Also applies to: 37-37
🤖 Prompt for AI Agents
In api/python/test/canary/joins/gcp_ci/demo.py around lines 25-31 (and also line
37), the chosen row_id name on the right side ("event_id") does not match the
column name used on the left in selects, causing the join to fail to find the
key; make the row_id name align with the selected column names on both sides —
either change selects(...) to expose the same column name used in row_ids or
update row_ids to reference the actual selected column name on the left (and
mirror this change at line 37) so the join keys match exactly.
@@ -0,0 +1,52 @@
from group_bys.gcp import item_event_canary, purchases

from ai.chronon.api.ttypes import EventSource, Source
Wrong imports for EventSource/Source.
Use the helper factory from ai.chronon.source; don’t import thrift types directly here.
-from ai.chronon.api.ttypes import EventSource, Source
+from ai.chronon.source import EventSource
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
- from ai.chronon.api.ttypes import EventSource, Source
+ from ai.chronon.source import EventSource
🤖 Prompt for AI Agents
In api/python/test/canary/joins/gcp_ci/item_event_join.py around line 3, the
code incorrectly imports thrift types EventSource and Source directly from
ai.chronon.api.ttypes; replace that direct import by using the helper factory
from ai.chronon.source (import the factory function/class provided there and use
it to construct EventSource/Source instances) so tests use the public factory
API instead of native thrift types.
source = Source(
    events=EventSource(
        table="data.item_events_parquet_compat",
        query=Query(
            selects=selects(
                listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
                user_id="attributes['user_id']",
            ),
            time_column="timestamp",
        ),
    )
)
Incorrect Source construction (double-wrapping).
EventSource(...) already returns a Source; wrapping it in Source(...) is invalid.
-source = Source(
- events=EventSource(
- table="data.item_events_parquet_compat",
- query=Query(
- selects=selects(
- listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
- user_id="attributes['user_id']",
- ),
- time_column="timestamp",
- ),
- )
-)
+source = EventSource(
+ table="data.item_events_parquet_compat",
+ query=Query(
+ selects=selects(
+ listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
+ user_id="attributes['user_id']",
+ ),
+ time_column="timestamp",
+ ),
+)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
source = EventSource(
    table="data.item_events_parquet_compat",
    query=Query(
        selects=selects(
            listing_id="EXPLODE(TRANSFORM(SPLIT(COALESCE(attributes['sold_listing_ids'], attributes['listing_id']), ','), e -> CAST(e AS LONG)))",
            user_id="attributes['user_id']",
        ),
        time_column="timestamp",
    ),
)
🤖 Prompt for AI Agents
In api/python/test/canary/joins/gcp_ci/item_event_join.py around lines 7 to 18,
the code incorrectly wraps an EventSource in Source (double-wrapping); replace
the outer Source(...) with a direct assignment of the EventSource to the source
variable so that source = EventSource(...), ensuring the EventSource call
remains unchanged (same table, query, selects, time_column) and remove the
invalid outer constructor.
api/python/test/canary/staging_queries/gcp_ci/sample_staging_query.py
Actionable comments posted: 8
♻️ Duplicate comments (7)
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev_notds__0 (1)
93-98: Resolved: nested GroupBy team now gcp_ci.
Past concern addressed; the nested GB metadata correctly uses team "gcp_ci".
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev__0 (1)
218-236: Use canary artifact bucket for CI.
Both env blocks still reference dev artifacts; align to canary.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
@@
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 239-257
api/python/test/canary/compiled/group_bys/gcp_ci/dim_merchants.v1__0 (1)
2-4: Fix primary key: use merchant_id consistently (keyColumns and selects).
listing_id is incorrectly mapped from merchant_id. Set the PK to merchant_id and select it under the same name; then regenerate columnHashes and any downstream artifacts/tests.
- "keyColumns": [
-   "listing_id"
- ],
+ "keyColumns": [
+   "merchant_id"
+ ],
- "selects": {
-   "listing_id": "merchant_id",
-   "primary_category": "primary_category"
- },
+ "selects": {
+   "merchant_id": "merchant_id",
+   "primary_category": "primary_category"
+ },
Also applies to: 139-146
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.purchases_labels__0 (1)
76-94: Align ARTIFACT_PREFIX with canary.
Same mismatch as other files; switch -dev to -canary.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
...
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 96-115
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.cart__0 (1)
76-76: Use canary artifact bucket for CI.
Same mismatch as previously noted; switch dev to canary.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
@@
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 97-97
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.user__0 (1)
76-76: Use canary artifact bucket for CI.
Align ARTIFACT_PREFIX with canary.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
@@
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 97-97
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_hub__0 (1)
76-76: Use canary artifact bucket for CI.
Fix remaining dev bucket refs.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
@@
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 97-97
🧹 Nitpick comments (12)
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev_notds__0 (1)
218-238: Use canary artifact bucket for CI.
ARTIFACT_PREFIX still points to dev; switch to canary for CI isolation.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
@@
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 239-259
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test_notds__0 (1)
220-238: Use canary artifact bucket for CI (test_notds variant).
Mirror the change here to avoid writing to dev in CI.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
@@
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 241-259
api/python/test/canary/compiled/group_bys/gcp_ci/dim_merchants.v1__0 (1)
83-83: Align artifact bucket to canary.
Env points to dev artifacts while cluster uses canary. Use the canary bucket to avoid confusion/misroutes.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 104-104
api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0 (2)
630-630: Duplicate Airflow dependency; dedupe the user_activities entry.
The same dependency is listed twice; remove the duplicate to avoid scheduler noise.
- "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_data_gcp_ci_exports_user_activities__0_with_offset_0\", \"spec\": \"data.gcp_ci_exports_user_activities__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_ci_exports_user_activities__0_with_offset_0\", \"spec\": \"data.gcp_ci_exports_user_activities__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_ci_exports_dim_listings__0_with_offset_0\", \"spec\": \"data.gcp_ci_exports_dim_listings__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_ci_exports_dim_merchants__0_with_offset_0\", \"spec\": \"data.gcp_ci_exports_dim_merchants__0/ds={{ macros.ds_add(ds, 0) }}\"}], \"label_join\": false}", + "customJson": "{\"airflowDependencies\": [{\"name\": \"wf_data_gcp_ci_exports_user_activities__0_with_offset_0\", \"spec\": \"data.gcp_ci_exports_user_activities__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_ci_exports_dim_listings__0_with_offset_0\", \"spec\": \"data.gcp_ci_exports_dim_listings__0/ds={{ macros.ds_add(ds, 0) }}\"}, {\"name\": \"wf_data_gcp_ci_exports_dim_merchants__0_with_offset_0\", \"spec\": \"data.gcp_ci_exports_dim_merchants__0/ds={{ macros.ds_add(ds, 0) }}\"}], \"label_join\": false}",
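The duplicate could also be stripped mechanically after compilation; a hypothetical post-processing step over the customJson payload shape shown above might look like:

```python
import json

# Hypothetical cleanup: drop repeated airflowDependencies entries (keyed by
# name) while preserving first-seen order.
def dedupe_airflow_dependencies(custom_json):
    payload = json.loads(custom_json)
    seen, unique = set(), []
    for dep in payload.get("airflowDependencies", []):
        if dep["name"] not in seen:
            seen.add(dep["name"])
            unique.append(dep)
    payload["airflowDependencies"] = unique
    return json.dumps(payload)

doc = json.dumps({"airflowDependencies": [
    {"name": "wf_user_activities", "spec": "a"},
    {"name": "wf_user_activities", "spec": "a"},
    {"name": "wf_dim_listings", "spec": "b"},
], "label_join": False})
cleaned = json.loads(dedupe_airflow_dependencies(doc))
print(len(cleaned["airflowDependencies"]))  # 2
```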
703-703: Use canary artifact bucket in env.
Dev artifact prefix here may publish to the wrong bucket for CI canary runs.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 724-724
api/python/test/canary/compiled/group_bys/gcp_ci/dim_listings.v1__0 (1)
97-97: Align artifact bucket to canary.
Consistent with other CI envs, switch ARTIFACT_PREFIX to canary.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 118-118
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0 (2)
69-74: Drop nested ONLINE_ARGS (Kafka bootstrap) to avoid shadowing and wrong transport.
This Kafka bootstrap conflicts with Pub/Sub and duplicates global env.
Apply this diff:
- "env": {
-   "common": {
-     "CHRONON_ONLINE_ARGS": "-Ztasks=4 -Zbootstrap=bootstrap.zipline-kafka-cluster.us-central1.managedkafka.canary-443022.cloud.goog:9092"
-   },
-   "modeEnvironments": {}
- },
207-208: Replace placeholders before enabling outside tests.
[ONLINE-TODO] and [STREAMING-TODO] placeholders should be resolved or removed to avoid accidental usage in CI.
Also applies to: 217-218, 228-229, 238-239
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.payment__0 (1)
88-90: HADOOP_DIR placeholder.
If unused for staging, leave empty or document; placeholders can confuse operators.
Also applies to: 109-111
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.purchases_labels__0 (1)
129-129: Non-deterministic labels via rand().
If test outputs must be reproducible, derive label from a stable hash of an ID instead of rand().
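A minimal sketch of the hash-based alternative, in plain Python rather than SQL (CRC32 of the row ID stands in for whatever stable key the table actually exposes):

```python
import zlib

# Sketch: derive the 0/1 label from a stable hash of the row's ID instead of
# rand(), so reruns over the same partition yield identical labels.
def stable_label(row_id):
    return zlib.crc32(str(row_id).encode()) % 2

first = [stable_label(i) for i in range(5)]
again = [stable_label(i) for i in range(5)]
print(first == again)  # True: deterministic across runs
```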
api/python/test/canary/staging_queries/gcp_ci/sample_staging_query.py (2)
58-63: Make labels deterministic for stable CI runs.
Seed rand() to avoid flakiness across runs/partitions.
- case when rand() < 0.5 then 0 else 1 end as label
+ case when rand(42) < 0.5 then 0 else 1 end as label
45-52: Reduce duplication when listing terminal dependencies.
Build the list via a comprehension for maintainability.
- dependencies=[
-     TableDependency(table=get_staging_query_output_table_name(cart, True), partition_column="ds", offset=1),
-     TableDependency(table=get_staging_query_output_table_name(user, True), partition_column="ds", offset=1),
-     TableDependency(table=get_staging_query_output_table_name(item, True), partition_column="ds", offset=1),
-     TableDependency(table=get_staging_query_output_table_name(order, True), partition_column="ds", offset=1),
-     TableDependency(table=get_staging_query_output_table_name(payment, True), partition_column="ds", offset=1),
-     TableDependency(table=get_staging_query_output_table_name(shipping, True), partition_column="ds", offset=1),
- ],
+ dependencies=[
+     TableDependency(table=get_staging_query_output_table_name(sq, True), partition_column="ds", offset=1)
+     for sq in [cart, user, item, order, payment, shipping]
+ ],
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (28)
api/python/test/canary/compiled/group_bys/gcp_ci/dim_listings.v1__0 (1 hunks)
api/python/test/canary/compiled/group_bys/gcp_ci/dim_merchants.v1__0 (1 hunks)
api/python/test/canary/compiled/group_bys/gcp_ci/user_activities.v1__0 (1 hunks)
api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0 (1 hunks)
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_batch_v1__0 (1 hunks)
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_combined_v1__0 (1 hunks)
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0 (1 hunks)
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev__0 (1 hunks)
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev_notds__0 (1 hunks)
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_hub__0 (1 hunks)
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test__0 (1 hunks)
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test_notds__0 (1 hunks)
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.cart__0 (1 hunks)
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.item__0 (1 hunks)
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.order__0 (1 hunks)
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.payment__0 (1 hunks)
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.purchases_labels__0 (1 hunks)
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.shipping__0 (1 hunks)
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.user__0 (1 hunks)
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_bigquery_import__0 (1 hunks)
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_hub__0 (1 hunks)
api/python/test/canary/group_bys/gcp_ci/dim_listings.py (1 hunks)
api/python/test/canary/group_bys/gcp_ci/dim_merchants.py (1 hunks)
api/python/test/canary/group_bys/gcp_ci/user_activities.py (1 hunks)
api/python/test/canary/joins/gcp_ci/demo.py (1 hunks)
api/python/test/canary/joins/gcp_ci/item_event_join.py (1 hunks)
api/python/test/canary/joins/gcp_ci/training_set.py (1 hunks)
api/python/test/canary/staging_queries/gcp_ci/sample_staging_query.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (11)
- api/python/test/canary/joins/gcp_ci/item_event_join.py
- api/python/test/canary/group_bys/gcp_ci/dim_merchants.py
- api/python/test/canary/group_bys/gcp_ci/user_activities.py
- api/python/test/canary/compiled/group_bys/gcp_ci/user_activities.v1__0
- api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_batch_v1__0
- api/python/test/canary/joins/gcp_ci/training_set.py
- api/python/test/canary/joins/gcp_ci/demo.py
- api/python/test/canary/group_bys/gcp_ci/dim_listings.py
- api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_test__0
- api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_hub__0
- api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_combined_v1__0
🧰 Additional context used
🧬 Code Graph Analysis (1)
api/python/test/canary/staging_queries/gcp_ci/sample_staging_query.py (2)
- api/python/ai/chronon/staging_query.py (3): EngineType (14-16), Import (43-65), TableDependency (19-41)
- api/python/ai/chronon/utils.py (2): get_join_output_table_name (326-341), get_staging_query_output_table_name (313-318)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (18)
- GitHub Check: cloud_aws_tests
- GitHub Check: api_tests
- GitHub Check: online_tests
- GitHub Check: flink_tests
- GitHub Check: service_commons_tests
- GitHub Check: service_tests
- GitHub Check: cloud_gcp_tests
- GitHub Check: aggregator_tests
- GitHub Check: groupby_tests
- GitHub Check: streaming_tests
- GitHub Check: scala_compile_fmt_fix
- GitHub Check: batch_tests
- GitHub Check: spark_tests
- GitHub Check: fetcher_tests
- GitHub Check: analyzer_tests
- GitHub Check: join_tests
- GitHub Check: python_tests
- GitHub Check: enforce_triggered_workflows
🔇 Additional comments (18)
api/python/test/canary/compiled/joins/gcp_ci/training_set.v1_dev_notds__0 (1)
224-231
: Verify CUSTOMER_ID for GCP CI tests
All generated canary fixtures under api/python/test/canary/compiled/joins/gcp_ci/ use
"CUSTOMER_ID": "dev"
. Confirm that"dev"
is the intended tenant ID for your CI pipelines—if CI needs its own ID (e.g."ci"
), please update the code-generation template accordingly.api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0 (2)
756-758
: RowId mismatch: use the selected alias.You select row_id from event_id but rowIds lists event_id. Prefer row_id for consistency.
- "rowIds": [ - "event_id" - ], + "rowIds": [ + "row_id" + ],
398-403
: LGTM: CI artifacts are correctly referenced (gcp_ci).Names/teams now align with gcp_ci across user_activities, dim_listings, and dim_merchants.
Also applies to: 459-464, 510-515
api/python/test/canary/compiled/group_bys/gcp_ci/dim_listings.v1__0 (1)
2-23: LGTM: listings dimension looks consistent.

Key, selects, and metadata align with the CI exports table.
Also applies to: 149-176
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0 (3)
255-256: LGTM: correct team set to gcp_ci.

Matches PR goal of CI-specific team separation.
10-16: Verify window unit mapping vs column suffix

- File: api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0, lines 10-16 and 55-61.
- Windows are defined with length=1, timeUnit=1, but column names end with _1d.
- Please confirm that timeUnit=1 maps to DAY in the underlying proto/enum. If it does not, update the timeUnit value or rename the columns to match.
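To make the check concrete, the suffix derivation can be sketched as below; the 0 = HOURS ("h") / 1 = DAYS ("d") ordinal mapping is an assumption to verify against the generated Chronon TimeUnit enum, not a confirmed fact:

```python
# Assumed TimeUnit ordinal -> suffix letter mapping; verify against the
# generated Chronon TimeUnit enum before relying on it.
TIME_UNIT_SUFFIX = {0: "h", 1: "d"}

def window_suffix(length: int, time_unit: int) -> str:
    """Derive the expected column suffix (e.g. "_1d") from a window spec."""
    return f"_{length}{TIME_UNIT_SUFFIX[time_unit]}"

# The canary join defines windows with length=1, timeUnit=1; if ordinal 1
# really maps to DAY, the existing *_1d column names are already consistent.
print(window_suffix(1, 1))  # -> _1d
```

If the enum turns out to order the units differently, only the dictionary above needs to change.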
119-121: Consistent partitioned table usage

The streaming join in api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0 uses the partitioned table on one side and the unpartitioned one on the other. To leverage partition pruning and keep both sides aligned, please confirm that data.item_events_parquet_compat_partitioned exists and, if so, update the second source (lines 119-121):

- "table": "data.item_events_parquet_compat"
+ "table": "data.item_events_parquet_compat_partitioned"

You can verify both variants in your environment (e.g. rg -n '"data\.item_events_parquet_compat(_partitioned)?"') and adjust accordingly.

api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_bigquery_import__0 (3)
3-5: Airflow dep correctly points to CI dataset.

Uses gcp_ci table with matching 0-day offset. Looks good.

84-91: Confirm CUSTOMER_ID for CI.

Currently "dev". If these are CI-only canaries, consider "ci" to prevent conf bleed.
Also applies to: 106-113
1-148: CI canary sanity check results

- ✅ Found gcp_ci in teams.py
- ✅ "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev" present in all gcp_ci compiled configs
- ⚠️ Scan for raw gcp_ refs in gcp_ci staging queries returned many false positives (e.g. cloud_gcp, gcp_ci); please refine the grep or manually confirm no non-CI gcp_ dependencies remain in airflowDependencies specs or SQL FROM clauses.

api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.payment__0 (2)
2-4: Airflow dep uses CI dataset with +1 offset.

Dependency aligns with tableDependencies (start/end offset = 1). Good.

129-129: Query reads from CI dataset.

FROM data.gcp_ci_training_set_v1_test__0 is correct for CI.
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.purchases_labels__0 (1)
2-4: Airflow dep points to CI dataset with 0 offset.

Matches tableDependencies. Good.
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.order__0 (2)
2-4: Airflow dep fixed to gcp_ci.

References CI dataset and +1 offset; consistent with deps.

129-129: Query uses CI dataset.

Correct CI table in FROM clause.
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.item__0 (2)
2-4: Airflow dep uses CI dataset.

Good use of gcp_ci with +1 offset.

129-129: Query reads from CI dataset.

Looks correct.
api/python/test/canary/staging_queries/gcp_ci/sample_staging_query.py (1)
1-1: Correct import: using gcp_ci joins.

Good switch; keeps CI artifacts isolated from non-CI.
"keyColumns": [ | ||
"listing_id" | ||
], |
💡 Verification agent
🧩 Analysis chain
Wrong join key for merchants; joining on listing_id via merchant_id alias is semantically incorrect.
Use merchant_id as the key. Note: left side currently lacks merchant_id; you may need multi-hop join (via dim_listings) or to project merchant_id onto the left before joining.
- "keyColumns": [
- "listing_id"
- ],
+ "keyColumns": [
+ "merchant_id"
+ ],
- "selects": {
- "listing_id": "merchant_id",
- "primary_category": "primary_category"
- },
+ "selects": {
+ "merchant_id": "merchant_id",
+ "primary_category": "primary_category"
+ },
Follow-up:
- If engine supports chaining: join dim_listings first (to expose merchant_id), then join merchants on merchant_id.
- Otherwise, reshape the merchants group-by to be keyed by listing_id with correct source mapping, or add a listing→merchant mapping artifact.
Also applies to: 521-523
🏁 Script executed:
#!/bin/bash
# Display context around the join on listing_id to verify availability of merchant_id
sed -n '480,550p' api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0
Length of output: 2011
Use merchant_id for the merchants join

The second groupBy is joining dim_merchants on listing_id, causing a mismatch. Switch to merchant_id to join on the correct key.
Locations to fix:
- api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0 lines 498–500
- api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0 lines 521–523
--- a/api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0
@@ lines 498-500
- "keyColumns": [
- "listing_id"
- ],
+ "keyColumns": [
+ "merchant_id"
+ ],
@@ lines 521-523
- "selects": {
- "listing_id": "merchant_id",
- "primary_category": "primary_category"
- },
+ "selects": {
+ "merchant_id": "merchant_id",
+ "primary_category": "primary_category"
+ },
Follow-up: the pipeline already chains dim_listings first, so merchant_id is available for the second join.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
"keyColumns": [ | |
"listing_id" | |
], | |
"keyColumns": [ | |
"merchant_id" | |
], | |
"selects": { | |
"merchant_id": "merchant_id", | |
"primary_category": "primary_category" | |
}, |
🤖 Prompt for AI Agents
In api/python/test/canary/compiled/joins/gcp_ci/demo.v1__0 around lines 498–500
and 521–523 the second groupBy incorrectly uses listing_id as the join key for
dim_merchants; change the keyColumns entry from "listing_id" to "merchant_id" in
both locations so the merchants join uses merchant_id (merchant_id is available
because dim_listings is chained first).
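The reasoning in this comment (a join key must be resolvable from the left side or from an earlier chained groupBy) can be expressed as a small validation helper. The dict shape and the "outputColumns" field below are hypothetical simplifications for illustration, not the actual compiled-config schema:

```python
def check_join_keys(left_columns, group_bys):
    """Return (name, missing_keys) pairs for groupBys whose keyColumns are
    not available from the left side or an earlier groupBy's output."""
    available = set(left_columns)
    problems = []
    for gb in group_bys:
        missing = [k for k in gb["keyColumns"] if k not in available]
        if missing:
            problems.append((gb["name"], missing))
        available.update(gb.get("outputColumns", []))
    return problems

# Mirrors the fix: dim_listings (keyed on listing_id) exposes merchant_id,
# so the later dim_merchants join keyed on merchant_id resolves cleanly.
chain = [
    {"name": "dim_listings", "keyColumns": ["listing_id"],
     "outputColumns": ["merchant_id"]},
    {"name": "dim_merchants", "keyColumns": ["merchant_id"],
     "outputColumns": ["primary_category"]},
]
assert check_join_keys(["listing_id", "event_id"], chain) == []

# A join keyed on a column absent from the left (and not produced by an
# earlier groupBy) is flagged:
bad = [{"name": "dim_merchants", "keyColumns": ["merchant_id"]}]
assert check_join_keys(["listing_id"], bad) == [("dim_merchants", ["merchant_id"])]
```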
"conf": { | ||
"common": { | ||
"spark.chronon.partition.column": "_DATE" | ||
}, | ||
"modeConfigs": {} | ||
}, | ||
"env": { |
Fix inconsistent partition column (_DATE vs ds).
GroupBy meta uses _DATE while global/backfill use ds. This divergence can break reads/writes and backfills. Standardize on ds.
Apply this diff to the GroupBy meta block:
- "common": {
- "spark.chronon.partition.column": "_DATE"
- },
+ "common": {
+ "spark.chronon.partition.column": "ds"
+ },
Also applies to: 142-148, 171-178
🤖 Prompt for AI Agents
In
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0
around lines 63-69 (also apply same change at 142-148 and 171-178), the GroupBy
meta uses the partition column "_DATE" while global/backfill use "ds"; change
the partition column key value from "_DATE" to "ds" so the GroupBy meta matches
the rest of the pipeline and avoids read/write/backfill mismatches—update the
"spark.chronon.partition.column" entry under the GroupBy "conf.common" block to
"ds" in each referenced block.
"common": { | ||
"ARTIFACT_PREFIX": "gs://zipline-artifacts-dev", | ||
"CHRONON_ONLINE_ARGS": " -Ztasks=4", | ||
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class", | ||
"CLOUD_PROVIDER": "gcp", | ||
"CUSTOMER_ID": "dev", | ||
"ENABLE_PUBSUB": "true", | ||
"FLINK_STATE_URI": "gs://zipline-warehouse-canary/flink-state", | ||
"FRONTEND_URL": "https://canary-zipline-ui-703996152583.us-central1.run.app", | ||
"GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance", | ||
"GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster", | ||
"GCP_PROJECT_ID": "canary-443022", | ||
"GCP_REGION": "us-central1", | ||
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing", | ||
"HUB_URL": "https://canary-zipline-orchestration-703996152583.us-central1.run.app", | ||
"JOB_MODE": "local[*]", | ||
"PARTITION_COLUMN": "ds", | ||
"PARTITION_FORMAT": "yyyy-MM-dd", | ||
"VERSION": "latest" | ||
}, |
🛠️ Refactor suggestion
CI env still points to dev artifacts and CUSTOMER_ID=dev.
Use canary artifacts and set CUSTOMER_ID=ci for gcp_ci.
Apply this diff to both env blocks:
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
- "CHRONON_ONLINE_ARGS": " -Ztasks=4",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
+ "CHRONON_ONLINE_ARGS": "-Ztasks=4",
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
And in upload:
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
- "CHRONON_ONLINE_ARGS": " -Ztasks=4",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
+ "CHRONON_ONLINE_ARGS": "-Ztasks=4",
- "CUSTOMER_ID": "dev",
+ "CUSTOMER_ID": "ci",
Also applies to: 226-244
🤖 Prompt for AI Agents
In
api/python/test/canary/compiled/joins/gcp_ci/item_event_join.canary_streaming_v1__0
around lines 204-223 (and similarly update lines 226-244 and the upload env
block), replace the dev artifact and customer values: change "ARTIFACT_PREFIX"
from the dev gs://zipline-artifacts-dev bucket to the canary bucket (e.g.,
gs://zipline-artifacts-canary) and change "CUSTOMER_ID" from "dev" to "ci";
ensure the same replacements are applied to the second env block at 226-244 and
the upload section so all CI GCP env blocks consistently use the canary artifact
prefix and CUSTOMER_ID=ci.
"ARTIFACT_PREFIX": "gs://zipline-artifacts-dev", | ||
"CHRONON_ONLINE_ARGS": " -Ztasks=4", | ||
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class", | ||
"CLOUD_PROVIDER": "gcp", | ||
"CUSTOMER_ID": "dev", | ||
"ENABLE_PUBSUB": "true", | ||
"FLINK_STATE_URI": "gs://zipline-warehouse-canary/flink-state", | ||
"FRONTEND_URL": "https://canary-zipline-ui-703996152583.us-central1.run.app", | ||
"GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance", | ||
"GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster", | ||
"GCP_PROJECT_ID": "canary-443022", | ||
"GCP_REGION": "us-central1", | ||
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing", | ||
"HUB_URL": "https://canary-zipline-orchestration-703996152583.us-central1.run.app", | ||
"JOB_MODE": "local[*]", | ||
"PARTITION_COLUMN": "ds", | ||
"PARTITION_FORMAT": "yyyy-MM-dd", | ||
"VERSION": "latest" | ||
}, |
🛠️ Refactor suggestion
Fix artifact bucket mismatch (dev → canary).
Keep artifacts consistent across env and cluster.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
...
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 96-115
🤖 Prompt for AI Agents
In
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.item__0
around lines 76-94 (and also apply the same change at lines 96-115), update the
artifact bucket value to match the canary environment: replace
"ARTIFACT_PREFIX": "gs://zipline-artifacts-dev" with the canary bucket (e.g.,
"gs://zipline-artifacts-canary") so artifacts are consistent with the canary
cluster; make the same replacement for any other duplicate ARTIFACT_PREFIX
occurrences in the referenced line ranges.
"ARTIFACT_PREFIX": "gs://zipline-artifacts-dev", | ||
"CHRONON_ONLINE_ARGS": " -Ztasks=4", | ||
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class", | ||
"CLOUD_PROVIDER": "gcp", | ||
"CUSTOMER_ID": "dev", | ||
"ENABLE_PUBSUB": "true", | ||
"FLINK_STATE_URI": "gs://zipline-warehouse-canary/flink-state", | ||
"FRONTEND_URL": "https://canary-zipline-ui-703996152583.us-central1.run.app", | ||
"GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance", | ||
"GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster", | ||
"GCP_PROJECT_ID": "canary-443022", | ||
"GCP_REGION": "us-central1", | ||
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing", | ||
"HUB_URL": "https://canary-zipline-orchestration-703996152583.us-central1.run.app", | ||
"JOB_MODE": "local[*]", | ||
"PARTITION_COLUMN": "ds", | ||
"PARTITION_FORMAT": "yyyy-MM-dd", | ||
"VERSION": "latest" | ||
}, |
🛠️ Refactor suggestion
Standardize ARTIFACT_PREFIX to canary.
Prevent env/cluster bucket mismatch.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
...
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 96-115
🤖 Prompt for AI Agents
In
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.order__0
around lines 76-94 (and also apply the same change to lines 96-115), the
ARTIFACT_PREFIX is set to "gs://zipline-artifacts-dev" which mismatches the
canary environment; update ARTIFACT_PREFIX to the canary bucket (e.g.,
"gs://zipline-artifacts-canary") in both locations so the env/cluster artifact
bucket is standardized for canary.
"ARTIFACT_PREFIX": "gs://zipline-artifacts-dev", | ||
"CHRONON_ONLINE_ARGS": " -Ztasks=4", | ||
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class", | ||
"CLOUD_PROVIDER": "gcp", | ||
"CUSTOMER_ID": "dev", | ||
"ENABLE_PUBSUB": "true", | ||
"FLINK_STATE_URI": "gs://zipline-warehouse-canary/flink-state", | ||
"FRONTEND_URL": "https://canary-zipline-ui-703996152583.us-central1.run.app", | ||
"GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance", | ||
"GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster", | ||
"GCP_PROJECT_ID": "canary-443022", | ||
"GCP_REGION": "us-central1", | ||
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing", | ||
"HUB_URL": "https://canary-zipline-orchestration-703996152583.us-central1.run.app", | ||
"JOB_MODE": "local[*]", | ||
"PARTITION_COLUMN": "ds", | ||
"PARTITION_FORMAT": "yyyy-MM-dd", | ||
"VERSION": "latest" | ||
}, |
🛠️ Refactor suggestion
Unify artifact buckets to canary.
Env uses -dev while Dataproc uses -canary; standardize to canary for CI stability.
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
...
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Also applies to: 96-115
🤖 Prompt for AI Agents
In
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.payment__0
around lines 76-94 (also applies to lines 96-115), the ARTIFACT_PREFIX and any
other artifact bucket references use a "-dev" suffix inconsistent with Dataproc
which uses "-canary"; update all artifact/bucket environment values to use the
"-canary" bucket (e.g., change "gs://zipline-artifacts-dev" to
"gs://zipline-artifacts-canary") and ensure any other GCS paths in the indicated
ranges consistently reference the canary buckets so CI uses the same canary
storage across the file.
},
"env": {
  "common": {
    "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
💡 Verification agent
🧩 Analysis chain
Point ARTIFACT_PREFIX to canary bucket (not dev) for CI.
Env still uses the dev artifact bucket; this will pull/push wrong artifacts in CI.
Apply:
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
@@
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Run to ensure no other CI compiled artifacts still reference the dev bucket:
Also applies to: 97-97
🏁 Script executed:
#!/bin/bash
rg -n -C2 'zipline-artifacts-dev' api/python/test/canary/compiled/staging_queries/gcp_ci/
Length of output: 19977
Update ARTIFACT_PREFIX to canary bucket across all compiled CI queries

CI is still pointing at the dev bucket; tests will pull/push wrong artifacts. Replace every "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev" with "gs://zipline-artifacts-canary" in both the common and modeEnvironments.upload blocks of all files under api/python/test/canary/compiled/staging_queries/gcp_ci:

- common.env.common and modeEnvironments.upload sections in sample_staging_query.*__0 files (around lines 75-80 and 95-100)
- common.env.common and modeEnvironments.upload sections in exports.*__0 files (around lines 76 and 98)
Apply:
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
🤖 Prompt for AI Agents
In
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.shipping__0
around line 76 (and similarly for other files under that directory), the
ARTIFACT_PREFIX is still set to "gs://zipline-artifacts-dev"; update every
occurrence in the common.env.common block and the modeEnvironments.upload block
to "gs://zipline-artifacts-canary". Specifically, search each
sample_staging_query.*__0 and exports.*__0 file for the keys "ARTIFACT_PREFIX":
"gs://zipline-artifacts-dev" (around lines ~75–80 and ~95–100 for sample files,
~76 & ~98 for exports) and replace the value with
"gs://zipline-artifacts-canary" so both common and upload environment sections
point to the canary bucket.
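Since the same dev-to-canary substitution recurs across every compiled gcp_ci config, a one-shot rewrite is less error-prone than editing each file. A minimal sketch using the bucket names and directory from this review (the path is taken from the PR layout and may need adjusting):

```python
from pathlib import Path

OLD = "gs://zipline-artifacts-dev"
NEW = "gs://zipline-artifacts-canary"

def retarget(text: str) -> str:
    """Replace every dev artifact-bucket reference with the canary bucket."""
    return text.replace(OLD, NEW)

line = '"ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",'
assert retarget(line) == '"ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",'

# Apply across the compiled CI configs (run from the repo root):
# for path in Path("api/python/test/canary/compiled").rglob("*"):
#     if path.is_file() and "gcp_ci" in path.parts:
#         text = path.read_text()
#         if OLD in text:
#             path.write_text(retarget(text))
```

The replacement is idempotent (the canary string does not contain the dev string), so re-running it is safe.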
"common": { | ||
"ARTIFACT_PREFIX": "gs://zipline-artifacts-dev", | ||
"CHRONON_ONLINE_ARGS": " -Ztasks=4", | ||
"CHRONON_ONLINE_CLASS": "[ONLINE-TODO]your.online.class", | ||
"CLOUD_PROVIDER": "gcp", | ||
"CUSTOMER_ID": "dev", | ||
"ENABLE_PUBSUB": "true", | ||
"FLINK_STATE_URI": "gs://zipline-warehouse-canary/flink-state", | ||
"FRONTEND_URL": "https://canary-zipline-ui-703996152583.us-central1.run.app", | ||
"GCP_BIGTABLE_INSTANCE_ID": "zipline-canary-instance", | ||
"GCP_DATAPROC_CLUSTER_NAME": "zipline-canary-cluster", | ||
"GCP_PROJECT_ID": "canary-443022", | ||
"GCP_REGION": "us-central1", | ||
"HADOOP_DIR": "[STREAMING-TODO]/path/to/folder/containing", | ||
"HUB_URL": "https://canary-zipline-orchestration-703996152583.us-central1.run.app", | ||
"JOB_MODE": "local[*]", | ||
"PARTITION_COLUMN": "ds", | ||
"PARTITION_FORMAT": "yyyy-MM-dd", | ||
"VERSION": "latest" | ||
}, |
🛠️ Refactor suggestion
Align ARTIFACT_PREFIX with Dataproc artifact_prefix (dev → canary).
Env uses gs://zipline-artifacts-dev while Dataproc metadata uses gs://zipline-artifacts-canary; avoid mixed environments.
Apply:
- "ARTIFACT_PREFIX": "gs://zipline-artifacts-dev",
+ "ARTIFACT_PREFIX": "gs://zipline-artifacts-canary",
Repeat the same change in the upload env block below.
Also applies to: 96-117
🤖 Prompt for AI Agents
In
api/python/test/canary/compiled/staging_queries/gcp_ci/sample_staging_query.v1_bigquery_import__0
around lines 76–95 (and also lines 96–117), the ARTIFACT_PREFIX is set to
gs://zipline-artifacts-dev while Dataproc metadata expects
gs://zipline-artifacts-canary; update ARTIFACT_PREFIX in the "common" block
(lines 76–95) to gs://zipline-artifacts-canary and make the identical change in
the upload env block referenced by lines 96–117 so both env blocks use
gs://zipline-artifacts-canary to avoid mixing dev and canary artifacts.
Summary
Copied the gcp team to a new gcp_ci team in teams.py (and duplicated the conf paths) so CI has its own set of confs pointing at the CI Cloud Run hub, separate from the confs that point at the dev Cloud Run. Also set up the hub tests to use these new confs.
Checklist
Summary by CodeRabbit