@@ -22,6 +22,18 @@ def get_configmap(templates, configmap_name):
22
22
raise ValueError (f"ConfigMap { configmap_name } not found" )
23
23
24
24
25
+ def get_secret (templates , secret_name ):
26
+ """
27
+ Get the content of a Secret with the given name.
28
+ :param secret_name: The name of the Secret to retrieve.
29
+ :return: A string containing the content of the Secret, or an empty string if not found.
30
+ """
31
+ for t in templates :
32
+ if t ["kind" ] == "Secret" and t ["metadata" ]["name" ] == secret_name :
33
+ return t
34
+ raise ValueError (f"Secret { secret_name } not found" )
35
+
36
+
25
37
def get_volume_from_mount (template , volume_mount ):
26
38
"""
27
39
Get a specific volume mount from a given template.
@@ -48,7 +60,6 @@ async def test_secrets_consistency(templates):
48
60
This test checks if each secret is correctly associated with its respective volume and container,
49
61
ensuring that no inconsistencies or missing configurations exist.
50
62
"""
51
- secrets = [t for t in templates if t ["kind" ] == "Secret" ]
52
63
workloads = [t for t in templates if t ["kind" ] in ("Deployment" , "StatefulSet" )]
53
64
for template in workloads :
54
65
# Gather all containers and initContainers from the template spec
@@ -58,47 +69,60 @@ async def test_secrets_consistency(templates):
58
69
59
70
for container in containers :
60
71
# Determine which secrets are mounted by this container
61
- mounted_secrets = []
72
+ mounted_secret_keys = []
62
73
mounted_config_maps = []
74
+ secrets_mount_paths = []
75
+ uses_rendered_config = False
63
76
64
77
for volume_mount in container .get ("volumeMounts" , []):
65
78
current_volume = get_volume_from_mount (template , volume_mount )
66
79
if "secret" in current_volume :
67
80
# Extract the paths where this volume's secrets are mounted
68
- for secret in secrets :
69
- if current_volume ["secret" ]["secretName" ] == secret ["metadata" ]["name" ]:
70
- # When secret data is empty, `data:` is None, so use `get_or_empty`
71
- for key in get_or_empty (secret , "data" ):
72
- mounted_path = f"{ volume_mount ['mountPath' ]} /{ key } "
73
- mounted_secrets .append (mounted_path )
74
- break
81
+ secret = get_secret (templates , current_volume ["secret" ]["secretName" ])
82
+ if "subPath" in volume_mount :
83
+ # When using subPath, the key is mounted as the mountPath itself
84
+ mounted_secret_keys .append (f"{ volume_mount ['mountPath' ]} " )
75
85
else :
76
- raise ValueError (
77
- f"Secret name '{ current_volume ['secret' ]['secretName' ]} ' does not match any secret"
78
- )
86
+ # When secret data is empty, `data:` is None, so use `get_or_empty`
87
+ for key in get_or_empty (secret , "data" ):
88
+ # Without subPath, the key will be present as child of the mount path
89
+ mounted_path = f"{ volume_mount ['mountPath' ]} /{ key } "
90
+ mounted_secret_keys .append (mounted_path )
91
+ secrets_mount_paths .append (volume_mount ["mountPath" ])
79
92
elif "configMap" in current_volume :
80
93
# Parse config map content
81
94
mounted_config_maps .append (get_configmap (templates , current_volume ["configMap" ]["name" ]))
95
+ elif "emptyDir" in current_volume and current_volume ["name" ] == "rendered-config" :
96
+ # We can't verify rendered-config, it's generated at runtime
97
+ uses_rendered_config = True
82
98
83
- for volume_mount in container .get ("volumeMounts" , []):
84
- # Only consider volumes that are secrets
85
- current_volume = get_volume_from_mount (template , volume_mount )
86
- if "secret" not in current_volume :
87
- continue
88
- # Parse container commands to find potential mounted secrets
89
- # Make sure that potential mounted secrets are present in mounted secrets
90
- for matches in re .findall (
91
- rf"{ volume_mount ['mountPath' ]} /([A-Za-z0-9._]+)" , "\n " .join (container .get ("command" , []))
92
- ):
93
- assert f"{ volume_mount ['mountPath' ]} /{ matches } " in mounted_secrets , (
94
- f"{ volume_mount ['mountPath' ]} /{ matches } used in container { container ['name' ]} "
99
+ # We look for all secrets mountPath in configs and commands
100
+ # And using a regex, make sure that patterns `<mount path>/<some key>`
101
+ # refers <some key> to an existing mounted secret key
102
+ for mount_path in set (secrets_mount_paths ):
103
+ mount_path_found = False
104
+ # Parse container commands to find paths which would match a mounted secret
105
+ # Make sure that paths which match are actually present in mounted secrets
106
+ for matches in re .findall (rf"{ mount_path } /([^\s\n);]+)" , "\n " .join (container .get ("command" , []))):
107
+ assert f"{ mount_path } /{ matches } " in mounted_secret_keys , (
108
+ f"{ mount_path } /{ matches } used in container { container ['name' ]} "
95
109
+ "but it is not found from any mounted secret"
96
110
)
111
+ mount_path_found = True
112
+ # Parse container configmaps to find paths which would match a mounted secret
113
+ # Make sure that paths which match are actually present in mounted secrets
97
114
for cm in mounted_config_maps :
98
115
for data , content in cm ["data" ].items ():
99
- for matches in re .findall (rf"{ volume_mount ['mountPath' ]} /([A-Za-z0-9._]+)" , content ):
100
- assert f"{ volume_mount ['mountPath' ]} /{ matches } " in mounted_secrets , (
101
- f"{ volume_mount ['mountPath' ]} /{ matches } used in config { cm ['metadata' ]['name' ]} /{ data } "
116
+ for matches in re .findall (rf"{ mount_path } /([^\s\n);]+)" , content ):
117
+ assert f"{ mount_path } /{ matches } " in mounted_secret_keys , (
118
+ f"{ mount_path } /{ matches } used in "
119
+ f"config { cm ['metadata' ]['name' ]} /{ data } "
102
120
f"mounted in container { container ['name' ]} "
103
121
+ "but it is not found from any mounted secret"
104
122
)
123
+ mount_path_found = True
124
+ if not mount_path_found and not uses_rendered_config :
125
+ raise AssertionError (
126
+ f"{ volume_mount ['mountPath' ]} used in container { container ['name' ]} "
127
+ "but no config or command is using it"
128
+ )
0 commit comments