• 冒险村物语
  • 英雄无敌3塔防
  • 驾考家园手游

php amqp 消息队列 RabbitMQ 交换器类型 直连 (三)

2015-05-27

1、AMQP_EX_TYPE_DIRECT:直连型

直连型又包括: 1对1 和1对N(N对1、 N对N)
/
接收端receive.php代码如下
connect();$channel = new AMQPChannel($connect);$exchange = new AMQPExchange($channel);$exchange->setName('exchange');$exchange->setType(AMQP_EX_TYPE_DIRECT);$exchange->declare();$queue = new AMQPQueue($channel);$queue->setName('logs');$queue->declare();$queue->bind('exchange', 'logs');while (true) {    $queue->consume('callback');}$connection->close();function callback($envelope, $queue) {    var_dump($envelope->getBody());    $queue->nack($envelope->getDeliveryTag());}

发送端send.php代码如下
connect();$channel = new AMQPChannel($connect);$exchange = new AMQPExchange($channel);$exchange->setName('exchange');$exchange->setType(AMQP_EX_TYPE_DIRECT);$exchange->declare();$exchange->publish('direct type test','logs');var_dump("Send Message OK");$connect->disconnect();

运行结果如图所示
/



/创建receive_one.php和receive_two.php 并把send.php代码改成如下代码方便我们观看receive_one.php 和 receive_two.php 代码相同 或者用dos运行多个接收端
connect();$channel = new AMQPChannel($connect);$exchange = new AMQPExchange($channel);$exchange->setName('exchange');$exchange->setType(AMQP_EX_TYPE_DIRECT);$exchange->declare();$queue = new AMQPQueue($channel);$queue->setName('logs');@$queue->declare();$queue->bind('exchange', 'logs');while (true) {    $queue->consume('callback');}$connection->close();function callback($envelope, $queue) {    var_dump($envelope->getBody());    $queue->nack($envelope->getDeliveryTag());}




send.php
connect();$channel = new AMQPChannel($connect);$exchange = new AMQPExchange($channel);$exchange->setName('exchange');$exchange->setType(AMQP_EX_TYPE_DIRECT);$exchange->declare();for ($index = 1; $index publish($index,'logs');    var_dump("Send:$index");}$exchange->delete();$connect->disconnect();

运行结果如下/

列队会把消息分配给每一个接收端分配处理这里看似完美但是如果想要更好的处理不同的任务就需要 公平调度
比如当1、3处理的都是简单的人 2、4都是处理的复杂的任务 如果任务过多时 receive_one.php是空闲的而receive_two.php是任务繁重的我们进行如下测试send.php改成5改成50
for ($index = 1; $index publish($index,'logs');    var_dump("Send:$index");}

receive_two.php 加上 sleep(3)
function callback($envelope, $queue) {    var_dump($envelope->getBody());    sleep(3);    $queue->nack($envelope->getDeliveryTag());}

我们运行程序结果如下
/

receive_one全部运行完而receive_two才运行一个 之后receive_one一直空闲我们可以通过 在接收端设置$channel->setPrefetchCount(1);
任务没人完成前不接收新的消息把消息发送给其他接收端
如下receive_one.php 和 receive_two.php
$channel = new AMQPChannel($connect);
改成如下
$channel = new AMQPChannel($connect);$channel->setPrefetchCount(1);




(免责声明:文章内容如涉及作品内容、版权和其它问题,请及时与我们联系,我们将在第一时间删除内容,文章内容仅供参考)

人气推荐

知识阅读

精彩推荐

  • 游戏
  • 软件
查看更多>>