Skip to content

[source-google-ads] : Implement hybrid customer partition router #64559

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,50 @@ class GoogleAdsHttpRequester(HttpRequester):
"""

schema_loader: InlineSchemaLoader = None

def get_url_base(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
    """
    Return the base URL of the Google Ads REST API (v20).

    The keyword arguments are accepted for interface compatibility but are not used:
    the same base URL is returned for every slice and page.

    Returns:
        The constant base URL, including the trailing slash.
    """
    return "https://googleads.googleapis.com/v20/"

def get_path(
    self,
    *,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
    next_page_token: Optional[Mapping[str, Any]] = None,
) -> str:
    """
    Construct the path for the Google Ads API ``searchStream`` endpoint.

    The customer ID is looked up in ``stream_slice`` in this order:

    1. ``stream_slice.partition["customer_id"]``
    2. ``stream_slice.customer_id`` (plain attribute)
    3. ``stream_slice.get("customer_id")`` (mapping-style access)

    A value in resource-name form (e.g. ``"customers/1234567890"``) is reduced to its
    last path segment.

    Args:
        stream_state: Unused.
        stream_slice: Slice that carries the customer ID.
        next_page_token: Unused.

    Returns:
        ``"customers/<customer_id>/googleAds:searchStream"``.

    Raises:
        ValueError: If no customer ID can be found, including when ``stream_slice`` is None.
    """
    customer_id = None

    # First try the partition mapping (for HybridCustomerPartitionRouter slices).
    partition = getattr(stream_slice, "partition", None)
    if partition:
        customer_id = partition.get("customer_id")

    # Fallback to direct attribute access.
    if not customer_id:
        customer_id = getattr(stream_slice, "customer_id", None)

    # Another fallback to dict-style access. Guarded so that a missing slice (None) or an
    # object without ``get`` reaches the ValueError below instead of an AttributeError.
    lookup = getattr(stream_slice, "get", None)
    if not customer_id and callable(lookup):
        customer_id = lookup("customer_id")

    if not customer_id:
        # Report the keys of the slice itself so the message matches its wording.
        list_keys = getattr(stream_slice, "keys", None)
        available = list(list_keys()) if callable(list_keys) else "N/A"
        raise ValueError(f"customer_id not found in stream_slice. Available keys: {available}")

    # Extract just the customer ID if it's in resource name format (e.g., "customers/1234567890")
    if isinstance(customer_id, str) and "/" in customer_id:
        customer_id = customer_id.split("/")[-1]

    return f"customers/{customer_id}/googleAds:searchStream"

def get_request_body_json(
self,
Expand Down Expand Up @@ -654,3 +698,110 @@ def migrate(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
return stream_state

return {"parent_state": stream_state}


@dataclass
class HybridCustomerPartitionRouter(SubstreamPartitionRouter):
    """
    Hybrid partition router that:
    - Builds slices directly from ``customer_ids`` in the config when that list is non-empty
    - Falls back to the parent-stream partitioning of ``SubstreamPartitionRouter`` otherwise

    The request-option methods all return empty mappings; the router contributes nothing
    to headers, query parameters or the request body.
    """

    config: Config
    parameters: Mapping[str, Any]
    parent_stream_configs: List[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any] = None) -> None:
        """Delegate initialisation to the base router."""
        # NOTE(review): `parameters` is redeclared above as a regular field. If the base class
        # declares it as a dataclass InitVar, it will no longer be passed to this hook and the
        # base `__post_init__` receives None - confirm this is intended.
        super().__post_init__(parameters)

    def stream_slices(self) -> Iterable[StreamSlice]:
        """
        Generate stream slices directly from customer_ids in config when available,
        otherwise fall back to the parent stream partitioning of the base class.

        Yields:
            One ``StreamSlice`` per configured customer ID, whose ``parent_slice`` carries the
            configured ``login_customer_id``; or whatever the base class yields in fallback mode.

        Raises:
            ValueError: If ``customer_ids`` is set but ``login_customer_id`` is missing, or if
                fallback mode is needed and no parent stream configs were provided.
        """
        # presumably a single customer_id option is normalised into this array upstream - verify
        customer_ids = self.config.get("customer_ids", [])

        if not customer_ids:
            logger.info("No customer_ids provided in config — falling back to parent stream logic.")
            # Ensure that the parent stream configurations are properly utilized
            if not self.parent_stream_configs:
                logger.error("Parent stream configs not provided.")
                raise ValueError("Parent stream configs must be provided for fallback partitioning.")

            try:
                # Leverages `stream_slices` from the `SubstreamPartitionRouter` base class
                yield from super().stream_slices()
            except Exception as e:
                # Log for visibility, then let the original error propagate unchanged.
                logger.error(f"Error during fallback partitioning: {e}")
                raise
            return

        # Direct approach: use customer_ids from config
        login_customer_id = self.config.get("login_customer_id")
        if not login_customer_id:
            raise ValueError("login_customer_id not found in config.")

        for customer_id in customer_ids:
            yield StreamSlice(
                partition={
                    "customer_id": customer_id,
                    "parent_slice": {
                        "customer_id": login_customer_id,
                        "parent_slice": {},
                    },
                },
                cursor_slice={},
                extra_fields={"manager": False},
            )

    def get_stream_state(self) -> Optional[Mapping[str, Any]]:
        """Required abstract method - return None for stateless partition router"""
        return None

    def set_initial_state(self, stream_state: StreamState) -> None:
        """Required abstract method - no-op for stateless partition router"""
        pass

    def get_request_headers(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return empty headers - headers are handled by the HTTP requester"""
        return {}

    def get_request_params(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return empty params - params are handled by the HTTP requester"""
        return {}

    def get_request_body_data(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return empty body data - body data is handled by the HTTP requester"""
        return {}

    def get_request_body_json(
        self,
        *,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Mapping[str, Any]:
        """Return empty body JSON - body JSON is handled by the HTTP requester"""
        return {}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ definitions:
$ref: "#/schemas"
authenticator:
$ref: "#/definitions/authenticator"
url_base: "https://googleads.googleapis.com/v20/{{ stream_partition['customer_id'] }}/googleAds:search"
url_base: "https://googleads.googleapis.com/v20/"
http_method: POST
error_handler:
$ref: "#/definitions/base_error_handler"
Expand Down Expand Up @@ -111,7 +111,8 @@ definitions:
type: CustomSchemaNormalization
class_name: source_google_ads.components.DoubleQuotedDictTypeTransformer
partition_router:
type: SubstreamPartitionRouter
type: CustomPartitionRouter
class_name: "source_google_ads.components.HybridCustomerPartitionRouter"
parent_stream_configs:
- type: ParentStreamConfig
stream: "#/definitions/customer_client"
Expand Down Expand Up @@ -142,7 +143,8 @@ definitions:
type: CustomSchemaNormalization
class_name: source_google_ads.components.DoubleQuotedDictTypeTransformer
partition_router:
type: SubstreamPartitionRouter
type: CustomPartitionRouter
class_name: "source_google_ads.components.HybridCustomerPartitionRouter"
parent_stream_configs:
- type: ParentStreamConfig
stream: "#/definitions/customer_client"
Expand Down Expand Up @@ -179,7 +181,8 @@ definitions:
retriever:
$ref: "#/definitions/incremental_stream_base/retriever"
partition_router:
type: SubstreamPartitionRouter
type: CustomPartitionRouter
class_name: "source_google_ads.components.HybridCustomerPartitionRouter"
parent_stream_configs:
- type: ParentStreamConfig
stream: "#/definitions/customer_client_non_manager"
Expand All @@ -201,7 +204,7 @@ definitions:
$ref: "#/schemas"
authenticator:
$ref: "#/definitions/authenticator"
url_base: "https://googleads.googleapis.com/v20/{{ stream_partition['customer_id'] }}/googleAds:search"
url_base: "https://googleads.googleapis.com/v20/customers/{{ stream_partition['customer_id'] }}/googleAds:searchStream"
http_method: POST
error_handler:
$ref: "#/definitions/base_error_handler"
Expand Down Expand Up @@ -239,6 +242,7 @@ definitions:
$ref: "#/schemas"
authenticator:
$ref: "#/definitions/authenticator"
url_base: "https://googleads.googleapis.com/v20/customers/{{ stream_partition['parent_slice']['customer_id'] }}/googleAds:searchStream"
http_method: POST
error_handler:
$ref: "#/definitions/base_error_handler"
Expand Down Expand Up @@ -334,7 +338,8 @@ definitions:
type: CustomRecordFilter
class_name: "source_google_ads.components.CustomerClientFilter"
partition_router:
type: SubstreamPartitionRouter
type: CustomPartitionRouter
class_name: "source_google_ads.components.HybridCustomerPartitionRouter"
parent_stream_configs:
- type: ParentStreamConfig
stream: "#/definitions/accessible_accounts"
Expand Down Expand Up @@ -373,7 +378,8 @@ definitions:
type: CustomRecordFilter
class_name: "source_google_ads.components.CustomerClientFilter"
partition_router:
type: SubstreamPartitionRouter
type: CustomPartitionRouter
class_name: "source_google_ads.components.HybridCustomerPartitionRouter"
parent_stream_configs:
- type: ParentStreamConfig
stream: "#/definitions/accessible_accounts"
Expand Down Expand Up @@ -791,6 +797,7 @@ definitions:
$ref: "#/schemas"
authenticator:
$ref: "#/definitions/authenticator"
url_base: "https://googleads.googleapis.com/v20/customers/{{ stream_partition['customer_id'] }}/googleAds:searchStream"
http_method: POST
error_handler:
$ref: "#/definitions/base_error_handler"
Expand All @@ -809,7 +816,8 @@ definitions:
type: CustomSchemaNormalization
class_name: source_google_ads.components.DoubleQuotedDictTypeTransformer
partition_router:
type: SubstreamPartitionRouter
type: CustomPartitionRouter
class_name: "source_google_ads.components.HybridCustomerPartitionRouter"
parent_stream_configs:
- type: ParentStreamConfig
stream: "#/definitions/customer_client"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,27 @@ def _get_all_connected_accounts(
yield from self.get_all_accounts(google_api, dummy_customers, customer_status_filter)

def get_customers(self, google_api: GoogleAds, config: Mapping[str, Any]) -> List[CustomerModel]:
# NOTE(review): this function is shown as a rendered diff hunk (-95,14 +95,27) with no +/-
# markers and no indentation, so lines of the old and the new implementation appear together.
# The status-filter/accounts lookup directly below is repeated near the end of the function;
# presumably this first copy and the from_accounts_by_id branch are the removed lines - confirm
# against the raw diff before relying on the statement order shown here.
customer_status_filter = config.get("customer_status_filter", [])
accounts = self._get_all_connected_accounts(google_api, customer_status_filter)

# filter only selected accounts
if config.get("customer_ids"):
return CustomerModel.from_accounts_by_id(accounts, config["customer_ids"])
customer_ids = config.get("customer_ids", [])

# If customer_ids are provided, use them directly
# One CustomerModel is built per configured ID from config values only; each gets the
# configured login_customer_id and is_manager_account=False.
if customer_ids:
customers = []
login_customer_id = config.get("login_customer_id")
if login_customer_id:
for customer_id in customer_ids:
customers.append(CustomerModel(
id=customer_id,
login_customer_id=login_customer_id,
is_manager_account=False
))
return customers
else:
# An explicit customer_ids list without login_customer_id is rejected.
raise ValueError(f"login_customer_id not found in config.")

# all unique accounts
customer_status_filter = config.get("customer_status_filter", [])
# Fallback: get all connected accounts
accounts = self._get_all_connected_accounts(google_api, customer_status_filter)
return CustomerModel.from_accounts(accounts)

@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,23 @@
"enum": ["UNKNOWN", "ENABLED", "CANCELED", "SUSPENDED", "CLOSED"]
}
},
"login_customer_id": {
"type": "string",
"title": "Login Customer ID for Managed Accounts",
"description": "If your access to the customer account is through a manager account, this field is required, and must be set to the 10-digit customer ID of the manager account. For more information about this field, refer to <a href=\"https://developers.google.com/google-ads/api/docs/concepts/call-structure#cid\">Google's documentation</a>.",
"pattern_descriptor": ": 10 digits, with no dashes.",
"pattern": "^([0-9]{10})?$",
"examples": ["7349206847"],
"order": 3
},
"start_date": {
"type": "string",
"title": "Start Date",
"description": "UTC date in the format YYYY-MM-DD. Any data before this date will not be replicated. (Default value of two years ago is used if not set)",
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"pattern_descriptor": "YYYY-MM-DD",
"examples": ["2017-01-25"],
"order": 3,
"order": 4,
"format": "date"
},
"end_date": {
Expand All @@ -93,14 +102,14 @@
"pattern": "^$|^[0-9]{4}-[0-9]{2}-[0-9]{2}$",
"pattern_descriptor": "YYYY-MM-DD",
"examples": ["2017-01-30"],
"order": 4,
"order": 5,
"format": "date"
},
"custom_queries_array": {
"type": "array",
"title": "Custom GAQL Queries",
"description": "",
"order": 5,
"order": 6,
"items": {
"type": "object",
"required": ["query", "table_name"],
Expand Down Expand Up @@ -130,7 +139,7 @@
"maximum": 1095,
"default": 14,
"examples": [14],
"order": 6
"order": 7
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def test_query_shopping_performance_view_stream(customers, config, requests_mock

request_history = requests_mock.register_uri(
"POST",
"https://googleads.googleapis.com/v20/customers/123/googleAds:search",
"https://googleads.googleapis.com/v20/customers/123/googleAds:searchStream",
shopping_performance_view_response,
)

Expand Down
Loading