From 93babb7214238968b5a84ebed0c1134b4712e68c Mon Sep 17 00:00:00 2001 From: Adrika Gupta Date: Tue, 25 Feb 2025 18:13:37 +0530 Subject: [PATCH] make list apps in pre-upgrade and post-upgrade paginated --- .../io/cdap/cdap/client/ProgramClient.java | 68 ++++++++++++------- .../master/upgrade/PostUpgradeJobMain.java | 40 +++++++++-- .../cdap/master/upgrade/UpgradeJobMain.java | 2 +- 3 files changed, 77 insertions(+), 33 deletions(-) diff --git a/cdap-client/src/main/java/io/cdap/cdap/client/ProgramClient.java b/cdap-client/src/main/java/io/cdap/cdap/client/ProgramClient.java index 9aeaeaa0f55a..68d4617170bb 100644 --- a/cdap-client/src/main/java/io/cdap/cdap/client/ProgramClient.java +++ b/cdap-client/src/main/java/io/cdap/cdap/client/ProgramClient.java @@ -21,6 +21,7 @@ import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; import io.cdap.cdap.api.annotation.Beta; import io.cdap.cdap.api.customaction.CustomActionSpecification; import io.cdap.cdap.api.workflow.ConditionSpecification; @@ -275,36 +276,53 @@ public void stopAll(NamespaceId namespace) throws IOException, UnauthenticatedException, InterruptedException, TimeoutException, UnauthorizedException, ApplicationNotFoundException, BadRequestException { - List allApps = applicationClient.list(namespace); - for (ApplicationRecord applicationRecord : allApps) { - ApplicationId appId = new ApplicationId(namespace.getNamespace(), applicationRecord.getName(), - applicationRecord.getAppVersion()); - List programRecords = applicationClient.listPrograms(appId); - for (ProgramRecord programRecord : programRecords) { - try { - ProgramId program = appId.program(programRecord.getType(), programRecord.getName()); - String status = this.getStatus(program); - if (!status.equals("STOPPED")) { + String token = null; + boolean isLastPage = false; + while (!isLastPage) { + JsonObject paginatedListResponse = applicationClient.paginatedList(namespace, token); + token = paginatedListResponse.get("nextPageToken") == null ? null + : paginatedListResponse.get("nextPageToken").getAsString(); + LOG.debug("Called paginated list API to stop programs and got token: {}", token); + if (paginatedListResponse.get("applications").getAsJsonArray().size() != 0) { + Type appListType = new TypeToken>() { + }.getType(); + List records = GSON.fromJson( + paginatedListResponse.get("applications").getAsJsonArray(), appListType); + for (ApplicationRecord applicationRecord : records) { + ApplicationId appId = new ApplicationId(namespace.getNamespace(), + applicationRecord.getName(), + applicationRecord.getAppVersion()); + List programRecords = applicationClient.listPrograms(appId); + for (ProgramRecord programRecord : programRecords) { try { - this.stop(program); - } catch (IOException ioe) { - // ProgramClient#stop calls RestClient, which throws an IOException if the HTTP response code is 400, - // which can be due to the program already being stopped when calling stop on it. - // Most likely, there was a race condition that the program stopped between the time we checked its - // status and calling the stop method. - LOG.warn( - "Program {} is already stopped, proceeding even though the following exception is raised.", - program, ioe); + ProgramId program = appId.program(programRecord.getType(), programRecord.getName()); + String status = this.getStatus(program); + if (!status.equals("STOPPED")) { + try { + this.stop(program); + } catch (IOException ioe) { + // ProgramClient#stop calls RestClient, which throws an IOException if the + // HTTP response code is 400, which can be due to the program already being + // stopped when calling stop on it.Most likely, there was a race condition that + // the program stopped between the time we checked its status and calling + // the stop method. + LOG.warn( + "Program {} is already stopped, proceeding even though the following exception is raised.", + program, ioe); + } + // YarnTwillController has a timeout of 60 seconds after sending a stop signal + // using ZK. If this fails, it kills the app usin Yarn API. In cases where there + // is a failure to send the message via ZK, it waits for 60 seconds. + // So a wait of 60 seconds here is not enough. + this.waitForStatus(program, ProgramStatus.STOPPED, 120, TimeUnit.SECONDS); + } + } catch (ProgramNotFoundException e) { + // IGNORE } - // YarnTwillController has a timeout of 60 seconds after sending a stop signal using ZK. - // If this fails, it kills the app usin Yarn API. In cases where there is a failure to send the message - // via ZK, it waits for 60 seconds. So a wait of 60 seconds here is not enough. - this.waitForStatus(program, ProgramStatus.STOPPED, 120, TimeUnit.SECONDS); } - } catch (ProgramNotFoundException e) { - // IGNORE } } + isLastPage = (token == null); } } diff --git a/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/PostUpgradeJobMain.java b/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/PostUpgradeJobMain.java index 3141bb8e1c92..a6a6bd472861 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/PostUpgradeJobMain.java +++ b/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/PostUpgradeJobMain.java @@ -15,6 +15,9 @@ */ package io.cdap.cdap.master.upgrade; +import com.google.common.reflect.TypeToken; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import io.cdap.cdap.client.ApplicationClient; import io.cdap.cdap.client.NamespaceClient; import io.cdap.cdap.client.ProgramClient; @@ -33,9 +36,12 @@ import io.cdap.cdap.proto.id.NamespaceId; import io.cdap.cdap.security.impersonation.SecurityUtil; import java.io.IOException; +import java.lang.reflect.Type; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Restarts all schedules and programs stopped between startTimeMillis and now. The first parameter @@ -49,6 +55,9 @@ public class PostUpgradeJobMain { private static final int DEFAULT_READ_TIMEOUT_MILLIS = 90 * 1000; + private static final Logger LOG = LoggerFactory.getLogger(PostUpgradeJobMain.class); + private static final Gson GSON = new Gson(); + private static final int APP_LIST_PAGE_SIZE = 25; public static void main(String[] args) { if (args.length < 3 || args.length > 4) { @@ -65,7 +74,8 @@ public static void main(String[] args) { ClientConfig.Builder clientConfigBuilder = ClientConfig.builder() .setDefaultReadTimeout(DEFAULT_READ_TIMEOUT_MILLIS) - .setConnectionConfig(connectionConfig); + .setConnectionConfig(connectionConfig) + .setAppListPageSize(APP_LIST_PAGE_SIZE); // If used in proxy mode, attach a user ID header to upgrade jobs. CConfiguration cConf = CConfiguration.create(); @@ -111,12 +121,28 @@ private static void restartPipelinesAndSchedules( } for (NamespaceId namespaceId : namespaceIdList) { - for (ApplicationRecord record : applicationClient.list(namespaceId)) { - ApplicationId applicationId = - new ApplicationId(namespaceId.getNamespace(), record.getName(), record.getAppVersion()); - programClient.restart(applicationId, - TimeUnit.MILLISECONDS.toSeconds(startTimeMillis), - TimeUnit.MILLISECONDS.toSeconds(endTimeMillis)); + String token = null; + boolean isLastPage = false; + while (!isLastPage) { + JsonObject paginatedListResponse = applicationClient.paginatedList(namespaceId, token); + token = paginatedListResponse.get("nextPageToken") == null ? null + : paginatedListResponse.get("nextPageToken").getAsString(); + LOG.debug("Called paginated list API to restart programs and got token: {}", token); + if (paginatedListResponse.get("applications").getAsJsonArray().size() != 0) { + Type appListType = new TypeToken>() { + }.getType(); + List records = GSON.fromJson( + paginatedListResponse.get("applications").getAsJsonArray(), appListType); + for (ApplicationRecord record : records) { + ApplicationId applicationId = + new ApplicationId(namespaceId.getNamespace(), record.getName(), + record.getAppVersion()); + programClient.restart(applicationId, + TimeUnit.MILLISECONDS.toSeconds(startTimeMillis), + TimeUnit.MILLISECONDS.toSeconds(endTimeMillis)); + } + } + isLastPage = (token == null); } // Re-enable schedules in a namespace AFTER programs have been restarted. diff --git a/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/UpgradeJobMain.java b/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/UpgradeJobMain.java index e4ca3c124c96..b1345083947c 100644 --- a/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/UpgradeJobMain.java +++ b/cdap-master/src/main/java/io/cdap/cdap/master/upgrade/UpgradeJobMain.java @@ -128,7 +128,7 @@ private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig) JsonObject paginatedListResponse = applicationClient.paginatedList(namespaceId, token); token = paginatedListResponse.get("nextPageToken") == null ? null : paginatedListResponse.get("nextPageToken").getAsString(); - LOG.debug("Called paginated list API and got token: {}", token); + LOG.debug("Called paginated list API to schedule and workflows and got token: {}", token); if (paginatedListResponse.get("applications").getAsJsonArray().size() != 0) { Type appListType = new TypeToken>() { }.getType();