This repository has been archived by the owner on Jul 19, 2023. It is now read-only.
forked from cfrutos/li3_gearman
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathGearman.php
170 lines (153 loc) · 5.74 KB
/
Gearman.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
<?php
/**
* Lithium: the most rad php framework
*
* @copyright Copyright 2012, Mariano Iglesias (http://marianoiglesias.com.ar)
* @license http://opensource.org/licenses/bsd-license.php The BSD License
*/
namespace li3_gearman;
use lithium\aop\Filters;
use lithium\core\Adaptable;
use lithium\core\ConfigException;
class Gearman extends Adaptable
{
/**
* Stores configurations for various authentication adapters.
*
* @var object `Collection` of authentication configurations.
*/
protected static $_configurations = [];
/**
* Libraries::locate() compatible path to adapters for this class.
*
* @see lithium\core\Libraries::locate()
* @var string Dot-delimited path.
*/
protected static $_adapters = 'adapter.queue';
/**
* Run/schedule a job on a configuration
*
* @param string $configName Configuration to use
* @param string $action Action name (job) to execute
* @param array $args Arguments to pass to the action
* @param array $options Extra options (used by adapter)
* @return mixed Returned value by adapter's run() method
*/
public static function run($configName, $action, array $args = [], array $options = [])
{
$config = static::getConfig($configName);
$filters = (array) $config['filters'];
foreach ($filters as $currentFilter) {
Filters::apply(get_called_class(), __FUNCTION__, $currentFilter);
}
$params = compact('action', 'args', 'configName');
return Filters::run(get_called_class(), __FUNCTION__, $params, function ($params) use ($options) {
return self::adapter($params['configName'])->run(
$params['action'],
$params['args'],
['configName' => $params['configName']] + $options
);
});
}
/**
* Executes job on a configuration
*
* @param string $configName Configuration to use
* @param string $action Action name (job) to execute
* @param array $args Arguments to pass to the action
* @param array $env Settings to merge on $_SERVER
* @param array $workload Full workload
* @return mixed Returned value by adapter's execute() method
*/
public static function execute($configName, $action, array $args, array $env = [], array $workload = [])
{
$config = static::getConfig($configName);
$filters = (array) $config['filters'];
foreach ($filters as $currentFilter) {
Filters::apply(get_called_class(), __FUNCTION__, $currentFilter);
}
$params = compact('action', 'args', 'env', 'workload');
return Filters::run(get_called_class(), __FUNCTION__, $params, function ($params) use ($configName) {
return self::adapter($configName)->execute(
$params['action'],
$params['args'],
$params['env'],
$params['workload']
);
});
}
/**
* Processes a scheduled job
*
* @param string $configName Configuration to use
* @return mixed Returned value by adapter's scheduled() method
*/
public static function scheduled($configName)
{
$config = static::getConfig($configName);
$filters = (array) $config['filters'];
foreach ($filters as $currentFilter) {
Filters::apply(get_called_class(), __FUNCTION__, $currentFilter);
}
return Filters::run(get_called_class(), __FUNCTION__, [], function ($params) use ($configName) {
return self::adapter($configName)->scheduled();
});
}
/**
* Gets the given config, checking for validity
*
* @param string $name Configuration name
* @return array Configuration
*/
public static function getConfig($name)
{
if (($config = static::_config($name)) === null) {
throw new ConfigException("Configuration {$config} has not been defined.");
} elseif (!is_array($config)) {
throw new ConfigException('Invalid configuration: not an array');
} elseif (empty($config['servers'])) {
throw new ConfigException('No servers defined. Add them to the "servers" setting');
}
return $config;
}
/**
* Initializes configuration with default settings
*
* @param string $name The name of the configuration which is being accessed. This is the key
* name containing the specific set of configuration passed into `config()`.
* @param array $config Contains the configuration assigned to `$name`. If this configuration is
* segregated by environment, then this will contain the configuration for the
* current environment.
* @return array Returns the final array of settings for the given named configuration.
*/
protected static function _initConfig($name, $config)
{
$defaults = [
'filters' => [],
'servers' => []
];
$config = parent::_initConfig($name, $config) + $defaults;
foreach (['filters', 'servers'] as $arrayVar) {
if (!empty($config[$arrayVar]) && !is_array($config[$arrayVar])) {
$config[$arrayVar] = (array) $config[$arrayVar];
} elseif (empty($config[$arrayVar])) {
$config[$arrayVar] = [];
}
}
if (empty($config['adapter'])) {
$config['adapter'] = 'Job';
}
return $config;
}
/**
* Executed through the Gearmand shell for testing that a worker is alive
*/
public static function ping($scheduled = false)
{
if ($scheduled) {
echo "[Scheduled PING] OK\n";
return;
}
return 'OK';
}
}