Coverage for backend/core/auxiliary/models/Task.py: 80%
64 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-06 23:27 +0000
1"""Task model offering tracking and coordination for asynchronous operations."""
3from typing import Optional
4from django.db import models
5from django.db.models import F
6from django.utils import timezone
7from authentication.user.models import User
8from core.auxiliary.enums.generalEnums import TaskStatus
9from core.auxiliary.models import Flowsheet
10from core.exceptions import DetailedException
11from core.managers import AccessControlManager
class TaskMeta(models.Model):
    """Running child-task tallies kept on behalf of a parent task.

    A parent task compares ``scheduled_tasks`` against the sum of
    ``successful_tasks`` and ``failed_tasks`` to decide whether all of its
    children have finished.
    """

    # Number of child tasks the parent expects to run.
    scheduled_tasks = models.PositiveIntegerField(default=0)

    # Number of child tasks that have reported a failed status.
    failed_tasks = models.PositiveIntegerField(default=0)

    # Number of child tasks that have reported a completed status.
    successful_tasks = models.PositiveIntegerField(default=0)
class TaskType(models.TextChoices):
    """Categories of work a task can represent.

    The member value is the string stored in the task's ``task_type`` column.
    """

    # Machine-learning training job.
    ML_TRAINING = 'ML Training'

    # Solve job; also the default task type used by ``Task``.
    IDAES_SOLVE = 'Solve'
