Skip to content

Commit

Permalink
#13469 add the wf lang id column and migration
Browse files Browse the repository at this point in the history
  • Loading branch information
Jonathan authored and Jonathan committed Feb 14, 2018
1 parent d296db2 commit d2a2d43
Showing 1 changed file with 260 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

package com.dotmarketing.startup.runonce;

import com.dotcms.concurrent.DotConcurrentFactory;
import com.dotcms.concurrent.DotSubmitter;
import com.dotcms.util.ConversionUtils;
import com.dotmarketing.business.UserAPI;
import com.dotmarketing.common.db.DotConnect;
import com.dotmarketing.common.db.Params;
import com.dotmarketing.db.DbConnectionFactory;
import com.dotmarketing.db.HibernateUtil;
import com.dotmarketing.exception.DotDataException;
Expand All @@ -12,12 +16,17 @@
import com.dotmarketing.startup.AbstractJDBCStartupTask;
import com.dotmarketing.util.Config;
import com.dotmarketing.util.Logger;
import com.dotmarketing.util.UtilMethods;
import com.dotmarketing.util.UUIDGenerator;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import static com.dotcms.util.CollectionsUtils.map;

Expand All @@ -31,6 +40,16 @@
*/
public class Task04330WorkflowTaskAddLanguageIdColumn extends AbstractJDBCStartupTask {

protected static final String SYSTEM_USER_ID = UserAPI.SYSTEM_USER_ID;
protected static final int SELECT_MAX_ROWS = Config.getIntProperty("task.4330.selectmaxrows", 25);
protected static final int MAX_BATCH_SIZE = Config.getIntProperty("task.4330.maxbatchsize", 100);
protected static final String SELECT_CONTENTLET_LANGS_INFO_SQL = "select lang from contentlet_version_info where identifier=?";
protected static final String INSERT_WORKFLOW_TASK_SQL = "insert into (id, create_date, mode_date, created_by, assigned_to, status, webasset, language_id) values (?,?,?,?,?,?,?,?)";

protected static final long DEFAULT_LANGUAGE_ID = 1L;
protected final DotSubmitter submitter = DotConcurrentFactory.getInstance().getSubmitter(DotConcurrentFactory.DOT_SYSTEM_THREAD_POOL);


private static final Map<DbType, String> selectLanguageIdColumnSQLMap = map(
DbType.POSTGRESQL, "SELECT language_id FROM workflow_task",
DbType.MYSQL, "SELECT language_id FROM workflow_task",
Expand All @@ -46,10 +65,10 @@ public class Task04330WorkflowTaskAddLanguageIdColumn extends AbstractJDBCStartu
);

private static final Map<DbType, String> updateLanguageIdColumnSQLMap = map(
DbType.POSTGRESQL, "UPDATE workflow_task SET language_id = ?",
DbType.MYSQL, "UPDATE workflow_task SET language_id = ?",
DbType.ORACLE, "UPDATE workflow_task SET language_id = ?",
DbType.MSSQL, "UPDATE workflow_task SET language_id = ?"
DbType.POSTGRESQL, "UPDATE workflow_task SET language_id = ? where id = ?",
DbType.MYSQL, "UPDATE workflow_task SET language_id = ? where id = ?",
DbType.ORACLE, "UPDATE workflow_task SET language_id = ? where id = ?",
DbType.MSSQL, "UPDATE workflow_task SET language_id = ? where id = ?"
);

private static final Map<DbType, String> addLanguageIdIndexSQLMap = map(
Expand All @@ -66,7 +85,6 @@ public class Task04330WorkflowTaskAddLanguageIdColumn extends AbstractJDBCStartu
DbType.MSSQL, "ALTER TABLE workflow_task ADD CONSTRAINT FK_workflow_task_language FOREIGN KEY (language_id) REFERENCES language(id)"
);

private static final long INVALID_LANGUAGE_ID = -1;
@Override
public boolean forceRun() {
return Boolean.TRUE;
Expand All @@ -75,7 +93,6 @@ public boolean forceRun() {
@Override
public void executeUpgrade() throws DotDataException {


final boolean created = this.addLanguageIdColumn();

if (created) {
Expand Down Expand Up @@ -170,17 +187,176 @@ private void addWorkflowTaskLanguageIdFK() {

private void updateAllWorkflowTaskToDefaultLanguage() throws DotDataException {

final long id = this.getDefaultLanguageId();
Logger.debug(this, "Running upgrade Task04330WorkflowTaskAddLanguageIdColumn");

int selectStartRow = 0;
final List<WorkflowTask> workflowTasks = new ArrayList<>();
final List<Future<WorkflowTaskResult>> futures =
new ArrayList<>();
List<Map<String, Object>> workflowTaskList = new DotConnect()
.setSQL("select id, assigned_to, title, status, webasset from workflow_task")
.setStartRow(selectStartRow)
.setMaxRows(SELECT_MAX_ROWS)
.loadResults();


if (id > 0) {
Logger.info(this, "Running the upgrade for the workflow task languages");

Logger.info(this, "Updating all workflow_task to the default language: "
+ id + ", rows affected: " +
new DotConnect().executeUpdate(this.getUpdateLanguageIdColumnSQL(), id));
} else {
while (workflowTaskList != null && !workflowTaskList.isEmpty()) {

Logger.warn(this, "The default language id: " + id +
" is not valid, the current workflow task were not updated");
Logger.info(this, "Reading workflow task from " + selectStartRow +
", to " + (selectStartRow + SELECT_MAX_ROWS));

if (Config.getBooleanProperty("task.4330.runparallel", true)) {

Logger.info(this, "Processing the workflow task parallel");
workflowTaskList.stream().parallel().
forEach((Map<String, Object> workflowTaskMap) -> this.runTask(futures, workflowTaskMap));
} else {
Logger.info(this, "Processing the workflow task normally");
workflowTaskList.stream().
forEach((Map<String, Object> workflowTaskMap) -> this.runTask(futures, workflowTaskMap));
}

this.processFutures(futures, workflowTasks);
this.doBatch (workflowTasks);

selectStartRow += SELECT_MAX_ROWS;
workflowTaskList =
(List<Map<String, Object>>)new DotConnect()
.setSQL("select id, assigned_to, title, status, webasset from workflow_task")
.setStartRow(selectStartRow)
.setMaxRows (SELECT_MAX_ROWS)
.loadResults();
futures.clear();
}
}

private void doBatch(final List<WorkflowTask> workflowTasks) {

this.doBatch(workflowTasks, false);
}

private void doBatch(final List<WorkflowTask> workflowTasks, final boolean force) {

if (force || workflowTasks.size() > MAX_BATCH_SIZE) {

Logger.info(this, "Doing Workflow Task update batch");
int rowsAffected = 0;
final List<Params> paramsInsert = new ArrayList<>();
final List<Params> paramsUpdate = new ArrayList<>();
for (WorkflowTask workflowTask : workflowTasks) {

if (workflowTask.isUpdate()) {

paramsInsert.add(new Params(workflowTask.getLanguageId(),
workflowTask.getId()
));
} else {
paramsUpdate.add(new Params(workflowTask.getId(),
DbConnectionFactory.now(),
DbConnectionFactory.now(),
SYSTEM_USER_ID,
workflowTask.getAssignedTo(),
workflowTask.getStatus(),
workflowTask.getWebasset(),
workflowTask.getLanguageId()
));
}
}

try {

if (!paramsInsert.isEmpty()) {

final List<Integer> batchResult =
Ints.asList(new DotConnect().executeBatch(
INSERT_WORKFLOW_TASK_SQL,
paramsInsert));

rowsAffected = batchResult.stream().reduce(0, Integer::sum);
Logger.info(this, "Batch rows workflow task with language, inserted: " + rowsAffected);
}

if (!paramsUpdate.isEmpty()) {

final List<Integer> batchResult =
Ints.asList(new DotConnect().executeBatch(
this.getUpdateLanguageIdColumnSQL(),
paramsUpdate));

rowsAffected += batchResult.stream().reduce(0, Integer::sum);
Logger.info(this, "Batch rows workflow task with language, update: " + rowsAffected);
}

if (rowsAffected != workflowTasks.size()) {

Logger.warn(this, "Tried to update: " + workflowTasks.size() + " rows, but just updated: " + rowsAffected +
" rows. List of objects: " + workflowTasks);
}

Logger.info(this, "Batch DONE, rows affected: " + rowsAffected);
} catch (DotDataException e) {

Logger.error(this, e.getMessage(), e);
}

workflowTasks.clear();
}
}

private void runTask(final List<Future<WorkflowTaskResult>> futures,
final Map<String, Object> workflowTaskMap) {

futures.add(submitter.submit(()-> this.runWorkflowTask (workflowTaskMap)));
}

private WorkflowTaskResult runWorkflowTask(final Map<String, Object> workflowTaskMap) {

final String id = (String)workflowTaskMap.get("id");
final String assignedTo = (String)workflowTaskMap.get("assigned_to");
final String title = (String)workflowTaskMap.get("title");
final String status = (String)workflowTaskMap.get("status");
final String webasset = (String)workflowTaskMap.get("webasset");
final ImmutableList.Builder<WorkflowTask> tasks =
new ImmutableList.Builder<>();

try {

final List<Map<String, Object>> contentletLanguages =
new DotConnect().setSQL(SELECT_CONTENTLET_LANGS_INFO_SQL)
.addParam(webasset).loadObjectResults();

if (contentletLanguages.size() > 0) {

tasks.add(new WorkflowTask(true, id, assignedTo, title, status, webasset,
ConversionUtils.toLong(contentletLanguages.get(0).get("lang"), DEFAULT_LANGUAGE_ID)));
for (int i = 1; i < contentletLanguages.size(); ++i) {

tasks.add(new WorkflowTask(false, UUIDGenerator.generateUuid(), assignedTo, title, status, webasset,
ConversionUtils.toLong(contentletLanguages.get(i).get("lang"), DEFAULT_LANGUAGE_ID)));
}
}
} catch (DotDataException e) {

Logger.error(this, e.getMessage(), e);
}

return new WorkflowTaskResult(tasks.build());
}

private void processFutures(final List<Future<WorkflowTaskResult>> futures,
final List<WorkflowTask> workflowTasks) {

for (Future<WorkflowTaskResult> future : futures) {

try {

workflowTasks.addAll(future.get().getWorkflowTasks());
} catch (InterruptedException | ExecutionException e) {

Logger.error(this, e.getMessage(), e);
}
}
}

Expand All @@ -198,8 +374,6 @@ private String getUpdateLanguageIdColumnSQL() {
return updateLanguageIdColumnSQLMap.getOrDefault(dbType, null);
}



private String getAddLanguageIdIndex() {

final DbType dbType = DbType.getDbType(DbConnectionFactory.getDBType());
Expand All @@ -221,25 +395,6 @@ private String getSelectLanguageIdColumnSQL() {
return selectLanguageIdColumnSQLMap.getOrDefault(dbType, null);
}

private long getDefaultLanguageId() throws DotDataException {

final String languageCode = Config.getStringProperty("DEFAULT_LANGUAGE_CODE");
final String countryCode = Config.getStringProperty("DEFAULT_LANGUAGE_COUNTRY_CODE");
List<Map<String, Object>> results = null;

Logger.info(this, "Checking the default language, code: "
+ languageCode + ", country: " + countryCode);

results = (UtilMethods.isSet(countryCode))?
new DotConnect().setSQL("select id from Language where language_code=? and country_code=?")
.addParam(languageCode.toLowerCase()).addParam(countryCode.toUpperCase()).loadResults():
new DotConnect().setSQL("select id from Language where language_code=? and and (country_code = '' OR country_code IS NULL)")
.addParam(languageCode.toLowerCase()).loadObjectResults();

return null != results && results.size() > 0?
ConversionUtils.toLong(results.get(0).getOrDefault("id", INVALID_LANGUAGE_ID).toString()) :INVALID_LANGUAGE_ID;
}

private void closeCommitAndStartTransaction() throws DotHibernateException {
if (DbConnectionFactory.inTransaction()) {
HibernateUtil.closeAndCommitTransaction();
Expand Down Expand Up @@ -271,4 +426,73 @@ private void closeAndStartTransaction() throws DotHibernateException {
@Override
protected List<String> getTablesToDropConstraints() { return Collections.emptyList(); }


public static class WorkflowTaskResult {

private final List<WorkflowTask> workflowTasks;

public WorkflowTaskResult(List<WorkflowTask> workflowTasks) {
this.workflowTasks = workflowTasks;
}

public List<WorkflowTask> getWorkflowTasks() {
return workflowTasks;
}
}
public static class WorkflowTask {

private final boolean update;
private final String id;
private final String assignedTo;
private final String title;
private final String status;
private final String webasset;
private final long languageId;

public WorkflowTask(final boolean update,
final String id,
final String assignedTo,
final String title,
final String status,
final String webasset,
final long languageId) {

this.update = update;
this.id = id;
this.assignedTo = assignedTo;
this.title = title;
this.status = status;
this.webasset = webasset;
this.languageId = languageId;
}

public boolean isUpdate() {
return update;
}

public String getId() {
return id;
}

public String getAssignedTo() {
return assignedTo;
}

public String getTitle() {
return title;
}

public String getStatus() {
return status;
}

public String getWebasset() {
return webasset;
}

public long getLanguageId() {
return languageId;
}
}

}

0 comments on commit d2a2d43

Please sign in to comment.