swoole+curl_multi_init 批量推送QQ模板消息

swoole+curl_multi_init 多进程并发推送上万QQ模板消息

      最近完成了一个QQ小程序项目,有一个功能需要每天9点推送模板消息提醒用户打卡,考虑本地的用户量是600多万,所以不能按照普通的操作去发送,博主自行谷歌百度了相关内容,发现了swoole可以支持高并发多线程,所以博主自己研究、试验了两周时间,终于解决了这个问题,所以记录下代码,向大家分享如何使用swoole进行推送大批量的模板消息,关于swoole相关的技术文档,请问访问swoole官方文档

      在试验发送的过程中,博主采用的是curl方式去发送,效率也是极低,所以找到了替代的方案,利用的是支持并发方式的 curl_multi_init ,成功解决了效率低的问题,下面将向大家介绍代码的实现过程。


      一、代码实现


      我们先创建一个本地文件夹sendMessage,底下再创建一个文件夹,命名为log,用来存放日志,在log底下我们再分别创建error、ing、option、optionSql、over、start 六个文件夹,权限都是777,文件分布如下:

123.png

      然后创建我们的中间连接文件,命名为Client.php,代码如下:

<?php

/**
 * 开启入口
 * Created by PhpStorm.
 * User: pc001
 * Date: 2019/6/14
 * Time: 9:05
 */
class Client
{
    /**
     * 发送请求
     * @param $msg
     */
    public function send($msg)
    {
        $client = new swoole_client(SWOOLE_SOCK_TCP);
        //连接到服务器
        if (!$client->connect('127.0.0.1', 9502, 0.5)) {
            $this->writeError("connect failed.");
        }
        //向服务器发送数据
        if (!$client->send($msg)) {
            $date = date('Y-m-d', time());
            $this->writeError($date . ":请求失败");
        }
        //关闭连接
        $client->close();
    }

    /**
     * 记录错误日志
     * @param $str
     */
    private function writeError($str)
    {
        $path = "/www/wwwroot/projectCode/sendMessage/log/synClient.log";
        $str = "[" . date("Y-m-d H:i:s") . "]" . $str;
        $str .= PHP_EOL;
        file_put_contents($path, $str, FILE_APPEND);
    }
}

      

      然后创建我们的后台守护进程文件,专门用来接收数据,发送数据,命名为server.php,代码如下:

<?php
/**
 * 客户端
 * Created by PhpStorm.
 * User: pc001
 * Date: 2019/6/14
 * Time: 9:05
 */

//连接请求
$serv = new swoole_server("127.0.0.1", 9502);

$logDay = date('Y-m-d', time());
//设置异步任务的工作进程数量
$serv->set(array(
    'task_worker_num' => 25,  //task进程的数量
    'daemonize' => 1,         //以守护进程执行【后台执行】
    'task_ipc_mode' => 3,     //使用消息队列通信,并设置为争抢模式
    'worker_num' => 2,        //worker进程数,一般设置为服务器CPU数的1-4倍
    'reactor_num' => 8,
    'max_coro_num' => 8000,
    'log_file' => '/www/wwwroot/projectCode/sendMessage/log/error/'.$logDay.'_errorWork.log'   //错误日志打印
));

//监听数据接收事件
$serv->on('receive', function ($serv, $fd, $from_id, $data) {
    //任务开始记录日志
    writeWorkStart("任务开始");
    //投递异步任务
    $serv->task($data);//非阻塞
});

