Django-Channels的使用

后端
观看:0
文章标签:#django#channels#celery
最后更新:2025年08月08 15:35
用来向前端主动通信,比如前端提交了一个异步任务,这个任务需要一段时间才能完成,在这个任务完成以后,需要django主动向前端发送已经完成任务的通知,无需前端轮询

通过建立websocket,来监控任务的完成状态

流程介绍:

1、前端提交任务请求

2、后端接收到请求以后,使用celery进行任务排队和状态变换(依赖于配置内的进程数量配置),同时把任务ID返回给前端

3、前端拿着任务ID,和后端建立websocket通信

4、当任务状态变化的时候,通知前端(比如从-排队中-转变为-已开始-

5、结束通信(前端退出页面主动结束、后端任务状态为success的时候主动结束)

celery的任务ID可以查询当前的任务状态

关于celery和redis的配置请看:

写配置

settings.py内增加channel的配置:

python
# 指定 ASGI 应用  ASGI 来处理 WebSocket、SSE、异步后台任务通知等
ASGI_APPLICATION = 'aiochemserver.asgi.application'

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [('127.0.0.1', 6379)],
        },
    },
}
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'appchemnote',
    'appuser',
    # ——— 把channels写入进来 ———
    'channels',
    'django.contrib.postgres',

]

建立路由文件

settings同级目录建立routing.py

python
# websocket的路由
from django.urls import path
from aiochemserver.consumers import TaskProgressConsumer

# 通用任务查询接口
websocket_urlpatterns = [
    path('ws/task_progress/', TaskProgressConsumer.as_asgi()),
]

建立用户组的信息

settings同级目录建立consumers.py

python
# aiochemserver/consumers.py
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from asgiref.sync import sync_to_async
from celery.result import AsyncResult

# 异步安全地从 Celery 后端读取 state 和 info.progress
@sync_to_async
def fetch_task_meta(task_id: str):
    result = AsyncResult(task_id)
    state = result.state
    info = result.info or {}
    progress = info.get("progress", 0) if isinstance(info, dict) else 0
    return state, progress

class TaskProgressConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        # 接受连接
        await self.accept()

    async def disconnect(self, close_code):
        # 断开时退出所有 group
        if hasattr(self, "task_ids"):
            for tid in self.task_ids:
                await self.channel_layer.group_discard(f"task_{tid}", self.channel_name)

    async def receive_json(self, content, **kwargs):
        """
        content: { "task_ids": [...] }
        """
        self.task_ids = content.get("task_ids", [])

        # 先推送一次当前所有任务的“初始状态”
        for task_id in self.task_ids:
            # 加入对应组,方便后续 group_send
            await self.channel_layer.group_add(f"task_{task_id}", self.channel_name)
            # 拉取当前状态
            state, progress = await fetch_task_meta(task_id)
            # 直接发给当前 WebSocket 客户端
            await self.send_json({
                "task_id": task_id,
                "status": state,
                "progress": progress,
            })

    async def task_update(self, event):
        """
        当后端做了
          await channel_layer.group_send(
            f"task_{task_id}",
            {
              "type": "task_update",
              "task_id": task_id,
              "status": new_state,
              "progress": new_progress,
            }
          )
        时,就会执行这里,推送给前端
        """
        await self.send_json({
            "task_id": event["task_id"],
            "status": event["status"],
            "progress": event.get("progress", 0),
        })

配置asgi.py

在这两个文件里面转路由,由于channels属于asgi,所以以后启动服务都需要使用支持asgi的服务

按照下面的写法,普通的http请求就走普通服务,channel服务就会走websocket

python
import os, django
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
import aiochemserver.routing  # 确保这个模块里有 websocket_urlpatterns

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aiochemserver.settings')
django.setup()

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(aiochemserver.routing.websocket_urlpatterns)
    ),
})

wsgi.py内还正常写就行:

普通的url服务,依旧走wsgi

python
import os

from django.core.wsgi import get_wsgi_application

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aiochemserver.settings')

application = get_wsgi_application()

封装调用函数

在utils下新建一个task_notify.py

用来复用状态查询的逻辑:

python
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer


def notify_task_state(task_id, status, progress=None):
    """
    统一封装任务状态通知函数,支持进度信息推送给 WebSocket 客户端。
    """
    channel_layer = get_channel_layer()
    group_name = f"task_{task_id}"

    message = {
        "type": "task_update",
        "task_id": task_id,
        "status": status,
    }
    if progress is not None:
        message["progress"] = progress

    async_to_sync(channel_layer.group_send)(group_name, message)

修改异步函数:

这里写一个测试,在tasks.py中的调用函数内,

需要设置bind=True

python
from utils.task_notify import notify_task_state

@shared_task(bind=True, soft_time_limit=600)  # 10分钟超时
def save_note(self, data, note_id, type):
    task_id = self.request.id
    # 任务开始
    self.update_state(state="STARTED", meta={"progress": 0})
    notify_task_state(task_id, 'STARTED', progress=0)

self.update_state这个是通知本身的进程和数据库,写入celery的任务状态

notify_task_state是通知前端任务现在的状态,注意区分

关于view的解释:

对于一个异步任务,上面已经介绍过,是前端发起任务请求,后端再执行

python
from appchemnote.tasks import save_note

class ReceiveNote(APIView):
    def post(self, request, *args, **kwargs):
        async_res = save_note.delay(request_data, note.note_id, 'create')
        return JsonResponse({
                'message': '笔记存储任务提交成功',
                'result': {
                    'rd_task_id': async_res.id
                }
            }, status=status.HTTP_200_OK)

解释一下整个流程:

1、在一个任务被前端提交进来的时候,后端会把任务调度交给celery,并且立即给前端一个task_id。

2、这时候,这个任务在后端还没有完成,所以在有需求的时候,前端就通过task_id来查询任务状态,当这个任务不是success的时候,拿这个task_id和Django建立websocket通信

任务在后端的进程progress,完全依赖于在tasks的函数内,我们自己定义的任务进度和progress(所以进度条都是后端控制的,不会特别精准)

3、当ws通信建立后,每次后端的状态更新的时候,Django都会通过websocket告诉前端,我的任务状态更新了,这是新的状态数据。

4、前端每次拿到新的数据后,就会把状态展示在页面上,从而实现任务状态实时更新,用户可以感知到的这个需求。

请登录后再发表评论
🔍 快速搜索
文章推荐
基于文本相似性

没有推荐的文章...

文章推荐
化学结构同出现

没有对应的文章...

AioChem © 2025

晋ICP备2025060790号-1