Coverage for backend/django/notifications/views/broadcast_view.py: 100%

22 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1from asgiref.sync import async_to_sync 

2from channels.layers import get_channel_layer, BaseChannelLayer 

3from drf_spectacular.utils import extend_schema 

4from rest_framework.decorators import api_view, authentication_classes, permission_classes 

5from rest_framework.permissions import IsAuthenticated 

6from rest_framework.response import Response 

7 

8from authentication.custom_drf_authentication import DaprApiTokenAuthentication 

9from common.models.notifications import NotificationEvent 

10from common.models.notifications.payloads import TaskCompletedPayload 

11 

12 

13@extend_schema(exclude=True) 

14@api_view(['POST']) 

15@authentication_classes([DaprApiTokenAuthentication]) 

16@permission_classes([IsAuthenticated]) 

17def broadcast_message_to_user(request) -> Response: 

18 """Send a message to all consumers linked to a user's flowsheet""" 

19 

20 notification_event = NotificationEvent.model_validate(request.data) 

21 notification_envelope = notification_event.data 

22 user_payloads = notification_envelope.messages 

23 

24 channel_layer: BaseChannelLayer = get_channel_layer() 

25 broadcast_group = f"{notification_envelope.flowsheet_id}" 

26 

27 for payload in user_payloads: 

28 async_to_sync(channel_layer.group_send)(broadcast_group, 

29 { 

30 "type": "flowsheet.message", 

31 # This invokes the flowsheet_message method in the handling consumer 

32 "data": payload.model_dump_json() 

33 }) 

34 

35 return Response(status=200)