From d4ac4fedbfdf680347faee8382404bac14058428 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Thu, 29 Jun 2023 15:17:50 +0200 Subject: [PATCH 01/16] Add GoogleAdsIO for reading from Google Ads --- .test-infra/jenkins/job_PreCommit_Java.groovy | 1 + .../jenkins/job_PreCommit_Java_IOs.groovy | 1 + build.gradle.kts | 1 + .../beam/gradle/BeamModulePlugin.groovy | 3 + .../beam/checkstyle/suppressions.xml | 2 + sdks/java/io/google-ads/build.gradle | 45 ++ .../DefaultGoogleAdsClientFactory.java | 67 ++ .../io/googleads/GoogleAdsClientFactory.java | 31 + .../beam/sdk/io/googleads/GoogleAdsIO.java | 33 + .../sdk/io/googleads/GoogleAdsOptions.java | 132 ++++ .../GoogleAdsUserCredentialFactory.java | 70 ++ .../beam/sdk/io/googleads/GoogleAdsV14.java | 647 ++++++++++++++++++ .../beam/sdk/io/googleads/package-info.java | 24 + .../sdk/io/googleads/GoogleAdsV14Test.java | 476 +++++++++++++ .../googleads/MockGoogleAdsClientFactory.java | 46 ++ settings.gradle.kts | 1 + 16 files changed, 1580 insertions(+) create mode 100644 sdks/java/io/google-ads/build.gradle create mode 100644 sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/DefaultGoogleAdsClientFactory.java create mode 100644 sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsClientFactory.java create mode 100644 sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsIO.java create mode 100644 sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsOptions.java create mode 100644 sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java create mode 100644 sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java create mode 100644 sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/package-info.java create mode 100644 sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java create mode 100644 sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/MockGoogleAdsClientFactory.java diff --git a/.test-infra/jenkins/job_PreCommit_Java.groovy b/.test-infra/jenkins/job_PreCommit_Java.groovy index ddeb05506cb8..41a3b418a015 100644 --- a/.test-infra/jenkins/job_PreCommit_Java.groovy +++ b/.test-infra/jenkins/job_PreCommit_Java.groovy @@ -34,6 +34,7 @@ def excludePaths = [ 'io/elasticsearch', 'io/elasticsearch-tests', 'io/file-schema-transform', + 'io/google-ads', 'io/google-cloud-platform', 'io/hadoop-common', 'io/hadoop-file-system', diff --git a/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy b/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy index f707be2674c5..edeeed5f0970 100644 --- a/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy +++ b/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy @@ -81,6 +81,7 @@ def ioModulesMap = [ 'debezium', 'elasticsearch', 'file-schema-transform', + 'google-ads', 'hbase', 'hcatalog', 'influxdb', diff --git a/build.gradle.kts b/build.gradle.kts index 4ce425286c73..000277135ab0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -335,6 +335,7 @@ tasks.register("javaioPreCommit") { dependsOn(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common:build") dependsOn(":sdks:java:io:elasticsearch:build") dependsOn(":sdks:java:io:file-schema-transform:build") + dependsOn(":sdks:java:io:google-ads:build") dependsOn(":sdks:java:io:hbase:build") dependsOn(":sdks:java:io:hcatalog:build") dependsOn(":sdks:java:io:influxdb:build") diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index eccc610696a8..8abb521d25b4 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -531,6 +531,7 @@ class BeamModulePlugin implements Plugin { def errorprone_version = "2.10.0" // Try to keep gax_version consistent with gax-grpc version in google_cloud_platform_libraries_bom def gax_version = "2.31.1" + def google_ads_version = "26.0.0" def google_clients_version = "2.0.0" def google_cloud_bigdataoss_version = "2.2.16" // Try to keep google_cloud_spanner_version consistent with google_cloud_spanner_bom in google_cloud_platform_libraries_bom @@ -651,6 +652,8 @@ class BeamModulePlugin implements Plugin { gax_grpc : "com.google.api:gax-grpc", // google_cloud_platform_libraries_bom sets version gax_grpc_test : "com.google.api:gax-grpc:$gax_version:testlib", // google_cloud_platform_libraries_bom sets version gax_httpjson : "com.google.api:gax-httpjson", // google_cloud_platform_libraries_bom sets version + google_ads : "com.google.api-ads:google-ads:$google_ads_version", + google_ads_stubs_v14 : "com.google.api-ads:google-ads-stubs-v14:$google_ads_version", google_api_client : "com.google.api-client:google-api-client:$google_clients_version", // for the libraries using $google_clients_version below. google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version", google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version", diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index aa96e452d17f..ea541de134f8 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -60,6 +60,8 @@ + + diff --git a/sdks/java/io/google-ads/build.gradle b/sdks/java/io/google-ads/build.gradle new file mode 100644 index 000000000000..4b07c7ddbfa7 --- /dev/null +++ b/sdks/java/io/google-ads/build.gradle @@ -0,0 +1,45 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* License); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an AS IS BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.googleads') + +description = "Apache Beam :: SDKs :: Java :: IO :: Google Ads" +ext.summary = "IO to read from Google Ads" + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":sdks:java:extensions:google-cloud-platform-core") + implementation library.java.jackson_annotations + implementation library.java.gax + implementation library.java.google_ads + implementation library.java.google_auth_library_credentials + implementation library.java.google_auth_library_oauth2_http + implementation library.java.protobuf_java + implementation library.java.protobuf_java_util + implementation library.java.google_ads + implementation library.java.google_ads_stubs_v14 + implementation library.java.joda_time + implementation library.java.vendored_guava_26_0_jre + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") + testImplementation library.java.mockito_core + testImplementation library.java.junit + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + testRuntimeOnly library.java.slf4j_jdk14 +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/DefaultGoogleAdsClientFactory.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/DefaultGoogleAdsClientFactory.java new file mode 100644 index 000000000000..ace5d4d1b124 --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/DefaultGoogleAdsClientFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.auth.Credentials; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** The default way to construct a {@link GoogleAdsClient}. */ +public class DefaultGoogleAdsClientFactory implements GoogleAdsClientFactory { + private static final DefaultGoogleAdsClientFactory INSTANCE = new DefaultGoogleAdsClientFactory(); + + public static DefaultGoogleAdsClientFactory getInstance() { + return INSTANCE; + } + + @Override + public GoogleAdsClient newGoogleAdsClient( + GoogleAdsOptions options, + @Nullable String developerToken, + @Nullable Long linkedCustomerId, + @Nullable Long loginCustomerId) { + + GoogleAdsClient.Builder builder = GoogleAdsClient.newBuilder(); + + Credentials credentials = options.getGoogleAdsCredential(); + if (credentials != null) { + builder.setCredentials(credentials); + } + + if (options.getGoogleAdsEndpoint() != null) { + builder.setEndpoint(options.getGoogleAdsEndpoint()); + } + + String developerTokenFromOptions = options.getGoogleAdsDeveloperToken(); + if (developerToken != null) { + builder.setDeveloperToken(developerToken); + } else if (developerTokenFromOptions != null) { + builder.setDeveloperToken(developerTokenFromOptions); + } + + if (linkedCustomerId != null) { + builder.setLinkedCustomerId(linkedCustomerId); + } + + if (loginCustomerId != null) { + builder.setLoginCustomerId(loginCustomerId); + } + + return builder.build(); + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsClientFactory.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsClientFactory.java new file mode 100644 index 000000000000..cef32f5c4b9b --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsClientFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import java.io.Serializable; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Defines how to construct a {@link GoogleAdsClient}. */ +public interface GoogleAdsClientFactory extends Serializable { + GoogleAdsClient newGoogleAdsClient( + GoogleAdsOptions options, + @Nullable String developerToken, + @Nullable Long linkedCustomerId, + @Nullable Long loginCustomerId); +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsIO.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsIO.java new file mode 100644 index 000000000000..de176a05e9be --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsIO.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +/** + * {@link GoogleAdsIO} provides an API for reading from the Google Ads API over different + * versions of the Google Ads client libraries. + * + * @see GoogleAdsV14 + */ +public class GoogleAdsIO { + private GoogleAdsIO() {} + + public static GoogleAdsV14 v14() { + return GoogleAdsV14.INSTANCE; + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsOptions.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsOptions.java new file mode 100644 index 000000000000..738760c22eb2 --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsOptions.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.auth.Credentials; +import java.io.IOException; +import java.security.GeneralSecurityException; +import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Options used to configure Google Ads API specific options. */ +public interface GoogleAdsOptions extends PipelineOptions { + /** Host endpoint to use for connections to the Google Ads API. */ + @Description("Host endpoint to use for connections to the Google Ads API.") + @Default.String("googleads.googleapis.com:443") + String getGoogleAdsEndpoint(); + + void setGoogleAdsEndpoint(String endpoint); + + /** + * OAuth 2.0 Client ID identifying the application. + * + * @see https://developers.google.com/google-ads/api/docs/oauth/overview + * @see https://developers.google.com/identity/protocols/oauth2 + */ + @Description("OAuth 2.0 Client ID identifying the application.") + String getGoogleAdsClientId(); + + void setGoogleAdsClientId(String clientId); + + /** + * OAuth 2.0 Client Secret for the specified Client ID. + * + * @see https://developers.google.com/google-ads/api/docs/oauth/overview + * @see https://developers.google.com/identity/protocols/oauth2 + */ + @Description("OAuth 2.0 Client Secret for the specified Client ID.") + String getGoogleAdsClientSecret(); + + void setGoogleAdsClientSecret(String clientSecret); + + /** + * OAuth 2.0 Refresh Token for the user connecting to the Google Ads API. + * + * @see https://developers.google.com/google-ads/api/docs/oauth/overview + * @see https://developers.google.com/identity/protocols/oauth2 + */ + @Description("OAuth 2.0 Refresh Token for the user connecting to the Google Ads API.") + String getGoogleAdsRefreshToken(); + + void setGoogleAdsRefreshToken(String refreshToken); + + /** Google Ads developer token for the user connecting to the Google Ads API. */ + @Description("Google Ads developer token for the user connecting to the Google Ads API.") + @Nullable + String getGoogleAdsDeveloperToken(); + + void setGoogleAdsDeveloperToken(String developerToken); + + /** + * The class of the credential factory to create credentials if none have been explicitly set. + * + * @see #getGoogleAdsCredential() + */ + @Description( + "The class of the credential factory to create credentials if none have been explicitly set.") + @Default.Class(GoogleAdsUserCredentialFactory.class) + Class getGoogleAdsCredentialFactoryClass(); + + void setGoogleAdsCredentialFactoryClass( + Class credentialFactoryClass); + + /** + * The credential instance that should be used to authenticate against the Google Ads API. + * Defaults to a credential instance constructed by the credential factory. + * + * @see #getGoogleAdsCredential() + * @see https://github.com/googleapis/google-auth-library-java + */ + @JsonIgnore + @Description( + "The credential instance that should be used to authenticate against the Google Ads API. " + + "Defaults to a credential instance constructed by the credential factory.") + @Default.InstanceFactory(GoogleAdsCredentialsFactory.class) + @Nullable + Credentials getGoogleAdsCredential(); + + void setGoogleAdsCredential(Credentials credential); + + /** + * Attempts to load the Google Ads credentials. See {@link CredentialFactory#getCredential()} for + * more details. + */ + class GoogleAdsCredentialsFactory implements DefaultValueFactory<@Nullable Credentials> { + @Override + public @Nullable Credentials create(PipelineOptions options) { + GoogleAdsOptions googleAdsOptions = options.as(GoogleAdsOptions.class); + try { + CredentialFactory factory = + InstanceBuilder.ofType(CredentialFactory.class) + .fromClass(googleAdsOptions.getGoogleAdsCredentialFactoryClass()) + .fromFactoryMethod("fromOptions") + .withArg(PipelineOptions.class, options) + .build(); + return factory.getCredential(); + } catch (IOException | GeneralSecurityException e) { + throw new RuntimeException("Unable to obtain credential", e); + } + } + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java new file mode 100644 index 000000000000..c9183157fc31 --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auth.Credentials; +import com.google.auth.oauth2.UserCredentials; +import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Constructs and returns {@link Credentials} to be used by Google Ads API calls. This factory only + * supports {@link com.google.auth.oauth2.UserCredentials}, {@link + * com.google.auth.oauth2.ServiceAccountCredentials} and domain-wide delegation are not supported. + */ +public class GoogleAdsUserCredentialFactory implements CredentialFactory { + // The OAuth client ID, client secret, and refresh token. + private String clientId; + private String clientSecret; + private String refreshToken; + + private GoogleAdsUserCredentialFactory( + String clientId, String clientSecret, String refreshToken) { + this.clientId = clientId; + this.clientSecret = clientSecret; + this.refreshToken = refreshToken; + } + + public static GoogleAdsUserCredentialFactory fromOptions(PipelineOptions options) { + GoogleAdsOptions adsOptions = options.as(GoogleAdsOptions.class); + + checkArgument( + adsOptions.getGoogleAdsClientId() != null + && adsOptions.getGoogleAdsClientSecret() != null + && adsOptions.getGoogleAdsRefreshToken() != null, + "googleAdsClientId, googleAdsClientSecret and googleAdsRefreshToken must not be null"); + + return new GoogleAdsUserCredentialFactory( + adsOptions.getGoogleAdsClientId(), + adsOptions.getGoogleAdsClientSecret(), + adsOptions.getGoogleAdsRefreshToken()); + } + + /** Returns {@link Credentials} as configured by {@link GoogleAdsOptions}. */ + @Override + public @Nullable Credentials getCredential() { + return UserCredentials.newBuilder() + .setClientId(clientId) + .setClientSecret(clientSecret) + .setRefreshToken(refreshToken) + .build(); + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java new file mode 100644 index 000000000000..8ddcaf42aed9 --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.ads.googleads.v14.errors.GoogleAdsError; +import com.google.ads.googleads.v14.errors.GoogleAdsException; +import com.google.ads.googleads.v14.errors.GoogleAdsFailure; +import com.google.ads.googleads.v14.errors.InternalErrorEnum; +import com.google.ads.googleads.v14.errors.QuotaErrorEnum; +import com.google.ads.googleads.v14.services.GoogleAdsRow; +import com.google.ads.googleads.v14.services.GoogleAdsServiceClient; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamRequest; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamResponse; +import com.google.auto.value.AutoValue; +import com.google.protobuf.Message; +import com.google.protobuf.util.Durations; +import java.io.IOException; +import java.io.Serializable; +import java.util.List; +import java.util.Optional; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFn.Setup; +import org.apache.beam.sdk.transforms.DoFn.Teardown; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.RateLimiter; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; + +/** + * {@link GoogleAdsV14} provides an API to read Google Ads API v14 reports. + * + *

The Google Ads API does not use service account credentials in the same way as Google Cloud + * Platform APIs do. Service account credentials are typically only used to delegate (using + * domain-wide delegation) access through end user accounts. Providing credentials using the OAuth2 + * desktop flow may be preferable over domain wide delegation. Defaults for OAuth 2.0 credentials, + * refresh token and developer token can be provided using the following flags: + * + *

+ *   --googleAdsClientId=your-client-id
+ *   --googleAdsClientSecret=your-client-secret
+ *   --googleAdsRefreshToken=your-refresh-token
+ *   --googleAdsDeveloperToken=your-developer-token
+ * 
+ * + *

Use {@link GoogleAdsV14#read()} to read a bounded {@link PCollection} of {@link GoogleAdsRow} + * from a query using {@link Read#withQuery(String)} and one or a few customer IDs using either + * {@link Read#withCustomerId(Long)} or {@link Read#withCustomerIds(List)}. Alternatively, use + * {@link GoogleAdsV14#readAll()} to read either a bounded or unbounded {@link PCollection} of + * {@link GoogleAdsRow} from a {@link PCollection} of {@link SearchGoogleAdsStreamRequest}. + * + *

For example, using {@link GoogleAdsV14#read()}: + * + *

{@code
+ * Pipeline p = Pipeline.create();
+ * PCollection rows =
+ *     p.apply(
+ *         GoogleAdsIO.v14()
+ *             .read()
+ *             .withCustomerId(1234567890l)
+ *             .withQuery(
+ *                 "SELECT"
+ *                     + "campaign.id,"
+ *                     + "campaign.name,"
+ *                     + "campaign.status"
+ *                     + "FROM campaign"));
+ * p.run();
+ * }
+ * + *

Alternatively, using {@link GoogleAdsV14#readAll()} to execute requests from a {@link + * PCollection} of {@link SearchGoogleAdsStreamRequest}: + * + *

{@code
+ * Pipeline p = Pipeline.create();
+ * PCollection requests =
+ *     p.apply(
+ *         Create.of(
+ *             ImmutableList.of(
+ *                 SearchGoogleAdsStreamRequest.newBuilder()
+ *                     .setCustomerId(Long.toString(1234567890l))
+ *                     .setQuery(
+ *                         "SELECT"
+ *                             + "campaign.id,"
+ *                             + "campaign.name,"
+ *                             + "campaign.status"
+ *                             + "FROM campaign")
+ *                     .build())));
+ * PCollection rows = requests.apply(GoogleAdsIO.v14().readAll());
+ * p.run();
+ * }
+ * + *

Client-side rate limiting

+ * + * On construction of a {@link GoogleAdsV14#read()} or {@link GoogleAdsV14#readAll()} transform a + * default rate limiting policy is provided to stay well under the rate limit for the Google Ads + * API, but this limit is only local to a single worker and operates without any knowledge of other + * applications using the same developer token for any customer ID. The Google Ads API enforces + * global limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all applications + * using the same developer token. Users of these transforms are strongly advised to implement their + * own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact with a shared rate + * limiting service for any production workloads. + * + * @see GoogleAdsIO#v14() + * @see GoogleAdsOptions + * @see Best + * Practices in the Google Ads documentation + */ +public class GoogleAdsV14 { + static final GoogleAdsV14 INSTANCE = new GoogleAdsV14(); + + private GoogleAdsV14() {} + + public Read read() { + return new AutoValue_GoogleAdsV14_Read.Builder() + .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) + .setRateLimitPolicyFactory(() -> new DefaultRateLimitPolicy()) + .build(); + } + + public ReadAll readAll() { + return new AutoValue_GoogleAdsV14_ReadAll.Builder() + .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) + .setRateLimitPolicyFactory(() -> new DefaultRateLimitPolicy()) + .build(); + } + + /** + * A {@link PTransform} that reads the results of a Google Ads query as {@link GoogleAdsRow} + * objects. + * + * @see GoogleAdsIO#v14() + * @see #readAll() + */ + @AutoValue + public abstract static class Read extends PTransform> { + abstract @Nullable String getDeveloperToken(); + + abstract @Nullable Long getLoginCustomerId(); + + abstract @Nullable List getCustomerIds(); + + abstract @Nullable String getQuery(); + + abstract GoogleAdsClientFactory getGoogleAdsClientFactory(); + + abstract RateLimitPolicyFactory getRateLimitPolicyFactory(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setDeveloperToken(@Nullable String developerToken); + + abstract Builder setLoginCustomerId(@Nullable Long loginCustomerId); + + abstract Builder setCustomerIds(List customerId); + + abstract Builder setQuery(String query); + + abstract Builder setGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory); + + abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory); + + abstract Read build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified developer token. A + * developer token is required to access the Google Ads API. + * + * @param developerToken The developer token to set. + * @return A new {@link Read} transform with the specified developer token. + * @see GoogleAdsClient + */ + public Read withDeveloperToken(@Nullable String developerToken) { + return toBuilder().setDeveloperToken(developerToken).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified login customer ID. A + * login customer ID is only required for manager accounts. + * + * @param loginCustomerId The login customer ID to set. + * @return A new {@link Read} transform with the specified login customer ID. + * @see GoogleAdsClient + */ + public Read withLoginCustomerId(@Nullable Long loginCustomerId) { + return toBuilder().setLoginCustomerId(loginCustomerId).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified customer IDs to query. + * + * @param customerIds + * @return A new {@link Read} transform with the specified customer IDs to query. + * @see SearchGoogleAdsStreamRequest + * @see #withQuery(String) + */ + public Read withCustomerIds(List customerIds) { + checkArgumentNotNull(customerIds, "customerIds cannot be null"); + checkArgument(customerIds.size() > 0, "customerIds cannot be empty"); + + return toBuilder().setCustomerIds(ImmutableList.copyOf(customerIds)).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified customer ID to query. + * + * @param customerId + * @return A new {@link Read} transform with the specified customer ID to query. + * @see SearchGoogleAdsStreamRequest + * @see #withQuery(String) + */ + public Read withCustomerId(Long customerId) { + checkArgumentNotNull(customerId, "customerId cannot be null"); + + return withCustomerIds(ImmutableList.of(customerId)); + } + + /** + * Creates and returns a new {@link Read} transform with the specified query. The query will be + * executed for each customer ID. + * + * @param query + * @return A new {@link Read} transform with the specified query. + * @see SearchGoogleAdsStreamRequest + * @see #withCustomerId(Long) + * @see #withCustomerIds(List) + */ + public Read withQuery(String query) { + checkArgumentNotNull(query, "query cannot be null"); + checkArgument(!query.isEmpty(), "query cannot be empty"); + + return toBuilder().setQuery(query).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified client factory. A {@link + * GoogleAdsClientFactory} builds the {@link GoogleAdsClient} used to construct service clients. + * The {@link DefaultGoogleAdsClientFactory} should be sufficient for most purposes unless the + * construction of {@link GoogleAdsClient} requires customization. + * + * @param googleAdsClientFactory + * @return A new {@link Read} transform with the specified client factory. + * @see GoogleAdsClient + */ + public Read withGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory) { + checkArgumentNotNull(googleAdsClientFactory, "googleAdsClientFactory cannot be null"); + + return toBuilder().setGoogleAdsClientFactory(googleAdsClientFactory).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified rate limit policy + * factory. A {@link RateLimitPolicyFactory} builds the {@link RateLimitPolicy} used to limit + * the number of requests made by {@link ReadAll.ReadAllFn}. The Google Ads API enforces global + * limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all + * applications using the same developer token. Users of these transforms are strongly advised + * to implement their own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact + * with a shared rate limiting service for any production workloads. + * + * @param rateLimitPolicyFactory + * @return A new {@link Read} transform with the specified rate limit policy factory. + * @see GoogleAdsClient + */ + public Read withRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { + checkArgumentNotNull(rateLimitPolicyFactory, "rateLimitPolicyFactory cannot be null"); + + return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build(); + } + + @Override + public PCollection expand(PBegin input) { + String query = getQuery(); + List customerIds = getCustomerIds(); + checkArgumentNotNull(query, "withQuery() is required"); + checkArgumentNotNull(customerIds, "either withCustomerId() or withCustomerIds() is required"); + + return input + .apply(Create.of(customerIds)) + .apply( + MapElements.into(TypeDescriptor.of(SearchGoogleAdsStreamRequest.class)) + .via( + customerId -> + SearchGoogleAdsStreamRequest.newBuilder() + .setCustomerId(Long.toString(customerId)) + .setQuery(query) + .build())) + .apply( + INSTANCE + .readAll() + .withDeveloperToken(getDeveloperToken()) + .withLoginCustomerId(getLoginCustomerId()) + .withGoogleAdsClientFactory(getGoogleAdsClientFactory()) + .withRateLimitPolicy(getRateLimitPolicyFactory())); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder + .addIfNotDefault( + DisplayData.item("customerIds", String.valueOf(getCustomerIds())) + .withLabel("Customer IDs"), + "null") + .addIfNotNull(DisplayData.item("query", String.valueOf(getQuery())).withLabel("Query")); + } + } + + /** + * A {@link PTransform} that reads the results of many {@link SearchGoogleAdsStreamRequest} + * objects as {@link GoogleAdsRow} objects. * + * + * @see GoogleAdsIO#v14() + * @see #readAll() + */ + @AutoValue + public abstract static class ReadAll + extends PTransform, PCollection> { + abstract @Nullable String getDeveloperToken(); + + abstract @Nullable Long getLoginCustomerId(); + + abstract GoogleAdsClientFactory getGoogleAdsClientFactory(); + + abstract RateLimitPolicyFactory getRateLimitPolicyFactory(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setDeveloperToken(@Nullable String developerToken); + + abstract Builder setLoginCustomerId(@Nullable Long loginCustomerId); + + abstract Builder setGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory); + + abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory); + + abstract ReadAll build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified developer token. A + * developer token is required to access the Google Ads API. + * + * @param developerToken The developer token to set. + * @return A new {@link ReadAll} transform with the specified developer token. + * @see GoogleAdsClient + */ + public ReadAll withDeveloperToken(@Nullable String developerToken) { + return toBuilder().setDeveloperToken(developerToken).build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified login customer ID. A + * login customer ID is only required for manager accounts. + * + * @param loginCustomerId The login customer ID to set. + * @return A new {@link ReadAll} transform with the specified login customer ID. + * @see GoogleAdsClient + */ + public ReadAll withLoginCustomerId(@Nullable Long loginCustomerId) { + return toBuilder().setLoginCustomerId(loginCustomerId).build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified client factory. A + * {@link GoogleAdsClientFactory} builds the {@link GoogleAdsClient} used to construct service + * clients. The {@link DefaultGoogleAdsClientFactory} should be sufficient for most purposes + * unless the construction of {@link GoogleAdsClient} requires customization. + * + * @param googleAdsClientFactory + * @return A new {@link ReadAll} transform with the specified client factory. + * @see GoogleAdsClient + */ + public ReadAll withGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory) { + checkArgumentNotNull(googleAdsClientFactory, "googleAdsClientFactory cannot be null"); + + return toBuilder().setGoogleAdsClientFactory(googleAdsClientFactory).build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified rate limit policy + * factory. A {@link RateLimitPolicyFactory} builds the {@link RateLimitPolicy} used to limit + * the number of requests made by {@link ReadAll.ReadAllFn}. The Google Ads API enforces global + * limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all + * applications using the same developer token. Users of these transforms are strongly advised + * to implement their own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact + * with a shared rate limiting service for any production workloads. + * + * @param rateLimitPolicyFactory + * @return A new {@link ReadAll} transform with the specified rate limit policy factory. + * @see GoogleAdsClient + */ + public ReadAll withRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { + checkArgumentNotNull(rateLimitPolicyFactory, "rateLimitPolicyFactory cannot be null"); + + return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build(); + } + + @Override + public PCollection expand(PCollection input) { + GoogleAdsOptions options = input.getPipeline().getOptions().as(GoogleAdsOptions.class); + + checkArgument( + options.getGoogleAdsDeveloperToken() != null || getDeveloperToken() != null, + "either --googleAdsDeveloperToken or .withDeveloperToken() is required"); + + return input.apply(ParDo.of(new ReadAllFn(this))); + } + + /** + * A {@link DoFn} that reads reports from Google Ads for each query using the {@code + * SearchStream} method. + */ + @VisibleForTesting + static class ReadAllFn extends DoFn { + private static final int MAX_RETRIES = 5; + private static final FluentBackoff BACKOFF = + FluentBackoff.DEFAULT + .withExponent(2.0) + .withInitialBackoff(Duration.standardSeconds(30)) + .withMaxRetries(MAX_RETRIES); + + @VisibleForTesting static Sleeper sleeper = Sleeper.DEFAULT; + + private final GoogleAdsV14.ReadAll spec; + + private transient @Nullable GoogleAdsClient googleAdsClient; + private transient @Nullable GoogleAdsServiceClient googleAdsServiceClient; + private transient @Nullable RateLimitPolicy rateLimitPolicy; + + ReadAllFn(GoogleAdsV14.ReadAll spec) { + this.spec = spec; + } + + @Setup + public void setup(PipelineOptions options) { + GoogleAdsOptions adsOptions = options.as(GoogleAdsOptions.class); + + googleAdsClient = + spec.getGoogleAdsClientFactory() + .newGoogleAdsClient( + adsOptions, spec.getDeveloperToken(), null, spec.getLoginCustomerId()); + googleAdsServiceClient = googleAdsClient.getVersion14().createGoogleAdsServiceClient(); + rateLimitPolicy = spec.getRateLimitPolicyFactory().getRateLimitPolicy(); + } + + @ProcessElement + public void processElement(ProcessContext c) throws IOException, InterruptedException { + GoogleAdsClient googleAdsClient = checkStateNotNull(this.googleAdsClient); + GoogleAdsServiceClient googleAdsServiceClient = + checkStateNotNull(this.googleAdsServiceClient); + RateLimitPolicy rateLimitPolicy = checkStateNotNull(this.rateLimitPolicy); + + BackOff backoff = BACKOFF.backoff(); + BackOff nextBackoff = backoff; + GoogleAdsException lastException = null; + + SearchGoogleAdsStreamRequest request = c.element(); + String developerToken = googleAdsClient.getDeveloperToken(); + String customerId = request.getCustomerId(); + + do { + rateLimitPolicy.onBeforeRequest(developerToken, customerId, request); + + try { + for (SearchGoogleAdsStreamResponse response : + googleAdsServiceClient.searchStreamCallable().call(request)) { + for (GoogleAdsRow row : response.getResultsList()) { + c.output(row); + } + } + rateLimitPolicy.onSuccess(developerToken, customerId, request); + return; + } catch (GoogleAdsException e) { + GoogleAdsError retryableError = + findFirstRetryableError(e.getGoogleAdsFailure()) + .orElseThrow(() -> new IOException(e)); + + rateLimitPolicy.onError(developerToken, customerId, request, retryableError); + + // If the error happens to carry a suggested retry delay, then use that instead. + // Retry these errors without incrementing the retry count or backoff interval. + // For all other retryable errors fall back to the existing backoff. + if (retryableError.getDetails().getQuotaErrorDetails().hasRetryDelay()) { + nextBackoff = + new BackOff() { + @Override + public void reset() {} + + @Override + public long nextBackOffMillis() { + return Durations.toMillis( + retryableError.getDetails().getQuotaErrorDetails().getRetryDelay()); + } + }; + } else { + nextBackoff = backoff; + } + } + } while (BackOffUtils.next(sleeper, nextBackoff)); + + throw new IOException( + String.format( + "Unable to get Google Ads response after retrying %d times using query (%s)", + MAX_RETRIES, request.getQuery()), + lastException); + } + + @Teardown + public void teardown() { + if (googleAdsServiceClient != null) { + googleAdsServiceClient.close(); + } + } + + private Optional findFirstRetryableError(GoogleAdsFailure e) { + return e.getErrorsList().stream() + .filter( + err -> + // Unexpected internal error + err.getErrorCode().getInternalError() + == InternalErrorEnum.InternalError.INTERNAL_ERROR + || + // Unexpected transient error + err.getErrorCode().getInternalError() + == InternalErrorEnum.InternalError.TRANSIENT_ERROR + || + // Too many requests + err.getErrorCode().getQuotaError() + == QuotaErrorEnum.QuotaError.RESOURCE_EXHAUSTED + || + // Too many requests in a short amount of time + err.getErrorCode().getQuotaError() + == QuotaErrorEnum.QuotaError.RESOURCE_TEMPORARILY_EXHAUSTED) + .findFirst(); + } + } + } + + /** + * Implement this interface to create a {@link RateLimitPolicy}. This should be used to limit all + * traffic sent to the Google Ads API for a pair of developer token and customer ID and any other + * relevant attributes for the specific Google Ads API service being called. + */ + public interface RateLimitPolicyFactory extends Serializable { + RateLimitPolicy getRateLimitPolicy(); + } + + /** + * This interface can be used to implement custom client-side rate limiting policies. Custom + * policies should follow best practices for interacting with the Google Ads API. + * + * @see Best + * Practices in the Google Ads documentation + */ + public interface RateLimitPolicy { + /** + * Called before a request is sent. + * + * @param developerToken The developer token used for the request. + * @param customerId The customer ID specified on the request. + * @param request Any Google Ads API request. + * @throws InterruptedException + */ + void onBeforeRequest(String developerToken, String customerId, Message request) + throws InterruptedException; + + /** + * Called after a request succeeds. + * + * @param developerToken The developer token used for the request. + * @param customerId The customer ID specified on the request. + * @param request Any Google Ads API request. + */ + void onSuccess(String developerToken, String customerId, Message request); + + /** + * Called after a request fails with a retryable error. + * + * @param developerToken The developer token used for the request. + * @param customerId The customer ID specified on the request. + * @param request Any Google Ads API request. + * @param error A retryable error. + */ + void onError(String developerToken, String customerId, Message request, GoogleAdsError error); + } + + static class DefaultRateLimitPolicy implements RateLimitPolicy { + private static final RateLimiter RATE_LIMITER = RateLimiter.create(1.0); + + DefaultRateLimitPolicy() {} + + @Override + public void onBeforeRequest(String developerToken, String customerId, Message request) + throws InterruptedException { + RATE_LIMITER.acquire(); + } + + @Override + public void onSuccess(String developerToken, String customerId, Message request) {} + + @Override + public void onError( + String developerToken, String customerId, Message request, GoogleAdsError error) {} + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/package-info.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/package-info.java new file mode 100644 index 000000000000..56aaed903e3c --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Defines transforms for reading from Google Ads. + * + * @see org.apache.beam.sdk.io.googleads.GoogleAdsIO + */ +package org.apache.beam.sdk.io.googleads; diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java new file mode 100644 index 000000000000..cd59531bec39 --- /dev/null +++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java @@ -0,0 +1,476 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.when; + +import com.google.ads.googleads.v14.errors.AuthenticationErrorEnum.AuthenticationError; +import com.google.ads.googleads.v14.errors.ErrorCode; +import com.google.ads.googleads.v14.errors.ErrorDetails; +import com.google.ads.googleads.v14.errors.GoogleAdsError; +import com.google.ads.googleads.v14.errors.GoogleAdsException; +import com.google.ads.googleads.v14.errors.GoogleAdsFailure; +import com.google.ads.googleads.v14.errors.InternalErrorEnum.InternalError; +import com.google.ads.googleads.v14.errors.QuotaErrorDetails; +import com.google.ads.googleads.v14.errors.QuotaErrorEnum.QuotaError; +import com.google.ads.googleads.v14.services.GoogleAdsRow; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamRequest; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamResponse; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiException; +import com.google.protobuf.Duration; +import io.grpc.Metadata; +import io.grpc.Status.Code; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(Enclosed.class) +public class GoogleAdsV14Test { + + @RunWith(JUnit4.class) + public static class ConstructionTests { + private final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testReadAllExpandWithDeveloperTokenFromBuilder() { + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll().withDeveloperToken("abc")); + } + + @Test + public void testReadAllExpandWithDeveloperTokenFromOptions() { + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll()); + } + + @Test + public void testReadAllExpandWithDeveloperTokenFromOptionsAndBuilder() { + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll().withDeveloperToken(null)); + } + + @Test + public void testReadAllExpandWithoutDeveloperToken() throws Exception { + Assert.assertThrows( + "Developer token required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll())); + } + + @Test + public void testReadAllExpandWithoutValidGoogleAdsClientFactory() throws Exception { + Assert.assertThrows( + "Non-empty googleAdsClientFactory required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll().withGoogleAdsClientFactory(null))); + } + + @Test + public void testReadAllExpandWithoutValidRateLimitPolicy() throws Exception { + Assert.assertThrows( + "Non-empty rateLimitPolicy required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll().withRateLimitPolicy(null))); + } + + @Test + public void testReadExpandWithDeveloperTokenFromBuilder() { + pipeline.apply( + GoogleAdsIO.v14() + .read() + .withDeveloperToken("abc") + .withCustomerId(123L) + .withQuery("GAQL")); + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery("GAQL")); + } + + @Test + public void testReadExpandWithDeveloperTokenFromOptions() { + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery("GAQL")); + } + + @Test + public void testReadExpandWithDeveloperTokenFromOptionsAndBuilder() { + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline.apply( + GoogleAdsIO.v14().read().withDeveloperToken(null).withCustomerId(123L).withQuery("GAQL")); + } + + @Test + public void testReadExpandWithoutCustomerId() throws Exception { + Assert.assertThrows( + "Customer ID required but not provided", + IllegalArgumentException.class, + () -> pipeline.apply(GoogleAdsIO.v14().read().withQuery("GAQL"))); + } + + @Test + public void testReadExpandWithoutCustomerIds() throws Exception { + Assert.assertThrows( + "Customer IDs required but not provided", + IllegalArgumentException.class, + () -> pipeline.apply(GoogleAdsIO.v14().read().withQuery("GAQL"))); + } + + @Test + public void testReadExpandWithoutDeveloperToken() throws Exception { + Assert.assertThrows( + "Developer token required but not provided", + IllegalArgumentException.class, + () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery("GAQL"))); + } + + @Test + public void testReadExpandWithoutQuery() throws Exception { + Assert.assertThrows( + "Query required but not provided", + IllegalArgumentException.class, + () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L))); + } + + @Test + public void testReadExpandWithoutValidCustomerIds() throws Exception { + Assert.assertThrows( + "Non-null customer IDs list required but not provided", + IllegalArgumentException.class, + () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerIds(null).withQuery("GAQL"))); + + Assert.assertThrows( + "At least one customer ID required but not provided", + IllegalArgumentException.class, + () -> + pipeline.apply( + GoogleAdsIO.v14().read().withCustomerIds(ImmutableList.of()).withQuery("GAQL"))); + + List customerIds = new ArrayList<>(); + customerIds.add(123L); + customerIds.add(null); + Assert.assertThrows( + "Non-null customer IDs required but not provided", + NullPointerException.class, + () -> + pipeline.apply( + GoogleAdsIO.v14().read().withCustomerIds(customerIds).withQuery("GAQL"))); + } + + @Test + public void testReadExpandWithoutValidGoogleAdsClientFactory() throws Exception { + Assert.assertThrows( + "Non-empty googleAdsClientFactory required but not provided", + IllegalArgumentException.class, + () -> + pipeline.apply( + GoogleAdsIO.v14() + .read() + .withCustomerId(123L) + .withQuery("GAQL") + .withGoogleAdsClientFactory(null))); + } + + @Test + public void testReadExpandWithoutValidQuery() throws Exception { + Assert.assertThrows( + "Non-null query required but not provided", + IllegalArgumentException.class, + () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery(null))); + + Assert.assertThrows( + "Non-empty query required but not provided", + IllegalArgumentException.class, + () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery(""))); + } + + @Test + public void testReadExpandWithoutValidRateLimitPolicy() throws Exception { + Assert.assertThrows( + "Non-empty rateLimitPolicy required but not provided", + IllegalArgumentException.class, + () -> + pipeline.apply( + GoogleAdsIO.v14() + .read() + .withCustomerId(123L) + .withQuery("GAQL") + .withRateLimitPolicy(null))); + } + } + + @RunWith(MockitoJUnitRunner.class) + public static class ExecutionTests { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Before + public void init() { + GoogleAdsOptions options = pipeline.getOptions().as(GoogleAdsOptions.class); + options.setGoogleAdsCredentialFactoryClass(NoopCredentialFactory.class); + GoogleAdsV14.ReadAll.ReadAllFn.sleeper = (long millis) -> {}; + } + + @Test + @Category(NeedsRunner.class) + public void testRead() { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class)) + .iterator()) + .thenReturn( + ImmutableList.of( + SearchGoogleAdsStreamResponse.newBuilder() + .addResults(GoogleAdsRow.newBuilder()) + .build()) + .iterator()); + + PCollection rows = + pipeline.apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withDeveloperToken("abc") + .withCustomerId(123L) + .withQuery("GAQL")); + PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance()); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testReadWithFailureFromMaxRetriesExceeded() throws Exception { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class))) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setInternalError(InternalError.TRANSIENT_ERROR))) + .build(), + new Metadata())); + + pipeline.apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withDeveloperToken("abc") + .withCustomerId(123L) + .withQuery("GAQL")); + + PipelineExecutionException exception = + Assert.assertThrows( + "Last retryable error after max retries", + Pipeline.PipelineExecutionException.class, + pipeline::run); + Assert.assertEquals(IOException.class, exception.getCause().getClass()); + Assert.assertEquals( + "Unable to get Google Ads response after retrying 5 times using query (GAQL)", + exception.getCause().getMessage()); + } + + @Test + @Category(NeedsRunner.class) + public void testReadWithFailureFromNonRetryableError() throws Exception { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class))) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setAuthenticationError( + AuthenticationError.OAUTH_TOKEN_REVOKED))) + .build(), + new Metadata())); + + pipeline.apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withDeveloperToken("abc") + .withCustomerId(123L) + .withQuery("GAQL")); + + PipelineExecutionException exception = + Assert.assertThrows( + "First non-retryable error", + Pipeline.PipelineExecutionException.class, + pipeline::run); + Assert.assertEquals(IOException.class, exception.getCause().getClass()); + Assert.assertEquals( + "com.google.ads.googleads.v14.errors.GoogleAdsException: errors {\n" + + " error_code {\n" + + " authentication_error: OAUTH_TOKEN_REVOKED\n" + + " }\n" + + "}\n", + exception.getCause().getMessage()); + } + + @Test + @Category(NeedsRunner.class) + public void testReadWithRecoveryFromInternalError() throws Exception { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class)) + .iterator()) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setInternalError(InternalError.INTERNAL_ERROR))) + .build(), + new Metadata())) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setInternalError(InternalError.TRANSIENT_ERROR))) + .build(), + new Metadata())) + .thenReturn( + ImmutableList.of( + SearchGoogleAdsStreamResponse.newBuilder() + .addResults(GoogleAdsRow.newBuilder()) + .build()) + .iterator()); + + PCollection rows = + pipeline.apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withDeveloperToken("abc") + .withCustomerId(123L) + .withQuery("GAQL")); + PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance()); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testReadWithRecoveryFromQuotaErrorWithRetryDelay() throws Exception { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class)) + .iterator()) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setQuotaError(QuotaError.RESOURCE_EXHAUSTED)) + .setDetails( + ErrorDetails.newBuilder() + .setQuotaErrorDetails( + QuotaErrorDetails.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(0))))) + .build(), + new Metadata())) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setQuotaError(QuotaError.RESOURCE_EXHAUSTED)) + .setDetails( + ErrorDetails.newBuilder() + .setQuotaErrorDetails( + QuotaErrorDetails.newBuilder() + .setRetryDelay( + Duration.newBuilder().setSeconds(42))))) + .build(), + new Metadata())) + .thenReturn( + ImmutableList.of( + SearchGoogleAdsStreamResponse.newBuilder() + .addResults(GoogleAdsRow.newBuilder()) + .build()) + .iterator()); + + PCollection rows = + pipeline.apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withDeveloperToken("abc") + .withCustomerId(123L) + .withQuery("GAQL")); + PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance()); + + pipeline.run(); + } + } +} diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/MockGoogleAdsClientFactory.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/MockGoogleAdsClientFactory.java new file mode 100644 index 000000000000..258b47763a4b --- /dev/null +++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/MockGoogleAdsClientFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.ads.googleads.v14.services.GoogleAdsServiceClient; +import com.google.ads.googleads.v14.services.stub.GoogleAdsServiceStub; +import org.checkerframework.checker.nullness.qual.Nullable; + +class MockGoogleAdsClientFactory implements GoogleAdsClientFactory { + static final GoogleAdsServiceStub GOOGLE_ADS_SERVICE_STUB_V14 = + mock(GoogleAdsServiceStub.class, withSettings().defaultAnswer(RETURNS_DEEP_STUBS)); + + @Override + public GoogleAdsClient newGoogleAdsClient( + GoogleAdsOptions options, + @Nullable String developerToken, + @Nullable Long linkedCustomerId, + @Nullable Long loginCustomerId) { + GoogleAdsClient mockGoogleAdsClient = + mock(GoogleAdsClient.class, withSettings().defaultAnswer(RETURNS_DEEP_STUBS)); + when(mockGoogleAdsClient.getVersion14().createGoogleAdsServiceClient()) + .thenReturn(GoogleAdsServiceClient.create(GOOGLE_ADS_SERVICE_STUB_V14)); + return mockGoogleAdsClient; + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 5ba8096f4842..44ade9dab745 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -228,6 +228,7 @@ include(":sdks:java:io:bigquery-io-perf-tests") include(":sdks:java:io:cdap") include(":sdks:java:io:csv") include(":sdks:java:io:file-schema-transform") +include(":sdks:java:io:google-ads") include(":sdks:java:io:google-cloud-platform") include(":sdks:java:io:google-cloud-platform:expansion-service") include(":sdks:java:io:hadoop-common") From e95b2f810cbd63837b0d73053d3408114283b898 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 26 Jul 2023 16:34:17 +0200 Subject: [PATCH 02/16] Remove trailing whitespace --- sdks/java/io/google-ads/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-ads/build.gradle b/sdks/java/io/google-ads/build.gradle index 4b07c7ddbfa7..81057ce3cdc8 100644 --- a/sdks/java/io/google-ads/build.gradle +++ b/sdks/java/io/google-ads/build.gradle @@ -31,7 +31,7 @@ dependencies { implementation library.java.google_auth_library_credentials implementation library.java.google_auth_library_oauth2_http implementation library.java.protobuf_java - implementation library.java.protobuf_java_util + implementation library.java.protobuf_java_util implementation library.java.google_ads implementation library.java.google_ads_stubs_v14 implementation library.java.joda_time From c57f5121812332d7b5b41492908ec20fc8be459b Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Sat, 5 Aug 2023 17:39:45 +0200 Subject: [PATCH 03/16] Add link to OAuth2 documentation in the Google Ads API --- .../org/apache/beam/sdk/io/googleads/GoogleAdsV14.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java index 8ddcaf42aed9..1ac8ed756c69 100644 --- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java @@ -68,8 +68,12 @@ *

The Google Ads API does not use service account credentials in the same way as Google Cloud * Platform APIs do. Service account credentials are typically only used to delegate (using * domain-wide delegation) access through end user accounts. Providing credentials using the OAuth2 - * desktop flow may be preferable over domain wide delegation. Defaults for OAuth 2.0 credentials, - * refresh token and developer token can be provided using the following flags: + * desktop flow may be preferable over domain wide delegation. Please refer to the Google Ads API + * documentation for more information on OAuth2 in the Google Ads API. + * + *

Defaults for OAuth 2.0 credentials, refresh token and developer token can be provided using + * the following flags: * *

  *   --googleAdsClientId=your-client-id

From 072a0274167d5ef22403832cf3adf175487c82ff Mon Sep 17 00:00:00 2001
From: Steven van Rossum 
Date: Sat, 5 Aug 2023 20:27:21 +0200
Subject: [PATCH 04/16] Document retry configuration

---
 .../java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java     | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
index 1ac8ed756c69..52068a6379fc 100644
--- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
+++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
@@ -458,6 +458,8 @@ public PCollection expand(PCollection {
+      // The default retry configuration is based on that of services with a comparable
+      // potential volume of requests to the Google Ads API.
       private static final int MAX_RETRIES = 5;
       private static final FluentBackoff BACKOFF =
           FluentBackoff.DEFAULT

From 993f63d00198e60fea53b73f354b6e30bc976582 Mon Sep 17 00:00:00 2001
From: Steven van Rossum 
Date: Sun, 6 Aug 2023 14:01:09 +0200
Subject: [PATCH 05/16] Capitalize long literal suffix in documentation

---
 .../java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
index 52068a6379fc..4e7617069459 100644
--- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
+++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
@@ -116,7 +116,7 @@
  *         Create.of(
  *             ImmutableList.of(
  *                 SearchGoogleAdsStreamRequest.newBuilder()
- *                     .setCustomerId(Long.toString(1234567890l))
+ *                     .setCustomerId(Long.toString(1234567890L))
  *                     .setQuery(
  *                         "SELECT"
  *                             + "campaign.id,"

From b151a37584e322ca872690fe9e9b54830b9bff61 Mon Sep 17 00:00:00 2001
From: Steven van Rossum 
Date: Sun, 6 Aug 2023 14:08:45 +0200
Subject: [PATCH 06/16] Make GoogleAdsV14.Read consume customer IDs from a
 PCollection

---
 .../beam/sdk/io/googleads/GoogleAdsV14.java   |  75 ++------
 .../sdk/io/googleads/GoogleAdsV14Test.java    | 178 ++++++++----------
 2 files changed, 92 insertions(+), 161 deletions(-)

diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
index 4e7617069459..b6c48e75d3d0 100644
--- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
+++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
@@ -36,10 +36,9 @@
 import com.google.protobuf.util.Durations;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.List;
 import java.util.Optional;
+import org.apache.beam.sdk.io.googleads.GoogleAdsV14.DefaultRateLimitPolicy;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
 import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
@@ -53,11 +52,9 @@
 import org.apache.beam.sdk.util.BackOffUtils;
 import org.apache.beam.sdk.util.FluentBackoff;
 import org.apache.beam.sdk.util.Sleeper;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.RateLimiter;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.Duration;
@@ -82,21 +79,24 @@
  *   --googleAdsDeveloperToken=your-developer-token
  * 
* - *

Use {@link GoogleAdsV14#read()} to read a bounded {@link PCollection} of {@link GoogleAdsRow} - * from a query using {@link Read#withQuery(String)} and one or a few customer IDs using either - * {@link Read#withCustomerId(Long)} or {@link Read#withCustomerIds(List)}. Alternatively, use - * {@link GoogleAdsV14#readAll()} to read either a bounded or unbounded {@link PCollection} of - * {@link GoogleAdsRow} from a {@link PCollection} of {@link SearchGoogleAdsStreamRequest}. + *

Use {@link GoogleAdsV14#read()} to read either a bounded or unbounded {@link PCollection} of + * {@link GoogleAdsRow} from a single Google Ads Query + * Language query using {@link Read#withQuery(String)} and a {@link PCollection} of customer + * IDs. Alternatively, use {@link GoogleAdsV14#readAll()} to read either a bounded or unbounded + * {@link PCollection} of {@link GoogleAdsRow} from a {@link PCollection} of {@link + * SearchGoogleAdsStreamRequest} potentially containing many different queries. * *

For example, using {@link GoogleAdsV14#read()}: * *

{@code
  * Pipeline p = Pipeline.create();
+ * PCollection customerIds =
+ *     p.apply(Create.of("1234567890"));
  * PCollection rows =
- *     p.apply(
+ *     customerIds.apply(
  *         GoogleAdsIO.v14()
  *             .read()
- *             .withCustomerId(1234567890l)
  *             .withQuery(
  *                 "SELECT"
  *                     + "campaign.id,"
@@ -172,13 +172,12 @@ public ReadAll readAll() {
    * @see #readAll()
    */
   @AutoValue
-  public abstract static class Read extends PTransform> {
+  public abstract static class Read
+      extends PTransform, PCollection> {
     abstract @Nullable String getDeveloperToken();
 
     abstract @Nullable Long getLoginCustomerId();
 
-    abstract @Nullable List getCustomerIds();
-
     abstract @Nullable String getQuery();
 
     abstract GoogleAdsClientFactory getGoogleAdsClientFactory();
@@ -193,8 +192,6 @@ abstract static class Builder {
 
       abstract Builder setLoginCustomerId(@Nullable Long loginCustomerId);
 
-      abstract Builder setCustomerIds(List customerId);
-
       abstract Builder setQuery(String query);
 
       abstract Builder setGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory);
@@ -228,35 +225,6 @@ public Read withLoginCustomerId(@Nullable Long loginCustomerId) {
       return toBuilder().setLoginCustomerId(loginCustomerId).build();
     }
 
-    /**
-     * Creates and returns a new {@link Read} transform with the specified customer IDs to query.
-     *
-     * @param customerIds
-     * @return A new {@link Read} transform with the specified customer IDs to query.
-     * @see SearchGoogleAdsStreamRequest
-     * @see #withQuery(String)
-     */
-    public Read withCustomerIds(List customerIds) {
-      checkArgumentNotNull(customerIds, "customerIds cannot be null");
-      checkArgument(customerIds.size() > 0, "customerIds cannot be empty");
-
-      return toBuilder().setCustomerIds(ImmutableList.copyOf(customerIds)).build();
-    }
-
-    /**
-     * Creates and returns a new {@link Read} transform with the specified customer ID to query.
-     *
-     * @param customerId
-     * @return A new {@link Read} transform with the specified customer ID to query.
-     * @see SearchGoogleAdsStreamRequest
-     * @see #withQuery(String)
-     */
-    public Read withCustomerId(Long customerId) {
-      checkArgumentNotNull(customerId, "customerId cannot be null");
-
-      return withCustomerIds(ImmutableList.of(customerId));
-    }
-
     /**
      * Creates and returns a new {@link Read} transform with the specified query. The query will be
      * executed for each customer ID.
@@ -264,8 +232,6 @@ public Read withCustomerId(Long customerId) {
      * @param query
      * @return A new {@link Read} transform with the specified query.
      * @see SearchGoogleAdsStreamRequest
-     * @see #withCustomerId(Long)
-     * @see #withCustomerIds(List)
      */
     public Read withQuery(String query) {
       checkArgumentNotNull(query, "query cannot be null");
@@ -311,20 +277,17 @@ public Read withRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) {
     }
 
     @Override
-    public PCollection expand(PBegin input) {
+    public PCollection expand(PCollection input) {
       String query = getQuery();
-      List customerIds = getCustomerIds();
       checkArgumentNotNull(query, "withQuery() is required");
-      checkArgumentNotNull(customerIds, "either withCustomerId() or withCustomerIds() is required");
 
       return input
-          .apply(Create.of(customerIds))
           .apply(
               MapElements.into(TypeDescriptor.of(SearchGoogleAdsStreamRequest.class))
                   .via(
                       customerId ->
                           SearchGoogleAdsStreamRequest.newBuilder()
-                              .setCustomerId(Long.toString(customerId))
+                              .setCustomerId(customerId)
                               .setQuery(query)
                               .build()))
           .apply(
@@ -339,12 +302,8 @@ public PCollection expand(PBegin input) {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder
-          .addIfNotDefault(
-              DisplayData.item("customerIds", String.valueOf(getCustomerIds()))
-                  .withLabel("Customer IDs"),
-              "null")
-          .addIfNotNull(DisplayData.item("query", String.valueOf(getQuery())).withLabel("Query"));
+      builder.addIfNotNull(
+          DisplayData.item("query", String.valueOf(getQuery())).withLabel("Query"));
     }
   }
 
diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java
index cd59531bec39..c048d9c43c20 100644
--- a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java
+++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java
@@ -38,8 +38,6 @@
 import io.grpc.Metadata;
 import io.grpc.Status.Code;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
 import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory;
@@ -49,6 +47,7 @@
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.junit.Assert;
 import org.junit.Before;
@@ -125,43 +124,29 @@ public void testReadAllExpandWithoutValidRateLimitPolicy() throws Exception {
 
     @Test
     public void testReadExpandWithDeveloperTokenFromBuilder() {
-      pipeline.apply(
-          GoogleAdsIO.v14()
-              .read()
-              .withDeveloperToken("abc")
-              .withCustomerId(123L)
-              .withQuery("GAQL"));
+      pipeline
+          .apply(Create.empty(TypeDescriptors.strings()))
+          .apply(GoogleAdsIO.v14().read().withDeveloperToken("abc").withQuery("GAQL"));
       pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc");
-      pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery("GAQL"));
+      pipeline
+          .apply(Create.empty(TypeDescriptors.strings()))
+          .apply(GoogleAdsIO.v14().read().withQuery("GAQL"));
     }
 
     @Test
     public void testReadExpandWithDeveloperTokenFromOptions() {
       pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc");
-      pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery("GAQL"));
+      pipeline
+          .apply(Create.empty(TypeDescriptors.strings()))
+          .apply(GoogleAdsIO.v14().read().withQuery("GAQL"));
     }
 
     @Test
     public void testReadExpandWithDeveloperTokenFromOptionsAndBuilder() {
       pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc");
-      pipeline.apply(
-          GoogleAdsIO.v14().read().withDeveloperToken(null).withCustomerId(123L).withQuery("GAQL"));
-    }
-
-    @Test
-    public void testReadExpandWithoutCustomerId() throws Exception {
-      Assert.assertThrows(
-          "Customer ID required but not provided",
-          IllegalArgumentException.class,
-          () -> pipeline.apply(GoogleAdsIO.v14().read().withQuery("GAQL")));
-    }
-
-    @Test
-    public void testReadExpandWithoutCustomerIds() throws Exception {
-      Assert.assertThrows(
-          "Customer IDs required but not provided",
-          IllegalArgumentException.class,
-          () -> pipeline.apply(GoogleAdsIO.v14().read().withQuery("GAQL")));
+      pipeline
+          .apply(Create.empty(TypeDescriptors.strings()))
+          .apply(GoogleAdsIO.v14().read().withDeveloperToken(null).withQuery("GAQL"));
     }
 
     @Test
@@ -169,7 +154,10 @@ public void testReadExpandWithoutDeveloperToken() throws Exception {
       Assert.assertThrows(
           "Developer token required but not provided",
           IllegalArgumentException.class,
-          () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery("GAQL")));
+          () ->
+              pipeline
+                  .apply(Create.empty(TypeDescriptors.strings()))
+                  .apply(GoogleAdsIO.v14().read().withQuery("GAQL")));
     }
 
     @Test
@@ -177,32 +165,10 @@ public void testReadExpandWithoutQuery() throws Exception {
       Assert.assertThrows(
           "Query required but not provided",
           IllegalArgumentException.class,
-          () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L)));
-    }
-
-    @Test
-    public void testReadExpandWithoutValidCustomerIds() throws Exception {
-      Assert.assertThrows(
-          "Non-null customer IDs list required but not provided",
-          IllegalArgumentException.class,
-          () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerIds(null).withQuery("GAQL")));
-
-      Assert.assertThrows(
-          "At least one customer ID required but not provided",
-          IllegalArgumentException.class,
-          () ->
-              pipeline.apply(
-                  GoogleAdsIO.v14().read().withCustomerIds(ImmutableList.of()).withQuery("GAQL")));
-
-      List customerIds = new ArrayList<>();
-      customerIds.add(123L);
-      customerIds.add(null);
-      Assert.assertThrows(
-          "Non-null customer IDs required but not provided",
-          NullPointerException.class,
           () ->
-              pipeline.apply(
-                  GoogleAdsIO.v14().read().withCustomerIds(customerIds).withQuery("GAQL")));
+              pipeline
+                  .apply(Create.empty(TypeDescriptors.strings()))
+                  .apply(GoogleAdsIO.v14().read()));
     }
 
     @Test
@@ -211,12 +177,10 @@ public void testReadExpandWithoutValidGoogleAdsClientFactory() throws Exception
           "Non-empty googleAdsClientFactory required but not provided",
           IllegalArgumentException.class,
           () ->
-              pipeline.apply(
-                  GoogleAdsIO.v14()
-                      .read()
-                      .withCustomerId(123L)
-                      .withQuery("GAQL")
-                      .withGoogleAdsClientFactory(null)));
+              pipeline
+                  .apply(Create.empty(TypeDescriptors.strings()))
+                  .apply(
+                      GoogleAdsIO.v14().read().withQuery("GAQL").withGoogleAdsClientFactory(null)));
     }
 
     @Test
@@ -224,12 +188,18 @@ public void testReadExpandWithoutValidQuery() throws Exception {
       Assert.assertThrows(
           "Non-null query required but not provided",
           IllegalArgumentException.class,
-          () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery(null)));
+          () ->
+              pipeline
+                  .apply(Create.empty(TypeDescriptors.strings()))
+                  .apply(GoogleAdsIO.v14().read().withQuery(null)));
 
       Assert.assertThrows(
           "Non-empty query required but not provided",
           IllegalArgumentException.class,
-          () -> pipeline.apply(GoogleAdsIO.v14().read().withCustomerId(123L).withQuery("")));
+          () ->
+              pipeline
+                  .apply(Create.empty(TypeDescriptors.strings()))
+                  .apply(GoogleAdsIO.v14().read().withQuery("")));
     }
 
     @Test
@@ -238,12 +208,9 @@ public void testReadExpandWithoutValidRateLimitPolicy() throws Exception {
           "Non-empty rateLimitPolicy required but not provided",
           IllegalArgumentException.class,
           () ->
-              pipeline.apply(
-                  GoogleAdsIO.v14()
-                      .read()
-                      .withCustomerId(123L)
-                      .withQuery("GAQL")
-                      .withRateLimitPolicy(null)));
+              pipeline
+                  .apply(Create.empty(TypeDescriptors.strings()))
+                  .apply(GoogleAdsIO.v14().read().withQuery("GAQL").withRateLimitPolicy(null)));
     }
   }
 
@@ -273,13 +240,14 @@ public void testRead() {
                   .iterator());
 
       PCollection rows =
-          pipeline.apply(
-              GoogleAdsIO.v14()
-                  .read()
-                  .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
-                  .withDeveloperToken("abc")
-                  .withCustomerId(123L)
-                  .withQuery("GAQL"));
+          pipeline
+              .apply(Create.of("123"))
+              .apply(
+                  GoogleAdsIO.v14()
+                      .read()
+                      .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
+                      .withDeveloperToken("abc")
+                      .withQuery("GAQL"));
       PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance());
 
       pipeline.run();
@@ -303,13 +271,14 @@ public void testReadWithFailureFromMaxRetriesExceeded() throws Exception {
                       .build(),
                   new Metadata()));
 
-      pipeline.apply(
-          GoogleAdsIO.v14()
-              .read()
-              .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
-              .withDeveloperToken("abc")
-              .withCustomerId(123L)
-              .withQuery("GAQL"));
+      pipeline
+          .apply(Create.of("123"))
+          .apply(
+              GoogleAdsIO.v14()
+                  .read()
+                  .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
+                  .withDeveloperToken("abc")
+                  .withQuery("GAQL"));
 
       PipelineExecutionException exception =
           Assert.assertThrows(
@@ -341,13 +310,14 @@ public void testReadWithFailureFromNonRetryableError() throws Exception {
                       .build(),
                   new Metadata()));
 
-      pipeline.apply(
-          GoogleAdsIO.v14()
-              .read()
-              .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
-              .withDeveloperToken("abc")
-              .withCustomerId(123L)
-              .withQuery("GAQL"));
+      pipeline
+          .apply(Create.of("123"))
+          .apply(
+              GoogleAdsIO.v14()
+                  .read()
+                  .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
+                  .withDeveloperToken("abc")
+                  .withQuery("GAQL"));
 
       PipelineExecutionException exception =
           Assert.assertThrows(
@@ -401,13 +371,14 @@ public void testReadWithRecoveryFromInternalError() throws Exception {
                   .iterator());
 
       PCollection rows =
-          pipeline.apply(
-              GoogleAdsIO.v14()
-                  .read()
-                  .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
-                  .withDeveloperToken("abc")
-                  .withCustomerId(123L)
-                  .withQuery("GAQL"));
+          pipeline
+              .apply(Create.of("123"))
+              .apply(
+                  GoogleAdsIO.v14()
+                      .read()
+                      .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
+                      .withDeveloperToken("abc")
+                      .withQuery("GAQL"));
       PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance());
 
       pipeline.run();
@@ -461,13 +432,14 @@ public void testReadWithRecoveryFromQuotaErrorWithRetryDelay() throws Exception
                   .iterator());
 
       PCollection rows =
-          pipeline.apply(
-              GoogleAdsIO.v14()
-                  .read()
-                  .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
-                  .withDeveloperToken("abc")
-                  .withCustomerId(123L)
-                  .withQuery("GAQL"));
+          pipeline
+              .apply(Create.of("123"))
+              .apply(
+                  GoogleAdsIO.v14()
+                      .read()
+                      .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory())
+                      .withDeveloperToken("abc")
+                      .withQuery("GAQL"));
       PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance());
 
       pipeline.run();

From d1d799a6438f3f8b160133e6a915f2b6c993f889 Mon Sep 17 00:00:00 2001
From: Steven van Rossum 
Date: Mon, 7 Aug 2023 00:28:18 +0200
Subject: [PATCH 07/16] Change non-empty to non-null in test assertions

---
 .../apache/beam/sdk/io/googleads/GoogleAdsV14Test.java    | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java
index c048d9c43c20..84f42e017684 100644
--- a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java
+++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java
@@ -103,7 +103,7 @@ public void testReadAllExpandWithoutDeveloperToken() throws Exception {
     @Test
     public void testReadAllExpandWithoutValidGoogleAdsClientFactory() throws Exception {
       Assert.assertThrows(
-          "Non-empty googleAdsClientFactory required but not provided",
+          "Non-null googleAdsClientFactory required but not provided",
           IllegalArgumentException.class,
           () ->
               pipeline
@@ -114,7 +114,7 @@ public void testReadAllExpandWithoutValidGoogleAdsClientFactory() throws Excepti
     @Test
     public void testReadAllExpandWithoutValidRateLimitPolicy() throws Exception {
       Assert.assertThrows(
-          "Non-empty rateLimitPolicy required but not provided",
+          "Non-null rateLimitPolicy required but not provided",
           IllegalArgumentException.class,
           () ->
               pipeline
@@ -174,7 +174,7 @@ public void testReadExpandWithoutQuery() throws Exception {
     @Test
     public void testReadExpandWithoutValidGoogleAdsClientFactory() throws Exception {
       Assert.assertThrows(
-          "Non-empty googleAdsClientFactory required but not provided",
+          "Non-null googleAdsClientFactory required but not provided",
           IllegalArgumentException.class,
           () ->
               pipeline
@@ -205,7 +205,7 @@ public void testReadExpandWithoutValidQuery() throws Exception {
     @Test
     public void testReadExpandWithoutValidRateLimitPolicy() throws Exception {
       Assert.assertThrows(
-          "Non-empty rateLimitPolicy required but not provided",
+          "Non-null rateLimitPolicy required but not provided",
           IllegalArgumentException.class,
           () ->
               pipeline

From 6fa42bfab1e8b0607ea279be177fbdac221b9a84 Mon Sep 17 00:00:00 2001
From: Steven van Rossum 
Date: Mon, 7 Aug 2023 09:55:35 +0200
Subject: [PATCH 08/16] Use Long.toString in transform construction example

---
 .../java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
index b6c48e75d3d0..f2fcfe875241 100644
--- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
+++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
@@ -92,7 +92,7 @@
  * 
{@code
  * Pipeline p = Pipeline.create();
  * PCollection customerIds =
- *     p.apply(Create.of("1234567890"));
+ *     p.apply(Create.of(Long.toString(1234567890L)));
  * PCollection rows =
  *     customerIds.apply(
  *         GoogleAdsIO.v14()

From 0b676a496e62041ea049796cf07cbe377a102a69 Mon Sep 17 00:00:00 2001
From: Steven van Rossum 
Date: Tue, 8 Aug 2023 00:00:23 +0200
Subject: [PATCH 09/16] Use EnsuresNonNull, RequiresNonNull instead of
 checkStateNotNull

---
 .../beam/sdk/io/googleads/GoogleAdsV14.java   | 24 ++++++++++++-------
 1 file changed, 16 insertions(+), 8 deletions(-)

diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
index f2fcfe875241..ad876f0bfaaa 100644
--- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
+++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
@@ -18,7 +18,6 @@
 package org.apache.beam.sdk.io.googleads;
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
-import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.ads.googleads.lib.GoogleAdsClient;
@@ -56,7 +55,9 @@
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.RateLimiter;
+import org.checkerframework.checker.nullness.qual.EnsuresNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.checkerframework.checker.nullness.qual.RequiresNonNull;
 import org.joda.time.Duration;
 
 /**
@@ -439,23 +440,30 @@ static class ReadAllFn extends DoFn
       }
 
       @Setup
+      @EnsuresNonNull({"googleAdsClient", "googleAdsServiceClient", "rateLimitPolicy"})
       public void setup(PipelineOptions options) {
         GoogleAdsOptions adsOptions = options.as(GoogleAdsOptions.class);
 
-        googleAdsClient =
+        final GoogleAdsClient googleAdsClient =
             spec.getGoogleAdsClientFactory()
                 .newGoogleAdsClient(
                     adsOptions, spec.getDeveloperToken(), null, spec.getLoginCustomerId());
-        googleAdsServiceClient = googleAdsClient.getVersion14().createGoogleAdsServiceClient();
-        rateLimitPolicy = spec.getRateLimitPolicyFactory().getRateLimitPolicy();
+        final GoogleAdsServiceClient googleAdsServiceClient =
+            googleAdsClient.getVersion14().createGoogleAdsServiceClient();
+        final RateLimitPolicy rateLimitPolicy =
+            spec.getRateLimitPolicyFactory().getRateLimitPolicy();
+
+        this.googleAdsClient = googleAdsClient;
+        this.googleAdsServiceClient = googleAdsServiceClient;
+        this.rateLimitPolicy = rateLimitPolicy;
       }
 
       @ProcessElement
+      @RequiresNonNull({"googleAdsClient", "googleAdsServiceClient", "rateLimitPolicy"})
       public void processElement(ProcessContext c) throws IOException, InterruptedException {
-        GoogleAdsClient googleAdsClient = checkStateNotNull(this.googleAdsClient);
-        GoogleAdsServiceClient googleAdsServiceClient =
-            checkStateNotNull(this.googleAdsServiceClient);
-        RateLimitPolicy rateLimitPolicy = checkStateNotNull(this.rateLimitPolicy);
+        final GoogleAdsClient googleAdsClient = this.googleAdsClient;
+        final GoogleAdsServiceClient googleAdsServiceClient = this.googleAdsServiceClient;
+        final RateLimitPolicy rateLimitPolicy = this.rateLimitPolicy;
 
         BackOff backoff = BACKOFF.backoff();
         BackOff nextBackoff = backoff;

From 8a5fd70671b7250d551781f6ae4301cdd844b81c Mon Sep 17 00:00:00 2001
From: Steven van Rossum 
Date: Tue, 8 Aug 2023 00:48:03 +0200
Subject: [PATCH 10/16] Remove default rate limit policy

---
 .../beam/checkstyle/suppressions.xml          |  1 +
 .../beam/sdk/io/googleads/GoogleAdsV14.java   | 34 ++++---
 .../io/googleads/DummyRateLimitPolicy.java    | 34 +++++++
 .../sdk/io/googleads/GoogleAdsV14Test.java    | 97 ++++++++++++++++---
 4 files changed, 136 insertions(+), 30 deletions(-)
 create mode 100644 sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java

diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
index ea541de134f8..bb8954839d50 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml
@@ -60,6 +60,7 @@
   
   
   
+  
   
   
   
diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
index ad876f0bfaaa..eb62ed0596b6 100644
--- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
+++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.googleads;
 
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 
 import com.google.ads.googleads.lib.GoogleAdsClient;
@@ -36,7 +37,6 @@
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Optional;
-import org.apache.beam.sdk.io.googleads.GoogleAdsV14.DefaultRateLimitPolicy;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
@@ -98,6 +98,7 @@
  *     customerIds.apply(
  *         GoogleAdsIO.v14()
  *             .read()
+ *             .withRateLimitPolicy(MY_RATE_LIMIT_POLICY)
  *             .withQuery(
  *                 "SELECT"
  *                     + "campaign.id,"
@@ -125,21 +126,21 @@
  *                             + "campaign.status"
  *                             + "FROM campaign")
  *                     .build())));
- * PCollection rows = requests.apply(GoogleAdsIO.v14().readAll());
+ * PCollection rows =
+ *     requests.apply(GoogleAdsIO.v14().readAll().withRateLimitPolicy(MY_RATE_LIMIT_POLICY));
  * p.run();
  * }
* *

Client-side rate limiting

* * On construction of a {@link GoogleAdsV14#read()} or {@link GoogleAdsV14#readAll()} transform a - * default rate limiting policy is provided to stay well under the rate limit for the Google Ads - * API, but this limit is only local to a single worker and operates without any knowledge of other - * applications using the same developer token for any customer ID. The Google Ads API enforces - * global limits from the developer token down to the customer ID and it is recommended to host a - * shared rate limiting service to coordinate traffic to the Google Ads API across all applications - * using the same developer token. Users of these transforms are strongly advised to implement their - * own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact with a shared rate - * limiting service for any production workloads. + * rate limiting policy must be specified to stay well under the assigned quota for the Google Ads + * API. The Google Ads API enforces global limits from the developer token down to the customer ID + * and it is recommended to host a shared rate limiting service to coordinate traffic to the Google + * Ads API across all applications using the same developer token. Users of these transforms are + * strongly advised to implement their own {@link RateLimitPolicy} and {@link + * RateLimitPolicyFactory} to interact with a shared rate limiting service for any production + * workloads. * * @see GoogleAdsIO#v14() * @see GoogleAdsOptions @@ -154,14 +155,12 @@ private GoogleAdsV14() {} public Read read() { return new AutoValue_GoogleAdsV14_Read.Builder() .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) - .setRateLimitPolicyFactory(() -> new DefaultRateLimitPolicy()) .build(); } public ReadAll readAll() { return new AutoValue_GoogleAdsV14_ReadAll.Builder() .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) - .setRateLimitPolicyFactory(() -> new DefaultRateLimitPolicy()) .build(); } @@ -183,7 +182,7 @@ public abstract static class Read abstract GoogleAdsClientFactory getGoogleAdsClientFactory(); - abstract RateLimitPolicyFactory getRateLimitPolicyFactory(); + abstract @Nullable RateLimitPolicyFactory getRateLimitPolicyFactory(); abstract Builder toBuilder(); @@ -280,7 +279,9 @@ public Read withRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { @Override public PCollection expand(PCollection input) { String query = getQuery(); + RateLimitPolicyFactory rateLimitPolicyFactory = getRateLimitPolicyFactory(); checkArgumentNotNull(query, "withQuery() is required"); + checkArgumentNotNull(rateLimitPolicyFactory, "withRateLimitPolicy() is required"); return input .apply( @@ -297,7 +298,7 @@ public PCollection expand(PCollection input) { .withDeveloperToken(getDeveloperToken()) .withLoginCustomerId(getLoginCustomerId()) .withGoogleAdsClientFactory(getGoogleAdsClientFactory()) - .withRateLimitPolicy(getRateLimitPolicyFactory())); + .withRateLimitPolicy(rateLimitPolicyFactory)); } @Override @@ -324,7 +325,7 @@ public abstract static class ReadAll abstract GoogleAdsClientFactory getGoogleAdsClientFactory(); - abstract RateLimitPolicyFactory getRateLimitPolicyFactory(); + abstract @Nullable RateLimitPolicyFactory getRateLimitPolicyFactory(); abstract Builder toBuilder(); @@ -408,6 +409,7 @@ public PCollection expand(PCollection new DummyRateLimitPolicy(); @RunWith(JUnit4.class) public static class ConstructionTests { @@ -70,7 +72,11 @@ public static class ConstructionTests { public void testReadAllExpandWithDeveloperTokenFromBuilder() { pipeline .apply(Create.empty(new TypeDescriptor() {})) - .apply(GoogleAdsIO.v14().readAll().withDeveloperToken("abc")); + .apply( + GoogleAdsIO.v14() + .readAll() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken("abc")); } @Test @@ -78,7 +84,7 @@ public void testReadAllExpandWithDeveloperTokenFromOptions() { pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); pipeline .apply(Create.empty(new TypeDescriptor() {})) - .apply(GoogleAdsIO.v14().readAll()); + .apply(GoogleAdsIO.v14().readAll().withRateLimitPolicy(TEST_POLICY_FACTORY)); } @Test @@ -86,7 +92,11 @@ public void testReadAllExpandWithDeveloperTokenFromOptionsAndBuilder() { pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); pipeline .apply(Create.empty(new TypeDescriptor() {})) - .apply(GoogleAdsIO.v14().readAll().withDeveloperToken(null)); + .apply( + GoogleAdsIO.v14() + .readAll() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken(null)); } @Test @@ -97,7 +107,18 @@ public void testReadAllExpandWithoutDeveloperToken() throws Exception { () -> pipeline .apply(Create.empty(new TypeDescriptor() {})) - .apply(GoogleAdsIO.v14().readAll())); + .apply(GoogleAdsIO.v14().readAll().withRateLimitPolicy(TEST_POLICY_FACTORY))); + } + + @Test + public void testReadAllExpandWithoutRateLimitPolicy() throws Exception { + Assert.assertThrows( + "Rate limit policy required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll().withDeveloperToken("abc"))); } @Test @@ -108,7 +129,11 @@ public void testReadAllExpandWithoutValidGoogleAdsClientFactory() throws Excepti () -> pipeline .apply(Create.empty(new TypeDescriptor() {})) - .apply(GoogleAdsIO.v14().readAll().withGoogleAdsClientFactory(null))); + .apply( + GoogleAdsIO.v14() + .readAll() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withGoogleAdsClientFactory(null))); } @Test @@ -126,11 +151,17 @@ public void testReadAllExpandWithoutValidRateLimitPolicy() throws Exception { public void testReadExpandWithDeveloperTokenFromBuilder() { pipeline .apply(Create.empty(TypeDescriptors.strings())) - .apply(GoogleAdsIO.v14().read().withDeveloperToken("abc").withQuery("GAQL")); + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken("abc") + .withQuery("GAQL")); pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); pipeline .apply(Create.empty(TypeDescriptors.strings())) - .apply(GoogleAdsIO.v14().read().withQuery("GAQL")); + .apply( + GoogleAdsIO.v14().read().withRateLimitPolicy(TEST_POLICY_FACTORY).withQuery("GAQL")); } @Test @@ -138,7 +169,8 @@ public void testReadExpandWithDeveloperTokenFromOptions() { pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); pipeline .apply(Create.empty(TypeDescriptors.strings())) - .apply(GoogleAdsIO.v14().read().withQuery("GAQL")); + .apply( + GoogleAdsIO.v14().read().withRateLimitPolicy(TEST_POLICY_FACTORY).withQuery("GAQL")); } @Test @@ -146,7 +178,12 @@ public void testReadExpandWithDeveloperTokenFromOptionsAndBuilder() { pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); pipeline .apply(Create.empty(TypeDescriptors.strings())) - .apply(GoogleAdsIO.v14().read().withDeveloperToken(null).withQuery("GAQL")); + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken(null) + .withQuery("GAQL")); } @Test @@ -157,7 +194,11 @@ public void testReadExpandWithoutDeveloperToken() throws Exception { () -> pipeline .apply(Create.empty(TypeDescriptors.strings())) - .apply(GoogleAdsIO.v14().read().withQuery("GAQL"))); + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withQuery("GAQL"))); } @Test @@ -168,7 +209,18 @@ public void testReadExpandWithoutQuery() throws Exception { () -> pipeline .apply(Create.empty(TypeDescriptors.strings())) - .apply(GoogleAdsIO.v14().read())); + .apply(GoogleAdsIO.v14().read().withRateLimitPolicy(TEST_POLICY_FACTORY))); + } + + @Test + public void testReadExpandWithoutRateLimitPolicy() throws Exception { + Assert.assertThrows( + "Rate limit policy required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply(GoogleAdsIO.v14().read().withDeveloperToken("abc").withQuery("GAQL"))); } @Test @@ -180,7 +232,11 @@ public void testReadExpandWithoutValidGoogleAdsClientFactory() throws Exception pipeline .apply(Create.empty(TypeDescriptors.strings())) .apply( - GoogleAdsIO.v14().read().withQuery("GAQL").withGoogleAdsClientFactory(null))); + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withQuery("GAQL") + .withGoogleAdsClientFactory(null))); } @Test @@ -191,7 +247,11 @@ public void testReadExpandWithoutValidQuery() throws Exception { () -> pipeline .apply(Create.empty(TypeDescriptors.strings())) - .apply(GoogleAdsIO.v14().read().withQuery(null))); + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withQuery(null))); Assert.assertThrows( "Non-empty query required but not provided", @@ -199,7 +259,11 @@ public void testReadExpandWithoutValidQuery() throws Exception { () -> pipeline .apply(Create.empty(TypeDescriptors.strings())) - .apply(GoogleAdsIO.v14().read().withQuery(""))); + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withQuery(""))); } @Test @@ -246,6 +310,7 @@ public void testRead() { GoogleAdsIO.v14() .read() .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) .withDeveloperToken("abc") .withQuery("GAQL")); PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance()); @@ -277,6 +342,7 @@ public void testReadWithFailureFromMaxRetriesExceeded() throws Exception { GoogleAdsIO.v14() .read() .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) .withDeveloperToken("abc") .withQuery("GAQL")); @@ -316,6 +382,7 @@ public void testReadWithFailureFromNonRetryableError() throws Exception { GoogleAdsIO.v14() .read() .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) .withDeveloperToken("abc") .withQuery("GAQL")); @@ -377,6 +444,7 @@ public void testReadWithRecoveryFromInternalError() throws Exception { GoogleAdsIO.v14() .read() .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) .withDeveloperToken("abc") .withQuery("GAQL")); PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance()); @@ -438,6 +506,7 @@ public void testReadWithRecoveryFromQuotaErrorWithRetryDelay() throws Exception GoogleAdsIO.v14() .read() .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) .withDeveloperToken("abc") .withQuery("GAQL")); PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance()); From cc25276043d2035998312522db0b32cb62ce071c Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 8 Aug 2023 01:26:58 +0200 Subject: [PATCH 11/16] Refactor DefaultRateLimitPolicy to SimpleRateLimitPolicy --- .../beam/sdk/io/googleads/GoogleAdsV14.java | 40 +++++++++++++++++-- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java index eb62ed0596b6..b1012262731e 100644 --- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.io.Serializable; import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; @@ -601,15 +602,46 @@ void onBeforeRequest(String developerToken, String customerId, Message request) void onError(String developerToken, String customerId, Message request, GoogleAdsError error); } - static class DefaultRateLimitPolicy implements RateLimitPolicy { - private static final RateLimiter RATE_LIMITER = RateLimiter.create(1.0); + /** + * This rate limit policy wraps a {@link RateLimiter} and can be used in low volume and + * development use cases as a client-side rate limiting policy. This policy does not enforce a + * global (per pipeline or otherwise) rate limit to requests and should not be used in deployments + * where the Google Ads API quota is shared between multiple applications. + * + *

This policy can be used to limit requests across all {@link GoogleAdsV14.Read} or {@link + * GoogleAdsV14.ReadAll} transforms by defining and using a {@link + * GoogleAdsV14.RateLimitPolicyFactory} which holds a shared static {@link + * GoogleAdsV14.SimpleRateLimitPolicy}. Note that the desired rate must be divided by the expected + * maximum number of workers for the pipeline, otherwise the pipeline may exceed the desired rate + * after an upscaling event. + * + *

{@code
+   * public class SimpleRateLimitPolicyFactory implements GoogleAdsV14.RateLimitPolicyFactory {
+   *   private static final GoogleAdsV14.RateLimitPolicy POLICY =
+   *       new GoogleAdsV14.SimpleRateLimitPolicy(1.0 / 1000.0);
+   *
+   *   @Override
+   *   public GoogleAdsV14.RateLimitPolicy getRateLimitPolicy() {
+   *     return POLICY;
+   *   }
+   * }
+   * }
+ */ + public static class SimpleRateLimitPolicy implements RateLimitPolicy { + private final RateLimiter rateLimiter; + + SimpleRateLimitPolicy(double permitsPerSecond) { + rateLimiter = RateLimiter.create(permitsPerSecond); + } - DefaultRateLimitPolicy() {} + SimpleRateLimitPolicy(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { + rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit); + } @Override public void onBeforeRequest(String developerToken, String customerId, Message request) throws InterruptedException { - RATE_LIMITER.acquire(); + rateLimiter.acquire(); } @Override From 4dbe0500651ffbb399609198501b8fcdcfb06cbb Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 8 Aug 2023 01:34:31 +0200 Subject: [PATCH 12/16] Synchronize on ReadAllFn to safely modify static variable sleeper --- .../org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java index 7b9ec8d3b230..c6c96a9724db 100644 --- a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java +++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java @@ -286,7 +286,9 @@ public static class ExecutionTests { public void init() { GoogleAdsOptions options = pipeline.getOptions().as(GoogleAdsOptions.class); options.setGoogleAdsCredentialFactoryClass(NoopCredentialFactory.class); - GoogleAdsV14.ReadAll.ReadAllFn.sleeper = (long millis) -> {}; + synchronized (GoogleAdsV14.ReadAll.ReadAllFn.class) { + GoogleAdsV14.ReadAll.ReadAllFn.sleeper = (long millis) -> {}; + } } @Test From 9cc5e0fa6af2274e509b6c163484135d6c693654 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 8 Aug 2023 23:04:27 +0200 Subject: [PATCH 13/16] Add additional links to clarify API limits, quotas, and policies --- .../beam/sdk/io/googleads/GoogleAdsV14.java | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java index b1012262731e..178ba0ccac7d 100644 --- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java @@ -136,12 +136,27 @@ * * On construction of a {@link GoogleAdsV14#read()} or {@link GoogleAdsV14#readAll()} transform a * rate limiting policy must be specified to stay well under the assigned quota for the Google Ads - * API. The Google Ads API enforces global limits from the developer token down to the customer ID - * and it is recommended to host a shared rate limiting service to coordinate traffic to the Google + * API. The Google Ads API enforces global rate limits from the developer token down to the customer + * ID and depending on the access level of the developer token a limit on the total number of + * executed operations per day. See Rate + * Limits and API Limits and + * Quotas in the Google Ads documentation for more details. + * + *

It is recommended to host a shared rate limiting service to coordinate traffic to the Google * Ads API across all applications using the same developer token. Users of these transforms are * strongly advised to implement their own {@link RateLimitPolicy} and {@link - * RateLimitPolicyFactory} to interact with a shared rate limiting service for any production - * workloads. + * RateLimitPolicyFactory} to interact with a shared rate limiting service (e.g. gubernator) for any production workloads. + * + *

Required Minimum Functionality

+ * + * Pipelines built using these transforms may still be subject to the Required Minimum Functionality + * policy. Please review the policy carefully and have your tool reviewed by the Google Ads API + * Review Team. See Required Minimum + * Functionality and Rate + * sheet & non-compliance fees in the Google Ads API documentation for more details. * * @see GoogleAdsIO#v14() * @see GoogleAdsOptions From 04282b9fa2894fd868c800413094f3ddf98de7e8 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Tue, 8 Aug 2023 23:29:27 +0200 Subject: [PATCH 14/16] Fix Guava imports --- sdks/java/io/google-ads/build.gradle | 2 +- .../sdk/io/googleads/GoogleAdsUserCredentialFactory.java | 2 +- .../java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java | 6 +++--- .../org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/java/io/google-ads/build.gradle b/sdks/java/io/google-ads/build.gradle index 81057ce3cdc8..e874c0b3cb1b 100644 --- a/sdks/java/io/google-ads/build.gradle +++ b/sdks/java/io/google-ads/build.gradle @@ -35,7 +35,7 @@ dependencies { implementation library.java.google_ads implementation library.java.google_ads_stubs_v14 implementation library.java.joda_time - implementation library.java.vendored_guava_26_0_jre + implementation library.java.vendored_guava_32_1_2_jre testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") testImplementation library.java.mockito_core diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java index c9183157fc31..50e05a1d5196 100644 --- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.io.googleads; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.google.auth.Credentials; import com.google.auth.oauth2.UserCredentials; diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java index 178ba0ccac7d..d4f2119e3f20 100644 --- a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java @@ -19,7 +19,7 @@ import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; -import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.google.ads.googleads.lib.GoogleAdsClient; import com.google.ads.googleads.v14.errors.GoogleAdsError; @@ -54,8 +54,8 @@ import org.apache.beam.sdk.util.Sleeper; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.RateLimiter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.RateLimiter; import org.checkerframework.checker.nullness.qual.EnsuresNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.checker.nullness.qual.RequiresNonNull; diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java index c6c96a9724db..efe1694691c3 100644 --- a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java +++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java @@ -49,7 +49,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; From 10356550de52df0c907cc3f3d6bf0f95e5db39b6 Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 9 Aug 2023 17:28:06 +0200 Subject: [PATCH 15/16] Update CHANGES.md --- CHANGES.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES.md b/CHANGES.md index 07bfd51a7fb5..602265221199 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -142,6 +142,7 @@ * This change is not compatible with Flink savepoints created by Beam 2.46.0 applications which had KinesisIO sources. * Added textio.ReadWithFilename transform (Go) ([#25812](https://github.com/apache/beam/issues/25812)). * Added fileio.MatchContinuously transform (Go) ([#26186](https://github.com/apache/beam/issues/26186)). +* Added support for GoogleAdsIO source (Java) ([#27681](https://github.com/apache/beam/pull/27681)). ## New Features / Improvements From 5cd70c4e6368bb58aad76e89c8dfe79ba68b8f0a Mon Sep 17 00:00:00 2001 From: Steven van Rossum Date: Wed, 9 Aug 2023 17:35:04 +0200 Subject: [PATCH 16/16] Move change to upcoming release --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 602265221199..44de36b23133 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ * Java KafkaIO now supports picking up topics via topicPattern ([#26948](https://github.com/apache/beam/pull/26948)) * Support for read from Cosmos DB Core SQL API ([#23604](https://github.com/apache/beam/issues/23604)) * Upgraded to HBase 2.5.5 for HBaseIO. (Java) ([#27711](https://github.com/apache/beam/issues/19554)) +* Added support for GoogleAdsIO source (Java) ([#27681](https://github.com/apache/beam/pull/27681)). ## New Features / Improvements @@ -142,7 +143,6 @@ * This change is not compatible with Flink savepoints created by Beam 2.46.0 applications which had KinesisIO sources. * Added textio.ReadWithFilename transform (Go) ([#25812](https://github.com/apache/beam/issues/25812)). * Added fileio.MatchContinuously transform (Go) ([#26186](https://github.com/apache/beam/issues/26186)). -* Added support for GoogleAdsIO source (Java) ([#27681](https://github.com/apache/beam/pull/27681)). ## New Features / Improvements