设为首页收藏本站
天天打卡

 找回密码
 立即注册
搜索
查看: 75|回复: 11

PHP使用enqueue/amqp-lib实现rabbitmq任务处理

[复制链接]

1

主题

57

回帖

137

积分

注册会员

积分
137
发表于 2024-4-20 08:20:21 | 显示全部楼层 |阅读模式
目录


一:拓展安装
  1. composer require enqueue/amqp-lib
复制代码
文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md

二:方法介绍


1:连接rabbitmq
  1. $factory = new AmqpConnectionFactory([
  2.     'host' => '192.168.6.88',//host
  3.     'port' => '5672',//端口
  4.     'vhost' => '/',//虚拟主机
  5.     'user' => 'admin',//账号
  6.     'pass' => 'admin',//密码
  7. ]);
  8. $context = $factory->createContext();
复制代码
2:声明主题
  1. //声明并创建主题
  2. $exchangeName = 'exchange';
  3. $fooTopic = $context->createTopic($exchangeName);
  4. $fooTopic->setType(AmqpTopic::TYPE_FANOUT);
  5. $context->declareTopic($fooTopic);

  6. //删除主题
  7. $context->deleteTopic($fooTopic);
复制代码
3:声明队列
  1. //声明并创建队列
  2. $queueName = 'rabbitmq';
  3. $fooQueue = $context->createQueue($queueName);
  4. $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
  5. $context->declareQueue($fooQueue);

  6. //删除队列
  7. $context->deleteQueue($fooQueue);
复制代码
4:将队列绑定到主题
  1. $context->bind(new AmqpBind($fooTopic, $fooQueue));
复制代码
5:发送消息
  1. //向队列发送消息
  2. $message = $context->createMessage('Hello world!');
  3. $context->createProducer()->send($fooQueue, $message);

  4. //向队列发送优先消息
  5. $queueName = 'rabbitmq';
  6. $fooQueue = $context->createQueue(queueName);
  7. $fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
  8. //设置队列的最大优先级
  9. $fooQueue->setArguments(['x-max-priority' => 10]);
  10. $context->declareQueue($fooQueue);

  11. $message = $context->createMessage('Hello world!');

  12. $context->createProducer()
  13.     ->setPriority(5) //设置优先级,优先级越高,消息越快到达消费者
  14.     ->send($fooQueue, $message);

  15. //向队列发送延时消息
  16. $message = $context->createMessage('Hello world!');

  17. $context->createProducer()
  18.     ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
  19.     ->setDeliveryDelay(5000) //消息延时5秒
  20.     ->send($fooQueue, $message);
复制代码
6:消费消息【接收消息】
  1. //消费消息
  2. $consumer = $context->createConsumer($fooQueue);

  3. $message = $consumer->receive();

  4. // process a message
  5. //业务代码

  6. $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
  7. // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务


  8. //订阅消费者
  9. $fooConsumer = $context->createConsumer($fooQueue);

  10. $subscriptionConsumer = $context->createSubscriptionConsumer();
  11. $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
  12.     // process message
  13.     //业务代码
  14.     $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
  15.     // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务

  16.     return true;
  17. });
  18. $subscriptionConsumer->consume();

  19. //清除队列消息
  20. $queueName = 'rabbitmq';
  21. $queue = $context->createQueue($queueName);
  22. $context->purgeQueue($queue);
复制代码
三:简单实现


1:发送消息
  1. //连接rabbitmq$factory = new AmqpConnectionFactory([    'host' => '192.168.6.88',    'port' => '5672',    'vhost' => '/',    'user' => 'admin',    'pass' => 'admin',    'persisted' => false,]); $context = $factory->createContext();//声明主题$exchangeName = 'exchange';$fooTopic = $context->createTopic($exchangeName);$fooTopic->setType(AmqpTopic::TYPE_FANOUT);$context->declareTopic($fooTopic); //声明队列$queueName = 'rabbitmq';$fooQueue = $context->createQueue($queueName);$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);$context->declareQueue($fooQueue); //将队列绑定到主题$context->bind(new AmqpBind($fooTopic, $fooQueue)); //发送消息到队列$message = $context->createMessage('Hello world!'); $context->createProducer()->send($fooQueue, $message);
复制代码
2:消费消息
  1. $factory = new AmqpConnectionFactory([
  2.     'host' => '192.168.6.88',
  3.     'port' => '5672',
  4.     'vhost' => '/',
  5.     'user' => 'admin',
  6.     'pass' => 'admin',
  7.     'persisted' => false,
  8. ]);
  9. $context = $factory->createContext();


  10. $queueName = 'rabbitmq';
  11. $fooQueue = $context->createQueue($queueName);



  12. $fooConsumer = $context->createConsumer($fooQueue);

  13. $subscriptionConsumer = $context->createSubscriptionConsumer();
  14. $subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
  15.     // process message
  16.     //业务代码
  17.     $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
  18.     // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务

  19.     return true;
  20. });
  21. $subscriptionConsumer->consume();
复制代码
到此这篇关于PHP使用enqueue/amqp-lib实现rabbitmq任务处理的文章就介绍到这了,更多相关PHP rabbitmq任务处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

1

主题

38

回帖

97

积分

注册会员

积分
97
发表于 2024-4-27 01:34:55 | 显示全部楼层
我完全同意你的观点
  • 打卡等级:即来则安
  • 打卡总天数:27
  • 打卡月天数:0
  • 打卡总奖励:424
  • 最近打卡:2024-08-20 23:24:38

6

主题

58

回帖

722

积分

高级会员

积分
722
发表于 2024-4-28 07:54:04 来自手机 | 显示全部楼层
谢谢楼主分享

1

主题

58

回帖

138

积分

注册会员

积分
138
发表于 2024-7-27 20:12:00 | 显示全部楼层
友善的讨论氛围是非常重要的。
  • 打卡等级:无名新人
  • 打卡总天数:1
  • 打卡月天数:0
  • 打卡总奖励:8
  • 最近打卡:2024-08-26 11:27:41

2

主题

33

回帖

119

积分

注册会员

积分
119
发表于 2024-8-8 21:17:21 | 显示全部楼层
能给个链接吗?我想深入了解一下。

0

主题

66

回帖

132

积分

注册会员

积分
132
发表于 2024-8-29 07:43:03 | 显示全部楼层
666666666666666666

6

主题

52

回帖

236

积分

中级会员

积分
236
发表于 2024-9-19 02:50:59 | 显示全部楼层
好用好用

1

主题

35

回帖

93

积分

注册会员

积分
93
发表于 2024-9-20 12:40:25 | 显示全部楼层
这个话题很有趣,我想多了解一些

1

主题

56

回帖

136

积分

注册会员

积分
136
发表于 2024-9-23 23:11:05 | 显示全部楼层
让我们一起努力

1

主题

53

回帖

129

积分

注册会员

积分
129
发表于 2024-9-25 23:35:28 | 显示全部楼层
保持尊重和礼貌对待其他成员是必要的。
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

手机版|小黑屋|爱云论坛 - d.taiji888.cn - 技术学习 免费资源分享 ( 蜀ICP备2022010826号 )|天天打卡

GMT+8, 2024-11-24 13:22 , Processed in 0.123076 second(s), 26 queries .

Powered by i云网络 Licensed

© 2023-2028 正版授权

快速回复 返回顶部 返回列表