swoole+curl_multi_init 批量推送QQ模板消息
swoole+curl_multi_init 多进程并发推送上万QQ模板消息
最近完成了一个QQ小程序项目,有一个功能需要每天9点推送模板消息提醒用户打卡,考虑本地的用户量是600多万,所以不能按照普通的操作去发送,博主自行谷歌百度了相关内容,发现了swoole可以支持高并发多线程,所以博主自己研究、试验了两周时间,终于解决了这个问题,所以记录下代码,向大家分享如何使用swoole进行推送大批量的模板消息,关于swoole相关的技术文档,请问访问swoole官方文档。
在试验发送的过程中,博主采用的是curl方式去发送,效率也是极低,所以找到了替代的方案,利用的是支持并发方式的 curl_multi_init ,成功解决了效率低的问题,下面将向大家介绍代码的实现过程。
一、代码实现
我们先创建一个本地文件夹sendMessage,底下再创建一个文件夹,命名为log,用来存放日志,在log底下我们再分别创建error、ing、option、optionSql、over、start 六个文件夹,权限都是777,文件分布如下:
然后创建我们的中间连接文件,命名为Client.php,代码如下:
<?php /** * 开启入口 * Created by PhpStorm. * User: pc001 * Date: 2019/6/14 * Time: 9:05 */ class Client { /** * 发送请求 * @param $msg */ public function send($msg) { $client = new swoole_client(SWOOLE_SOCK_TCP); //连接到服务器 if (!$client->connect('127.0.0.1', 9502, 0.5)) { $this->writeError("connect failed."); } //向服务器发送数据 if (!$client->send($msg)) { $date = date('Y-m-d', time()); $this->writeError($date . ":请求失败"); } //关闭连接 $client->close(); } /** * 记录错误日志 * @param $str */ private function writeError($str) { $path = "/www/wwwroot/projectCode/sendMessage/log/synClient.log"; $str = "[" . date("Y-m-d H:i:s") . "]" . $str; $str .= PHP_EOL; file_put_contents($path, $str, FILE_APPEND); } }
然后创建我们的后台守护进程文件,专门用来接收数据,发送数据,命名为server.php,代码如下:
<?php /** * 客户端 * Created by PhpStorm. * User: pc001 * Date: 2019/6/14 * Time: 9:05 */ //连接请求 $serv = new swoole_server("127.0.0.1", 9502); $logDay = date('Y-m-d', time()); //设置异步任务的工作进程数量 $serv->set(array( 'task_worker_num' => 25, //task进程的数量 'daemonize' => 1, //以守护进程执行【后台执行】 'task_ipc_mode' => 3, //使用消息队列通信,并设置为争抢模式 'worker_num' => 2, //worker进程数,一般设置为服务器CPU数的1-4倍 'reactor_num' => 8, 'max_coro_num' => 8000, 'log_file' => '/www/wwwroot/projectCode/sendMessage/log/error/'.$logDay.'_errorWork.log' //错误日志打印 )); //监听数据接收事件 $serv->on('receive', function ($serv, $fd, $from_id, $data) { //任务开始记录日志 writeWorkStart("任务开始"); //投递异步任务 $serv->task($data);//非阻塞 }); //处理异步任务 $serv->on('task', function ($serv, $task_id, $from_id, $data) { //记录接收数据 writeWorkIng($data); #转化数据 $sendData = json_decode($data, true); $accessToken = $sendData["accessToken"]; //accessToken $template_id = $sendData["template_id"]; //模版id $substance = $sendData["substance"]; //模版内容 ############################ 处理发送逻辑 ############################ #获取当天日期 $remindData = date('Y-m-d', time()); $out_time = time() + 60 * 60; #连接数据库获取数据 $selfCon = mysqli_connect("地址", "账户", "密码", "数据库名"); #获取要发送数据 $getDataSql = "SELECT `mini_app_user`.`uid`,MAX(`mini_app_from_id`.`from_id`) AS `from_id` FROM `mini_app_user` left join `mini_app_from_id` ON `mini_app_user`.`uid` = `mini_app_from_id`.`uid` WHERE `mini_app_from_id`.`out_time` >='$out_time' AND `mini_app_user`.`remind_date`!='$remindData' GROUP BY `mini_app_user`.`uid`"; $getData = mysqli_query($selfCon, $getDataSql); #初始化数据 $uidString = ""; //uid字符串 $formIdString = ""; //formId字符串 $paramsList = array(); //数据内容 $index = 0; //发送数量 #遍历数据发送 while ($row = mysqli_fetch_array($getData)) { $index += 1; //自增+1,用于记录发送数量 //获取基础数据 $uid = $row["uid"]; $formId = $row["from_id"]; //组合发送模板消息主体 $sendUserData = array( 'access_token' => $accessToken, 'touser' => $uid, 'template_id' => $template_id, 'form_id' => $formId, 'data' => $substance, ); //根据情况判断 if (count($paramsList) < 20) { //组合数据 $paramsList[] = $sendUserData; //发送数据体 $uidString = $uidString . "'" . $uid . "',"; //批量修改用户UID $formIdString = $formIdString . "'" . $formId . "',"; //批量删除FormId } else { //去除字符串最后一个, $uidString = deleteLastStr($uidString); $formIdString = deleteLastStr($formIdString); //发送信息 runPost($paramsList, count($paramsList), $accessToken); //更新用户、删除formId updateUserDate($selfCon, $uidString); deleteFormId($selfCon, $formIdString); //记录日志 writeWorkOption($index); //初始化 $uidString = ""; $formIdString = ""; $paramsList = array(); } } //最后捡漏 if (count($paramsList) > 0) { //去除字符串最后一个, $uidString = deleteLastStr($uidString); $formIdString = deleteLastStr($formIdString); //发送消息 runPost($paramsList, count($paramsList), $accessToken); //更新用户、删除formId updateUserDate($selfCon, $uidString); deleteFormId($selfCon, $formIdString); //记录日志 writeWorkOption($index); //初始化 $uidString = ""; $formIdString = ""; $paramsList = array(); } ############################ 处理发送逻辑 ############################ //返回任务执行的结果 $serv->finish(array()); }); //处理异步任务的结果 $serv->on('finish', function ($serv, $task_id, $data) { //记录日志 writeWorkOver("结束"); }); //开启 $serv->start(); ///////////////////////////// 私有方法 ///////////////////////////// /** * 更新用户通知日期【批量更新】 * @param $selfCon * @param $uid */ function updateUserDate($selfCon, $uid) { //更新提醒日期 $remind_date = date('Y-m-d', time()); $updateDataSql = "UPDATE `mini_app_user` SET `remind_date` ='$remind_date' WHERE `uid` IN ($uid) "; mysqli_query($selfCon, $updateDataSql); //记录本次操作语句 writeWorkOptionSql($updateDataSql); } /** * 删除用户本次FormId【批量删除】 * @param $selfCon * @param $formId */ function deleteFormId($selfCon, $formId) { //去除formId $deleteSql = "DELETE FROM `mini_app_from_id` WHERE `from_id` IN ($formId)"; mysqli_query($selfCon, $deleteSql); //记录本次操作语句 writeWorkOptionSql($deleteSql); } /** * 去除最后一个字符串 * @param $str * @return bool|string */ function deleteLastStr($str) { $string = substr($str, 0, strlen($str) - 1); return $string; } /////////////////////////////// 请求方式 /////////////////////////////// /** * 并发请求发送模板消息【curl_multi_init方式发送】 * @param $paramsList * @param $paramsListCount * @param $token */ function runPost($paramsList, $paramsListCount, $token) { $chArr = []; for ($i = 0; $i < $paramsListCount - 1; $i++) { $paramsString = json_encode($paramsList[$i]); $chArr[$i] = curl_init("https://api.q.qq.com/api/json/template/send?access_token=" . $token); curl_setopt($chArr[$i], CURLOPT_SSL_VERIFYPEER, FALSE); curl_setopt($chArr[$i], CURLOPT_SSL_VERIFYHOST, FALSE); curl_setopt($chArr[$i], CURLOPT_HEADER, FALSE); curl_setopt($chArr[$i], CURLOPT_RETURNTRANSFER, TRUE); // post数据 curl_setopt($chArr[$i], CURLOPT_POST, 1); // post的变量 curl_setopt($chArr[$i], CURLOPT_POSTFIELDS, $paramsString); curl_setopt($chArr[$i], CURLOPT_HTTPHEADER, array( 'Content-Type: application/json; charset=utf-8', 'Content-Length: ' . strlen($paramsString)) ); } $mh = curl_multi_init(); foreach ($chArr as $k => $ch) curl_multi_add_handle($mh, $ch); $running = null; do { $mrc = curl_multi_exec($mh, $running); } while ($mrc == CURLM_CALL_MULTI_PERFORM); while ($running && $mrc == CURLM_OK) { if (curl_multi_select($mh) != -1) { do { $mrc = curl_multi_exec($mh, $running); } while ($mrc == CURLM_CALL_MULTI_PERFORM); } } foreach ($chArr as $k => $ch) { $result[$k] = curl_multi_getcontent($ch); curl_multi_remove_handle($mh, $ch); } curl_multi_close($mh); } ////////////////////////////// 日志记录 ////////////////////////////// /** * 任务开始记录日志 * @param $str */ function writeWorkStart($str) { $day = date('Y-m-d'); $path = "/www/wwwroot/projectCode/sendMessage/log/start/" . $day . "_workStart.log"; $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n"; $str .= PHP_EOL; file_put_contents($path, $str, FILE_APPEND); } /** * 任务进行中记录日志 * @param $str */ function writeWorkIng($str) { $day = date('Y-m-d'); $path = "/www/wwwroot/projectCode/sendMessage/log/ing/" . $day . "_workIng.log"; $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n"; $str .= PHP_EOL; file_put_contents($path, $str, FILE_APPEND); } /** * 任务结束记录日志 * @param $str */ function writeWorkOver($str) { $day = date('Y-m-d'); $path = "/www/wwwroot/projectCode/sendMessage/log/over/" . $day . "_workOver.log"; $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n"; $str .= PHP_EOL; file_put_contents($path, $str, FILE_APPEND); } /** * 任务处理数据记录日志 * @param $str */ function writeWorkOption($str) { $day = date('Y-m-d'); $path = "/www/wwwroot/projectCode/sendMessage/log/option/" . $day . "_workOption.log"; $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n"; $str .= PHP_EOL; file_put_contents($path, $str, FILE_APPEND); } /** * 任务处理记录日志 * @param $str */ function writeWorkOptionSql($str) { $day = date('Y-m-d'); $path = "/www/wwwroot/projectCode/sendMessage/log/optionSql/".$day."_workOptionSql.log"; $str = "[" . date("Y-m-d H:i:s") . "]" . $str . "\n"; $str .= PHP_EOL; file_put_contents($path, $str, FILE_APPEND); }
然后再创建启动入口文件,命名为start.php,代码如下:
<?php /** * 调用文件 * Created by PhpStorm. * User: pc001 * Date: 2019/6/14 * Time: 9:05 */ include "Client.php"; #链接mysql $con = mysqli_connect("地址", "账户", "密码", "数据库名"); #获取当前发送模板消息基本数据 $appId = "QQ小程序appId"; $appSecret = "QQ小程序Secret"; $template_id = "模板ID"; $substance = array( "keyword1" => array("value" => "情侣头像每日签到"), "keyword2" => array("value" => "您今日签到积分未领取,连续\n签到赚更多的积分,积分商城\n精美礼品等您来兑换"), "keyword3" => array("value" => date('Y-m-d', time())), "keyword4" => array("value" => "小声告诉你:每连签七天可得最\n高99元现金红包~") ); #获取令牌 $getToken = getAccessToken($con, $appId, $appSecret); if ($getToken["code"] != 200) { echo "获取令牌失败"; exit(); } $accessToken = $getToken["msg"]["access_token"]; $expiresTime = $getToken["msg"]["expiresTime"]; #组合数据 $params = array( 'accessToken' => $accessToken, //accessToken 'template_id' => $template_id, //模板ID 'substance' => $substance, //模板内容 'appid' => $appId, //APPID 'secret' => $appSecret, //Secret ); $msg = json_encode($params); #请求发送 $client = new Client(); $client->send($msg); echo "[" . date("Y-m-d H:i:s") . "]OK" . PHP_EOL; //////////////////// 私有方法 //////////////////// /** * 获取令牌 * @param $con * @param $appid * @param $secret * @return array */ function getAccessToken($con, $appid, $secret) { //获取当前时间 $getQqToken = curlGet("https://api.q.qq.com/api/getToken?grant_type=client_credential&appid=" . $appid . "&secret=" . $secret); if (!isset($getQqToken["errcode"]) || $getQqToken["errcode"] != 0) { return array( 'code' => 404, 'msg' => '获取token失败' ); } #拿到数据 $access_token = $getQqToken["access_token"]; $expires_in = $getQqToken["expires_in"]; //更新 $value = array( 'accessToken' => $access_token, 'expiresTime' => time() + $expires_in - 600 ); $value = json_encode($value); $updateTokenSql = "UPDATE `mini_app_config` SET `value` = '$value' WHERE `desc` = '$appid'"; mysqli_query($con, $updateTokenSql); //返回 return array( 'code' => 200, 'msg' => array( 'access_token' => $access_token, 'expiresTime' => time() + $expires_in - 600 ), ); } /** * Get方式请求 * @param $url * @return mixed */ function curlGet($url) { // 初始化curl $ch = curl_init(); // 设置超时 curl_setopt($ch, CURLOPT_TIMEOUT, 60); curl_setopt($ch, CURLOPT_URL, $url); curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, FALSE); curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, FALSE); curl_setopt($ch, CURLOPT_HEADER, FALSE); curl_setopt($ch, CURLOPT_RETURNTRANSFER, TRUE); // 运行curl,结果以jason形式返回 $res = curl_exec($ch); curl_close($ch); // 取出数据 $data = json_decode($res, true); //返回 return $data; }
二、程序开启
这样我们就成功的实现了代码,启动也是很方便,我们先启动后台守护程序,命令如下:
php server.php
再启动入口文件,命令如下:
php start.php
在启动之前,记得打开9502端口,博主试了下,10分钟是发了2万3多模板消息,速度还是可以,这是日志记录如下:
博主只用了9分50秒发送数是23616,如果再调高进程数,估计数量会提高一点。如果服务器给力,可以调高进程数,这就是博主利用swoole推送模板消息。
三、关闭程序
如果想要关掉这个脚本的话,先输入如下命令:
netstat -ntlp
找到运行9502端口的PID,然后执行以下命令:
kill -9 PID
然后再批量删除子进程,命令如下:
ps -ef | grep php| awk '{ print $2 }' | xargs kill -9
这样就完成了关掉该程序。
以上就是代码的实现,以及程序的开启以及关闭的相关教程。
0条评论