tasks.py 添加任务from .celery import app@app.taskdef add(n1,n2):res = n1+n2print('n1+n2 = %s' % res)return res@app.taskdef low(n1,n2):res = n1-n2print('n1-n2 = %s' % res)return res
add_task.py 添加立即、延迟任务from celery_task import tasks# delay:添加立即任务# apply_async : 添加延迟任务# eta : 执行的utc时间# 添加立即执行任务t1 = tasks.add.delay(10, 20)t2 = tasks.low.delay(100, 50)print(t1.id)# 添加延迟任务from celery_package.tasks import jumpfrom datetime import datetime,timedelta# 秒def eta_second(second):ctime = datetime.now()# 当前时间utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())# 当前UTC时间time_delay = timedelta(seconds=second)# 秒return utc_ctime + time_delay# 当前时间+往后延迟的秒# 天def eta_days(days):ctime = datetime.now()# 当前时间utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())# 当前UTC时间time_delay = timedelta(days=days)# 天return utc_ctime + time_delay# 当前时间+往后延迟的天jump.apply_async(args=(20,5), eta=eta_second(10))# 10秒后执行jump.apply_async(args=(20,5), eta=eta_days(1))# 1天后执行
get_result.py 获取结果from celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'if __name__ == '__main__':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')
九、高级使用celery.py 定时任务配置(循环的)特点:
添加任务的终端关闭之后,停止添加
celery服务端关闭后,把关闭之后未执行的任务都执行一遍,然后继续接收任务
# 1)创建app + 任务# 2)启动celery(app)服务:# 注):-A表示相对路径,所以一定先进入celery_task所在包-l 表示打印到日志 info 级别# 非windows# 命令:celery worker -A celery_task -l info# windows:# pip3 install eventlet# celery worker -A celery_task -l info -P eventlet# 3)添加任务:自动添加任务,所以要启动一个添加任务的服务# 命令:celery beat -A celery_task -l info# 4)获取结果from celery import Celery# 无密码broker = 'redis://127.0.0.1:6379/1'backend = 'redis://127.0.0.1:6379/2'# 有密码:broker = 'redis://:123@127.0.0.1:6379/1'backend = 'redis://:123@127.0.0.1:6379/2'app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])# 时区app.conf.timezone = 'Asia/Shanghai'# 是否使用UTCapp.conf.enable_utc = False# 自动任务的定时配置from datetime import timedeltafrom celery.schedules import crontabapp.conf.beat_schedule = {# 定时任务名字'fall_task': {'task': 'celery_task.tasks.fall','args':(30,20),'schedule': timedelta(seconds=3),# 3秒后执行# 'schedule': crontab(hour=8, day_of_week=1),# 每周一早八点}}'''fall_task:任务名自定义task:任务来源args:任务参数schedule:定时时间''''schedule': crontab(hour=8, day_of_week=1),# 每周一早八点'''minute : 分钟hour :小时day_of_week :礼拜day_of_month:月month_of_year:年'''
tasks.pyfrom .celery import app@app.taskdef fall(n1,n2):res = n1/n2print('n1 /n2 = %s' % res)return res
get_result.pyfrom celery_task.celery import appfrom celery.result import AsyncResultid = '21325a40-9d32-44b5-a701-9a31cc3c74b5'if __name__ == '__main__':async = AsyncResult(id=id, app=app)if async.successful():result = async.get()print(result)elif async.failed():print('任务失败')elif async.status == 'PENDING':print('任务等待中被执行')elif async.status == 'RETRY':print('任务异常后正在重试')elif async.status == 'STARTED':print('任务已经开始被执行')
十、django中使用(更新轮播图案例)最终达到的效果:根据定时任务来更新redis中的缓存 。用户获取资源都是从redis缓存中获取 。避免了数据库的压力
redis的配置dev.py
# 缓存redis数据库配置CACHES = {"default": {"BACKEND": "django_redis.cache.RedisCache","LOCATION": "redis://127.0.0.1:6379/10","OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient","CONNECTION_POOL_KWARGS": {"max_connections": 100}, # 同时的并发量"DECODE_RESPONSES": True,"PASSword": "123",}}}
接口缓存"""1)什么是接口的后台缓存 前台访问后台接口,后台会优先从缓存(内存)中查找接口数据如果有数据,直接对前台响应缓存数据如果没有数据,与(MySQL)数据库交互,得到数据,对前台响应,同时将数据进行缓存,以备下次使用了解:前台缓存 - 前台在请求到接口数据后,在前台建立缓存,再发送同样请求时,发现前台缓存有数据,就不再对后台做请求了 2)什么的接口会进行接口缓存 i)接口会被大量访问:比如主页中的接口,几乎所有人都会访问,而且会重复访问 ii)在一定时间内数据不会变化(或数据不变化)的接口 iii)接口数据的时效性不是特别强(数据库数据发生变化了,不是立即同步给前台,验后时间同步给前台也没事) 注:理论上所有接口都可以建立缓存,只要数据库与缓存数据同步及时 3)如何实现接口缓存:主页轮播图接口"""
推荐阅读
- 调度器是怎么处理核上任务分配的?
- JavaScript的宏任务和微任务
- 征途手机版运镖任务每日最多可以接多少次答案
- 使用 PDF Mix Tool 执行常见的 PDF 编辑任务
- 研究生 学生代表大会的主要任务是什么?
- 使用时间轮实现“延时任务”
- spacex载人龙飞船发射 SpaceX龙飞船首次商业载人航天任务
- 异步文件通道Java NIO你需要了解多少,来看看这篇文章
- 设计一个高效的定时任务系统
- 当宇航员在国际空间站外执行任务时 国际空间站在大气层外吗