//处理异步任务
$serv->on('task', function ($serv, $task_id, $from_id, $data) {
    //记录接收数据
    writeWorkIng($data);
    #转化数据
    $sendData = json_decode($data, true);
    $accessToken = $sendData["accessToken"];   //accessToken
    $template_id = $sendData["template_id"];   //模版id
    $substance = $sendData["substance"];       //模版内容
    ############################    处理发送逻辑     ############################
    #获取当天日期
    $remindData = date('Y-m-d', time());
    $out_time = time() + 60 * 60;
    #连接数据库获取数据
    $selfCon = mysqli_connect("地址", "账户", "密码", "数据库名");
    #获取要发送数据
    $getDataSql = "SELECT `mini_app_user`.`uid`,MAX(`mini_app_from_id`.`from_id`) AS 
    `from_id` FROM `mini_app_user` left join `mini_app_from_id` ON 
    `mini_app_user`.`uid` = `mini_app_from_id`.`uid` WHERE 
    `mini_app_from_id`.`out_time` >='$out_time' AND 
    `mini_app_user`.`remind_date`!='$remindData' GROUP BY `mini_app_user`.`uid`";
    $getData = mysqli_query($selfCon, $getDataSql);
    #初始化数据
    $uidString = "";       //uid字符串
    $formIdString = "";    //formId字符串
    $paramsList = array(); //数据内容
    $index = 0;            //发送数量
    #遍历数据发送
    while ($row = mysqli_fetch_array($getData)) {
        $index += 1;   //自增+1,用于记录发送数量
        //获取基础数据
        $uid = $row["uid"];
        $formId = $row["from_id"];
        //组合发送模板消息主体
        $sendUserData = array(
            'access_token' => $accessToken,
            'touser' => $uid,
            'template_id' => $template_id,
            'form_id' => $formId,
            'data' => $substance,
        );
        //根据情况判断
        if (count($paramsList) < 20) {
            //组合数据
            $paramsList[] = $sendUserData;                          //发送数据体
            $uidString = $uidString . "'" . $uid . "',";            //批量修改用户UID
            $formIdString = $formIdString . "'" . $formId . "',";   //批量删除FormId
        } else {
            //去除字符串最后一个,
            $uidString = deleteLastStr($uidString);
            $formIdString = deleteLastStr($formIdString);
            //发送信息
            runPost($paramsList, count($paramsList), $accessToken);
            //更新用户、删除formId
            updateUserDate($selfCon, $uidString);
            deleteFormId($selfCon, $formIdString);
            //记录日志
            writeWorkOption($index);
            //初始化
            $uidString = "";
            $formIdString = "";
            $paramsList = array();
        }
    }
    //最后捡漏
    if (count($paramsList) > 0) {
        //去除字符串最后一个,
        $uidString = deleteLastStr($uidString);
        $formIdString = deleteLastStr($formIdString);
        //发送消息
        runPost($paramsList, count($paramsList), $accessToken);
        //更新用户、删除formId
        updateUserDate($selfCon, $uidString);
        deleteFormId($selfCon, $formIdString);
        //记录日志
        writeWorkOption($index);
        //初始化
        $uidString = "";
        $formIdString = "";
        $paramsList = array();
    }
    ############################    处理发送逻辑     ############################
    //返回任务执行的结果
    $serv->finish(array());

});

//处理异步任务的结果
$serv->on('finish', function ($serv, $task_id, $data) {
    //记录日志
    writeWorkOver("结束");
});


//开启
$serv->start();


/////////////////////////////    私有方法  /////////////////////////////

/**
 * 更新用户通知日期【批量更新】
 * @param $selfCon
 * @param $uid
 */
function updateUserDate($selfCon, $uid)
{
    //更新提醒日期
    $remind_date = date('Y-m-d', time());
    $updateDataSql = "UPDATE `mini_app_user` SET `remind_date` ='$remind_date' WHERE `uid` 
    IN ($uid) ";
    mysqli_query($selfCon, $updateDataSql);
    //记录本次操作语句
    writeWorkOptionSql($updateDataSql);
}

/**
 * 删除用户本次FormId【批量删除】
 * @param $selfCon
 * @param $formId
 */
function deleteFormId($selfCon, $formId)
{
    //去除formId
    $deleteSql = "DELETE FROM `mini_app_from_id` WHERE `from_id` IN ($formId)";
    mysqli_query($selfCon, $deleteSql);
    //记录本次操作语句
    writeWorkOptionSql($deleteSql);
}

/**
 * 去除最后一个字符串
 * @param $str
 * @return bool|string
 */
function deleteLastStr($str)
{
    $string = substr($str, 0, strlen($str) - 1);
    return $string;
}

///////////////////////////////     请求方式      ///////////////////////////////

/**
 * 并发请求发送模板消息【curl_multi_init方式发送】
 * @param $paramsList
 * @param $paramsListCount
 * @param $token
 */
