PHP Laravel 队列技巧:Fail、Retry 或者 Delay

Stella981
• 阅读 621

当创建队列jobs、监听器或订阅服务器以推送到队列中时,您可能会开始认为,一旦分派,队列工作器决定如何处理您的逻辑就完全由您自己决定了。

嗯……并不是说你不能从作业内部与队列工作器交互,但是通常情况下,哪怕你做了,也是没必要的。

这个神奇的骚操作的出现是因为“InteractsWithQueue”这个trait。.当排队作业正在从队列中拉出, 这个 [CallQueuedListener](https://github.com/laravel/framework/blob/5.8/src/Illuminate/Events/CallQueuedListener.php#L90-L104) 会检查它是否在使用 InteractsWithQueue trait, 如果是的话,框架会将底层的“队列jobs”实例注入到内部。

这个 “任务” 实例类似于一个包装了真正的 Job 类的驱动,其中包含队列连接和尝试等信息。

背景
我将以一个转码 Job 为例。 这是一个将广播音频文件转换成192kbps MP3格式的任务。因为这是在自由转码队列中设置的,所以它的作用有限。

检查尝试次数
attempts()是被调用的第一个方法, 顾名思义,它返回尝试次数,一个队列 job总是伴随着一个attempt启动。

此方法旨在与其他方法一起使用 ..., 类似 fail() 或者 release() (delay). 为了便于说明,我们将通知用户第几次重试: 每次我们尝试在空闲队列中转换(转换代码)时,我们都会通知用户我们正在第几次重试,让他可以选择取消将来的转换(转换代码)。

 1 <?php
 2 namespace App\Jobs;
 3 use App\Podcast;
 4 use Transcoder\Transcoder;
 5 use Illuminate\Bus\Queueable;
 6 use Illuminate\Queue\SerializesModels;
 7 use App\Notifications\PodcastTranscoded;
 8 use Illuminate\Queue\InteractsWithQueue;
 9 use Illuminate\Foundation\Bus\Dispatchable;
10 use App\Notifications\RetyingPodcastTranscode;
11 class TranscodePodcast
12 {
13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
14 /**
15 * Transcoder Instance
16 *
17 * @var \App\Podcast
18 */
19 protected $podcast;
20 /**
21 * 创建一个新的转码podcast实例。
22 *
23 * @param \App\Podcast $podcast
24 * @return void
25 */
26 public function __construct(Podcast $podcast)
27 {
28 $this->podcast = $podcast;
29 }
30 /**
31 * 执行队列job.
32 *
33 * @param \Transcoder\Transcoder $podcast
34 * @return void
35 */
36 public function handle(Transcoder $transcoder)
37 {
38 // 告诉用户我们第几次重试
39 if ($this->attempts() > 1) {
40 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
41 }
42 $transcoded = $this->transcoder->setFile($event->podcast)
43 ->format('mp3')
44 ->bitrate(192)
45 ->start();
46 
47 
48 // 将转码podcast与原始podcast关联
49 $this->podcast->transcode()->associate($transcoded);
50 
51 // 通知podcast的发布者他的podcast已经准备好了
52 
53 $this->publisher->notify(new PodcastTranscoded($this->podcast));
54 }
55 }

告诉用户我们将在第几次时重试某些内容,这在逻辑预先失败时很有用,让用户(或开发人员)检查出了什么问题,但当然您可以做更多的事情。

就我个人而言,我喜欢在“Job作业”失败后再这样做,如果还有重试时间,告诉他我们稍后会重试当然,这个例子只是为了举例说明。

删除作业队列 Job
第二个方法就是 delete(). 跟你猜想的一样,您可以从队列中删除当前的“队列 Job”。 当队列 Job或侦听器由于多种原因排队后不应处理时,这将会很方便,例如,考虑一下这个场景:在转码发生之前,上传podcast的发布者由于任何原因(比如TOS冲突)被停用,我们应该不处理podcast。

我们将在前面的示例中添加该代码:

 1 <?php
 2 namespace App\Jobs;
 3 use App\Podcast;
 4 use Transcoder\Transcoder;
 5 use Illuminate\Bus\Queueable;
 6 use Illuminate\Queue\SerializesModels;
 7 use App\Notifications\PodcastTranscoded;
 8 use Illuminate\Queue\InteractsWithQueue;
 9 use Illuminate\Foundation\Bus\Dispatchable;
10 use App\Notifications\RetyingPodcastTranscode;
11 class TranscodePodcast
12 {
13 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
14 /**
15 * Transcoder Instance
16 *
17 * @var \App\Podcast
18 */
19 protected $podcast;
20 /**
21 * 创建一个新的转码podcast实例。
22 *
23 * @param \App\Podcast $podcast
24 * @return void
25 */
26 public function __construct(Podcast $podcast)
27 {
28 $this->podcast = $podcast;
29 }
30 /**
31 * 执行队列 job.
32 *
33 * @param \Transcoder\Transcoder $podcast
34 * @return void
35 */
36 public function handle(Transcoder $transcoder)
37 {
38 // 如果发布服务器已被停用,请删除此队列job
39 if ($this->podcast->publisher->isDeactivated()) {
40 $this->delete();
41 }
42 // 告诉用户我们第几次重试
43 if ($this->attempts() > 1) {
44 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
45 }
46 $transcoded = $this->transcoder->setFile($event->podcast)
47 ->format('mp3')
48 ->bitrate(192)
49 ->start();
50 
51 // 将转码podcast与原始podcast关联
52 $this->podcast->transcode()->associate($transcoded);
53 
54 // 通知podcast的发布者他的podcast已经准备好了
55 $this->publisher->notify(new PodcastTranscoded($this->podcast));
56 }
57 }

如果需要删除可能已删除的模型上的作业,则可能需要 设置 [$deleteWhenMissingModels](https://laravel.com/docs/5.8/queues#ignoring-missing-models) 为真 t避免处理不存在的东西。

失败的队列job
当您需要控制人为破坏逻辑时,这非常非常方便, 因为使用空的“return”语句会将“Job”标记为已成功完成。您可以强制使排队的作业失败,希望出现异常,允许处理程序在可能的情况下稍后重试。

这使您在作业失败时可以更好地控制在任何情况下,也可以使用“failed()”方法, 它允许你 失败后执行任何清洁操纵, 比如通知用户或者删除一些东西。

在此示例中,如果由于任何原因(如 CDN 关闭时)无法从存储中检索podcast ,则作业将失败,并引发自定义异常。

 1 <?php
 2 namespace App\Jobs;
 3 use App\Podcast;
 4 use Transcoder\Transcoder;
 5 use Illuminate\Bus\Queueable;
 6 use Illuminate\Queue\SerializesModels;
 7 use App\Exceptions\PodcastUnretrievable;
 8 use App\Notifications\PodcastTranscoded;
 9 use Illuminate\Queue\InteractsWithQueue;
10 use Illuminate\Foundation\Bus\Dispatchable;
11 use App\Notifications\RetyingPodcastTranscode;
12 class TranscodePodcast
13 {
14 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
15 /**
16 * 转码器实例
17 *
18 * @var \App\Podcast
19 */
20 protected $podcast;
21 /**
22 * 创建一个新的转码Podcast实例。
23 *
24 * @param \App\Podcast $podcast
25 * @return void
26 */
27 public function __construct(Podcast $podcast)
28 {
29 $this->podcast = $podcast;
30 }
31 /**
32 * 执行队列 job.
33 *
34 * @param \Transcoder\Transcoder $podcast
35 * @return void
36 */
37 public function handle(Transcoder $transcoder)
38 {
39 // 如果发布服务器已被停用,请删除此队列job
40 if ($this->podcast->publisher->isDeactivated()) {
41 $this->delete();
42 }
43 //如果podcast不能从storage存储中检索,我们就会失败。
44 if ($this->podcast->fileDoesntExists()) {
45 $this->fail(new PodcastUnretrievable($this->podcast));
46 }
47 // 告诉用户我们第几次重试
48 if ($this->attempts() > 1) {
49 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
50 }
51 
52 $transcoded = $this->transcoder->setFile($event->podcast)
53 ->format('mp3')
54 ->bitrate(192)
55 ->start();
56 
57 // 将转码podcast与原始podcast关联
58 $this->podcast->transcode()->associate($transcoded);
59 
60 // 通知podcast的发布者他的podcast已经准备好了
61 $this->publisher->notify(new PodcastTranscoded($this->podcast));
62 }
63 }

现在,进入最后的方法。

释放(延迟)队列job
这可能是trait性状的最有用的方法, 因为它可以让你在未来进一步推动这项队列job. 此方法用于 队列job速率限制.

除了速率限制之外,您还可以在某些不可用但希望在不久的将来使用它的情况下使用它同时,避免先发制人地失败。

在最后一个示例中,我们将延迟转码以备稍后使用:如果转码器正在大量使用,我们将延迟转码5分钟直到负载降低。

 1 <?php
 2 namespace App\Jobs;
 3 use App\Podcast;
 4 use Transcoder\Transcoder;
 5 use Illuminate\Bus\Queueable;
 6 use Illuminate\Queue\SerializesModels;
 7 use App\Exceptions\PodcastUnretrievable;
 8 use App\Notifications\PodcastTranscoded;
 9 use Illuminate\Queue\InteractsWithQueue;
10 use App\Notifications\TranscoderHighUsage;
11 use Illuminate\Foundation\Bus\Dispatchable;
12 use App\Notifications\RetyingPodcastTranscode;
13 class TranscodePodcast
14 {
15 use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
16 /**
17 * Transcoder Instance
18 *
19 * @var \App\Podcast
20 */
21 protected $podcast;
22 /**
23 * 创建一个新的转码podcast实例。
24 *
25 * @param \App\Podcast $podcast
26 * @return void
27 */
28 public function __construct(Podcast $podcast)
29 {
30 $this->podcast = $podcast;
31 }
32 /**
33 * 执行队列job.
34 *
35 * @param \Transcoder\Transcoder $podcast
36 * @return void
37 */
38 public function handle(Transcoder $transcoder)
39 {
40 // 如果发布服务器已被停用,请删除此队列job
41 if ($this->podcast->publisher->isDeactivated()) {
42 $this->delete();
43 }
44 // 如果podcast不能从storage存储中检索,我们就会失败。
45 if ($this->podcast->fileDoesntExists()) {
46 $this->fail(new PodcastUnretrievable($this->podcast));
47 }
48 
49 // 如果转码器使用率很高,我们将
50 // t延迟转码5分钟. 否则我们可能会有拖延转码器进程的危险 
51 // 它会把所有的转码子进程都记录下来。
52 if ($transcoder->getLoad()->isHigh()) {
53 $delay = 60 * 5;
54 $this->podcast->publisher->notify(new TranscoderHighUsage($this->podcast, $delay));
55 $this->release($delay);
56 }
57 // 告诉用户我们第几次重试
58 if ($this->attempts() > 1) {
59 $this->podcast->publisher->notify(new RetryingPodcastTranscode($this->podcast, $this->attempts());
60 }
61 
62 $transcoded = $this->transcoder->setFile($event->podcast)
63 ->format('mp3')
64 ->bitrate(192)
65 ->start();
66 
67 // 将转码podcast与原始podcast关联
68 $this->podcast->transcode()->associate($transcoded);
69 
70 // 通知podcast的发布者他的podcast已经准备好了
71 $this->publisher->notify(new PodcastTranscoded($this->podcast));
72 }
73 }

我们可以使用一些特殊方法,例如,获得分配给转码器的一些时隙,如果转码器时隙已满,则延迟作业。 在排队的工作中,你能做的就只有这些了。排队愉快。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
5个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Wesley13 Wesley13
3年前
PHP创建多级树型结构
<!lang:php<?php$areaarray(array('id'1,'pid'0,'name''中国'),array('id'5,'pid'0,'name''美国'),array('id'2,'pid'1,'name''吉林'),array('id'4,'pid'2,'n
Stella981 Stella981
3年前
Django中Admin中的一些参数配置
设置在列表中显示的字段,id为django模型默认的主键list_display('id','name','sex','profession','email','qq','phone','status','create_time')设置在列表可编辑字段list_editable
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
11个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这