2015-05-27
memche消息队列的原理就是在key上做文章,用以做一个连续的数字加上前缀记录序列化以后消息或者日志。然后通过定时程序将内容落地到文件或者数据库。
php实现消息队列的用处比如在做发送邮件时发送大量邮件很费时间的问题,那么可以采取队列。
方便实现队列的轻量级队列服务器是:
starling支持memcache协议的轻量级持久化服务器
https://github.com/starling/starling
Beanstalkd轻量、高效,支持持久化,每秒可处理3000左右的队列
http://kr.github.com/beanstalkd/
php中也可以使用memcache/memcached来实现消息队列。
connect('127.0.0.1', 11211);}return $mc;}/*** mc 计数器,增加计数并返回新的计数* @param string $key 计数器* @param int $offset 计数增量,可为负数.0为不改变计数* @param int $time 时间* @return int/false 失败是返回false,成功时返回更新计数器后的计数*/static public function set_counter( $key, $offset, $time=0 ){$mc = self::mc_init();$val = $mc->get($key);if( !is_numeric($val) || $val set( $key, 0, $time );if( !$ret ) return false;$val = 0;}$offset = intval( $offset );if( $offset > 0 ){return $mc->increment( $key, $offset );}elseif( $offset decrement( $key, -$offset );}return $val;}/*** 写入队列* @param string $key* @param mixed $value* @return bool*/static public function input( $key, $value ){$mc = self::mc_init();$w_key = self::PREFIX.$key.'W';$v_key = self::PREFIX.$key.self::set_counter($w_key, 1);return $mc->set( $v_key, $value );}/*** 读取队列里的数据* @param string $key* @param int $max 最多读取条数* @return array*/static public function output( $key, $max=100 ){$out = array();$mc = self::mc_init();$r_key = self::PREFIX.$key.'R';$w_key = self::PREFIX.$key.'W';$r_p = self::set_counter( $r_key, 0 );//读指针$w_p = self::set_counter( $w_key, 0 );//写指针if( $r_p == 0 ) $r_p = 1;while( $w_p >= $r_p ){if( --$max get( $v_key );$mc->delete($v_key);}return $out;}}/**使用方法:QMC::input($key, $value );//写入队列$list = QMC::output($key);//读取队列*/?>
基于PHP共享内存实现的消息队列:
shmId = shmop_open($shmkey, "c", 0644, $this->memSize );$this->maxQSize = $this->memSize / $this->blockSize;// 申?一个信号量$this->semId = sem_get($shmkey, 1);sem_acquire($this->semId); // 申请进入临界区$this->init();}private function init(){if ( file_exists($this->filePtr) ){$contents = file_get_contents($this->filePtr);$data = explode( '|', $contents );if ( isset($data[0]) && isset($data[1])){$this->front = (int)$data[0];$this->rear = (int)$data[1];}}}public function getLength(){return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;}public function enQueue( $value ){if ( $this->ptrInc($this->rear) == $this->front ){ // 队满return false;}$data = $this->encode($value);shmop_write($this->shmId, $data, $this->rear );$this->rear = $this->ptrInc($this->rear);return true;}public function deQueue(){if ( $this->front == $this->rear ){ // 队空return false;}$value = shmop_read($this->shmId, $this->front, $this->blockSize-1);$this->front = $this->ptrInc($this->front);return $this->decode($value);}private function ptrInc( $ptr ){return ($ptr + $this->blockSize) % ($this->memSize);}private function encode( $value ){$data = serialize($value) . "__eof";echo '';echo strlen($data);echo '';echo $this->blockSize -1;echo '';if ( strlen($data) > $this->blockSize -1 ){throw new Exception(strlen($data)." is overload block size!");}return $data;}private function decode( $value ){$data = explode("__eof", $value);return unserialize($data[0]);}public function __destruct(){$data = $this->front . '|' . $this->rear;file_put_contents($this->filePtr, $data);sem_release($this->semId); // 出临界区, 释放信号量}}/*// 进队操作$shmq = new ShmQueue();$data = 'test data';$shmq->enQueue($data);unset($shmq);// 出队操作$shmq = new ShmQueue();$data = $shmq->deQueue();unset($shmq);*/?>
对于一个很大的消息队列,频繁进行进行大数据库的序列化 和 反序列化,有太耗费。下面是我用PHP 实现的一个消息队列,只需要在尾部插入一个数据,就操作尾部,不用操作整个消息队列进行读取,与操作。但是,这个消息队列不是线程安全的,我只是尽量的避免了冲突的可能性。如果消息不是非常的密集,比如几秒钟才一个,还是可以考虑这样使用的。
如果你要实现线程安全的,一个建议是通过文件进行锁定,然后进行操作。下面是代码:
代码如下:
class Memcache_Queue { private $memcache; private $name; private $prefix; function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__") { if ($memcache == null) { throw new Exception("memcache object is null, new the object first."); } $this->memcache = $memcache; $this->name = $name; $this->prefix = $prefix; $this->maxSize = $maxSize; $this->front = 0; $this->real = 0; $this->size = 0; } function __get($name) { return $this->get($name); } function __set($name, $value) { $this->add($name, $value); return $this; } function isEmpty() { return $this->size == 0; } function isFull() { return $this->size == $this->maxSize; } function enQueue($data) { if ($this->isFull()) { throw new Exception("Queue is Full"); } $this->increment("size"); $this->set($this->real, $data); $this->set("real", ($this->real + 1) % $this->maxSize); return $this; } function deQueue() { if ($this->isEmpty()) { throw new Exception("Queue is Empty"); } $this->decrement("size"); $this->delete($this->front); $this->set("front", ($this->front + 1) % $this->maxSize); return $this; } function getTop() { return $this->get($this->front); } function getAll() { return $this->getPage(); } function getPage($offset = 0, $limit = 0) { if ($this->isEmpty() || $this->size getKeyByPos(($this->front + $offset) % $this->maxSize); $num = 1; for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); $num++; if ($limit > 0 && $limit == $num) { break; } } return array_values($this->memcache->get($keys)); } function makeEmpty() { $keys = $this->getAllKeys(); foreach ($keys as $value) { $this->delete($value); } $this->delete("real"); $this->delete("front"); $this->delete("size"); $this->delete("maxSize"); } private function getAllKeys() { if ($this->isEmpty()) { return array(); } $keys[] = $this->getKeyByPos($this->front); for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize) { $keys[] = $this->getKeyByPos($pos); } return $keys; } private function add($pos, $data) { $this->memcache->add($this->getKeyByPos($pos), $data); return $this; } private function increment($pos) { return $this->memcache->increment($this->getKeyByPos($pos)); } private function decrement($pos) { $this->memcache->decrement($this->getKeyByPos($pos)); } private function set($pos, $data) { $this->memcache->set($this->getKeyByPos($pos), $data); return $this; } private function get($pos) { return $this->memcache->get($this->getKeyByPos($pos)); } private function delete($pos) { return $this->memcache->delete($this->getKeyByPos($pos)); } private function getKeyByPos($pos) { return $this->prefix . $this->name . $pos; } }
1
CI框架连接数据库配置操作以及多数据库操作
09-05
2
asp 简单读取数据表并列出来 ASP如何快速从数据库读取大量数据
05-17
3
C语言关键字及其解释介绍 C语言32个关键字详解
04-05
4
C语言中sizeof是什么意思 c语言里sizeof怎样用法详解
04-26
5
PHP中的魔术方法 :__construct, __destruct , __call, __callStatic,__get, __set, __isset, __unset , __sleep,
09-05
6
PHP中的(++i)前缀自增 和 (i++)后缀自增
09-05
7
最简单的asp登陆界面代码 asp登陆界面源代码详细介绍
04-12
8
常用dos命令及语法
09-27
PHP中include和require区别之我见
2014-09-05
将视频设置为Android手机开机动画的教程
2014-12-11
php递归返回值的问题
2014-09-05
如何安装PHPstorm并配置方法教程 phpstorm安装后要进行哪些配置
2017-05-03
PHP 教程之如何使用BLOB存取图片信息实例
2014-09-05
java中的info是什么意思
2022-03-24
单片机编程好学吗?单片机初学者怎样看懂代码
2022-03-21
学ug编程如何快速入门?
2022-03-17
PHP数组函数array
2014-09-05
IcePHP框架中的快速后台中的通用CRUD功能框架
2014-09-05
真实枪械武器模拟器游戏下载v2.177 安卓版
射击枪战 194.6M
下载果汁四溅2中文版下载v1.9.8 安卓版
休闲益智 81.7M
下载我飞刀玩得贼6手游下载v2.2.7 安卓版
休闲益智 73.9M
下载方块冒险下载v1.1.9 安卓版
休闲益智 19.7M
下载我是鸽手游戏下载v2.0.0 安卓版
休闲益智 41.3M
下载耶小兔子2最新中文版下载v1.2.9 安卓版
休闲益智 60.9M
下载弓箭大师你就是王者游戏下载v2.0.1 安卓最新版
其它手游 164.4M
下载弓箭大师你就是王者官方版下载v2.0.1 安卓版
其它手游 164.4M
下载3d狙击王者手机游戏下载v1.5 安卓版
下载
足球冲鸭手机版下载v1.0.16.404.401.0116 安卓版
下载
有种来找我手游下载v1.4.8 安卓版
下载
忍者跳跳跳官方版下载v1.0.2 安卓版
下载
百战斗斗堂拇指游戏下载v5.8 安卓版
下载
梦幻斗斗堂手游下载v5.8 安卓版
下载
百战斗斗堂4399版下载v5.8 安卓版
下载
百战斗斗堂九游手游下载v5.8 安卓版
下载