From 4c8992cc13ee5ef778d17a9e3184b9506ade5397 Mon Sep 17 00:00:00 2001 From: Danny Mccormick Date: Mon, 23 Dec 2024 13:17:19 -0500 Subject: [PATCH] put behind flag --- .../io/aws2/common/ClientBuilderFactory.java | 16 +++++--- .../io/aws2/common/ClientConfiguration.java | 14 +++++++ .../kinesis/KinesisTransformRegistrar.java | 7 ++++ .../io/external/xlang_kinesisio_it_test.py | 1 + sdks/python/apache_beam/io/kinesis.py | 37 ++++--------------- 5 files changed, 41 insertions(+), 34 deletions(-) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java index 9a1772ce878b..85aa99cea360 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientBuilderFactory.java @@ -162,7 +162,11 @@ public , ClientT> BuilderT HttpClientConfiguration httpConfig = options.getHttpClientConfiguration(); ProxyConfiguration proxyConfig = options.getProxyConfiguration(); - if (proxyConfig != null || httpConfig != null) { + boolean skipCertificateVerification = false; + if (config.skipCertificateVerification() != null) { + skipCertificateVerification = config.skipCertificateVerification(); + } + if (proxyConfig != null || httpConfig != null || skipCertificateVerification) { if (builder instanceof SdkSyncClientBuilder) { ApacheHttpClient.Builder client = syncClientBuilder(); @@ -178,8 +182,9 @@ public , ClientT> BuilderT setOptional(httpConfig.maxConnections(), client::maxConnections); } - // TODO - gate this behind a flag. - client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + if (skipCertificateVerification) { + client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + } // must use builder to make sure client is managed by the SDK ((SdkSyncClientBuilder) builder).httpClientBuilder(client); @@ -205,8 +210,9 @@ public , ClientT> BuilderT setOptional(httpConfig.maxConnections(), client::maxConcurrency); } - // TODO - gate this behind a flag. - client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + if (skipCertificateVerification) { + client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance()); + } // must use builder to make sure client is managed by the SDK ((SdkAsyncClientBuilder) builder).httpClientBuilder(client); diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java index 08fb595bd037..385a25b5a13f 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/common/ClientConfiguration.java @@ -76,6 +76,13 @@ public abstract class ClientConfiguration implements Serializable { return regionId() != null ? Region.of(regionId()) : null; } + /** + * Optional flag to skip certificate verification. Should only be overriden for test scenarios. If + * set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}. + */ + @JsonProperty + public abstract @Nullable @Pure Boolean skipCertificateVerification(); + /** * Optional service endpoint to use AWS compatible services instead, e.g. for testing. If set, * this overwrites the default in {@link AwsOptions#getEndpoint()}. @@ -156,6 +163,13 @@ public Builder retry(Consumer retry) { return retry(builder.build()); } + /** + * Optional flag to skip certificate verification. Should only be overriden for test scenarios. + * If set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}. + */ + @JsonProperty + public abstract Builder skipCertificateVerification(boolean skipCertificateVerification); + abstract Builder regionId(String region); abstract Builder credentialsProviderAsJson(String credentialsProvider); diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java index 8fbf26537d4f..11e517872971 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisTransformRegistrar.java @@ -63,6 +63,7 @@ private abstract static class CrossLanguageConfiguration { String awsSecretKey; String region; @Nullable String serviceEndpoint; + boolean verifyCertificate; public void setStreamName(String streamName) { this.streamName = streamName; @@ -83,6 +84,10 @@ public void setRegion(String region) { public void setServiceEndpoint(@Nullable String serviceEndpoint) { this.serviceEndpoint = serviceEndpoint; } + + public void setVerifyCertificate(@Nullable Boolean verifyCertificate) { + this.verifyCertificate = verifyCertificate == null || verifyCertificate; + } } public static class WriteBuilder @@ -123,6 +128,7 @@ public PTransform, KinesisIO.Write.Result> buildExternal( .credentialsProvider(provider) .region(Region.of(configuration.region)) .endpoint(endpoint) + .skipCertificateVerification(!configuration.verifyCertificate) .build()) .withPartitioner(p -> pk) .withSerializer(serializer); @@ -233,6 +239,7 @@ public PTransform> buildExternal( .credentialsProvider(provider) .region(Region.of(configuration.region)) .endpoint(endpoint) + .skipCertificateVerification(!configuration.verifyCertificate) .build()); if (configuration.maxNumRecords != null) { diff --git a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py index d37ce1f87fe3..df0926341a5e 100644 --- a/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py +++ b/sdks/python/apache_beam/io/external/xlang_kinesisio_it_test.py @@ -129,6 +129,7 @@ def run_kinesis_read(self): aws_secret_key=self.aws_secret_key, region=self.aws_region, service_endpoint=self.aws_service_endpoint, + verify_certificate=not self.use_localstack, max_num_records=NUM_RECORDS, max_read_time=MAX_READ_TIME, request_records_limit=REQUEST_RECORDS_LIMIT, diff --git a/sdks/python/apache_beam/io/kinesis.py b/sdks/python/apache_beam/io/kinesis.py index cdae53e71c5a..94be0c44b709 100644 --- a/sdks/python/apache_beam/io/kinesis.py +++ b/sdks/python/apache_beam/io/kinesis.py @@ -111,6 +111,7 @@ def default_io_expansion_service(): ('region', str), ('partition_key', str), ('service_endpoint', Optional[str]), + ('verify_certificate', Optional[bool]), ], ) @@ -143,26 +144,14 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Deprecated - certificates will always be - verified. + :param verify_certificate: Enable or disable certificate verification. + Never set to False on production. True by default. :param partition_key: Specify default partition key. :param producer_properties: Specify the configuration properties for Kinesis Producer Library (KPL) as dictionary. Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'} :param expansion_service: The address (host:port) of the ExpansionService. """ - if verify_certificate is False: - # Previously, we supported this via - # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- - # With the new AWS client, we no longer support it and it is always True - raise ValueError( - 'verify_certificate set to False. This option is no longer ' + - 'supported and certificate verification will still happen.') - if verify_certificate is True: - logging.warning( - 'verify_certificate set to True. This option is no longer ' + - 'supported and certificate verification will automatically ' + - 'happen. This option may be removed in a future release') if producer_properties is not None: raise ValueError( 'producer_properties is no longer supported and will be removed ' + @@ -177,6 +166,7 @@ def __init__( region=region, partition_key=partition_key, service_endpoint=service_endpoint, + verify_certificate=verify_certificate, )), expansion_service or default_io_expansion_service(), ) @@ -190,6 +180,7 @@ def __init__( ('aws_secret_key', str), ('region', str), ('service_endpoint', Optional[str]), + ('verify_certificate', Optional[bool]), ('max_num_records', Optional[int]), ('max_read_time', Optional[int]), ('initial_position_in_stream', Optional[str]), @@ -240,8 +231,8 @@ def __init__( :param aws_secret_key: Kinesis access key secret. :param region: AWS region. Example: 'us-east-1'. :param service_endpoint: Kinesis service endpoint - :param verify_certificate: Deprecated - certificates will always be - verified. + :param verify_certificate: Enable or disable certificate verification. + Never set to False on production. True by default. :param max_num_records: Specifies to read at most a given number of records. Must be greater than 0. :param max_read_time: Specifies to read records during x milliseconds. @@ -288,19 +279,6 @@ def __init__( ): logging.warning('Provided timestamp emplaced not in the past.') - if verify_certificate is False: - # Previously, we supported this via - # https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate-- - # With the new AWS client, we no longer support it and it is always True - raise ValueError( - 'verify_certificate set to False. This option is no longer ' + - 'supported and certificate verification will still happen.') - if verify_certificate is True: - logging.warning( - 'verify_certificate set to True. This option is no longer ' + - 'supported and certificate verification will automatically ' + - 'happen. This option may be removed in a future release') - super().__init__( self.URN, NamedTupleBasedPayloadBuilder( @@ -310,6 +288,7 @@ def __init__( aws_secret_key=aws_secret_key, region=region, service_endpoint=service_endpoint, + verify_certificate=verify_certificate, max_num_records=max_num_records, max_read_time=max_read_time, initial_position_in_stream=initial_position_in_stream,