队列
介绍
Laravel 现在提供了 Horizon,一个美观的仪表板和配置系统,用于您的 Redis 驱动的队列。查看完整的 Horizon 文档 以获取更多信息。
Laravel 队列为各种不同的队列后端提供了统一的 API,例如 Beanstalk、Amazon SQS、Redis,甚至是关系数据库。队列允许您将耗时的任务(如发送电子邮件)的处理推迟到稍后的时间。推迟这些耗时的任务可以大大加快应用程序的 Web 请求速度。
队列配置文件存储在 config/queue.php
中。在此文件中,您将找到框架中包含的每个队列驱动程序的连接配置,其中包括数据库、Beanstalkd、Amazon SQS、Redis 和一个同步驱动程序,该驱动程序将立即执行作业(用于本地使用)。还包括一个 null
队列驱动程序,该驱动程序会丢弃排队的作业。
连接与队列
在开始使用 Laravel 队列之前,了解“连接”和“队列”之间的区别很重要。在您的 config/queue.php
配置文件中,有一个 connections
配置选项。此选项定义了与后端服务(如 Amazon SQS、Beanstalk 或 Redis)的特定连接。然而,任何给定的队列连接可能有多个“队列”,可以被视为不同的作业堆栈或堆。
请注意,queue
配置文件中的每个连接配置示例都包含一个 queue
属性。这是将作业发送到给定连接时将被调度到的默认队列。换句话说,如果您在调度作业时没有明确定义应将其调度到哪个队列,则作业将被放置在连接配置的 queue
属性中定义的队列中:
// 这个作业被发送到默认队列...
Job::dispatch();
// 这个作业被发送到 "emails" 队列...
Job::dispatch()->onQueue('emails');
某些应用程序可能不需要将作业推送到多个队列,而是更喜欢拥有一个简单的队列。然而,对于希望优先处理或分段处理作业的应用程序来说,将作业推送到多个队列可能特别有用,因为 Laravel 队列工作者允许您指定应按优先级处理哪些队列。例如,如果您将作业推送到 high
队列,您可以运行一个工作者,给予它们更高的处理优先级:
php artisan queue:work --queue=high,default
驱动程序说明和先决条件
数据库
为了使用 database
队列驱动程序,您需要一个数据库表来保存作业。要生成创建此表的迁移,请运行 queue:table
Artisan 命令。创建迁移后,您可以使用 migrate
命令迁移数据库:
php artisan queue:table
php artisan migrate
Redis
为了使用 redis
队列驱动程序,您应该在 config/database.php
配置文件中配置一个 Redis 数据库连接。
Redis 集群
如果您的 Redis 队列连接使用 Redis 集群,则您的队列名称必须包含一个 key hash tag。这是为了确保给定队列的所有 Redis 键都放置在同一个哈希槽中:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
阻塞
使用 Redis 队列时,您可以使用 block_for
配置选项来指定驱动程序在进入工作者循环并重新轮询 Redis 数据库之前应等待作业可用的时间。
根据您的队列负载调整此值可能比持续轮询 Redis 数据库以获取新作业更有效。例如,您可以将值设置为 5
,以指示驱动程序在等待作业可用时应阻塞五秒钟:
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'retry_after' => 90,
'block_for' => 5,
],
将 block_for
设置为 0
将导致队列工作者无限期阻塞,直到作业可用。这也将阻止信号(如 SIGTERM
)在处理下一个作业之前被处理。
其他驱动程序先决条件
以下依赖项是列出的队列驱动程序所需的:
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~4.0
- Redis:
predis/predis ~1.0
或 phpredis PHP 扩展
创建作业
生成作业类
默认情况下,应用程序的所有可排队作业都存储在 app/Jobs
目录中。如果 app/Jobs
目录不存在,当您运行 make:job
Artisan 命令时,它将被创建。您可以使用 Artisan CLI 生成一个新的排队作业:
php artisan make:job ProcessPodcast
生成的类将实现 Illuminate\Contracts\Queue\ShouldQueue
接口,指示 Laravel 该作业应被推送到队列以异步运行。
作业存根可以使用 存根发布 进行自定义
类结构
作业类非常简单,通常只包含一个 handle
方法,该方法在作业被队列处理时调用。首先,让我们看一个示例作业类。在此示例中,我们假装管理一个播客发布服务,并需要在发布之前处理上传的播客文件:
<?php
namespace App\Jobs;
use App\AudioProcessor;
use App\Podcast;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的作业实例。
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行作业。
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客...
}
}
在此示例中,请注意我们能够将 Eloquent 模型 直接传递到排队作业的构造函数中。由于作业使用的 SerializesModels
trait,Eloquent 模型及其加载的关系将在作业处理时被优雅地序列化和反序列化。如果您的排队作业在其构造函数中接受 Eloquent 模型,则只有模型的标识符会被序列化到队列中。当作业实际处理时,队列系统将自动从数据库中重新检索完整的模型实例及其加载的关系。这对您的应用程序来说是完全透明的,并防止了序列化完整 Eloquent 模型实例时可能出现的问题。
handle
方法在作业被队列处理时调用。请注意,我们能够在作业的 handle
方法上进行类型提示依赖项。Laravel 服务容器 会自动注入这些依赖项。
如果您希望完全控制容器如何将依赖项注入到 handle
方法中,可以使用容器的 bindMethod
方法。bindMethod
方法接受一个回调,该回调接收作业和容器。在回调中,您可以随意调用 handle
方法。通常,您应该从 服务提供者 中调用此方法:
use App\Jobs\ProcessPodcast;
$this->app->bindMethod(ProcessPodcast::class.'@handle', function ($job, $app) {
return $job->handle($app->make(AudioProcessor::class));
});
二进制数据(如原始图像内容)应在传递给排队作业之前通过 base64_encode
函数进行处理。否则,作业在放置到队列时可能无法正确序列化为 JSON。
处理关系
由于加载的关系也会被序列化,因此序列化的作业字符串可能会变得相当大。为了防止关系被序列化,您可以在设置属性值时调用模型的 withoutRelations
方法。此方法将返回一个没有加载关系的模型实例:
/**
* 创建一个新的作业实例。
*
* @param \App\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast->withoutRelations();
}
作业中间件
作业中间件允许您在排队作业的执行过程中包装自定义逻辑,从而减少作业本身的样板代码。例如,考虑以下利用 Laravel 的 Redis 速率限制功能的 handle
方法,该方法允许每五秒钟只处理一个作业:
/**
* 执行作业。
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
info('获得锁...');
// 处理作业...
}, function () {
// 无法获得锁...
return $this->release(5);
});
}
虽然此代码是有效的,但 handle
方法的结构变得嘈杂,因为它被 Redis 速率限制逻辑所干扰。此外,必须为我们希望速率限制的任何其他作业复制此速率限制逻辑。
而不是在 handle 方法中进行速率限制,我们可以定义一个处理速率限制的作业中间件。Laravel 没有作业中间件的默认位置,因此您可以将作业中间件放置在应用程序中的任何位置。在此示例中,我们将中间件放置在 app/Jobs/Middleware
目录中:
<?php
namespace App\Jobs\Middleware;
use Illuminate\Support\Facades\Redis;
class RateLimited
{
/**
* 处理排队作业。
*
* @param mixed $job
* @param callable $next
* @return mixed
*/
public function handle($job, $next)
{
Redis::throttle('key')
->block(0)->allow(1)->every(5)
->then(function () use ($job, $next) {
// 获得锁...
$next($job);
}, function () use ($job) {
// 无法获得锁...
$job->release(5);
});
}
}
如您所见,像 路由中间件 一样,作业中间件接收正在处理的作业和一个应调用的回调以继续处理作业。
创建作业中间件后,可以通过从作业的 middleware
方法返回它们来将它们附加到作业。此方法不存在于 make:job
Artisan 命令生成的作业中,因此您需要将其添加到自己的作业类定义中:
use App\Jobs\Middleware\RateLimited;
/**
* 获取作业应通过的中间件。
*
* @return array
*/
public function middleware()
{
return [new RateLimited];
}
调度作业
编写作业类后,您可以使用作业本身的 dispatch
方法来调度它。传递给 dispatch
方法的参数将传递给作业的构造函数:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast);
}
}
如果您希望有条件地调度作业,可以使用 dispatchIf
和 dispatchUnless
方法:
ProcessPodcast::dispatchIf($accountActive === true, $podcast);
ProcessPodcast::dispatchUnless($accountSuspended === false, $podcast);
延迟调度
如果您希望延迟排队作业的执行,可以在调度作业时使用 delay
方法。例如,让我们指定一个作业在调度后 10 分钟内不可用:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)
->delay(now()->addMinutes(10));
}
}
Amazon SQS 队列服务的最大延迟时间为 15 分钟。
在响应发送到浏览器后调度
或者,dispatchAfterResponse
方法会延迟调度作业,直到响应发送到用户的浏览器。这仍然允许用户开始使用应用程序,即使排队作业仍在执行。这通常仅用于大约需要一秒钟的作业,例如发送电子邮件:
use App\Jobs\SendNotification;
SendNotification::dispatchAfterResponse();
您可以 dispatch
一个闭包并将 afterResponse
方法链接到助手上,以便在响应发送到浏览器后执行闭包:
use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;
dispatch(function () {
Mail::to('taylor@laravel.com')->send(new WelcomeMessage);
})->afterResponse();
同步调度
如果您希望立即(同步)调度作业,可以使用 dispatchNow
方法。使用此方法时,作业不会被排队,而是在当前进程中立即运行:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatchNow($podcast);
}
}
作业链
作业链允许您指定在主作业成功执行后应按顺序运行的排队作业列表。如果序列中的一个作业失败,则不会运行其余的作业。要执行排队作业链,可以在任何可调度作业上使用 withChain
方法:
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch();
除了链接作业类实例,您还可以链接闭包:
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast,
function () {
Podcast::update(...);
},
])->dispatch();
使用 $this->delete()
方法删除作业不会阻止处理链接的作业。链条将仅在链中的作业失败时停止执行。
链接连接和队列
如果您希望为链接的作业指定默认连接和队列,可以使用 allOnConnection
和 allOnQueue
方法。这些方法指定应使用的队列连接和队列名称,除非排队作业被明确分配了不同的连接/队列:
ProcessPodcast::withChain([
new OptimizePodcast,
new ReleasePodcast
])->dispatch()->allOnConnection('redis')->allOnQueue('podcasts');
自定义队列和连接
调度到特定队列
通过将作业推送到不同的队列,您可以“分类”排队作业,甚至可以优先考虑为各种队列分配多少工作者。请记住,这不会将作业推送到队列配置文件中定义的不同队列“连接”,而仅推送到单个连接中的特定队列。要指定队列,请在调度作业时使用 onQueue
方法:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)->onQueue('processing');
}
}
调度到特定连接
如果您正在使用多个队列连接,可以指定将作业推送到哪个连接。要指定连接,请在调度作业时使用 onConnection
方法:
<?php
namespace App\Http\Controllers;
use App\Http\Controllers\Controller;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
class PodcastController extends Controller
{
/**
* 存储一个新的播客。
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// 创建播客...
ProcessPodcast::dispatch($podcast)->onConnection('sqs');
}
}
您可以链接 onConnection
和 onQueue
方法,以指定作业的连接和队列:
ProcessPodcast::dispatch($podcast)
->onConnection('sqs')
->onQueue('processing');
指定最大作业尝试次数/超时值
最大尝试次数
指定作业可以尝试的最大次数的一种方法是通过 Artisan 命令行上的 --tries
开关:
php artisan queue:work --tries=3
但是,您可以通过在作业类本身上定义最大尝试次数来采取更细粒度的方法。如果在作业上指定了最大尝试次数,它将优先于命令行上提供的值:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 作业可以尝试的次数。
*
* @var int
*/
public $tries = 5;
}
基于时间的尝试
作为定义作业可以尝试多少次的替代方法,您可以定义作业应超时的时间。这允许在给定时间范围内尝试任意次数的作业。要定义作业应超时的时间,请在作业类中添加 retryUntil
方法:
/**
* 确定作业应超时的时间。
*
* @return \DateTime
*/
public function retryUntil()
{
return now()->addSeconds(5);
}
您还可以在排队的事件监听器上定义 retryUntil
方法。
最大异常次数
有时您可能希望指定作业可以尝试多次,但如果重试是由给定数量的异常触发的,则应失败。为此,您可以在作业类上定义一个 maxExceptions
属性:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 作业可以尝试的次数。
*
* @var int
*/
public $tries = 25;
/**
* 允许的最大异常次数。
*
* @var int
*/
public $maxExceptions = 3;
/**
* 执行作业。
*
* @return void
*/
public function handle()
{
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// 获得锁,处理播客...
}, function () {
// 无法获得锁...
return $this->release(10);
});
}
}
在此示例中,如果应用程序无法获得 Redis 锁,作业将被释放十秒钟,并将继续重试最多 25 次。然而,如果作业抛出三个未处理的异常,则作业将失败。
超时
必须安装 pcntl
PHP 扩展才能指定作业超时。
同样,可以使用 Artisan 命令行上的 --timeout
开关指定作业可以运行的最大秒数:
php artisan queue:work --timeout=30
但是,您也可以在作业类本身上定义作业应允许运行的最大秒数。如果在作业上指定了超时,它将优先于命令行上指定的任何超时:
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 作业可以运行的最大秒数。
*
* @var int
*/
public $timeout = 120;
}
有时,IO 阻塞进程(如套接字或传出 HTTP 连接)可能不尊重您指定的超时。因此,使用这些功能时,您应始终尝试使用其 API 指定超时。例如,使用 Guzzle 时,您应始终指定连接和请求超时值。
速率限制
此功能要求您的应用程序能够与 Redis 服务器 交互。
如果您的应用程序与 Redis 交互,您可以按时间或并发限制排队作业的速率。当您的排队作业与同样受速率限制的 API 交互时,此功能可能会有所帮助。
例如,使用 throttle
方法,您可以将给定类型的作业限制为每 60 秒仅运行 10 次。如果无法获得锁,您通常应将作业释放回队列,以便稍后重试:
Redis::throttle('key')->allow(10)->every(60)->then(function () {
// 作业逻辑...
}, function () {
// 无法获得锁...
return $this->release(10);
});
在上面的示例中,key
可以是唯一标识您希望速率限制的作业类型的任何字符串。例如,您可能希望根据作业的类名和其操作的 Eloquent 模型的 ID 构建键。
将节流的作业释放回队列仍会增加作业的总 attempts
数。
或者,您可以指定可以同时处理给定作业的最大工作者数量。当排队作业正在修改应仅由一个作业一次修改的资源时,这可能会有所帮助。例如,使用 funnel
方法,您可以限制给定类型的作业仅由一个工作者同时处理:
Redis::funnel('key')->limit(1)->then(function () {
// 作业逻辑...
}, function () {
// 无法获得锁...
return $this->release(10);
});
使用速率限制时,确定作业成功运行所需的尝试次数可能很困难。因此,将速率限制与 基于时间的尝试 结合使用是有用的。
错误处理
如果在处理作业时抛出异常,作业将自动释放回队列,以便再次尝试。作业将继续释放,直到达到应用程序允许的最大尝试次数。最大尝试次数由 queue:work
Artisan 命令上使用的 --tries
开关定义。或者,可以在作业类本身上定义最大尝试次数。有关运行队列工作者的更多信息 可以在下面找到。
队列闭包
除了将作业类调度到队列,您还可以调度闭包。这对于需要在当前请求周期之外执行的快速简单任务非常有用:
$podcast = App\Podcast::find(1);
dispatch(function () use ($podcast) {
$podcast->publish();
});
将闭包调度到队列时,闭包的代码内容会被加密签名,以便在传输过程中无法修改。
运行队列工作者
Laravel 包含一个队列工作者,它将在作业推送到队列时处理新作业。您可以使用 queue:work
Artisan 命令运行工作者。请注意,一旦 queue:work
命令启动,它将继续运行,直到手动停止或关闭终端:
php artisan queue:work
要使 queue:work
进程永久在后台运行,您应该使用进程监视器(如 Supervisor)来确保队列工作者不会停止运行。
请记住,队列工作者是长时间运行的进程,并将已启动的应用程序状态存储在内存中。因此,它们在启动后不会注意到代码库中的更改。因此,在部署过程中,请确保 重新启动队列工作者。此外,请记住,应用程序创建或修改的任何静态状态在作业之间不会自动重置。
或者,您可以运行 queue:listen
命令。使用 queue:listen
命令时,您不必在想要重新加载更新的代码或重置应用程序状态时手动重新启动工作者;然而,此命令不如 queue:work
高效:
php artisan queue:listen
指定连接和队列
您还可以指定工作者应使用哪个队列连接。传递给 work
命令的连接名称应对应于 config/queue.php
配置文件中定义的连接之一:
php artisan queue:work redis
您可以通过仅处理给定连接的特定队列来进一步自定义队列工作者。例如,如果您的所有电子邮件都在 redis
队列连接上的 emails
队列中处理,您可以发出以下命令以启动仅处理该队列的工作者:
php artisan queue:work redis --queue=emails
处理单个作业
可以使用 --once
选项指示工作者仅从队列中处理一个作业:
php artisan queue:work --once
处理所有排队作业然后退出
可以使用 --stop-when-empty
选项指示工作者处理所有作业然后优雅地退出。当在 Docker 容器中使用 Laravel 队列时,如果希望在队列为空后关闭容器,此选项可能很有用:
php artisan queue:work --stop-when-empty
资源考虑
守护进程队列工作者在处理每个作业之前不会“重启”框架。因此,您应该在每个作业完成后释放任何重资源。例如,如果您使用 GD 库进行图像处理,完成后应使用 imagedestroy
释放内存。
队列优先级
有时您可能希望优先处理队列。例如,在 config/queue.php
中,您可以将 redis
连接的默认 queue
设置为 low
。然而,偶尔您可能希望将作业推送到 high
优先级队列,如下所示:
dispatch((new Job)->onQueue('high'));
要启动一个验证所有 high
队列作业在继续处理 low
队列上的任何作业之前已处理的工作者,请将逗号分隔的队列名称列表传递给 work
命令:
php artisan queue:work --queue=high,low
队列工作者和部署
由于队列工作者是长时间运行的进程,因此在不重新启动的情况下不会拾取代码更改。因此,部署使用队列工作者的应用程序的最简单方法是在部署过程中重新启动工作者。您可以通过发出 queue:restart
命令优雅地重新启动所有工作者:
php artisan queue:restart
此命令将指示所有队列工作者在完成当前作业后优雅地“死亡”,以确保没有现有作业丢失。由于在执行 queue:restart
命令时队列工作者将死亡,因此您应该运行一个进程管理器(如 Supervisor)以自动重新启动队列工作者。
队列使用 缓存 存储重启信号,因此在使用此功能之前,您应验证应用程序已正确配置缓存驱动程序。
作业过期和超时
作业过期
在 config/queue.php
配置文件中,每个队列连接定义了一个 retry_after
选项。此选项指定队列连接在重试正在处理的作业之前应等待的秒数。例如,如果 retry_after
的值设置为 90
,则如果作业已处理 90 秒而未被删除,它将被释放回队列。通常,您应将 retry_after
值设置为作业合理完成处理所需的最大秒数。
唯一不包含 retry_after
值的队列连接是 Amazon SQS。SQS 将根据 AWS 控制台中管理的 默认可见性超时 重试作业。
工作者超时
queue:work
Artisan 命令公开了一个 --timeout
选项。--timeout
选项指定 Laravel 队列主进程在杀死正在处理作业的子队列工作者之前将等待的时间。有时,子队列进程可能会因各种原因而“冻结”。--timeout
选项会删除超过指定时间限制的冻结进程:
php artisan queue:work --timeout=60
retry_after
配置选项和 --timeout
CLI 选项是不同的,但它们协同工作以确保作业不会丢失,并且作业仅成功处理一次。
--timeout
值应始终比 retry_after
配置值短几秒钟。这将确保在重试作业之前始终杀死正在处理给定作业的工作者。如果 --timeout
选项长于 retry_after
配置值,您的作业可能会被处理两次。
工作者休眠时间
当队列上有作业可用时,工作者将继续处理作业而没有延迟。然而,sleep
选项决定了如果没有新作业可用,工作者将“休眠”的时间(以秒为单位)。在休眠期间,工作者不会处理任何新作业 - 作业将在工作者再次唤醒后处理。
php artisan queue:work --sleep=3
Supervisor 配置
安装 Supervisor
Supervisor 是 Linux 操作系统的进程监视器,如果 queue:work
进程失败,它将自动重新启动。要在 Ubuntu 上安装 Supervisor,您可以使用以下命令:
sudo apt-get install supervisor
如果自己配置 Supervisor 听起来很复杂,可以考虑使用 Laravel Forge,它将自动为您的 Laravel 项目安装和配置 Supervisor。
配置 Supervisor
Supervisor 配置文件通常存储在 /etc/supervisor/conf.d
目录中。在此目录中,您可以创建任意数量的配置文件,指示 Supervisor 如何监视您的进程。例如,让我们创建一个 laravel-worker.conf
文件,启动并监视一个 queue:work
进程:
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600
在此示例中,numprocs
指令将指示 Supervisor 运行 8 个 queue:work
进程并监视所有进程,如果它们失败,将自动重新启动。您应更改 command
指令中的 queue:work sqs
部分,以反映您所需的队列连接。
您应确保 stopwaitsecs
的值大于最长运行作业消耗的秒数。否则,Supervisor 可能会在作业完成处理之前杀死作业。
启动 Supervisor
创建配置文件后,您可以使用以下命令更新 Supervisor 配置并启动进程:
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
有关 Supervisor 的更多信息,请查阅 Supervisor 文档。
处理失败的作业
有时您的排队作业会失败。别担心,事情并不总是按计划进行!Laravel 提供了一种方便的方法来指定作业应尝试的最大次数。在作业超过此尝试次数后,它将被插入到 failed_jobs
数据库表中。要为 failed_jobs
表创建迁移,您可以使用 queue:failed-table
命令:
php artisan queue:failed-table
php artisan migrate
然后,在运行 队列工作者 时,您可以使用 queue:work
命令上的 --tries
开关指定作业应尝试的最大次数。如果您未为 --tries
选项指定值,作业将仅尝试一次:
php artisan queue:work redis --tries=3
此外,您可以使用 --delay
选项指定 Laravel 在重试失败作业之前应等待的秒数。默认情况下,作业会立即重试:
php artisan queue:work redis --tries=3 --delay=3
如果您希望在每个作业的基础上配置失败作业重试延迟,可以通过在排队作业类上定义 retryAfter
属性来实现:
/**
* 重试作业之前等待的秒数。
*
* @var int
*/
public $retryAfter = 3;
如果您需要更复杂的逻辑来确定重试延迟,可以在排队作业类上定义 retryAfter
方法:
/**
* 计算重试作业之前等待的秒数。
*
* @return int
*/
public function retryAfter()
{
return 3;
}
清理失败的作业
您可以直接在作业类上定义一个 failed
方法,允许您在发生故障时执行作业特定的清理。这是向用户发送警报或撤销作业执行的任何操作的理想位置。导致作业失败的 Throwable
异常将传递给 failed
方法:
<?php
namespace App\Jobs;
use App\AudioProcessor;
use App\Podcast;
use Throwable;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 创建一个新的作业实例。
*
* @param \App\Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* 执行作业。
*
* @param \App\AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// 处理上传的播客...
}
/**
* 处理作业失败。
*
* @param \Throwable $exception
* @return void
*/
public function failed(Throwable $exception)
{
// 发送用户失败通知等...
}
}
如果作业是使用 dispatchNow
方法调度的,则不会调用 failed
方法。
失败作业事件
如果您希望注册一个在作业失败时调用的事件,可以使用 Queue::failing
方法。此事件是通过电子邮件或 Slack 通知团队的绝佳机会。例如,我们可以从 Laravel 附带的 AppServiceProvider
中附加一个回调到此事件:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册任何应用程序服务。
*
* @return void
*/
public function register()
{
//
}
/**
* 启动任何应用程序服务。
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
}
重试失败的作业
要查看已插入到 failed_jobs
数据库表中的所有失败作业,可以使用 queue:failed
Artisan 命令:
php artisan queue:failed
queue:failed
命令将列出作业 ID、连接、队列、失败时间和有关作业的其他信息。作业 ID 可用于重试失败的作业。例如,要重试 ID 为 5
的失败作业,请发出以下命令:
php artisan queue:retry 5
如有必要,您可以将多个 ID 或 ID 范围(使用数字 ID 时)传递给命令:
php artisan queue:retry 5 6 7 8 9 10
php artisan queue:retry --range=5-10
要重试所有失败的作业,请执行 queue:retry
命令并传递 all
作为 ID:
php artisan queue:retry all
如果您希望删除失败的作业,可以使用 queue:forget
命令:
php artisan queue:forget 5
要删除所有失败的作业,可以使用 queue:flush
命令:
php artisan queue:flush
忽略缺失的模型
将 Eloquent 模型注入作业时,它会在放置到队列之前自动序列化,并在作业处理时恢复。然而,如果模型在作业等待工作者处理时被删除,您的作业可能会因 ModelNotFoundException
而失败。
为了方便起见,您可以选择通过将作业的 deleteWhenMissingModels
属性设置为 true
来自动删除缺失模型的作业:
/**
* 如果模型不再存在,则删除作业。
*
* @var bool
*/
public $deleteWhenMissingModels = true;
作业事件
使用 Queue
facade 上的 before
和 after
方法,您可以指定在处理排队作业之前或之后执行的回调。这些回调是执行额外日志记录或为仪表板增加统计数据的绝佳机会。通常,您应从 服务提供者 中调用这些方法。例如,我们可以使用 Laravel 附带的 AppServiceProvider
:
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 注册任何应用程序服务。
*
* @return void
*/
public function register()
{
//
}
/**
* 启动任何应用程序服务。
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
}
使用 Queue
facade 上的 looping
方法,您可以指定在工作者尝试从队列中获取作业之前执行的回调。例如,您可以注册一个闭包,以回滚由先前失败的作业留下的任何事务:
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});