Coverage for backend/common/models/notifications/payloads.py: 100%

16 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from enum import Enum 

2from pydantic import BaseModel 

3from common.models import CloudEventsPayload 

4 

5 

# Closed set of message-type identifiers used by NotificationServiceMessage.
# The `str` mixin makes every member a real string, so a member compares equal
# to (and can be looked up by) its plain "task/..." value.
class NotificationServiceMessageType(str, Enum):
    TASK_COMPLETED = "task/completed"
    TASK_UPDATED = "task/updated"
    TASK_CANCELLED = "task/cancelled"

10 

# One notification message: a message type plus an accompanying data dict.
class NotificationServiceMessage(BaseModel):
    # Which kind of message this is (one of NotificationServiceMessageType).
    message_type: NotificationServiceMessageType
    # Untyped dict; its schema is not defined here.
    # NOTE(review): presumably varies with message_type — confirm with producers.
    data: dict

14 

# For use in telling the notification (Django Channels) service to send a
# notification. Bundles a list of messages together with a flowsheet id.
class NotificationServiceEnvelope(BaseModel):
    # NOTE(review): presumably the flowsheet the messages relate to — confirm.
    flowsheet_id: int
    # The messages carried by this envelope.
    messages: list[NotificationServiceMessage]

19 

# CloudEventsPayload parameterised with NotificationServiceEnvelope.
NotificationEvent = CloudEventsPayload[NotificationServiceEnvelope]

21 

# The payload the user is sent by the notification service.
class TaskCompletedPayload(BaseModel):
    # NOTE(review): presumably the id of the task that completed — confirm.
    task_id: int