|
目录
一:拓展安装
- composer require enqueue/amqp-lib
复制代码 文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md
二:方法介绍
1:连接rabbitmq
- $factory = new AmqpConnectionFactory([
- 'host' => '192.168.6.88',//host
- 'port' => '5672',//端口
- 'vhost' => '/',//虚拟主机
- 'user' => 'admin',//账号
- 'pass' => 'admin',//密码
- ]);
- $context = $factory->createContext();
复制代码 2:声明主题
- //声明并创建主题
- $exchangeName = 'exchange';
- $fooTopic = $context->createTopic($exchangeName);
- $fooTopic->setType(AmqpTopic::TYPE_FANOUT);
- $context->declareTopic($fooTopic);
-
- //删除主题
- $context->deleteTopic($fooTopic);
复制代码 3:声明队列
- //声明并创建队列
- $queueName = 'rabbitmq';
- $fooQueue = $context->createQueue($queueName);
- $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
- $context->declareQueue($fooQueue);
-
- //删除队列
- $context->deleteQueue($fooQueue);
复制代码 4:将队列绑定到主题
- $context->bind(new AmqpBind($fooTopic, $fooQueue));
复制代码 5:发送消息
- //向队列发送消息
- $message = $context->createMessage('Hello world!');
- $context->createProducer()->send($fooQueue, $message);
-
- //向队列发送优先消息
- $queueName = 'rabbitmq';
- $fooQueue = $context->createQueue(queueName);
- $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
- //设置队列的最大优先级
- $fooQueue->setArguments(['x-max-priority' => 10]);
- $context->declareQueue($fooQueue);
-
- $message = $context->createMessage('Hello world!');
-
- $context->createProducer()
- ->setPriority(5) //设置优先级,优先级越高,消息越快到达消费者
- ->send($fooQueue, $message);
-
- //向队列发送延时消息
- $message = $context->createMessage('Hello world!');
-
- $context->createProducer()
- ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
- ->setDeliveryDelay(5000) //消息延时5秒
- ->send($fooQueue, $message);
复制代码 6:消费消息【接收消息】
- //消费消息
- $consumer = $context->createConsumer($fooQueue);
-
- $message = $consumer->receive();
-
- // process a message
- //业务代码
-
- $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
- // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
-
-
- //订阅消费者
- $fooConsumer = $context->createConsumer($fooQueue);
-
- $subscriptionConsumer = $context->createSubscriptionConsumer();
- $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
- // process message
- //业务代码
- $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
- // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
-
- return true;
- });
- $subscriptionConsumer->consume();
-
- //清除队列消息
- $queueName = 'rabbitmq';
- $queue = $context->createQueue($queueName);
- $context->purgeQueue($queue);
复制代码 三:简单实现
1:发送消息
- //连接rabbitmq$factory = new AmqpConnectionFactory([ 'host' => '192.168.6.88', 'port' => '5672', 'vhost' => '/', 'user' => 'admin', 'pass' => 'admin', 'persisted' => false,]); $context = $factory->createContext();//声明主题$exchangeName = 'exchange';$fooTopic = $context->createTopic($exchangeName);$fooTopic->setType(AmqpTopic::TYPE_FANOUT);$context->declareTopic($fooTopic); //声明队列$queueName = 'rabbitmq';$fooQueue = $context->createQueue($queueName);$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);$context->declareQueue($fooQueue); //将队列绑定到主题$context->bind(new AmqpBind($fooTopic, $fooQueue)); //发送消息到队列$message = $context->createMessage('Hello world!'); $context->createProducer()->send($fooQueue, $message);
复制代码 2:消费消息
- $factory = new AmqpConnectionFactory([
- 'host' => '192.168.6.88',
- 'port' => '5672',
- 'vhost' => '/',
- 'user' => 'admin',
- 'pass' => 'admin',
- 'persisted' => false,
- ]);
- $context = $factory->createContext();
-
-
- $queueName = 'rabbitmq';
- $fooQueue = $context->createQueue($queueName);
-
-
-
- $fooConsumer = $context->createConsumer($fooQueue);
-
- $subscriptionConsumer = $context->createSubscriptionConsumer();
- $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
- // process message
- //业务代码
- $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
- // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
-
- return true;
- });
- $subscriptionConsumer->consume();
复制代码 到此这篇关于PHP使用enqueue/amqp-lib实现rabbitmq任务处理的文章就介绍到这了,更多相关PHP rabbitmq任务处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |
|