一、简单认识
Celery是由Python开发、简单、灵活、可靠的分布式任务队列,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。
它的特点有:
- 简单:熟悉了它的流程后,配置使用简单;
- 高可用:任务执行失败或执行过程中发生连接中断,Celery会自动重新执行任务;
- 快速:一个单进程的Celery每分钟可处理上百万个任务;
- 灵活:Celery的各个组件都可以被扩展及自定制;
应用场景举例:
1.web应用:用户在网站进行某个操作需要很长时间完成时,我们可以将这种操作交给Celery执行,直接返回给用户,等到Celery执行完成以后通知用户,大大提好网站并发及用户体验感。
2.任务场景:需要批量在几百台机器执行某些命令或者任务,Celery可以轻松搞定。
3.定时任务:向定时导数据报表、定时发送通知类似场景,Celery可以提供管理接口和丰富的API。
二、架构和工作原理
Celery由以下三部分构成:消息中间件(Broker)、任务执行单元(Worker)、结果存储(Backend):来个图
消息中间件(Broker):
消息中间件Broker支持RabbitMQ、Redis、MongoDB、Memcached 等,官方推荐RabbitMQ。
任务执行单元(Worker)
Worker是任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心。
结果存储(Backend)
Backend结果存储官方也提供了诸多的存储方式支持:RabbitMQ、 Redis、Memcached,SQLAlchemy, Django ORM、Apache Cassandra、Elasticsearch。
工作原理:
任务模块Task包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往消息队列,而定时任务由Celery Beat进程周期性地将任务发往消息队列;
任务执行单元Worker实时监视消息队列获取队列中的任务执行;
Woker执行完任务后将结果保存在Backend中;
三、安装使用
Redis的安装:
参考我的另外一篇:https://www.cnblogs.com/Utopia-Clint/p/10868258.html
Celery的安装:
一个简单的应用(Linux环境下)
注意:此时并没有将配置文件、任务文件及初始化文件分开,真的到应用环境中是要分开的,后面会提到;
创建一个文件目录:
mkdir /root/celery_study
在celery_study创建文件task.py
task.py:任务定义文件
# -*- coding:utf-8 -*-
# @Author : Clint
from celery import Celery
app = Celery('task',
broker='redis://:123456@localhost:6379',
backend='redis://:123456@localhost:6379',
)
@app.task
def add(x, y):
print("running...", x, y)
return x + y
启动Worker
celery -A task worker --loglevel=info
各个参数含义:
worker: 代表第启动的角色是work当然还有beat等其他角色;
-A :项目路径,这里我的目录是task;
-loglevel:启动的日志级别,有info、debug等,更多参数使用celery --help查看
任务队列已经准备就绪;
我们还需要通过delay或apply_async来将任务添加到worker中,这里我们通过交互式方法添加任务,并返回AsyncResult对象,通过AsyncResult对象获取结果:
AsyncResult除了get方法用于常用获取结果方法外还提以下常用方法或属性:
- state: 返回任务状态;
- task_id: 返回任务id;
- result: 返回任务结果,同get()方法;
- ready(): 判断任务是否以及有结果,有结果为True,否则False;
- info(): 获取任务信息,默认为结果;
- wait(t): 等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;
- successfu(): 判断任务是否成功,成功为True,否则为False;