Skip to content

Commit

Permalink
#11432 : Adding correct filtering criteria to avoid re-processing bun…
Browse files Browse the repository at this point in the history
…dles that were sent correctly and without problems. Adding more logging to the process to better track down bundle status. Adding Javadoc. (#11498)

(cherry picked from commit 4d4d6c1)
  • Loading branch information
jcastro-dotcms authored and jgambarios committed May 5, 2017
1 parent f26ee96 commit 256a7f5
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,34 @@
import com.dotmarketing.exception.DotHibernateException;
import com.dotmarketing.util.Logger;


/**
* Implementation class for the {@link PublishAuditAPI}.
*
* @author Alberto
* @version N/A
* @since Oct 18, 2012
*
*/
public class PublishAuditAPIImpl extends PublishAuditAPI {

private static PublishAuditAPIImpl instance= null;
private PublishAuditStatusMapper mapper = null;


/**
* Returns a singleton instance of the {@link PublishAuditAPI}.
*
* @return A unique instance of {@link PublishAuditAPI}.
*/
public static PublishAuditAPIImpl getInstance() {
if(instance==null)
if (instance == null) {
instance = new PublishAuditAPIImpl();

}
return instance;
}


/**
* Protected class constructor.
*/
protected PublishAuditAPIImpl(){
// Exists only to defeat instantiation.
mapper = new PublishAuditStatusMapper();
Expand All @@ -42,7 +57,6 @@ protected PublishAuditAPIImpl(){

private final String INSERTSQL="insert into publishing_queue_audit("+MANDATORY_FIELDS+") values("+MANDATORY_PLACE_HOLDER+")";


@Override
public void insertPublishAuditStatus(PublishAuditStatus pa)
throws DotPublisherException {
Expand Down Expand Up @@ -99,11 +113,11 @@ public void updatePublishAuditStatus(String bundleId, Status newStatus, PublishA
}
dc.addParam(newStatus.getCode());

if(history != null)
if(history != null) {
dc.addParam(history.getSerialized());
else
} else {
dc.addParam("");

}
if ( updateDates ) {
dc.addParam( new Date() );
dc.addParam( new Date() );
Expand Down Expand Up @@ -177,11 +191,12 @@ public PublishAuditStatus getPublishAuditStatus(String bundleId)
dc.addParam(bundleId);

List<Map<String, Object>> res = dc.loadObjectResults();
if(res.size() > 1)
if(res.size() > 1) {
throw new DotPublisherException("Found duplicate bundle status");
else {
if(!res.isEmpty())
} else {
if(!res.isEmpty()) {
return mapper.mapObject(res.get(0));
}
return null;
}
}catch(Exception e){
Expand All @@ -195,7 +210,8 @@ public PublishAuditStatus getPublishAuditStatus(String bundleId)
private final String SELECTSQLALL=
"SELECT * "+
"FROM publishing_queue_audit order by status_updated desc";


@Override
public List<PublishAuditStatus> getAllPublishAuditStatus() throws DotPublisherException {
try{
DotConnect dc = new DotConnect();
Expand All @@ -209,7 +225,8 @@ public List<PublishAuditStatus> getAllPublishAuditStatus() throws DotPublisherEx
DbConnectionFactory.closeConnection();
}
}


@Override
public List<PublishAuditStatus> getAllPublishAuditStatus(Integer limit, Integer offset) throws DotPublisherException {
try{
DotConnect dc = new DotConnect();
Expand All @@ -225,12 +242,12 @@ public List<PublishAuditStatus> getAllPublishAuditStatus(Integer limit, Integer
}
}


private final String SELECTSQLMAXDATE=
"select max(c.create_date) as max_date "+
"from publishing_queue_audit c " +
"where c.status != ? ";


@Override
public Date getLastPublishAuditStatusDate() throws DotPublisherException {
try{
DotConnect dc = new DotConnect();
Expand All @@ -240,8 +257,9 @@ public Date getLastPublishAuditStatusDate() throws DotPublisherException {

List<Map<String, Object>> res = dc.loadObjectResults();

if(!res.isEmpty())
if(!res.isEmpty()) {
return (Date) res.get(0).get("max_date");
}
return null;

}catch(Exception e){
Expand All @@ -253,7 +271,8 @@ public Date getLastPublishAuditStatusDate() throws DotPublisherException {
private final String SELECTSQLALLCOUNT=
"SELECT count(*) as count "+
"FROM publishing_queue_audit ";


@Override
public Integer countAllPublishAuditStatus() throws DotPublisherException {
try{
DotConnect dc = new DotConnect();
Expand All @@ -268,29 +287,32 @@ public Integer countAllPublishAuditStatus() throws DotPublisherException {
private final String SELECTSQLPENDING=
"SELECT * "+
"FROM publishing_queue_audit " +
"WHERE status = ? or status = ? or status = ? or status = ? or status = ?";

"WHERE status = ? or status = ? or status = ? or status = ? or status = ? or status = ?";

@Override
public List<PublishAuditStatus> getPendingPublishAuditStatus() throws DotPublisherException {
try{
DotConnect dc = new DotConnect();
dc.setSQL(SELECTSQLPENDING);

dc.addParam(PublishAuditStatus.Status.BUNDLE_SENT_SUCCESSFULLY.getCode());
dc.addParam(PublishAuditStatus.Status.FAILED_TO_SEND_TO_SOME_GROUPS.getCode());
dc.addParam(PublishAuditStatus.Status.FAILED_TO_SEND_TO_ALL_GROUPS.getCode());
dc.addParam(PublishAuditStatus.Status.RECEIVED_BUNDLE.getCode());
dc.addParam(PublishAuditStatus.Status.PUBLISHING_BUNDLE.getCode());
dc.addParam(PublishAuditStatus.Status.WAITING_FOR_PUBLISHING.getCode());
return mapper.mapRows(dc.loadObjectResults());
}catch(Exception e){
Logger.debug(PublisherUtil.class,e.getMessage(),e);
throw new DotPublisherException("Unable to get list of elements with error:"+e.getMessage(), e);
}
}

@Override
public PublishAuditStatus updateAuditTable ( String endpointId, String groupId, String bundleFolder ) throws DotPublisherException {
return updateAuditTable( endpointId, groupId, bundleFolder, false );
}

@Override
public PublishAuditStatus updateAuditTable ( String endpointId, String groupId, String bundleFolder, Boolean updateDates ) throws DotPublisherException {

//Status
Expand All @@ -316,4 +338,5 @@ public PublishAuditStatus updateAuditTable ( String endpointId, String groupId,

return status;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -565,22 +565,21 @@ public List<Map<String,Object>> getQueueBundleIds(int limit, int offest) throws
"left join publishing_queue_audit a "+
"ON p.bundle_id=a.bundle_id "+
"where "+
"((a.status != ? and a.status != ?) or a.status is null ) and p.publish_date is not null "+
"((a.status != ? and a.status != ? AND a.status != ?) or a.status is null ) and p.publish_date is not null "+
"order by publish_date ASC,operation ASC";

@Override
public List<Map<String, Object>> getQueueBundleIdsToProcess() throws DotPublisherException {
try{
DotConnect dc = new DotConnect();

dc.setSQL(SQLGETBUNDLESTOPROCESS);

dc.addParam(Status.BUNDLE_SENT_SUCCESSFULLY.getCode());
dc.addParam(Status.PUBLISHING_BUNDLE.getCode());
dc.addParam(Status.WAITING_FOR_PUBLISHING.getCode());
return dc.loadObjectResults();
}catch(Exception e){
Logger.error(PublisherUtil.class,e.getMessage(),e);
throw new DotPublisherException("Unable to get list of elements with error:"+e.getMessage(), e);
throw new DotPublisherException("Unable to get list of bundles to process: " + e.getMessage(), e);
}finally{
DbConnectionFactory.closeConnection();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,25 @@
import com.dotmarketing.util.Config;
import com.dotmarketing.util.Logger;
import com.dotmarketing.util.PushPublishLogger;
import com.dotmarketing.util.UtilMethods;

/**
* This job is in charge of performing two actions:
* This job is in charge of auditing and triggering the push publishing process
* in dotCMS. This job is executed right after a user marks contents or a bundle
* for Push Publishing, and at second zero of every minute. Basically, what this
* job does is:
* <ol>
* <li><b>Bundle status update:</b> Each bundle is associated to an environment,
* which contains one or more end-points. This job will connect to one or all of
* them to verify the deployment status of the bundle in order to update its
* status in the sender server. Examples of status can be (please see the
* {@link Status} class to see all the available status options):
* status in the sender server. Examples of status can be:
* <ul>
* <li><code>Success</code></li>
* <li><code>Bundle sent</code></li>
* <li><code>Failed to Publish</code></li>
* <li><code>Failed to send to all environments</code></li>
* <li>etc.</li>
* <li>etc. (Please see the {@link Status} class to see all the available status
* options.)</li>
* </ul>
* </li>
* <li><b>Pending bundle push:</b> Besides auditing the different bundles that
Expand All @@ -66,6 +70,7 @@
* </ol>
*
* @author Alberto
* @version N/A
* @since Oct 5, 2012
*
*/
Expand All @@ -81,17 +86,16 @@ public class PublisherQueueJob implements StatefulJob {
private EnvironmentAPI environmentAPI = APILocator.getEnvironmentAPI();
private PublishingEndPointAPI publisherEndPointAPI = APILocator.getPublisherEndPointAPI();

/**
/**
* Reads from the publishing queue table and depending of the publish date
* will send a bundle to publish
* ({@link com.dotcms.publishing.PublisherAPI#publish(com.dotcms.publishing.PublisherConfig)}).
* will send a bundle to publish (see
* {@link com.dotcms.publishing.PublisherAPI#publish(PublisherConfig)}).
*
* @param jobExecution
* - Context Containing the current job context information (the data).
* - Context Containing the current job context information (the
* data).
* @throws JobExecutionException
* An exception occurred while executing the job.
* @see PublisherAPI
* @see PublisherAPIImpl
*/
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
Expand All @@ -113,8 +117,18 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
PublishAuditStatus status;
PublishAuditHistory historyPojo;
String tempBundleId;

if (bundles.size() > 0) {
Logger.info(this, "");
Logger.info(this, "Found " + bundles.size() + " bundle(s) to process.");
Logger.info(this, "");
}
for ( Map<String, Object> bundle : bundles ) {
Logger.info(this, "===========================================================");
Logger.info(this, "Processing bundle:");
Logger.info(this, "-> ID: " + bundle.get("bundle_id"));
Logger.info(this, "-> Status: "
+ (UtilMethods.isSet(bundle.get("status")) ? bundle.get("status") : "Starting"));
Logger.info(this, "===========================================================");
Date publishDate = (Date) bundle.get("publish_date");
if ( publishDate.before(new Date()) ) {
tempBundleId = (String) bundle.get("bundle_id");
Expand Down Expand Up @@ -316,6 +330,7 @@ private void updateAuditStatus() throws DotPublisherException, DotDataException
}
localHistory.setPublishStart(publishStart);
localHistory.setPublishEnd(publishEnd);
PublishAuditStatus.Status bundleStatus = null;
if ( localHistory.getNumTries() >= MAX_NUM_TRIES && (countGroupFailed > 0 || countGroupPublishing > 0) ) {
// If bundle cannot be installed after [MAX_NUM_TRIES] tries
// and some groups could not be published
Expand All @@ -324,38 +339,37 @@ private void updateAuditStatus() throws DotPublisherException, DotDataException
APILocator.getPushedAssetsAPI().deletePushedAssets(bundleAudit.getBundleId(), environment.getId());
}
PushPublishLogger.log(this.getClass(), "Status Update: Failed to publish");
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(),
PublishAuditStatus.Status.FAILED_TO_PUBLISH,
localHistory);
bundleStatus = PublishAuditStatus.Status.FAILED_TO_PUBLISH;
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(), bundleStatus, localHistory);
pubAPI.deleteElementsFromPublishQueueTable(bundleAudit.getBundleId());
} else if ( countGroupFailed > 0 && countGroupOk > 0 ) {
} else if (countGroupFailed > 0 && (countGroupOk + countGroupFailed) == endpointTrackingMap.size()) {
// If bundle was installed in some groups only
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(),
PublishAuditStatus.Status.FAILED_TO_SEND_TO_SOME_GROUPS,
localHistory);
bundleStatus = PublishAuditStatus.Status.FAILED_TO_SEND_TO_SOME_GROUPS;
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(), bundleStatus, localHistory);
} else if ( countGroupFailed == endpointTrackingMap.size() ) {
// If bundle cannot be installed in all groups
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(),
PublishAuditStatus.Status.FAILED_TO_SEND_TO_ALL_GROUPS,
localHistory);
bundleStatus = PublishAuditStatus.Status.FAILED_TO_SEND_TO_ALL_GROUPS;
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(), bundleStatus, localHistory);
} else if ( countGroupOk == endpointTrackingMap.size() ) {
// If bundle was installed in all groups
PushPublishLogger.log(this.getClass(), "Status Update: Success");
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(),
PublishAuditStatus.Status.SUCCESS,
localHistory);
bundleStatus = PublishAuditStatus.Status.SUCCESS;
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(), bundleStatus, localHistory);
pubAPI.deleteElementsFromPublishQueueTable(bundleAudit.getBundleId());
} else if ( countGroupPublishing == endpointTrackingMap.size() ) {
// If bundle is still publishing in all groups
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(),
PublishAuditStatus.Status.PUBLISHING_BUNDLE,
localHistory);
bundleStatus = PublishAuditStatus.Status.PUBLISHING_BUNDLE;
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(), bundleStatus, localHistory);
} else {
// Otherwise, just keep trying to publish the bundle
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(),
PublishAuditStatus.Status.WAITING_FOR_PUBLISHING,
localHistory);
bundleStatus = PublishAuditStatus.Status.WAITING_FOR_PUBLISHING;
pubAuditAPI.updatePublishAuditStatus(bundleAudit.getBundleId(), bundleStatus, localHistory);
}
Logger.info(this, "===========================================================");
Logger.info(this, String.format("For bundle '%s':", bundleAudit.getBundleId()));
Logger.info(this, String.format("-> Re-try attempts: %d", localHistory.getNumTries()));
Logger.info(this, String.format("-> Status: %s", bundleStatus.toString()));
Logger.info(this, "===========================================================");
} else {
//We delete the Publish Queue.
pubAPI.deleteElementsFromPublishQueueTable(bundleAudit.getBundleId());
Expand Down

0 comments on commit 256a7f5

Please sign in to comment.