|
| 1 | +""" |
| 2 | +Mapper class for converting ResearchhubUnifiedDocument to AWS Personalize items. |
| 3 | +""" |
| 4 | + |
from typing import Dict, Optional, Protocol, runtime_checkable

from analytics.constants.personalize_constants import (
    BLUESKY_COUNT_TOTAL,
    BOUNTY_HAS_SOLUTIONS,
    CITATION_COUNT_TOTAL,
    CREATION_TIMESTAMP,
    DELIMITER,
    FIELD_DEFAULTS,
    HAS_ACTIVE_BOUNTY,
    HUB_IDS,
    HUB_L1,
    HUB_L2,
    ITEM_ID,
    ITEM_TYPE,
    ITEM_TYPE_MAPPING,
    MAX_HUB_IDS,
    PEER_REVIEW_COUNT_TOTAL,
    PROPOSAL_HAS_FUNDERS,
    PROPOSAL_IS_OPEN,
    RFP_HAS_APPLICANTS,
    RFP_IS_OPEN,
    TEXT,
    TITLE,
    TWEET_COUNT_TOTAL,
    UPVOTE_SCORE,
)
from analytics.utils.personalize_item_utils import prepare_text_for_personalize
from utils.time import datetime_to_epoch_seconds
| 33 | + |
| 34 | + |
@runtime_checkable
class PrefetchedUnifiedDocument(Protocol):
    """
    Structural type for a unified document whose relations were already
    prefetched by the caller.

    Required prefetch_related:
    - hubs
    - fundraises, related_bounties, grants
    """

    # Primary key of the unified document.
    id: int
    # Discriminator string, e.g. "PAPER" vs. post document types.
    document_type: str
    # Score read by the mapper for the upvote-score column.
    score: int
