Skip to content

Commit

Permalink
CloudFormation: Implement Fn::Sub variable mapping (#6124)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmsanders authored Mar 24, 2023
1 parent d33fe9e commit 5c2fc55
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 33 deletions.
68 changes: 35 additions & 33 deletions moto/cloudformation/parsing.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import string
import functools
import json
import logging
Expand Down Expand Up @@ -189,40 +190,41 @@ def clean_json(resource_json: Any, resources_map: "ResourceMap") -> Any:
return select_list[select_index]

if "Fn::Sub" in resource_json:
if isinstance(resource_json["Fn::Sub"], list):
warnings.warn(
"Tried to parse Fn::Sub with variable mapping but it's not supported by moto's CloudFormation implementation"
)
else:
fn_sub_value = clean_json(resource_json["Fn::Sub"], resources_map)
to_sub = re.findall(r'(?=\${)[^!^"]*?}', fn_sub_value)
literals = re.findall(r'(?=\${!)[^"]*?}', fn_sub_value)
for sub in to_sub:
if "." in sub:
cleaned_ref = clean_json(
{
"Fn::GetAtt": re.findall(r'(?<=\${)[^"]*?(?=})', sub)[
0
].split(".")
},
resources_map,
)
else:
cleaned_ref = clean_json(
{"Ref": re.findall(r'(?<=\${)[^"]*?(?=})', sub)[0]},
resources_map,
)
if cleaned_ref is not None:
fn_sub_value = fn_sub_value.replace(sub, str(cleaned_ref))
else:
# The ref was not found in the template - either it didn't exist, or we couldn't parse it
pass
for literal in literals:
fn_sub_value = fn_sub_value.replace(
literal, literal.replace("!", "")
template = resource_json["Fn::Sub"]

if isinstance(template, list):
template, mappings = resource_json["Fn::Sub"]
for key, value in mappings.items():
template = string.Template(template).safe_substitute(
**{key: str(clean_json(value, resources_map))}
)

fn_sub_value = clean_json(template, resources_map)
to_sub = re.findall(r'(?=\${)[^!^"]*?}', fn_sub_value)
literals = re.findall(r'(?=\${!)[^"]*?}', fn_sub_value)
for sub in to_sub:
if "." in sub:
cleaned_ref = clean_json(
{
"Fn::GetAtt": re.findall(r'(?<=\${)[^"]*?(?=})', sub)[
0
].split(".")
},
resources_map,
)
else:
cleaned_ref = clean_json(
{"Ref": re.findall(r'(?<=\${)[^"]*?(?=})', sub)[0]},
resources_map,
)
return fn_sub_value
pass
if cleaned_ref is not None:
fn_sub_value = fn_sub_value.replace(sub, str(cleaned_ref))
else:
# The ref was not found in the template - either it didn't exist, or we couldn't parse it
pass
for literal in literals:
fn_sub_value = fn_sub_value.replace(literal, literal.replace("!", ""))
return fn_sub_value

if "Fn::ImportValue" in resource_json:
cleaned_val = clean_json(resource_json["Fn::ImportValue"], resources_map)
Expand Down
52 changes: 52 additions & 0 deletions tests/test_cloudformation/test_stack_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,33 @@
},
}

sub_mapping_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Parameters": {
"TestRef": {"Type": "String"},
},
"Conditions": {
"IsApple": {"Fn::Equals": [{"Ref": "TestRef"}, "apple"]},
},
"Resources": {
"Queue": {
"Type": "AWS::SQS::Queue",
"Properties": {
"QueueName": {
"Fn::Sub": [
"${AWS::StackName}-queue-${TestRef}-${TestFn}",
{
"TestRef": {"Ref": "TestRef"},
"TestFn": {"Fn::If": ["IsApple", "yes", "no"]},
},
],
},
"VisibilityTimeout": 60,
},
},
},
}

export_value_template = {
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
Expand Down Expand Up @@ -191,6 +218,7 @@
split_select_template_json = json.dumps(split_select_template)
sub_template_json = json.dumps(sub_template)
sub_num_template_json = json.dumps(sub_num_template)
sub_mapping_json = json.dumps(sub_mapping_template)
export_value_template_json = json.dumps(export_value_template)
import_value_template_json = json.dumps(import_value_template)

Expand Down Expand Up @@ -521,6 +549,30 @@ def test_sub_num():
queue.name.should.equal("test_stack-queue-42")


def test_sub_mapping():
stack = FakeStack(
stack_id="test_id",
name="test_stack",
template=sub_mapping_json,
parameters={"TestRef": "apple"},
account_id=ACCOUNT_ID,
region_name="us-west-1",
)
queue = stack.resource_map["Queue"]
queue.name.should.equal("test_stack-queue-apple-yes")

stack = FakeStack(
stack_id="test_id",
name="test_stack",
template=sub_mapping_json,
parameters={"TestRef": "banana"},
account_id=ACCOUNT_ID,
region_name="us-west-1",
)
queue = stack.resource_map["Queue"]
queue.name.should.equal("test_stack-queue-banana-no")


def test_import():
export_stack = FakeStack(
stack_id="test_id",
Expand Down

0 comments on commit 5c2fc55

Please sign in to comment.