-
Queue jobs
- To run now
- To run later
- To run after other jobs complete
- To run after other criteria are met
-
Run jobs
- As quickly and fairly as possible
- Across multiple servers
- Without using more resources than are available
Q::push(
$job_name = 'job_to_run',
// 'silex_service'
// 'Some\Class\Name'
// 'silex_service#methodName'
// 'Some\Class\Name#methodName'
$job_params = array(
'param1' => 'value1',
'param2' => 'value2',
),
$job_options = array(
'queue_name' => 'default',
'start_time' => new DateTime(),
// '+1 hour'
// '2014-07-20 00:15:00'
// 'now'
'recur' => array(
// how often to run the job
'interval' => '12:00:00',
// optional, will use the last run time
// without a base_time
'base_time' => '2014-07-20 03:00:00',
// a name to refer to the job by
'known_as' => 'job_to_run:every_12_hours',
),
'depends_on' => array(
12, // job id
$job, // job object
'#isReadyToRun',
'Some\Other\Class#isReadyToRun',
'silex_service#isReadyToRun',
),
'priority' => 10,
// use `nice`'s semantics:
// - lower numbers run sooner
// - allow -20 to 20
'max_failures' => 3,
)
);
// other $job_name possibility
$job_name = array(
'type' => 'pimple',
// 'silex' - same as 'pimple'
// 'class'
'name' => 'silex_service',
'method' => 'methodName',
);
Q::schedule(
$job_name = 'job_to_run',
// 'silex_service'
// 'Some\Class\Name'
// 'silex_service#methodName'
// 'Some\Class\Name#methodName'
$job_params = array(
'param1' => 'value1',
'param2' => 'value2',
),
$start_time = new DateTime(),
// '+1 hour'
// '2014-07-20 00:15:00'
// 'now'
$job_options = array(
'queue_name' => 'default',
'depends_on' => array(
12, // job id
$job, // job object
'#isReadyToRun',
'Some\Other\Class#isReadyToRun',
'silex_service#isReadyToRun',
),
'priority' => 10,
// use `nice`'s semantics:
// - lower numbers run sooner
// - allow -20 to 20
'max_failures' => 3,
)
);
Q::recur(
$job_name = 'job_to_run',
// 'silex_service'
// 'Some\Class\Name'
// 'silex_service#methodName'
// 'Some\Class\Name#methodName'
$job_params = array(
'param1' => 'value1',
'param2' => 'value2',
),
// a name to refer to the job by
$known_as = 'job_to_run:every_12_hours',
// how often to run the job
$interval = '12:00:00',
// optional, will use the last run time
// without a base_time
$base_time = '2014-07-20 03:00:00',
$job_options = array(
'queue_name' => 'default',
'depends_on' => array(
12, // job id
$job, // job object
'#isReadyToRun',
'Some\Other\Class#isReadyToRun',
'silex_service#isReadyToRun',
),
'priority' => 10,
// use `nice`'s semantics:
// - lower numbers run sooner
// - allow -20 to 20
'max_failures' => 3,
)
);
namespace Some\Class;
class Name extends Job
{
public function run(array $job_params, array $job_options)
{
// do some stuff to process the job
if ($something_went_wrong) {
// something went wrong, but if there
// are attempts remaining try again
throw $this->retry();
} elseif ($something_else_went_wrong) {
// do not try again
throw $this->fail();
} elseif ($what_else_goes_wrong) {
throw new Exception('same as $this->retry()');
} elseif ($something_is_wrong_but_do_not_exit_yet) {
$this->setStatus(self::STATUS_RETRY);
} else {
// success is assumed, otherwise
}
}
public function isReadyToRun(array $job_params, array $job_options)
{
return $this->ready()
// limit to 5 simultaneous jobs of `get_class($this)` jobs
// per bucket name
->limitTo(
5,
array(
'bucket_name' => $job_params['bucket_name'],
)
)
// limit to 15 simultaneous jobs of `get_class($this)` jobs
->limitTo(15)
->check(function (array $job_params, array $job_options) {
return rand(1, 10) == 5;
})
// the chains are applied as an 'AND', but 'any()'
// allows for this 'OR' that
->any(
$this->ready()
->check(function (array $job_params, array $job_options) {
return rand(1, 3) == 1;
})
->check(function (array $job_params, array $job_options) {
return rand(1, 3) == 2;
})
,
$this->ready()
->check(function (array $job_params, array $job_options) {
return rand(1, 10) == 3;
})
)
;
}
}
Taken from experiences working with @zacharyrankin and @twenty7:
- Application pushes job to buffer queue (RabbitMQ)
- Job queue's buffer worker reads job from buffer queue (RabbitMQ)
- Buffer queue puts job into database (PostgreSQL)
- Superqueuer reads job from database (PostgreSQL)
- Superqueuer pushes job to worker queue (RabbitMQ)
- Job worker reads job from worker queue (RabbitMQ)
- Job worker runs job
-
Recurring jobs are stored in PostgreSQL database
-
Scheduler worker reads job from schedule database (PostgreSQL)
-
Any jobs that are due to be ran are queued to buffer queue with recurring job ID
-
After job is complete, the worker will update the next run time for the recurring job ID
- Is it possible for the job to successfully complete and be moved out of pending jobs but fail to be scheduled for the next run time? How do we handle these failures?
-
Things to consider:
- Should a job's next run time be an interval based on the start time or
end time of the previous job? Ideas:
- If there is a base time, all next run times should be an interval based on that time. So once a job finishes, the next run time would be the next available time that is a multiple of the defined interval plus the base start time.
- If there is not a base time, all next run times should be the most recent completion time plus the interval. Using the start time instead would potentially cause problems since a job could take 5 hours to run and be ran at an interval of 1 hour, which would mean the job would be scheduled to immediately re-run.
- Should a job's next run time be an interval based on the start time or
end time of the previous job? Ideas:
[program:program-name]
command=/path/to/process
process_name=%(program_name)s
numprocs=5
autostart=true
autorestart=true
startsecs=0
startretries=3
stopsignal=TERM
stopwaitsecs=10
redirect_stderr=false
stdout_logfile=/path/to/debug_log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=10
stderr_logfile=/path/to/error_log
stderr_logfile_maxbytes=10MB
stderr_logfile_backups=10