-
Notifications
You must be signed in to change notification settings - Fork 2.4k
/
Copy pathgenerators.py
417 lines (363 loc) · 19.2 KB
/
generators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
import json
from copy import deepcopy
from typing import Any, Dict, List, Tuple
from samtranslator.metrics.method_decorator import cw_timer
from samtranslator.model.exceptions import InvalidEventException, InvalidResourceException
from samtranslator.model.iam import IAMRole, IAMRolePolicies
from samtranslator.model.intrinsics import fnJoin, is_intrinsic
from samtranslator.model.resource_policies import ResourcePolicies
from samtranslator.model.role_utils import construct_role_for_resource
from samtranslator.model.s3_utils.uri_parser import parse_s3_uri
from samtranslator.model.stepfunctions.resources import (
StepFunctionsStateMachine,
StepFunctionsStateMachineAlias,
StepFunctionsStateMachineVersion,
)
from samtranslator.model.tags.resource_tagging import get_tag_list
from samtranslator.model.xray_utils import get_xray_managed_policy_name
from samtranslator.utils.cfn_dynamic_references import is_dynamic_reference
class StateMachineGenerator:
_SAM_KEY = "stateMachine:createdBy"
_SAM_VALUE = "SAM"
_SUBSTITUTION_NAME_TEMPLATE = "definition_substitution_%s"
_SUBSTITUTION_KEY_TEMPLATE = "${definition_substitution_%s}"
SFN_INVALID_PROPERTY_BOTH_ROLE_POLICY = (
"Specify either 'Role' or 'Policies' (but not both at the same time) or neither of them"
)
def __init__( # type: ignore[no-untyped-def] # noqa: PLR0913
self,
logical_id,
depends_on,
managed_policy_map,
intrinsics_resolver,
definition,
definition_uri,
logging,
name,
policies,
permissions_boundary,
definition_substitutions,
role,
state_machine_type,
tracing,
events,
event_resources,
event_resolver,
role_path=None,
tags=None,
resource_attributes=None,
passthrough_resource_attributes=None,
get_managed_policy_map=None,
auto_publish_alias=None,
deployment_preference=None,
use_alias_as_event_target=None,
):
"""
Constructs an State Machine Generator class that generates a State Machine resource
:param logical_id: Logical id of the SAM State Machine Resource
:param depends_on: Any resources that need to be depended on
:param managed_policy_map: Map of managed policy names to the ARNs
:param intrinsics_resolver: Instance of the resolver that knows how to resolve parameter references
:param definition: State Machine definition
:param definition_uri: URI to State Machine definition
:param logging: Logging configuration for the State Machine
:param name: Name of the State Machine resource
:param policies: Policies attached to the execution role
:param permissions_boundary: The ARN of the policy used to set the permissions boundary for the role
:param definition_substitutions: Variable-to-value mappings to be replaced in the State Machine definition
:param role: Role ARN to use for the execution role
:param role_path: The file path of the execution role
:param state_machine_type: Type of the State Machine
:param tracing: Tracing configuration for the State Machine
:param events: List of event sources for the State Machine
:param event_resources: Event resources to link
:param event_resolver: Resolver that maps Event types to Event classes
:param tags: Tags to be associated with the State Machine resource
:param resource_attributes: Resource attributes to add to the State Machine resource
:param passthrough_resource_attributes: Attributes such as `Condition` that are added to derived resources
:param auto_publish_alias: Name of the state machine alias to automatically create and update
:deployment_preference: Settings to enable gradual state machine deployments
:param use_alias_as_event_target: Whether to use the state machine alias as the event target
"""
self.logical_id = logical_id
self.depends_on = depends_on
self.managed_policy_map = managed_policy_map
self.intrinsics_resolver = intrinsics_resolver
self.passthrough_resource_attributes = passthrough_resource_attributes
self.resource_attributes = resource_attributes
self.definition = definition
self.definition_uri = definition_uri
self.name = name
self.logging = logging
self.policies = policies
self.permissions_boundary = permissions_boundary
self.definition_substitutions = definition_substitutions
self.role = role
self.role_path = role_path
self.type = state_machine_type
self.tracing = tracing
self.events = events
self.event_resources = event_resources
self.event_resolver = event_resolver
self.tags = tags
self.state_machine = StepFunctionsStateMachine(
logical_id, depends_on=depends_on, attributes=resource_attributes
)
self.substitution_counter = 1
self.get_managed_policy_map = get_managed_policy_map
self.auto_publish_alias = auto_publish_alias
self.deployment_preference = deployment_preference
self.use_alias_as_event_target = use_alias_as_event_target
@cw_timer(prefix="Generator", name="StateMachine")
def to_cloudformation(self): # type: ignore[no-untyped-def]
"""
Constructs and returns the State Machine resource and any additional resources associated with it.
:returns: a list of resources including the State Machine resource.
:rtype: list
"""
resources: List[Any] = [self.state_machine]
# Defaulting to {} will add the DefinitionSubstitutions field on the transform output even when it is not relevant
if self.definition_substitutions:
self.state_machine.DefinitionSubstitutions = self.definition_substitutions
if self.definition and self.definition_uri:
raise InvalidResourceException(
self.logical_id, "Specify either 'Definition' or 'DefinitionUri' property and not both."
)
if self.definition:
processed_definition = deepcopy(self.definition)
substitutions = self._replace_dynamic_values_with_substitutions(processed_definition) # type: ignore[no-untyped-call]
if len(substitutions) > 0:
if self.state_machine.DefinitionSubstitutions:
self.state_machine.DefinitionSubstitutions.update(substitutions)
else:
self.state_machine.DefinitionSubstitutions = substitutions
self.state_machine.DefinitionString = self._build_definition_string(processed_definition) # type: ignore[no-untyped-call]
elif self.definition_uri:
self.state_machine.DefinitionS3Location = self._construct_definition_uri()
else:
raise InvalidResourceException(
self.logical_id, "Either 'Definition' or 'DefinitionUri' property must be specified."
)
if self.role and self.policies:
raise InvalidResourceException(self.logical_id, self.SFN_INVALID_PROPERTY_BOTH_ROLE_POLICY)
if self.role:
self.state_machine.RoleArn = self.role
else:
if not self.policies:
self.policies = []
execution_role = self._construct_role()
self.state_machine.RoleArn = execution_role.get_runtime_attr("arn")
resources.append(execution_role)
self.state_machine.StateMachineName = self.name
self.state_machine.StateMachineType = self.type
self.state_machine.LoggingConfiguration = self.logging
self.state_machine.TracingConfiguration = self.tracing
self.state_machine.Tags = self._construct_tag_list()
managed_traffic_shifting_resources = self._generate_managed_traffic_shifting_resources()
resources.extend(managed_traffic_shifting_resources)
event_resources = self._generate_event_resources()
resources.extend(event_resources)
return resources
def _construct_definition_uri(self) -> Dict[str, Any]:
"""
Constructs the State Machine's `DefinitionS3 property`_, from the SAM State Machines's DefinitionUri property.
:returns: a DefinitionUri dict, containing the S3 Bucket, Key, and Version of the State Machine definition.
:rtype: dict
"""
if isinstance(self.definition_uri, dict):
if not self.definition_uri.get("Bucket", None) or not self.definition_uri.get("Key", None):
# DefinitionUri is a dictionary but does not contain Bucket or Key property
raise InvalidResourceException(
self.logical_id, "'DefinitionUri' requires Bucket and Key properties to be specified."
)
s3_pointer = self.definition_uri
else:
# DefinitionUri is a string
parsed_s3_pointer = parse_s3_uri(self.definition_uri)
if parsed_s3_pointer is None:
raise InvalidResourceException(
self.logical_id,
"'DefinitionUri' is not a valid S3 Uri of the form "
"'s3://bucket/key' with optional versionId query parameter.",
)
s3_pointer = parsed_s3_pointer
definition_s3 = {"Bucket": s3_pointer["Bucket"], "Key": s3_pointer["Key"]}
if "Version" in s3_pointer:
definition_s3["Version"] = s3_pointer["Version"]
return definition_s3
def _build_definition_string(self, definition_dict): # type: ignore[no-untyped-def]
"""
Builds a CloudFormation definition string from a definition dictionary. The definition string constructed is
a Fn::Join intrinsic function to make it readable.
:param definition_dict: State machine definition as a dictionary
:returns: the state machine definition.
:rtype: dict
"""
# Indenting and then splitting the JSON-encoded string for readability of the state machine definition in the CloudFormation translated resource.
# Separators are passed explicitly to maintain trailing whitespace consistency across Py2 and Py3
definition_lines = json.dumps(definition_dict, sort_keys=True, indent=4, separators=(",", ": ")).split("\n")
return fnJoin("\n", definition_lines)
def _construct_role(self) -> IAMRole:
"""
Constructs a State Machine execution role based on this SAM State Machine's Policies property.
:returns: the generated IAM Role
:rtype: model.iam.IAMRole
"""
policies = self.policies[:]
if self.tracing and self.tracing.get("Enabled") is True:
policies.append(get_xray_managed_policy_name())
state_machine_policies = ResourcePolicies(
{"Policies": policies},
# No support for policy templates in the "core"
policy_template_processor=None,
)
return construct_role_for_resource(
resource_logical_id=self.logical_id,
role_path=self.role_path,
attributes=self.passthrough_resource_attributes,
managed_policy_map=self.managed_policy_map,
assume_role_policy_document=IAMRolePolicies.stepfunctions_assume_role_policy(),
resource_policies=state_machine_policies,
tags=self._construct_tag_list(),
permissions_boundary=self.permissions_boundary,
get_managed_policy_map=self.get_managed_policy_map,
)
def _construct_tag_list(self) -> List[Dict[str, Any]]:
"""
Transforms the SAM defined Tags into the form CloudFormation is expecting.
:returns: List of Tag Dictionaries
:rtype: list
"""
sam_tag = {self._SAM_KEY: self._SAM_VALUE}
return get_tag_list(sam_tag) + get_tag_list(self.tags)
def _construct_version(self) -> StepFunctionsStateMachineVersion:
"""Constructs a state machine version resource that will be auto-published when the revision id of the state machine changes.
:return: Step Functions state machine version resource
"""
# Unlike Lambda function versions, state machine versions do not need a hash suffix because
# they are always replaced when their corresponding state machine is updated.
# I.e. A SAM StateMachine resource will never have multiple version resources at the same time.
logical_id = f"{self.logical_id}Version"
attributes = self.passthrough_resource_attributes.copy()
# Both UpdateReplacePolicy and DeletionPolicy are needed to protect previous version from deletion
# to ensure gradual deployment works.
if "DeletionPolicy" not in attributes:
attributes["DeletionPolicy"] = "Retain"
if "UpdateReplacePolicy" not in attributes:
attributes["UpdateReplacePolicy"] = "Retain"
state_machine_version = StepFunctionsStateMachineVersion(logical_id=logical_id, attributes=attributes)
state_machine_version.StateMachineArn = self.state_machine.get_runtime_attr("arn")
state_machine_version.StateMachineRevisionId = self.state_machine.get_runtime_attr("state_machine_revision_id")
return state_machine_version
def _construct_alias(self, version: StepFunctionsStateMachineVersion) -> StepFunctionsStateMachineAlias:
"""Constructs a state machine alias resource pointing to the given state machine version.
:return: Step Functions state machine alias resource
"""
logical_id = f"{self.logical_id}Alias{self.auto_publish_alias}"
attributes = self.passthrough_resource_attributes
state_machine_alias = StepFunctionsStateMachineAlias(logical_id=logical_id, attributes=attributes)
state_machine_alias.Name = self.auto_publish_alias
state_machine_version_arn = version.get_runtime_attr("arn")
deployment_preference = {}
if self.deployment_preference:
deployment_preference = self.deployment_preference
else:
deployment_preference["Type"] = "ALL_AT_ONCE"
deployment_preference["StateMachineVersionArn"] = state_machine_version_arn
state_machine_alias.DeploymentPreference = deployment_preference
self.state_machine_alias = state_machine_alias
return state_machine_alias
def _generate_managed_traffic_shifting_resources(
self,
) -> List[Any]:
"""Generates and returns the version and alias resources associated with this state machine's managed traffic shifting.
:returns: a list containing the state machine's version and alias resources
:rtype: list
"""
if not self.auto_publish_alias and self.use_alias_as_event_target:
raise InvalidResourceException(
self.logical_id, "'UseAliasAsEventTarget' requires 'AutoPublishAlias' property to be specified."
)
if not self.auto_publish_alias and not self.deployment_preference:
return []
if not self.auto_publish_alias and self.deployment_preference:
raise InvalidResourceException(
self.logical_id, "'DeploymentPreference' requires 'AutoPublishAlias' property to be specified."
)
state_machine_version = self._construct_version()
return [state_machine_version, self._construct_alias(state_machine_version)]
def _generate_event_resources(self) -> List[Dict[str, Any]]:
"""Generates and returns the resources associated with this state machine's event sources.
:returns: a list containing the state machine's event resources
:rtype: list
"""
resources = []
if self.events:
for logical_id, event_dict in self.events.items():
kwargs = {
"intrinsics_resolver": self.intrinsics_resolver,
"permissions_boundary": self.permissions_boundary,
}
try:
eventsource = self.event_resolver.resolve_resource_type(event_dict).from_dict(
self.state_machine.logical_id + logical_id, event_dict, logical_id
)
for name, resource in self.event_resources[logical_id].items():
kwargs[name] = resource
except (TypeError, AttributeError) as e:
raise InvalidEventException(logical_id, str(e)) from e
target_resource = (
(self.state_machine_alias or self.state_machine)
if self.use_alias_as_event_target
else self.state_machine
)
resources += eventsource.to_cloudformation(resource=target_resource, **kwargs)
return resources
def _replace_dynamic_values_with_substitutions(self, _input): # type: ignore[no-untyped-def]
"""
Replaces the CloudFormation instrinsic functions and dynamic references within the input with substitutions.
:param _input: Input dictionary in which the dynamic values need to be replaced with substitutions
:returns: List of substitution to dynamic value mappings
:rtype: dict
"""
substitution_map = {}
for path in self._get_paths_to_intrinsics(_input): # type: ignore[no-untyped-call]
location = _input
for step in path[:-1]:
location = location[step]
sub_name, sub_key = self._generate_substitution()
substitution_map[sub_name] = location[path[-1]]
location[path[-1]] = sub_key
return substitution_map
def _get_paths_to_intrinsics(self, _input, path=None): # type: ignore[no-untyped-def]
"""
Returns all paths to dynamic values within a dictionary
:param _input: Input dictionary to find paths to dynamic values in
:param path: Optional list to keep track of the path to the input dictionary
:returns list: List of keys that defines the path to a dynamic value within the input dictionary
"""
if path is None:
path = []
dynamic_value_paths = [] # type: ignore[var-annotated]
if isinstance(_input, dict):
iterator = _input.items()
elif isinstance(_input, list):
iterator = enumerate(_input) # type: ignore[assignment]
else:
return dynamic_value_paths
for key, value in sorted(iterator, key=lambda item: item[0]): # type: ignore[no-any-return]
if is_intrinsic(value) or is_dynamic_reference(value):
dynamic_value_paths.append([*path, key])
elif isinstance(value, (dict, list)):
dynamic_value_paths.extend(self._get_paths_to_intrinsics(value, [*path, key])) # type: ignore[no-untyped-call]
return dynamic_value_paths
def _generate_substitution(self) -> Tuple[str, str]:
"""
Generates a name and key for a new substitution.
:returns: Substitution name and key
:rtype: string, string
"""
substitution_name = self._SUBSTITUTION_NAME_TEMPLATE % self.substitution_counter
substitution_key = self._SUBSTITUTION_KEY_TEMPLATE % self.substitution_counter
self.substitution_counter += 1
return substitution_name, substitution_key