Coverage for backend/django/core/auxiliary/models/Task.py: 86%

79 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1"""Task model offering tracking and coordination for asynchronous operations.""" 

2 

3from typing import Optional 

4from django.db import models 

5from django.db.models import F 

6from django.utils import timezone 

7from authentication.user.models import User 

8from core.auxiliary.enums.generalEnums import TaskStatus 

9from core.auxiliary.models import Flowsheet 

10from core.exceptions import DetailedException 

11from core.managers import AccessControlManager 

12 

13 

14class TaskMeta(models.Model): 

15 """Aggregate counts that parent tasks use to monitor child progress.""" 

16 

17 scheduled_tasks = models.PositiveIntegerField(default=0) 

18 failed_tasks = models.PositiveIntegerField(default=0) 

19 successful_tasks = models.PositiveIntegerField(default=0) 

20 cancelled_tasks = models.PositiveIntegerField(default=0) 

21 

22class TaskType(models.TextChoices): 

23 """Enum of supported task categories.""" 

24 

25 ML_TRAINING = 'ML Training' 

26 IDAES_SOLVE = 'Solve' 

27 BUILD_STATE = 'Build State' 

28 CSV_IMPORT_SCENARIO = 'CSV Import Scenario' 

29 CSV_IMPORT_PINCH_UTILITIES = 'CSV Import Pinch Utilities' 

30 

31class Task(models.Model): 

32 """Represents a unit of work for platform computation (e.g. solving, ML), the status of which is observable 

33 and cancellable by users.""" 

34 

35 task_type = models.CharField(max_length=50, choices=TaskType.choices , default=TaskType.IDAES_SOLVE) 

36 creator = models.ForeignKey(User, on_delete=models.CASCADE) 

37 status = models.CharField(choices=TaskStatus.choices , default=TaskStatus.Pending) 

38 flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE) 

39 start_time = models.DateTimeField(auto_now_add=True) 

40 completed_time = models.DateTimeField(null=True) 

41 error = models.JSONField(null=True) 

42 debug = models.JSONField(null=True) 

43 log = models.TextField(null=True) 

44 parent = models.ForeignKey('Task', on_delete=models.CASCADE, null=True, related_name='children') 

45 metadata = models.OneToOneField(TaskMeta, on_delete=models.CASCADE, null=True, related_name='task') 

46 

47 objects = AccessControlManager() 

48 

49 class Meta: 

50 ordering = ['-start_time'] 

51 

52 @classmethod 

53 def create( 

54 cls, 

55 creator: User | int, 

56 flowsheet: Flowsheet | int, 

57 task_type: TaskType = TaskType.IDAES_SOLVE, 

58 parent: Optional['Task'] = None, 

59 status: TaskStatus = TaskStatus.Pending, 

60 save=False 

61 ): 

62 """Instantiate a task, optionally persisting immediately. 

63 

64 Args: 

65 creator: User creating the task or their primary key. 

66 flowsheet: Flowsheet associated with the task or its primary key. 

67 task_type: Category used to tailor downstream behaviour. 

68 parent: Parent task when building a hierarchy of work. 

69 status: Initial status to set on the task. 

70 save: When true the task is inserted into the database before returning. 

71 

72 Returns: 

73 The newly constructed `Task` instance. 

74 """ 

75 

76 task = Task( 

77 creator=creator, 

78 flowsheet_id=flowsheet, 

79 parent=parent, 

80 status=status, 

81 task_type=task_type 

82 ) 

83 

84 if save: 

85 task.save() 

86 

87 return task 

88 

89 @classmethod 

90 def create_parent_task( 

91 cls, 

92 creator: User | int, 

93 flowsheet_id: int, 

94 task_type: TaskType = TaskType.IDAES_SOLVE, 

95 scheduled_tasks: int = 0, 

96 status: TaskStatus = TaskStatus.Pending 

97 ): 

98 """Create a parent task seeded with metadata describing child workload.""" 

