From b5a852d0aa5e435997849d11e9e209ff389d077d Mon Sep 17 00:00:00 2001
From: Cyril Fait
Date: Thu, 20 Jun 2024 12:28:42 +0200
Subject: [PATCH] new stage: sdlf-stage-ecsfargate

---
 sdlf-stage-ecsfargate/.gitignore              |  21 +
 .../lambda/error/src/lambda_function.py       |  30 +
 .../src/lambda_function.py                    |  53 ++
 .../lambda/redrive/src/lambda_function.py     |  27 +
 .../lambda/routing/src/lambda_function.py     | 135 ++++
 .../state-machine/stage-ecsfargate.asl.json   | 112 +++
 sdlf-stage-ecsfargate/template.yaml           | 651 ++++++++++++++++++
 7 files changed, 1029 insertions(+)
 create mode 100644 sdlf-stage-ecsfargate/.gitignore
 create mode 100755 sdlf-stage-ecsfargate/lambda/error/src/lambda_function.py
 create mode 100755 sdlf-stage-ecsfargate/lambda/postupdate-metadata/src/lambda_function.py
 create mode 100755 sdlf-stage-ecsfargate/lambda/redrive/src/lambda_function.py
 create mode 100755 sdlf-stage-ecsfargate/lambda/routing/src/lambda_function.py
 create mode 100644 sdlf-stage-ecsfargate/state-machine/stage-ecsfargate.asl.json
 create mode 100644 sdlf-stage-ecsfargate/template.yaml

diff --git a/sdlf-stage-ecsfargate/.gitignore b/sdlf-stage-ecsfargate/.gitignore
new file mode 100644
index 00000000..4208f885
--- /dev/null
+++ b/sdlf-stage-ecsfargate/.gitignore
@@ -0,0 +1,21 @@
+# Packaged Templates
+output/
+
+# Editors
+.vscode/
+.idea/
+
+# Mac/OSX
+.DS_Store
+
+# Windows
+Thumbs.db
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# Environments
+.env
+.venv
\ No newline at end of file
diff --git a/sdlf-stage-ecsfargate/lambda/error/src/lambda_function.py b/sdlf-stage-ecsfargate/lambda/error/src/lambda_function.py
new file mode 100755
index 00000000..3ec4e2c4
--- /dev/null
+++ b/sdlf-stage-ecsfargate/lambda/error/src/lambda_function.py
@@ -0,0 +1,30 @@
+import json
+import os
+
+from datalake_library.commons import init_logger
+from datalake_library.configuration.resource_configs import SQSConfiguration
+from datalake_library.interfaces.sqs_interface import SQSInterface
+
+logger = init_logger(__name__)
+team = os.environ["TEAM"]
+dataset = os.environ["DATASET"]
+pipeline = os.environ["PIPELINE"]
+pipeline_stage = os.environ["PIPELINE_STAGE"]
+org = os.environ["ORG"]
+domain = os.environ["DOMAIN"]
+env = os.environ["ENV"]
+
+
+def lambda_handler(event, context):
+    try:
+        if isinstance(event, str):
+            event = json.loads(event)
+
+        sqs_config = SQSConfiguration(team, pipeline, pipeline_stage)
+        sqs_interface = SQSInterface(sqs_config.get_stage_dlq_name)
+
+        logger.info("Execution Failed. Sending original payload to DLQ")
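+        # the stage DLQ is a FIFO queue; "failed" is the message group id
+        # passed to the datalake_library send_message_to_fifo_queue helper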
+        sqs_interface.send_message_to_fifo_queue(json.dumps(event), "failed")
+    except Exception as e:
+        logger.error("Fatal error", exc_info=True)
+        raise e
diff --git a/sdlf-stage-ecsfargate/lambda/postupdate-metadata/src/lambda_function.py b/sdlf-stage-ecsfargate/lambda/postupdate-metadata/src/lambda_function.py
new file mode 100755
index 00000000..50725451
--- /dev/null
+++ b/sdlf-stage-ecsfargate/lambda/postupdate-metadata/src/lambda_function.py
@@ -0,0 +1,53 @@
+import os
+
+from datalake_library import octagon
+from datalake_library.commons import init_logger
+from datalake_library.octagon import peh
+
+logger = init_logger(__name__)
+team = os.environ["TEAM"]
+dataset = os.environ["DATASET"]
+pipeline = os.environ["PIPELINE"]
+pipeline_stage = os.environ["PIPELINE_STAGE"]
+org = os.environ["ORG"]
+domain = os.environ["DOMAIN"]
+env = os.environ["ENV"]
+
+
+def lambda_handler(event, context):
+    """Updates the S3 objects metadata catalog
+
+    Arguments:
+        event {dict} -- Dictionary with details on previous processing step
+        context {dict} -- Dictionary with details on Lambda context
+
+    Returns:
+        {dict} -- Dictionary with outcome of the process
+    """
+    logger.info("Initializing Octagon client")
+    component = context.function_name.split("-")[-2].title()
+    # built before the try block so the except handler below can always report the failure
+    octagon_client = octagon.OctagonClient().with_run_lambda(True).with_configuration_instance(env).build()
+    try:
+        peh_id = event[0][0]["peh_id"]
+        peh.PipelineExecutionHistoryAPI(octagon_client).retrieve_pipeline_execution(peh_id)
+
+        partial_failure = False
+        for records in event:
+            for record in records:
+                if "processed" not in record or not record["processed"]:
+                    partial_failure = True
+
+        if not partial_failure:
+            octagon_client.update_pipeline_execution(
+                status="{} {} Processing".format(pipeline_stage, component), component=component
+            )
+            octagon_client.end_pipeline_execution_success()
+        else:
+            raise Exception("Failure: Processing failed for one or more records")
+
+    except Exception as e:
+        logger.error("Fatal error", exc_info=True)
+        octagon_client.end_pipeline_execution_failed(
+            component=component, issue_comment=f"{pipeline_stage} {component} Error: {repr(e)}"
+        )
+        raise e
diff --git a/sdlf-stage-ecsfargate/lambda/redrive/src/lambda_function.py b/sdlf-stage-ecsfargate/lambda/redrive/src/lambda_function.py
new file mode 100755
index 00000000..5ae3d5b5
--- /dev/null
+++ b/sdlf-stage-ecsfargate/lambda/redrive/src/lambda_function.py
@@ -0,0 +1,27 @@
+import os
+
+from datalake_library.commons import init_logger
+from datalake_library.configuration.resource_configs import SQSConfiguration
+from datalake_library.interfaces.sqs_interface import SQSInterface
+
+logger = init_logger(__name__)
+
+
+def lambda_handler(event, context):
+    try:
+        # PIPELINE_STAGE matches the environment variable name set in template.yaml Globals
+        sqs_config = SQSConfiguration(os.environ["TEAM"], os.environ["PIPELINE"], os.environ["PIPELINE_STAGE"])
+        dlq_interface = SQSInterface(sqs_config.get_stage_dlq_name)
+        messages = dlq_interface.receive_messages(1)
+        if not messages:
+            logger.info("No messages found in {}".format(sqs_config.get_stage_dlq_name))
+            return
+
+        logger.info("Received {} messages".format(len(messages)))
+        queue_interface = SQSInterface(sqs_config.get_stage_queue_name)
+        for message in messages:
+            queue_interface.send_message_to_fifo_queue(message["Body"], "redrive")
+            logger.info("Redrive message succeeded")
+    except Exception as e:
+        logger.error("Fatal error", exc_info=True)
+        raise e
+    return
diff --git a/sdlf-stage-ecsfargate/lambda/routing/src/lambda_function.py b/sdlf-stage-ecsfargate/lambda/routing/src/lambda_function.py
new file mode 100755
index 00000000..524c3c93
--- /dev/null
+++ b/sdlf-stage-ecsfargate/lambda/routing/src/lambda_function.py
@@ -0,0 +1,135 @@
+import json
+import os
+
+from datalake_library import octagon
+from datalake_library.commons import init_logger
+from datalake_library.configuration.resource_configs import (
+    DynamoConfiguration,
+    SQSConfiguration,
+    StateMachineConfiguration,
+)
+from datalake_library.interfaces.dynamo_interface import DynamoInterface
+from datalake_library.interfaces.sqs_interface import SQSInterface
+from datalake_library.interfaces.states_interface import StatesInterface
+
+logger = init_logger(__name__)
+team = os.environ["TEAM"]
+dataset = os.environ["DATASET"]
+pipeline = os.environ["PIPELINE"]
+pipeline_stage = os.environ["PIPELINE_STAGE"]
+org = os.environ["ORG"]
+domain = os.environ["DOMAIN"]
+env = os.environ["ENV"]
+
+
+def pipeline_start(octagon_client, event):
+    peh_id = octagon_client.start_pipeline_execution(
+        pipeline_name=f"{team}-{pipeline}-{pipeline_stage}",
+        dataset_name=f"{team}-{dataset}",
+        comment=event,  # TODO test maximum size
+    )
+    logger.info(f"peh_id: {peh_id}")
+    return peh_id
+
+
+# sdlf-stage-* stages support three types of triggers:
+# event: run stage when an event received on the team's event bus matches the configured event pattern
+# event-schedule: store events received on the team's event bus matching the configured event pattern, then process them on the configured schedule
+# schedule: run stage on the configured schedule, without any event as input
+def get_source_records(event, dynamo_interface):
+    records = []
+
+    if event.get("trigger_type") == "schedule" and "event_pattern" not in event:
+        logger.info("Stage trigger: schedule")
+        records.append(event)
+    elif event.get("trigger_type") == "schedule" and "event_pattern" in event:
+        logger.info("Stage trigger: event-schedule")
+        pipeline_info = dynamo_interface.get_pipelines_table_item(f"{team}-{pipeline}-{pipeline_stage}")
+        min_items_to_process = 1
+        max_items_to_process = 100
+        logger.info(f"Pipeline is {pipeline}, stage is {pipeline_stage}")
+        logger.info(f"Details from DynamoDB: {pipeline_info.get('pipeline', {})}")
+        min_items_to_process = pipeline_info["pipeline"].get("min_items_process", min_items_to_process)
+        max_items_to_process = pipeline_info["pipeline"].get("max_items_process", max_items_to_process)
+
+        sqs_config = SQSConfiguration(team, pipeline, pipeline_stage)
+        queue_interface = SQSInterface(sqs_config.get_stage_queue_name)
+        logger.info(f"Querying {team}-{pipeline}-{pipeline_stage} objects waiting for processing")
+        messages = queue_interface.receive_min_max_messages(min_items_to_process, max_items_to_process)
+        logger.info(f"{len(messages)} Objects ready for processing")
+
+        for record in messages:
+            records.append(json.loads(record))
+    elif "Records" in event:
+        logger.info("Stage trigger: event")
+        for record in event["Records"]:
+            records.append(json.loads(record["body"]))
+    else:
+        raise Exception("Unable to ascertain trigger type (schedule, event-schedule or event)")
+
+    return records
+
+
+def enrich_records(records, metadata):
+    enriched_records = []
+    for record in records:
+        enriched_record = {
+            **record,
+            **metadata,
+        }
+        enriched_records.append(enriched_record)
+
+    return enriched_records
+
+
+def get_transform_details(dynamo_interface):
+    transform_info = dynamo_interface.get_transform_table_item(f"{team}-{dataset}")
+    ecsfargate_cluster = ""
+    ecsfargate_arn = ""
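+    # Illustrative shape of the transform table item this lookup expects
+    # (team/pipeline/stage names below are made up; the real values come from
+    # sdlf-dataset's pPipelineDetails):
+    # "pipeline": {
+    #     "main": {
+    #         "ecsfargate": {
+    #             "ecsfargate_cluster": "arn:aws:ecs:us-east-1:111122223333:cluster/sdlf-engineering",
+    #             "ecsfargate_arn": "arn:aws:ecs:us-east-1:111122223333:task-definition/sdlf-transform:1"
+    #         }
+    #     }
+    # }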
logger.info(f"Pipeline is {pipeline}, stage is {pipeline_stage}") + if pipeline in transform_info.get("pipeline", {}): + if pipeline_stage in transform_info["pipeline"][pipeline]: + logger.info(f"Details from DynamoDB: {transform_info['pipeline'][pipeline][pipeline_stage]}") + ecsfargate_cluster = transform_info["pipeline"][pipeline][pipeline_stage].get( + "ecsfargate_cluster", ecsfargate_cluster + ) + ecsfargate_arn = transform_info["pipeline"][pipeline][pipeline_stage].get("ecsfargate_arn", ecsfargate_arn) + + return ecsfargate_cluster, ecsfargate_arn + + +def lambda_handler(event, context): + try: + octagon_client = octagon.OctagonClient().with_run_lambda(True).with_configuration_instance(env).build() + dynamo_config = DynamoConfiguration() + dynamo_interface = DynamoInterface(dynamo_config) + peh_id = pipeline_start(octagon_client, event) + records = get_source_records(event, dynamo_interface) + ecsfargate_cluster, ecsfargate_transform = get_transform_details( + dynamo_interface + ) # allow customising the ecsfargate task transform and cluster through sdlf-dataset's pPipelineDetails + metadata = dict(peh_id=peh_id, ecsfargate_transform=ecsfargate_transform, ecsfargate_cluster=ecsfargate_cluster) + records = enrich_records(records, metadata) + + if records: + if records[0].get("trigger_type"): + logger.info("Starting State Machine Execution (scheduled run without source events)") + else: + logger.info(f"Starting State Machine Execution (processing {len(records)} source events)") + state_config = StateMachineConfiguration(team, pipeline, pipeline_stage) + StatesInterface().run_state_machine(state_config.get_stage_state_machine_arn, json.dumps(records)) + octagon_client.update_pipeline_execution( + status=f"{pipeline_stage} Transform Processing", component="Transform" + ) + else: + logger.info("Nothing to process, exiting pipeline") + octagon_client.end_pipeline_execution_success() + + except Exception as e: + logger.error("Fatal error", exc_info=True) + component = context.function_name.split("-")[-2].title() + octagon_client.end_pipeline_execution_failed( + component=component, + issue_comment=f"{pipeline_stage} {component} Error: {repr(e)}", + ) + raise e diff --git a/sdlf-stage-ecsfargate/state-machine/stage-ecsfargate.asl.json b/sdlf-stage-ecsfargate/state-machine/stage-ecsfargate.asl.json new file mode 100644 index 00000000..43f5757d --- /dev/null +++ b/sdlf-stage-ecsfargate/state-machine/stage-ecsfargate.asl.json @@ -0,0 +1,112 @@ +{ + "Comment": "Simple ECS Fargate-based transform", + "StartAt": "Try", + "States": { + "Try": { + "Type": "Parallel", + "Branches": [ + { + "StartAt": "Pass", + "States": { + "Pass": { + "Type": "Pass", + "Next": "Records", + "Parameters": { + "Items.$": "States.StringToJson($)" + } + }, + "Records": { + "Type": "Map", + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "DISTRIBUTED", + "ExecutionType": "STANDARD" + }, + "StartAt": "Execute ECS Fargate Transformation", + "States": { + "Execute ECS Fargate Transformation": { + "Type": "Task", + "Resource": "arn:aws:states:::ecs:runTask.sync", + "Parameters": { + "LaunchType": "FARGATE", + "Cluster": "$.Items[0].ecsfargate_cluster", + "TaskDefinition": "$.Items[0].ecsfargate_transform" + }, + "End": true + } + } + }, + "Next": "Post-update Catalog", + "Label": "Records", + "MaxConcurrency": 50, + "ToleratedFailurePercentage": 100, + "ItemBatcher": { + "MaxItemsPerBatch": 1 + }, + "InputPath": "$.Items" + }, + "Post-update Catalog": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + 
"ResultPath": null, + "Parameters": { + "Payload.$": "$", + "FunctionName": "${lStep3}:$LATEST" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "End": true + } + } + } + ], + "End": true, + "Catch": [ + { + "ErrorEquals": [ + "States.ALL" + ], + "ResultPath": null, + "Next": "Error" + } + ] + }, + "Error": { + "Type": "Task", + "Resource": "arn:aws:states:::lambda:invoke", + "OutputPath": "$.Payload", + "Parameters": { + "Payload.$": "$", + "FunctionName": "${lError}:$LATEST" + }, + "Retry": [ + { + "ErrorEquals": [ + "Lambda.ServiceException", + "Lambda.AWSLambdaException", + "Lambda.SdkClientException", + "Lambda.TooManyRequestsException" + ], + "IntervalSeconds": 2, + "MaxAttempts": 6, + "BackoffRate": 2 + } + ], + "Next": "Fail" + }, + "Fail": { + "Type": "Fail" + } + } +} \ No newline at end of file diff --git a/sdlf-stage-ecsfargate/template.yaml b/sdlf-stage-ecsfargate/template.yaml new file mode 100644 index 00000000..7c4d0636 --- /dev/null +++ b/sdlf-stage-ecsfargate/template.yaml @@ -0,0 +1,651 @@ +AWSTemplateFormatVersion: "2010-09-09" +Transform: AWS::Serverless-2016-10-31 +Description: Simple ECS Fargate-based Transform + +Parameters: + pPipelineReference: + Description: Workaround for CloudFormation resolve:ssm not updating on stack update (https://github.com/aws-cloudformation/cloudformation-coverage-roadmap/issues/844) + Type: String + Default: none + pOrg: + Description: Name of the organization owning the datalake + Type: String + Default: "{{resolve:ssm:/SDLF/Misc/pOrg}}" + pDomain: + Description: Data domain name + Type: String + Default: "{{resolve:ssm:/SDLF/Misc/pDomain}}" + pDatasetBucket: + Description: The raw or central bucket for the solution + Type: String + Default: "{{resolve:ssm:/SDLF/S3/RawBucket}}" + pStageBucket: + Description: The stage bucket for the solution + Type: String + Default: "{{resolve:ssm:/SDLF/S3/StageBucket}}" + pTeamName: + Description: Name of the team owning the pipeline (all lowercase, no symbols or spaces) + Type: String + AllowedPattern: "[a-z0-9]*" + pDataset: + Description: The name of the dataset (all lowercase, no symbols or spaces) + Type: String + AllowedPattern: "[a-z0-9]{2,14}" + Default: legislators + pPipeline: + Description: The name of the pipeline (all lowercase, no symbols or spaces) + Type: String + AllowedPattern: "[a-z0-9]*" + pEnv: + Description: Environment name + Type: String + Default: "{{resolve:ssm:/SDLF/Misc/pEnv}}" + pStageName: + Description: Name of the stage (all lowercase, hyphen allowed, no other symbols or spaces) + Type: String + AllowedPattern: "[a-zA-Z0-9\\-]{1,12}" + pStageEnabled: + Description: Whether the stage is enabled or not + Type: String + Default: true + AllowedValues: [true, false] + pTriggerType: + Description: Trigger type of the stage (event or schedule) + Type: String + Default: event + AllowedValues: [event, schedule] + pSchedule: + Description: Cron expression when trigger type is schedule + Type: String + Default: "cron(*/5 * * * ? 
*)" + pEventPattern: + Description: Event pattern to match from previous stage + Type: String + Default: "" + pElasticSearchEnabled: + Description: Boolean for wether ElasticSearch is enabled + Type: AWS::SSM::Parameter::Value + Default: /SDLF/ElasticSearch/Enabled + pKibanaStreamRole: + Description: ARN of the role used to forward logs to Kinesis + Type: AWS::SSM::Parameter::Value + Default: /SDLF/Lambda/KibanaStreamRoleArn + pKibanaStream: + Description: ARN of the Kinesis stream that collates logs + Type: AWS::SSM::Parameter::Value + Default: /SDLF/Lambda/KibanaStreamArn + pCloudWatchLogsRetentionInDays: + Description: The number of days log events are kept in CloudWatch Logs + Type: Number + Default: 30 + AllowedValues: + [ + 1, + 3, + 5, + 7, + 14, + 30, + 60, + 90, + 120, + 150, + 180, + 365, + 400, + 545, + 731, + 1827, + 3653, + ] + pEnableTracing: + Description: Flag for whether XRay tracing is enabled + Type: String + # the ideal would be to fetch ssm:/SDLF/VPC/Enabled and not ask the user to set this variable to true manually. + # however between AWS::SSM::Parameter::Value not working in CloudFormation modules, + # Fn::ImportValue not being accepted in CloudFormation modules template fragments, + # {{resolve:}} being evaluated later than the Conditions block, options are limited. + pEnableVpc: + Description: Deploy SDLF resources in a VPC + Type: String + Default: false + # pVpcSecurityGroupIds and pVpcSubnetIds are passed explicitly (unlike in sdlf-cicd/template-cicd-sdlf-repositories.yaml for example) + # due to Fn::ImportValue not being accepted in CloudFormation modules template fragments + pVpcSecurityGroupIds: + Description: VPC Security Groups Ids + Type: String + Default: "" + pVpcSubnetIds: + Description: VPC Subnet Ids + Type: String + Default: "" + +Conditions: + DeployElasticSearch: !Equals [!Ref pElasticSearchEnabled, "true"] + EnableTracing: !Equals [!Ref pEnableTracing, "true"] + RunInVpc: !Equals [!Ref pEnableVpc, true] + +Globals: + Function: + Runtime: python3.12 + Handler: lambda_function.lambda_handler + Layers: + - "{{resolve:ssm:/SDLF/Lambda/LatestDatalakeLibraryLayer}}" + Environment: + Variables: + TEAM: !Ref pTeamName + DATASET: !Ref pDataset + PIPELINE: !Ref pPipeline + PIPELINE_STAGE: !Ref pStageName + ORG: !Ref pOrg + DOMAIN: !Ref pDomain + ENV: !Ref pEnv + KmsKeyArn: !Sub "{{resolve:ssm:/SDLF/KMS/${pTeamName}/InfraKeyId}}" + VpcConfig: !If + - RunInVpc + - SecurityGroupIds: !Split [",", !Ref pVpcSecurityGroupIds] + SubnetIds: !Split [",", !Ref pVpcSubnetIds] + - !Ref "AWS::NoValue" + + +Resources: + rPipelineInterface: + Type: awslabs::sdlf::pipeline::MODULE + Properties: + pPipelineReference: !Ref pPipelineReference + pOrg: !Ref pOrg + pDomain: !Ref pDomain + pEnv: !Ref pEnv + pTeamName: !Ref pTeamName + pPipelineName: !Ref pPipeline + pStageName: !Ref pStageName + pStageEnabled: !Ref pStageEnabled + pTriggerType: !Ref pTriggerType + pSchedule: !Ref pSchedule + pEventPattern: !Ref pEventPattern + pLambdaRoutingStep: !GetAtt rLambdaRoutingStep.Arn + + ######## IAM ######### + rLambdaCommonPolicy: + Type: AWS::IAM::ManagedPolicy + Properties: + Path: !Sub /sdlf-${pTeamName}/ + PolicyDocument: + Version: "2012-10-17" + Statement: + - Effect: Allow + Action: + - logs:CreateLogGroup + - logs:CreateLogStream + - logs:PutLogEvents + Resource: + - !Sub arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/sdlf-${pTeamName}-${pPipeline}-* + - Effect: Allow + Action: + - ssm:GetParameter + - ssm:GetParameters + Resource: !Sub 
+          - Effect: Allow
+            Action:
+              - dynamodb:BatchGetItem
+              - dynamodb:BatchWriteItem
+              - dynamodb:DeleteItem
+              - dynamodb:DescribeTable
+              - dynamodb:GetItem
+              - dynamodb:GetRecords
+              - dynamodb:PutItem
+              - dynamodb:Query
+              - dynamodb:Scan
+              - dynamodb:UpdateItem
+            Resource:
+              - !Sub arn:${AWS::Partition}:dynamodb:${AWS::Region}:${AWS::AccountId}:table/octagon-*
+          - Effect: Allow
+            Action:
+              - kms:CreateGrant
+              - kms:Decrypt
+              - kms:DescribeKey
+              - kms:Encrypt
+              - kms:GenerateDataKey*
+              - kms:ReEncrypt*
+            Resource:
+              - !Sub "{{resolve:ssm:/SDLF/KMS/${pTeamName}/InfraKeyId}}"
+
+  # Routing Role
+  rRoleLambdaExecutionRoutingStep:
+    Type: AWS::IAM::Role
+    Properties:
+      Path: !Sub /sdlf-${pTeamName}/
+      PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pTeamName}/TeamPermissionsBoundary}}"
+      ManagedPolicyArns:
+        - !Ref rLambdaCommonPolicy
+        - !If
+          - RunInVpc
+          - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
+          - !Ref "AWS::NoValue"
+      AssumeRolePolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: lambda.amazonaws.com
+            Action: sts:AssumeRole
+      Policies:
+        - PolicyName: !Sub sdlf-${pTeamName}-${pPipeline}-routing-${pStageName}
+          PolicyDocument:
+            Version: "2012-10-17"
+            Statement:
+              - Effect: Allow
+                Action:
+                  - states:StartExecution
+                Resource:
+                  - !Ref rStateMachine
+              - Effect: Allow
+                Action:
+                  - sqs:DeleteMessage
+                  - sqs:GetQueueAttributes
+                  - sqs:GetQueueUrl
+                  - sqs:ListQueues
+                  - sqs:ListDeadLetterSourceQueues
+                  - sqs:ListQueueTags
+                  - sqs:ReceiveMessage
+                  - sqs:SendMessage
+                Resource:
+                  - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pTeamName}-${pPipeline}-queue-*
+                  - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pTeamName}-${pPipeline}-dlq-*
+
+  # Metadata Step Role (fetch metadata, update pipeline execution history...)
+  rRoleLambdaExecutionMetadataStep:
+    Type: AWS::IAM::Role
+    Properties:
+      Path: !Sub /sdlf-${pTeamName}/
+      PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pTeamName}/TeamPermissionsBoundary}}"
+      ManagedPolicyArns:
+        - !Ref rLambdaCommonPolicy
+        - !If
+          - RunInVpc
+          - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
+          - !Ref "AWS::NoValue"
+      AssumeRolePolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: lambda.amazonaws.com
+            Action: sts:AssumeRole
+      Policies:
+        - PolicyName: !Sub sdlf-${pTeamName}-${pPipeline}-metadata-${pStageName}
+          PolicyDocument:
+            Version: "2012-10-17"
+            Statement:
+              - Effect: Allow
+                Action:
+                  - s3:ListBucket
+                Resource:
+                  - !Sub arn:${AWS::Partition}:s3:::${pStageBucket}
+              - Effect: Allow
+                Action:
+                  - s3:GetObject
+                Resource:
+                  - !Sub arn:${AWS::Partition}:s3:::${pStageBucket}/${pTeamName}/*
+                  - !Sub arn:${AWS::Partition}:s3:::${pStageBucket}/pre-stage/${pTeamName}/* # DEPRECATED, use {team}/{dataset}/{pipeline_stage}/* to store files instead
+                  - !Sub arn:${AWS::Partition}:s3:::${pStageBucket}/post-stage/${pTeamName}/* # DEPRECATED, use {team}/{dataset}/{pipeline_stage}/* to store files instead
+
+  # Processing Role
+  rRoleLambdaExecutionProcessingStep:
+    Type: AWS::IAM::Role
+    Properties:
+      Path: !Sub /sdlf-${pTeamName}/
+      PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pTeamName}/TeamPermissionsBoundary}}"
+      ManagedPolicyArns:
+        - !Ref rLambdaCommonPolicy
+        - !If
+          - RunInVpc
+          - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
+          - !Ref "AWS::NoValue"
+      AssumeRolePolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: lambda.amazonaws.com
+            Action: sts:AssumeRole
+      Policies:
+        - PolicyName: !Sub sdlf-${pTeamName}-${pPipeline}-process-${pStageName}
+          PolicyDocument:
+            Version: "2012-10-17"
+            Statement:
+              - Effect: Allow
+                Action:
+                  - s3:GetBucketVersioning
+                  - s3:ListBucket
+                Resource:
+                  - !Sub arn:${AWS::Partition}:s3:::${pDatasetBucket}
+                  - !Sub arn:${AWS::Partition}:s3:::${pStageBucket}
+              - Effect: Allow
+                Action:
+                  - s3:GetObject
+                  - s3:PutObject
+                Resource:
+                  - !Sub arn:${AWS::Partition}:s3:::${pDatasetBucket}/${pTeamName}/*
+                  - !Sub arn:${AWS::Partition}:s3:::${pStageBucket}/${pTeamName}/*
+                  - !Sub arn:${AWS::Partition}:s3:::${pStageBucket}/pre-stage/${pTeamName}/* # DEPRECATED, use {team}/{dataset}/{pipeline_stage}/* to store files instead
+                  - !Sub arn:${AWS::Partition}:s3:::${pStageBucket}/post-stage/${pTeamName}/* # DEPRECATED, use {team}/{dataset}/{pipeline_stage}/* to store files instead
+              - Effect: Allow
+                Action:
+                  - kms:DescribeKey
+                  - kms:Encrypt
+                  - kms:Decrypt
+                  - kms:ReEncrypt*
+                  - kms:GenerateDataKey*
+                  - kms:CreateGrant
+                Resource:
+                  - !Sub "{{resolve:ssm:/SDLF/KMS/${pTeamName}/DataKeyId}}"
+
+  # Error Handling Lambda Role
+  rRoleLambdaExecutionErrorStep:
+    Type: AWS::IAM::Role
+    Properties:
+      Path: !Sub /sdlf-${pTeamName}/
+      PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pTeamName}/TeamPermissionsBoundary}}"
+      ManagedPolicyArns:
+        - !Ref rLambdaCommonPolicy
+        - !If
+          - RunInVpc
+          - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
+          - !Ref "AWS::NoValue"
+      AssumeRolePolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service: lambda.amazonaws.com
+            Action: sts:AssumeRole
+      Policies:
+        - PolicyName: !Sub sdlf-${pTeamName}-${pPipeline}-error-${pStageName}
+          PolicyDocument:
+            Version: "2012-10-17"
+            Statement:
+              - Effect: Allow
+                Action:
+                  - sqs:DeleteMessage
+                  - sqs:GetQueueAttributes
+                  - sqs:GetQueueUrl
+                  - sqs:ListQueues
+                  - sqs:ListDeadLetterSourceQueues
+                  - sqs:ListQueueTags
+                  - sqs:ReceiveMessage
+                  - sqs:SendMessage
+                Resource:
+                  - !Sub arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:sdlf-${pTeamName}-${pPipeline}-dlq-*
+
+  ######## LAMBDA FUNCTIONS #########
+  rLambdaRoutingStep:
+    Type: AWS::Serverless::Function
+    Metadata:
+      cfn_nag:
+        rules_to_suppress:
+          - id: W58
+            reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
+    Properties:
+      CodeUri: ./lambda/routing/src
+      FunctionName: !Sub sdlf-${pTeamName}-${pPipeline}-routing-${pStageName}
+      Description: Routes events to the relevant stage state machine
+      MemorySize: 192
+      Timeout: 60
+      Role: !GetAtt rRoleLambdaExecutionRoutingStep.Arn
+
+  rLambdaRedriveStep:
+    Type: AWS::Serverless::Function
+    Metadata:
+      cfn_nag:
+        rules_to_suppress:
+          - id: W58
+            reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
+    Properties:
+      CodeUri: ./lambda/redrive/src
+      FunctionName: !Sub sdlf-${pTeamName}-${pPipeline}-redrive-${pStageName}
+      Description: Redrives failed messages from the stage DLQ back to the stage queue
+      MemorySize: 192
+      Timeout: 300
+      Role: !GetAtt rRoleLambdaExecutionRoutingStep.Arn
+
+  rLambdaStep3:
+    Type: AWS::Serverless::Function
+    Metadata:
+      cfn_nag:
+        rules_to_suppress:
+          - id: W58
+            reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
+    Properties:
+      CodeUri: ./lambda/postupdate-metadata/src
+      FunctionName: !Sub sdlf-${pTeamName}-${pPipeline}-postupdate-${pStageName}
+      Description: Post-Update the metadata in the DynamoDB Catalog table
+      MemorySize: 192
+      Timeout: 300
+      Role: !GetAtt rRoleLambdaExecutionMetadataStep.Arn
+
+  rLambdaErrorStep:
+    Type: AWS::Serverless::Function
+    Metadata:
+      cfn_nag:
+        rules_to_suppress:
+          - id: W58
+            reason: Permissions to write CloudWatch Logs are granted by rLambdaCommonPolicy
+    Properties:
+      CodeUri: ./lambda/error/src
+      FunctionName: !Sub sdlf-${pTeamName}-${pPipeline}-error-${pStageName}
+      Description: Fallback lambda to handle messages which failed processing
+      MemorySize: 192
+      Timeout: 300
+      Role: !GetAtt rRoleLambdaExecutionErrorStep.Arn
+
+  ######## CLOUDWATCH #########
+  rLambdaRoutingStepLogGroup:
+    Type: AWS::Logs::LogGroup
+    DeletionPolicy: Delete
+    UpdateReplacePolicy: Delete
+    Properties:
+      LogGroupName: !Sub /aws/lambda/${rLambdaRoutingStep}
+      RetentionInDays: !Ref pCloudWatchLogsRetentionInDays
+      KmsKeyId: !Sub "{{resolve:ssm:/SDLF/KMS/${pTeamName}/InfraKeyId}}"
+
+  rUpdateSubscriptionFilterRoutingStep:
+    Type: AWS::Logs::SubscriptionFilter
+    Condition: DeployElasticSearch
+    Properties:
+      LogGroupName: !Ref rLambdaRoutingStepLogGroup
+      DestinationArn: !Ref pKibanaStream
+      RoleArn: !Ref pKibanaStreamRole
+      FilterPattern: "[log_type, log_timestamp, log_id, log_message]"
+
+  rLambdaRoutingStepCloudWatchAlarm:
+    Type: AWS::CloudWatch::Alarm
+    Properties:
+      AlarmDescription: !Sub ${pStageName} ${pTeamName} ${pPipeline} Routing Lambda Alarm
+      AlarmActions:
+        - !Sub "{{resolve:ssm:/SDLF/SNS/${pTeamName}/Notifications}}"
+      MetricName: Errors
+      EvaluationPeriods: 5
+      Period: 60
+      ComparisonOperator: GreaterThanThreshold
+      Namespace: AWS/Lambda
+      Statistic: Sum
+      Threshold: 5
+      Unit: Count
+      Dimensions:
+        - Name: FunctionName
+          Value: !Ref rLambdaRoutingStep
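+  # The redrive function has no event source wired to it; an operator invokes
+  # it manually to push DLQ messages back onto the stage queue, e.g.
+  # (illustrative): aws lambda invoke --function-name sdlf-<team>-<pipeline>-redrive-<stage> response.json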
+  rLambdaRedriveStepLogGroup:
+    Type: AWS::Logs::LogGroup
+    DeletionPolicy: Delete
+    UpdateReplacePolicy: Delete
+    Properties:
+      LogGroupName: !Sub /aws/lambda/${rLambdaRedriveStep}
+      RetentionInDays: !Ref pCloudWatchLogsRetentionInDays
+      KmsKeyId: !Sub "{{resolve:ssm:/SDLF/KMS/${pTeamName}/InfraKeyId}}"
+
+  rUpdateSubscriptionFilterRedriveStep:
+    Type: AWS::Logs::SubscriptionFilter
+    Condition: DeployElasticSearch
+    Properties:
+      LogGroupName: !Ref rLambdaRedriveStepLogGroup
+      DestinationArn: !Ref pKibanaStream
+      RoleArn: !Ref pKibanaStreamRole
+      FilterPattern: "[log_type, log_timestamp, log_id, log_message]"
+
+  rLambdaStep3LogGroup:
+    Type: AWS::Logs::LogGroup
+    DeletionPolicy: Delete
+    UpdateReplacePolicy: Delete
+    Properties:
+      LogGroupName: !Sub /aws/lambda/${rLambdaStep3}
+      RetentionInDays: !Ref pCloudWatchLogsRetentionInDays
+      KmsKeyId: !Sub "{{resolve:ssm:/SDLF/KMS/${pTeamName}/InfraKeyId}}"
+
+  rUpdateSubscriptionFilterStep3:
+    Type: AWS::Logs::SubscriptionFilter
+    Condition: DeployElasticSearch
+    Properties:
+      LogGroupName: !Ref rLambdaStep3LogGroup
+      DestinationArn: !Ref pKibanaStream
+      RoleArn: !Ref pKibanaStreamRole
+      FilterPattern: "[log_type, log_timestamp, log_id, log_message]"
+
+  rLambdaErrorStepLogGroup:
+    Type: AWS::Logs::LogGroup
+    DeletionPolicy: Delete
+    UpdateReplacePolicy: Delete
+    Properties:
+      LogGroupName: !Sub /aws/lambda/${rLambdaErrorStep}
+      RetentionInDays: !Ref pCloudWatchLogsRetentionInDays
+      KmsKeyId: !Sub "{{resolve:ssm:/SDLF/KMS/${pTeamName}/InfraKeyId}}"
+
+  rUpdateSubscriptionFilterErrorStep:
+    Type: AWS::Logs::SubscriptionFilter
+    Condition: DeployElasticSearch
+    Properties:
+      LogGroupName: !Ref rLambdaErrorStepLogGroup
+      DestinationArn: !Ref pKibanaStream
+      RoleArn: !Ref pKibanaStreamRole
+      FilterPattern: "[log_type, log_timestamp, log_id, log_message]"
+
+  rLambdaErrorStepCloudWatchAlarm:
+    Type: AWS::CloudWatch::Alarm
+    Properties:
+      AlarmDescription: !Sub ${pStageName} ${pTeamName} ${pPipeline} Error Lambda Alarm
+      AlarmActions:
+        - !Sub "{{resolve:ssm:/SDLF/SNS/${pTeamName}/Notifications}}"
+      MetricName: Invocations
+      EvaluationPeriods: 5
+      Period: 60
+      ComparisonOperator: GreaterThanThreshold
+      Namespace: AWS/Lambda
+      Statistic: Sum
+      Threshold: 5
+      Unit: Count
+      Dimensions:
+        - Name: FunctionName
+          Value: !Ref rLambdaErrorStep
+
+  ######## STATE MACHINE #########
+  rStatesExecutionRole:
+    Type: AWS::IAM::Role
+    Metadata:
+      cfn_nag:
+        rules_to_suppress:
+          - id: W11
+            reason: The actions with "*" are all ones that do not have resource limitations associated with them
+    Properties:
+      Path: !Sub /sdlf-${pTeamName}/
+      PermissionsBoundary: !Sub "{{resolve:ssm:/SDLF/IAM/${pTeamName}/TeamPermissionsBoundary}}"
+      AssumeRolePolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          - Effect: Allow
+            Principal:
+              Service:
+                - !Sub states.${AWS::Region}.amazonaws.com
+            Action: sts:AssumeRole
+            Condition:
+              StringEquals:
+                "aws:SourceAccount": !Sub ${AWS::AccountId}
+      Policies:
+        - PolicyName: !Sub sdlf-${pTeamName}-states-execution
+          PolicyDocument:
+            Version: "2012-10-17"
+            Statement:
+              - Effect: Allow
+                Action:
+                  - lambda:InvokeFunction
+                Resource: !Sub arn:${AWS::Partition}:lambda:${AWS::Region}:${AWS::AccountId}:function:sdlf-${pTeamName}-*
+              - Effect: Allow
+                Action:
+                  - ecs:RunTask # W11 TaskId is not known until the task is submitted
+                  - ecs:StopTask # W11 TaskId is not known until the task is submitted
+                  - ecs:DescribeTasks # W11 TaskId is not known until the task is submitted
+                Resource: "*"
+              - Effect: Allow
+                Action:
+                  - events:PutTargets
+                  - events:PutRule
+                  - events:DescribeRule
+                Resource: !Sub arn:${AWS::Partition}:events:${AWS::Region}:${AWS::AccountId}:rule/StepFunctionsGetEventsForECSTaskRule # managed rule backing the ecs:runTask.sync integration
+              - Effect: Allow
+                Action:
+                  - xray:PutTraceSegments # W11 exception
+                  - xray:PutTelemetryRecords # W11 exception
+                  - xray:GetSamplingRules # W11 exception
+                  - xray:GetSamplingTargets # W11 exception
+                Resource: "*"
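+  # The distributed map state starts child Step Functions executions, so the
+  # state machine role must also be able to start/track executions of the
+  # state machine itself; kept as a separate AWS::IAM::RolePolicy to avoid a
+  # circular dependency between rStateMachine and rStatesExecutionRole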
+  rStatesExecutionRoleMap:
+    Type: AWS::IAM::RolePolicy
+    Properties:
+      PolicyName: sfn-map
+      PolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          - Effect: Allow
+            Action:
+              - states:StartExecution
+              - states:DescribeExecution
+              - states:StopExecution
+            Resource:
+              - !Ref rStateMachine
+              - !Sub arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:execution:${rStateMachine.Name}:*
+      RoleName: !Ref rStatesExecutionRole
+
+  rStateMachine:
+    Type: AWS::Serverless::StateMachine
+    Properties:
+      Name: !Sub sdlf-${pTeamName}-${pPipeline}-sm-${pStageName}
+      DefinitionUri: ./state-machine/stage-ecsfargate.asl.json
+      DefinitionSubstitutions:
+        lStep3: !GetAtt rLambdaStep3.Arn
+        lError: !GetAtt rLambdaErrorStep.Arn
+      Role: !GetAtt rStatesExecutionRole.Arn
+      Tracing:
+        Enabled: !If [EnableTracing, true, false]
+
+  ######## SSM OUTPUTS #########
+  rRoutingLambdaSsm:
+    Type: AWS::SSM::Parameter
+    Properties:
+      Name: !Sub /SDLF/Lambda/${pTeamName}/${pPipeline}${pStageName}RoutingLambda
+      Type: String
+      Value: !GetAtt rLambdaRoutingStep.Arn
+      Description: !Sub "ARN of the ${pStageName} ${pTeamName} ${pPipeline} Routing Lambda"
+
+  rStateMachineSsm:
+    Type: AWS::SSM::Parameter
+    Properties:
+      Name: !Sub /SDLF/SM/${pTeamName}/${pPipeline}${pStageName}SM
+      Type: String
+      Value: !Ref rStateMachine
+      Description: !Sub "ARN of the ${pStageName} ${pTeamName} ${pPipeline} State Machine"
+
+Outputs:
+  oPipelineReference:
+    Description: CodePipeline reference this stack has been deployed with
+    Value: !Ref pPipelineReference