Skip to content

Commit 02d52b4

Browse files
authored
Merge pull request #47 from RedHatProductSecurity/PSDEVOPS-4471
PSDEVOPS-4471
2 parents 25e5590 + bd425dc commit 02d52b4

File tree

5 files changed

+259
-105
lines changed

5 files changed

+259
-105
lines changed

src/trustshell/product_definitions.py

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -210,27 +210,42 @@ def _clean_cpe(cpe: str) -> str:
210210
# Remove trailing ':' characters
211211
return cleaned_cpe.rstrip(":")
212212

213-
def extend_with_product_mappings(self, ancestor_trees: list[Node]) -> list[Node]:
214-
"""Create a new list of results with any matching streams or module as ancestors"""
213+
def extend_with_product_mappings(
214+
self, ancestor_trees: list[Node], keep_cpes: bool = False
215+
) -> None:
216+
"""Update the ancestor_trees with any matching streams or module as descendants
217+
218+
Args:
219+
ancestor_trees: List of Node trees to extend with product mappings
220+
keep_cpes: If False, replace CPE leaf nodes with product streams. If True, keep CPE nodes as parents of streams.
221+
"""
215222
if not self.product_trees:
216223
# ProdDefs service is unavailable, don't attempt any product mapping
217-
return ancestor_trees
224+
return None
225+
218226
for tree in ancestor_trees:
219227
for leaf in tree.leaves:
220228
cleaned_leaf_name = self._clean_cpe(leaf.name)
221-
leaf_with_products = self._check_streams(leaf, cleaned_leaf_name)
229+
leaf_with_products = self._check_streams(
230+
leaf, cleaned_leaf_name, keep_cpes
231+
)
222232
if not leaf_with_products:
223-
leaf_with_products = self._check_modules(leaf, cleaned_leaf_name)
233+
leaf_with_products = self._check_modules(
234+
leaf, cleaned_leaf_name, keep_cpes
235+
)
224236
if not leaf_with_products:
225237
console.print(
226238
f"Warning, didn't find any products matching {cleaned_leaf_name}",
227239
style="warning",
228240
)
229-
# the tree is modified as a side effect of the checking streams|modules
230-
# therefore we do not need to explicitly store and return it elsewhere
231-
return ancestor_trees
232-
233-
def _check_streams(self, leaf: Node, cpe: str) -> list[Node]:
241+
else:
242+
# When keep_cpes=False, we need to remove the CPE leaf from the tree
243+
# since it's been replaced by the product streams
244+
if not keep_cpes:
245+
# Remove the CPE leaf node from its parent
246+
leaf.parent = None
247+
248+
def _check_streams(self, leaf: Node, cpe: str, keep_cpes: bool) -> list[Node]:
234249
"""Check if cpe matches exactly to any ProductStreams, if it does add the CPE as a parent
235250
of the stream. If more than one stream matches, create copies of the stream and leaf"""
236251
if cpe not in self.stream_nodes_by_cpe:
@@ -240,18 +255,30 @@ def _check_streams(self, leaf: Node, cpe: str) -> list[Node]:
240255
# the original stream_nodes_by_cpe map which should be preserved incase we encounter the
241256
# same CPE twice
242257
copy_of_stream_nodes = copy.deepcopy(stream_nodes)
243-
return self._duplicate_leaves_and_set_parents(leaf, copy_of_stream_nodes)
258+
return self._duplicate_leaves_and_set_parents(
259+
leaf, copy_of_stream_nodes, keep_cpes
260+
)
244261

245-
def _check_modules(self, leaf: Node, cpe: str) -> list[Node]:
262+
def _check_modules(self, leaf: Node, cpe: str, keep_cpes: bool) -> list[Node]:
246263
"""Check if the cpe matches any ProductModule"""
247264
module_nodes = self.match_module_pattern(cpe)
248-
return self._duplicate_leaves_and_set_parents(leaf, module_nodes)
265+
return self._duplicate_leaves_and_set_parents(leaf, module_nodes, keep_cpes)
249266

