FastAPI+apSheduler动态定时任务

Stella981
• 阅读 1784

###阅读目录

####一、apSheduler ####二、实战应用

apSheduler

###1.安装

pip install apscheduler

###2.基础组件

  • triggers 触发器
  • job stores job存储
  • executors 执行器
  • schedulers 调度器

###3.选择合适的调度器,存储器,执行器,触发器 ####3.1调度器(schedulers)

  • BlockingScheduler: 进程中只有调度器
  • BackgroundScheduler: 非以下框架,且希望运行在后台
  • AsyncIOScheduler: 应用程序使用asyncio
  • GeventScheduler: 应用程序使用gevent
  • TornadoScheduler: 构建Tornado
  • TwistedScheduler: 构建Twisted应用
  • QtScheduler: 构建Qt应用

####3.2存储器(job stores)

  • 持久化存储job,通过SQLAlchemyJobStore设置存储链接
  • 非持久化存储job,重启时重新创建job,默认MemoryJobStore内存存储

####3.3执行器(executors)

  • processpoolexecutor,CUP密集型业务,可选进程池,也可以同线程池同时使用
  • threadpoolexecutor,默人线程池

####3.4触发器(triggers )

  • date: 设置日期,针对某个时间点运行一次job
  • interval: 固定时间间隔运行job
  • cron: 类似linux-crontab,某个时间点定期运行job

###4.配置调度器

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor


jobstores = {
    # 可以配置多个存储
    'mongo': {'type': 'mongodb'}, 
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore指定存储链接
}
executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},     # 最大工作线程数20
    'processpool': ProcessPoolExecutor(max_workers=5)         # 最大工作进程数为5
}
job_defaults = {
    'coalesce': False,   # 关闭新job的合并,当job延误或者异常原因未执行时
    'max_instances': 3   # 并发运行新job默认最大实例多少
}
scheduler = BackgroundScheduler()

# .. do something else here, maybe add jobs etc.

scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc) # utc作为调度程序的时区

###5.调度器的增删改查

import os
import time
from apscheduler.schedulers.background import BackgroundScheduler

def print_time(name):
    print(f'{name} - {time.ctime()}')

def add_job(job_id, func, args, seconds):
    """添加job"""
    print(f"添加job - {job_id}")
    scheduler.add_job(id=job_id, func=func, args=args, trigger='interval', seconds=seconds)

def remove_job(job_id):
    """移除job"""
    scheduler.remove_job(job_id)
    print(f"移除job - {job_id}")

def pause_job(job_id):
    """停止job"""
    scheduler.pause_job(job_id)
    print(f"停止job - {job_id}")

def resume_job(job_id):
    """恢复job"""
    scheduler.resume_job(job_id)
    print(f"恢复job - {job_id}")

def get_jobs():
    """获取所有job信息,包括已停止的"""
    res = scheduler.get_jobs()
    print(f"所有job - {res}")

def print_jobs():
    print(f"详细job信息")
    scheduler.print_jobs()

def start():
    """启动调度器"""
    scheduler.start()

def shutdown():
    """关闭调度器"""
    scheduler.shutdown()

if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    start()
    print('Press Ctrl+{0} to exit \n'.format('Break' if os.name == 'nt' else 'C'))
    add_job('job_A', func=print_time, args=("A", ), seconds=1)
    add_job('job_B', func=print_time, args=("B", ), seconds=2)
    time.sleep(6)
    pause_job('job_A')
    get_jobs()
    time.sleep(6)
    print_jobs()
    resume_job('job_A')
    time.sleep(6)
    remove_job('job_A')
    time.sleep(6)
    try:
        shutdown()
    except RuntimeError:
        pass

###6.调度事件 可以将事件侦听器附加到调度程序。调度程序事件在某些情况下会被触发,并且可能会在其中携带有关该特定事件详细信息的附加信息。通过给add_listener()提供适当的掩码参数或者将不同的常量放在一起,可以只监听特定类型的事件。用一个参数调用侦听器callable,即event对象。

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

###7.配置日志

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

实战应用

###1.fastapi动态添加定时任务

import asyncio
import datetime
import uvicorn
from fastapi import FastAPI, Body
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.triggers.cron import CronTrigger

app = FastAPI(title='fast-api')

scheduler = None


@app.on_event('startup')
def init_scheduler():
    """初始化"""
    jobstores = {
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  # SQLAlchemyJobStore指定存储链接
    }
    executors = {
        'default': {'type': 'threadpool', 'max_workers': 20},  # 最大工作线程数20
        'processpool': ProcessPoolExecutor(max_workers=5)  # 最大工作进程数为5
    }
    global scheduler
    scheduler = AsyncIOScheduler()
    scheduler.configure(jobstores=jobstores, executors=executors)
    # 添加一个coroutine执行,结果很不理想...
    scheduler.add_job(tick, 'interval', seconds=3)
    print("启动调度器...")

    scheduler.start()


def print_time(name):
    print(f'{name} - {datetime.datetime.now()}')


async def tick():
    print('Tick! The time is: %s' % datetime.datetime.now())
    await asyncio.sleep(1)


@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
    """添加job"""
    scheduler.add_job(id=job_id, func=print_time, args=(job_id, ), trigger=CronTrigger.from_crontab(cron))
    return {"msg": "success!"}


