Skip to content

Commit 594958b

Browse files
committed
match cpes to streams
1 parent 2f843b5 commit 594958b

File tree

6 files changed

+603
-11
lines changed

6 files changed

+603
-11
lines changed

README.md

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ Atlas Stage:
2525
`export TRUSTIFY_URL="https://atlas.release.stage.devshift.net"`
2626
`export AUTH_ENDPOINT="https://auth.stage.redhat.com/auth/realms/EmployeeIDP/protocol/openid-connect"`
2727

28+
Product Mapping:
29+
`export PRODDEFS_URL="https://prodsec.pages.example.com/product-definitions/products.json"`
30+
`export SSL_CERT_FILE=/etc/pki/tls/certs/ca-bundle.crt`
31+
2832
## Usage
2933

3034
### Find matching PackageURLs in Trustify:
@@ -65,4 +69,10 @@ sbom_count: 673
6569
Priming graph ...
6670
```
6771

68-
It can also be run with `--check` to see the graph and sbom counts without actually priming the garph cache.
72+
It can also be run with `--check` to see the graph and sbom counts without actually priming the graph cache.
73+
74+
### CPE to product mapping
75+
76+
It's possible to map CPEs to products using product metadata as demonstrated in the docs/product-definitions.json file. This allows integration with a bug tracking system like Jira.
77+
78+
The mapping first tries to match the CPE against a ps_update_stream, if such a mapping exists. If no stream matches, we try to match against ps_modules.

src/trustshell/product_definitions.py

Lines changed: 258 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,258 @@
1+
from collections import defaultdict
2+
import copy
3+
import json
4+
import logging
5+
import os
6+
import re
7+
import httpx
8+
9+
from anytree import Node, NodeMixin, LevelOrderGroupIter
10+
from trustshell import CONFIG_DIR, console
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
class ProductBase:
    """Common base for product nodes, identified by their ``name``.

    Two instances are equal only when they are of exactly the same class and
    carry the same name; the hash is derived from the name so equal instances
    hash alike.
    """

    def __init__(self, name):
        self.name = name

    def __hash__(self):
        # Must stay consistent with __eq__, which compares names.
        return hash(self.name)

    def __eq__(self, other):
        # Instances of different concrete classes never compare equal, even
        # when they share a name.
        if type(self) is not type(other):
            return False
        return self.name == other.name
26+
27+
28+
class ProductModule(ProductBase, NodeMixin):
    """Tree node for a product module that carries CPE wildcard patterns."""

    def __init__(self, name, cpe_patterns):
        super().__init__(name)
        # Stored as regex source strings rather than compiled patterns.
        self.cpe_patterns = list(map(self._to_regex, cpe_patterns))

    @staticmethod
    def _to_regex(pattern):
        """Escape literal dots and widen the ``*`` wildcard to ``.*``."""
        return pattern.replace(".", "\\.").replace("*", ".*")

    def match(self, cpe) -> bool:
        """Return True when any pattern matches *cpe* from its start.

        Anchored (whole-string) matches are attempted before prefix matches,
        and within each pass longer patterns are tried first so that X:10 is
        tried before X:1.
        """
        longest_first = sorted(self.cpe_patterns, key=len, reverse=True)
        return any(
            re.match(regex + anchor, cpe)
            for anchor in ("$", "")
            for regex in longest_first
        )
43+
44+
45+
class ProductStream(ProductBase, NodeMixin):
    """Tree node for a product stream with its CPEs and an ``active`` flag."""

    # The annotation is quoted so that the ``|`` union syntax is never evaluated
    # at runtime on older interpreters.
    def __init__(self, name: str, cpes: "list[str] | None" = None, active=False):
        """Create a stream node.

        Args:
            name: Stream name.
            cpes: CPEs declared for the stream; ``None`` means no CPEs. A
                ``None`` default replaces the former shared ``[]`` default so
                instances never alias one module-level list.
            active: Initial value of the ``active`` flag.
        """
        super().__init__(name)
        self.cpes = self._filter_rhel_mainline_cpes([] if cpes is None else cpes)
        self.active = active

    def set_active(self, active: bool):
        """Set the ``active`` flag."""
        self.active = active

    @staticmethod
    def _filter_rhel_mainline_cpes(cpes: list[str]) -> list[str]:
        """Drop RHEL main line CPEs when a more specific lifecycle CPE is present.

        Special logic for RHEL streams is required because during the lifetime
        of a RHEL release it will have the main line CPE
        (redhat:enterprise_linux:), and also EUS/AUS/TUS/E4S CPEs depending on
        its lifecycle phase. The rule here is that if a stream has both main
        line and EUS/AUS/TUS/E4S CPEs, the main line one (which is less
        specific) is ignored.

        Always returns a new list, so the caller's list is never aliased.
        """
        # With zero or one distinct CPE there is nothing to disambiguate.
        if len(set(cpes)) <= 1:
            return list(cpes)
        lifecycle_markers = tuple(
            f":rhel_{us}:" for us in ("eus", "aus", "tus", "e4s")
        )
        has_lifecycle_cpe = any(
            marker in cpe for cpe in cpes for marker in lifecycle_markers
        )
        if not has_lifecycle_cpe:
            return list(cpes)
        return [cpe for cpe in cpes if ":redhat:enterprise_linux:" not in cpe]
75+
76+
77+
class ProdDefs:
    """Map CPEs onto product streams and modules from a product definitions document.

    The document is fetched from the URL in the ``PRODDEFS_URL`` environment
    variable and cached on disk together with its ETag. It is expected to hold
    ``ps_update_streams`` and ``ps_modules`` mappings.
    """

    # On-disk cache of the remote document and of the ETag it was fetched with.
    ETAG_FILE = os.path.join(CONFIG_DIR, "etag.txt")
    PRODUCT_FILE = os.path.join(CONFIG_DIR, "products.json")

    @classmethod
    def get_etag(cls, url: str):
        """Return the ``etag`` header of a HEAD request to *url*, or None if absent."""
        response = httpx.head(url)
        return response.headers.get("etag")

    # Assisted by watsonx Code Assistant
    @classmethod
    def persist_etag(cls, etag: str, file_path: str):
        """Write *etag* to *file_path*, replacing any previous content."""
        with open(file_path, "w") as f:
            f.write(etag)

    # Assisted by watsonx Code Assistant
    @classmethod
    def load_etag(cls, file_path: str):
        """Return the stripped content of *file_path*, or None if it does not exist."""
        try:
            with open(file_path, "r") as f:
                return f.read().strip()
        except FileNotFoundError:
            return None

    # Assisted by watsonx Code Assistant
    @classmethod
    def load_product_definitions(cls, url: str, file_path: str):
        """Download *url* and store the response body at *file_path*.

        Raises:
            httpx.HTTPStatusError: on a 4xx/5xx response, so that an error page
                is never written into the cache.
        """
        response = httpx.get(url)
        response.raise_for_status()
        with open(file_path, "w") as f:
            f.write(response.text)

    @classmethod
    def get_product_definitions_service(cls) -> dict:
        """Return the parsed product definitions, refreshing the cache if needed.

        Returns an empty dict (after printing a warning) when ``PRODDEFS_URL``
        is not set. Otherwise the cached file is used only when the server
        reports an ETag, it equals the stored one, and the cached file exists;
        in every other case the document is downloaded again.
        """
        proddefs_url = os.environ.get("PRODDEFS_URL")
        if proddefs_url is None:
            console.print(
                "PRODDEFS_URL not set, no product mappings will be available",
                style="warning",
            )
            return {}

        cached_etag = cls.load_etag(cls.ETAG_FILE)
        url_etag = cls.get_etag(proddefs_url)

        # A missing ETag (None on both sides) must not count as a cache hit,
        # and a stored ETag without the cached document is useless.
        cache_is_fresh = (
            url_etag is not None
            and cached_etag == url_etag
            and os.path.exists(cls.PRODUCT_FILE)
        )
        if not cache_is_fresh:
            cls.load_product_definitions(proddefs_url, cls.PRODUCT_FILE)
            if url_etag is not None:
                cls.persist_etag(url_etag, cls.ETAG_FILE)

        with open(cls.PRODUCT_FILE, "r") as f:
            return json.load(f)

    def __init__(self, active_only: bool = True):
        """Build the CPE index and the stream -> module trees.

        Args:
            active_only: When True, streams that a module lists but does not
                list as active are left out of ``product_trees`` and removed
                from ``stream_nodes_by_cpe``.
        """
        # CPE string -> ProductStream nodes declaring that CPE.
        self.stream_nodes_by_cpe = defaultdict(list)
        product_streams_by_name = defaultdict(list)
        # Stream nodes that have at least one module attached as a child.
        self.product_trees: list[NodeMixin] = []

        data = self.get_product_definitions_service()

        if not data:
            return

        for ps_update_stream, stream_data in data["ps_update_streams"].items():
            cpes = stream_data.get("cpe", [])
            stream_node = ProductStream(ps_update_stream, cpes)
            product_streams_by_name[ps_update_stream].append(stream_node)
            # Index by the node's own CPE list because RHEL mainline CPEs may
            # have been filtered out of it.
            for cpe in stream_node.cpes:
                self.stream_nodes_by_cpe[cpe].append(stream_node)

        seen_stream_names: set[str] = set()
        for ps_module, module_data in data["ps_modules"].items():
            cpes = module_data.get("cpe", [])
            active_streams = set(module_data.get("active_ps_update_streams", []))
            for stream in module_data.get("ps_update_streams", []):
                for stream_node in product_streams_by_name.get(stream, []):
                    if stream in active_streams:
                        stream_node.set_active(True)
                    elif active_only:
                        # The stream is not active in the module and only
                        # active streams are wanted: drop it from the CPE index
                        # and don't add it to the product_trees.
                        self._remove_stream_from_cpe_index(stream_node)
                        continue
                    self._check_stream_name(seen_stream_names, stream)
                    module_node = ProductModule(ps_module, cpes)
                    module_node.parent = stream_node
                    self.product_trees.append(stream_node)

    def _remove_stream_from_cpe_index(self, stream_node) -> None:
        """Remove *stream_node* from every CPE entry of ``stream_nodes_by_cpe``."""
        for cpe in set(stream_node.cpes):
            # Compare by identity: node equality is name based and other
            # streams sharing the CPE must stay in place.
            remaining = [
                node
                for node in self.stream_nodes_by_cpe.get(cpe, [])
                if node is not stream_node
            ]
            if remaining:
                self.stream_nodes_by_cpe[cpe] = remaining
            else:
                self.stream_nodes_by_cpe.pop(cpe, None)

    @staticmethod
    def _check_stream_name(seen_stream_names, stream):
        """Warn if *stream* was already recorded, then record it."""
        if stream in seen_stream_names:
            console.print(
                f"Warning: duplicate stream: {stream} detected.", style="warning"
            )
        seen_stream_names.add(stream)

    def match_module_pattern(self, cpe: str) -> list[ProductModule]:
        """Return every ProductModule in ``product_trees`` whose patterns match *cpe*."""
        module_matches = []
        for module_tree in self.product_trees:
            # maxlevel=2 visits the stream node and its direct children only.
            for modules in LevelOrderGroupIter(module_tree, maxlevel=2):
                for module in modules:
                    if isinstance(module, ProductModule) and module.match(cpe):
                        module_matches.append(module)
        return module_matches

    @staticmethod
    def _clean_cpe(cpe: str) -> str:
        """Strip every ``*`` and any trailing ``:`` from *cpe*."""
        # CPEs from SBOMs have extra characters added to them, clean them up here
        # see https://github.com/trustification/trustify/issues/1621
        return cpe.replace("*", "").rstrip(":")

    def extend_with_product_mappings(self, ancestor_trees: list[Node]) -> list[Node]:
        """Create a new list of results with any matching streams or module as ancestors.

        Each leaf name is cleaned and matched against streams first, then
        against modules. Returns *ancestor_trees* unchanged when no product
        trees are loaded; otherwise returns the leaf nodes (or copies of them)
        that received a product.
        """
        if not self.product_trees:
            # ProdDefs service is unavailable, don't attempt any product mapping
            return ancestor_trees
        ancestors_with_products: list[Node] = []
        for tree in ancestor_trees:
            for leaf in tree.leaves:
                cleaned_leaf_name = self._clean_cpe(leaf.name)
                leaf_with_products = self._check_streams(leaf, cleaned_leaf_name)
                if not leaf_with_products:
                    leaf_with_products = self._check_modules(leaf, cleaned_leaf_name)
                if not leaf_with_products:
                    # NOTE(review): an unmatched leaf contributes nothing to the
                    # result, so its tree is not returned - confirm intended.
                    console.print(
                        f"Warning, didn't find any products matching {cleaned_leaf_name}",
                        style="warning",
                    )
                ancestors_with_products.extend(leaf_with_products)
        return ancestors_with_products

    def _check_streams(self, root: Node, cleaned_root_name: str) -> list[Node]:
        """Check if cpe matches exactly to any ProductStreams, if it does add the CPE as a parent
        of the stream. If more than one stream matches, create copies of the stream and root"""
        # .get() avoids inserting an empty entry into the defaultdict.
        stream_nodes = self.stream_nodes_by_cpe.get(cleaned_root_name)
        if not stream_nodes:
            return []
        # Pass a new list so the pop() calls downstream leave the index intact
        # in case the same CPE is encountered twice. The nodes themselves are
        # deep-copied before being attached.
        return self._duplicate_roots_and_set_parents(root, list(stream_nodes))

    def _check_modules(self, root: Node, cleaned_root_name: str) -> list[Node]:
        """Attach every ProductModule matching the CPE to *root* (or a copy of it)."""
        module_nodes = self.match_module_pattern(cleaned_root_name)
        return self._duplicate_roots_and_set_parents(root, module_nodes)

    def _duplicate_roots_and_set_parents(self, root, product_nodes) -> list[Node]:
        """Assign each product as an ancestor of the root. Copy the root when assigning it another
        parent because one root can exist in multiple products.

        *product_nodes* is consumed. Products are always deep-copied so the
        shared nodes in ``product_trees`` / ``stream_nodes_by_cpe`` are never
        re-parented. When there are several products the final one is attached
        to the original *root*; a single product is attached to a copy.
        """
        root_with_products: list[Node] = []
        while product_nodes:
            product = copy.deepcopy(product_nodes.pop())
            is_last_of_several = not product_nodes and bool(root_with_products)
            # All copies of the root are taken before the original is modified.
            target_root = root if is_last_of_several else copy.deepcopy(root)
            self._add_ancestor(target_root, product)
            root_with_products.append(target_root)
        return root_with_products

    def _add_ancestor(self, copy_of_root, copy_of_product):
        """Hang the product, or its parent when it has one, below *copy_of_root*."""
        if copy_of_product.parent:
            # A module is attached through its stream so the stream stays in the path.
            copy_of_product.parent.parent = copy_of_root
        else:
            copy_of_product.parent = copy_of_root

src/trustshell/products.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
print_version,
2121
urlencoded,
2222
)
23+
from trustshell.product_definitions import ProdDefs
2324

2425
ANALYSIS_ENDPOINT = f"{TRUSTIFY_URL}analysis/latest/component"
2526
MAX_I64 = 2**63 - 1
@@ -83,8 +84,12 @@ def search(purl: str, debug: bool):
8384
if not ancestor_trees or len(ancestor_trees) == 0:
8485
console.print("No results")
8586
return
87+
88+
prod_defs = ProdDefs()
89+
ancestor_trees = prod_defs.extend_with_product_mappings(ancestor_trees)
90+
8691
for tree in ancestor_trees:
87-
_render_tree(tree)
92+
_render_tree(tree.root)
8893

8994

9095
def _render_tree(root: Node):
@@ -104,7 +109,6 @@ def _get_roots(base_purl: str) -> list[Node]:
104109
access_token = check_or_get_access_token()
105110
auth_header = {"Authorization": f"Bearer {access_token}"}
106111

107-
# TODO change back to purl~ (like) query?
108112
request_url = (
109113
f"{ANALYSIS_ENDPOINT}?ancestors={MAX_I64}&q={urlencoded(f'purl~{base_purl}@')}"
110114
)

0 commit comments

Comments
 (0)