250267
def _duplicate_leaves_and_set_parents(
251-
self, leaf: Node, product_nodes: list[Any]
268+
self, leaf: Node, product_nodes: list[Any], keep_cpes: bool
252269
) -> list[Node]:
253270
"""Convert product modules to their parent streams and attach all streams as children of the leaf.
254-
Deduplicates streams to avoid multiple identical children. Returns the leaf in a list."""
271+
Deduplicates streams to avoid multiple identical children.
272+
273+
Args:
274+
leaf: The leaf node to process
275+
product_nodes: List of product nodes (modules or streams)
276+
keep_cpes: If False, replace the leaf with streams in the tree. If True, set streams as children of the leaf.
277+
278+
Returns:
279+
If keep_cpes=True: Returns the leaf in a list.
280+
If keep_cpes=False: Returns the list of unique streams that replaced the leaf.
281+
"""
255282
if not product_nodes:
256283
return []
257284

@@ -274,10 +301,17 @@ def _duplicate_leaves_and_set_parents(
274301
unique_streams.append(stream)
275302
seen.add(stream)
276303

277-
for stream in unique_streams:
278-
stream.parent = leaf
279-
280-
return [leaf]
304+
if keep_cpes:
305+
# Original behavior: set streams as children of the leaf
306+
for stream in unique_streams:
307+
stream.parent = leaf
308+
return [leaf]
309+
else:
310+
# New behavior: replace the leaf with the streams in the tree
311+
# Each stream takes the place of the leaf in the tree hierarchy
312+
for stream in unique_streams:
313+
stream.parent = leaf.parent
314+
return unique_streams
281315

282316
def _add_ancestor(self, leaf: Node, product: Any) -> None:
283317
if product.parent:

src/trustshell/products.py

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ def prime_cache(check: bool, debug: bool) -> None:
7272
is_eager=True,
7373
)
7474
@click.option("--latest", "-l", is_flag=True, default=True)
75+
@click.option("--cpes", "-c", is_flag=True, default=False)
7576
@click.option("--flaw", "-f", help="OSIDB flaw uuid or CVE")
7677
@click.option(
7778
"--replace",
@@ -85,7 +86,9 @@ def prime_cache(check: bool, debug: bool) -> None:
8586
"purl",
8687
type=click.STRING,
8788
)
88-
def search(purl: str, flaw: str, replace: bool, debug: bool, latest: bool) -> None:
89+
def search(
90+
purl: str, flaw: str, replace: bool, debug: bool, latest: bool, cpes: bool
91+
) -> None:
8992
"""Relate a purl to products in Trustify"""
9093
if not debug:
9194
config_logging(level="INFO")
@@ -104,9 +107,15 @@ def search(purl: str, flaw: str, replace: bool, debug: bool, latest: bool) -> No
104107
return
105108

106109
prod_defs = ProdDefs()
107-
ancestor_trees = prod_defs.extend_with_product_mappings(ancestor_trees)
110+
prod_defs.extend_with_product_mappings(ancestor_trees, keep_cpes=cpes)
108111

112+
seen_trees = set()
109113
for tree in ancestor_trees:
114+
_remove_duplicate_branches(tree)
115+
tree_signature = _get_branch_signature(tree.root)
116+
if tree_signature in seen_trees:
117+
continue
118+
seen_trees.add(tree_signature)
110119
render_tree(tree.root)
111120

112121
if not flaw:
@@ -141,27 +150,31 @@ def extract_affects(ancestor_trees: list[Node]) -> set[tuple[str, str]]:
141150
raise ValueError(f"More than one ProductModule found in {tree.root.name}")
142151
for ps_module_node in ps_module_nodes:
143152
for ancestor in ps_module_node.ancestors:
144-
if ancestor.name.startswith("cpe:/"):
145-
if ancestor.parent:
146-
purl = PackageURL.from_string(ancestor.parent.name)
147-
if purl.type == "oci" and "tag" in purl.qualifiers:
148-
purl.qualifiers.pop("tag")
149-
elif (
150-
purl.type == "maven"
151-
or purl.type == "generic"
152-
and PackageURL.from_string(ps_module_node.root.name).type
153-
== "maven"
154-
):
155-
# If it's a maven type or a generic one based on maven,
156-
# we set the purl to root
157-
purl = PackageURL.from_string(ps_module_node.root.name)
158-
purl_sans_version_obj = purl_sans_version(purl)
159-
affects.add(
160-
(
161-
ps_module_node.name,
162-
purl_sans_version_obj.to_string(),
163-
)
164-
)
153+
# Find the root component
154+
if ancestor.name.startswith("pkg:"):
155+
purl = PackageURL.from_string(ancestor.name)
156+
else:
157+
continue
158+
# Clean up the purl ready for use in affects
159+
if purl.type == "oci" and "tag" in purl.qualifiers:
160+
purl.qualifiers.pop("tag")
161+
elif (
162+
purl.type == "maven"
163+
or purl.type == "generic"
164+
and PackageURL.from_string(ps_module_node.root.name).type == "maven"
165+
):
166+
# If it's a maven type or a generic one based on maven,
167+
# we set the purl to root
168+
purl = PackageURL.from_string(ps_module_node.root.name)
169+
else:
170+
purl = purl_sans_version(purl)
171+
172+
affects.add(
173+
(
174+
ps_module_node.name,
175+
purl.to_string(),
176+
)
177+
)
165178
return affects
166179

167180

@@ -353,14 +366,15 @@ def _trees_with_cpes(ancestor_data: dict[str, Any]) -> list[Node]:
353366
if tree.name.startswith("pkg:rpm/"):
354367
if container_in_tree(tree):
355368
continue
369+
_remove_non_cpe_branches(tree)
356370
if not _has_cpe_node(tree):
357371
for leaf in tree.leaves:
358372
logger.debug(
359373
f"Found result {tree.name} with ancestor: {leaf.name} but no CPE parent"
360374
)
361375
else:
362376
trees_with_cpes.append(tree)
363-
return [_remove_non_cpe_branches(tree) for tree in trees_with_cpes]
377+
return trees_with_cpes
364378

365379

366380
def container_in_tree(root: Node) -> bool:

0 commit comments

Comments
 (0)