getCacheManager();
$this->directory = $modx->getCachePath() . 'registry/';
$this->directory .= isset($options['directory'])
? $options['directory']
: $key;
$this->directory = rtrim($this->directory, '/') . '/';
}
/**
* Make sure the register can write to the specified $directory.
*
* {@inheritdoc}
*/
public function connect(array $attributes = array()) {
    // Without a usable, non-empty directory path there is nothing to prepare.
    if (!is_string($this->directory) || $this->directory === '') {
        return false;
    }
    // Delegate to the cache manager and report its result as the connection
    // state.
    return $this->modx->cacheManager->writeTree($this->directory);
}
/**
* Clear the register messages.
*
* {@inheritdoc}
*/
public function clear($topic)
{
    $topicDirectory = $this->directory . ltrim($this->sanitizePath($topic), '/');

    // realpath() returns false for a path that does not exist; in that case
    // there is nothing to clear, so never hand a non-path to deleteTree().
    $resolved = realpath($topicDirectory);
    $root = realpath($this->directory);
    if ($resolved === false || $root === false) {
        return false;
    }

    // sanitizePath() only rewrites dot runs that are followed by a separator,
    // so a topic ending in '..' still resolves to the parent directory.
    // Refuse anything that ends up outside this register's own directory.
    $root = rtrim($root, '/\\');
    $insideRegister = $resolved === $root
        || strncmp($resolved, $root . DIRECTORY_SEPARATOR, strlen($root) + 1) === 0;
    if (!$insideRegister) {
        return false;
    }

    // Only message files are removed from the topic tree.
    return $this->modx->cacheManager->deleteTree(
        $resolved,
        array(
            'extensions' => array('.msg.php')
        )
    );
}
/**
* {@inheritdoc}
*
* This implementation supports the following options and default behavior:
*
* - msg_limit: Only poll until the specified limit of messages has
* been digested. Default is 5 messages.
* - time_limit: Poll for new messages for a specified number of
* seconds. Default is the result of the php time_limit system variable.
* - poll_limit: Only poll for new subscriptions a specified number
* of times. Default is unlimited.
* - poll_interval: Wait a specified number of seconds between each
* additional polling iteration, after the initial one. Default is no
* interval.
* - remove_read: Remove the message immediately upon digesting it.
* Default is true.
* - include_keys: Include the message keys in the array of messages returned.
* Default is false.
*
*/
public function read(array $options = array()) {
    // A message written with the "kill" option sets this flag while its file
    // is being included; reset it so an earlier read cannot cancel this one.
    $this->__kill = false;
    $messages = array();

    $msgLimit = isset($options['msg_limit']) ? intval($options['msg_limit']) : 5;
    // NOTE(review): PHP's execution-time directive is 'max_execution_time';
    // 'time_limit' presumably is not a registered ini key, in which case
    // ini_get() yields false and the default becomes 0 (a single polling
    // pass). The lookup is kept as-is to preserve that default; the cast only
    // guarantees an integer for the arithmetic below. Confirm before changing.
    $timeLimit = isset($options['time_limit']) ? intval($options['time_limit']) : intval(ini_get('time_limit'));
    $pollLimit = isset($options['poll_limit']) ? intval($options['poll_limit']) : 0;
    $pollInterval = isset($options['poll_interval']) ? intval($options['poll_interval']) : 0;
    $removeRead = isset($options['remove_read']) ? (bool) $options['remove_read'] : true;
    $includeKeys = isset($options['include_keys']) ? (bool) $options['include_keys'] : false;

    $startTime = microtime(true);
    // Without a positive time limit $time starts below $expires, so the loop
    // body still runs once before the clock check ends it.
    $time = $timeLimit <= 0 ? -1 : $startTime;
    $expires = $startTime + $timeLimit;
    $msgCount = 0;
    $iteration = 0;

    while ($time < $expires && $msgCount < $msgLimit && !$this->__kill) {
        if ($iteration > 0) {
            if ($pollLimit > 0 && $iteration >= $pollLimit) {
                break;
            }
            if ($pollInterval > 0) sleep($pollInterval);
        }
        $iteration++;

        foreach ($this->subscriptions as $topic) {
            // Honor the message limit and the kill flag for every
            // subscription, including single named messages.
            if ($msgCount >= $msgLimit || $this->__kill) break;

            $topicMessages = array();
            // Topics are stored with a leading slash; drop it to build a path
            // relative to the register directory.
            $relativeTopic = (isset($topic[0]) && $topic[0] == '/') ? substr($topic, 1) : $topic;
            $topicDirectory = $this->directory . $relativeTopic;

            if (is_dir($topicDirectory)) {
                // Entry names are appended directly, so make sure the
                // directory ends with a separator (send() does the same).
                if (substr($topicDirectory, -1) != '/') $topicDirectory .= '/';
                foreach ($this->getSortedDirectoryListing($topicDirectory) as $entry) {
                    if ($msgCount >= $msgLimit || $this->__kill) break;
                    if ($newMsg = $this->_readMessage($topicDirectory . $entry, $removeRead)) {
                        if ($includeKeys) {
                            // The key is the file name without its .msg.php part.
                            $msgKey = substr($entry, 0, strpos($entry, '.msg.php'));
                            $topicMessages[$msgKey] = $newMsg;
                        } else {
                            $topicMessages[] = $newMsg;
                        }
                        $msgCount++;
                    }
                }
            } elseif ($newMsg = $this->_readMessage($topicDirectory . '.msg.php', $removeRead)) {
                // The topic addresses one named message file rather than a directory.
                if ($includeKeys) {
                    $topicMessages[$topicDirectory] = $newMsg;
                } else {
                    $topicMessages[] = $newMsg;
                }
                $msgCount++;
            }

            // Collect per subscription: merging after the loop would keep only
            // the last topic's messages, and the array union operator would
            // drop list entries whose numeric index is already taken.
            if (!empty($topicMessages)) {
                $messages = array_merge($messages, $topicMessages);
            }
        }
        $time = microtime(true);
    }
    return $messages;
}
/**
* Get the list of topic message filenames in a directory, sorted by filename.
*
* @param string $dir A valid directory path.
* @return array An array of message filenames sorted in ascending filename order.
*/
private function getSortedDirectoryListing($dir) {
    $suffix = '.msg.php';
    $suffixLength = strlen($suffix);
    $listing = array();

    foreach (new DirectoryIterator($dir) as $file) {
        if (!$file->isFile()) {
            continue;
        }
        $filename = $file->getFilename();
        // Accept only names that END in .msg.php, so leftovers such as
        // "x.msg.php.bak" are never treated as messages. A file named exactly
        // ".msg.php" (empty message key) stays excluded, as before.
        if (strlen($filename) > $suffixLength && substr($filename, -$suffixLength) === $suffix) {
            $listing[] = $filename;
        }
    }

    // Ordering is by filename, not by modification time.
    sort($listing);
    return $listing;
}
/**
* Read a message file from the queue.
*
* @todo Implement support for reading various message types, other than
* executable PHP format.
* @access private
* @param string $filename An absolute path to a message file to read.
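* @param boolean $remove Indicates if the message file should be deleted
* once the message is read from it.
* @return mixed The value returned by the included message file, or null if
* the file does not exist.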
*/
private function _readMessage($filename, $remove = true) {
    // Local variable names are kept as they are: the included file executes
    // in this method's scope (which is also how it can reach $this).
    $message = null;
    if (!file_exists($filename)) {
        return $message;
    }

    // The message is whatever the file's return statement yields; failures
    // are deliberately silenced.
    $message = @ include($filename);

    // Best-effort removal once the message has been consumed.
    if ($remove) {
        @ unlink($filename);
    }

    return $message;
}
/**
* {@inheritdoc}
*
* This implementation provides support for sending messages using either
* time-based indexes so they are consumed in the order they are produced,
* or named indexes typically used when consumers want to subscribe to a
* specific, unique message. Individual messages or message collections
* passed in numerically indexed arrays are treated as time-based messages
* and message collections passed in associative arrays are treated as named
* messages. e.g., to send a single message as named, wrap it in an array
* with the intended message name as the key.
*
* This implementation also supports a message_type option to indicate the
* format of the message being sent to the register. Currently only supports
* executable PHP format.
*
* Other implementation specific options include:
*
* - delay: Number of seconds to delay the message. This option is only
* supported for time-based messages.
* - ttl: Number of seconds the message is valid in the queue.
* Default is forever or 0.
* - kill: Tells a message consumer to stop consuming any more
* messages after reading any message sent with this option.
*
*
* @todo Implement support for sending various message types, other than
* executable PHP format.
*/
public function send($topic, $message, array $options = array()) {
    $sent = false;

    // Topics that are not absolute are resolved against the current topic.
    if (empty($topic) || $topic[0] != '/') $topic = $this->_currentTopic . $topic;

    // Messages can only be sent to topics this register is subscribed to.
    $topicIdx = array_search($topic, $this->subscriptions);
    if ($topicIdx === false) {
        return $sent;
    }

    // Drop the leading slash to get a path relative to the register directory.
    $topic = substr($topic, 1);
    $messageType = isset($options['message_type']) ? $options['message_type'] : 'php';
    $topicDirectory = $this->directory . $topic;
    if (substr($topicDirectory, -1) != '/') $topicDirectory .= '/';

    // A single message is handled as a one-element, numerically indexed list.
    if (!is_array($message)) {
        $message = array($message);
    }

    foreach ($message as $msgIdx => $msg) {
        if (!(is_scalar($msg) || is_array($msg) || is_object($msg))) {
            continue;
        }
        switch ($messageType) {
            //TODO: implement more message types
            case 'php' :
            default :
                $timestamp = isset($options['delay']) ? time() + intval($options['delay']) : time();
                $expires = !empty($options['ttl']) ? time() + intval($options['ttl']) : 0;
                $kill = isset($options['kill']) ? (bool) $options['kill'] : false;
                if (!is_int($msgIdx)) {
                    // Named message: the array key is the message name.
                    $msgKey = $msgIdx;
                } else {
                    // Time-based message: timestamp plus zero-padded index,
                    // so file names sort in production order. date() yields
                    // the same digits as the deprecated
                    // strftime('%Y%m%dT%H%M%S').
                    $msgKey = date('Ymd\THis', $timestamp) . '-' . sprintf("%03d", $msgIdx);
                }
                $filename = $topicDirectory . $msgKey . '.msg.php';

                // The message file is executable PHP that returns the payload.
                $content = "<?php\n";
                // Past its ttl the file returns null instead of the payload.
                if ($expires > 0) $content.= "if (time() > {$expires}) return null;\n";
                // Reading a kill message flags the consuming register to stop.
                if ($kill) $content.= "\$this->__kill = true;\n";
                $content.= 'return ' . var_export($msg, true) . ";\n";

                // As before, the result reflects the last message written.
                $sent = $this->modx->cacheManager->writeFile($filename, $content);
        }
    }
    return $sent;
}
/**
 * Close the register.
 *
 * This implementation does no work here and always reports success.
 *
 * @return boolean Always true.
 */
public function close()
{
    return true;
}
/**
* Sanitize the specified path
*
* @param string $path The path to clean
* @return string The sanitized path
*/
protected function sanitizePath($path) {
    // Applied in order:
    //  1. any run of dots (possibly empty) directly followed by '/', '|' or
    //     a backslash becomes a single '/';
    //  2. any remaining run of those separator characters collapses to '/'.
    $patterns = array(
        "/\.*[\/|\\\]/i",
        "/[\/|\\\]+/i",
    );

    // A single replacement string is used for every pattern.
    return preg_replace($patterns, '/', $path);
}
}