Coverage for backend/common/src/common/models/notifications/payloads.py: 100%

28 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1from enum import Enum 

2from typing import Literal, Optional 

3from pydantic import BaseModel 

4from common.models import CloudEventsPayload 

5from ahuora_builder_types.payloads import BuildStateRequestContext 

6from ahuora_builder_types.unit_model_schema import SolvedPropertyValueSchema 

7 

8 

# Closed set of message kinds used for NotificationServiceMessage.message_type.
# The ``str`` mixin makes every member compare equal to its wire value.
class NotificationServiceMessageType(str, Enum):
    # Task events.
    TASK_COMPLETED = "task/completed"
    TASK_UPDATED = "task/updated"
    TASK_CANCELLED = "task/cancelled"
    TASK_CANCELLING = "task/cancelling"
    TASK_CHILDREN_CANCELLED = "task/children-cancelled"

    # Build-state events.
    BUILD_STATE_COMPLETED = "build-state/completed"

16 

# A single typed message: ``message_type`` names the event kind and ``data``
# carries its body as a plain dict (no schema is imposed on it at this level).
class NotificationServiceMessage(BaseModel):
    message_type: NotificationServiceMessageType
    data: dict

20 

# Used to tell the notification (Django Channels) service to send a
# notification: a list of messages tagged with a single flowsheet id.
class NotificationServiceEnvelope(BaseModel):
    flowsheet_id: int
    messages: list[NotificationServiceMessage]

25 

# CloudEventsPayload parameterised with NotificationServiceEnvelope.
NotificationEvent = CloudEventsPayload[NotificationServiceEnvelope]

27 

# Payload sent to the user by the notification service; it carries only the
# integer id of the task.
class TaskCompletedPayload(BaseModel):
    task_id: int

31 

32 

# Result of a build-state request: the request context plus a status flag.
class BuildStateCompletedPayload(BaseModel):
    context: BuildStateRequestContext
    status: Literal["success", "error"]

    # The remaining fields all default to None, so any of them may be omitted.
    # NOTE(review): presumably ``properties`` accompanies "success" while
    # ``error`` / ``failure_kind`` accompany "error"; this model does not
    # enforce that pairing -- confirm against the producers.
    properties: Optional[list[SolvedPropertyValueSchema]] = None
    error: Optional[str] = None
    failure_kind: Optional[Literal["idaes-error", "delivery-failure"]] = None