Skip to content

Commit

Permalink
Merge pull request #15871 from cdapio/oom-fix
Browse files Browse the repository at this point in the history
[CDAP-21135] Add pagination in apps list call made during pre-upgrade job
  • Loading branch information
adrikagupta authored Feb 13, 2025
2 parents 3f9a33f + e9d21b0 commit 7f29f87
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void setUp() throws Throwable {
.setVerifySSLCert(false)
.setDefaultReadTimeout(60 * 1000)
.setUploadReadTimeout(120 * 1000)
.setConnectionConfig(connectionConfig).build();
.setConnectionConfig(connectionConfig).setAppListPageSize(25).build();
}

protected ClientConfig getClientConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.gson.JsonObject;
import io.cdap.cdap.ConfigTestApp;
import io.cdap.cdap.api.Config;
import io.cdap.cdap.api.artifact.ArtifactSummary;
Expand Down Expand Up @@ -163,6 +164,42 @@ public void testAppConfig() throws Exception {
}
}

@Test
public void testPaginatedList() throws Exception {
ApplicationId app = NamespaceId.DEFAULT.app(FakeApp.NAME);
for (int i = 0; i < 30; i++) {
appClient.deploy(NamespaceId.DEFAULT, createAppJarFile(FakeApp.class,
FakeApp.NAME, "1.0.0-SNAPSHOT"));
ApplicationDetail appDetail = appClient.get(app);
app = new ApplicationId(app.getNamespace(), app.getApplication(), appDetail.getAppVersion());
appClient.waitForDeployed(app, 30, TimeUnit.SECONDS);
}
Assert.assertEquals(30, appClient.list(NamespaceId.DEFAULT).size());

int count = 0;
String token = null;
boolean isLastPage = false;
int currentResultSize = 0;
while (!isLastPage) {
JsonObject result = appClient.paginatedList(NamespaceId.DEFAULT, token);
currentResultSize = result.get("applications").getAsJsonArray().size();
count += currentResultSize;
token =
result.get("nextPageToken") == null ? null : result.get("nextPageToken").getAsString();
isLastPage = (token == null);
if (!isLastPage) {
Assert.assertEquals(25, currentResultSize);
}
}

Assert.assertEquals(5, currentResultSize);
Assert.assertEquals(30, count);

appClient.deleteAll(NamespaceId.DEFAULT);
appClient.waitForDeleted(app, 30, TimeUnit.SECONDS);
Assert.assertEquals(0, appClient.list(NamespaceId.DEFAULT).size());
}

@Test
public void testAppUpdate() throws Exception {
String artifactName = "cfg-programs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.Maps;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.cdap.cdap.api.Config;
import io.cdap.cdap.api.annotation.Beta;
import io.cdap.cdap.api.security.AccessException;
Expand Down Expand Up @@ -108,6 +109,34 @@ public List<ApplicationRecord> list(NamespaceId namespace)
}).getResponseObject();
}

/**
* Retrieves a paginated list of applications within the specified namespace.
*
* @param namespace The {@link NamespaceId} representing the namespace from which to list
* applications.
* @param nextPageToken The token for fetching the next page of results.
* @return A {@link JsonObject} containing the paginated list of applications and the next page
* token if available.
* @throws IOException If a network error occurred.
* @throws UnauthenticatedException If the request is not authorized successfully in th gateway
* server
* @throws UnauthorizedException If the caller lacks sufficient permissions.
*/
public JsonObject paginatedList(NamespaceId namespace, String nextPageToken)
throws IOException, UnauthenticatedException, UnauthorizedException {
StringBuilder pathBuilder = new StringBuilder("apps?latestOnly=false&pageSize=").append(
config.getAppListPageSize());

if (nextPageToken != null && !nextPageToken.isEmpty()) {
pathBuilder.append("&pageToken=").append(nextPageToken);
}
HttpResponse response = restClient.execute(HttpMethod.GET,
config.resolveNamespacedURLV3(namespace, pathBuilder.toString()),
config.getAccessToken());
return ObjectResponse.fromJsonBody(response, new TypeToken<JsonObject>() {
}).getResponseObject();
}

