系统异步任务

系统异步任务是 ThinkAdmin 的一大特色,实现了 windowslinux 平台多进程并列执行任务,大大提升了任务处理能力。

监听进程 LISTEN 启动之后,理论上会每隔 0.5 秒扫描一次数据表,当扫描到有需要执行的任务时会创建独立 PHP-CLI 进程。

特别注意: 不能同时启动多个监听进程 LISTEN 以防止重复执行任务,建议使用 START 指令来启动监听 LISTEN 进程。

任务主进程会在 Linux 中定时执行 ps ax 指令,用来检查指定进程是否运行,一些服务器安全工具可能会提示安全问题,忽略即可。

新版本任务执行需要依赖 symfony/process 组件,运行环境~~~~必需启用 exec | proc_open | posix_kill 等函数。

守护进程管理

  • 创建定时任务去执行检测监听进程 LISTEN 的指令 START,当发现监听进程 LISTEN 未运行时会自动创建;
  • 如果是 linux 服务器,建议指定用户运行,防止 clicgi | fpm 三种运行方式的缓存文件权限冲突,排错时需检查日志;
  • 建议指定用户运行 sudo -u www php think xadmin:queue start,可以直接在系统后台的任务管理复制执行指令再配置到定时任务;
  • 任务进程创建是通过 PHP 函数 exec 实现,所以函数 exec 不能被禁用,当执行异常时请根据错误提示对 PHP 运行环境进行调整;

进程管理相关指令

  • 执行 php think xadmin:queue stop # 停止所有正在执行的异步任务进程
  • 执行 php think xadmin:queue query # 查询当前所有正在执行的任务进程
  • 执行 php think xadmin:queue start # 开启异步任务守护进程(后台进程)
  • 执行 php think xadmin:queue listen # 启动异步任务监听进程(前台进程)
  • 执行 php think xadmin:queue status # 查看当前守护进程的后台运行状态

创建异步任务

可以使用 sysqueue() 函数或在控制器中使用 $this->_queue() 创建任务队列。

// 直接使用函数创建任务,获取到任务编号
$code = sysqueue($title, $command, $later = 0, $data = [], $rscript = 1, $loops = 0);

// 在控制器中创建任务,获取到任务编号
$queue = $this->_queue($title, $command, $later = 0, $data = [], $rscript = 0, $loops = 0);
$code = $queue->code; 

// 如果要在任务中创建新任务,需要注意创建新的任务对象哦,不然会导致原任务异常
$queue = QueueService::instance([],true); // 创建新任务服务,不替换原任务对象
$queue->register();// 注册新任务内容

// 在任务中可以使用 $this->queue 获取当前任务的数据(在任务继承了 Queue 或 QueueService 的情况下))
$this->queue->code; // 获取当前任务编号
$this->queue->data; // 获取当前任务参数
$this->queue->record; // 获取当前任务记录

// 设置当前任务进度数据及成功状态或失败状态(在任务继承了 Queue 或 QueueService 的情况下)
$this->setQueueError(); // 设置失败的消息并结束执行
$this->setQueueSuccess(); // 设置成功的消息并结束执行
$this->setQueueProgress(); // 设置运行进度并继续执行

以上两种方式都可以指定 任务标题执行任务内容执行延时时间任务绑定数据允许重复创建是否循环执行 等。

有些任务,在待处理和处理中是不需要再创建重复任务的,$rscript 就需要设置为 0,这是根据标题来识别的,所以标题也可以适当加上个性名称。

自建的异步处理是多进程任务处理,其中 windows 是基于 wmic 指令创建后台进程实现的,而 linux 则是通过 & 符实现。

因为是异步并列执行,建议自行控制下任务数据,免得过多消耗系统资源而影响项目正常使用。

在部署时,通常我们只需要创建定时器去执行 php think xadmin:queue start 就可以守护异步任务监听进程。

目前 ThinkAmdin V6 自定义异步任务机制支持两种规则机制:

  • 自定义单独的处理类,需要继承 think\admin\Queue 抽象类,实现 execute 方法,参数为任务绑定的参数 $data
  • 自定义 ThinkPHP 指令,默认使用 Console::call() 去尝试执行传入的指令,指令可以继承 think\admin\Command

异步任务进度

ThinkAdmin 支持任务完成状态跟进。

  • 后端创建任务之后可以获得任务编号,任务编号以字母 Q 开头
  • 前端可以使用 $.loadQueue(CODE)展示任务完成进度及数据
  • 后端任务可以使用 $this->setQueueProgress() 任务完成进度
  • 后端任务可以使用 $this->setQueueError() 设置任务已经失败
  • 后端任务可以使用 $this->setQueueSuccess() 设置任务已经完成
  • 具体参数可以查看方法源代码,进度数值为百分比,如 50 表示完成了 50%
  • 基于控制器实现,前端 data-queue=URL ,后端调用 $this->_queue() 方法创建任务

任务案例解析

前端代码(属性 data-queue 触发 $.loadQueue(Code) 展示任务进度)

<button data-queue="{:url('sync')}" class='layui-btn layui-btn-sm layui-btn-primary'>刷新会员数据</button>

后端代码(调用$this->_queue()会返回响应对象,响应对象data默认为任务编号)

/**
 * 刷新会员数据
 * @auth true
 */
public function sync()
{
    $this->_queue('重新计算所有会员级别', "xsync:member", 1, [], 0);
}

对应指令(指令需要注册,通过 sys.php 注册或 Service.php配置,执行 php think 可查看是否已生效)

// sys.php 动态注册操作指令
\think\Console::starting(function (Console $console) {
    $console->addCommand(\app\store\command\MemberRun::class);
});

在任务处理时可以使用 $this->queue 获取到数据参数(实际对象为 QueueService ,具体可以查阅对象代码)

/**
 * 重新计算会员数据
 * Class MemberRun
 * @package app\store\command
 */
class MemberRun extends \think\admin\Command
{

    protected function configure()
    {
        $this->setName('xsync:member')->setDescription('[ 商城 - 不需执行 ] 重新计算所有会员的等级');
    }

    /**
     * @param Input $input
     * @param Output $output
     * @throws \think\admin\Exception
     */
    protected function execute(Input $input, Output $output)
    {
        list($count, $total) = [0, $this->app->db->name('StoreMember')->count()];
        $this->app->db->name('StoreMember')->chunk(100, function (Collection $data) use (&$count, $total) {
            foreach ($data->toArray() as $vo) {
                $count++;
                $state = MemberService::instance()->syncLevel($vo['id']) ? '成功' : '失败';
                $this->setQueueProgress("调整会员 {$vo['id']} {$vo['phone']} {$vo['nickname']} 级别{$state}", $count / $total * 100);
            }
        });
        $this->setQueueSuccess("重新计算 {$count} 个会员的级别!");
    }
} 
Last Updated:
Contributors: 邹景立