forked from tobi/delayed_job
-
Notifications
You must be signed in to change notification settings - Fork 953
/
Copy pathbase.rb
151 lines (126 loc) · 4.48 KB
/
base.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
module Delayed
module Backend
module Base
def self.included(base)
base.extend ClassMethods
end
module ClassMethods
# Add a job to the queue
def enqueue(*args)
job_options = Delayed::Backend::JobPreparer.new(*args).prepare
enqueue_job(job_options)
end
def enqueue_job(options)
new(options).tap do |job|
Delayed::Worker.lifecycle.run_callbacks(:enqueue, job) do
job.hook(:enqueue)
Delayed::Worker.delay_job?(job) ? job.save : job.invoke_job
end
end
end
def reserve(worker, max_run_time = Worker.max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
find_available(worker.name, worker.read_ahead, max_run_time).detect do |job|
job.lock_exclusively!(max_run_time, worker.name)
end
end
# Allow the backend to attempt recovery from reserve errors
def recover_from(_error); end
# Hook method that is called before a new worker is forked
def before_fork; end
# Hook method that is called after a new worker is forked
def after_fork; end
def work_off(num = 100)
warn '[DEPRECATION] `Delayed::Job.work_off` is deprecated. Use `Delayed::Worker.new.work_off instead.'
Delayed::Worker.new.work_off(num)
end
end
attr_reader :error
def error=(error)
@error = error
self.last_error = "#{error.message}\n#{error.backtrace.join("\n")}" if respond_to?(:last_error=)
end
def failed?
!!failed_at
end
alias_method :failed, :failed?
ParseObjectFromYaml = %r{\!ruby/\w+\:([^\s]+)} # rubocop:disable ConstantName
def name
@name ||= payload_object.respond_to?(:display_name) ? payload_object.display_name : payload_object.class.name
rescue DeserializationError
ParseObjectFromYaml.match(handler)[1]
end
def payload_object=(object)
@payload_object = object
self.handler = object.to_yaml
end
def payload_object
@payload_object ||= YAML.load_dj(handler)
rescue TypeError, LoadError, NameError, ArgumentError, SyntaxError, Psych::SyntaxError => e
raise DeserializationError, "Job failed to load: #{e.message}. Handler: #{handler.inspect}"
end
def invoke_job
Delayed::Worker.lifecycle.run_callbacks(:invoke_job, self) do
begin
hook :before
payload_object.perform
hook :success
rescue Exception => e # rubocop:disable RescueException
hook :error, e
raise e
ensure
hook :after
end
end
end
# Unlock this job (note: not saved to DB)
def unlock
self.locked_at = nil
self.locked_by = nil
end
def hook(name, *args)
if payload_object.respond_to?(name)
method = payload_object.method(name)
method.arity.zero? ? method.call : method.call(self, *args)
end
rescue DeserializationError # rubocop:disable HandleExceptions
end
def reschedule_at
if payload_object.respond_to?(:reschedule_at)
payload_object.reschedule_at(self.class.db_time_now, attempts)
else
self.class.db_time_now + (attempts**4) + 5
end
end
def max_attempts
payload_object.max_attempts if payload_object.respond_to?(:max_attempts)
end
def max_run_time
return unless payload_object.respond_to?(:max_run_time)
return unless (run_time = payload_object.max_run_time)
if run_time > Delayed::Worker.max_run_time
Delayed::Worker.max_run_time
else
run_time
end
end
def destroy_failed_jobs?
payload_object.respond_to?(:destroy_failed_jobs?) ? payload_object.destroy_failed_jobs? : Delayed::Worker.destroy_failed_jobs
rescue DeserializationError
Delayed::Worker.destroy_failed_jobs
end
def fail!
update_attributes(:failed_at => self.class.db_time_now)
end
protected
def set_default_run_at
self.run_at ||= self.class.db_time_now
end
# Call during reload operation to clear out internal state
def reset
@payload_object = nil
end
end
end
end