diff --git a/src/main/java/edu/harvard/iq/dataverse/api/Datasets.java b/src/main/java/edu/harvard/iq/dataverse/api/Datasets.java
index 4b919c5ed82..b93257bc0c3 100644
--- a/src/main/java/edu/harvard/iq/dataverse/api/Datasets.java
+++ b/src/main/java/edu/harvard/iq/dataverse/api/Datasets.java
@@ -4009,6 +4009,7 @@ public Response addGlobusFilesToDataset(@Context ContainerRequestContext crc,
         logger.info(" ==== (api addGlobusFilesToDataset) jsonData ====== " + jsonData);
 
         if (!systemConfig.isHTTPUpload()) {
+            // @todo why isHTTPUpload()? - shouldn't it be checking isGlobusUpload() here?
             return error(Response.Status.SERVICE_UNAVAILABLE, BundleUtil.getStringFromBundle("file.api.httpDisabled"));
         }
@@ -4075,7 +4076,11 @@ public Response addGlobusFilesToDataset(@Context ContainerRequestContext crc,
         String requestUrl = SystemConfig.getDataverseSiteUrlStatic();
 
         // Async Call
-        globusService.globusUpload(jsonObject, token, dataset, requestUrl, authUser);
+        try {
+            globusService.globusUpload(jsonObject, token, dataset, requestUrl, authUser);
+        } catch (IllegalArgumentException ex) {
+            return badRequest("Invalid parameters: "+ex.getMessage());
+        }
 
         return ok("Async call to Globus Upload started ");
 
diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java
index d6b56b51fa5..eb1eb47611a 100644
--- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java
+++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusServiceBean.java
@@ -22,7 +22,6 @@
 import jakarta.json.JsonString;
 import jakarta.json.JsonValue.ValueType;
 import jakarta.json.stream.JsonParsingException;
-import jakarta.servlet.http.HttpServletRequest;
 import jakarta.ws.rs.HttpMethod;
 
 import static edu.harvard.iq.dataverse.util.json.JsonPrinter.json;
@@ -33,7 +32,6 @@
 import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.URLEncoder;
 import java.sql.Timestamp;
 import java.text.SimpleDateFormat;
 import java.time.Duration;
@@ -62,10 +60,10 @@
 import edu.harvard.iq.dataverse.dataaccess.GlobusAccessibleStore;
 import edu.harvard.iq.dataverse.dataaccess.StorageIO;
 import edu.harvard.iq.dataverse.datasetutility.AddReplaceFileHelper;
-import edu.harvard.iq.dataverse.engine.command.DataverseRequest;
 import edu.harvard.iq.dataverse.ingest.IngestServiceBean;
 import edu.harvard.iq.dataverse.privateurl.PrivateUrl;
 import edu.harvard.iq.dataverse.privateurl.PrivateUrlServiceBean;
+import edu.harvard.iq.dataverse.settings.FeatureFlags;
 import edu.harvard.iq.dataverse.settings.JvmSettings;
 import edu.harvard.iq.dataverse.settings.SettingsServiceBean;
 import edu.harvard.iq.dataverse.util.FileUtil;
@@ -73,6 +71,8 @@
 import edu.harvard.iq.dataverse.util.URLTokenUtil;
 import edu.harvard.iq.dataverse.util.UrlSignerUtil;
 import edu.harvard.iq.dataverse.util.json.JsonUtil;
+import jakarta.persistence.EntityManager;
+import jakarta.persistence.PersistenceContext;
 import jakarta.ws.rs.core.Response;
 
 @Stateless
@@ -105,6 +105,8 @@ public class GlobusServiceBean implements java.io.Serializable {
     IngestServiceBean ingestSvc;
     @EJB
     SystemConfig systemConfig;
+    @PersistenceContext(unitName = "VDCNet-ejbPU")
+    private EntityManager em;
 
     private static final Logger logger = Logger.getLogger(GlobusServiceBean.class.getCanonicalName());
     private static final SimpleDateFormat logFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH-mm-ss");
@@ -391,19 +393,33 @@ private void monitorTemporaryPermissions(String ruleId, long datasetId) {
      * @return
      * @throws MalformedURLException
      */
-    public GlobusTask getTask(String accessToken, String taskId, Logger globusLogger) throws MalformedURLException {
+    public GlobusTaskState getTask(String accessToken, String taskId, Logger globusLogger) {
 
-        URL url = new URL("https://transfer.api.globusonline.org/v0.10/endpoint_manager/task/" + taskId);
+        Logger myLogger = globusLogger != null ? globusLogger : logger;
+
+        URL url;
+        try {
+            url = new URL("https://transfer.api.globusonline.org/v0.10/endpoint_manager/task/" + taskId);
+        } catch (MalformedURLException mue) {
+            myLogger.warning("Malformed URL exception when trying to contact Globus. Globus API url: "
+                    + "https://transfer.api.globusonline.org/v0.10/endpoint_manager/task/"
+                    + taskId);
+            return null;
+        }
 
         MakeRequestResponse result = makeRequest(url, "Bearer", accessToken, "GET", null);
 
-        GlobusTask task = null;
+        GlobusTaskState task = null;
 
         if (result.status == 200) {
-            task = parseJson(result.jsonResponse, GlobusTask.class, false);
+            task = parseJson(result.jsonResponse, GlobusTaskState.class, false);
         }
         if (result.status != 200) {
-            globusLogger.warning("Cannot find information for the task " + taskId + " : Reason : "
+            // @todo It should probably retry it 2-3 times before giving up;
+            // similarly, it should probably differentiate between a "no such task"
+            // response and something intermittent like a server/network error or
+            // an expired token... i.e. something that's recoverable (?)
+            myLogger.warning("Cannot find information for the task " + taskId + " : Reason : "
                     + result.jsonResponse.toString());
         }
 
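[Editor's note on the retry @todo above: a minimal sketch of what a retrying wrapper could look like. This helper is not part of the change; the attempt count and backoff are chosen arbitrarily, and, as the @todo points out, it still would not distinguish recoverable failures from a genuinely missing task.]

    // Hypothetical helper, for illustration only - not part of this diff.
    // getTask() (above) returns null when the task lookup fails.
    private GlobusTaskState getTaskWithRetries(String accessToken, String taskId, Logger myLogger) {
        GlobusTaskState task = null;
        for (int attempt = 0; attempt < 3 && task == null; attempt++) {
            task = getTask(accessToken, taskId, myLogger);
            if (task == null) {
                try {
                    // simple linear backoff between attempts
                    Thread.sleep(1000L * (attempt + 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return task;
    }
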
+ "GlobusUpload" + logTimestamp); @@ -674,11 +696,13 @@ public void globusUpload(JsonObject jsonData, ApiToken token, Dataset dataset, S } logger.fine("json: " + JsonUtil.prettyPrint(jsonData)); + + globusLogger.info("Globus upload initiated"); String taskIdentifier = jsonData.getString("taskIdentifier"); GlobusEndpoint endpoint = getGlobusEndpoint(dataset); - GlobusTask task = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger); + GlobusTaskState task = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger); String ruleId = getRuleId(endpoint, task.getOwner_id(), "rw"); logger.fine("Found rule: " + ruleId); if (ruleId != null) { @@ -688,15 +712,43 @@ public void globusUpload(JsonObject jsonData, ApiToken token, Dataset dataset, S rulesCache.invalidate(ruleId); } } - + // Wait before first check Thread.sleep(5000); + + if (FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) { + + // Save the task information in the database so that the Globus monitoring + // service can continue checking on its progress. + + GlobusTaskInProgress taskInProgress = new GlobusTaskInProgress(taskIdentifier, GlobusTaskInProgress.TaskType.UPLOAD, dataset, endpoint.getClientToken(), token, new Timestamp(new Date().getTime())); + em.persist(taskInProgress); + + // Save the metadata entries that define the files that are being uploaded + // in the database. These entries will be used once/if the uploads + // completes successfully to add the files to the dataset. + + for (JsonObject fileJsonObject : filesJsonArray.getValuesAs(JsonObject.class)) { + ExternalFileUploadInProgress fileUploadRecord = new ExternalFileUploadInProgress(taskIdentifier, fileJsonObject.toString()); + + em.persist(fileUploadRecord); + } + + return; + } + + + // the old implementation that relies on looping continuosly, + // sleeping-then-checking the task status repeatedly: + // globus task status check + // (the method below performs continuous looped checks of the remote + // Globus API, monitoring it for as long as it takes for the task to + // finish one way or another!) 
task = globusStatusCheck(endpoint, taskIdentifier, globusLogger); + // @todo null check, or make sure it's never null String taskStatus = getTaskStatus(task); - globusLogger.info("Starting a globusUpload "); - if (ruleId != null) { // Transfer is complete, so delete rule deletePermission(ruleId, dataset, globusLogger); @@ -739,138 +791,11 @@ public void globusUpload(JsonObject jsonData, ApiToken token, Dataset dataset, S } else { try { - // - - List inputList = new ArrayList(); - JsonArray filesJsonArray = jsonData.getJsonArray("files"); - - if (filesJsonArray != null) { - String datasetIdentifier = dataset.getAuthorityForFileStorage() + "/" - + dataset.getIdentifierForFileStorage(); - - for (JsonObject fileJsonObject : filesJsonArray.getValuesAs(JsonObject.class)) { - - // storageIdentifier s3://gcs5-bucket1:1781cfeb8a7-748c270a227c from - // externalTool - String storageIdentifier = fileJsonObject.getString("storageIdentifier"); - String[] parts = DataAccess.getDriverIdAndStorageLocation(storageIdentifier); - String storeId = parts[0]; - // If this is an S3 store, we need to split out the bucket name - String[] bits = parts[1].split(":"); - String bucketName = ""; - if (bits.length > 1) { - bucketName = bits[0]; - } - String fileId = bits[bits.length - 1]; - - // fullpath s3://gcs5-bucket1/10.5072/FK2/3S6G2E/1781cfeb8a7-4ad9418a5873 - // or globus:///10.5072/FK2/3S6G2E/1781cfeb8a7-4ad9418a5873 - String fullPath = storeId + "://" + bucketName + "/" + datasetIdentifier + "/" + fileId; - String fileName = fileJsonObject.getString("fileName"); - - inputList.add(fileId + "IDsplit" + fullPath + "IDsplit" + fileName); - } - - // calculateMissingMetadataFields: checksum, mimetype - JsonObject newfilesJsonObject = calculateMissingMetadataFields(inputList, globusLogger); - JsonArray newfilesJsonArray = newfilesJsonObject.getJsonArray("files"); - logger.fine("Size: " + newfilesJsonArray.size()); - logger.fine("Val: " + JsonUtil.prettyPrint(newfilesJsonArray.getJsonObject(0))); - JsonArrayBuilder addFilesJsonData = Json.createArrayBuilder(); - - for (JsonObject fileJsonObject : filesJsonArray.getValuesAs(JsonObject.class)) { - - countAll++; - String storageIdentifier = fileJsonObject.getString("storageIdentifier"); - String fileName = fileJsonObject.getString("fileName"); - String[] parts = DataAccess.getDriverIdAndStorageLocation(storageIdentifier); - // If this is an S3 store, we need to split out the bucket name - String[] bits = parts[1].split(":"); - if (bits.length > 1) { - } - String fileId = bits[bits.length - 1]; - - List newfileJsonObject = IntStream.range(0, newfilesJsonArray.size()) - .mapToObj(index -> ((JsonObject) newfilesJsonArray.get(index)).getJsonObject(fileId)) - .filter(Objects::nonNull).collect(Collectors.toList()); - if (newfileJsonObject != null) { - logger.fine("List Size: " + newfileJsonObject.size()); - // if (!newfileJsonObject.get(0).getString("hash").equalsIgnoreCase("null")) { - JsonPatch path = Json.createPatchBuilder() - .add("/md5Hash", newfileJsonObject.get(0).getString("hash")).build(); - fileJsonObject = path.apply(fileJsonObject); - path = Json.createPatchBuilder() - .add("/mimeType", newfileJsonObject.get(0).getString("mime")).build(); - fileJsonObject = path.apply(fileJsonObject); - addFilesJsonData.add(fileJsonObject); - countSuccess++; - // } else { - // globusLogger.info(fileName - // + " will be skipped from adding to dataset by second API due to missing - // values "); - // countError++; - // } - } else { - globusLogger.info(fileName - + " will be skipped 
from adding to dataset by second API due to missing values "); - countError++; - } - } - - String newjsonData = addFilesJsonData.build().toString(); - - globusLogger.info("Successfully generated new JsonData for addFiles call"); - - /*String command = "curl -H \"X-Dataverse-key:" + token.getTokenString() + "\" -X POST " - + httpRequestUrl + "/api/datasets/:persistentId/addFiles?persistentId=doi:" - + datasetIdentifier + " -F jsonData='" + newjsonData + "'"; - System.out.println("*******====command ==== " + command);*/ - - // ToDo - refactor to call AddReplaceFileHelper.addFiles directly instead of - // calling API - - // a quick experimental AddReplaceFileHelper implementation: - AddReplaceFileHelper addFileHelper = new AddReplaceFileHelper( - dataverseRequestSvc.getDataverseRequest(), - this.ingestSvc, - this.datasetSvc, - this.dataFileSvc, - this.permissionSvc, - this.commandEngine, - this.systemConfig - ); - - Response addFilesResponse = addFileHelper.addFiles(newjsonData, dataset, authUser); - - if (Response.Status.OK.equals(addFilesResponse.getStatusInfo())) { - // if(!taskSkippedFiles) - if (countError == 0) { - userNotificationService.sendNotification((AuthenticatedUser) authUser, - new Timestamp(new Date().getTime()), UserNotification.Type.GLOBUSUPLOADCOMPLETED, - dataset.getId(), countSuccess + " files added out of " + countAll, true); - } else { - userNotificationService.sendNotification((AuthenticatedUser) authUser, - new Timestamp(new Date().getTime()), - UserNotification.Type.GLOBUSUPLOADCOMPLETEDWITHERRORS, dataset.getId(), - countSuccess + " files added out of " + countAll, true); - } - globusLogger.info("Successfully completed addFiles call "); - } else { - globusLogger.log(Level.SEVERE, - "******* Error while executing addFiles ", newjsonData); - } - - } - - globusLogger.info("Files processed: " + countAll.toString()); - globusLogger.info("Files added successfully: " + countSuccess.toString()); - globusLogger.info("Files failures: " + countError.toString()); - globusLogger.info("Finished upload via Globus job."); - + processUploadedFiles(filesJsonArray, dataset, authUser, globusLogger); } catch (Exception e) { - logger.info("Exception from globusUpload call "); + logger.info("Exception from processUploadedFiles call "); e.printStackTrace(); - globusLogger.info("Exception from globusUpload call " + e.getMessage()); + globusLogger.info("Exception from processUploadedFiles call " + e.getMessage()); datasetSvc.removeDatasetLocks(dataset, DatasetLock.Reason.EditInProgress); // } @@ -883,7 +808,164 @@ public void globusUpload(JsonObject jsonData, ApiToken token, Dataset dataset, S fileHandler.close(); } } + /** + * The code in this method is copy-and-pasted from the previous Borealis + * implemenation + * @todo see if it can be refactored and simplified a bit, the json manipulation + * specifically (?) + * @param filesJsonArray JsonArray containing files metadata entries as passed to /addGlobusFiles + * @param dataset the dataset + * @param authUser the user that should be be performing the addFiles call + * finalizing adding the files to the Dataset. Note that this + * user will need to be obtained from the saved api token, when this + * method is called via the TaskMonitoringService + * @param myLogger the Logger; if null, the main logger of the service bean will be used + * @throws IOException, InterruptedException, ExecutionException @todo may need to throw more exceptions (?) 
+ */ + private void processUploadedFiles(JsonArray filesJsonArray, Dataset dataset, AuthenticatedUser authUser, Logger myLogger) throws IOException, InterruptedException, ExecutionException { + myLogger = myLogger != null ? myLogger : logger; + + Integer countAll = 0; + Integer countSuccess = 0; + Integer countError = 0; + + List inputList = new ArrayList(); + + String datasetIdentifier = dataset.getAuthorityForFileStorage() + "/" + + dataset.getIdentifierForFileStorage(); + + for (JsonObject fileJsonObject : filesJsonArray.getValuesAs(JsonObject.class)) { + + // storageIdentifier s3://gcs5-bucket1:1781cfeb8a7-748c270a227c from + // externalTool + String storageIdentifier = fileJsonObject.getString("storageIdentifier"); + String[] parts = DataAccess.getDriverIdAndStorageLocation(storageIdentifier); + String storeId = parts[0]; + // If this is an S3 store, we need to split out the bucket name + String[] bits = parts[1].split(":"); + String bucketName = ""; + if (bits.length > 1) { + bucketName = bits[0]; + } + String fileId = bits[bits.length - 1]; + + // fullpath s3://gcs5-bucket1/10.5072/FK2/3S6G2E/1781cfeb8a7-4ad9418a5873 + // or globus:///10.5072/FK2/3S6G2E/1781cfeb8a7-4ad9418a5873 + String fullPath = storeId + "://" + bucketName + "/" + datasetIdentifier + "/" + fileId; + String fileName = fileJsonObject.getString("fileName"); + + inputList.add(fileId + "IDsplit" + fullPath + "IDsplit" + fileName); + } + + // calculateMissingMetadataFields: checksum, mimetype + JsonObject newfilesJsonObject = calculateMissingMetadataFields(inputList, myLogger); + JsonArray newfilesJsonArray = newfilesJsonObject.getJsonArray("files"); + logger.fine("Size: " + newfilesJsonArray.size()); + logger.fine("Val: " + JsonUtil.prettyPrint(newfilesJsonArray.getJsonObject(0))); + JsonArrayBuilder addFilesJsonData = Json.createArrayBuilder(); + + for (JsonObject fileJsonObject : filesJsonArray.getValuesAs(JsonObject.class)) { + + countAll++; + String storageIdentifier = fileJsonObject.getString("storageIdentifier"); + String fileName = fileJsonObject.getString("fileName"); + String[] parts = DataAccess.getDriverIdAndStorageLocation(storageIdentifier); + // If this is an S3 store, we need to split out the bucket name + String[] bits = parts[1].split(":"); + if (bits.length > 1) { + } + String fileId = bits[bits.length - 1]; + + List newfileJsonObject = IntStream.range(0, newfilesJsonArray.size()) + .mapToObj(index -> ((JsonObject) newfilesJsonArray.get(index)).getJsonObject(fileId)) + .filter(Objects::nonNull).collect(Collectors.toList()); + if (newfileJsonObject != null) { + logger.fine("List Size: " + newfileJsonObject.size()); + // if (!newfileJsonObject.get(0).getString("hash").equalsIgnoreCase("null")) { + JsonPatch path = Json.createPatchBuilder() + .add("/md5Hash", newfileJsonObject.get(0).getString("hash")).build(); + fileJsonObject = path.apply(fileJsonObject); + path = Json.createPatchBuilder() + .add("/mimeType", newfileJsonObject.get(0).getString("mime")).build(); + fileJsonObject = path.apply(fileJsonObject); + addFilesJsonData.add(fileJsonObject); + countSuccess++; + // } else { + // globusLogger.info(fileName + // + " will be skipped from adding to dataset by second API due to missing + // values "); + // countError++; + // } + } else { + myLogger.info(fileName + + " will be skipped from adding to dataset in the final AddReplaceFileHelper.addFiles() call. 
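+        // Worked example, tracing the sample values from the comments in the
+        // loop above through the parsing, for
+        // storageIdentifier = "s3://gcs5-bucket1:1781cfeb8a7-748c270a227c":
+        //   parts      = ["s3", "gcs5-bucket1:1781cfeb8a7-748c270a227c"]
+        //   bits       = ["gcs5-bucket1", "1781cfeb8a7-748c270a227c"]
+        //   bucketName = "gcs5-bucket1", fileId = "1781cfeb8a7-748c270a227c"
+        //   fullPath   = "s3://gcs5-bucket1/" + datasetIdentifier + "/1781cfeb8a7-748c270a227c"
+        //   inputList entry = fileId + "IDsplit" + fullPath + "IDsplit" + fileName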
"); + countError++; + } + } + + String newjsonData = addFilesJsonData.build().toString(); + + myLogger.info("Successfully generated new JsonData for addFiles call"); + + /*String command = "curl -H \"X-Dataverse-key:" + token.getTokenString() + "\" -X POST " + + httpRequestUrl + "/api/datasets/:persistentId/addFiles?persistentId=doi:" + + datasetIdentifier + " -F jsonData='" + newjsonData + "'"; + System.out.println("*******====command ==== " + command);*/ + // ToDo - refactor to call AddReplaceFileHelper.addFiles directly instead of + // calling API + // a quick experimental AddReplaceFileHelper implementation: + AddReplaceFileHelper addFileHelper = new AddReplaceFileHelper( + dataverseRequestSvc.getDataverseRequest(), + this.ingestSvc, + this.datasetSvc, + this.dataFileSvc, + this.permissionSvc, + this.commandEngine, + this.systemConfig + ); + + Response addFilesResponse = addFileHelper.addFiles(newjsonData, dataset, authUser); + + if (Response.Status.OK.equals(addFilesResponse.getStatusInfo())) { + // if(!taskSkippedFiles) + if (countError == 0) { + userNotificationService.sendNotification((AuthenticatedUser) authUser, + new Timestamp(new Date().getTime()), UserNotification.Type.GLOBUSUPLOADCOMPLETED, + dataset.getId(), countSuccess + " files added out of " + countAll, true); + } else { + userNotificationService.sendNotification((AuthenticatedUser) authUser, + new Timestamp(new Date().getTime()), + UserNotification.Type.GLOBUSUPLOADCOMPLETEDWITHERRORS, dataset.getId(), + countSuccess + " files added out of " + countAll, true); + } + myLogger.info("Successfully completed addFiles call "); + } else { + myLogger.log(Level.SEVERE, + "******* Error while executing addFiles ", newjsonData); + } + myLogger.info("Files processed: " + countAll); + myLogger.info("Files added successfully: " + countSuccess); + myLogger.info("Files failures: " + countError); + myLogger.info("Finished upload via Globus job."); + + } + + /** + * I don't think this method is needed at all. (I suspect that it's a remnant + * from the times when *multiple* individual /add calls needed to be performed + * for each file being added. So this was part of a framework that attempted + * to run this calls in parallel, potentially speeding things up (similarly to + * how the checksums are being calculated in parallel for multiple files). + * As of now, this method doesn't do anything "asynchronous" - there is one + * /addFiles call, and the method below will wait for it to complete, via the + * CompletableFuture.get(). (L.A.) + * @param curlCommand + * @param globusLogger + * @return + * @throws ExecutionException + * @throws InterruptedException + */ public String addFilesAsync(String curlCommand, Logger globusLogger) throws ExecutionException, InterruptedException { CompletableFuture addFilesFuture = CompletableFuture.supplyAsync(() -> { @@ -983,7 +1065,7 @@ public void globusDownload(String jsonData, Dataset dataset, User authUser) thro // If the rules_cache times out, the permission will be deleted. 
Presumably that // doesn't affect a // globus task status check - GlobusTask task = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger); + GlobusTaskState task = getTask(endpoint.getClientToken(), taskIdentifier, globusLogger); String ruleId = getRuleId(endpoint, task.getOwner_id(), "r"); if (ruleId != null) { logger.fine("Found rule: " + ruleId); @@ -999,6 +1081,7 @@ public void globusDownload(String jsonData, Dataset dataset, User authUser) thro logger.warning("ruleId not found for taskId: " + taskIdentifier); } task = globusStatusCheck(endpoint, taskIdentifier, globusLogger); + // @todo null check String taskStatus = getTaskStatus(task); // Transfer is done (success or failure) so delete the rule @@ -1033,61 +1116,38 @@ public void globusDownload(String jsonData, Dataset dataset, User authUser) thro Executor executor = Executors.newFixedThreadPool(10); - private GlobusTask globusStatusCheck(GlobusEndpoint endpoint, String taskId, Logger globusLogger) + private GlobusTaskState globusStatusCheck(GlobusEndpoint endpoint, String taskId, Logger globusLogger) throws MalformedURLException { - boolean taskCompletion = false; - String status = ""; - GlobusTask task = null; + boolean taskCompleted = false; + GlobusTaskState task = null; int pollingInterval = SystemConfig.getIntLimitFromStringOrDefault( settingsSvc.getValueForKey(SettingsServiceBean.Key.GlobusPollingInterval), 50); do { try { globusLogger.info("checking globus transfer task " + taskId); Thread.sleep(pollingInterval * 1000); + // Call the (centralized) Globus API to check on the task state/status: task = getTask(endpoint.getClientToken(), taskId, globusLogger); - if (task != null) { - status = task.getStatus(); - if (status != null) { - // The task is in progress. - if (status.equalsIgnoreCase("ACTIVE")) { - if (task.getNice_status().equalsIgnoreCase("ok") - || task.getNice_status().equalsIgnoreCase("queued")) { - taskCompletion = false; - } else { - taskCompletion = true; - // status = "FAILED" + "#" + task.getNice_status() + "#" + - // task.getNice_status_short_description(); - } - } else { - // The task is either succeeded, failed or inactive. - taskCompletion = true; - // status = status + "#" + task.getNice_status() + "#" + - // task.getNice_status_short_description(); - } - } else { - // status = "FAILED"; - taskCompletion = true; - } - } else { - // status = "FAILED"; - taskCompletion = true; - } + taskCompleted = GlobusUtil.isTaskCompleted(task); } catch (Exception ex) { ex.printStackTrace(); } - } while (!taskCompletion); + } while (!taskCompleted); globusLogger.info("globus transfer task completed successfully"); return task; } - - private String getTaskStatus(GlobusTask task) { + + private String getTaskStatus(GlobusTaskState task) { String status = null; if (task != null) { status = task.getStatus(); if (status != null) { // The task is in progress but is not ok or queued + // (L.A.) I think the assumption here is that this method is called + // exclusively on tasks that have already completed. So that's why + // it is safe to assume that "ACTIVE" means "FAILED". if (status.equalsIgnoreCase("ACTIVE")) { status = "FAILED" + "#" + task.getNice_status() + "#" + task.getNice_status_short_description(); } else { @@ -1158,7 +1218,16 @@ private FileDetailsHolder calculateDetails(String id, Logger globusLogger) String fileName = id.split("IDsplit")[2]; // ToDo: what if the file does not exist in s3 + // (L.A.) - good question. maybe it should call .open and .exists() here? 
+ // otherwise, there doesn't seem to be any diagnostics as to which + // files uploaded successfully and which failed (?) + // ... however, any partially successful upload cases should be + // properly handled later, during the .addFiles() call - only + // the files that actually exists in storage remotely will be + // added to the dataset permanently then. // ToDo: what if checksum calculation failed + // (L.A.) - this appears to have been addressed - by using "Not available in Dataverse" + // in place of a checksum. String storageDriverId = DataAccess.getDriverIdAndStorageLocation(fullPath)[0]; @@ -1180,8 +1249,6 @@ private FileDetailsHolder calculateDetails(String id, Logger globusLogger) logger.info(ex.getMessage()); Thread.sleep(5000); } - - } while (count < 3); } @@ -1311,5 +1378,57 @@ public void writeGuestbookAndStartTransfer(GuestbookResponse guestbookResponse, } } } + + public List findAllOngoingTasks() { + return em.createQuery("select object(o) from GlobusTaskInProgress as o order by o.startTime", GlobusTaskInProgress.class).getResultList(); + } + + public void deleteTask(GlobusTaskInProgress task) { + GlobusTaskInProgress mergedTask = em.merge(task); + em.remove(mergedTask); + } + + public List findExternalUploadsByTaskId(String taskId) { + return em.createNamedQuery("ExternalFileUploadInProgress.findByTaskId").setParameter("taskId", taskId).getResultList(); + } + + // @todo this may or may not need to be async (?) + public void addFilesOnSuccess(GlobusTaskInProgress globusTask) { + List fileUploadsInProgress = findExternalUploadsByTaskId(globusTask.getTaskId()); + + if (fileUploadsInProgress == null || fileUploadsInProgress.size() < 1) { + // @todo log error message; do nothing + return; + } + Dataset dataset = globusTask.getDataset(); + AuthenticatedUser authUser = authSvc.lookupUser(globusTask.getApiToken()); + if (authUser == null) { + // @todo log error message; do nothing + return; + } + + JsonArrayBuilder filesJsonArrayBuilder = Json.createArrayBuilder(); + + for (ExternalFileUploadInProgress pendingFile : fileUploadsInProgress) { + String jsonInfoString = pendingFile.getFileInfo(); + JsonObject fileObject = JsonUtil.getJsonObject(jsonInfoString); + filesJsonArrayBuilder.add(fileObject); + } + + JsonArray filesJsonArray = filesJsonArrayBuilder.build(); + + if (filesJsonArray == null || filesJsonArray.size() < 1) { + // @todo log error message; do nothing + return; + } + + try { + processUploadedFiles(filesJsonArray, dataset, authUser, null); + } catch (Exception ex) { + // @todo log error message; make sure the error notification to the + // has been sent (may or may not have already been sent inside the + // method above). + } + } } diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTask.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java similarity index 93% rename from src/main/java/edu/harvard/iq/dataverse/globus/GlobusTask.java rename to src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java index c2b01779f4a..b5db20d46c1 100644 --- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTask.java +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java @@ -1,6 +1,10 @@ package edu.harvard.iq.dataverse.globus; -public class GlobusTask { +/** + * This class is used to store the state of an ongoing Globus task (transfer) + * as reported by the Globus task API. 
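[Editor's note: the four methods just added (findAllOngoingTasks(), deleteTask(), findExternalUploadsByTaskId(), addFilesOnSuccess()) are the hooks for the TaskMonitoringService mentioned in the processUploadedFiles() javadoc; that service is not part of this diff. A minimal sketch of how it might drive them, assuming a getGlobusClientToken() accessor on GlobusTaskInProgress (getTaskId() is used elsewhere in this diff, so it is known to exist; the entity class itself is not shown here):]

    public void checkOngoingTasks(GlobusServiceBean globusService) {
        @SuppressWarnings("unchecked")
        List<GlobusTaskInProgress> ongoingTasks = globusService.findAllOngoingTasks();
        for (GlobusTaskInProgress task : ongoingTasks) {
            // Ask the Globus API for the current state of the transfer
            // (passing null as the logger falls back to the bean's main logger):
            GlobusTaskState state = globusService.getTask(task.getGlobusClientToken(), task.getTaskId(), null);
            if (GlobusUtil.isTaskCompleted(state)) {
                if (GlobusUtil.isTaskSucceeded(state)) {
                    // Finalize adding the uploaded files to the dataset:
                    globusService.addFilesOnSuccess(task);
                }
                // @todo (per the notes above) handle/notify on the failure case
                globusService.deleteTask(task);
            }
        }
    }
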
diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTask.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java
similarity index 93%
rename from src/main/java/edu/harvard/iq/dataverse/globus/GlobusTask.java
rename to src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java
index c2b01779f4a..b5db20d46c1 100644
--- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTask.java
+++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskState.java
@@ -1,6 +1,10 @@
 package edu.harvard.iq.dataverse.globus;
 
-public class GlobusTask {
+/**
+ * This class is used to store the state of an ongoing Globus task (transfer)
+ * as reported by the Globus task API.
+ */
+public class GlobusTaskState {
 
     private String DATA_TYPE;
     private String type;
diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java
index 92cf8ac7704..67594ad1a5e 100644
--- a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java
+++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusUtil.java
@@ -30,4 +30,37 @@ public static JsonObject getFilesMap(List dataFiles, Dataset d) {
         }
         return filesBuilder.build();
     }
+
+    public static boolean isTaskCompleted(GlobusTaskState task) {
+        if (task != null) {
+            String status = task.getStatus();
+            if (status != null) {
+                if (status.equalsIgnoreCase("ACTIVE")) {
+                    if (task.getNice_status().equalsIgnoreCase("ok")
+                            || task.getNice_status().equalsIgnoreCase("queued")) {
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
+    }
+
+    public static boolean isTaskSucceeded(GlobusTaskState task) {
+        String status = null;
+        if (task != null) {
+            status = task.getStatus();
+            if (status != null) {
+                status = status.toUpperCase();
+                if (status.equals("ACTIVE") || status.startsWith("FAILED") || status.startsWith("INACTIVE")) {
+                    // There are cases where a failed task may still be showing
+                    // as "ACTIVE". But it is definitely safe to assume that it
+                    // has not completed *successfully*.
+                    return false;
+                }
+                return true;
+            }
+        }
+        return false;
+    }
 }
\ No newline at end of file
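[Editor's note: to illustrate the semantics of the two predicates above - an ACTIVE task whose nice_status is "ok" or "queued" is the only case treated as still in progress; ACTIVE with any other nice_status counts as completed but not succeeded. In the real flow GlobusTaskState is populated from the Globus API response JSON, so the setters below are assumptions used purely for illustration:]

    GlobusTaskState state = new GlobusTaskState();
    state.setStatus("ACTIVE");              // assumed setter, illustration only
    state.setNice_status("queued");
    GlobusUtil.isTaskCompleted(state);      // false - still waiting/transferring
    GlobusUtil.isTaskSucceeded(state);      // false

    state.setNice_status("PERMISSION_DENIED");
    GlobusUtil.isTaskCompleted(state);      // true - ACTIVE, but no longer making progress
    GlobusUtil.isTaskSucceeded(state);      // false - ACTIVE never counts as success

    state.setStatus("SUCCEEDED");
    GlobusUtil.isTaskCompleted(state);      // true
    GlobusUtil.isTaskSucceeded(state);      // true
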
diff --git a/src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java
index 9bacafd173f..3f76a319902 100644
--- a/src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java
+++ b/src/main/java/edu/harvard/iq/dataverse/ingest/IngestServiceBean.java
@@ -345,12 +345,13 @@ public List saveAndAddFilesToDataset(DatasetVersion version,
                     StorageIO dataAccess = DataAccess.getStorageIO(dataFile);
                     //Populate metadata
                     dataAccess.open(DataAccessOption.READ_ACCESS);
-
+                    // (this will make a remote call to check if the file exists
+                    // and obtain its size)
                     confirmedFileSize = dataAccess.getSize();
 
                     // For directly-uploaded files, we will perform the file size
                     // limit and quota checks here. Perform them *again*, in
-                    // some cases: a directly uploaded files have already been
+                    // some cases: files directly uploaded via the UI have already been
                     // checked (for the sake of being able to reject the upload
                     // before the user clicks "save"). But in case of direct
                     // uploads via API, these checks haven't been performed yet,
diff --git a/src/main/java/edu/harvard/iq/dataverse/settings/FeatureFlags.java b/src/main/java/edu/harvard/iq/dataverse/settings/FeatureFlags.java
index 021977ff8c6..746e6e3b75d 100644
--- a/src/main/java/edu/harvard/iq/dataverse/settings/FeatureFlags.java
+++ b/src/main/java/edu/harvard/iq/dataverse/settings/FeatureFlags.java
@@ -91,6 +91,11 @@ public enum FeatureFlags {
      * @since Dataverse 6.3
      */
     DISABLE_RETURN_TO_AUTHOR_REASON("disable-return-to-author-reason"),
+    /**
+     * TEMPORARY feature flag for the new Globus upload framework (will only be
+     * used for testing).
+     */
+    GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK("globus-use-experimental-async-framework"),
     ;
 
     final String flag;
diff --git a/src/main/java/edu/harvard/iq/dataverse/util/SystemConfig.java b/src/main/java/edu/harvard/iq/dataverse/util/SystemConfig.java
index f9801419e47..7417a5db4d4 100644
--- a/src/main/java/edu/harvard/iq/dataverse/util/SystemConfig.java
+++ b/src/main/java/edu/harvard/iq/dataverse/util/SystemConfig.java
@@ -82,6 +82,7 @@ public class SystemConfig {
     private String buildNumber = null;
 
     private static final String JVM_TIMER_SERVER_OPTION = "dataverse.timerServer";
+    private static final String JVM_GLOBUS_TASK_MONITORING_OPTION = "dataverse.globus.taskMonitoringServer";
 
     private static final long DEFAULT_GUESTBOOK_RESPONSES_DISPLAY_LIMIT = 5000L;
     private static final long DEFAULT_THUMBNAIL_SIZE_LIMIT_IMAGE = 3000000L; // 3 MB
@@ -545,6 +546,14 @@ public boolean isTimerServer() {
         }
         return false;
     }
+
+    public boolean isGlobusTaskMonitoringServer() {
+        String optionValue = System.getProperty(JVM_GLOBUS_TASK_MONITORING_OPTION);
+        if ("true".equalsIgnoreCase(optionValue)) {
+            return true;
+        }
+        return false;
+    }
 
     public String getFooterCopyrightAndYear() {
         return BundleUtil.getStringFromBundle("footer.copyright", Arrays.asList(Year.now().getValue() + ""));
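
[Editor's note: how the new pieces fit together - the "globus-use-experimental-async-framework" feature flag gates the new code path in globusUpload(), and the dataverse.globus.taskMonitoringServer JVM option designates which server instance runs the task monitoring. The monitoring service itself is not included in this diff, so the startup gate below is a sketch only; the class name, annotations and polling interval are assumptions:]

    import jakarta.annotation.PostConstruct;
    import jakarta.annotation.Resource;
    import jakarta.ejb.EJB;
    import jakarta.ejb.Singleton;
    import jakarta.ejb.Startup;
    import jakarta.enterprise.concurrent.ManagedScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    @Singleton
    @Startup
    public class TaskMonitoringServiceSketch {
        @EJB SystemConfig systemConfig;
        @EJB GlobusServiceBean globusService;
        @Resource ManagedScheduledExecutorService scheduler;

        @PostConstruct
        public void init() {
            // Both checks come from this diff: only the designated server,
            // with the temporary feature flag enabled, runs the monitor.
            if (systemConfig.isGlobusTaskMonitoringServer()
                    && FeatureFlags.GLOBUS_USE_EXPERIMENTAL_ASYNC_FRAMEWORK.enabled()) {
                scheduler.scheduleAtFixedRate(this::checkOngoingTasks,
                        0, 60, TimeUnit.SECONDS); // interval chosen arbitrarily
            }
        }

        private void checkOngoingTasks() {
            // see the sketch after the GlobusServiceBean section above
        }
    }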