@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
    """移除job"""
    scheduler.remove_job(job_id)
    return {"msg": "success!"}


if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5566)

####1.1思考

  • 在项目中定时任务要和后端运行需要运行在一个进程中吗?
  • 后端频繁发布代码,怎么避免影响定时任务执行呢?
  • 把定时任务抽离搭建服务,如何去做呢?
  • apscheduler在多进程中会多次加载job,导致job重复执行,怎么解决呢?

###2.rpyc实现定时任务服务注册

针对上面的三个思考,官方也给出了基于rpyc解决的demo

####2.1rpc负责添加job,执行函数在rpc,serve只做添加调用后关闭

  • 见官方demo

####2.2rpc负责添加job,执行函数在server #####server.py

import datetime
import uvicorn
from fastapi import FastAPI, Body
import rpyc

app = FastAPI(title='fast-api')

conn = None
bgsrv = None
mon = None

@app.on_event('startup')
def init_scheduler():
    """初始化"""
    global conn,bgsrv,mon
    conn = rpyc.connect("localhost", 12345)
    # create a bg thread to process incoming events
    bgsrv = rpyc.BgServingThread(conn)
    mon = conn.root.Monitor(print_time)

def print_time(name):
    print(f'{name} - {datetime.datetime.now()}')

def from_crontab(cron):
    values = cron.split(' ')
    return {
        'minute': values[0],
        'hour': values[1],
        'day': values[2],
        'month': values[3],
        'day_of_week': values[4],
    }


@app.post('/add-job')
async def add_job(job_id: str = Body(...), cron: str = Body(...)):
    """添加job"""
    mon.add_job(id=job_id, args=(job_id, ), trigger='cron',  **from_crontab(cron))
    return {"msg": "success!"}

@app.post('/remove-job')
async def remove_job(job_id: str = Body(..., embed=True)):
    """移除job"""
    mon.remove_job(job_id)
    return {"msg": "success!"}

if __name__ == '__main__':
    uvicorn.run(app, host='127.0.0.1', port=5566)
rpc.py
import rpyc
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler

class SchedulerService(rpyc.Service):

    class exposed_Monitor(object):   # exposing names is not limited to methods :)
        def __init__(self, callback):
            # callback方法是server.py的回调方法,假如想添加不同事件函数,建议全部传进来在__init__函数初始化所有
            # 这里需要用rpyc.async_异步加载回调函数
            self.callback = rpyc.async_(callback)

        def exposed_add_job(self, *args, **kwargs):
            print("添加任务:", args, kwargs)
            return scheduler.add_job(self.callback, *args, **kwargs)

        def exposed_pause_job(self, job_id, jobstore=None):
            return scheduler.pause_job(job_id, jobstore)

        def exposed_resume_job(self, job_id, jobstore=None):
            return scheduler.resume_job(job_id, jobstore)

        def exposed_remove_job(self, job_id, jobstore=None):
            scheduler.remove_job(job_id, jobstore)


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.start()
    protocol_config = {'allow_public_attrs': True}
    server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()

####2.3总结

rpc负责添加job,执行函数在rpc,serve只做添加调用后关闭,这种方式会导致业务代码需要在rpc在写一遍,看项目进展是在什么状态lou... rpc负责添加job,执行函数在server,这种方式的弊端就是和rpc的链接不能断开(加入需要一个定期执行的任务),只能保持一个长连接状态...

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
Wesley13 Wesley13
3年前
java将前端的json数组字符串转换为列表
记录下在前端通过ajax提交了一个json数组的字符串,在后端如何转换为列表。前端数据转化与请求varcontracts{id:'1',name:'yanggb合同1'},{id:'2',name:'yanggb合同2'},{id:'3',name:'yang
皕杰报表之UUID
​在我们用皕杰报表工具设计填报报表时,如何在新增行里自动增加id呢?能新增整数排序id吗?目前可以在新增行里自动增加id,但只能用uuid函数增加UUID编码,不能新增整数排序id。uuid函数说明:获取一个UUID,可以在填报表中用来创建数据ID语法:uuid()或uuid(sep)参数说明:sep布尔值,生成的uuid中是否包含分隔符'',缺省为
待兔 待兔
4个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解
Opencv中Mat矩阵相乘——点乘、dot、mul运算详解2016年09月02日00:00:36 \牧野(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fme.csdn.net%2Fdcrmg) 阅读数:59593
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Wesley13 Wesley13
3年前
00:Java简单了解
浅谈Java之概述Java是SUN(StanfordUniversityNetwork),斯坦福大学网络公司)1995年推出的一门高级编程语言。Java是一种面向Internet的编程语言。随着Java技术在web方面的不断成熟,已经成为Web应用程序的首选开发语言。Java是简单易学,完全面向对象,安全可靠,与平台无关的编程语言。
Wesley13 Wesley13
3年前
MySQL部分从库上面因为大量的临时表tmp_table造成慢查询
背景描述Time:20190124T00:08:14.70572408:00User@Host:@Id:Schema:sentrymetaLast_errno:0Killed:0Query_time:0.315758Lock_
Python进阶者 Python进阶者
10个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这