Coverage for backend/django/core/auxiliary/services/scenario_import.py: 80%
146 statements
coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

import traceback
from itertools import chain

from django.db import transaction
from django.utils import timezone
from rest_framework.exceptions import NotFound, ValidationError

from common.models.notifications.payloads import NotificationServiceMessageType
from common.models.scenario_import import ScenarioCsvImportRequestPayload
from common.services import messaging
from core.auxiliary.enums.generalEnums import TaskStatus
from core.auxiliary.managers.TaskManager import handle_task_running_event
from core.auxiliary.models.DataCell import DataCell
from core.auxiliary.models.DataColumn import DataColumn
from core.auxiliary.models.DataRow import DataRow
from core.auxiliary.models.Scenario import Scenario, ScenarioTabTypeEnum
from core.auxiliary.models.Task import Task, TaskType
from core.auxiliary.models.UploadSession import (
    UploadSession,
    UploadSessionPurpose,
    UploadSessionStatus,
)
from core.auxiliary.serializers import TaskSerializer
from core.auxiliary.services.csv_inspect import (
    CsvInspectionError,
    filter_numeric_headers_by_first_row,
    parse_numeric_row,
    stream_csv_rows,
)
from core.auxiliary.services.object_storage import s3 as s3_storage
from core.auxiliary.services.uploads import inspect_upload_session

ROW_BATCH_SIZE = 1000  # rows buffered before each bulk insert
PROGRESS_UPDATE_INTERVAL = 1000  # imported-row count between task progress broadcasts


def _send_task_update(task: Task, message_type: NotificationServiceMessageType):
    messaging.send_flowsheet_notification_message(
        task.flowsheet_id,
        TaskSerializer(task).data,
        message_type,
    )


def _get_completed_scenario_upload(
    upload_session_id: int,
    user_id: int,
    scenario_id: int,
) -> UploadSession:
    """Load the upload session and validate that it is usable for this scenario."""
    try:
        upload_session = UploadSession.objects.get(id=upload_session_id)
    except UploadSession.DoesNotExist as exc:
        raise ValidationError({"upload_session_id": "Upload session not found."}) from exc

    if upload_session.created_by_id != user_id:  # coverage: never true in tests
        raise ValidationError({"upload_session_id": "You do not own this upload session."})
    if upload_session.status != UploadSessionStatus.COMPLETED:  # coverage: never true in tests
        raise ValidationError({"upload_session_id": "The upload session must be completed before import."})
    if upload_session.purpose != UploadSessionPurpose.SCENARIO_CSV:  # coverage: never true in tests
        raise ValidationError({"upload_session_id": "The upload session purpose must be scenario_csv."})
    if upload_session.scenario_id and upload_session.scenario_id != scenario_id:  # coverage: never true in tests
        raise ValidationError({"upload_session_id": "The upload session belongs to a different scenario."})

    return upload_session


def _get_importable_scenario(scenario_id: int) -> Scenario:
    """Resolve and validate the target scenario before queueing a scenario CSV import."""
    try:
        scenario = Scenario.objects.get(id=scenario_id)
    except Scenario.DoesNotExist as exc:
        raise NotFound({"scenario_id": "Scenario not found."}) from exc

    if scenario.state_name not in {  # coverage: never true in tests
        ScenarioTabTypeEnum.MultiSteadyState,
        ScenarioTabTypeEnum.Dynamic,
    }:
        raise ValidationError(
            {
                "scenario_id": (
                    "Only multi steady-state and dynamic scenarios support scenario CSV import."
                )
            }
        )

    return scenario


def enqueue_scenario_import(*, user, scenario_id: int, upload_session_id: int) -> Task:
    """Queue a scenario CSV import unless an active task already owns that upload.

    Reusing non-terminal tasks prevents duplicate pub/sub deliveries for the same
    upload session, while failed and cancelled tasks remain retryable.
    """
    scenario = _get_importable_scenario(scenario_id)
    upload_session = _get_completed_scenario_upload(upload_session_id, user.id, scenario_id)
    if upload_session.flowsheet_id != scenario.flowsheet_id:
        raise ValidationError({"upload_session_id": "The upload session belongs to a different flowsheet."})

    existing_task = (
        Task.objects.filter(
            task_type=TaskType.CSV_IMPORT_SCENARIO,
            flowsheet_id=scenario.flowsheet_id,
            debug__scenario_id=scenario_id,
            debug__upload_session_id=upload_session_id,
            status__in=[TaskStatus.Pending, TaskStatus.Running, TaskStatus.Cancelling],
        )
        .order_by("-start_time")
        .first()
    )
    if existing_task is not None:
        return existing_task

    task = Task.create(
        user,
        scenario.flowsheet_id,
        task_type=TaskType.CSV_IMPORT_SCENARIO,
        status=TaskStatus.Pending,
        save=True,
    )
    task.debug = {
        "scenario_id": scenario_id,
        "upload_session_id": upload_session_id,
        "rows_imported": 0,
    }
    task.save(update_fields=["debug"])

    messaging.send_scenario_csv_import_message(
        ScenarioCsvImportRequestPayload(
            task_id=task.id,
            flowsheet_id=scenario.flowsheet_id,
            scenario_id=scenario_id,
            upload_session_id=upload_session_id,
            bucket=upload_session.bucket,
            object_key=upload_session.object_key,
            csv_delimiter=upload_session.csv_delimiter or None,
            requested_by_user_id=user.id,
        )
    )
    return task
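
# Usage sketch (illustrative only; the ids and user object below are made up):
# calling enqueue_scenario_import twice for the same upload while the first
# task is still pending returns the original task instead of queueing a
# duplicate, which is the reuse behaviour described in the docstring above.
#
#     task_a = enqueue_scenario_import(user=user, scenario_id=12, upload_session_id=34)
#     task_b = enqueue_scenario_import(user=user, scenario_id=12, upload_session_id=34)
#     assert task_a.id == task_b.id  # non-terminal task was reused, not duplicated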


def _flush_rows(
    *,
    task: Task,
    scenario: Scenario,
    headers: list[str],
    columns_by_name: dict[str, DataColumn],
    row_buffer: list[DataRow],
    value_buffer: list[list[float]],
    imported_count: int,
) -> int:
    if not row_buffer:  # coverage: never true in tests
        return imported_count

    DataRow.objects.bulk_create(row_buffer)
    cells: list[DataCell] = []
    for data_row, row_values in zip(row_buffer, value_buffer):
        for header, value in zip(headers, row_values):
            cells.append(
                DataCell(
                    flowsheet_id=scenario.flowsheet_id,
                    data_column=columns_by_name[header],
                    data_row=data_row,
                    value=value,
                )
            )
    DataCell.objects.bulk_create(cells)

    imported_count += len(row_buffer)
    if imported_count % PROGRESS_UPDATE_INTERVAL == 0:  # coverage: never true in tests
        task.debug = {**(task.debug or {}), "rows_imported": imported_count}
        task.save(update_fields=["debug"])
        _send_task_update(task, NotificationServiceMessageType.TASK_UPDATED)

    row_buffer.clear()
    value_buffer.clear()
    return imported_count
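
# Worked example (illustrative): with numeric headers ["time", "flow"] and a
# buffer of three parsed rows, one flush issues exactly two bulk inserts:
# three DataRow objects, then 3 x 2 = 6 DataCell objects, one per
# (row, header) pair, each linked to the pre-created DataColumn for its header.
# Note that because ROW_BATCH_SIZE equals PROGRESS_UPDATE_INTERVAL, the
# progress broadcast fires on every full batch but is skipped for a final
# partial flush, whose count is reported in the completion update instead.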


def process_scenario_import(payload: ScenarioCsvImportRequestPayload):
    """Import scenario CSV data from object storage into scenario data tables.

    The task is moved from `pending` to `running` by the task manager, then the
    CSV is streamed from object storage so large uploads do not need to fit in
    memory. Any import failure is recorded on the task and broadcast to the UI.
    """
    task = Task.objects.get(id=payload.task_id)
    if task.status in {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Running, TaskStatus.Failed}:  # coverage: never true in tests
        return

    handle_task_running_event(task.id)
    try:
        scenario = _get_importable_scenario(payload.scenario_id)
        try:
            upload_session = UploadSession.objects.get(id=payload.upload_session_id)
        except UploadSession.DoesNotExist as exc:
            raise CsvInspectionError("The upload session was deleted before the import could run.") from exc

        if upload_session.status != UploadSessionStatus.COMPLETED:  # coverage: never true in tests
            raise CsvInspectionError("The upload session must be completed before scenario CSV import can run.")
        if upload_session.purpose != UploadSessionPurpose.SCENARIO_CSV:  # coverage: never true in tests
            raise CsvInspectionError("The upload session purpose must be scenario_csv for scenario CSV import.")
        if upload_session.flowsheet_id != scenario.flowsheet_id:  # coverage: never true in tests
            raise CsvInspectionError("The upload session belongs to a different flowsheet.")
        if upload_session.scenario_id and upload_session.scenario_id != scenario.id:  # coverage: never true in tests
            raise CsvInspectionError("The upload session belongs to a different scenario.")

        delimiter = payload.csv_delimiter or upload_session.csv_delimiter
        if not delimiter:
            delimiter = inspect_upload_session(upload_session).delimiter

        with transaction.atomic():
            DataRow.objects.filter(scenario_id=scenario.id).delete()

            body = s3_storage.stream_object(payload.bucket, payload.object_key)
            headers, reader = stream_csv_rows(body, delimiter)
            first_row = next(reader, None)
            if first_row is None:  # coverage: never true in tests
                raise CsvInspectionError("The uploaded CSV must include at least one data row.")

            numeric_headers, skipped_headers = filter_numeric_headers_by_first_row(headers, first_row)
            if not numeric_headers:  # coverage: never true in tests
                raise CsvInspectionError(
                    "The uploaded CSV does not contain any numeric columns in its first data row."
                )

            existing_columns = {
                column.name: column
                for column in DataColumn.objects.filter(scenario_id=scenario.id)
            }
            missing_headers = [header for header in numeric_headers if header not in existing_columns]
            if missing_headers:  # coverage: always true in tests
                DataColumn.objects.bulk_create(
                    [
                        DataColumn(
                            flowsheet_id=scenario.flowsheet_id,
                            scenario_id=scenario.id,
                            name=header,
                            value="",
                        )
                        for header in missing_headers
                    ]
                )
                existing_columns = {
                    column.name: column
                    for column in DataColumn.objects.filter(scenario_id=scenario.id)
                }

            row_buffer: list[DataRow] = []
            value_buffer: list[list[float]] = []
            imported_count = 0

            for row_index, row in enumerate(chain([first_row], reader)):
                row_number = row_index + 2  # 1-based file position, counting the header row
                parsed_values = parse_numeric_row(row_number, row, numeric_headers)
                row_buffer.append(
                    DataRow(
                        flowsheet_id=scenario.flowsheet_id,
                        scenario_id=scenario.id,
                        index=row_index,
                    )
                )
                value_buffer.append(parsed_values)

                if len(row_buffer) >= ROW_BATCH_SIZE:  # coverage: never true in tests
                    imported_count = _flush_rows(
                        task=task,
                        scenario=scenario,
                        headers=numeric_headers,
                        columns_by_name=existing_columns,
                        row_buffer=row_buffer,
                        value_buffer=value_buffer,
                        imported_count=imported_count,
                    )

            imported_count = _flush_rows(
                task=task,
                scenario=scenario,
                headers=numeric_headers,
                columns_by_name=existing_columns,
                row_buffer=row_buffer,
                value_buffer=value_buffer,
                imported_count=imported_count,
            )

        task.status = TaskStatus.Completed
        task.completed_time = timezone.now()
        task.debug = {
            **(task.debug or {}),
            "rows_imported": imported_count,
            "skipped_non_numeric_headers": skipped_headers,
        }
        task.error = None
        task.save(update_fields=["status", "completed_time", "debug", "error"])
    except Exception as exc:
        task.status = TaskStatus.Failed
        task.completed_time = timezone.now()
        task.error = {
            "message": str(exc),
            "cause": "scenario_csv_import",
            "traceback": traceback.format_exc(),
        }
        task.save(update_fields=["status", "completed_time", "error"])
        _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
        return

    _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
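
# For context, a minimal sketch of the contract this import assumes from
# csv_inspect.stream_csv_rows (the real helper lives in
# core/auxiliary/services/csv_inspect.py and is not shown here; the name and
# details below are assumptions): wrap the binary object-storage stream in a
# lazy text decoder and hand back the header row plus a row iterator, so the
# file is never fully materialised in memory.
#
#     import csv
#     import io
#     from typing import IO, Iterator
#
#     def stream_csv_rows_sketch(
#         body: IO[bytes], delimiter: str
#     ) -> tuple[list[str], Iterator[list[str]]]:
#         # newline="" is required by the csv module for correct quote handling
#         text = io.TextIOWrapper(body, encoding="utf-8", newline="")
#         reader = csv.reader(text, delimiter=delimiter)
#         headers = next(reader)  # assumes the first row holds column headers
#         return headers, reader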


def mark_scenario_import_delivery_failure(payload: ScenarioCsvImportRequestPayload):
    """Fail a queued scenario CSV import when Dapr cannot deliver the callback.

    This is distinct from `process_scenario_import`, which only runs after the
    callback arrives successfully. Delivery failures need their own task update
    or the frontend will continue showing the task as pending.
    """
    task = Task.objects.get(id=payload.task_id)
    if task.status in {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Failed}:  # coverage: never true in tests
        return

    task.status = TaskStatus.Failed
    task.completed_time = timezone.now()
    task.error = {
        "message": (
            "The scenario CSV import could not be started. "
            "Retry the import to queue a new task."
        ),
        "cause": "scenario_csv_import_delivery_failed",
    }
    task.save(update_fields=["status", "completed_time", "error"])
    _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
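
# Illustrative wiring (hypothetical handler names; the actual Dapr subscription
# glue lives outside this module): a delivered message is routed to
# process_scenario_import, while the broker's delivery-failure hook calls
# mark_scenario_import_delivery_failure so the task does not stay pending
# forever. This assumes ScenarioCsvImportRequestPayload can be rebuilt from
# the raw message fields.
#
#     def on_scenario_csv_import(message: dict) -> None:
#         payload = ScenarioCsvImportRequestPayload(**message)
#         process_scenario_import(payload)
#
#     def on_scenario_csv_import_undeliverable(message: dict) -> None:
#         payload = ScenarioCsvImportRequestPayload(**message)
#         mark_scenario_import_delivery_failure(payload)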