Skip to content

Commit

Permalink
feat(kinesisfirehose): supports Kinesis data stream source for delive…
Browse files Browse the repository at this point in the history
…ry stream (aws#15836)

closes aws#15500 
closes aws#10783 

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
BenChaimberg authored and david-doyle-as24 committed Sep 7, 2021
1 parent d431894 commit 4975593
Show file tree
Hide file tree
Showing 6 changed files with 434 additions and 2 deletions.
18 changes: 17 additions & 1 deletion packages/@aws-cdk/aws-kinesisfirehose/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,27 @@ The above example defines the following resources:
## Sources

There are two main methods of sourcing input data: Kinesis Data Streams and via a "direct
put". This construct library currently only supports "direct put". See [#15500](https://github.com/aws/aws-cdk/issues/15500) to track the status of adding support for Kinesis Data Streams.
put".

See: [Sending Data to a Delivery Stream](https://docs.aws.amazon.com/firehose/latest/dev/basic-write.html)
in the *Kinesis Data Firehose Developer Guide*.

### Kinesis Data Stream

A delivery stream can read directly from a Kinesis data stream as a consumer of the data
stream. Configure this behaviour by providing a data stream in the `sourceStream`
property when constructing a delivery stream:

```ts fixture=with-destination
import * as kinesis from '@aws-cdk/aws-kinesis';

const sourceStream = new kinesis.Stream(this, 'Source Stream');
new DeliveryStream(this, 'Delivery Stream', {
sourceStream: sourceStream,
destinations: [destination],
});
```

### Direct Put

Data must be provided via "direct put", ie., by using a `PutRecord` or `PutRecordBatch` API call. There are a number of ways of doing
Expand Down
34 changes: 33 additions & 1 deletion packages/@aws-cdk/aws-kinesisfirehose/lib/delivery-stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
import * as ec2 from '@aws-cdk/aws-ec2';
import * as iam from '@aws-cdk/aws-iam';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kms from '@aws-cdk/aws-kms';
import * as cdk from '@aws-cdk/core';
import { RegionInfo } from '@aws-cdk/region-info';
Expand Down Expand Up @@ -196,6 +197,13 @@ export interface DeliveryStreamProps {
*/
readonly deliveryStreamName?: string;

/**
* The Kinesis data stream to use as a source for this delivery stream.
*
* @default - data must be written to the delivery stream via a direct put.
*/
readonly sourceStream?: kinesis.IStream;

/**
* The IAM role associated with this delivery stream.
*
Expand Down Expand Up @@ -319,6 +327,12 @@ export class DeliveryStream extends DeliveryStreamBase {
});
this.grantPrincipal = role;

if (
props.sourceStream &&
(props.encryption === StreamEncryption.AWS_OWNED || props.encryption === StreamEncryption.CUSTOMER_MANAGED || props.encryptionKey)
) {
throw new Error('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
}
if ((props.encryption === StreamEncryption.AWS_OWNED || props.encryption === StreamEncryption.UNENCRYPTED) && props.encryptionKey) {
throw new Error(`Specified stream encryption as ${StreamEncryption[props.encryption]} but provided a customer-managed key`);
}
Expand All @@ -339,15 +353,33 @@ export class DeliveryStream extends DeliveryStreamBase {
*/
encryptionKey?.grantEncryptDecrypt(role);

const sourceStreamConfig = props.sourceStream ? {
kinesisStreamArn: props.sourceStream.streamArn,
roleArn: role.roleArn,
} : undefined;
const readStreamGrant = props.sourceStream?.grantRead(role);
/*
* Firehose still uses the deprecated DescribeStream API instead of the modern DescribeStreamSummary API.
* kinesis.IStream.grantRead does not provide DescribeStream permissions so we add it manually here.
*/
if (readStreamGrant && readStreamGrant.principalStatement) {
readStreamGrant.principalStatement.addActions('kinesis:DescribeStream');
}

const destinationConfig = props.destinations[0].bind(this, {});

const resource = new CfnDeliveryStream(this, 'Resource', {
deliveryStreamEncryptionConfigurationInput: encryptionConfig,
deliveryStreamName: props.deliveryStreamName,
deliveryStreamType: 'DirectPut',
deliveryStreamType: props.sourceStream ? 'KinesisStreamAsSource' : 'DirectPut',
kinesisStreamSourceConfiguration: sourceStreamConfig,
...destinationConfig,
});

destinationConfig.dependables?.forEach(dependable => resource.node.addDependency(dependable));
if (readStreamGrant) {
resource.node.addDependency(readStreamGrant);
}

this.deliveryStreamArn = this.getResourceArnAttribute(resource.attrArn, {
service: 'kinesis',
Expand Down
2 changes: 2 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-logs": "0.0.0",
Expand All @@ -98,6 +99,7 @@
"@aws-cdk/aws-cloudwatch": "0.0.0",
"@aws-cdk/aws-ec2": "0.0.0",
"@aws-cdk/aws-iam": "0.0.0",
"@aws-cdk/aws-kinesis": "0.0.0",
"@aws-cdk/aws-kms": "0.0.0",
"@aws-cdk/aws-lambda": "0.0.0",
"@aws-cdk/aws-logs": "0.0.0",
Expand Down
58 changes: 58 additions & 0 deletions packages/@aws-cdk/aws-kinesisfirehose/test/delivery-stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { ABSENT, ResourcePart, SynthUtils, anything, arrayWith } from '@aws-cdk/
import * as cloudwatch from '@aws-cdk/aws-cloudwatch';
import * as ec2 from '@aws-cdk/aws-ec2';
import * as iam from '@aws-cdk/aws-iam';
import * as kinesis from '@aws-cdk/aws-kinesis';
import * as kms from '@aws-cdk/aws-kms';
import * as cdk from '@aws-cdk/core';
import { Construct, Node } from 'constructs';
Expand Down Expand Up @@ -89,6 +90,43 @@ describe('delivery stream', () => {
});
});

test('providing source stream creates configuration and grants permission', () => {
const sourceStream = new kinesis.Stream(stack, 'Source Stream');

new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
sourceStream: sourceStream,
role: deliveryStreamRole,
});

expect(stack).toHaveResourceLike('AWS::IAM::Policy', {
PolicyDocument: {
Statement: [
{
Action: arrayWith(
'kinesis:DescribeStream',
'kinesis:GetRecords',
'kinesis:GetShardIterator',
'kinesis:ListShards',
),
Resource: stack.resolve(sourceStream.streamArn),
},
],
},
Roles: [stack.resolve(deliveryStreamRole.roleName)],
});
expect(stack).toHaveResource('AWS::KinesisFirehose::DeliveryStream', {
DeliveryStreamType: 'KinesisStreamAsSource',
KinesisStreamSourceConfiguration: {
KinesisStreamARN: stack.resolve(sourceStream.streamArn),
RoleARN: stack.resolve(deliveryStreamRole.roleArn),
},
});
expect(stack).toHaveResourceLike('AWS::KinesisFirehose::DeliveryStream', {
DependsOn: arrayWith('DeliveryStreamRoleDefaultPolicy2759968B'),
}, ResourcePart.CompleteDefinition);
});

test('requesting customer-owned encryption creates key and configuration', () => {
new firehose.DeliveryStream(stack, 'Delivery Stream', {
destinations: [mockS3Destination],
Expand Down Expand Up @@ -232,6 +270,26 @@ describe('delivery stream', () => {
})).toThrowError('Specified stream encryption as UNENCRYPTED but provided a customer-managed key');
});

test('requesting encryption or providing a key when source is a stream throws an error', () => {
const sourceStream = new kinesis.Stream(stack, 'Source Stream');

expect(() => new firehose.DeliveryStream(stack, 'Delivery Stream 1', {
destinations: [mockS3Destination],
encryption: firehose.StreamEncryption.AWS_OWNED,
sourceStream,
})).toThrowError('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
expect(() => new firehose.DeliveryStream(stack, 'Delivery Stream 2', {
destinations: [mockS3Destination],
encryption: firehose.StreamEncryption.CUSTOMER_MANAGED,
sourceStream,
})).toThrowError('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
expect(() => new firehose.DeliveryStream(stack, 'Delivery Stream 3', {
destinations: [mockS3Destination],
encryptionKey: new kms.Key(stack, 'Key'),
sourceStream,
})).toThrowError('Requested server-side encryption but delivery stream source is a Kinesis data stream. Specify server-side encryption on the data stream instead.');
});

test('grant provides access to stream', () => {
const role = new iam.Role(stack, 'Role', {
assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'),
Expand Down
Loading

0 comments on commit 4975593

Please sign in to comment.