function runPost($paramsList, $paramsListCount, $token)
{
    $chArr = [];
    for ($i = 0; $i < $paramsListCount - 1; $i++) {
        $paramsString = json_encode($paramsList[$i]);
        $chArr[$i] = curl_init("https://api.q.qq.com/api/json/template/send?access_token=" . 
        $token);
        curl_setopt($chArr[$i], CURLOPT_SSL_VERIFYPEER, FALSE);
        curl_setopt($chArr[$i], CURLOPT_SSL_VERIFYHOST, FALSE);
        curl_setopt($chArr[$i], CURLOPT_HEADER, FALSE);
        curl_setopt($chArr[$i], CURLOPT_RETURNTRANSFER, TRUE);
        // post数据
        curl_setopt($chArr[$i], CURLOPT_POST, 1);
        // post的变量
        curl_setopt($chArr[$i], CURLOPT_POSTFIELDS, $paramsString);
        curl_setopt($chArr[$i], CURLOPT_HTTPHEADER, array(
                'Content-Type: application/json; charset=utf-8',
                'Content-Length: ' . strlen($paramsString))
        );
    }
    $mh = curl_multi_init();
    foreach ($chArr as $k => $ch)

        curl_multi_add_handle($mh, $ch);

    $running = null;

    do {
        $mrc = curl_multi_exec($mh, $running);
    } while ($mrc == CURLM_CALL_MULTI_PERFORM);


    while ($running && $mrc == CURLM_OK) {
        if (curl_multi_select($mh) != -1) {
            do {
                $mrc = curl_multi_exec($mh, $running);
            } while ($mrc == CURLM_CALL_MULTI_PERFORM);

        }

    }
    foreach ($chArr as $k => $ch) {

        $result[$k] = curl_multi_getcontent($ch);

        curl_multi_remove_handle($mh, $ch);

    }
    curl_multi_close($mh);
}


//////////////////////////////  日志记录  //////////////////////////////

/**
 * 任务开始记录日志
 * @param $str
 */
function writeWorkStart($str)
{
    $day = date('Y-m-d');
    $path = "/www/wwwroot/projectCode/sendMessage/log/start/" . $day . "_workStart.log";
    $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n";
    $str .= PHP_EOL;
    file_put_contents($path, $str, FILE_APPEND);
}

/**
 * 任务进行中记录日志
 * @param $str
 */
function writeWorkIng($str)
{
    $day = date('Y-m-d');
    $path = "/www/wwwroot/projectCode/sendMessage/log/ing/" . $day . "_workIng.log";
    $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n";
    $str .= PHP_EOL;
    file_put_contents($path, $str, FILE_APPEND);
}

/**
 * 任务结束记录日志
 * @param $str
 */
function writeWorkOver($str)
{
    $day = date('Y-m-d');
    $path = "/www/wwwroot/projectCode/sendMessage/log/over/" . $day . "_workOver.log";
    $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n";
    $str .= PHP_EOL;
    file_put_contents($path, $str, FILE_APPEND);
}

/**
 * 任务处理数据记录日志
 * @param $str
 */
function writeWorkOption($str)
{
    $day = date('Y-m-d');
    $path = "/www/wwwroot/projectCode/sendMessage/log/option/" . $day . "_workOption.log";
    $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n";
    $str .= PHP_EOL;
    file_put_contents($path, $str, FILE_APPEND);
}

/**
 * 任务处理记录日志
 * @param $str
 */
function writeWorkOptionSql($str)
{
    $day = date('Y-m-d');
    $path = "/www/wwwroot/projectCode/sendMessage/log/optionSql/".$day."_workOptionSql.log";
    $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n";
    $str .= PHP_EOL;
    file_put_contents($path, $str, FILE_APPEND);
}

      

      然后再创建启动入口文件,命名为start.php,代码如下:

<?php

/**
 * 调用文件
 * Created by PhpStorm.
 * User: pc001
 * Date: 2019/6/14
 * Time: 9:05
 */

