9
9
10
10
from anytree import Node , NodeMixin , LevelOrderGroupIter
11
11
from trustshell import CONFIG_DIR , console
12
+ from trustshell .rhel_releases import EnhancedProdDefs
12
13
13
14
logger = logging .getLogger (__name__ )
14
15
@@ -47,39 +48,20 @@ class ProductStream(ProductBase, NodeMixin):
47
48
def __init__ (self , name : str , cpes : list [str ] = [], active : bool = False ) -> None :
48
49
super ().__init__ (name )
49
50
# In rhel 10 we don't use mainline CPEs, so we need to filter them out
50
- if name .startswith ("rhel-" ) and not name .startswith ("rhel-10" ):
51
- self .cpes = self ._filter_rhel_mainline_cpes (cpes )
51
+ if name .startswith ("rhel-" ):
52
+ # Remove any single digit like cpe:/a:redhat:enterprise_linux:9::appstream
53
+ self .cpes = [
54
+ cpe
55
+ for cpe in cpes
56
+ if not re .search (r":redhat:enterprise_linux:\d:" , cpe )
57
+ ]
52
58
else :
53
59
self .cpes = cpes
54
60
self .active = active
55
61
56
62
def set_active (self , active : bool ) -> None :
57
63
self .active = active
58
64
59
- @staticmethod
60
- def _filter_rhel_mainline_cpes (cpes : list [str ]) -> list [str ]:
61
- """Special logic for RHEL streams is required because during the lifetime of a RHEL
62
- release it will have the main line CPE (redhat:enterprise_linux:),
63
- and also EUS/AUS/TUS CPEs depending on it's lifecycle phase. The rule here is that
64
- if a stream has both main line and EUS/AUS/TUS ignore the main line one
65
- (which is less specific)"""
66
- cpe_set = set (cpes )
67
- if len (cpe_set ) <= 1 :
68
- return cpes
69
- has_eatus = False
70
- for cpe in cpes :
71
- for us in "eus" , "aus" , "tus" , "e4s" :
72
- if f":rhel_{ us } :" in cpe :
73
- has_eatus = True
74
- break
75
- if has_eatus :
76
- break
77
- if not has_eatus :
78
- return cpes
79
- return [
80
- cpe for cpe in cpes if not re .search (r":redhat:enterprise_linux:\d:" , cpe )
81
- ]
82
-
83
65
84
66
class ProdDefs :
85
67
ETAG_FILE = os .path .join (CONFIG_DIR , "etag.txt" )
@@ -141,11 +123,32 @@ def get_product_definitions_service(cls) -> dict[str, Any]:
141
123
product_definitions = json .load (f )
142
124
return product_definitions
143
125
144
- def __init__ (self , active_only : bool = True ) -> None :
126
+ def __init__ (
127
+ self ,
128
+ active_only : bool = True ,
129
+ rhel_git_branch : str = "main" ,
130
+ rhel_releases_path : str = "" ,
131
+ ) -> None :
145
132
self .stream_nodes_by_cpe : dict [str , list [ProductStream ]] = defaultdict (list )
146
133
product_streams_by_name : dict [str , list [ProductStream ]] = defaultdict (list )
147
134
self .product_trees : list [NodeMixin ] = []
148
135
136
+ # Initialize enhanced RHEL release data if requested
137
+ self .enhanced_proddefs : Optional [EnhancedProdDefs ] = None
138
+ if rhel_releases_path :
139
+ # Use local file for testing
140
+ try :
141
+ self .enhanced_proddefs = EnhancedProdDefs (
142
+ git_branch = rhel_git_branch , rhel_releases_path = rhel_releases_path
143
+ )
144
+ except Exception as e :
145
+ logger .warning (
146
+ f"Could not initialize enhanced product definitions: { e } "
147
+ )
148
+ self .enhanced_proddefs = None
149
+ else :
150
+ self .enhanced_proddefs = self ._enable_rhel_release_data (rhel_git_branch )
151
+
149
152
data = self .get_product_definitions_service ()
150
153
151
154
if not data :
@@ -201,6 +204,36 @@ def match_module_pattern(self, cpe: str) -> list[ProductModule]:
201
204
module_matches .append (module )
202
205
return module_matches
203
206
207
+ def _enable_rhel_release_data (
208
+ self , git_branch : str = "main" , rhel_releases_path : str = ""
209
+ ) -> Optional [EnhancedProdDefs ]:
210
+ """
211
+ Enable RHEL release data fetching from GitLab repository or local file.
212
+
213
+ Args:
214
+ git_branch: Git branch to use for fetching data
215
+ rhel_releases_path: Local file path (for testing)
216
+
217
+ Returns:
218
+ EnhancedProdDefs instance or None if failed
219
+ """
220
+ try :
221
+ enhanced_proddefs = EnhancedProdDefs (
222
+ git_branch = git_branch , rhel_releases_path = rhel_releases_path
223
+ )
224
+ if rhel_releases_path :
225
+ logger .info (
226
+ f"Enabled RHEL release data from local file: { rhel_releases_path } "
227
+ )
228
+ else :
229
+ logger .info (
230
+ f"Enabled RHEL release data from GitLab (branch: { git_branch } )"
231
+ )
232
+ return enhanced_proddefs
233
+ except Exception as e :
234
+ logger .error (f"Could not enable RHEL release data: { e } " )
235
+ return None
236
+
204
237
@staticmethod
205
238
def _clean_cpe (cpe : str ) -> str :
206
239
"""CPEs from SBOMs have extra characters added to them, clean them up here
@@ -226,6 +259,13 @@ def extend_with_product_mappings(
226
259
for tree in ancestor_trees :
227
260
for leaf in tree .leaves :
228
261
cleaned_leaf_name = self ._clean_cpe (leaf .name )
262
+ # Don't try and match single digit enterprise_linux CPEs
263
+ # Filter out single digit enterprise_linux CPEs (only when not keeping CPEs)
264
+ if not keep_cpes and re .search (
265
+ r":redhat:enterprise_linux:\d:" , cleaned_leaf_name
266
+ ):
267
+ leaf .parent = None
268
+ continue
229
269
leaf_with_products = self ._check_streams (
230
270
leaf , cleaned_leaf_name , keep_cpes
231
271
)
@@ -248,6 +288,13 @@ def extend_with_product_mappings(
248
288
def _check_streams (self , leaf : Node , cpe : str , keep_cpes : bool ) -> list [Node ]:
249
289
"""Check if cpe matches exactly to any ProductStreams, if it does add the CPE as a parent
250
290
of the stream. If more than one stream matches, create copies of the stream and leaf"""
291
+ # First try enhanced matching if RHEL release data is available
292
+ enhanced_streams = self ._check_enhanced_streams (cpe )
293
+ if enhanced_streams :
294
+ return self ._duplicate_leaves_and_set_parents (
295
+ leaf , enhanced_streams , keep_cpes
296
+ )
297
+ # Fallback to original direct matching
251
298
if cpe not in self .stream_nodes_by_cpe :
252
299
return []
253
300
stream_nodes = self .stream_nodes_by_cpe [cpe ]
@@ -259,6 +306,39 @@ def _check_streams(self, leaf: Node, cpe: str, keep_cpes: bool) -> list[Node]:
259
306
leaf , copy_of_stream_nodes , keep_cpes
260
307
)
261
308
309
+ def _check_enhanced_streams (self , cpe : str ) -> list [ProductStream ]:
310
+ """Check if CPE matches using enhanced RHEL release data logic."""
311
+ if not self .enhanced_proddefs :
312
+ return []
313
+
314
+ # Get active streams and their CPEs
315
+ active_streams = set ()
316
+ stream_cpes = {}
317
+
318
+ for stream_name , stream_nodes in self .stream_nodes_by_cpe .items ():
319
+ for stream_node in stream_nodes :
320
+ if stream_node .active :
321
+ active_streams .add (stream_node .name )
322
+ if stream_node .name not in stream_cpes :
323
+ stream_cpes [stream_node .name ] = []
324
+ stream_cpes [stream_node .name ].extend (stream_node .cpes )
325
+
326
+ # Use enhanced matching to find relevant active streams
327
+ matching_stream_names = self .enhanced_proddefs .enhance_cpe_matching (
328
+ cpe , active_streams , stream_cpes
329
+ )
330
+
331
+ # Return the actual ProductStream objects for the matching streams
332
+ result_streams = []
333
+ for stream_name in matching_stream_names :
334
+ # Find ProductStream objects with this name
335
+ for stream_nodes in self .stream_nodes_by_cpe .values ():
336
+ for stream_node in stream_nodes :
337
+ if stream_node .name == stream_name and stream_node .active :
338
+ result_streams .append (copy .deepcopy (stream_node ))
339
+
340
+ return result_streams
341
+
262
342
def _check_modules (self , leaf : Node , cpe : str , keep_cpes : bool ) -> list [Node ]:
263
343
"""Check if the cpe matches any ProductModule"""
264
344
module_nodes = self .match_module_pattern (cpe )
@@ -280,7 +360,11 @@ def _duplicate_leaves_and_set_parents(
280
360
If keep_cpes=False: Returns the list of unique streams that replaced the leaf.
281
361
"""
282
362
if not product_nodes :
283
- return []
363
+ if keep_cpes :
364
+ # Keep the CPE node even if no products match
365
+ return [leaf ]
366
+ else :
367
+ return []
284
368
285
369
# Convert modules to their parent streams
286
370
streams_to_attach : list [Any ] = []
@@ -313,6 +397,37 @@ def _duplicate_leaves_and_set_parents(
313
397
stream .parent = leaf .parent
314
398
return unique_streams
315
399
400
+ def get_all_cpes_for_rhel_stream (self , stream_name : str ) -> set [str ]:
401
+ """
402
+ Get all CPEs that should be associated with a given RHEL stream by traversing
403
+ the RHEL release graph to include related releases.
404
+
405
+ Args:
406
+ stream_name: The ps_update_stream name (e.g., "rhel-9.2.0.z")
407
+
408
+ Returns:
409
+ Set of all CPEs that should be associated with this stream
410
+ """
411
+ if not self .enhanced_proddefs :
412
+ # Fallback to direct stream CPEs only
413
+ result = set ()
414
+ for cpe , stream_nodes in self .stream_nodes_by_cpe .items ():
415
+ for stream_node in stream_nodes :
416
+ if stream_node .name == stream_name :
417
+ result .update (stream_node .cpes )
418
+ return result
419
+
420
+ # Get stream CPEs mapping
421
+ stream_cpes = {}
422
+ for cpe , stream_nodes in self .stream_nodes_by_cpe .items ():
423
+ for stream_node in stream_nodes :
424
+ if stream_node .name not in stream_cpes :
425
+ stream_cpes [stream_node .name ] = []
426
+ stream_cpes [stream_node .name ].extend (stream_node .cpes )
427
+
428
+ # Use enhanced matching to get all related CPEs
429
+ return self .enhanced_proddefs .get_all_cpes_for_stream (stream_name , stream_cpes )
430
+
316
431
def _add_ancestor (self , leaf : Node , product : Any ) -> None :
317
432
if product .parent :
318
433
product .parent .parent = leaf
0 commit comments