Coverage for backend/core/auxiliary/models/Task.py: 80%

64 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1"""Task model offering tracking and coordination for asynchronous operations.""" 

2 

3from typing import Optional 

4from django.db import models 

5from django.db.models import F 

6from django.utils import timezone 

7from authentication.user.models import User 

8from core.auxiliary.enums.generalEnums import TaskStatus 

9from core.auxiliary.models import Flowsheet 

10from core.exceptions import DetailedException 

11from core.managers import AccessControlManager 

12 

13 

class TaskMeta(models.Model):
    """Progress counters a parent task uses to keep track of its children.

    All three counters are non-negative integers that start at zero.
    """

    # Number of child tasks the parent expects to run.
    scheduled_tasks = models.PositiveIntegerField(default=0)
    # Number of child tasks that have reported a failure.
    failed_tasks = models.PositiveIntegerField(default=0)
    # Number of child tasks that have reported success.
    successful_tasks = models.PositiveIntegerField(default=0)

20 

class TaskType(models.TextChoices):
    """Categories of task the platform knows how to run.

    The string on the right-hand side is the value stored in the database.
    """

    # Machine-learning training job.
    ML_TRAINING = 'ML Training'
    # IDAES solve; this is the default type for new tasks.
    IDAES_SOLVE = 'Solve'

26 

class Task(models.Model):
    """Represents a unit of work for platform computation (e.g. solving, ML), the status of which is observable
    and cancellable by users.

    A task may act as a parent: it then owns a `TaskMeta` row (``metadata``) whose counters are
    updated as its ``children`` finish, see `update_status_from_child`.
    """

    task_type = models.CharField(max_length=50, choices=TaskType.choices, default=TaskType.IDAES_SOLVE)
    creator = models.ForeignKey(User, on_delete=models.CASCADE)
    status = models.CharField(choices=TaskStatus.choices, default=TaskStatus.Pending)
    flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE)
    # Set once, when the row is first inserted.
    start_time = models.DateTimeField(auto_now_add=True)
    completed_time = models.DateTimeField(null=True)
    # Populated by `set_failure_with_exception` with "message", "cause" and "traceback" keys.
    error = models.JSONField(null=True)
    debug = models.JSONField(null=True)
    log = models.TextField(null=True)
    parent = models.ForeignKey('Task', on_delete=models.CASCADE, null=True, related_name='children')
    # Only parent tasks built through `create_parent_task` get a metadata row; otherwise this is NULL.
    metadata = models.OneToOneField(TaskMeta, on_delete=models.CASCADE, null=True, related_name='task')

    objects = AccessControlManager()

    class Meta:
        # Newest tasks first.
        ordering = ['-start_time']

    @staticmethod
    def _relation_kwargs(field_name: str, value) -> dict:
        """Build the constructor kwarg for a foreign key from an instance or a primary key.

        Model instances are passed as ``<field_name>``; anything else (an int primary key,
        or ``None``) is passed as ``<field_name>_id`` so Django does not reject it.
        """
        if isinstance(value, models.Model):
            return {field_name: value}
        return {f'{field_name}_id': value}

    @classmethod
    def create(
        cls,
        creator: User | int,
        flowsheet: Flowsheet | int,
        task_type: TaskType = TaskType.IDAES_SOLVE,
        parent: Optional['Task'] = None,
        status: TaskStatus = TaskStatus.Pending,
        save=False
    ) -> 'Task':
        """Instantiate a task, optionally persisting immediately.

        Args:
            creator: User creating the task or their primary key.
            flowsheet: Flowsheet associated with the task or its primary key.
            task_type: Category used to tailor downstream behaviour.
            parent: Parent task when building a hierarchy of work.
            status: Initial status to set on the task.
            save: When true the task is inserted into the database before returning.

        Returns:
            The newly constructed `Task` instance.
        """
        task = cls(
            parent=parent,
            status=status,
            task_type=task_type,
            # Both relations accept either an instance or a raw primary key.
            **cls._relation_kwargs('creator', creator),
            **cls._relation_kwargs('flowsheet', flowsheet),
        )

        if save:
            task.save()

        return task

    @classmethod
    def create_parent_task(
        cls,
        creator: User | int,
        flowsheet_id: int,
        task_type: TaskType = TaskType.IDAES_SOLVE,
        scheduled_tasks: int = 0,
        status: TaskStatus = TaskStatus.Pending
    ) -> 'Task':
        """Create and save a parent task seeded with metadata describing child workload.

        Args:
            creator: User creating the task or their primary key.
            flowsheet_id: Primary key of the flowsheet the task belongs to.
            task_type: Category used to tailor downstream behaviour.
            scheduled_tasks: Number of child tasks the parent should wait for.
            status: Initial status to set on the task.

        Returns:
            The persisted parent `Task`, with its `TaskMeta` attached as ``metadata``.
        """
        # Imported locally so this block does not depend on a change to the module imports.
        from django.db import transaction

        parent_task = cls(
            flowsheet_id=flowsheet_id,
            status=status,
            task_type=task_type,
            **cls._relation_kwargs('creator', creator),
        )

        # Write the metadata row and the task together so a failed task insert
        # does not leave an orphaned TaskMeta row behind.
        with transaction.atomic():
            parent_task.metadata = TaskMeta.objects.create(scheduled_tasks=scheduled_tasks)
            parent_task.save()

        return parent_task

    def update_status_from_child(self, child_task: 'Task') -> None:
        """Update parent completion state in response to a child transition.

        Increments the failed or successful counter according to the child's status, then
        marks this task as completed once the two counters add up to ``scheduled_tasks``.
        Does nothing when this task has no metadata row.

        Args:
            child_task: The child whose status has just changed.
        """
        metadata = self.metadata
        if metadata is None:
            # Not a parent task (no counters to maintain); nothing to update.
            return

        if child_task.status == TaskStatus.Failed:
            # Use F expressions to avoid race conditions when multiple children update concurrently.
            metadata.failed_tasks = F('failed_tasks') + 1
            metadata.save(update_fields=['failed_tasks'])
        elif child_task.status == TaskStatus.Completed:
            metadata.successful_tasks = F('successful_tasks') + 1
            metadata.save(update_fields=['successful_tasks'])

        # Refresh to obtain concrete integer values for comparison.
        metadata.refresh_from_db(fields=['failed_tasks', 'successful_tasks'])

        finished_children = metadata.successful_tasks + metadata.failed_tasks
        if metadata.scheduled_tasks == finished_children:
            self.status = TaskStatus.Completed
            self.completed_time = timezone.now()
            self.save(update_fields=['status', 'completed_time'])

    def set_failure_with_exception(self, exception: Exception | DetailedException, save: bool = False) -> None:
        """Record failure details on the task, preserving the stack trace.

        Sets the status to failed and stores the exception's message, source and traceback
        in ``error``. ``completed_time`` is only filled in when it is not already set.

        Args:
            exception: The error to record; plain exceptions are wrapped in a
                `DetailedException` with an empty source.
            save: When true the task is saved before returning.
        """
        self.status = TaskStatus.Failed

        if isinstance(exception, DetailedException):
            detailed_exception = exception
        else:
            detailed_exception = DetailedException(exception, source="")

        self.error = {
            "message": detailed_exception.message,
            "cause": detailed_exception.source,
            "traceback": detailed_exception.traceback
        }

        if self.completed_time is None:
            # ensure completed timestamp reflects when failure was logged
            self.completed_time = timezone.now()

        if save:
            self.save()