Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resumable scheduled tasks #36708

Merged
merged 8 commits into from
Jan 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@
class Scheduler
{
private const LOG_TEXT = [
Status::OK => 'COM_SCHEDULER_SCHEDULER_TASK_COMPLETE',
Status::NO_LOCK => 'COM_SCHEDULER_SCHEDULER_TASK_LOCKED',
Status::NO_RUN => 'COM_SCHEDULER_SCHEDULER_TASK_UNLOCKED',
Status::NO_ROUTINE => 'COM_SCHEDULER_SCHEDULER_TASK_ROUTINE_NA',
Status::OK => 'COM_SCHEDULER_SCHEDULER_TASK_COMPLETE',
Status::WILL_RESUME => 'COM_SCHEDULER_SCHEDULER_TASK_WILL_RESUME',
Status::NO_LOCK => 'COM_SCHEDULER_SCHEDULER_TASK_LOCKED',
Status::NO_RUN => 'COM_SCHEDULER_SCHEDULER_TASK_UNLOCKED',
Status::NO_ROUTINE => 'COM_SCHEDULER_SCHEDULER_TASK_ROUTINE_NA',
];

/**
Expand Down Expand Up @@ -158,7 +159,7 @@ public function runTask(array $options): ?Task

if (\array_key_exists($exitCode, self::LOG_TEXT))
{
$level = $exitCode === Status::OK ? 'info' : 'warning';
$level = in_array($exitCode, [Status::OK, Status::WILL_RESUME]) ? 'info' : 'warning';
$task->log(Text::sprintf(self::LOG_TEXT[$exitCode], $taskId, $duration, $netDuration), $level);

return $task;
Expand Down
13 changes: 13 additions & 0 deletions administrator/components/com_scheduler/src/Task/Status.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@ abstract class Status
*/
public const KNOCKOUT = 5;

/**
* Exit code used when a task needs to resume (reschedule it to run a.s.a.p.).
*
* Use this for long running tasks, e.g. batch processing of hundreds or thousands of files,
* sending newsletters with thousands of subscribers etc. These are tasks which might run out of
* memory and/or hit a time limit when lazy scheduling or web triggering of tasks is being used.
* Split them in smaller batches which return Status::WILL_RESUME. When the last batch is
nikosdion marked this conversation as resolved.
Show resolved Hide resolved
* executed return Status::OK.
*
* @since 4.1.0
*/
public const WILL_RESUME = 123;

/**
* Exit code used when a task times out.
*
Expand Down
38 changes: 29 additions & 9 deletions administrator/components/com_scheduler/src/Task/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// Restrict direct access
\defined('_JEXEC') or die;

use DateInterval;
nikosdion marked this conversation as resolved.
Show resolved Hide resolved
use Joomla\CMS\Application\CMSApplication;
use Joomla\CMS\Component\ComponentHelper;
use Joomla\CMS\Event\AbstractEvent;
Expand Down Expand Up @@ -116,9 +117,10 @@ class Task implements LoggerAwareInterface
* @since 4.1.0
*/
protected const EVENTS_MAP = [
Status::OK => 'onTaskExecuteSuccess',
Status::NO_ROUTINE => 'onTaskRoutineNotFound',
'NA' => 'onTaskExecuteFailure',
Status::OK => 'onTaskExecuteSuccess',
Status::NO_ROUTINE => 'onTaskRoutineNotFound',
Status::WILL_RESUME => 'onTaskRoutineWillResume',
'NA' => 'onTaskExecuteFailure',
];

/**
Expand Down Expand Up @@ -246,11 +248,29 @@ public function run(): bool
// @todo make the ExecRuleHelper usage less ugly, perhaps it should be composed into Task
// Update object state.
$this->set('last_execution', Factory::getDate('@' . (int) $this->snapshot['taskStart'])->toSql());
$this->set('next_execution', (new ExecRuleHelper($this->taskRegistry->toObject()))->nextExec());
$this->set('last_exit_code', $this->snapshot['status']);
$this->set('times_executed', $this->get('times_executed') + 1);

if ($this->snapshot['status'] !== Status::OK)
if ($this->snapshot['status'] !== Status::WILL_RESUME)
{
$this->set('next_execution', (new ExecRuleHelper($this->taskRegistry->toObject()))->nextExec());
$this->set('times_executed', $this->get('times_executed') + 1);
}
else
{
/**
* Resumable tasks need special handling.
*
* The are rescheduled as soon as possible to let their next step to be executed without
nikosdion marked this conversation as resolved.
Show resolved Hide resolved
* a very large temporal gap to the previous step.
*
* Moreover, the times executed does NOT increase for each step. It will increase once,
* after the last step, when they return Status::OK.
*/
$this->set('next_execution', Factory::getDate('now', 'UTC')->sub(new DateInterval('PT1M'))->toSql());
}

