hyperf2.1框架使用rabbitmq推送订阅邮件发送功能流程步骤

2023-06-01 00:00:00 框架 推送 邮件发送

本篇文章是接上一篇,我的这个2.1版本默认已经安装好了AMQP组件了,直接用rabbitmq就OK了

环境:

centos7

rabbitmq服务器 
https://www.zongscan.com/demo333/89368.html

hyperf2.1

hyperf-ext/mail组件包 
https://www.zongscan.com/demo333/1306.html


进入步骤:


创建生产者

[[email protected] hyperf-skeleton]# php bin/hyperf.php gen:amqp-producer DemoProducer
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\Config\Listener\RegisterPropertyHandlerListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\Paginator\Listener\PageResolverListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\ExceptionHandler\Listener\ExceptionHandlerListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\DbConnection\Listener\RegisterConnectionResolverListener listener.
App\Amqp\Producer\DemoProducer created successfully.

代码:直接从官网手册上复制过来

<?php
declare(strict_types=1);
namespace App\Amqp\Producer;

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;

/**
 * @Producer(exchange="hyperf", routingKey="hyperf")
 */
class DemoProducer extends ProducerMessage
{
    public function __construct($data)
    {
        $this->payload = $data;
    }
}

创建消费者

[[email protected] hyperf-skeleton]# php bin/hyperf.php gen:amqp-consumer DemoConsumer
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\Config\Listener\RegisterPropertyHandlerListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\Paginator\Listener\PageResolverListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\ExceptionHandler\Listener\ExceptionHandlerListener listener.
[DEBUG] Event Hyperf\Framework\Event\BootApplication handled by Hyperf\DbConnection\Listener\RegisterConnectionResolverListener listener.
App\Amqp\Consumer\DemoConsumer created successfully.

代码:

<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;

use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use PhpAmqpLib\Message\AMQPMessage;

use App\Mail\subs;
use HyperfExt\Mail\Mail;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", name ="DemoConsumer", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    public function consumeMessage($data, AMQPMessage $message): string
    {
        var_dump('消费者正在消费数据:' . $data);
       
        $msg = \GuzzleHttp\json_decode($data,true);
        switch ($msg['type']) {
            case 1 :
                //echo $msg['type'];
                //发邮件
                Mail::to($msg['data']['email'])->send(new subs());
                break;
            case 2 :
                break;
            default:
                echo 666;
        }
        return Result::ACK;
    }
}


配置文件:

hyperf\config\autoload\amqp.php

return [
    'default' => [
        'host' => '192.168.1.98',
        'port' => 5672,
        'user' => 'admin',
        'password' => 'admin',
        'vhost' => '/',

注意:host填写ip地址


新建邮件可邮寄类:

hyperf\app\Mail\subs.php

<?php
declare(strict_types=1);
namespace App\Mail;
use HyperfExt\Contract\ShouldQueue;
use HyperfExt\Mail\Mailable;
use Hyperf\DbConnection\Db;
class subs extends Mailable implements ShouldQueue
{
    /**
     * 创建一个消息实例。
     * @return void
     */
    public function __construct(){}
    /**
     * Build the message.
     */
    public function build()
    {
        $str = '';
        $newart = Db::table('art')->where('is_state',0)->where('is_del',1)->orderBy('pubtime','desc')->take(2)->pluck('title','art_id');
        foreach($newart as $k=>$n) {
            $str .= '<p><a href="https://blog.zongscan.com/art/'.$k.'">'.$n.'</a></p>';
        }
        //邮箱推送文章模板
        $html1 = <<<ht
    <p>Hi,<em style="font-weight: 700;">你好 </em>,本周推送最新两篇文章</p>
    {$str}
ht;
        return $this
            ->subject('ZONGSCAN-本周推送文章')
            ->htmlBody($html1);
    }
}


//订阅功能:(这个功能我之前是有了的,现在改成rabbitmq的方式)

这个就不贴代码了,主要的逻辑是修改用户订阅状态

订阅按钮.png

推送rabbit测试控制器 测试给订阅用户发送邮件

use Hyperf\Amqp\Producer;
use App\Amqp\Producer\DemoProducer;
use Hyperf\Utils\ApplicationContext;

//推送rabbit
public function torabbitmsg()
{
    $user = Db::table('user')->select('user_id','username','email','subscribe')->where('subscribe',1)->get();
    $msg = array('type' => 1, 'rabbit_msg' =>  'rabbit订阅邮件推送', 'rabbit_time' => date("Y-m-d H:i:s", time()));
    foreach($user as $v){
      $msg['data']['user_id'] = $v->user_id;
      $msg['data']['username'] = $v->username;
      $msg['data']['email'] = $v->email;
      //将消息推送给生产者
      $message = new DemoProducer(\GuzzleHttp\json_encode($msg));
      //获取生产者的一个实例
      $producer = ApplicationContext::getContainer()->get(Producer::class);
      //传递消息
      try {
          $producer->produce($message);
      } catch (\Exception $exception) {
          throw new \Swoole\Exception($exception->getMessage());
        }
      }
      return ['msg' => 'rabbit订阅邮件推送', 'time' => date('Y-m-d H:i:s', time()),];
}

到此,流程步骤就撸完了,启动跑一下

测试mq.png

hyperf客户端截图:

string(212) "消费者正在消费数据:{"type":1,"rabbit_msg":"rabbit\u8ba2\u9605\u90ae\u4ef6\u63a8\u9001","rabbit_time":"2021-08-31 13:51:31","data":{"user_id":1,"username":"\u7ba1\u7406\u5458","email":"[email protected]"}}"
Array
(
    [type] => 1
    [rabbit_msg] => rabbit订阅邮件推送
    [rabbit_time] => 2021-08-31 13:51:31
    [data] => Array
        (
            [user_id] => 1
            [username] => 管理员
            [email] => [email protected]
        )
)
1[DEBUG] 1 acked.
string(210) "消费者正在消费数据:{"type":1,"rabbit_msg":"rabbit\u8ba2\u9605\u90ae\u4ef6\u63a8\u9001","rabbit_time":"2021-08-31 13:51:31","data":{"user_id":21,"username":"houtizong","email":"[email protected]"}}"
Array
(
    [type] => 1
    [rabbit_msg] => rabbit订阅邮件推送
    [rabbit_time] => 2021-08-31 13:51:31
    [data] => Array
        (
            [user_id] => 21
            [username] => houtizong
            [email] => [email protected]
        )
)
1[DEBUG] 2 acked.

效果图:

qq邮件.png

钉钉邮件.png


rabbitmq中:

mq.png

mq1.png

注:hyperf中消费者进程会在第一次启动的时候主动去mq消费队列中拉取信息,后面通信就是已建立长连接了,由mq服务端自动推送队列中的数据给消费者去消费


相关文章