Skip to content

Commit

Permalink
FRI-646 Broadcast Topic message after pulishing a release in SRS
Browse files Browse the repository at this point in the history
  • Loading branch information
QuyenLy87 committed Apr 14, 2023
1 parent 7fccd0f commit f687120
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
7 changes: 7 additions & 0 deletions src/main/java/org/ihtsdo/buildcloud/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import javax.jms.Queue;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@SpringBootApplication(exclude = {
ContextInstanceDataAutoConfiguration.class,
Expand Down Expand Up @@ -200,6 +202,11 @@ public ActiveMQTextMessage buildStatusTextMessage(@Value("${srs.jms.queue.prefix
return getActiveMQTextMessage(queue);
}

@Bean
public ExecutorService cachedThreadExecutors() {
return Executors.newCachedThreadPool();
}

private ActiveMQTextMessage getActiveMQTextMessage(final String queue) throws JMSException {
final ActiveMQTextMessage activeMQTextMessage = new ActiveMQTextMessage();
activeMQTextMessage.setJMSReplyTo(new ActiveMQQueue(queue));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
package org.ihtsdo.buildcloud.core.service;

import com.amazonaws.services.s3.model.*;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.log4j.MDC;
import org.ihtsdo.buildcloud.core.dao.BuildDAO;
import org.ihtsdo.buildcloud.core.dao.ReleaseCenterDAO;
import org.ihtsdo.buildcloud.core.dao.helper.S3PathHelper;
import org.ihtsdo.buildcloud.core.entity.Build;
import org.ihtsdo.buildcloud.core.entity.BuildConfiguration;
import org.ihtsdo.buildcloud.core.entity.QATestConfig;
import org.ihtsdo.buildcloud.core.entity.ReleaseCenter;
import org.ihtsdo.buildcloud.core.entity.*;
import org.ihtsdo.buildcloud.core.service.build.RF2Constants;
import org.ihtsdo.buildcloud.core.service.helper.ProcessingStatus;
import org.ihtsdo.buildcloud.core.service.identifier.client.IdServiceRestClient;
import org.ihtsdo.buildcloud.core.service.identifier.client.SchemeIdType;
import org.ihtsdo.otf.dao.s3.S3Client;
import org.ihtsdo.otf.dao.s3.helper.FileHelper;
import org.ihtsdo.otf.dao.s3.helper.S3ClientHelper;
import org.ihtsdo.otf.jms.MessagingHelper;
import org.ihtsdo.otf.rest.client.RestClientException;
import org.ihtsdo.otf.rest.client.terminologyserver.pojo.CodeSystem;
import org.ihtsdo.otf.rest.client.terminologyserver.pojo.CodeSystemVersion;
import org.ihtsdo.otf.rest.exception.BadRequestException;
import org.ihtsdo.otf.rest.exception.BusinessServiceException;
import org.ihtsdo.otf.rest.exception.EntityAlreadyExistsException;
Expand All @@ -42,11 +46,13 @@
import org.springframework.util.FileCopyUtils;
import org.springframework.util.StreamUtils;

import javax.jms.JMSException;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
Expand Down Expand Up @@ -78,6 +84,12 @@ public class PublishServiceImpl implements PublishService {
@Value("${srs.storage.bucketName}")
private String storageBucketName;

@Value("${srs.jms.time-to-live-seconds}")
private int messageTimeToLiveSeconds;

@Value("${srs.jms.queue.prefix}.release-publish")
private String releasePublishDestination;

@Autowired
private TermServerService termServerService;

Expand All @@ -100,6 +112,12 @@ public class PublishServiceImpl implements PublishService {

@Autowired
private SchemaFactory schemaFactory;

@Autowired
private ExecutorService executorService;

@Autowired
private MessagingHelper messagingHelper;

private static final int BATCH_SIZE = 5000;

Expand Down Expand Up @@ -258,8 +276,33 @@ public void publishBuild(final Build build, boolean publishComponentIds, String
.orElse(null);
if (codeSystem != null && build.getConfiguration().getBranchPath().startsWith(codeSystem.getBranchPath())) {
try {
final String effectiveTimeSnomedFormat = build.getConfiguration().getEffectiveTimeSnomedFormat();
LOGGER.info("Update the release package for Code System Version: {}, {}, {}", codeSystem.getShortName(), build.getConfiguration().getEffectiveTimeSnomedFormat(), releaseFileName);
termServerService.updateCodeSystemVersionPackage(codeSystem.getShortName(), build.getConfiguration().getEffectiveTimeSnomedFormat(), releaseFileName);
termServerService.updateCodeSystemVersionPackage(codeSystem.getShortName(), effectiveTimeSnomedFormat, releaseFileName);

// Send notification via JMS
executorService.submit(() -> {
try {
String importDate = null;
List<CodeSystemVersion> versions = termServerService.getCodeSystemVersions(codeSystem.getShortName(), false, false);
for (CodeSystemVersion version : versions) {
if (String.valueOf(version.getEffectiveDate()).equals(effectiveTimeSnomedFormat)) {
importDate = DateFormatUtils.format(version.getImportDate(), Product.SNOMED_DATE_FORMAT);
break;
}
}

Map payload = new HashMap();
payload.put("codeSystemShortName", codeSystem.getShortName());
payload.put("effectiveDate", effectiveTimeSnomedFormat);
if (importDate != null) {
payload.put("importDate", importDate);
}
messagingHelper.publish(releasePublishDestination, payload, null, messageTimeToLiveSeconds);
} catch (JsonProcessingException | JMSException e) {
LOGGER.error("Failed to send PUBLISHED status to {}", releasePublishDestination);
}
});
} catch (Exception e) {
concurrentPublishingBuildStatus.put(getBuildUniqueKey(build), new ProcessingStatus(Status.COMPLETED.name(), "The build has been published successfully but failed to update Code System Version Package. Error message: " + e.getMessage()));
throw new BusinessServiceException("Failed to update Code System Version Package", e);
Expand Down
1 change: 1 addition & 0 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ spring.activemq.queuePrefetch=1

srs.jms.queue.prefix = local-srs
srs.jms.queue.concurrency=2
srs.jms.time-to-live-seconds =3600

# Time to live for the job status update messages sent to the client
#srs.jms.status.time-to-live-seconds = 3600
Expand Down

0 comments on commit f687120

Please sign in to comment.