Coverage for backend/CoreRoot/asgi.py: 100%

23 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1""" 

2ASGI config for CoreRoot project. 

3 

4It exposes the ASGI callable as a module-level variable named ``application``. 

5 

6For more information on this file, see 

7https://docs.djangoproject.com/en/4.2/howto/deployment/asgi/ 

8""" 

9 

10import os 

11from urllib.parse import parse_qs 

12 

13from django.core.asgi import get_asgi_application 

14from channels.middleware import BaseMiddleware 

15from channels.routing import ProtocolTypeRouter, URLRouter 

16from django.urls import path 

17from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware 

18 

19from CoreRoot import settings 

20from CoreRoot.helpers import initialise_tracing 

21 

22os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CoreRoot.settings') 

23 

24initialise_tracing() 

25 

26# Initialize Django ASGI application early to ensure the AppRegistry 

27# is populated before importing code that may import ORM models. 

28django_asgi_app = get_asgi_application() 

29 

30from notifications.consumers import NotificationsConsumer 

31from authentication.middleware import AsgiAuthHeaderMiddleware 

32 

33class QueryParamsMiddleware(BaseMiddleware): 

34 """Middleware that parses the query string into a dictionary on the scope.""" 

35 

36 async def __call__(self, scope, receive, send): 

37 scope = dict(scope) 

38 scope["query_params"] = parse_qs(scope["query_string"].decode()) 

39 

40 return await super().__call__(scope, receive, send) 

41 

42url_router = URLRouter([ 

43 path("api/notifications/", NotificationsConsumer.as_asgi()), 

44]) 

45 

46inner_middleware = AsgiAuthHeaderMiddleware(url_router) if settings.DEBUG else url_router 

47 

48application = ProtocolTypeRouter({ 

49 "http": django_asgi_app, 

50 "websocket": QueryParamsMiddleware( 

51 inner_middleware 

52 ) 

53}) 

54application = OpenTelemetryMiddleware(application)