Coverage for backend/CoreRoot/asgi.py: 100%
23 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1"""
2ASGI config for CoreRoot project.
4It exposes the ASGI callable as a module-level variable named ``application``.
6For more information on this file, see
7https://docs.djangoproject.com/en/4.2/howto/deployment/asgi/
8"""
10import os
11from urllib.parse import parse_qs
13from django.core.asgi import get_asgi_application
14from channels.middleware import BaseMiddleware
15from channels.routing import ProtocolTypeRouter, URLRouter
16from django.urls import path
17from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
19from CoreRoot import settings
20from CoreRoot.helpers import initialise_tracing
# Point Django at the project settings unless the environment already names
# a settings module.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "CoreRoot.settings")

# Project helper; invoked before the ASGI application object is created.
initialise_tracing()

# Build the Django ASGI application up front so the AppRegistry is populated
# before the imports below pull in code that may import ORM models.
django_asgi_app = get_asgi_application()
30from notifications.consumers import NotificationsConsumer
31from authentication.middleware import AsgiAuthHeaderMiddleware
class QueryParamsMiddleware(BaseMiddleware):
    """Middleware that parses the query string into a dictionary on the scope.

    The parsed result is stored under ``scope["query_params"]`` in the form
    returned by :func:`urllib.parse.parse_qs`: each key maps to a list of
    string values.
    """

    async def __call__(self, scope, receive, send):
        """Add ``query_params`` to a copy of *scope* and delegate to the parent.

        Args:
            scope: ASGI connection scope; the caller's mapping is not mutated.
            receive: ASGI receive callable, passed through unchanged.
            send: ASGI send callable, passed through unchanged.

        Returns:
            Whatever ``BaseMiddleware.__call__`` returns.
        """
        # Work on a shallow copy so the extra key does not leak to the caller.
        scope = dict(scope)

        # The ASGI spec marks ``query_string`` as optional for WebSocket
        # scopes (missing or None means empty), so fall back to b"" instead
        # of raising KeyError / AttributeError.
        raw_query = scope.get("query_string") or b""
        scope["query_params"] = parse_qs(raw_query.decode())

        return await super().__call__(scope, receive, send)
# WebSocket routes served by Channels consumers.
url_router = URLRouter(
    [path("api/notifications/", NotificationsConsumer.as_asgi())]
)

# The header-based auth middleware wraps the router only when DEBUG is on;
# otherwise the router is used directly.
if settings.DEBUG:
    inner_middleware = AsgiAuthHeaderMiddleware(url_router)
else:
    inner_middleware = url_router

# Dispatch by protocol (HTTP goes to Django, WebSocket goes through the
# query-param middleware), with the OpenTelemetry ASGI middleware outermost.
application = OpenTelemetryMiddleware(
    ProtocolTypeRouter(
        {
            "http": django_asgi_app,
            "websocket": QueryParamsMiddleware(inner_middleware),
        }
    )
)