Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lambda-event-sources): does not support setting maxRecordAge to infinity #27532

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@
"FunctionName": {
"Ref": "FC4345940"
},
"MaximumRecordAgeInSeconds": -1,
"StartingPosition": "TRIM_HORIZON",
"TumblingWindowInSeconds": 60
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class KinesisEventSourceTest extends cdk.Stack {
const eventSource = new KinesisEventSource(stream, {
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
tumblingWindow: cdk.Duration.seconds(60),
maxRecordAge: -1,
});

fn.addEventSource(eventSource);
Expand Down
5 changes: 4 additions & 1 deletion packages/aws-cdk-lib/aws-lambda-event-sources/lib/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,16 @@ export interface StreamEventSourceProps extends BaseStreamEventSourceProps {

/**
* The maximum age of a record that Lambda sends to a function for processing.
*
* Valid Range:
* * Minimum value of 60 seconds
* * Maximum value of 7 days
*
* Set to -1 if you want the Lambda function to never discard old records.
*
* @default - the retention period configured on the stream
*/
readonly maxRecordAge?: Duration;
readonly maxRecordAge?: Duration | -1;

/**
* Maximum number of retry attempts
Expand Down
19 changes: 19 additions & 0 deletions packages/aws-cdk-lib/aws-lambda-event-sources/test/kinesis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,4 +307,23 @@ describe('KinesisEventSource', () => {
StartingPositionTimestamp: 1640995200,
});
});

test('infinite max record age', () => {
// GIVEN
const stack = new cdk.Stack();
const fn = new TestFunction(stack, 'Fn');
const stream = new kinesis.Stream(stack, 'S');
const eventSource = new sources.KinesisEventSource(stream, {
maxRecordAge: -1,
startingPosition: lambda.StartingPosition.TRIM_HORIZON,
});

// WHEN
fn.addEventSource(eventSource);

// THEN
Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
MaximumRecordAgeInSeconds: -1,
});
});
});
14 changes: 11 additions & 3 deletions packages/aws-cdk-lib/aws-lambda/lib/event-source-mapping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,17 @@ export interface EventSourceMappingOptions {

/**
* The maximum age of a record that Lambda sends to a function for processing.
*
* Valid Range:
* * Minimum value of 60 seconds
* * Maximum value of 7 days
*
* The default value is -1, which sets the maximum age to infinite.
* When the value is set to infinite, Lambda never discards old records.
*
* @default - infinite or until the record expires.
*/
readonly maxRecordAge?: cdk.Duration;
readonly maxRecordAge?: cdk.Duration | -1;

/**
* The maximum number of times to retry when the function returns an error.
Expand Down Expand Up @@ -342,7 +346,11 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
throw new Error('maxConcurrency must be between 2 and 1000 concurrent instances');
}

if (props.maxRecordAge && (props.maxRecordAge.toSeconds() < 60 || props.maxRecordAge.toDays({ integral: false }) > 7)) {
if (
props.maxRecordAge &&
props.maxRecordAge !== -1 &&
(props.maxRecordAge.toSeconds() < 60 || props.maxRecordAge.toDays({ integral: false }) > 7)
) {
throw new Error('maxRecordAge must be between 60 seconds and 7 days inclusive');
}

Expand Down Expand Up @@ -400,7 +408,7 @@ export class EventSourceMapping extends cdk.Resource implements IEventSourceMapp
startingPositionTimestamp: props.startingPositionTimestamp,
functionResponseTypes: props.reportBatchItemFailures ? ['ReportBatchItemFailures'] : undefined,
maximumBatchingWindowInSeconds: props.maxBatchingWindow?.toSeconds(),
maximumRecordAgeInSeconds: props.maxRecordAge?.toSeconds(),
maximumRecordAgeInSeconds: props.maxRecordAge === -1 ? props.maxRecordAge : props.maxRecordAge?.toSeconds(),
maximumRetryAttempts: props.retryAttempts,
parallelizationFactor: props.parallelizationFactor,
topics: props.kafkaTopic !== undefined ? [props.kafkaTopic] : undefined,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Match, Template } from '../../assertions';
import * as cdk from '../../core';
import * as lambda from '../lib';
import { Code, EventSourceMapping, Function, Runtime, Alias, StartingPosition, FilterRule, FilterCriteria } from '../lib';
import { Code, EventSourceMapping, Function, Alias, StartingPosition, FilterRule, FilterCriteria } from '../lib';

let stack: cdk.Stack;
let fn: Function;
Expand Down Expand Up @@ -415,4 +415,16 @@ describe('event source mapping', () => {
startingPositionTimestamp: 1640995200,
})).toThrow(/startingPositionTimestamp can only be used when startingPosition is AT_TIMESTAMP/);
});

test('infinite max record age', () => {
new EventSourceMapping(stack, 'test', {
target: fn,
eventSourceArn: '',
maxRecordAge: -1,
});

Template.fromStack(stack).hasResourceProperties('AWS::Lambda::EventSourceMapping', {
MaximumRecordAgeInSeconds: -1,
});
});
});