Coverage for backend/common/models/notifications/payloads.py: 100%
16 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1from enum import Enum
2from pydantic import BaseModel
3from common.models import CloudEventsPayload
6class NotificationServiceMessageType(str, Enum):
7 TASK_COMPLETED = "task/completed"
8 TASK_UPDATED = "task/updated"
9 TASK_CANCELLED = "task/cancelled"
11class NotificationServiceMessage(BaseModel):
12 message_type: NotificationServiceMessageType
13 data: dict
15# For use in telling the notification (Django Channels) service to send a notification
16class NotificationServiceEnvelope(BaseModel):
17 flowsheet_id: int
18 messages: list[NotificationServiceMessage]
20NotificationEvent = CloudEventsPayload[NotificationServiceEnvelope]
22# The payload the user is sent by the notification service
23class TaskCompletedPayload(BaseModel):
24 task_id: int