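Hyperf 2.1: sending subscription emails through RabbitMQ, step by step
This article continues from the previous one. My Hyperf 2.1 installation already has the AMQP component installed, so RabbitMQ can be used directly.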
Environment:
CentOS 7
RabbitMQ server
https://www.zongscan.com/demo333/89368.html
Hyperf 2.1
hyperf-ext/mail component package
https://www.zongscan.com/demo333/1306.html
Steps:
Create the producer
[[email protected] hyperf-skeleton]# php bin/hyperf.php gen:amqp-producer DemoProducer
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\Config\Listener\RegisterPropertyHandlerListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\Paginator\Listener\PageResolverListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\ExceptionHandler\Listener\ExceptionHandlerListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\DbConnection\Listener\RegisterConnectionResolverListener listener.
App\Amqp\Producer\DemoProducer created successfully.
Code (copied directly from the official manual):
<?php
declare(strict_types=1);

namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;

/**
 * @Producer(exchange="hyperf", routingKey="hyperf")
 */
class DemoProducer extends ProducerMessage
{
    public function __construct($data)
    {
        $this->payload = $data;
    }
}
Create the consumer
[[email protected] hyperf-skeleton]# php bin/hyperf.php gen:amqp-consumer DemoConsumer
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\Config\Listener\RegisterPropertyHandlerListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\Paginator\Listener\PageResolverListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\ExceptionHandler\Listener\ExceptionHandlerListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\DbConnection\Listener\RegisterConnectionResolverListener listener.
App\Amqp\Consumer\DemoConsumer created successfully.
Code:
<?php
declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;
use App\Mail\subs;
use HyperfExt\Mail\Mail;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", name ="DemoConsumer", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    public function consumeMessage($data, AMQPMessage $message): string
    {
        // Log the raw payload; the producer sends a JSON string
        var_dump('消费者正在消费数据:' . $data);
        // Native json_decode replaces the deprecated \GuzzleHttp\json_decode helper
        $msg = json_decode($data, true);
        switch ($msg['type'] ?? null) {
            case 1:
                //echo $msg['type'];
                // Type 1: send the subscription email
                Mail::to($msg['data']['email'])->send(new subs());
                break;
            case 2:
                // Type 2: reserved, nothing to do yet
                break;
            default:
                // Unknown type: print a debug marker
                echo 666;
        }
        return Result::ACK;
    }
}
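The consumer above trusts the payload: malformed JSON or a missing email address would only surface inside the `switch`. A minimal sketch of separating "decode and decide" from "act" follows, written as plain PHP so it can be run without Hyperf. The function name `routeMessage` and the labels `send_mail`, `ignore` and `drop` are hypothetical names chosen for this illustration, not part of Hyperf or of the original code.

```php
<?php
declare(strict_types=1);

/**
 * Decode a queue payload and decide what the consumer should do with it.
 * Returns 'send_mail', 'ignore' or 'drop' (hypothetical labels for this sketch).
 */
function routeMessage(string $payload): string
{
    try {
        // JSON_THROW_ON_ERROR (PHP 7.3+) turns malformed JSON into an exception.
        $msg = json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
    } catch (\JsonException $e) {
        return 'drop';
    }

    // Anything that is not an object with a "type" field cannot be routed.
    if (!is_array($msg) || !isset($msg['type'])) {
        return 'drop';
    }

    switch ($msg['type']) {
        case 1:
            // Type 1 needs a syntactically valid recipient address.
            $email = $msg['data']['email'] ?? null;
            return is_string($email) && filter_var($email, FILTER_VALIDATE_EMAIL) !== false
                ? 'send_mail'
                : 'drop';
        case 2:
            // Type 2 is reserved in the original consumer and does nothing.
            return 'ignore';
        default:
            return 'drop';
    }
}
```

Inside `consumeMessage` the result could then be mapped to a return value: send the mail and return `Result::ACK` for `send_mail`, and return something like `Result::DROP` for `drop`, assuming the installed hyperf/amqp version defines that constant.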
Configuration file:
hyperf\config\autoload\amqp.php
return [
'default' => [
'host' => '192.168.1.98',
'port' => 5672,
'user' => 'admin',
'password' => 'admin',
'vhost' => '/',
Note: set host to the IP address of the RabbitMQ server.
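Hard-coding the address and credentials works for a test box, but the same five keys can also be filled from environment values. The following is a rough sketch in plain PHP, not the actual Hyperf config file: the function name `amqpConnectionConfig`, the variable names `AMQP_HOST`, `AMQP_PORT`, `AMQP_USER`, `AMQP_PASSWORD`, `AMQP_VHOST`, and the fallback values are all assumptions made for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Build the connection part of the 'default' AMQP entry from an array of
 * environment values (hypothetical variable names), with fallbacks.
 */
function amqpConnectionConfig(array $env): array
{
    return [
        'host' => (string) ($env['AMQP_HOST'] ?? '127.0.0.1'),
        // Environment values arrive as strings, so cast the port to int.
        'port' => (int) ($env['AMQP_PORT'] ?? 5672),
        'user' => (string) ($env['AMQP_USER'] ?? 'guest'),
        'password' => (string) ($env['AMQP_PASSWORD'] ?? 'guest'),
        'vhost' => (string) ($env['AMQP_VHOST'] ?? '/'),
    ];
}
```

Calling `amqpConnectionConfig(getenv())` would read the real process environment; passing an explicit array keeps the function easy to test.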
Create the mailable class:
hyperf\app\Mail\subs.php
<?php
declare(strict_types=1);

namespace App\Mail;

use HyperfExt\Contract\ShouldQueue;
use HyperfExt\Mail\Mailable;
use Hyperf\DbConnection\Db;

class subs extends Mailable implements ShouldQueue
{
    /**
     * Create a new message instance.
     * @return void
     */
    public function __construct(){}

    /**
     * Build the message.
     */
    public function build()
    {
        $str = '';
        // Fetch the two newest articles as an art_id => title map
        $newart = Db::table('art')->where('is_state',0)->where('is_del',1)->orderBy('pubtime','desc')->take(2)->pluck('title','art_id');
        foreach($newart as $k=>$n) {
            $str .= '<p><a href="https://blog.zongscan.com/art/'.$k.'">'.$n.'</a></p>';
        }
        // HTML template for the article digest email
        $html1 = <<<ht
<p>Hi,<em style="font-weight: 700;">你好 </em>,本周推送最新两篇文章</p>
{$str}
ht;
        return $this
            ->subject('ZONGSCAN-本周推送文章')
            ->htmlBody($html1);
    }
}
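The `build()` method concatenates article titles straight into HTML, so a title containing `&` or `<` would end up unescaped in the email. One possible way to handle this is a small rendering helper; the sketch below is plain PHP, and the function name `renderArticleLinks` and its `$baseUrl` parameter are hypothetical, introduced only for this example.

```php
<?php
declare(strict_types=1);

/**
 * Render an [art_id => title] map as HTML paragraphs with links,
 * escaping both the link target and the visible title.
 */
function renderArticleLinks(iterable $articles, string $baseUrl = 'https://blog.zongscan.com/art/'): string
{
    $html = '';
    foreach ($articles as $id => $title) {
        // rawurlencode guards the path segment, htmlspecialchars the attribute value.
        $href = htmlspecialchars($baseUrl . rawurlencode((string) $id), ENT_QUOTES, 'UTF-8');
        $text = htmlspecialchars((string) $title, ENT_QUOTES, 'UTF-8');
        $html .= '<p><a href="' . $href . '">' . $text . '</a></p>';
    }
    return $html;
}
```

Because the parameter is typed `iterable`, the helper accepts a plain array as well as any traversable object, so the result of the `pluck('title','art_id')` call could be passed to it if that result is iterable, as the `foreach` in `build()` suggests.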
Subscription feature (I already had this feature; it is now switched over to RabbitMQ):
I won't paste that code here; its main logic is simply updating the user's subscription status.
Test controller that pushes to RabbitMQ, used to test sending emails to subscribed users:
use Hyperf\Amqp\Producer;
use Hyperf\DbConnection\Db;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Utils\ApplicationContext;

// Push one message per subscribed user to RabbitMQ
public function torabbitmsg()
{
    $user = Db::table('user')->select('user_id','username','email','subscribe')->where('subscribe',1)->get();
    $msg = ['type' => 1, 'rabbit_msg' => 'rabbit订阅邮件推送', 'rabbit_time' => date('Y-m-d H:i:s')];
    // Fetch the producer instance once, outside the loop
    $producer = ApplicationContext::getContainer()->get(Producer::class);
    foreach($user as $v){
        // Rebuild the user data on every iteration so nothing leaks between users
        $msg['data'] = ['user_id' => $v->user_id, 'username' => $v->username, 'email' => $v->email];
        // Wrap the JSON payload in a producer message
        $message = new DemoProducer(json_encode($msg));
        // Publish the message
        try {
            $producer->produce($message);
        } catch (\Exception $exception) {
            throw new \Swoole\Exception($exception->getMessage());
        }
    }
    return ['msg' => 'rabbit订阅邮件推送', 'time' => date('Y-m-d H:i:s')];
}
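The payload format is the contract between this controller and the consumer, so it can help to build it in one place. Below is a minimal sketch in plain PHP that produces the same shape as the messages shown in the console output; the function name `buildSubscriptionMessage` and its parameter list are hypothetical and not part of the original controller.

```php
<?php
declare(strict_types=1);

/**
 * Build the JSON payload for one subscribed user.
 * A fresh array is created per call, so no field carries over between users.
 */
function buildSubscriptionMessage(int $userId, string $username, string $email, string $time): string
{
    $msg = [
        'type' => 1, // 1 = subscription email, as handled by the consumer
        'rabbit_msg' => 'rabbit订阅邮件推送',
        'rabbit_time' => $time,
        'data' => [
            'user_id' => $userId,
            'username' => $username,
            'email' => $email,
        ],
    ];

    // JSON_THROW_ON_ERROR (PHP 7.3+) raises an exception instead of returning false.
    return json_encode($msg, JSON_THROW_ON_ERROR);
}
```

In the loop, the call would look like `new DemoProducer(buildSubscriptionMessage($v->user_id, $v->username, $v->email, date('Y-m-d H:i:s')))`, assuming `user_id` comes back from the database as an integer.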
That completes the steps. Start the server and run it.
Hyperf console output:
string(212) "消费者正在消费数据:{"type":1,"rabbit_msg":"rabbit\u8ba2\u9605\u90ae\u4ef6\u63a8\u9001","rabbit_time":"2021-08-31 13:51:31","data":{"user_id":1,"username":"\u7ba1\u7406\u5458","email":"[email protected]"}}"
Array
(
[type] => 1
[rabbit_msg] => rabbit订阅邮件推送
[rabbit_time] => 2021-08-31 13:51:31
[data] => Array
(
[user_id] => 1
[username] => 管理员
[email] => [email protected]
)
)
1[DEBUG] 1 acked.
string(210) "消费者正在消费数据:{"type":1,"rabbit_msg":"rabbit\u8ba2\u9605\u90ae\u4ef6\u63a8\u9001","rabbit_time":"2021-08-31 13:51:31","data":{"user_id":21,"username":"houtizong","email":"[email protected]"}}"
Array
(
[type] => 1
[rabbit_msg] => rabbit订阅邮件推送
[rabbit_time] => 2021-08-31 13:51:31
[data] => Array
(
[user_id] => 21
[username] => houtizong
[email] => [email protected]
)
)
1[DEBUG] 2 acked.
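Result (screenshot):
In RabbitMQ (screenshot):
Note: in Hyperf, the consumer process actively pulls messages from the MQ queue the first time it starts. After that a long-lived connection is in place, and the MQ server automatically pushes queued messages to the consumer.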