V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
onanying
V2EX  ›  PHP

使用 mixphp 打造多进程异步邮件发送

  •  
  •   onanying · 2018-07-06 13:49:49 +08:00 · 1700 次点击
    这是一个创建于 2092 天前的主题,其中的信息可能已经有所发展或是发生改变。

    邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

    传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。

    下面演示一个异步邮件发送系统的开发过程,涉及知识点:

    • 异步
    • 消息队列
    • 多进程
    • 守护进程

    如何使用消息队列实现异步

    PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

    • redis
    • rabbitmq
    • kafka

    本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

    // 入列
    $redis->lpush($key, $data);
    // 出列
    $data = $redis->rpop($key);
    // 阻塞出列
    $data = $redis->brpop($key, 10);
    

    架构设计

    本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。

    邮件发送库选型

    以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

    由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

    composer require swiftmailer/swiftmailer
    

    生产者开发

    在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

    在传统 MVC 框架的控制器中增加如下代码:

    通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。

    // 连接
    $redis = new \Redis();
    if (!$redis->connect('127.0.0.1', 6379)) {
        throw new \Exception('Redis Connect Failure');
    }
    $redis->auth('');
    $redis->select(0);
    // 投递任务
    $data = [
        'to'      => ['***@qq.com' => 'A name'],
        'body'    => 'Here is the message itself',
        'subject' => 'The title content',
    ];
    $redis->lpush('queue:email', serialize($data));
    

    通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。

    消费者开发

    本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

    TaskExecutor 的 MODE_PUSH 模式有二种进程:

    • 左进程:负责从消息队列取出任务数据,投放给中进程。
    • 中进程:负责执行邮件发送任务。

    PushCommand.php 代码如下:

    <?php
    
    namespace apps\daemon\commands;
    
    use mix\console\ExitCode;
    use mix\facades\Input;
    use mix\facades\Redis;
    use mix\task\CenterProcess;
    use mix\task\LeftProcess;
    use mix\task\TaskExecutor;
    
    /**
     * 推送模式范例
     * @author 刘健 <[email protected]>
     */
    class PushCommand extends BaseCommand
    {
    
        // 配置信息
        const HOST = 'smtpdm.aliyun.com';
        const PORT = 465;
        const SECURITY = 'ssl';
        const USERNAME = '****@email.***.com';
        const PASSWORD = '****';
    
        // 初始化事件
        public function onInitialize()
        {
            parent::onInitialize(); // TODO: Change the autogenerated stub
            // 获取程序名称
            $this->programName = Input::getCommandName();
            // 设置 pidfile
            $this->pidFile = "/var/run/{$this->programName}.pid";
        }
    
        /**
         * 获取服务
         * @return TaskExecutor
         */
        public function getTaskService()
        {
            return create_object(
                [
                    // 类路径
                    'class'         => 'mix\task\TaskExecutor',
                    // 服务名称
                    'name'          => "mix-daemon: {$this->programName}",
                    // 执行类型
                    'type'          => \mix\task\TaskExecutor::TYPE_DAEMON,
                    // 执行模式
                    'mode'          => \mix\task\TaskExecutor::MODE_PUSH,
                    // 左进程数
                    'leftProcess'   => 1,
                    // 中进程数
                    'centerProcess' => 5,
                    // 任务超时时间 (秒)
                    'timeout'       => 5,
                ]
            );
        }
    
        // 启动
        public function actionStart()
        {
            // 预处理
            if (!parent::actionStart()) {
                return ExitCode::UNSPECIFIED_ERROR;
            }
            // 启动服务
            $service = $this->getTaskService();
            $service->on('LeftStart', [$this, 'onLeftStart']);
            $service->on('CenterStart', [$this, 'onCenterStart']);
            $service->start();
            // 返回退出码
            return ExitCode::OK;
        }
    
        // 左进程启动事件回调函数
        public function onLeftStart(LeftProcess $worker)
        {
            try {
                // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
                $queueModel = Redis::getInstance();
                // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
                for ($j = 0; $j < 16000; $j++) {
                    // 从消息队列中间件阻塞获取一条消息
                    $data = $queueModel->brpop('queue:email', 10);
                    if (empty($data)) {
                        continue;
                    }
                    list(, $data) = $data;
                    // 将消息推送给中进程去处理,push 有长度限制 ( https://wiki.swoole.com/wiki/page/290.html)
                    $worker->push($data, false);
                }
            } catch (\Exception $e) {
                // 休息一会,避免 CPU 出现 100%
                sleep(1);
                // 抛出错误
                throw $e;
            }
        }
    
        // 中进程启动事件回调函数
        public function onCenterStart(CenterProcess $worker)
        {
            // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
            for ($j = 0; $j < 16000; $j++) {
                // 从进程消息队列中抢占一条消息
                $data = $worker->pop();
                if (empty($data)) {
                    continue;
                }
                // 处理消息
                try {
                    // 处理消息,比如:发送短信、发送邮件、微信推送
                    var_dump($data);
                    $ret = self::sendEmail($data);
                    var_dump($ret);
                } catch (\Exception $e) {
                    // 回退数据到消息队列
                    $worker->rollback($data);
                    // 休息一会,避免 CPU 出现 100%
                    sleep(1);
                    // 抛出错误
                    throw $e;
                }
            }
        }
    
        // 发送邮件
        public static function sendEmail($data)
        {
            // Create the Transport
            $transport = (new \Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
                ->setUsername(self::USERNAME)
                ->setPassword(self::PASSWORD);
            // Create the Mailer using your created Transport
            $mailer = new \Swift_Mailer($transport);
            // Create a message
            $message = (new \Swift_Message($data['subject']))
                ->setFrom([self::USERNAME => '**网'])
                ->setTo($data['to'])
                ->setBody($data['body']);
            // Send the message
            $result = $mailer->send($message);
            return $result;
        }
    
    }
    

    测试

    1. 在 shell 中启动 push 常驻程序。
    [root@localhost bin]# ./mix-daemon push start
    mix-daemon 'push' start successed.
    
    1. 调用接口往消息队列投放任务。

    此时 shell 终端将打印:

    图 1

    成功收到测试邮件:

    图 2

    MixPHP

    GitHub: https://github.com/mixstart/mixphp 官网: http://www.mixphp.cn/

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   3221 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 11:39 · PVG 19:39 · LAX 04:39 · JFK 07:39
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.