Coverage for backend/django/core/auxiliary/models/Task.py: 86%
79 statements
coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1"""Task model offering tracking and coordination for asynchronous operations."""
3from typing import Optional
4from django.db import models
5from django.db.models import F
6from django.utils import timezone
7from authentication.user.models import User
8from core.auxiliary.enums.generalEnums import TaskStatus
9from core.auxiliary.models import Flowsheet
10from core.exceptions import DetailedException
11from core.managers import AccessControlManager
class TaskMeta(models.Model):
    """Per-parent tally of child task outcomes.

    A parent ``Task`` links to one ``TaskMeta`` row through its ``metadata``
    one-to-one field. The parent compares ``scheduled_tasks`` with the sum of
    the three outcome counters to decide when all of its children have settled.
    """

    # Number of child tasks the parent expects to settle.
    scheduled_tasks = models.PositiveIntegerField(default=0)
    # Outcome counters, each starting at zero.
    failed_tasks = models.PositiveIntegerField(default=0)
    successful_tasks = models.PositiveIntegerField(default=0)
    cancelled_tasks = models.PositiveIntegerField(default=0)
class TaskType(models.TextChoices):
    """Closed set of task categories that can be stored in ``Task.task_type``.

    The string on the right-hand side of each member is the value written to
    the database, so it must not be altered without a data migration.
    """

    ML_TRAINING = 'ML Training'
    IDAES_SOLVE = 'Solve'
    BUILD_STATE = 'Build State'
    CSV_IMPORT_SCENARIO = 'CSV Import Scenario'
    CSV_IMPORT_PINCH_UTILITIES = 'CSV Import Pinch Utilities'