99 parent_task = Task( 

100 creator=creator, 

101 flowsheet_id=flowsheet_id, 

102 status=status, 

103 task_type=task_type, 

104 ) 

105 parent_task.metadata = TaskMeta.objects.create(scheduled_tasks=scheduled_tasks) 

106 parent_task.save() 

107 

108 return parent_task 

109 

110 def update_status_from_child(self, child_task: 'Task') -> bool: 

111 """Update parent completion state in response to a child transition. 

112 

113 If a user has already requested cancellation of the parent, preserve the 

114 ``cancelling`` status even when the final child settles normally. The 

115 caller will then resolve the parent to ``cancelled`` once all in-flight 

116 children have finished. 

117 

118 Returns ``True`` only when this call transitions the parent itself into 

119 ``completed``. Callers use that signal to trigger parent-level follow-up 

120 work such as summary notifications. 

121 """ 

122 

123 metadata = self.metadata 

124 # Once a user has explicitly cancelled the parent, the final child 

125 # settling should only update counters; parent finalisation is handled by 

126 # `mark_parent_cancelled()` after all in-flight children finish. 

127 parent_was_cancelling = self.status == TaskStatus.Cancelling 

128 

129 if child_task.status == TaskStatus.Failed: 129 ↛ 131line 129 didn't jump to line 131 because the condition on line 129 was never true

130 # Use F expressions to avoid race conditions when multiple children update concurrently. 

131 metadata.failed_tasks = F('failed_tasks') + 1 

132 metadata.save(update_fields=['failed_tasks']) 

133 elif child_task.status == TaskStatus.Completed: 133 ↛ 136line 133 didn't jump to line 136 because the condition on line 133 was always true

134 metadata.successful_tasks = F('successful_tasks') + 1 

135 metadata.save(update_fields=['successful_tasks']) 

136 elif child_task.status == TaskStatus.Cancelled: 

137 metadata.cancelled_tasks = F('cancelled_tasks') + 1 

138 metadata.save(update_fields=['cancelled_tasks']) 

139 

140 # Refresh to obtain concrete integer values for comparison. 

141 metadata.refresh_from_db(fields=['failed_tasks', 'successful_tasks', 'cancelled_tasks']) 

142 

143 if ( 

144 not parent_was_cancelling 

145 and metadata.scheduled_tasks 

146 == (metadata.successful_tasks + metadata.failed_tasks + metadata.cancelled_tasks) 

147 ): 

148 self.status = TaskStatus.Completed 

149 self.completed_time = timezone.now() 

150 self.save(update_fields=['status', 'completed_time']) 

151 return True 

152 

153 return False 

154 

155 @classmethod 

156 def increment_cancelled_children_for_parent(cls, parent_task_id: int, count: int = 1): 

157 """Increment a parent task's cancelled-child counter in one database update. 

158 

159 When count is greater than 1, multiple child cancellations are batched into 

160 a single update. Returns early without touching the database when count is 

161 zero or negative. 

162 """ 

163 if count <= 0: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true

164 return 

165 

166 TaskMeta.objects.filter(task__id=parent_task_id).update( 

167 cancelled_tasks=F('cancelled_tasks') + count 

168 ) 

169 

170 def set_failure_with_exception(self, exception: Exception | DetailedException, save: bool = False): 

171 """Record failure details on the task, preserving the stack trace.""" 

172 self.status = TaskStatus.Failed 

173 

174 detailed_exception = (exception 

175 if isinstance(exception, DetailedException) 

176 else DetailedException(exception, source="") 

177 ) 

178 

179 self.error = { 

180 "message": detailed_exception.message, 

181 "cause": detailed_exception.source, 

182 "traceback": detailed_exception.traceback 

183 } 

184 

185 if self.completed_time is None: 185 ↛ 189line 185 didn't jump to line 189 because the condition on line 185 was always true

186 # ensure completed timestamp reflects when failure was logged 

187 self.completed_time = timezone.now() 

188 

189 if save: 189 ↛ exitline 189 didn't return from function 'set_failure_with_exception' because the condition on line 189 was always true

190 self.save()