Coverage for backend/notifications/views/broadcast_view.py: 100%
20 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from asgiref.sync import async_to_sync
2from channels.layers import get_channel_layer, BaseChannelLayer
3from drf_spectacular.utils import extend_schema
4from rest_framework.decorators import api_view, authentication_classes
5from rest_framework.response import Response
7from authentication.custom_drf_authentication import DaprApiTokenAuthentication
8from common.models.notifications import NotificationEvent
9from common.models.notifications.payloads import TaskCompletedPayload
@extend_schema(exclude=True)
@api_view(['POST'])
@authentication_classes([DaprApiTokenAuthentication])
def broadcast_message_to_user(request) -> Response:
    """Relay every message of a posted notification event to a flowsheet group.

    The request body is validated as a ``NotificationEvent``. Each entry in
    the ``messages`` of its ``data`` envelope is serialised to JSON and sent
    through the channel layer to the group named after the envelope's
    ``flowsheet_id``.

    Returns:
        An empty response with status 200 once all messages have been sent.
    """
    event = NotificationEvent.model_validate(request.data)
    envelope = event.data
    messages = envelope.messages

    layer: BaseChannelLayer = get_channel_layer()
    group_name = f"{envelope.flowsheet_id}"

    for message in messages:
        # group_send is a coroutine function; wrap it so it can be called
        # from this synchronous view.
        send_to_group = async_to_sync(layer.group_send)
        channel_event = {
            # This type invokes the flowsheet_message method in the
            # handling consumer.
            "type": "flowsheet.message",
            "data": message.model_dump_json(),
        }
        send_to_group(group_name, channel_event)

    return Response(status=200)