// The only acceptable "successful" statuses are either clean exit or resuming execution.
if (!in_array($this->snapshot['status'], [Status::WILL_RESUME, Status::OK]))
{
$this->set('times_failed', $this->get('times_failed') + 1);
}
Expand Down Expand Up @@ -302,7 +322,7 @@ public function acquireLock(): bool
$now = Factory::getDate('now', 'GMT');

$timeout = ComponentHelper::getParams('com_scheduler')->get('timeout', 300);
$timeout = new \DateInterval(sprintf('PT%dS', $timeout));
$timeout = new DateInterval(sprintf('PT%dS', $timeout));
$timeoutThreshold = (clone $now)->sub($timeout)->toSql();
$now = $now->toSql();

Expand Down Expand Up @@ -391,7 +411,7 @@ public function releaseLock(bool $update = true): bool
->bind(':times_executed', $timesExecuted)
->bind(':times_failed', $timesFailed);

if ($exitCode !== Status::OK)
if (!in_array($exitCode, [Status::OK, Status::WILL_RESUME]))
{
$query->set('times_failed = t.times_failed + 1');
}
Expand Down Expand Up @@ -495,7 +515,7 @@ protected function dispatchExitEvent(): void
*/
public function isSuccess(): bool
{
return ($this->snapshot['status'] ?? null) === Status::OK;
return in_array(($this->snapshot['status'] ?? null), [Status::OK, Status::WILL_RESUME]);
}

/**
Expand Down
1 change: 1 addition & 0 deletions administrator/language/en-GB/com_scheduler.ini
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ COM_SCHEDULER_SCHEDULER_TASK_ROUTINE_NA="Task#%1$02d has no corresponding plugin
COM_SCHEDULER_SCHEDULER_TASK_START="Running task#%1$02d '%2$s'."
COM_SCHEDULER_SCHEDULER_TASK_UNKNOWN_EXIT="Task#%1$02d exited with code %4$d in %2$.2f (net %3$.2f) seconds."
COM_SCHEDULER_SCHEDULER_TASK_UNLOCKED="Task#%1$02d was unlocked."
COM_SCHEDULER_SCHEDULER_TASK_WILL_RESUME="Task#%1$02d needs to perform more work."
COM_SCHEDULER_SELECT_INTERVAL_MINUTES="- Select interval in Minutes -"
COM_SCHEDULER_SELECT_TASK_TYPE="Select task, %s"
COM_SCHEDULER_SELECT_TYPE="- Task Type -"
Expand Down
4 changes: 4 additions & 0 deletions administrator/language/en-GB/plg_task_demotasks.ini
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ PLG_TASK_DEMO_TASKS_STRESS_MEMORY_DESC="What happens to a task when the PHP memo
PLG_TASK_DEMO_TASKS_STRESS_MEMORY_OVERRIDE_DESC="What happens to a task when the system memory is exhausted?"
PLG_TASK_DEMO_TASKS_STRESS_MEMORY_OVERRIDE_TITLE="Stress Memory, Override Limit"
PLG_TASK_DEMO_TASKS_STRESS_MEMORY_TITLE="Stress Memory"
PLG_TASK_DEMO_TASKS_RESUMABLE_TITLE="Resumable task"
PLG_TASK_DEMO_TASKS_RESUMABLE_DESC="A simple task to demonstrate resumable task behaviour."
PLG_TASK_DEMO_TASKS_RESUMABLE_STEPS_LABEL="Total number of steps"
PLG_TASK_DEMO_TASKS_RESUMABLE_TIMEOUT_LABEL="Delay per step (seconds)"
PLG_TASK_DEMO_TASKS_TASK_SLEEP_DESC="Sleep, do nothing for x seconds."
PLG_TASK_DEMO_TASKS_TASK_SLEEP_ROUTINE_END_LOG_MESSAGE="TestTask1 return code is: %1$d. Processing Time: %2$.2f seconds"
PLG_TASK_DEMO_TASKS_TASK_SLEEP_TITLE="Demo Task - Sleep"
Expand Down
9 changes: 5 additions & 4 deletions libraries/src/Console/TasksRunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ protected function doExecute(InputInterface $input, OutputInterface $output): in
* load the namespace when it's time to do that (why?)
*/
static $outTextMap = [
Status::OK => 'Task#%1$02d \'%2$s\' processed in %3$.2f seconds.',
Status::NO_RUN => '<warning>Task#%1$02d \'%2$s\' failed to run. Is it already running?</warning>',
Status::NO_ROUTINE => '<error>Task#%1$02d \'%2$s\' is orphaned! Visit the backend to resolve.</error>',
'N/A' => '<error>Task#%1$02d \'%2$s\' exited with code %4$d in %3$.2f seconds.</error>',
Status::OK => 'Task#%1$02d \'%2$s\' processed in %3$.2f seconds.',
Status::WILL_RESUME => '<notice>Task#%1$02d \'%2$s\' ran for %3$.2f seconds, will resume next time.</notice>',
Status::NO_RUN => '<warning>Task#%1$02d \'%2$s\' failed to run. Is it already running?</warning>',
Status::NO_ROUTINE => '<error>Task#%1$02d \'%2$s\' is orphaned! Visit the backend to resolve.</error>',
'N/A' => '<error>Task#%1$02d \'%2$s\' exited with code %4$d in %3$.2f seconds.</error>',
];

