Coverage for backend/common/src/common/models/notifications/payloads.py: 100%
28 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1from enum import Enum
2from typing import Literal, Optional
3from pydantic import BaseModel
4from common.models import CloudEventsPayload
5from ahuora_builder_types.payloads import BuildStateRequestContext
6from ahuora_builder_types.unit_model_schema import SolvedPropertyValueSchema
# Closed set of message kinds carried in a NotificationServiceMessage.
# The ``str`` mixin makes each member a genuine string equal to its value, so
# members compare equal to (and can be looked up from) the raw wire string.
class NotificationServiceMessageType(str, Enum):
    # Task events.
    TASK_COMPLETED = "task/completed"
    TASK_UPDATED = "task/updated"
    TASK_CANCELLED = "task/cancelled"
    TASK_CANCELLING = "task/cancelling"
    TASK_CHILDREN_CANCELLED = "task/children-cancelled"
    # Build-state events.
    BUILD_STATE_COMPLETED = "build-state/completed"
# One message: a type tag plus an untyped payload dictionary.
class NotificationServiceMessage(BaseModel):
    # Which kind of event this message represents.
    message_type: NotificationServiceMessageType
    # Free-form payload; its shape is not validated here.
    # NOTE(review): presumably matches one of the *Payload models in this
    # module depending on message_type - confirm against the producers.
    data: dict
# For use in telling the notification (Django Channels) service to send a notification
class NotificationServiceEnvelope(BaseModel):
    # Integer id of the flowsheet the messages relate to.
    flowsheet_id: int
    # Messages carried by this envelope, kept in list order.
    messages: list[NotificationServiceMessage]
# CloudEventsPayload specialised to carry a NotificationServiceEnvelope.
NotificationEvent = CloudEventsPayload[NotificationServiceEnvelope]
# The payload the user is sent by the notification service
class TaskCompletedPayload(BaseModel):
    # Integer id of the task this payload refers to.
    task_id: int
# Payload describing the outcome of a build-state request.
class BuildStateCompletedPayload(BaseModel):
    # Request context object (type defined in ahuora_builder_types).
    context: BuildStateRequestContext
    # Outcome flag; only these two literal values validate.
    status: Literal["success", "error"]
    # The remaining fields are optional and default to None.
    # NOTE(review): presumably `properties` is set on success and
    # `error` / `failure_kind` on error - nothing here enforces that; confirm.
    properties: Optional[list[SolvedPropertyValueSchema]] = None
    error: Optional[str] = None
    failure_kind: Optional[Literal["idaes-error", "delivery-failure"]] = None