有机化学、化学信息学、生物化学、生物信息学、机器学习、深度学习、药物设计、网站建设关注我!Bilibili
通过建立websocket,来监控任务的完成状态
流程介绍:
1、前端提交任务请求
2、后端接收到请求以后,使用celery进行任务排队和状态变换(依赖于配置内的进程数量配置),同时把任务ID返回给前端
3、前端拿着任务ID,和后端建立websocket通信
4、当任务状态变化的时候,通知前端(比如从-排队中-转变为-已开始-)
5、结束通信(前端退出页面主动结束、后端任务状态为success的时候主动结束)
celery的任务ID可以查询当前的任务状态
关于celery和redis的配置请看:
settings.py内增加channel的配置:
# 指定 ASGI 应用 ASGI 来处理 WebSocket、SSE、异步后台任务通知等
ASGI_APPLICATION = 'aiochemserver.asgi.application'
CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
'hosts': [('127.0.0.1', 6379)],
},
},
}
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'appchemnote',
'appuser',
# ——— 把channels写入进来 ———
'channels',
'django.contrib.postgres',
]settings同级目录建立routing.py
# websocket的路由
from django.urls import path
from aiochemserver.consumers import TaskProgressConsumer
# 通用任务查询接口
websocket_urlpatterns = [
path('ws/task_progress/', TaskProgressConsumer.as_asgi()),
]settings同级目录建立consumers.py
# aiochemserver/consumers.py
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from asgiref.sync import sync_to_async
from celery.result import AsyncResult
# 异步安全地从 Celery 后端读取 state 和 info.progress
@sync_to_async
def fetch_task_meta(task_id: str):
result = AsyncResult(task_id)
state = result.state
info = result.info or {}
progress = info.get("progress", 0) if isinstance(info, dict) else 0
return state, progress
class TaskProgressConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
# 接受连接
await self.accept()
async def disconnect(self, close_code):
# 断开时退出所有 group
if hasattr(self, "task_ids"):
for tid in self.task_ids:
await self.channel_layer.group_discard(f"task_{tid}", self.channel_name)
async def receive_json(self, content, **kwargs):
"""
content: { "task_ids": [...] }
"""
self.task_ids = content.get("task_ids", [])
# 先推送一次当前所有任务的“初始状态”
for task_id in self.task_ids:
# 加入对应组,方便后续 group_send
await self.channel_layer.group_add(f"task_{task_id}", self.channel_name)
# 拉取当前状态
state, progress = await fetch_task_meta(task_id)
# 直接发给当前 WebSocket 客户端
await self.send_json({
"task_id": task_id,
"status": state,
"progress": progress,
})
async def task_update(self, event):
"""
当后端做了
await channel_layer.group_send(
f"task_{task_id}",
{
"type": "task_update",
"task_id": task_id,
"status": new_state,
"progress": new_progress,
}
)
时,就会执行这里,推送给前端
"""
await self.send_json({
"task_id": event["task_id"],
"status": event["status"],
"progress": event.get("progress", 0),
})在这两个文件里面转路由,由于channels属于asgi,所以以后启动服务都需要使用支持asgi的服务
按照下面的写法,普通的http请求就走普通服务,channel服务就会走websocket
import os, django
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import aiochemserver.routing # 确保这个模块里有 websocket_urlpatterns
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aiochemserver.settings')
django.setup()
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(
URLRouter(aiochemserver.routing.websocket_urlpatterns)
),
})wsgi.py内还正常写就行:
普通的url服务,依旧走wsgi
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aiochemserver.settings')
application = get_wsgi_application()
在utils下新建一个task_notify.py
用来复用状态查询的逻辑:
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
def notify_task_state(task_id, status, progress=None):
"""
统一封装任务状态通知函数,支持进度信息推送给 WebSocket 客户端。
"""
channel_layer = get_channel_layer()
group_name = f"task_{task_id}"
message = {
"type": "task_update",
"task_id": task_id,
"status": status,
}
if progress is not None:
message["progress"] = progress
async_to_sync(channel_layer.group_send)(group_name, message)
这里写一个测试,在tasks.py中的调用函数内,
需要设置bind=True
from utils.task_notify import notify_task_state
@shared_task(bind=True, soft_time_limit=600) # 10分钟超时
def save_note(self, data, note_id, type):
task_id = self.request.id
# 任务开始
self.update_state(state="STARTED", meta={"progress": 0})
notify_task_state(task_id, 'STARTED', progress=0)self.update_state这个是通知本身的进程和数据库,写入celery的任务状态
notify_task_state是通知前端任务现在的状态,注意区分
关于view的解释:
对于一个异步任务,上面已经介绍过,是前端发起任务请求,后端再执行
from appchemnote.tasks import save_note
class ReceiveNote(APIView):
def post(self, request, *args, **kwargs):
async_res = save_note.delay(request_data, note.note_id, 'create')
return JsonResponse({
'message': '笔记存储任务提交成功',
'result': {
'rd_task_id': async_res.id
}
}, status=status.HTTP_200_OK)
解释一下整个流程:
1、在一个任务被前端提交进来的时候,后端会把任务调度交给celery,并且立即给前端一个task_id。
2、这时候,这个任务在后端还没有完成,所以在有需求的时候,前端就通过task_id来查询任务状态,当这个任务不是success的时候,拿这个task_id和Django建立websocket通信。
任务在后端的进程progress,完全依赖于在tasks的函数内,我们自己定义的任务进度和progress(所以进度条都是后端控制的,不会特别精准)
3、当ws通信建立后,每次后端的状态更新的时候,Django都会通过websocket告诉前端,我的任务状态更新了,这是新的状态数据。
4、前端每次拿到新的数据后,就会把状态展示在页面上,从而实现任务状态实时更新,用户可以感知到的这个需求。
没有推荐的文章...
没有对应的文章...