class Task(models.Model):
    """Represents a unit of work for platform computation (e.g. solving, ML), the status of which is
    observable and cancellable by users.

    Tasks can be arranged in a hierarchy through ``parent``. A parent created with
    ``create_parent_task`` owns a ``TaskMeta`` row whose counters are advanced by
    ``update_status_from_child`` as children finish.
    """

    task_type = models.CharField(max_length=50, choices=TaskType.choices, default=TaskType.IDAES_SOLVE)
    creator = models.ForeignKey(User, on_delete=models.CASCADE)
    # NOTE(review): no max_length is given here; that is only accepted on database backends
    # that support unbounded VARCHAR columns — confirm against the deployment database.
    status = models.CharField(choices=TaskStatus.choices, default=TaskStatus.Pending)
    flowsheet = models.ForeignKey(Flowsheet, on_delete=models.CASCADE)
    # Populated automatically when the row is first inserted.
    start_time = models.DateTimeField(auto_now_add=True)
    # Set when the task is marked completed or failed by the methods below.
    completed_time = models.DateTimeField(null=True)
    # Failure details: a dict with "message", "cause" and "traceback" keys
    # (see ``set_failure_with_exception``).
    error = models.JSONField(null=True)
    debug = models.JSONField(null=True)
    log = models.TextField(null=True)
    parent = models.ForeignKey('Task', on_delete=models.CASCADE, null=True, related_name='children')
    # Only present on parent tasks built through ``create_parent_task``.
    metadata = models.OneToOneField(TaskMeta, on_delete=models.CASCADE, null=True, related_name='task')

    objects = AccessControlManager()

    class Meta:
        ordering = ['-start_time']

    @staticmethod
    def _relation_kwargs(field_name: str, value) -> dict:
        """Build the constructor keyword for a foreign key from an instance or a primary key.

        Args:
            field_name: Name of the relation field, e.g. ``'creator'``.
            value: Either a model instance or the related row's primary key.

        Returns:
            ``{field_name: value}`` for a model instance, otherwise
            ``{f'{field_name}_id': value}``.
        """
        if isinstance(value, models.Model):
            return {field_name: value}
        return {f'{field_name}_id': value}

    @classmethod
    def create(
        cls,
        creator: User | int,
        flowsheet: Flowsheet | int,
        task_type: TaskType = TaskType.IDAES_SOLVE,
        parent: Optional['Task'] = None,
        status: TaskStatus = TaskStatus.Pending,
        save=False
    ) -> 'Task':
        """Instantiate a task, optionally persisting immediately.

        Args:
            creator: User creating the task or their primary key.
            flowsheet: Flowsheet associated with the task or its primary key.
            task_type: Category used to tailor downstream behaviour.
            parent: Parent task when building a hierarchy of work.
            status: Initial status to set on the task.
            save: When true the task is inserted into the database before returning.

        Returns:
            The newly constructed `Task` instance.
        """
        # Both relations accept an instance or a key, so choose ``<field>`` or
        # ``<field>_id`` per argument instead of assuming one form.
        task = cls(
            parent=parent,
            status=status,
            task_type=task_type,
            **cls._relation_kwargs('creator', creator),
            **cls._relation_kwargs('flowsheet', flowsheet),
        )

        if save:
            task.save()

        return task

    @classmethod
    def create_parent_task(
        cls,
        creator: User | int,
        flowsheet_id: int,
        task_type: TaskType = TaskType.IDAES_SOLVE,
        scheduled_tasks: int = 0,
        status: TaskStatus = TaskStatus.Pending
    ) -> 'Task':
        """Create and save a parent task seeded with metadata describing child workload.

        Args:
            creator: User creating the task or their primary key.
            flowsheet_id: Primary key of the flowsheet the task belongs to.
            task_type: Category used to tailor downstream behaviour.
            scheduled_tasks: Number of child tasks the parent should wait for.
            status: Initial status to set on the task.

        Returns:
            The saved parent `Task`, with a freshly created `TaskMeta` attached.
        """
        parent_task = cls(
            flowsheet_id=flowsheet_id,
            status=status,
            task_type=task_type,
            **cls._relation_kwargs('creator', creator),
        )
        # The metadata row has to exist before the task row can reference it.
        parent_task.metadata = TaskMeta.objects.create(scheduled_tasks=scheduled_tasks)
        parent_task.save()

        return parent_task

    def update_status_from_child(self, child_task: 'Task') -> None:
        """Update parent completion state in response to a child transition.

        Increments the failed or successful counter according to the child's status, then
        marks this task as completed once the two counters add up to the scheduled total.
        A child in any other status leaves the counters untouched.

        Args:
            child_task: The child whose status has just changed.
        """
        metadata = self.metadata
        if metadata is None:
            # Tasks built without metadata have no child counters to maintain.
            return

        if child_task.status == TaskStatus.Failed:
            # Use F expressions to avoid race conditions when multiple children update concurrently.
            metadata.failed_tasks = F('failed_tasks') + 1
            metadata.save(update_fields=['failed_tasks'])
        elif child_task.status == TaskStatus.Completed:
            metadata.successful_tasks = F('successful_tasks') + 1
            metadata.save(update_fields=['successful_tasks'])

        # Refresh to obtain concrete integer values for comparison.
        metadata.refresh_from_db(fields=['failed_tasks', 'successful_tasks'])

        finished_children = metadata.successful_tasks + metadata.failed_tasks
        if metadata.scheduled_tasks == finished_children:
            self.status = TaskStatus.Completed
            self.completed_time = timezone.now()
            self.save(update_fields=['status', 'completed_time'])

    def set_failure_with_exception(self, exception: Exception | DetailedException, save: bool = False) -> None:
        """Record failure details on the task, preserving the stack trace.

        Sets the status to failed, stores the exception's message, source and traceback in
        ``error``, and stamps ``completed_time`` if it has not been set already.

        Args:
            exception: The error that caused the failure. Anything that is not already a
                `DetailedException` is wrapped in one with an empty source.
            save: When true the task is saved before returning.
        """
        self.status = TaskStatus.Failed

        if isinstance(exception, DetailedException):
            detailed_exception = exception
        else:
            detailed_exception = DetailedException(exception, source="")

        self.error = {
            "message": detailed_exception.message,
            "cause": detailed_exception.source,
            "traceback": detailed_exception.traceback
        }

        if self.completed_time is None:
            # Ensure the completed timestamp reflects when the failure was logged.
            self.completed_time = timezone.now()

        if save:
            self.save()