Skip to content
This repository has been archived by the owner on Jan 29, 2020. It is now read-only.

Commit

Permalink
Fixed the Memcached adapter + add getDelayed(), fetch(), fetchAll()
Browse files Browse the repository at this point in the history
  • Loading branch information
ezimuel authored and weierophinney committed Dec 20, 2011
1 parent 8c0e6d3 commit 4ff045c
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 53 deletions.
5 changes: 5 additions & 0 deletions src/Storage/Adapter/AbstractAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ public function removePlugin(Plugin $plugin)
$registry = $this->getPluginRegistry();
if ($registry->contains($plugin)) {
$plugin->detach($this->events());
} else {
throw new Exception\LogicException(sprintf(
'Plugin of type "%s" already removed',
get_class($plugin)
));
}
$registry->detach($plugin);
return $this;
Expand Down
193 changes: 150 additions & 43 deletions src/Storage/Adapter/Memcached.php
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,21 @@ class Memcached extends AbstractAdapter
* @throws Exception
* @return void
*/
public function __construct()
public function __construct($options = null)
{
if (!extension_loaded('memcached')) {
throw new Exception\ExtensionNotLoadedException("Memcached extension is not loaded");
}

$this->memcached= new MemcachedResource();

if (!empty($options)) {
$this->setOptions($options);
}

$options= $this->getOptions();
$this->memcached->addServer($options->getServer(), $options->getPort());

}

/* options */
Expand Down Expand Up @@ -478,7 +485,14 @@ public function replaceItem($key, $value, array $options = array())
return $eventRs->last();
}

$result= $this->memcached->replace($internalKey, $value, $options['ttl']);
$internalKey = $options['namespace'] . $baseOptions->getNamespaceSeparator() . $key;
if (!$this->memcached->get($internalKey)) {
throw new Exception\ItemNotFoundException(
"Key '{$internalKey}' doesn't exist"
);
}

$result = $this->memcached->replace($internalKey, $value, $options['ttl']);