class Task(models.Model):
    """Represents a unit of work for platform computation (e.g. solving, ML), the status of which is
    observable and cancellable by users.

    Tasks may form a two-level hierarchy: a parent task owns a ``TaskMeta`` row
    (``metadata``) that counts how many of its ``children`` have failed,
    succeeded or been cancelled, and is marked completed once those counters
    add up to the number of scheduled children.
    """

    task_type = models.CharField(max_length=50, choices=TaskType.choices, default=TaskType.IDAES_SOLVE)
    creator = models.ForeignKey(User, on_delete=models.CASCADE)
    # NOTE(review): no ``max_length`` is declared on this CharField; adding one
    # would require a migration, so it is intentionally left as-is here.
    status = models.CharField(choices=TaskStatus.choices, default=TaskStatus.Pending)
    flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE)
    # Set automatically when the row is first inserted.
    start_time = models.DateTimeField(auto_now_add=True)
    # Populated when the task settles (see `update_status_from_child` and
    # `set_failure_with_exception`).
    completed_time = models.DateTimeField(null=True)
    # Failure payload with "message", "cause" and "traceback" keys
    # (written by `set_failure_with_exception`).
    error = models.JSONField(null=True)
    debug = models.JSONField(null=True)
    log = models.TextField(null=True)
    parent = models.ForeignKey('Task', on_delete=models.CASCADE, null=True, related_name='children')
    # Only parent tasks created via `create_parent_task` receive metadata.
    metadata = models.OneToOneField(TaskMeta, on_delete=models.CASCADE, null=True, related_name='task')

    objects = AccessControlManager()

    class Meta:
        ordering = ['-start_time']

    @staticmethod
    def _relation_kwargs(creator, flowsheet) -> dict:
        """Translate instance-or-primary-key arguments into model constructor kwargs.

        Django only accepts model instances on the relation name (``creator``)
        and raw keys on the column attribute (``creator_id``); passing the wrong
        kind raises or stores an unusable value. Integers are therefore routed
        to ``creator_id`` and ``Flowsheet`` instances to ``flowsheet``; every
        other value keeps the keyword the original implementation used.
        """
        creator_key = 'creator_id' if isinstance(creator, int) else 'creator'
        flowsheet_key = 'flowsheet' if isinstance(flowsheet, Flowsheet) else 'flowsheet_id'
        return {creator_key: creator, flowsheet_key: flowsheet}

    @classmethod
    def create(
        cls,
        creator: User | int,
        flowsheet: Flowsheet | int,
        task_type: TaskType = TaskType.IDAES_SOLVE,
        parent: Optional['Task'] = None,
        status: TaskStatus = TaskStatus.Pending,
        save=False
    ) -> 'Task':
        """Instantiate a task, optionally persisting immediately.

        Args:
            creator: User creating the task or their primary key.
            flowsheet: Flowsheet associated with the task or its primary key.
            task_type: Category used to tailor downstream behaviour.
            parent: Parent task when building a hierarchy of work.
            status: Initial status to set on the task.
            save: When true the task is inserted into the database before returning.

        Returns:
            The newly constructed `Task` instance.
        """
        task = cls(
            parent=parent,
            status=status,
            task_type=task_type,
            **cls._relation_kwargs(creator, flowsheet),
        )

        if save:
            task.save()

        return task

    @classmethod
    def create_parent_task(
        cls,
        creator: User | int,
        flowsheet_id: int,
        task_type: TaskType = TaskType.IDAES_SOLVE,
        scheduled_tasks: int = 0,
        status: TaskStatus = TaskStatus.Pending
    ) -> 'Task':
        """Create and persist a parent task seeded with metadata describing child workload.

        Args:
            creator: User creating the task or their primary key.
            flowsheet_id: Primary key of the flowsheet the task belongs to.
            task_type: Category used to tailor downstream behaviour.
            scheduled_tasks: Number of child tasks the parent should wait for.
            status: Initial status to set on the parent.

        Returns:
            The saved parent `Task`, with its `metadata` row attached.
        """
        # Imported locally so this block does not depend on a change to the
        # module-level import list.
        from django.db import transaction

        parent_task = cls(
            status=status,
            task_type=task_type,
            **cls._relation_kwargs(creator, flowsheet_id),
        )

        # Create both rows together so that a failure while inserting the task
        # does not leave an orphaned TaskMeta row behind.
        with transaction.atomic():
            parent_task.metadata = TaskMeta.objects.create(scheduled_tasks=scheduled_tasks)
            parent_task.save()

        return parent_task

    def update_status_from_child(self, child_task: 'Task') -> bool:
        """Update parent completion state in response to a child transition.

        If a user has already requested cancellation of the parent, preserve the
        ``cancelling`` status even when the final child settles normally. The
        caller will then resolve the parent to ``cancelled`` once all in-flight
        children have finished.

        Args:
            child_task: The child whose (already updated) status should be
                reflected in this parent's counters.

        Returns:
            ``True`` only when this call transitions the parent itself into
            ``completed``. Callers use that signal to trigger parent-level
            follow-up work such as summary notifications. ``False`` otherwise,
            including when this task has no metadata to track children with.
        """
        metadata = self.metadata
        if metadata is None:
            # `metadata` is nullable: a task that was not created as a parent
            # has no counters to update and can never be completed from here.
            return False

        # Once a user has explicitly cancelled the parent, the final child
        # settling should only update counters; parent finalisation is handled by
        # `mark_parent_cancelled()` after all in-flight children finish.
        parent_was_cancelling = self.status == TaskStatus.Cancelling

        counter_by_status = (
            (TaskStatus.Failed, 'failed_tasks'),
            (TaskStatus.Completed, 'successful_tasks'),
            (TaskStatus.Cancelled, 'cancelled_tasks'),
        )
        counter_field = next(
            (field for status, field in counter_by_status if child_task.status == status),
            None,
        )
        if counter_field is not None:
            # Use F expressions to avoid race conditions when multiple children update concurrently.
            setattr(metadata, counter_field, F(counter_field) + 1)
            metadata.save(update_fields=[counter_field])

        # Refresh to obtain concrete integer values for comparison.
        metadata.refresh_from_db(fields=['failed_tasks', 'successful_tasks', 'cancelled_tasks'])

        settled_tasks = metadata.successful_tasks + metadata.failed_tasks + metadata.cancelled_tasks
        if parent_was_cancelling or metadata.scheduled_tasks != settled_tasks:
            return False

        self.status = TaskStatus.Completed
        self.completed_time = timezone.now()
        self.save(update_fields=['status', 'completed_time'])
        return True

    @classmethod
    def increment_cancelled_children_for_parent(cls, parent_task_id: int, count: int = 1) -> None:
        """Increment a parent task's cancelled-child counter in one database update.

        When count is greater than 1, multiple child cancellations are batched into
        a single update. Returns early without touching the database when count is
        zero or negative.

        Args:
            parent_task_id: Primary key of the parent task whose metadata is updated.
            count: Number of cancelled children to add to the counter.
        """
        if count <= 0:
            return

        # A queryset-level update with an F expression applies the increment in
        # the database rather than on a possibly stale in-memory value.
        TaskMeta.objects.filter(task__id=parent_task_id).update(
            cancelled_tasks=F('cancelled_tasks') + count
        )

    def set_failure_with_exception(self, exception: Exception | DetailedException, save: bool = False) -> None:
        """Record failure details on the task, preserving the stack trace.

        Args:
            exception: The error that caused the failure. Anything that is not
                already a `DetailedException` is wrapped in one with an empty source.
            save: When true the task is saved after the failure is recorded.
        """
        self.status = TaskStatus.Failed

        if isinstance(exception, DetailedException):
            detailed_exception = exception
        else:
            detailed_exception = DetailedException(exception, source="")

        self.error = {
            "message": detailed_exception.message,
            "cause": detailed_exception.source,
            "traceback": detailed_exception.traceback
        }

        if self.completed_time is None:
            # ensure completed timestamp reflects when failure was logged
            self.completed_time = timezone.now()

        if save:
            # A full save is kept on purpose: callers may have set other fields
            # (e.g. `log` or `debug`) on this instance that they expect to persist.
            self.save()