| 48 | + |
| 49 | + |
class PersonalizeItemMapper:
    """
    Mapper for converting ResearchHub documents to AWS Personalize items.

    Every method reads only select_related / prefetch_related data so that
    mapping a single document issues no additional database queries.
    """

    def map_to_item(
        self,
        prefetched_doc: PrefetchedUnifiedDocument,
        bounty_data: dict,
        proposal_data: dict,
        rfp_data: dict,
        review_count_data: dict,
    ) -> Dict[str, Optional[str]]:
        """
        Map a prefetched ResearchhubUnifiedDocument to a Personalize item dictionary.

        Args:
            prefetched_doc: UnifiedDocument with prefetched relations
            bounty_data: Dict with has_active_bounty and has_solutions flags
            proposal_data: Dict with is_open and has_funders flags
            rfp_data: Dict with is_open and has_applicants flags
            review_count_data: Dict mapping doc_id to review count

        Returns:
            Dictionary with keys matching CSV_HEADERS

        Raises:
            ValueError: If no concrete paper/post backs the unified document.
        """
        # Start from the schema defaults so every expected column is present.
        row = dict(FIELD_DEFAULTS)

        # Resolve the concrete paper/post from prefetched data (no N+1 queries).
        document = self._get_concrete_document(prefetched_doc)

        # Fields shared by every document type.
        row.update(self._map_common_fields(prefetched_doc, document))

        # Document-type-specific fields.
        if prefetched_doc.document_type == "PAPER":
            row.update(self._map_paper_fields(prefetched_doc, document))
        else:
            row.update(self._map_post_fields(prefetched_doc, document))

        # Metrics the caller batch-fetched up front.
        row.update(
            {
                HAS_ACTIVE_BOUNTY: bounty_data.get("has_active_bounty", False),
                BOUNTY_HAS_SOLUTIONS: bounty_data.get("has_solutions", False),
                PROPOSAL_IS_OPEN: proposal_data.get("is_open", False),
                PROPOSAL_HAS_FUNDERS: proposal_data.get("has_funders", False),
                RFP_IS_OPEN: rfp_data.get("is_open", False),
                RFP_HAS_APPLICANTS: rfp_data.get("has_applicants", False),
                PEER_REVIEW_COUNT_TOTAL: review_count_data.get(prefetched_doc.id, 0),
            }
        )

        return row

    @staticmethod
    def _get_concrete_document(prefetched_doc: PrefetchedUnifiedDocument):
        """Return the concrete paper/post without triggering extra queries."""
        if prefetched_doc.document_type == "PAPER":
            # Papers are select_related, so this attribute access is query-free.
            document = prefetched_doc.paper
            if not document:
                raise ValueError(f"Paper not found for unified_doc {prefetched_doc.id}")
            return document
        # Posts come from the prefetch cache; indexing the cached queryset
        # avoids the extra query that posts.first() would run.
        posts = prefetched_doc.posts.all()
        if not posts:
            raise ValueError(f"Post not found for unified_doc {prefetched_doc.id}")
        return posts[0]

    @staticmethod
    def _hub_names(prefetched_doc: PrefetchedUnifiedDocument) -> str:
        """Comma-joined hub names built from the prefetched hub cache (no query)."""
        return ",".join(hub.name for hub in prefetched_doc.hubs.all())

    def _map_common_fields(
        self, prefetched_doc: PrefetchedUnifiedDocument, document
    ) -> dict:
        """Map fields common to all document types using prefetched data."""
        # NOTE(review): local import — presumably deferred to avoid a circular
        # import with hub.models; confirm before hoisting to module level.
        from hub.models import Hub

        # Prefer the paper's publish date when present; otherwise fall back to
        # the unified document's creation date.
        if (
            prefetched_doc.document_type == "PAPER"
            and hasattr(document, "paper_publish_date")
            and document.paper_publish_date
        ):
            timestamp = datetime_to_epoch_seconds(document.paper_publish_date)
        else:
            timestamp = datetime_to_epoch_seconds(prefetched_doc.created_date)

        # Collect at most MAX_HUB_IDS hubs; category/subcategory hubs also
        # populate the L1/L2 hierarchy columns.
        hub_ids = []
        hub_l1 = None
        hub_l2 = None
        for hub in list(prefetched_doc.hubs.all())[:MAX_HUB_IDS]:
            hub_ids.append(str(hub.id))
            if hub.namespace == Hub.Namespace.CATEGORY:
                hub_l1 = str(hub.id)
            elif hub.namespace == Hub.Namespace.SUBCATEGORY:
                hub_l2 = str(hub.id)

        return {
            ITEM_ID: str(prefetched_doc.id),
            # Fall back to the raw document_type when no mapping exists.
            ITEM_TYPE: ITEM_TYPE_MAPPING.get(
                prefetched_doc.document_type, prefetched_doc.document_type
            ),
            CREATION_TIMESTAMP: timestamp,
            UPVOTE_SCORE: (
                prefetched_doc.score if prefetched_doc.score is not None else 0
            ),
            HUB_L1: hub_l1,
            HUB_L2: hub_l2,
            HUB_IDS: DELIMITER.join(hub_ids) if hub_ids else None,
        }

    def _map_paper_fields(
        self, prefetched_doc: PrefetchedUnifiedDocument, paper
    ) -> dict:
        """Map paper-specific fields (title, text, citation and social counts)."""
        title = paper.paper_title or paper.title or ""
        abstract = paper.abstract or ""
        text_concat = f"{title} {abstract} {self._hub_names(prefetched_doc)}"

        fields = {
            TITLE: prepare_text_for_personalize(title),
            TEXT: prepare_text_for_personalize(text_concat),
            CITATION_COUNT_TOTAL: paper.citations if paper.citations is not None else 0,
        }

        if paper.external_metadata:
            # `or {}` guards against an explicit "metrics": None entry, which
            # would otherwise crash the .get() calls below.
            metrics = paper.external_metadata.get("metrics") or {}
            fields[BLUESKY_COUNT_TOTAL] = metrics.get("bluesky_count", 0)
            fields[TWEET_COUNT_TOTAL] = metrics.get("twitter_count", 0)

        return fields

    def _map_post_fields(self, prefetched_doc: PrefetchedUnifiedDocument, post) -> dict:
        """Map post-specific fields (title and searchable text)."""
        title = post.title or ""
        renderable_text = post.renderable_text or ""
        text_concat = f"{title} {renderable_text} {self._hub_names(prefetched_doc)}"

        return {
            TITLE: prepare_text_for_personalize(title),
            TEXT: prepare_text_for_personalize(text_concat),
        }
0 commit comments