Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 10 additions & 100 deletions cfn-lint-serverless/cfn_lint_serverless/rules/lambda_.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
"""


import re
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Union

from cfnlint.rules import CloudFormationLintRule, RuleMatch

Expand Down Expand Up @@ -199,88 +198,6 @@ class LambdaLogRetentionRule(CloudFormationLintRule):

_message = "Lambda function {} does not have a corresponding log group with a Retention property"

def _get_function_from_join(self, log_group_join: list) -> Optional[Tuple[str, str]]:
"""
Return the function reference from a LogGroupName Join intrinsic function
"""

# First item in a Join function is a delimiter (string)
# Second item is a list of elements that are joined by the delimiter
# One of those elements might be the function name
for value in log_group_join[1]:
if not isinstance(value, dict):
continue

if "Fn::Ref" in value:
return ("ref", value["Fn::Ref"])
if "Ref" in value:
return ("ref", value["Ref"])

return None

def _get_function_from_sub(self, log_group_sub: Union[str, list]) -> Optional[Tuple[str, str]]:
"""
Return the function reference from a LogGroupName Sub intrinsic function
"""

# LogGroupName: !Sub "/aws/lambda/${Function}"
if isinstance(log_group_sub, str):
match = re.search(r"/aws/lambda/\${(?P<func>[^}]+)}", log_group_sub)
if match is not None:
return ("ref", match["func"])

elif isinstance(log_group_sub, list):
match = re.search(r"/aws/lambda/\${(?P<func>[^}]+)}", log_group_sub[0])

if match is not None:
func_name = match["func"]
# LogGroupName: !Sub ["/${Aws}/lambda/${Function}", {Aws: "aws"}]
if func_name not in log_group_sub[1]:
return ("ref", func_name)

func_name = log_group_sub[1][func_name]

# LogGroupName: !Sub ["/aws/lambda/${Function}", {Function: "my-function-name"}]
if isinstance(func_name, str):
return ("name", func_name)
# LogGroupName: !Sub ["/aws/lambda/${Function}", {Function: !Ref MyFunction}]
elif isinstance(func_name, dict):
if "Fn::Ref" in func_name:
return ("ref", func_name["Fn::Ref"])
if "Ref" in func_name:
return ("ref", func_name["Ref"])

return None

def _get_function_from_log_group(self, log_group_name: Union[dict, str]) -> Optional[Tuple[str, str]]:
"""
Return the function name or reference from a LogGroupName property
"""

# Cases where the function name is hardcoded
# e.g. '/aws/lambda/static_function_name'
if isinstance(log_group_name, str) and log_group_name.find("/aws/lambda/") == 0:
return ("name", log_group_name[12:])

# Shortcut as next tests require log_group_name to be a dict
if not isinstance(log_group_name, dict):
return None

# Join functions
if "Fn::Join" in log_group_name:
return self._get_function_from_join(log_group_name["Fn::Join"])
if "Join" in log_group_name:
return self._get_function_from_join(log_group_name["Join"])

# Sub functions
if "Fn::Sub" in log_group_name:
return self._get_function_from_sub(log_group_name["Fn::Sub"])
if "Sub" in log_group_name:
return self._get_function_from_sub(log_group_name["Sub"])

# Default case
return None

def _get_valid_functions(self, log_groups):
"""
Return function names with valid LogGroups
Expand All @@ -291,26 +208,19 @@ def _get_valid_functions(self, log_groups):

# Scan log groups for resource names
for resource in log_groups.values():
# This use an autogenerated log group name
if "LogGroupName" not in resource.get("Properties", {}):
continue

log_group_name = resource.get("Properties", {}).get("LogGroupName")
log_group_name = Value(resource.get("Properties", {}).get("LogGroupName", None))
retention = resource.get("Properties", {}).get("RetentionInDays", None)

# No retention
if retention is None:
# No retention or log group name, break early
if log_group_name is None or retention is None:
continue

# Look for Substitution function
retval = self._get_function_from_log_group(log_group_name)
if retval is not None:

if retval[0] == "ref":
known_refs.append(retval[1])

if retval[0] == "name":
known_names.append(retval[1])
# Searching for references in log group name
if len(log_group_name.references) > 0:
known_refs.extend(log_group_name.references)
# Otherwise, check if this is a hardcoded name
elif log_group_name.id.find("/aws/lambda/") == 0:
known_names.append(log_group_name.id[12:])

return {"ref": known_refs, "name": known_names}

Expand Down
26 changes: 24 additions & 2 deletions cfn-lint-serverless/cfn_lint_serverless/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,31 @@


import re
from typing import List, Tuple, Union
from typing import List, Tuple, TypeVar, Union

SUB_PATTERN = re.compile(r"\${(?P<ref>[^}]+)}")


TValue = TypeVar("TValue", bound="Value")


class Value:

id = "" # noqa: VNE003
references = None

def __new__(cls, value: Union[None, dict, str]) -> Union[None, TValue]:
"""
Create a new Value object

If the 'value' passed is None, this will return None instead of a class object
"""

if value is None:
return None

return super(Value, cls).__new__(cls)

def __init__(self, value: Union[dict, str]):
"""
Parse a CloudFormation value
Expand Down Expand Up @@ -99,7 +114,14 @@ def _get_from_sub(self, value: Union[str, list]) -> Tuple[str, List[str]]:

for match in SUB_PATTERN.findall(pattern):
if match in variables:
references.extend(variables[match].references)
# Variable with reference(s)
if len(variables[match].references) > 0:
references.extend(variables[match].references)
# Hard-coded variable
else:
# Replace with hard-coded value in value ID
pattern = pattern.replace(f"${{{match}}}", variables[match].id)
# No matching variable
else:
references.append(match)

Expand Down
18 changes: 15 additions & 3 deletions cfn-lint-serverless/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
},
# Fn::Sub
{"input": {"Fn::Sub": "abc-${MyResource}"}, "id": "abc-${MyResource}", "references": ["MyResource"]},
# Fn::Sub with variables
{"input": {"Fn::Sub": ["abc-${MyVar}", {"MyVar": "MyResource"}]}, "id": "abc-${MyVar}", "references": []},
# Fn::Sub with hard-coded variables
{"input": {"Fn::Sub": ["abc-${MyVar}", {"MyVar": "MyResource"}]}, "id": "abc-MyResource", "references": []},
# Fn::Sub with variables and references
{
"input": {"Fn::Sub": ["abc-${MyVar}", {"MyVar": {"Ref": "MyResource"}}]},
Expand All @@ -41,9 +41,21 @@ def test_value(case):
Test Value()
"""

print(case)
print(f"case: {case}")

output = utils.Value(case["input"])

print(f"output id: {output.id}")
print(f"output ref: {output.references}")

assert case["id"] == output.id
assert case["references"] == output.references


def test_none_value():
"""
Test Value(None)
"""

output = utils.Value(None)
assert output is None