Coverage for backend/notifications/views/broadcast_view.py: 100%

20 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from asgiref.sync import async_to_sync 

2from channels.layers import get_channel_layer, BaseChannelLayer 

3from drf_spectacular.utils import extend_schema 

4from rest_framework.decorators import api_view, authentication_classes 

5from rest_framework.response import Response 

6 

7from authentication.custom_drf_authentication import DaprApiTokenAuthentication 

8from common.models.notifications import NotificationEvent 

9from common.models.notifications.payloads import TaskCompletedPayload 

10 

11 

@extend_schema(exclude=True)
@api_view(['POST'])
@authentication_classes([DaprApiTokenAuthentication])
def broadcast_message_to_user(request) -> Response:
    """Relay every message of a notification event to a flowsheet's channel group.

    The request body is validated as a ``NotificationEvent``. Each entry in
    the ``messages`` of the event's ``data`` envelope is serialised to JSON
    and sent, one group message per entry, to the channel group named after
    the envelope's ``flowsheet_id``.

    Returns:
        An empty response with status 200 once every message has been sent.
    """
    envelope = NotificationEvent.model_validate(request.data).data
    messages = envelope.messages

    layer: BaseChannelLayer = get_channel_layer()
    group_name = f"{envelope.flowsheet_id}"

    for message in messages:
        event = {
            # The "flowsheet.message" type invokes the flowsheet_message
            # method in the handling consumer.
            "type": "flowsheet.message",
            "data": message.model_dump_json(),
        }
        # group_send is a coroutine function; bridge it into this sync view.
        async_to_sync(layer.group_send)(group_name, event)

    return Response(status=200)