diff --git a/.test-infra/jenkins/job_PreCommit_Java.groovy b/.test-infra/jenkins/job_PreCommit_Java.groovy index ddeb05506cb8..41a3b418a015 100644 --- a/.test-infra/jenkins/job_PreCommit_Java.groovy +++ b/.test-infra/jenkins/job_PreCommit_Java.groovy @@ -34,6 +34,7 @@ def excludePaths = [ 'io/elasticsearch', 'io/elasticsearch-tests', 'io/file-schema-transform', + 'io/google-ads', 'io/google-cloud-platform', 'io/hadoop-common', 'io/hadoop-file-system', diff --git a/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy b/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy index f707be2674c5..edeeed5f0970 100644 --- a/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy +++ b/.test-infra/jenkins/job_PreCommit_Java_IOs.groovy @@ -81,6 +81,7 @@ def ioModulesMap = [ 'debezium', 'elasticsearch', 'file-schema-transform', + 'google-ads', 'hbase', 'hcatalog', 'influxdb', diff --git a/CHANGES.md b/CHANGES.md index 07bfd51a7fb5..44de36b23133 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ * Java KafkaIO now supports picking up topics via topicPattern ([#26948](https://github.com/apache/beam/pull/26948)) * Support for read from Cosmos DB Core SQL API ([#23604](https://github.com/apache/beam/issues/23604)) * Upgraded to HBase 2.5.5 for HBaseIO. (Java) ([#27711](https://github.com/apache/beam/issues/19554)) +* Added support for GoogleAdsIO source (Java) ([#27681](https://github.com/apache/beam/pull/27681)). ## New Features / Improvements diff --git a/build.gradle.kts b/build.gradle.kts index 4ce425286c73..000277135ab0 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -335,6 +335,7 @@ tasks.register("javaioPreCommit") { dependsOn(":sdks:java:io:elasticsearch-tests:elasticsearch-tests-common:build") dependsOn(":sdks:java:io:elasticsearch:build") dependsOn(":sdks:java:io:file-schema-transform:build") + dependsOn(":sdks:java:io:google-ads:build") dependsOn(":sdks:java:io:hbase:build") dependsOn(":sdks:java:io:hcatalog:build") dependsOn(":sdks:java:io:influxdb:build") diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index eccc610696a8..8abb521d25b4 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -531,6 +531,7 @@ class BeamModulePlugin implements Plugin { def errorprone_version = "2.10.0" // Try to keep gax_version consistent with gax-grpc version in google_cloud_platform_libraries_bom def gax_version = "2.31.1" + def google_ads_version = "26.0.0" def google_clients_version = "2.0.0" def google_cloud_bigdataoss_version = "2.2.16" // Try to keep google_cloud_spanner_version consistent with google_cloud_spanner_bom in google_cloud_platform_libraries_bom @@ -651,6 +652,8 @@ class BeamModulePlugin implements Plugin { gax_grpc : "com.google.api:gax-grpc", // google_cloud_platform_libraries_bom sets version gax_grpc_test : "com.google.api:gax-grpc:$gax_version:testlib", // google_cloud_platform_libraries_bom sets version gax_httpjson : "com.google.api:gax-httpjson", // google_cloud_platform_libraries_bom sets version + google_ads : "com.google.api-ads:google-ads:$google_ads_version", + google_ads_stubs_v14 : "com.google.api-ads:google-ads-stubs-v14:$google_ads_version", google_api_client : "com.google.api-client:google-api-client:$google_clients_version", // for the libraries using $google_clients_version below. google_api_client_jackson2 : "com.google.api-client:google-api-client-jackson2:$google_clients_version", google_api_client_java6 : "com.google.api-client:google-api-client-java6:$google_clients_version", diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml index aa96e452d17f..bb8954839d50 100644 --- a/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml +++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle/suppressions.xml @@ -60,6 +60,9 @@ + + + diff --git a/sdks/java/io/google-ads/build.gradle b/sdks/java/io/google-ads/build.gradle new file mode 100644 index 000000000000..e874c0b3cb1b --- /dev/null +++ b/sdks/java/io/google-ads/build.gradle @@ -0,0 +1,45 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* License); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an AS IS BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +plugins { id 'org.apache.beam.module' } +applyJavaNature( automaticModuleName: 'org.apache.beam.sdk.io.googleads') + +description = "Apache Beam :: SDKs :: Java :: IO :: Google Ads" +ext.summary = "IO to read from Google Ads" + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation project(path: ":sdks:java:extensions:google-cloud-platform-core") + implementation library.java.jackson_annotations + implementation library.java.gax + implementation library.java.google_ads + implementation library.java.google_auth_library_credentials + implementation library.java.google_auth_library_oauth2_http + implementation library.java.protobuf_java + implementation library.java.protobuf_java_util + implementation library.java.google_ads + implementation library.java.google_ads_stubs_v14 + implementation library.java.joda_time + implementation library.java.vendored_guava_32_1_2_jre + testImplementation project(path: ":sdks:java:core", configuration: "shadowTest") + testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") + testImplementation library.java.mockito_core + testImplementation library.java.junit + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") + testRuntimeOnly library.java.slf4j_jdk14 +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/DefaultGoogleAdsClientFactory.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/DefaultGoogleAdsClientFactory.java new file mode 100644 index 000000000000..ace5d4d1b124 --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/DefaultGoogleAdsClientFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.auth.Credentials; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** The default way to construct a {@link GoogleAdsClient}. */ +public class DefaultGoogleAdsClientFactory implements GoogleAdsClientFactory { + private static final DefaultGoogleAdsClientFactory INSTANCE = new DefaultGoogleAdsClientFactory(); + + public static DefaultGoogleAdsClientFactory getInstance() { + return INSTANCE; + } + + @Override + public GoogleAdsClient newGoogleAdsClient( + GoogleAdsOptions options, + @Nullable String developerToken, + @Nullable Long linkedCustomerId, + @Nullable Long loginCustomerId) { + + GoogleAdsClient.Builder builder = GoogleAdsClient.newBuilder(); + + Credentials credentials = options.getGoogleAdsCredential(); + if (credentials != null) { + builder.setCredentials(credentials); + } + + if (options.getGoogleAdsEndpoint() != null) { + builder.setEndpoint(options.getGoogleAdsEndpoint()); + } + + String developerTokenFromOptions = options.getGoogleAdsDeveloperToken(); + if (developerToken != null) { + builder.setDeveloperToken(developerToken); + } else if (developerTokenFromOptions != null) { + builder.setDeveloperToken(developerTokenFromOptions); + } + + if (linkedCustomerId != null) { + builder.setLinkedCustomerId(linkedCustomerId); + } + + if (loginCustomerId != null) { + builder.setLoginCustomerId(loginCustomerId); + } + + return builder.build(); + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsClientFactory.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsClientFactory.java new file mode 100644 index 000000000000..cef32f5c4b9b --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsClientFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import java.io.Serializable; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Defines how to construct a {@link GoogleAdsClient}. */ +public interface GoogleAdsClientFactory extends Serializable { + GoogleAdsClient newGoogleAdsClient( + GoogleAdsOptions options, + @Nullable String developerToken, + @Nullable Long linkedCustomerId, + @Nullable Long loginCustomerId); +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsIO.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsIO.java new file mode 100644 index 000000000000..de176a05e9be --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsIO.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +/** + * {@link GoogleAdsIO} provides an API for reading from the Google Ads API over different + * versions of the Google Ads client libraries. + * + * @see GoogleAdsV14 + */ +public class GoogleAdsIO { + private GoogleAdsIO() {} + + public static GoogleAdsV14 v14() { + return GoogleAdsV14.INSTANCE; + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsOptions.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsOptions.java new file mode 100644 index 000000000000..738760c22eb2 --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsOptions.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.auth.Credentials; +import java.io.IOException; +import java.security.GeneralSecurityException; +import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.util.InstanceBuilder; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** Options used to configure Google Ads API specific options. */ +public interface GoogleAdsOptions extends PipelineOptions { + /** Host endpoint to use for connections to the Google Ads API. */ + @Description("Host endpoint to use for connections to the Google Ads API.") + @Default.String("googleads.googleapis.com:443") + String getGoogleAdsEndpoint(); + + void setGoogleAdsEndpoint(String endpoint); + + /** + * OAuth 2.0 Client ID identifying the application. + * + * @see https://developers.google.com/google-ads/api/docs/oauth/overview + * @see https://developers.google.com/identity/protocols/oauth2 + */ + @Description("OAuth 2.0 Client ID identifying the application.") + String getGoogleAdsClientId(); + + void setGoogleAdsClientId(String clientId); + + /** + * OAuth 2.0 Client Secret for the specified Client ID. + * + * @see https://developers.google.com/google-ads/api/docs/oauth/overview + * @see https://developers.google.com/identity/protocols/oauth2 + */ + @Description("OAuth 2.0 Client Secret for the specified Client ID.") + String getGoogleAdsClientSecret(); + + void setGoogleAdsClientSecret(String clientSecret); + + /** + * OAuth 2.0 Refresh Token for the user connecting to the Google Ads API. + * + * @see https://developers.google.com/google-ads/api/docs/oauth/overview + * @see https://developers.google.com/identity/protocols/oauth2 + */ + @Description("OAuth 2.0 Refresh Token for the user connecting to the Google Ads API.") + String getGoogleAdsRefreshToken(); + + void setGoogleAdsRefreshToken(String refreshToken); + + /** Google Ads developer token for the user connecting to the Google Ads API. */ + @Description("Google Ads developer token for the user connecting to the Google Ads API.") + @Nullable + String getGoogleAdsDeveloperToken(); + + void setGoogleAdsDeveloperToken(String developerToken); + + /** + * The class of the credential factory to create credentials if none have been explicitly set. + * + * @see #getGoogleAdsCredential() + */ + @Description( + "The class of the credential factory to create credentials if none have been explicitly set.") + @Default.Class(GoogleAdsUserCredentialFactory.class) + Class getGoogleAdsCredentialFactoryClass(); + + void setGoogleAdsCredentialFactoryClass( + Class credentialFactoryClass); + + /** + * The credential instance that should be used to authenticate against the Google Ads API. + * Defaults to a credential instance constructed by the credential factory. + * + * @see #getGoogleAdsCredential() + * @see https://github.com/googleapis/google-auth-library-java + */ + @JsonIgnore + @Description( + "The credential instance that should be used to authenticate against the Google Ads API. " + + "Defaults to a credential instance constructed by the credential factory.") + @Default.InstanceFactory(GoogleAdsCredentialsFactory.class) + @Nullable + Credentials getGoogleAdsCredential(); + + void setGoogleAdsCredential(Credentials credential); + + /** + * Attempts to load the Google Ads credentials. See {@link CredentialFactory#getCredential()} for + * more details. + */ + class GoogleAdsCredentialsFactory implements DefaultValueFactory<@Nullable Credentials> { + @Override + public @Nullable Credentials create(PipelineOptions options) { + GoogleAdsOptions googleAdsOptions = options.as(GoogleAdsOptions.class); + try { + CredentialFactory factory = + InstanceBuilder.ofType(CredentialFactory.class) + .fromClass(googleAdsOptions.getGoogleAdsCredentialFactoryClass()) + .fromFactoryMethod("fromOptions") + .withArg(PipelineOptions.class, options) + .build(); + return factory.getCredential(); + } catch (IOException | GeneralSecurityException e) { + throw new RuntimeException("Unable to obtain credential", e); + } + } + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java new file mode 100644 index 000000000000..50e05a1d5196 --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsUserCredentialFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auth.Credentials; +import com.google.auth.oauth2.UserCredentials; +import org.apache.beam.sdk.extensions.gcp.auth.CredentialFactory; +import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Constructs and returns {@link Credentials} to be used by Google Ads API calls. This factory only + * supports {@link com.google.auth.oauth2.UserCredentials}, {@link + * com.google.auth.oauth2.ServiceAccountCredentials} and domain-wide delegation are not supported. + */ +public class GoogleAdsUserCredentialFactory implements CredentialFactory { + // The OAuth client ID, client secret, and refresh token. + private String clientId; + private String clientSecret; + private String refreshToken; + + private GoogleAdsUserCredentialFactory( + String clientId, String clientSecret, String refreshToken) { + this.clientId = clientId; + this.clientSecret = clientSecret; + this.refreshToken = refreshToken; + } + + public static GoogleAdsUserCredentialFactory fromOptions(PipelineOptions options) { + GoogleAdsOptions adsOptions = options.as(GoogleAdsOptions.class); + + checkArgument( + adsOptions.getGoogleAdsClientId() != null + && adsOptions.getGoogleAdsClientSecret() != null + && adsOptions.getGoogleAdsRefreshToken() != null, + "googleAdsClientId, googleAdsClientSecret and googleAdsRefreshToken must not be null"); + + return new GoogleAdsUserCredentialFactory( + adsOptions.getGoogleAdsClientId(), + adsOptions.getGoogleAdsClientSecret(), + adsOptions.getGoogleAdsRefreshToken()); + } + + /** Returns {@link Credentials} as configured by {@link GoogleAdsOptions}. */ + @Override + public @Nullable Credentials getCredential() { + return UserCredentials.newBuilder() + .setClientId(clientId) + .setClientSecret(clientSecret) + .setRefreshToken(refreshToken) + .build(); + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java new file mode 100644 index 000000000000..d4f2119e3f20 --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14.java @@ -0,0 +1,669 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.ads.googleads.v14.errors.GoogleAdsError; +import com.google.ads.googleads.v14.errors.GoogleAdsException; +import com.google.ads.googleads.v14.errors.GoogleAdsFailure; +import com.google.ads.googleads.v14.errors.InternalErrorEnum; +import com.google.ads.googleads.v14.errors.QuotaErrorEnum; +import com.google.ads.googleads.v14.services.GoogleAdsRow; +import com.google.ads.googleads.v14.services.GoogleAdsServiceClient; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamRequest; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamResponse; +import com.google.auto.value.AutoValue; +import com.google.protobuf.Message; +import com.google.protobuf.util.Durations; +import java.io.IOException; +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.DoFn.Setup; +import org.apache.beam.sdk.transforms.DoFn.Teardown; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.util.BackOff; +import org.apache.beam.sdk.util.BackOffUtils; +import org.apache.beam.sdk.util.FluentBackoff; +import org.apache.beam.sdk.util.Sleeper; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.RateLimiter; +import org.checkerframework.checker.nullness.qual.EnsuresNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.RequiresNonNull; +import org.joda.time.Duration; + +/** + * {@link GoogleAdsV14} provides an API to read Google Ads API v14 reports. + * + *

The Google Ads API does not use service account credentials in the same way as Google Cloud + * Platform APIs do. Service account credentials are typically only used to delegate (using + * domain-wide delegation) access through end user accounts. Providing credentials using the OAuth2 + * desktop flow may be preferable over domain wide delegation. Please refer to the Google Ads API + * documentation for more information on OAuth2 in the Google Ads API. + * + *

Defaults for OAuth 2.0 credentials, refresh token and developer token can be provided using + * the following flags: + * + *

+ *   --googleAdsClientId=your-client-id
+ *   --googleAdsClientSecret=your-client-secret
+ *   --googleAdsRefreshToken=your-refresh-token
+ *   --googleAdsDeveloperToken=your-developer-token
+ * 
+ * + *

Use {@link GoogleAdsV14#read()} to read either a bounded or unbounded {@link PCollection} of + * {@link GoogleAdsRow} from a single Google Ads Query + * Language query using {@link Read#withQuery(String)} and a {@link PCollection} of customer + * IDs. Alternatively, use {@link GoogleAdsV14#readAll()} to read either a bounded or unbounded + * {@link PCollection} of {@link GoogleAdsRow} from a {@link PCollection} of {@link + * SearchGoogleAdsStreamRequest} potentially containing many different queries. + * + *

For example, using {@link GoogleAdsV14#read()}: + * + *

{@code
+ * Pipeline p = Pipeline.create();
+ * PCollection customerIds =
+ *     p.apply(Create.of(Long.toString(1234567890L)));
+ * PCollection rows =
+ *     customerIds.apply(
+ *         GoogleAdsIO.v14()
+ *             .read()
+ *             .withRateLimitPolicy(MY_RATE_LIMIT_POLICY)
+ *             .withQuery(
+ *                 "SELECT"
+ *                     + "campaign.id,"
+ *                     + "campaign.name,"
+ *                     + "campaign.status"
+ *                     + "FROM campaign"));
+ * p.run();
+ * }
+ * + *

Alternatively, using {@link GoogleAdsV14#readAll()} to execute requests from a {@link + * PCollection} of {@link SearchGoogleAdsStreamRequest}: + * + *

{@code
+ * Pipeline p = Pipeline.create();
+ * PCollection requests =
+ *     p.apply(
+ *         Create.of(
+ *             ImmutableList.of(
+ *                 SearchGoogleAdsStreamRequest.newBuilder()
+ *                     .setCustomerId(Long.toString(1234567890L))
+ *                     .setQuery(
+ *                         "SELECT"
+ *                             + "campaign.id,"
+ *                             + "campaign.name,"
+ *                             + "campaign.status"
+ *                             + "FROM campaign")
+ *                     .build())));
+ * PCollection rows =
+ *     requests.apply(GoogleAdsIO.v14().readAll().withRateLimitPolicy(MY_RATE_LIMIT_POLICY));
+ * p.run();
+ * }
+ * + *

Client-side rate limiting

+ * + * On construction of a {@link GoogleAdsV14#read()} or {@link GoogleAdsV14#readAll()} transform a + * rate limiting policy must be specified to stay well under the assigned quota for the Google Ads + * API. The Google Ads API enforces global rate limits from the developer token down to the customer + * ID and depending on the access level of the developer token a limit on the total number of + * executed operations per day. See Rate + * Limits and API Limits and + * Quotas in the Google Ads documentation for more details. + * + *

It is recommended to host a shared rate limiting service to coordinate traffic to the Google + * Ads API across all applications using the same developer token. Users of these transforms are + * strongly advised to implement their own {@link RateLimitPolicy} and {@link + * RateLimitPolicyFactory} to interact with a shared rate limiting service (e.g. gubernator) for any production workloads. + * + *

Required Minimum Functionality

+ * + * Pipelines built using these transforms may still be subject to the Required Minimum Functionality + * policy. Please review the policy carefully and have your tool reviewed by the Google Ads API + * Review Team. See Required Minimum + * Functionality and Rate + * sheet & non-compliance fees in the Google Ads API documentation for more details. + * + * @see GoogleAdsIO#v14() + * @see GoogleAdsOptions + * @see Best + * Practices in the Google Ads documentation + */ +public class GoogleAdsV14 { + static final GoogleAdsV14 INSTANCE = new GoogleAdsV14(); + + private GoogleAdsV14() {} + + public Read read() { + return new AutoValue_GoogleAdsV14_Read.Builder() + .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) + .build(); + } + + public ReadAll readAll() { + return new AutoValue_GoogleAdsV14_ReadAll.Builder() + .setGoogleAdsClientFactory(DefaultGoogleAdsClientFactory.getInstance()) + .build(); + } + + /** + * A {@link PTransform} that reads the results of a Google Ads query as {@link GoogleAdsRow} + * objects. + * + * @see GoogleAdsIO#v14() + * @see #readAll() + */ + @AutoValue + public abstract static class Read + extends PTransform, PCollection> { + abstract @Nullable String getDeveloperToken(); + + abstract @Nullable Long getLoginCustomerId(); + + abstract @Nullable String getQuery(); + + abstract GoogleAdsClientFactory getGoogleAdsClientFactory(); + + abstract @Nullable RateLimitPolicyFactory getRateLimitPolicyFactory(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setDeveloperToken(@Nullable String developerToken); + + abstract Builder setLoginCustomerId(@Nullable Long loginCustomerId); + + abstract Builder setQuery(String query); + + abstract Builder setGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory); + + abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory); + + abstract Read build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified developer token. A + * developer token is required to access the Google Ads API. + * + * @param developerToken The developer token to set. + * @return A new {@link Read} transform with the specified developer token. + * @see GoogleAdsClient + */ + public Read withDeveloperToken(@Nullable String developerToken) { + return toBuilder().setDeveloperToken(developerToken).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified login customer ID. A + * login customer ID is only required for manager accounts. + * + * @param loginCustomerId The login customer ID to set. + * @return A new {@link Read} transform with the specified login customer ID. + * @see GoogleAdsClient + */ + public Read withLoginCustomerId(@Nullable Long loginCustomerId) { + return toBuilder().setLoginCustomerId(loginCustomerId).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified query. The query will be + * executed for each customer ID. + * + * @param query + * @return A new {@link Read} transform with the specified query. + * @see SearchGoogleAdsStreamRequest + */ + public Read withQuery(String query) { + checkArgumentNotNull(query, "query cannot be null"); + checkArgument(!query.isEmpty(), "query cannot be empty"); + + return toBuilder().setQuery(query).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified client factory. A {@link + * GoogleAdsClientFactory} builds the {@link GoogleAdsClient} used to construct service clients. + * The {@link DefaultGoogleAdsClientFactory} should be sufficient for most purposes unless the + * construction of {@link GoogleAdsClient} requires customization. + * + * @param googleAdsClientFactory + * @return A new {@link Read} transform with the specified client factory. + * @see GoogleAdsClient + */ + public Read withGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory) { + checkArgumentNotNull(googleAdsClientFactory, "googleAdsClientFactory cannot be null"); + + return toBuilder().setGoogleAdsClientFactory(googleAdsClientFactory).build(); + } + + /** + * Creates and returns a new {@link Read} transform with the specified rate limit policy + * factory. A {@link RateLimitPolicyFactory} builds the {@link RateLimitPolicy} used to limit + * the number of requests made by {@link ReadAll.ReadAllFn}. The Google Ads API enforces global + * limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all + * applications using the same developer token. Users of these transforms are strongly advised + * to implement their own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact + * with a shared rate limiting service for any production workloads. + * + * @param rateLimitPolicyFactory + * @return A new {@link Read} transform with the specified rate limit policy factory. + * @see GoogleAdsClient + */ + public Read withRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { + checkArgumentNotNull(rateLimitPolicyFactory, "rateLimitPolicyFactory cannot be null"); + + return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build(); + } + + @Override + public PCollection expand(PCollection input) { + String query = getQuery(); + RateLimitPolicyFactory rateLimitPolicyFactory = getRateLimitPolicyFactory(); + checkArgumentNotNull(query, "withQuery() is required"); + checkArgumentNotNull(rateLimitPolicyFactory, "withRateLimitPolicy() is required"); + + return input + .apply( + MapElements.into(TypeDescriptor.of(SearchGoogleAdsStreamRequest.class)) + .via( + customerId -> + SearchGoogleAdsStreamRequest.newBuilder() + .setCustomerId(customerId) + .setQuery(query) + .build())) + .apply( + INSTANCE + .readAll() + .withDeveloperToken(getDeveloperToken()) + .withLoginCustomerId(getLoginCustomerId()) + .withGoogleAdsClientFactory(getGoogleAdsClientFactory()) + .withRateLimitPolicy(rateLimitPolicyFactory)); + } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.addIfNotNull( + DisplayData.item("query", String.valueOf(getQuery())).withLabel("Query")); + } + } + + /** + * A {@link PTransform} that reads the results of many {@link SearchGoogleAdsStreamRequest} + * objects as {@link GoogleAdsRow} objects. * + * + * @see GoogleAdsIO#v14() + * @see #readAll() + */ + @AutoValue + public abstract static class ReadAll + extends PTransform, PCollection> { + abstract @Nullable String getDeveloperToken(); + + abstract @Nullable Long getLoginCustomerId(); + + abstract GoogleAdsClientFactory getGoogleAdsClientFactory(); + + abstract @Nullable RateLimitPolicyFactory getRateLimitPolicyFactory(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setDeveloperToken(@Nullable String developerToken); + + abstract Builder setLoginCustomerId(@Nullable Long loginCustomerId); + + abstract Builder setGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory); + + abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory); + + abstract ReadAll build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified developer token. A + * developer token is required to access the Google Ads API. + * + * @param developerToken The developer token to set. + * @return A new {@link ReadAll} transform with the specified developer token. + * @see GoogleAdsClient + */ + public ReadAll withDeveloperToken(@Nullable String developerToken) { + return toBuilder().setDeveloperToken(developerToken).build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified login customer ID. A + * login customer ID is only required for manager accounts. + * + * @param loginCustomerId The login customer ID to set. + * @return A new {@link ReadAll} transform with the specified login customer ID. + * @see GoogleAdsClient + */ + public ReadAll withLoginCustomerId(@Nullable Long loginCustomerId) { + return toBuilder().setLoginCustomerId(loginCustomerId).build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified client factory. A + * {@link GoogleAdsClientFactory} builds the {@link GoogleAdsClient} used to construct service + * clients. The {@link DefaultGoogleAdsClientFactory} should be sufficient for most purposes + * unless the construction of {@link GoogleAdsClient} requires customization. + * + * @param googleAdsClientFactory + * @return A new {@link ReadAll} transform with the specified client factory. + * @see GoogleAdsClient + */ + public ReadAll withGoogleAdsClientFactory(GoogleAdsClientFactory googleAdsClientFactory) { + checkArgumentNotNull(googleAdsClientFactory, "googleAdsClientFactory cannot be null"); + + return toBuilder().setGoogleAdsClientFactory(googleAdsClientFactory).build(); + } + + /** + * Creates and returns a new {@link ReadAll} transform with the specified rate limit policy + * factory. A {@link RateLimitPolicyFactory} builds the {@link RateLimitPolicy} used to limit + * the number of requests made by {@link ReadAll.ReadAllFn}. The Google Ads API enforces global + * limits from the developer token down to the customer ID and it is recommended to host a + * shared rate limiting service to coordinate traffic to the Google Ads API across all + * applications using the same developer token. Users of these transforms are strongly advised + * to implement their own {@link RateLimitPolicy} and {@link RateLimitPolicyFactory} to interact + * with a shared rate limiting service for any production workloads. + * + * @param rateLimitPolicyFactory + * @return A new {@link ReadAll} transform with the specified rate limit policy factory. + * @see GoogleAdsClient + */ + public ReadAll withRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) { + checkArgumentNotNull(rateLimitPolicyFactory, "rateLimitPolicyFactory cannot be null"); + + return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build(); + } + + @Override + public PCollection expand(PCollection input) { + GoogleAdsOptions options = input.getPipeline().getOptions().as(GoogleAdsOptions.class); + + checkArgument( + options.getGoogleAdsDeveloperToken() != null || getDeveloperToken() != null, + "either --googleAdsDeveloperToken or .withDeveloperToken() is required"); + checkArgumentNotNull(getRateLimitPolicyFactory(), "withRateLimitPolicy() is required"); + + return input.apply(ParDo.of(new ReadAllFn(this))); + } + + /** + * A {@link DoFn} that reads reports from Google Ads for each query using the {@code + * SearchStream} method. + */ + @VisibleForTesting + static class ReadAllFn extends DoFn { + // The default retry configuration is based on that of services with a comparable + // potential volume of requests to the Google Ads API. + private static final int MAX_RETRIES = 5; + private static final FluentBackoff BACKOFF = + FluentBackoff.DEFAULT + .withExponent(2.0) + .withInitialBackoff(Duration.standardSeconds(30)) + .withMaxRetries(MAX_RETRIES); + + @VisibleForTesting static Sleeper sleeper = Sleeper.DEFAULT; + + private final GoogleAdsV14.ReadAll spec; + + private transient @Nullable GoogleAdsClient googleAdsClient; + private transient @Nullable GoogleAdsServiceClient googleAdsServiceClient; + private transient @Nullable RateLimitPolicy rateLimitPolicy; + + ReadAllFn(GoogleAdsV14.ReadAll spec) { + this.spec = spec; + } + + @Setup + @EnsuresNonNull({"googleAdsClient", "googleAdsServiceClient", "rateLimitPolicy"}) + public void setup(PipelineOptions options) { + GoogleAdsOptions adsOptions = options.as(GoogleAdsOptions.class); + + final GoogleAdsClient googleAdsClient = + spec.getGoogleAdsClientFactory() + .newGoogleAdsClient( + adsOptions, spec.getDeveloperToken(), null, spec.getLoginCustomerId()); + final GoogleAdsServiceClient googleAdsServiceClient = + googleAdsClient.getVersion14().createGoogleAdsServiceClient(); + final RateLimitPolicy rateLimitPolicy = + checkStateNotNull(spec.getRateLimitPolicyFactory()).getRateLimitPolicy(); + + this.googleAdsClient = googleAdsClient; + this.googleAdsServiceClient = googleAdsServiceClient; + this.rateLimitPolicy = rateLimitPolicy; + } + + @ProcessElement + @RequiresNonNull({"googleAdsClient", "googleAdsServiceClient", "rateLimitPolicy"}) + public void processElement(ProcessContext c) throws IOException, InterruptedException { + final GoogleAdsClient googleAdsClient = this.googleAdsClient; + final GoogleAdsServiceClient googleAdsServiceClient = this.googleAdsServiceClient; + final RateLimitPolicy rateLimitPolicy = this.rateLimitPolicy; + + BackOff backoff = BACKOFF.backoff(); + BackOff nextBackoff = backoff; + GoogleAdsException lastException = null; + + SearchGoogleAdsStreamRequest request = c.element(); + String developerToken = googleAdsClient.getDeveloperToken(); + String customerId = request.getCustomerId(); + + do { + rateLimitPolicy.onBeforeRequest(developerToken, customerId, request); + + try { + for (SearchGoogleAdsStreamResponse response : + googleAdsServiceClient.searchStreamCallable().call(request)) { + for (GoogleAdsRow row : response.getResultsList()) { + c.output(row); + } + } + rateLimitPolicy.onSuccess(developerToken, customerId, request); + return; + } catch (GoogleAdsException e) { + GoogleAdsError retryableError = + findFirstRetryableError(e.getGoogleAdsFailure()) + .orElseThrow(() -> new IOException(e)); + + rateLimitPolicy.onError(developerToken, customerId, request, retryableError); + + // If the error happens to carry a suggested retry delay, then use that instead. + // Retry these errors without incrementing the retry count or backoff interval. + // For all other retryable errors fall back to the existing backoff. + if (retryableError.getDetails().getQuotaErrorDetails().hasRetryDelay()) { + nextBackoff = + new BackOff() { + @Override + public void reset() {} + + @Override + public long nextBackOffMillis() { + return Durations.toMillis( + retryableError.getDetails().getQuotaErrorDetails().getRetryDelay()); + } + }; + } else { + nextBackoff = backoff; + } + } + } while (BackOffUtils.next(sleeper, nextBackoff)); + + throw new IOException( + String.format( + "Unable to get Google Ads response after retrying %d times using query (%s)", + MAX_RETRIES, request.getQuery()), + lastException); + } + + @Teardown + public void teardown() { + if (googleAdsServiceClient != null) { + googleAdsServiceClient.close(); + } + } + + private Optional findFirstRetryableError(GoogleAdsFailure e) { + return e.getErrorsList().stream() + .filter( + err -> + // Unexpected internal error + err.getErrorCode().getInternalError() + == InternalErrorEnum.InternalError.INTERNAL_ERROR + || + // Unexpected transient error + err.getErrorCode().getInternalError() + == InternalErrorEnum.InternalError.TRANSIENT_ERROR + || + // Too many requests + err.getErrorCode().getQuotaError() + == QuotaErrorEnum.QuotaError.RESOURCE_EXHAUSTED + || + // Too many requests in a short amount of time + err.getErrorCode().getQuotaError() + == QuotaErrorEnum.QuotaError.RESOURCE_TEMPORARILY_EXHAUSTED) + .findFirst(); + } + } + } + + /** + * Implement this interface to create a {@link RateLimitPolicy}. This should be used to limit all + * traffic sent to the Google Ads API for a pair of developer token and customer ID and any other + * relevant attributes for the specific Google Ads API service being called. + */ + public interface RateLimitPolicyFactory extends Serializable { + RateLimitPolicy getRateLimitPolicy(); + } + + /** + * This interface can be used to implement custom client-side rate limiting policies. Custom + * policies should follow best practices for interacting with the Google Ads API. + * + * @see Best + * Practices in the Google Ads documentation + */ + public interface RateLimitPolicy { + /** + * Called before a request is sent. + * + * @param developerToken The developer token used for the request. + * @param customerId The customer ID specified on the request. + * @param request Any Google Ads API request. + * @throws InterruptedException + */ + void onBeforeRequest(String developerToken, String customerId, Message request) + throws InterruptedException; + + /** + * Called after a request succeeds. + * + * @param developerToken The developer token used for the request. + * @param customerId The customer ID specified on the request. + * @param request Any Google Ads API request. + */ + void onSuccess(String developerToken, String customerId, Message request); + + /** + * Called after a request fails with a retryable error. + * + * @param developerToken The developer token used for the request. + * @param customerId The customer ID specified on the request. + * @param request Any Google Ads API request. + * @param error A retryable error. + */ + void onError(String developerToken, String customerId, Message request, GoogleAdsError error); + } + + /** + * This rate limit policy wraps a {@link RateLimiter} and can be used in low volume and + * development use cases as a client-side rate limiting policy. This policy does not enforce a + * global (per pipeline or otherwise) rate limit to requests and should not be used in deployments + * where the Google Ads API quota is shared between multiple applications. + * + *

This policy can be used to limit requests across all {@link GoogleAdsV14.Read} or {@link + * GoogleAdsV14.ReadAll} transforms by defining and using a {@link + * GoogleAdsV14.RateLimitPolicyFactory} which holds a shared static {@link + * GoogleAdsV14.SimpleRateLimitPolicy}. Note that the desired rate must be divided by the expected + * maximum number of workers for the pipeline, otherwise the pipeline may exceed the desired rate + * after an upscaling event. + * + *

{@code
+   * public class SimpleRateLimitPolicyFactory implements GoogleAdsV14.RateLimitPolicyFactory {
+   *   private static final GoogleAdsV14.RateLimitPolicy POLICY =
+   *       new GoogleAdsV14.SimpleRateLimitPolicy(1.0 / 1000.0);
+   *
+   *   @Override
+   *   public GoogleAdsV14.RateLimitPolicy getRateLimitPolicy() {
+   *     return POLICY;
+   *   }
+   * }
+   * }
+ */ + public static class SimpleRateLimitPolicy implements RateLimitPolicy { + private final RateLimiter rateLimiter; + + SimpleRateLimitPolicy(double permitsPerSecond) { + rateLimiter = RateLimiter.create(permitsPerSecond); + } + + SimpleRateLimitPolicy(double permitsPerSecond, long warmupPeriod, TimeUnit unit) { + rateLimiter = RateLimiter.create(permitsPerSecond, warmupPeriod, unit); + } + + @Override + public void onBeforeRequest(String developerToken, String customerId, Message request) + throws InterruptedException { + rateLimiter.acquire(); + } + + @Override + public void onSuccess(String developerToken, String customerId, Message request) {} + + @Override + public void onError( + String developerToken, String customerId, Message request, GoogleAdsError error) {} + } +} diff --git a/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/package-info.java b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/package-info.java new file mode 100644 index 000000000000..56aaed903e3c --- /dev/null +++ b/sdks/java/io/google-ads/src/main/java/org/apache/beam/sdk/io/googleads/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Defines transforms for reading from Google Ads. + * + * @see org.apache.beam.sdk.io.googleads.GoogleAdsIO + */ +package org.apache.beam.sdk.io.googleads; diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java new file mode 100644 index 000000000000..c2d3b230c13c --- /dev/null +++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/DummyRateLimitPolicy.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import com.google.ads.googleads.v14.errors.GoogleAdsError; +import com.google.protobuf.Message; + +public class DummyRateLimitPolicy implements GoogleAdsV14.RateLimitPolicy { + @Override + public void onBeforeRequest(String developerToken, String customerId, Message request) + throws InterruptedException {} + + @Override + public void onSuccess(String developerToken, String customerId, Message request) {} + + @Override + public void onError( + String developerToken, String customerId, Message request, GoogleAdsError error) {} +} diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java new file mode 100644 index 000000000000..efe1694691c3 --- /dev/null +++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/GoogleAdsV14Test.java @@ -0,0 +1,519 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.when; + +import com.google.ads.googleads.v14.errors.AuthenticationErrorEnum.AuthenticationError; +import com.google.ads.googleads.v14.errors.ErrorCode; +import com.google.ads.googleads.v14.errors.ErrorDetails; +import com.google.ads.googleads.v14.errors.GoogleAdsError; +import com.google.ads.googleads.v14.errors.GoogleAdsException; +import com.google.ads.googleads.v14.errors.GoogleAdsFailure; +import com.google.ads.googleads.v14.errors.InternalErrorEnum.InternalError; +import com.google.ads.googleads.v14.errors.QuotaErrorDetails; +import com.google.ads.googleads.v14.errors.QuotaErrorEnum.QuotaError; +import com.google.ads.googleads.v14.services.GoogleAdsRow; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamRequest; +import com.google.ads.googleads.v14.services.SearchGoogleAdsStreamResponse; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.ApiException; +import com.google.protobuf.Duration; +import io.grpc.Metadata; +import io.grpc.Status.Code; +import java.io.IOException; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactory; +import org.apache.beam.sdk.io.googleads.GoogleAdsV14.RateLimitPolicyFactory; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.experimental.runners.Enclosed; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(Enclosed.class) +public class GoogleAdsV14Test { + static final RateLimitPolicyFactory TEST_POLICY_FACTORY = () -> new DummyRateLimitPolicy(); + + @RunWith(JUnit4.class) + public static class ConstructionTests { + private final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testReadAllExpandWithDeveloperTokenFromBuilder() { + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply( + GoogleAdsIO.v14() + .readAll() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken("abc")); + } + + @Test + public void testReadAllExpandWithDeveloperTokenFromOptions() { + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll().withRateLimitPolicy(TEST_POLICY_FACTORY)); + } + + @Test + public void testReadAllExpandWithDeveloperTokenFromOptionsAndBuilder() { + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply( + GoogleAdsIO.v14() + .readAll() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken(null)); + } + + @Test + public void testReadAllExpandWithoutDeveloperToken() throws Exception { + Assert.assertThrows( + "Developer token required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll().withRateLimitPolicy(TEST_POLICY_FACTORY))); + } + + @Test + public void testReadAllExpandWithoutRateLimitPolicy() throws Exception { + Assert.assertThrows( + "Rate limit policy required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll().withDeveloperToken("abc"))); + } + + @Test + public void testReadAllExpandWithoutValidGoogleAdsClientFactory() throws Exception { + Assert.assertThrows( + "Non-null googleAdsClientFactory required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply( + GoogleAdsIO.v14() + .readAll() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withGoogleAdsClientFactory(null))); + } + + @Test + public void testReadAllExpandWithoutValidRateLimitPolicy() throws Exception { + Assert.assertThrows( + "Non-null rateLimitPolicy required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(new TypeDescriptor() {})) + .apply(GoogleAdsIO.v14().readAll().withRateLimitPolicy(null))); + } + + @Test + public void testReadExpandWithDeveloperTokenFromBuilder() { + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken("abc") + .withQuery("GAQL")); + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply( + GoogleAdsIO.v14().read().withRateLimitPolicy(TEST_POLICY_FACTORY).withQuery("GAQL")); + } + + @Test + public void testReadExpandWithDeveloperTokenFromOptions() { + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply( + GoogleAdsIO.v14().read().withRateLimitPolicy(TEST_POLICY_FACTORY).withQuery("GAQL")); + } + + @Test + public void testReadExpandWithDeveloperTokenFromOptionsAndBuilder() { + pipeline.getOptions().as(GoogleAdsOptions.class).setGoogleAdsDeveloperToken("abc"); + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken(null) + .withQuery("GAQL")); + } + + @Test + public void testReadExpandWithoutDeveloperToken() throws Exception { + Assert.assertThrows( + "Developer token required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withQuery("GAQL"))); + } + + @Test + public void testReadExpandWithoutQuery() throws Exception { + Assert.assertThrows( + "Query required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply(GoogleAdsIO.v14().read().withRateLimitPolicy(TEST_POLICY_FACTORY))); + } + + @Test + public void testReadExpandWithoutRateLimitPolicy() throws Exception { + Assert.assertThrows( + "Rate limit policy required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply(GoogleAdsIO.v14().read().withDeveloperToken("abc").withQuery("GAQL"))); + } + + @Test + public void testReadExpandWithoutValidGoogleAdsClientFactory() throws Exception { + Assert.assertThrows( + "Non-null googleAdsClientFactory required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withQuery("GAQL") + .withGoogleAdsClientFactory(null))); + } + + @Test + public void testReadExpandWithoutValidQuery() throws Exception { + Assert.assertThrows( + "Non-null query required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withQuery(null))); + + Assert.assertThrows( + "Non-empty query required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply( + GoogleAdsIO.v14() + .read() + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withQuery(""))); + } + + @Test + public void testReadExpandWithoutValidRateLimitPolicy() throws Exception { + Assert.assertThrows( + "Non-null rateLimitPolicy required but not provided", + IllegalArgumentException.class, + () -> + pipeline + .apply(Create.empty(TypeDescriptors.strings())) + .apply(GoogleAdsIO.v14().read().withQuery("GAQL").withRateLimitPolicy(null))); + } + } + + @RunWith(MockitoJUnitRunner.class) + public static class ExecutionTests { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Before + public void init() { + GoogleAdsOptions options = pipeline.getOptions().as(GoogleAdsOptions.class); + options.setGoogleAdsCredentialFactoryClass(NoopCredentialFactory.class); + synchronized (GoogleAdsV14.ReadAll.ReadAllFn.class) { + GoogleAdsV14.ReadAll.ReadAllFn.sleeper = (long millis) -> {}; + } + } + + @Test + @Category(NeedsRunner.class) + public void testRead() { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class)) + .iterator()) + .thenReturn( + ImmutableList.of( + SearchGoogleAdsStreamResponse.newBuilder() + .addResults(GoogleAdsRow.newBuilder()) + .build()) + .iterator()); + + PCollection rows = + pipeline + .apply(Create.of("123")) + .apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken("abc") + .withQuery("GAQL")); + PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance()); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testReadWithFailureFromMaxRetriesExceeded() throws Exception { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class))) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setInternalError(InternalError.TRANSIENT_ERROR))) + .build(), + new Metadata())); + + pipeline + .apply(Create.of("123")) + .apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken("abc") + .withQuery("GAQL")); + + PipelineExecutionException exception = + Assert.assertThrows( + "Last retryable error after max retries", + Pipeline.PipelineExecutionException.class, + pipeline::run); + Assert.assertEquals(IOException.class, exception.getCause().getClass()); + Assert.assertEquals( + "Unable to get Google Ads response after retrying 5 times using query (GAQL)", + exception.getCause().getMessage()); + } + + @Test + @Category(NeedsRunner.class) + public void testReadWithFailureFromNonRetryableError() throws Exception { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class))) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setAuthenticationError( + AuthenticationError.OAUTH_TOKEN_REVOKED))) + .build(), + new Metadata())); + + pipeline + .apply(Create.of("123")) + .apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken("abc") + .withQuery("GAQL")); + + PipelineExecutionException exception = + Assert.assertThrows( + "First non-retryable error", + Pipeline.PipelineExecutionException.class, + pipeline::run); + Assert.assertEquals(IOException.class, exception.getCause().getClass()); + Assert.assertEquals( + "com.google.ads.googleads.v14.errors.GoogleAdsException: errors {\n" + + " error_code {\n" + + " authentication_error: OAUTH_TOKEN_REVOKED\n" + + " }\n" + + "}\n", + exception.getCause().getMessage()); + } + + @Test + @Category(NeedsRunner.class) + public void testReadWithRecoveryFromInternalError() throws Exception { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class)) + .iterator()) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setInternalError(InternalError.INTERNAL_ERROR))) + .build(), + new Metadata())) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setInternalError(InternalError.TRANSIENT_ERROR))) + .build(), + new Metadata())) + .thenReturn( + ImmutableList.of( + SearchGoogleAdsStreamResponse.newBuilder() + .addResults(GoogleAdsRow.newBuilder()) + .build()) + .iterator()); + + PCollection rows = + pipeline + .apply(Create.of("123")) + .apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken("abc") + .withQuery("GAQL")); + PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance()); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testReadWithRecoveryFromQuotaErrorWithRetryDelay() throws Exception { + when(MockGoogleAdsClientFactory.GOOGLE_ADS_SERVICE_STUB_V14 + .searchStreamCallable() + .call(any(SearchGoogleAdsStreamRequest.class)) + .iterator()) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setQuotaError(QuotaError.RESOURCE_EXHAUSTED)) + .setDetails( + ErrorDetails.newBuilder() + .setQuotaErrorDetails( + QuotaErrorDetails.newBuilder() + .setRetryDelay(Duration.newBuilder().setSeconds(0))))) + .build(), + new Metadata())) + .thenThrow( + new GoogleAdsException( + new ApiException(null, GrpcStatusCode.of(Code.UNKNOWN), false), + GoogleAdsFailure.newBuilder() + .addErrors( + GoogleAdsError.newBuilder() + .setErrorCode( + ErrorCode.newBuilder() + .setQuotaError(QuotaError.RESOURCE_EXHAUSTED)) + .setDetails( + ErrorDetails.newBuilder() + .setQuotaErrorDetails( + QuotaErrorDetails.newBuilder() + .setRetryDelay( + Duration.newBuilder().setSeconds(42))))) + .build(), + new Metadata())) + .thenReturn( + ImmutableList.of( + SearchGoogleAdsStreamResponse.newBuilder() + .addResults(GoogleAdsRow.newBuilder()) + .build()) + .iterator()); + + PCollection rows = + pipeline + .apply(Create.of("123")) + .apply( + GoogleAdsIO.v14() + .read() + .withGoogleAdsClientFactory(new MockGoogleAdsClientFactory()) + .withRateLimitPolicy(TEST_POLICY_FACTORY) + .withDeveloperToken("abc") + .withQuery("GAQL")); + PAssert.thatSingleton(rows).isEqualTo(GoogleAdsRow.getDefaultInstance()); + + pipeline.run(); + } + } +} diff --git a/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/MockGoogleAdsClientFactory.java b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/MockGoogleAdsClientFactory.java new file mode 100644 index 000000000000..258b47763a4b --- /dev/null +++ b/sdks/java/io/google-ads/src/test/java/org/apache/beam/sdk/io/googleads/MockGoogleAdsClientFactory.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.googleads; + +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +import com.google.ads.googleads.lib.GoogleAdsClient; +import com.google.ads.googleads.v14.services.GoogleAdsServiceClient; +import com.google.ads.googleads.v14.services.stub.GoogleAdsServiceStub; +import org.checkerframework.checker.nullness.qual.Nullable; + +class MockGoogleAdsClientFactory implements GoogleAdsClientFactory { + static final GoogleAdsServiceStub GOOGLE_ADS_SERVICE_STUB_V14 = + mock(GoogleAdsServiceStub.class, withSettings().defaultAnswer(RETURNS_DEEP_STUBS)); + + @Override + public GoogleAdsClient newGoogleAdsClient( + GoogleAdsOptions options, + @Nullable String developerToken, + @Nullable Long linkedCustomerId, + @Nullable Long loginCustomerId) { + GoogleAdsClient mockGoogleAdsClient = + mock(GoogleAdsClient.class, withSettings().defaultAnswer(RETURNS_DEEP_STUBS)); + when(mockGoogleAdsClient.getVersion14().createGoogleAdsServiceClient()) + .thenReturn(GoogleAdsServiceClient.create(GOOGLE_ADS_SERVICE_STUB_V14)); + return mockGoogleAdsClient; + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 5ba8096f4842..44ade9dab745 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -228,6 +228,7 @@ include(":sdks:java:io:bigquery-io-perf-tests") include(":sdks:java:io:cdap") include(":sdks:java:io:csv") include(":sdks:java:io:file-schema-transform") +include(":sdks:java:io:google-ads") include(":sdks:java:io:google-cloud-platform") include(":sdks:java:io:google-cloud-platform:expansion-service") include(":sdks:java:io:hadoop-common")