Skip to content

Commit

Permalink
put behind flag
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm committed Dec 23, 2024
1 parent 80daeb5 commit 4c8992c
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> BuilderT

HttpClientConfiguration httpConfig = options.getHttpClientConfiguration();
ProxyConfiguration proxyConfig = options.getProxyConfiguration();
if (proxyConfig != null || httpConfig != null) {
boolean skipCertificateVerification = false;
if (config.skipCertificateVerification() != null) {
skipCertificateVerification = config.skipCertificateVerification();
}
if (proxyConfig != null || httpConfig != null || skipCertificateVerification) {
if (builder instanceof SdkSyncClientBuilder) {
ApacheHttpClient.Builder client = syncClientBuilder();

Expand All @@ -178,8 +182,9 @@ public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> BuilderT
setOptional(httpConfig.maxConnections(), client::maxConnections);
}

// TODO - gate this behind a flag.
client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance());
if (skipCertificateVerification) {
client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance());
}

// must use builder to make sure client is managed by the SDK
((SdkSyncClientBuilder<?, ?>) builder).httpClientBuilder(client);
Expand All @@ -205,8 +210,9 @@ public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> BuilderT
setOptional(httpConfig.maxConnections(), client::maxConcurrency);
}

// TODO - gate this behind a flag.
client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance());
if (skipCertificateVerification) {
client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance());
}