if ($result === false) {
$type = is_object($value) ? get_class($value) : gettype($value);
Expand Down Expand Up @@ -534,14 +548,15 @@ public function removeItem($key, array $options = array())

$internalKey = $options['namespace'] . $baseOptions->getNamespaceSeparator() . $key;

$result= $this->memcached->delete($internalKey);
$result = $this->memcached->delete($internalKey);

if ($result === false) {
if (!$options['ignore_missing_items']) {
throw new Exception\ItemNotFoundException("Key '{$internalKey}' not found");
}
}

$result = true;

return $this->triggerPost(__FUNCTION__, $args, $result);
} catch (\Exception $e) {
return $this->triggerException(__FUNCTION__, $args, $e);
Expand Down Expand Up @@ -672,56 +687,84 @@ public function decrementItem($key, $value, array $options = array())

/* non-blocking */

/**
* Find items.
*
* Options:
* - ttl <float> optional
* - The time-to-life (Default: ttl of object)
* - namespace <string> optional
* - The namespace to use (Default: namespace of object)
* - tags <array> optional
* - Tags to search for used with matching modes of
* Zend\Cache\Storage\Adapter::MATCH_TAGS_*
*
* @param int $mode Matching mode (Value of Zend\Cache\Storage\Adapter::MATCH_*)
* @param array $options
* @return boolean
* @throws Exception
* @see fetch()
* @see fetchAll()
*
* @triggers find.pre(PreEvent)
* @triggers find.post(PostEvent)
* @triggers find.exception(ExceptionEvent)
*/
public function find($mode = self::MATCH_ACTIVE, array $options=array())
{
throw Exception\RuntimeException(sprintf(
'%s is not yet implemented',
__METHOD__
));
}

/**
* Fetches the next item from result set
*
* @return array|boolean The next item or false
* @throws Exception
* @see fetchAll()
* @see fetchAll()
*
* @triggers fetch.pre(PreEvent)
* @triggers fetch.post(PostEvent)
* @triggers fetch.exception(ExceptionEvent)
*/
public function fetch()
{
throw Exception\RuntimeException(sprintf(
'%s is not yet implemented',
__METHOD__
));
if (!$this->stmtActive) {
return false;
}

$args = new ArrayObject();

try {
$eventRs = $this->triggerPre(__FUNCTION__, $args);
if ($eventRs->stopped()) {
return $eventRs->last();
}

$prefixL = strlen($this->stmtOptions['namespace'] . $this->getOptions()->getNamespaceSeparator());

if (!$this->stmtIterator) {
// clear stmt
$this->stmtActive = false;
$this->stmtIterator = null;
$this->stmtOptions = null;

$result = false;
} else {
$result = $this->memcached->fetch();
if (!empty($result)) {
$select = $this->stmtOptions['select'];
if (in_array('key', $select)) {
$result['key'] = substr($result['key'], $prefixL);
}
}
}

return $this->triggerPost(__FUNCTION__, $args, $result);
} catch (\Exception $e) {
return $this->triggerException(__FUNCTION__, $args, $e);
}
}

/**
* FetchAll
*
* @throws Exception
* @return array
*/
public function fetchAll()
{
$prefixL = strlen($this->stmtOptions['namespace'] . $this->getOptions()->getNamespaceSeparator());

$result = $this->memcached->fetchAll();

if ($result === false) {
throw new Exception\RuntimeException("Memcached::fetchAll() failed");
}

$select = $this->stmtOptions['select'];

foreach ($result as &$elem) {
if (in_array('key', $select)) {
$elem['key'] = substr($elem['key'], $prefixL);
} else {
unset($elem['key']);
}
}

return $result;
}

/* cleaning */

/**
Expand Down Expand Up @@ -807,8 +850,7 @@ public function getCapabilities()
'object' => 'object',
'resource' => false,
),
'supportedMetadata' => array(
),
'supportedMetadata' => array(),
'maxTtl' => 0,
'staticTtl' => false,
'tagging' => false,
Expand All @@ -830,6 +872,71 @@ public function getCapabilities()
}
}

/**
* Get items that were marked to delay storage for purposes of removing blocking
*
* @param array $keys
* @param array $options
* @return bool
* @throws Exception
*
* @triggers getDelayed.pre(PreEvent)
* @triggers getDelayed.post(PostEvent)
* @triggers getDelayed.exception(ExceptionEvent)
*/
public function getDelayed(array $keys, array $options = array())
{
$baseOptions = $this->getOptions();
if ($this->stmtActive) {
throw new Exception\RuntimeException('Statement already in use');
} elseif (!$baseOptions->getReadable()) {
return false;
} elseif (!$keys) {
return true;
}

$this->normalizeOptions($options);
if (isset($options['callback']) && !is_callable($options['callback'], false)) {
throw new Exception\InvalidArgumentException('Invalid callback');
}

$args = new ArrayObject(array(
'key' => & $key,
'options' => & $options,
));

try {
$eventRs = $this->triggerPre(__FUNCTION__, $args);
if ($eventRs->stopped()) {
return $eventRs->last();
}

$prefix = $options['namespace'] . $baseOptions->getNamespaceSeparator();

$search = array();
foreach ($keys as $key) {
$search[] = $prefix.$key;
}

$this->stmtIterator = $this->memcached->getDelayed($search);

$this->stmtActive = true;
$this->stmtOptions = &$options;

if (isset($options['callback'])) {
$callback = $options['callback'];
while (($item = $this->fetch()) !== false) {
call_user_func($callback, $item);
}
}

$result = true;
return $this->triggerPost(__FUNCTION__, $args, $result);
} catch (\Exception $e) {
return $this->triggerException(__FUNCTION__, $args, $e);
}
}

/**
* Get storage capacity.
*
Expand Down
53 changes: 53 additions & 0 deletions src/Storage/Adapter/MemcachedOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ class MemcachedOptions extends AdapterOptions
'tcp_nodelay' => MemcachedResource::OPT_TCP_NODELAY,
);

/**
* Memcached server address
*
* @var string
*/
protected $server = 'localhost';

/**
* Memcached port
*
* @var integer
*/
protected $port = 11211;

/**
* Whether or not to enable binary protocol for communication with server
*
Expand Down Expand Up @@ -202,6 +216,37 @@ class MemcachedOptions extends AdapterOptions
*/
protected $tcpNodelay = false;

public function setServer($server)
{
$this->server= $server;
return $this;
}

public function getServer()
{
return $this->server;
}

public function setPort($port)
{
if ((!is_int($port) && !is_numeric($port))
|| 0 > $port
) {
throw new Exception\InvalidArgumentException(sprintf(
'%s expects a positive integer',
__METHOD__
));
}

$this->port= $port;
return $this;
}

public function getPort()
{
return $this->port;
}

/**
* Set flag indicating whether or not to enable binary protocol for
* communication with server
Expand Down Expand Up @@ -708,6 +753,10 @@ public function getServerFailureLimit()
*/
public function setSocketSendSize($socketSendSize)
{
if ($socketSendSize === null) {
return $this;
}

if ((!is_int($socketSendSize) && !is_numeric($socketSendSize))
|| 0 > $socketSendSize
) {
Expand Down Expand Up @@ -739,6 +788,10 @@ public function getSocketSendSize()
*/
public function setSocketRecvSize($socketRecvSize)
{
if ($socketRecvSize === null) {
return $this;
}

if ((!is_int($socketRecvSize) && !is_numeric($socketRecvSize))
|| 0 > $socketRecvSize
) {
Expand Down
Loading

0 comments on commit 4ff045c

Please sign in to comment.