$this->configureIo($input, $output);
Expand Down
60 changes: 60 additions & 0 deletions plugins/task/demotasks/demotasks.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Joomla\CMS\Plugin\CMSPlugin;
use Joomla\Component\Scheduler\Administrator\Event\ExecuteTaskEvent;
use Joomla\Component\Scheduler\Administrator\Task\Status;
use Joomla\Component\Scheduler\Administrator\Task\Task;
use Joomla\Component\Scheduler\Administrator\Traits\TaskPluginTrait;
use Joomla\Event\SubscriberInterface;

Expand Down Expand Up @@ -44,6 +45,11 @@ class PlgTaskDemotasks extends CMSPlugin implements SubscriberInterface
'langConstPrefix' => 'PLG_TASK_DEMO_TASKS_STRESS_MEMORY_OVERRIDE',
'method' => 'stressMemoryRemoveLimit',
],
'demoTask_r4.resumable' => [
'langConstPrefix' => 'PLG_TASK_DEMO_TASKS_RESUMABLE',
'method' => 'resumable',
'form' => 'testTaskForm',
],
];

/**
Expand All @@ -68,6 +74,60 @@ public static function getSubscribedEvents(): array
];
}

/**
* Sample resumable task.
*
* Whether the task will resume is random. There's a 40% chance of finishing every time it runs.
*
* You can use this is a template to create long running tasks which can detect an impeding
nikosdion marked this conversation as resolved.
Show resolved Hide resolved
* timeout condition, return Status::WILL_RESUME and resume execution next time they are called.
*
* @param ExecuteTaskEvent $event The event we are handling
*
* @return integer
*
* @throws Exception
* @since 4.1.0
nikosdion marked this conversation as resolved.
Show resolved Hide resolved
*/
private function resumable(ExecuteTaskEvent $event): int
{
/** @var Task $task */
$task = $event->getArgument('subject');
$timeout = (int) $event->getArgument('params')->timeout ?? 1;

$lastStatus = $task->get('last_exit_code', Status::OK);

// This is how you detect if you are resuming a task or starting it afresh
if ($lastStatus === Status::WILL_RESUME)
{
$this->logTask(sprintf('Resuming task %d', $task->get('id')));
}
else
{
$this->logTask(sprintf('Starting new task %d', $task->get('id')));
}

// Sample task body; we are simply sleeping for some time.
$this->logTask(sprintf('Starting %ds timeout', $timeout));
sleep($timeout);
$this->logTask(sprintf('%ds timeout over!', $timeout));

// Should I resume the task in the next step (randomly decided)?
$willResume = random_int(0, 5) < 4;

// Log our intention to resume or not and return the appropriate exit code.
if ($willResume)
{
$this->logTask(sprintf('Task %d will resume', $task->get('id')));
}
else
{
$this->logTask(sprintf('Task %d is now complete', $task->get('id')));
}

return $willResume ? Status::WILL_RESUME : Status::OK;
}

/**
* @param ExecuteTaskEvent $event The `onExecuteTask` event.
*
Expand Down