include "Client.php";
#链接mysql
$con = mysqli_connect("地址", "账户", "密码", "数据库名");
#获取当前发送模板消息基本数据
$appId = "QQ小程序appId";
$appSecret = "QQ小程序Secret";
$template_id = "模板ID";
$substance = array(
    "keyword1" => array("value" => "情侣头像每日签到"),
    "keyword2" => array("value" => "您今日签到积分未领取,连续\n签到赚更多的积分,积分商城\n精美礼品等您来兑换"),
    "keyword3" => array("value" => date('Y-m-d', time())),
    "keyword4" => array("value" => "小声告诉你:每连签七天可得最\n高99元现金红包~")
);
#获取令牌
$getToken = getAccessToken($con, $appId, $appSecret);
if ($getToken["code"] != 200) {
    echo "获取令牌失败";
    exit();
}
$accessToken = $getToken["msg"]["access_token"];
$expiresTime = $getToken["msg"]["expiresTime"];
#组合数据
$params = array(
    'accessToken' => $accessToken,                           //accessToken
    'template_id' => $template_id,                           //模板ID
    'substance' => $substance,                               //模板内容
    'appid' => $appId,                                       //APPID
    'secret' => $appSecret,                                  //Secret
);
$msg = json_encode($params);
#请求发送
$client = new Client();
$client->send($msg);
echo "[" . date("Y-m-d H:i:s") . "]OK" . PHP_EOL;

////////////////////    私有方法     ////////////////////

/**
 * 获取令牌
 * @param $con
 * @param $appid
 * @param $secret
 * @return array
 */
function getAccessToken($con, $appid, $secret)
{
    //获取当前时间
    $getQqToken = curlGet("https://api.q.qq.com/api/getToken?grant_type=client_credential&appid=" . $appid . "&secret=" . $secret);
    if (!isset($getQqToken["errcode"]) || $getQqToken["errcode"] != 0) {
        return array(
            'code' => 404,
            'msg' => '获取token失败'
        );
    }
    #拿到数据
    $access_token = $getQqToken["access_token"];
    $expires_in = $getQqToken["expires_in"];
    //更新
    $value = array(
        'accessToken' => $access_token,
        'expiresTime' => time() + $expires_in - 600
    );
    $value = json_encode($value);
    $updateTokenSql = "UPDATE `mini_app_config` SET `value` = '$value' WHERE `desc` = '$appid'";
    mysqli_query($con, $updateTokenSql);
    //返回
    return array(
        'code' => 200,
        'msg' => array(
            'access_token' => $access_token,
            'expiresTime' => time() + $expires_in - 600
        ),
    );
}


/**
 * Get方式请求
 * @param $url
 * @return mixed
 */
function curlGet($url)
{
    // 初始化curl
    $ch = curl_init();
    // 设置超时
    curl_setopt($ch, CURLOPT_TIMEOUT, 60);
    curl_setopt($ch, CURLOPT_URL, $url);
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, FALSE);
    curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, FALSE);
    curl_setopt($ch, CURLOPT_HEADER, FALSE);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE);
    // 运行curl,结果以jason形式返回
    $res = curl_exec($ch);
    curl_close($ch);
    // 取出数据
    $data = json_decode($res, true);
    //返回
    return $data;
}

      

      二、程序开启


      这样我们就成功的实现了代码,启动也是很方便,我们先启动后台守护程序,命令如下:

php server.php

再启动入口文件,命令如下:

php start.php

      在启动之前,记得打开9502端口,博主试了下,10分钟是发了2万3多模板消息,速度还是可以,这是日志记录如下:

1562651866143685.png

                         2.png

博主只用了9分50秒发送数是23616,如果再调高进程数,估计数量会提高一点。如果服务器给力,可以调高进程数,这就是博主利用swoole推送模板消息。


       三、关闭程序


       如果想要关掉这个脚本的话,先输入如下命令:

netstat -ntlp

找到运行9502端口的PID,然后执行以下命令:

kill -9 PID

然后再批量删除子进程,命令如下:

ps -ef | grep php| awk '{ print $2 }' |  xargs kill -9

这样就完成了关掉该程序。

       以上就是代码的实现,以及程序的开启以及关闭的相关教程。

0条评论

发表评论