Coverage for backend/django/notifications/views/broadcast_view.py: 100%
22 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1from asgiref.sync import async_to_sync
2from channels.layers import get_channel_layer, BaseChannelLayer
3from drf_spectacular.utils import extend_schema
4from rest_framework.decorators import api_view, authentication_classes, permission_classes
5from rest_framework.permissions import IsAuthenticated
6from rest_framework.response import Response
8from authentication.custom_drf_authentication import DaprApiTokenAuthentication
9from common.models.notifications import NotificationEvent
10from common.models.notifications.payloads import TaskCompletedPayload
@extend_schema(exclude=True)
@api_view(['POST'])
@authentication_classes([DaprApiTokenAuthentication])
@permission_classes([IsAuthenticated])
def broadcast_message_to_user(request) -> Response:
    """Relay every message of a notification event to a flowsheet's channel group.

    The request body is validated as a ``NotificationEvent``. Each message
    found in the event's envelope is serialised to JSON and sent, one
    ``group_send`` call per message, to the channel-layer group named after
    the envelope's ``flowsheet_id``.

    Args:
        request: The incoming DRF request; ``request.data`` carries the
            notification event.

    Returns:
        An empty ``Response`` with status 200 once all messages were sent.

    Raises:
        Whatever ``NotificationEvent.model_validate`` raises for a body that
        does not match the model; it is not caught here.
    """
    event = NotificationEvent.model_validate(request.data)
    envelope = event.data
    messages = envelope.messages

    layer: BaseChannelLayer = get_channel_layer()
    group_name = f"{envelope.flowsheet_id}"

    for message in messages:
        # group_send is a coroutine function; wrap it so this synchronous
        # view can call it.
        send_to_group = async_to_sync(layer.group_send)
        send_to_group(
            group_name,
            {
                # This "type" invokes the flowsheet_message method in the
                # handling consumer.
                "type": "flowsheet.message",
                "data": message.model_dump_json(),
            },
        )

    return Response(status=200)