diff --git a/src/main/java/edu/harvard/iq/dataverse/ExternalFileUploadInProgress.java b/src/main/java/edu/harvard/iq/dataverse/ExternalFileUploadInProgress.java new file mode 100644 index 00000000000..dadde64608c --- /dev/null +++ b/src/main/java/edu/harvard/iq/dataverse/ExternalFileUploadInProgress.java @@ -0,0 +1,131 @@ +/* + * Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license + * Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template + */ +package edu.harvard.iq.dataverse; + +import jakarta.persistence.Column; +import jakarta.persistence.ManyToOne; +import jakarta.persistence.NamedQueries; +import jakarta.persistence.NamedQuery; +import java.io.Serializable; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; + +/** + * + * @author landreev + * + * The name of the class is provisional. I'm open to better-sounding alternatives, + * if anyone can think of any. + * But I wanted to avoid having the word "Globus" in the entity name. I'm adding + * it specifically for the Globus use case. But I'm guessing there's a chance + * this setup may come in handy for other types of datafile uploads that happen + * externally. (?) + */ +@NamedQueries({ + @NamedQuery( name="ExternalFileUploadInProgress.deleteByTaskId", + query="DELETE FROM ExternalFileUploadInProgress f WHERE f.taskId=:taskId"), + @NamedQuery(name = "ExternalFileUploadInProgress.findByTaskId", + query = "SELECT f FROM ExternalFileUploadInProgress f WHERE f.taskId=:taskId")}) +@Entity +public class ExternalFileUploadInProgress implements Serializable { + + private static final long serialVersionUID = 1L; + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + /** + * Rather than saving various individual fields defining the datafile, + * which would essentially replicate the DataFile table, we are simply + * storing the full json record as passed to the API here. + */ + @Column(columnDefinition = "TEXT", nullable = false) + private String fileInfo; + + /** + * This is Globus-specific task id associated with the upload in progress + */ + @Column(nullable = false) + private String taskId; + + /** + * The Dataset to which the files are being added. + * (@todo may not be necessary? - since the corresponding task is tied to a specific + * dataset already?) + */ + /*@ManyToOne + private Dataset dataset;*/ + + /*public ExternalFileUploadInProgress(String taskId, Dataset dataset, String fileInfo) { + this.taskId = taskId; + this.fileInfo = fileInfo; + this.dataset = dataset; + }*/ + + public ExternalFileUploadInProgress(String taskId, String fileInfo) { + this.taskId = taskId; + this.fileInfo = fileInfo; + } + + public String getFileInfo() { + return fileInfo; + } + + public void setFileInfo(String fileInfo) { + this.fileInfo = fileInfo; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + /*public Dataset getDataset() { + return dataset; + } + + public void setDataset(Dataset dataset) { + this.dataset = dataset; + }*/ + + @Override + public int hashCode() { + int hash = 0; + hash += (id != null ? id.hashCode() : 0); + return hash; + } + + @Override + public boolean equals(Object object) { + // TODO: Warning - this method won't work in the case the id fields are not set + if (!(object instanceof ExternalFileUploadInProgress)) { + return false; + } + ExternalFileUploadInProgress other = (ExternalFileUploadInProgress) object; + if ((this.id == null && other.id != null) || (this.id != null && !this.id.equals(other.id))) { + return false; + } + return true; + } + + @Override + public String toString() { + return "edu.harvard.iq.dataverse.ExternalFileUploadInProgress[ id=" + id + " ]"; + } + +} diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskInProgress.java b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskInProgress.java new file mode 100644 index 00000000000..7b12ec0a3ad --- /dev/null +++ b/src/main/java/edu/harvard/iq/dataverse/globus/GlobusTaskInProgress.java @@ -0,0 +1,188 @@ +/* + * Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license + * Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template + */ +package edu.harvard.iq.dataverse.globus; + +import edu.harvard.iq.dataverse.Dataset; +import edu.harvard.iq.dataverse.authorization.users.ApiToken; +import jakarta.persistence.Column; +import jakarta.persistence.EnumType; +import jakarta.persistence.Enumerated; +import jakarta.persistence.ManyToOne; +import java.io.Serializable; +import java.sql.Timestamp; +import java.util.Arrays; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; + +/** + * + * @author landreev + */ +@Entity +public class GlobusTaskInProgress implements Serializable { + + private static final long serialVersionUID = 1L; + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + /** + * Globus-side identifier of the task in progress, upload or download + */ + @Column(nullable = false) + private String taskId; + + GlobusTaskInProgress(String taskIdentifier, TaskType taskType, Dataset dataset, String clientToken, ApiToken token, Timestamp timestamp) { + throw new UnsupportedOperationException("Not supported yet."); // Generated from nbfs://nbhost/SystemFileSystem/Templates/Classes/Code/GeneratedMethodBody + } + + /** + * I was considering giving this enum type a more specific name "TransferType" + * - but maybe there will be another use case where we need to keep track of + * Globus tasks that are not data transfers (?) + */ + public enum TaskType { + + UPLOAD("UPLOAD"), + DOWNLOAD("DOWNLOAD"); + + private final String text; + + private TaskType(final String text) { + this.text = text; + } + + public static TaskType fromString(String text) { + if (text != null) { + for (TaskType taskType : TaskType.values()) { + if (text.equals(taskType.text)) { + return taskType; + } + } + } + throw new IllegalArgumentException("TaskType must be one of these values: " + Arrays.asList(TaskType.values()) + "."); + } + + @Override + public String toString() { + return text; + } + } + + @Column(nullable = false) + @Enumerated(EnumType.STRING) + private TaskType taskType; + + /** + * Globus API token that should be used to monitor the status of the task + */ + @Column(nullable = false) + private String globusToken; + + /** + * This is the Dataverse API token of the user who initiated the Globus task + */ + private String apiToken; + + @ManyToOne + private Dataset dataset; + + @Column( nullable = false ) + private Timestamp startTime; + + + public GlobusTaskInProgress(String taskId, TaskType taskType, Dataset dataset, String clientToken, String apiToken, Timestamp startTime) { + this.taskId = taskId; + this.taskType = taskType; + this.globusToken = clientToken; + this.apiToken = apiToken; + this.dataset = dataset; + this.startTime = startTime; + } + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public String getTaskId() { + return taskId; + } + + public void setTaskId(String taskId) { + this.taskId = taskId; + } + + public TaskType getTaskType() { + return taskType; + } + + public void setTaskType(TaskType taskType) { + this.taskType = taskType; + } + + public String getGlobusToken() { + return globusToken; + } + + public void setGlobusToken(String clientToken) { + this.globusToken = clientToken; + } + + public String getApiToken() { + return apiToken; + } + + public void setApiToken(String apiToken) { + this.apiToken = apiToken; + } + + public Dataset getDataset() { + return dataset; + } + + public void setDataset(Dataset dataset) { + this.dataset = dataset; + } + + public Timestamp getStartTime() { + return startTime; + } + + public void setStartTime(Timestamp startTime) { + this.startTime = startTime; + } + + @Override + public int hashCode() { + int hash = 0; + hash += (id != null ? id.hashCode() : 0); + return hash; + } + + @Override + public boolean equals(Object object) { + // TODO: Warning - this method won't work in the case the id fields are not set + if (!(object instanceof GlobusTaskInProgress)) { + return false; + } + GlobusTaskInProgress other = (GlobusTaskInProgress) object; + if ((this.id == null && other.id != null) || (this.id != null && !this.id.equals(other.id))) { + return false; + } + return true; + } + + @Override + public String toString() { + return "edu.harvard.iq.dataverse.globus.GlobusTaskInProgress[ id=" + id + " ]"; + } + +} diff --git a/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java new file mode 100644 index 00000000000..da31ded90db --- /dev/null +++ b/src/main/java/edu/harvard/iq/dataverse/globus/TaskMonitoringServiceBean.java @@ -0,0 +1,78 @@ +/* + * Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license + * Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template + */ +package edu.harvard.iq.dataverse.globus; + +import edu.harvard.iq.dataverse.settings.SettingsServiceBean; +import edu.harvard.iq.dataverse.util.SystemConfig; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.Resource; +import jakarta.ejb.EJB; +import jakarta.ejb.Singleton; +import jakarta.ejb.Startup; +import jakarta.enterprise.concurrent.ManagedScheduledExecutorService; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * + * This Singleton monitors ongoing Globus tasks by checking with the centralized + * Globus API on the status of all the registered ongoing tasks. + * When a successful completion of a task is detected, the service triggers + * the execution of the associated tasks (for example, finalizing adding datafiles + * to the dataset on completion of a remote Globus upload). When a task fails or + * terminates abnormally, a message is logged and the task record is deleted + * from the database. + * + * @author landreev + */ +@Singleton +@Startup +public class TaskMonitoringServiceBean { + @Resource + ManagedScheduledExecutorService scheduler; + + @EJB + SystemConfig systemConfig; + @EJB + SettingsServiceBean settingsSvc; + @EJB + GlobusServiceBean globusService; + + @PostConstruct + public void init() { + if (systemConfig.isGlobusTaskMonitoringServer()) { + int pollingInterval = SystemConfig.getIntLimitFromStringOrDefault( + settingsSvc.getValueForKey(SettingsServiceBean.Key.GlobusPollingInterval), 60); + this.scheduler.scheduleAtFixedRate(this::checkOngoingTasks, + 0, pollingInterval, + TimeUnit.SECONDS); + } + } + + /** + * This method will be executed on a timer-like schedule, continuously + * monitoring all the ongoing external Globus tasks (transfers). + * @todo make sure the executions do not overlap/stack up + */ + public void checkOngoingTasks() { + List tasks = globusService.findAllOngoingTasks(); + + tasks.forEach(t -> { + GlobusTaskState retrieved = globusService.getTask(t.getGlobusToken(), t.getTaskId(), null); + if (GlobusUtil.isTaskCompleted(retrieved)) { + if (GlobusUtil.isTaskSucceeded(retrieved)) { + // Do our thing, finalize adding the files to the dataset + globusService.addFilesOnSuccess(t); + } + // Whether it finished successfully, or failed in the process, + // there's no need to keep monitoring this task, so we can + // delete it. + globusService.deleteTask(t); + // @todo double-check that the locks have been properly handled + } + }); + } + +}