@@ -128,28 +128,41 @@ def _extract_groups_and_namespaces_from_token(
128
128
token_review = client .V1TokenReview (
129
129
spec = client .V1TokenReviewSpec (token = access_token )
130
130
)
131
- groups = []
132
- namespaces = []
131
+ groups : list [ str ] = []
132
+ namespaces : list [ str ] = []
133
133
134
134
# Call Token Access Review API
135
135
response = self .auth_v1 .create_token_review (token_review )
136
136
137
137
if response .status .authenticated :
138
138
# Extract groups and namespaces from the response
139
- groups = response .status .groups
139
+ # Groups are in response.status.user.groups, not response.status.groups
140
+ if response .status .user and hasattr (response .status .user , "groups" ):
141
+ groups = response .status .user .groups or []
142
+ else :
143
+ groups = []
140
144
141
145
# Extract namespaces from the user info
142
146
if response .status .user :
143
147
# For service accounts, the namespace is typically in the username
144
148
# For regular users, we might need to extract from groups or other fields
145
- username = response .status .user . get ( "username" , "" )
149
+ username = getattr ( response .status .user , "username" , "" ) or ""
146
150
if ":" in username and username .startswith (
147
151
"system:serviceaccount:"
148
152
):
149
153
# Extract namespace from service account username
150
154
parts = username .split (":" )
151
155
if len (parts ) >= 4 :
152
- namespaces .append (parts [2 ]) # namespace is the 3rd part
156
+ service_account_namespace = parts [
157
+ 2
158
+ ] # namespace is the 3rd part
159
+ namespaces .append (service_account_namespace )
160
+
161
+ # For service accounts, also extract groups that have access to this namespace
162
+ namespace_groups = self ._extract_namespace_access_groups (
163
+ service_account_namespace
164
+ )
165
+ groups .extend (namespace_groups )
153
166
154
167
# Also check if there are namespace-specific groups
155
168
for group in groups :
@@ -170,6 +183,98 @@ def _extract_groups_and_namespaces_from_token(
170
183
# We dont need to extract groups and namespaces from jwt decoding, not ideal for kubernetes auth
171
184
return groups , namespaces
172
185
186
+ def _extract_namespace_access_groups (self , namespace : str ) -> list [str ]:
187
+ """
188
+ Extract groups that have access to a specific namespace by querying RoleBindings and ClusterRoleBindings.
189
+
190
+ Args:
191
+ namespace: The namespace to check for group access
192
+
193
+ Returns:
194
+ list[str]: List of groups that have access to the namespace
195
+ """
196
+ groups = []
197
+ try :
198
+ # Get RoleBindings in the namespace
199
+ role_bindings = self .rbac_v1 .list_namespaced_role_binding (
200
+ namespace = namespace
201
+ )
202
+ for rb in role_bindings .items :
203
+ for subject in rb .subjects or []:
204
+ if subject .kind == "Group" :
205
+ groups .append (subject .name )
206
+ logger .debug (
207
+ f"Found group { subject .name } in RoleBinding { rb .metadata .name } "
208
+ )
209
+
210
+ # Get ClusterRoleBindings that might grant access to this namespace
211
+ cluster_role_bindings = self .rbac_v1 .list_cluster_role_binding ()
212
+ for crb in cluster_role_bindings .items :
213
+ # Check if this ClusterRoleBinding grants access to the namespace
214
+ if self ._cluster_role_binding_grants_namespace_access (crb , namespace ):
215
+ for subject in crb .subjects or []:
216
+ if subject .kind == "Group" :
217
+ groups .append (subject .name )
218
+ logger .debug (
219
+ f"Found group { subject .name } in ClusterRoleBinding { crb .metadata .name } "
220
+ )
221
+
222
+ # Remove duplicates and sort
223
+ groups = sorted (list (set (groups )))
224
+ logger .info (
225
+ f"Found { len (groups )} groups with access to namespace { namespace } : { groups } "
226
+ )
227
+
228
+ except Exception as e :
229
+ logger .error (
230
+ f"Failed to extract namespace access groups for { namespace } : { e } "
231
+ )
232
+
233
+ return groups
234
+
235
+ def _cluster_role_binding_grants_namespace_access (
236
+ self , cluster_role_binding , namespace : str
237
+ ) -> bool :
238
+ """
239
+ Check if a ClusterRoleBinding grants access to a specific namespace.
240
+ This is a simplified check - in practice, you might need more sophisticated logic.
241
+
242
+ Args:
243
+ cluster_role_binding: The ClusterRoleBinding to check
244
+ namespace: The namespace to check access for
245
+
246
+ Returns:
247
+ bool: True if the ClusterRoleBinding likely grants access to the namespace
248
+ """
249
+ try :
250
+ # Get the ClusterRole referenced by this binding
251
+ cluster_role_name = cluster_role_binding .role_ref .name
252
+ cluster_role = self .rbac_v1 .read_cluster_role (name = cluster_role_name )
253
+
254
+ # Check if the ClusterRole has rules that could grant access to the namespace
255
+ for rule in cluster_role .rules or []:
256
+ # Check if the rule applies to namespaces or has wildcard access
257
+ if (
258
+ rule .resources
259
+ and ("namespaces" in rule .resources or "*" in rule .resources )
260
+ and rule .verbs
261
+ and (
262
+ "get" in rule .verbs or "list" in rule .verbs or "*" in rule .verbs
263
+ )
264
+ ):
265
+ return True
266
+
267
+ # Check if the rule has resourceNames that include our namespace
268
+ if rule .resource_names and namespace in rule .resource_names :
269
+ return True
270
+
271
+ except Exception as e :
272
+ logger .debug (
273
+ f"Error checking ClusterRoleBinding { cluster_role_binding .metadata .name } : { e } "
274
+ )
275
+
276
+ return False
277
+
173
278
174
279
def _decode_token (access_token : str ) -> tuple [str , str ]:
175
280
"""
0 commit comments