/**
* Lists all applications currently deployed, optionally filtering to only include applications
* that use the specified artifact name and version.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ClientConfig {
private static final int DEFAULT_READ_TIMEOUT = 15000;
private static final int DEFAULT_CONNECT_TIMEOUT = 15000;

private static final int DEFAULT_APP_LIST_PAGE_SIZE = 25;

private static final String DEFAULT_VERSION = Constants.Gateway.API_VERSION_3_TOKEN;

@Nullable
Expand All @@ -56,6 +58,7 @@ public class ClientConfig {
private int uploadConnectTimeout;

private int unavailableRetryLimit;
private int appListPageSize;
private String apiVersion;
private Supplier<AccessToken> accessToken;
private Map<String, String> additionalHeaders;
Expand All @@ -65,7 +68,7 @@ private ClientConfig(@Nullable ConnectionConfig connectionConfig,
String apiVersion, Supplier<AccessToken> accessToken,
int defaultReadTimeout, int defaultConnectTimeout,
int uploadReadTimeout, int uploadConnectTimeout,
Map<String, String> additionalHeaders) {
Map<String, String> additionalHeaders, int appListPageSize) {
this.connectionConfig = connectionConfig;
this.verifySSLCert = verifySSLCert;
this.apiVersion = apiVersion;
Expand All @@ -76,6 +79,7 @@ private ClientConfig(@Nullable ConnectionConfig connectionConfig,
this.uploadReadTimeout = uploadReadTimeout;
this.uploadConnectTimeout = uploadConnectTimeout;
this.additionalHeaders = additionalHeaders;
this.appListPageSize = appListPageSize;
}

public static ClientConfig getDefault() {
Expand Down Expand Up @@ -167,6 +171,8 @@ public int getUploadConnectTimeout() {
return uploadConnectTimeout;
}

public int getAppListPageSize() { return appListPageSize; }

public Map<String, String> getAdditionalHeaders() {
return additionalHeaders;
}
Expand Down Expand Up @@ -198,6 +204,10 @@ public void setDefaultConnectTimeout(int defaultConnectTimeout) {
this.defaultConnectTimeout = defaultConnectTimeout;
}

public void setAppListPageSize(int appListPageSize) {
this.appListPageSize = appListPageSize;
}

public void setUploadReadTimeout(int uploadReadTimeout) {
this.uploadReadTimeout = uploadReadTimeout;
}
Expand Down Expand Up @@ -264,6 +274,7 @@ public static final class Builder {
private int uploadConnectTimeout = DEFAULT_UPLOAD_CONNECT_TIMEOUT;
private int defaultReadTimeout = DEFAULT_READ_TIMEOUT;
private int defaultConnectTimeout = DEFAULT_CONNECT_TIMEOUT;
private int appListPageSize = DEFAULT_APP_LIST_PAGE_SIZE;

private int unavailableRetryLimit = DEFAULT_SERVICE_UNAVAILABLE_RETRY_LIMIT;
private Map<String, String> additionalHeaders = new HashMap<>();
Expand All @@ -281,6 +292,7 @@ public Builder(ClientConfig clientConfig) {
this.defaultReadTimeout = clientConfig.defaultReadTimeout;
this.defaultConnectTimeout = clientConfig.defaultConnectTimeout;
this.unavailableRetryLimit = clientConfig.unavailableRetryLimit;
this.appListPageSize = clientConfig.appListPageSize;
}

public Builder setConnectionConfig(ConnectionConfig connectionConfig) {
Expand Down Expand Up @@ -313,6 +325,11 @@ public Builder setDefaultConnectTimeout(int defaultConnectTimeout) {
return this;
}

public Builder setAppListPageSize(int appListPageSize) {
this.appListPageSize = appListPageSize;
return this;
}

public Builder setAccessToken(Supplier<AccessToken> accessToken) {
this.accessToken = accessToken;
return this;
Expand Down Expand Up @@ -342,7 +359,8 @@ public ClientConfig build() {
return new ClientConfig(connectionConfig, verifySSLCert,
unavailableRetryLimit, apiVersion, accessToken,
defaultReadTimeout, defaultConnectTimeout,
uploadReadTimeout, uploadConnectTimeout, ImmutableMap.copyOf(additionalHeaders));
uploadReadTimeout, uploadConnectTimeout, ImmutableMap.copyOf(additionalHeaders),
appListPageSize);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package io.cdap.cdap.master.upgrade;

import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.cdap.cdap.api.retry.RetryableException;
import io.cdap.cdap.client.ApplicationClient;
import io.cdap.cdap.client.NamespaceClient;
Expand All @@ -39,6 +42,7 @@
import io.cdap.cdap.proto.id.WorkflowId;
import io.cdap.cdap.security.impersonation.SecurityUtil;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand All @@ -54,9 +58,12 @@
public class UpgradeJobMain {

private static final int DEFAULT_READ_TIMEOUT_MILLIS = 90 * 1000;
private static final int APP_LIST_PAGE_SIZE = 25;
private static final String SCHEDULED = "SCHEDULED";
private static final Logger LOG = LoggerFactory.getLogger(UpgradeJobMain.class);

private static final Gson GSON = new Gson();

public static void main(String[] args) {
if (args.length != 2) {
throw new RuntimeException(
Expand All @@ -72,7 +79,8 @@ public static void main(String[] args) {
ClientConfig.Builder clientConfigBuilder =
ClientConfig.builder()
.setDefaultReadTimeout(DEFAULT_READ_TIMEOUT_MILLIS)
.setConnectionConfig(connectionConfig);
.setConnectionConfig(connectionConfig)
.setAppListPageSize(APP_LIST_PAGE_SIZE);

// If used in proxy mode, attach a user ID header to upgrade jobs.
CConfiguration cConf = CConfiguration.create();
Expand Down Expand Up @@ -114,43 +122,62 @@ private static void suspendSchedulesAndStopPipelines(ClientConfig clientConfig)
namespaceIdList.add(NamespaceId.SYSTEM);

for (NamespaceId namespaceId : namespaceIdList) {
for (ApplicationRecord record : applicationClient.list(namespaceId)) {
ApplicationId applicationId =
new ApplicationId(namespaceId.getNamespace(), record.getName(), record.getAppVersion());
LOG.debug("Trying to stop schedule and workflows for application " + applicationId);
List<WorkflowId> workflowIds =
applicationClient.get(applicationId).getPrograms().stream()
.filter(programRecord -> programRecord.getType().equals(ProgramType.WORKFLOW))
.map(programRecord -> new WorkflowId(applicationId, programRecord.getName()))
.collect(Collectors.toList());
for (WorkflowId workflowId : workflowIds) {
List<ScheduleId> scheduleIds =
scheduleClient.listSchedules(workflowId).stream()
.map(scheduleDetail ->
new ScheduleId(namespaceId.getNamespace(), record.getName(),
scheduleDetail.getName()))
.collect(Collectors.toList());
for (ScheduleId scheduleId : scheduleIds) {
if (scheduleClient.getStatus(scheduleId).equals(SCHEDULED)) {
scheduleClient.suspend(scheduleId);
}
}
// Need to stop workflows first or else the program will fail to stop below
if (!programClient.getStatus(workflowId).equals(ProgramStatus.STOPPED.toString())) {
try {
programClient.stop(workflowId);
} catch (BadRequestException e) {
// There might be race condition between checking if the program is in RUNNING state and stopping it.
// This can cause programClient.stop to throw BadRequestException so verifying if the program
// transitioned to stop state since it was checked earlier or not.
String token = null;
boolean isLastPage = false;
while (!isLastPage) {
JsonObject paginatedListResponse = applicationClient.paginatedList(namespaceId, token);
token = paginatedListResponse.get("nextPageToken") == null ? null
: paginatedListResponse.get("nextPageToken").getAsString();
LOG.debug("Called paginated list API and got token: {}", token);
if (paginatedListResponse.get("applications").getAsJsonArray().size() != 0) {
Type appListType = new TypeToken<List<ApplicationRecord>>() {
}.getType();
List<ApplicationRecord> records = GSON.fromJson(
paginatedListResponse.get("applications").getAsJsonArray(), appListType);
for (ApplicationRecord record : records) {
ApplicationId applicationId =
new ApplicationId(namespaceId.getNamespace(),
record.getName(), record.getAppVersion());
LOG.debug("Trying to stop schedule and workflows for application " + applicationId);
List<WorkflowId> workflowIds =
applicationClient.get(applicationId).getPrograms().stream()
.filter(programRecord -> programRecord.getType().equals(ProgramType.WORKFLOW))
.map(programRecord -> new WorkflowId(applicationId, programRecord.getName()))
.collect(Collectors.toList());
for (WorkflowId workflowId : workflowIds) {
List<ScheduleId> scheduleIds =
scheduleClient.listSchedules(workflowId).stream()
.map(scheduleDetail ->
new ScheduleId(namespaceId.getNamespace(), record.getName(),
scheduleDetail.getName()))
.collect(Collectors.toList());
for (ScheduleId scheduleId : scheduleIds) {
if (scheduleClient.getStatus(scheduleId).equals(SCHEDULED)) {
scheduleClient.suspend(scheduleId);
}
}
// Need to stop workflows first or else the program will fail to stop below
if (!programClient.getStatus(workflowId).equals(ProgramStatus.STOPPED.toString())) {
// Pipeline still in running state. Continue with stopping rest of the pipelines in this namespace and
// next retry should try to stop/verify status for this pipeline.
shouldRetry = true;
try {
programClient.stop(workflowId);
} catch (BadRequestException e) {
// There might be race condition between checking if the program
// is in RUNNING state and stopping it. This can cause programClient.stop to
// throw BadRequestException so verifying if the program transitioned to stop
// state since it was checked earlier or not.
if (!programClient.getStatus(workflowId)
.equals(ProgramStatus.STOPPED.toString())) {
// Pipeline still in running state. Continue with stopping rest of the
// pipelines in this namespace and next retry should try to stop/verify status
// for this pipeline.
shouldRetry = true;
}
}
}
}
}
}
isLastPage = (token == null);
}
// At least one pipeline is still in running state so retry to verify pipeline status .
if (shouldRetry) {
Expand Down

0 comments on commit 7f29f87

Please sign in to comment.