Coverage for backend/django/core/auxiliary/managers/TaskManager.py: 86%

20 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import logging 

2 

3from common.models.notifications.payloads import NotificationServiceMessageType 

4from common.services import messaging 

5from core.auxiliary.enums.generalEnums import TaskStatus 

6from core.auxiliary.models.Task import Task 

7from core.auxiliary.serializers import TaskSerializer 

8from django.db import transaction 

9 

10logger = logging.getLogger(__name__) 

11 

12 

def handle_task_running_event(task_id: int) -> None:
    """Mark a pending task as running and broadcast the status change.

    The task row is fetched with ``select_for_update()`` inside
    ``transaction.atomic()`` so the status check and the write happen
    under a row lock.

    The event is discarded without any change when:
      * no ``Task`` with ``task_id`` exists (an info message is logged), or
      * the task's status is anything other than ``TaskStatus.Pending``.

    Otherwise the status is set to ``TaskStatus.Running``, the task is
    saved, and a ``TASK_UPDATED`` notification carrying the serialized
    task is sent for the task's flowsheet.

    Args:
        task_id: Primary key of the task the event refers to.
    """
    with transaction.atomic():
        try:
            task = Task.objects.select_for_update().get(id=task_id)
        except Task.DoesNotExist:
            logger.info("Discarding task-running event for missing task_id=%s.", task_id)
            return

        # Ignore task status update if task is not pending
        # to avoid overwriting status of tasks that have already completed or failed
        if task.status != TaskStatus.Pending:
            return

        task.status = TaskStatus.Running
        task.save()

    # Notify users that the task has started running.
    # This is done after the atomic block exits so the row lock is not held
    # while messaging, the status change is already committed (when this is
    # the outermost transaction) before subscribers are told about it, and a
    # messaging failure cannot roll back the status update.
    messaging.send_flowsheet_notification_message(
        task.flowsheet_id,
        TaskSerializer(task).data,
        NotificationServiceMessageType.TASK_UPDATED,
    )