// must use builder to make sure client is managed by the SDK
((SdkAsyncClientBuilder<?, ?>) builder).httpClientBuilder(client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ public abstract class ClientConfiguration implements Serializable {
return regionId() != null ? Region.of(regionId()) : null;
}

/**
* Optional flag to skip certificate verification. Should only be overriden for test scenarios. If
* set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}.
*/
@JsonProperty
public abstract @Nullable @Pure Boolean skipCertificateVerification();

/**
* Optional service endpoint to use AWS compatible services instead, e.g. for testing. If set,
* this overwrites the default in {@link AwsOptions#getEndpoint()}.
Expand Down Expand Up @@ -156,6 +163,13 @@ public Builder retry(Consumer<RetryConfiguration.Builder> retry) {
return retry(builder.build());
}

/**
* Optional flag to skip certificate verification. Should only be overriden for test scenarios.
* If set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}.
*/
@JsonProperty
public abstract Builder skipCertificateVerification(boolean skipCertificateVerification);

abstract Builder regionId(String region);

abstract Builder credentialsProviderAsJson(String credentialsProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ private abstract static class CrossLanguageConfiguration {
String awsSecretKey;
String region;
@Nullable String serviceEndpoint;
boolean verifyCertificate;

public void setStreamName(String streamName) {
this.streamName = streamName;
Expand All @@ -83,6 +84,10 @@ public void setRegion(String region) {
public void setServiceEndpoint(@Nullable String serviceEndpoint) {
this.serviceEndpoint = serviceEndpoint;
}

public void setVerifyCertificate(@Nullable Boolean verifyCertificate) {
this.verifyCertificate = verifyCertificate == null || verifyCertificate;
}
}

public static class WriteBuilder
Expand Down Expand Up @@ -123,6 +128,7 @@ public PTransform<PCollection<byte[]>, KinesisIO.Write.Result> buildExternal(
.credentialsProvider(provider)
.region(Region.of(configuration.region))
.endpoint(endpoint)
.skipCertificateVerification(!configuration.verifyCertificate)
.build())
.withPartitioner(p -> pk)
.withSerializer(serializer);
Expand Down Expand Up @@ -233,6 +239,7 @@ public PTransform<PBegin, PCollection<byte[]>> buildExternal(
.credentialsProvider(provider)
.region(Region.of(configuration.region))
.endpoint(endpoint)
.skipCertificateVerification(!configuration.verifyCertificate)
.build());

if (configuration.maxNumRecords != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ def run_kinesis_read(self):
aws_secret_key=self.aws_secret_key,
region=self.aws_region,
service_endpoint=self.aws_service_endpoint,
verify_certificate=not self.use_localstack,
max_num_records=NUM_RECORDS,
max_read_time=MAX_READ_TIME,
request_records_limit=REQUEST_RECORDS_LIMIT,
Expand Down
37 changes: 8 additions & 29 deletions sdks/python/apache_beam/io/kinesis.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ def default_io_expansion_service():
('region', str),
('partition_key', str),
('service_endpoint', Optional[str]),
('verify_certificate', Optional[bool]),
],
)

Expand Down Expand Up @@ -143,26 +144,14 @@ def __init__(
:param aws_secret_key: Kinesis access key secret.
:param region: AWS region. Example: 'us-east-1'.
:param service_endpoint: Kinesis service endpoint
:param verify_certificate: Deprecated - certificates will always be
verified.
:param verify_certificate: Enable or disable certificate verification.
Never set to False on production. True by default.
:param partition_key: Specify default partition key.
:param producer_properties: Specify the configuration properties for Kinesis
Producer Library (KPL) as dictionary.
Example: {'CollectionMaxCount': '1000', 'ConnectTimeout': '10000'}
:param expansion_service: The address (host:port) of the ExpansionService.
"""
if verify_certificate is False:
# Previously, we supported this via
# https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate--
# With the new AWS client, we no longer support it and it is always True
raise ValueError(
'verify_certificate set to False. This option is no longer ' +
'supported and certificate verification will still happen.')
if verify_certificate is True:
logging.warning(
'verify_certificate set to True. This option is no longer ' +
'supported and certificate verification will automatically ' +
'happen. This option may be removed in a future release')
if producer_properties is not None:
raise ValueError(
'producer_properties is no longer supported and will be removed ' +
Expand All @@ -177,6 +166,7 @@ def __init__(
region=region,
partition_key=partition_key,
service_endpoint=service_endpoint,
verify_certificate=verify_certificate,
)),
expansion_service or default_io_expansion_service(),
)
Expand All @@ -190,6 +180,7 @@ def __init__(
('aws_secret_key', str),
('region', str),
('service_endpoint', Optional[str]),
('verify_certificate', Optional[bool]),
('max_num_records', Optional[int]),
('max_read_time', Optional[int]),
('initial_position_in_stream', Optional[str]),
Expand Down Expand Up @@ -240,8 +231,8 @@ def __init__(
:param aws_secret_key: Kinesis access key secret.
:param region: AWS region. Example: 'us-east-1'.
:param service_endpoint: Kinesis service endpoint
:param verify_certificate: Deprecated - certificates will always be
verified.
:param verify_certificate: Enable or disable certificate verification.
Never set to False on production. True by default.
:param max_num_records: Specifies to read at most a given number of records.
Must be greater than 0.
:param max_read_time: Specifies to read records during x milliseconds.
Expand Down Expand Up @@ -288,19 +279,6 @@ def __init__(
):
logging.warning('Provided timestamp emplaced not in the past.')

if verify_certificate is False:
# Previously, we supported this via
# https://javadoc.io/doc/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#isVerifyCertificate--
# With the new AWS client, we no longer support it and it is always True
raise ValueError(
'verify_certificate set to False. This option is no longer ' +
'supported and certificate verification will still happen.')
if verify_certificate is True:
logging.warning(
'verify_certificate set to True. This option is no longer ' +
'supported and certificate verification will automatically ' +
'happen. This option may be removed in a future release')

super().__init__(
self.URN,
NamedTupleBasedPayloadBuilder(
Expand All @@ -310,6 +288,7 @@ def __init__(
aws_secret_key=aws_secret_key,
region=region,
service_endpoint=service_endpoint,
verify_certificate=verify_certificate,
max_num_records=max_num_records,
max_read_time=max_read_time,
initial_position_in_stream=initial_position_in_stream,
Expand Down

0 comments on commit 4c8992c

Please sign in to comment.