Coverage for backend/django/core/auxiliary/services/scenario_import.py: 81%
153 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1import traceback
2from itertools import chain
4from django.db import transaction
5from django.utils import timezone
6from rest_framework.exceptions import NotFound, ValidationError
8from common.models.notifications.payloads import NotificationServiceMessageType
9from common.models.scenario_import import ScenarioCsvImportRequestPayload
10from common.services import messaging
11from core.auxiliary.enums.generalEnums import TaskStatus
12from core.auxiliary.managers.TaskManager import handle_task_running_event
13from core.auxiliary.models.DataCell import DataCell
14from core.auxiliary.models.DataColumn import DataColumn
15from core.auxiliary.models.DataRow import DataRow
16from core.auxiliary.models.Scenario import Scenario, ScenarioInputModeEnum, ScenarioTabTypeEnum
17from core.auxiliary.models.Task import Task, TaskType
18from core.auxiliary.models.UploadSession import (
19 UploadSession,
20 UploadSessionPurpose,
21 UploadSessionStatus,
22)
23from core.auxiliary.serializers import TaskSerializer
24from core.auxiliary.services.csv_inspect import (
25 CsvInspectionError,
26 filter_numeric_headers_by_first_row,
27 parse_numeric_row,
28 stream_csv_rows,
29)
30from core.auxiliary.services.object_storage import s3 as s3_storage
31from core.auxiliary.services.uploads import inspect_upload_session
32from core.auxiliary.services.parameter_sweep import clear_parameter_sweep_definition
# Number of buffered CSV rows that makes process_scenario_import flush a batch
# to the database via _flush_rows.
ROW_BATCH_SIZE = 1000
# Row-count interval that _flush_rows uses to decide when to persist and
# broadcast import progress on the task.
PROGRESS_UPDATE_INTERVAL = 1000
def _send_task_update(task: Task, message_type: NotificationServiceMessageType):
    """Send the serialized task as a flowsheet notification of the given type."""
    flowsheet_id = task.flowsheet_id
    serialized_task = TaskSerializer(task).data
    messaging.send_flowsheet_notification_message(flowsheet_id, serialized_task, message_type)
def _get_completed_scenario_upload(
    upload_session_id: int,
    user_id: int,
    scenario_id: int,
) -> UploadSession:
    """Return the upload session if it can be used for a scenario CSV import.

    The session must exist, be owned by ``user_id``, be completed, have the
    scenario CSV purpose, and - when it is tied to a scenario at all - be tied
    to ``scenario_id``.

    Raises:
        ValidationError: keyed on ``upload_session_id`` for the first check
            that fails.
    """

    def _rejection(message: str) -> ValidationError:
        # Every failure is reported against the same request field.
        return ValidationError({"upload_session_id": message})

    try:
        session = UploadSession.objects.get(id=upload_session_id)
    except UploadSession.DoesNotExist as exc:
        raise _rejection("Upload session not found.") from exc

    if session.created_by_id != user_id:
        raise _rejection("You do not own this upload session.")

    if session.status != UploadSessionStatus.COMPLETED:
        raise _rejection("The upload session must be completed before import.")

    if session.purpose != UploadSessionPurpose.SCENARIO_CSV:
        raise _rejection("The upload session purpose must be scenario_csv.")

    # A session without a scenario link is accepted for any scenario.
    linked_scenario_id = session.scenario_id
    if linked_scenario_id and linked_scenario_id != scenario_id:
        raise _rejection("The upload session belongs to a different scenario.")

    return session
def _get_importable_scenario(scenario_id: int) -> Scenario:
    """Look up the scenario and confirm its type accepts scenario CSV imports.

    Raises:
        NotFound: no scenario has the given id.
        ValidationError: the scenario is neither multi steady-state nor dynamic.
    """
    try:
        scenario = Scenario.objects.get(id=scenario_id)
    except Scenario.DoesNotExist as exc:
        raise NotFound({"scenario_id": "Scenario not found."}) from exc

    importable_states = {
        ScenarioTabTypeEnum.MultiSteadyState,
        ScenarioTabTypeEnum.Dynamic,
    }
    if scenario.state_name in importable_states:
        return scenario

    unsupported_message = "Only multi steady-state and dynamic scenarios support scenario CSV import."
    raise ValidationError({"scenario_id": unsupported_message})
def enqueue_scenario_import(*, user, scenario_id: int, upload_session_id: int) -> Task:
    """Queue a scenario CSV import unless an active task already owns that upload.

    Reusing non-terminal tasks prevents duplicate pub/sub deliveries for the same
    upload session, while failed and cancelled tasks remain retryable.

    Returns:
        The existing pending/running/cancelling task for this scenario and
        upload session if there is one, otherwise the newly created task.

    Raises:
        NotFound: the scenario does not exist.
        ValidationError: the scenario type is not importable, or the upload
            session fails validation or belongs to a different flowsheet.
        Exception: whatever the messaging layer raises while publishing; the
            new task is marked failed before the exception is re-raised.
    """
    scenario = _get_importable_scenario(scenario_id)
    upload_session = _get_completed_scenario_upload(upload_session_id, user.id, scenario_id)
    if upload_session.flowsheet_id != scenario.flowsheet_id:
        raise ValidationError({"upload_session_id": "The upload session belongs to a different flowsheet."})

    existing_task = (
        Task.objects.filter(
            task_type=TaskType.CSV_IMPORT_SCENARIO,
            flowsheet_id=scenario.flowsheet_id,
            debug__scenario_id=scenario_id,
            debug__upload_session_id=upload_session_id,
            status__in=[TaskStatus.Pending, TaskStatus.Running, TaskStatus.Cancelling],
        )
        .order_by("-start_time")
        .first()
    )
    if existing_task is not None:
        return existing_task

    task = Task.create(
        user,
        scenario.flowsheet_id,
        task_type=TaskType.CSV_IMPORT_SCENARIO,
        status=TaskStatus.Pending,
        save=True,
    )
    # The scenario/upload ids stored here are what the dedupe lookup above
    # matches on for later requests.
    task.debug = {
        "scenario_id": scenario_id,
        "upload_session_id": upload_session_id,
        "rows_imported": 0,
    }
    task.save(update_fields=["debug"])

    try:
        messaging.send_scenario_csv_import_message(
            ScenarioCsvImportRequestPayload(
                task_id=task.id,
                flowsheet_id=scenario.flowsheet_id,
                scenario_id=scenario_id,
                upload_session_id=upload_session_id,
                bucket=upload_session.bucket,
                object_key=upload_session.object_key,
                csv_delimiter=upload_session.csv_delimiter or None,
                requested_by_user_id=user.id,
            )
        )
    except Exception:
        # Without this the task would stay pending with no worker ever picking
        # it up, and the dedupe lookup above would hand that stuck task back on
        # every retry instead of queueing a new import.
        # NOTE(review): no notification is sent here because the caller already
        # receives the exception - confirm the UI does not need a task update.
        task.status = TaskStatus.Failed
        task.completed_time = timezone.now()
        task.error = {
            "message": (
                "The scenario CSV import could not be started. "
                "Retry the import to queue a new task."
            ),
            "cause": "scenario_csv_import_delivery_failed",
        }
        task.save(update_fields=["status", "completed_time", "error"])
        raise
    return task
def _flush_rows(
    *,
    task: Task,
    scenario: Scenario,
    headers: list[str],
    columns_by_name: dict[str, DataColumn],
    row_buffer: list[DataRow],
    value_buffer: list[list[float]],
    imported_count: int,
) -> int:
    """Bulk-insert the buffered rows and their cells, then empty both buffers.

    Args:
        task: Import task whose ``debug["rows_imported"]`` is updated on progress.
        scenario: Scenario the rows belong to; supplies the flowsheet id for cells.
        headers: Column names, in the same order as each entry of ``value_buffer``.
        columns_by_name: Column lookup; must contain every name in ``headers``.
        row_buffer: Unsaved rows to insert. Cleared in place before returning.
        value_buffer: Parsed values, one list per entry of ``row_buffer``.
            Cleared in place before returning.
        imported_count: Rows imported before this flush.

    Returns:
        The running total of imported rows after this flush.
    """
    if not row_buffer:
        return imported_count

    DataRow.objects.bulk_create(row_buffer)
    # The row instances are reused as the cells' foreign keys, which relies on
    # bulk_create populating their primary keys. Django only does this on some
    # database backends - TODO confirm for the deployed backend.
    DataCell.objects.bulk_create(
        [
            DataCell(
                flowsheet_id=scenario.flowsheet_id,
                data_column=columns_by_name[header],
                data_row=data_row,
                value=value,
            )
            for data_row, row_values in zip(row_buffer, value_buffer)
            for header, value in zip(headers, row_values)
        ]
    )

    previous_count = imported_count
    imported_count += len(row_buffer)
    # Report progress whenever this flush crosses an interval boundary. An exact
    # "count % interval == 0" test would never fire if the batch size stopped
    # being a divisor of the interval.
    crossed_interval = (
        imported_count // PROGRESS_UPDATE_INTERVAL > previous_count // PROGRESS_UPDATE_INTERVAL
    )
    if crossed_interval:
        task.debug = {**(task.debug or {}), "rows_imported": imported_count}
        task.save(update_fields=["debug"])
        _send_task_update(task, NotificationServiceMessageType.TASK_UPDATED)

    row_buffer.clear()
    value_buffer.clear()
    return imported_count
def _load_import_upload_session(
    payload: ScenarioCsvImportRequestPayload,
    scenario: Scenario,
) -> UploadSession:
    """Fetch the payload's upload session and re-check that it still fits the scenario."""
    try:
        upload_session = UploadSession.objects.get(id=payload.upload_session_id)
    except UploadSession.DoesNotExist as exc:
        raise CsvInspectionError("The upload session was deleted before the import could run.") from exc

    if upload_session.status != UploadSessionStatus.COMPLETED:
        raise CsvInspectionError("The upload session must be completed before scenario CSV import can run.")
    if upload_session.purpose != UploadSessionPurpose.SCENARIO_CSV:
        raise CsvInspectionError("The upload session purpose must be scenario_csv for scenario CSV import.")
    if upload_session.flowsheet_id != scenario.flowsheet_id:
        raise CsvInspectionError("The upload session belongs to a different flowsheet.")
    if upload_session.scenario_id and upload_session.scenario_id != scenario.id:
        raise CsvInspectionError("The upload session belongs to a different scenario.")
    return upload_session


def _reset_scenario_for_csv_import(scenario: Scenario) -> None:
    """Delete the scenario's existing rows and switch it to CSV input mode."""
    # Evaluated before anything is deleted so the decision reflects the
    # scenario's state prior to this import.
    replacing_parameter_sweep = (
        scenario.mss_input_mode == ScenarioInputModeEnum.ParameterSweep
        or hasattr(scenario, "parameterSweepDefinition")
    )
    DataRow.objects.filter(scenario_id=scenario.id).delete()
    if replacing_parameter_sweep:
        DataColumn.objects.filter(
            scenario_id=scenario.id,
            property_value__isnull=False,
        ).delete()
        clear_parameter_sweep_definition(scenario)
    scenario.mss_input_mode = ScenarioInputModeEnum.Csv
    scenario.save(update_fields=["mss_input_mode"])


def _ensure_import_columns(scenario: Scenario, numeric_headers: list[str]) -> dict[str, DataColumn]:
    """Return the scenario's columns keyed by name, creating any missing header columns."""

    def _columns_by_name() -> dict[str, DataColumn]:
        return {
            column.name: column
            for column in DataColumn.objects.filter(scenario_id=scenario.id)
        }

    columns = _columns_by_name()
    missing_headers = [header for header in numeric_headers if header not in columns]
    if missing_headers:
        DataColumn.objects.bulk_create(
            [
                DataColumn(
                    flowsheet_id=scenario.flowsheet_id,
                    scenario_id=scenario.id,
                    name=header,
                    value="",
                )
                for header in missing_headers
            ]
        )
        # Re-query so the lookup also contains the newly created columns.
        columns = _columns_by_name()
    return columns


def _import_csv_rows(
    *,
    task: Task,
    scenario: Scenario,
    rows,
    numeric_headers: list[str],
    columns_by_name: dict[str, DataColumn],
) -> int:
    """Parse ``rows`` and insert them in ROW_BATCH_SIZE batches; return the row count."""
    row_buffer: list[DataRow] = []
    value_buffer: list[list[float]] = []
    imported_count = 0

    for row_index, row in enumerate(rows):
        # Presumably the 1-based CSV line number with the header on line 1 -
        # parse_numeric_row receives it alongside the row.
        row_number = row_index + 2
        parsed_values = parse_numeric_row(row_number, row, numeric_headers)
        row_buffer.append(
            DataRow(
                flowsheet_id=scenario.flowsheet_id,
                scenario_id=scenario.id,
                index=row_index,
            )
        )
        value_buffer.append(parsed_values)

        if len(row_buffer) >= ROW_BATCH_SIZE:
            imported_count = _flush_rows(
                task=task,
                scenario=scenario,
                headers=numeric_headers,
                columns_by_name=columns_by_name,
                row_buffer=row_buffer,
                value_buffer=value_buffer,
                imported_count=imported_count,
            )

    # Flush whatever is left over after the last full batch.
    return _flush_rows(
        task=task,
        scenario=scenario,
        headers=numeric_headers,
        columns_by_name=columns_by_name,
        row_buffer=row_buffer,
        value_buffer=value_buffer,
        imported_count=imported_count,
    )


def process_scenario_import(payload: ScenarioCsvImportRequestPayload):
    """Import scenario CSV data from object storage into scenario data tables.

    The task is moved from `pending` to `running` by the task manager, then the
    CSV is streamed from object storage so large uploads do not need to fit in
    memory. Any import failure is recorded on the task and broadcast to the UI.

    The CSV header and first data row are validated before any scenario data is
    touched. Deleting the old rows and inserting the new ones then happens in a
    single transaction, so a failure part-way through leaves the scenario's
    previous data in place instead of an emptied or half-imported scenario.

    Tasks that are already completed, cancelled, running or failed are ignored.
    """
    task = Task.objects.get(id=payload.task_id)
    if task.status in {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Running, TaskStatus.Failed}:
        return

    handle_task_running_event(task.id)
    # The task manager is only given the id, so reload the instance; otherwise
    # progress notifications would serialize the stale pre-running state.
    task.refresh_from_db()

    # Rows known to be committed; stays 0 if the transaction below rolls back.
    imported_count = 0
    try:
        scenario = _get_importable_scenario(payload.scenario_id)
        upload_session = _load_import_upload_session(payload, scenario)

        delimiter = payload.csv_delimiter or upload_session.csv_delimiter
        if not delimiter:
            delimiter = inspect_upload_session(upload_session).delimiter

        body = s3_storage.stream_object(payload.bucket, payload.object_key)
        try:
            headers, reader = stream_csv_rows(body, delimiter)
            first_row = next(reader, None)
            if first_row is None:
                raise CsvInspectionError("The uploaded CSV must include at least one data row.")

            numeric_headers, skipped_headers = filter_numeric_headers_by_first_row(headers, first_row)
            if not numeric_headers:
                raise CsvInspectionError(
                    "The uploaded CSV does not contain any numeric columns in its first data row."
                )

            # One transaction for the destructive reset and the inserts: an
            # exception leaving this block rolls both back together.
            with transaction.atomic():
                _reset_scenario_for_csv_import(scenario)
                columns_by_name = _ensure_import_columns(scenario, numeric_headers)
                inserted_rows = _import_csv_rows(
                    task=task,
                    scenario=scenario,
                    rows=chain([first_row], reader),
                    numeric_headers=numeric_headers,
                    columns_by_name=columns_by_name,
                )
            imported_count = inserted_rows
        finally:
            # Best-effort release of the storage stream.
            # NOTE(review): assumes stream_object returns an object with an
            # optional close() - confirm.
            close_body = getattr(body, "close", None)
            if callable(close_body):
                close_body()

        task.status = TaskStatus.Completed
        task.completed_time = timezone.now()
        task.debug = {
            **(task.debug or {}),
            "rows_imported": imported_count,
            "skipped_non_numeric_headers": skipped_headers,
        }
        task.error = None
        task.save(update_fields=["status", "completed_time", "debug", "error"])
    except Exception as exc:
        task.status = TaskStatus.Failed
        task.completed_time = timezone.now()
        # Progress saved inside the rolled-back transaction is gone, so report
        # only the rows that were actually committed.
        task.debug = {**(task.debug or {}), "rows_imported": imported_count}
        task.error = {
            "message": str(exc),
            "cause": "scenario_csv_import",
            "traceback": traceback.format_exc(),
        }
        task.save(update_fields=["status", "completed_time", "debug", "error"])
        _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
        return

    _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
def mark_scenario_import_delivery_failure(payload: ScenarioCsvImportRequestPayload):
    """Mark a queued scenario CSV import as failed after a delivery failure.

    `process_scenario_import` only runs once the callback has been delivered,
    so a delivery failure has to be recorded here; otherwise the frontend would
    keep showing the task as pending. Tasks that are already completed,
    cancelled or failed are left untouched.
    """
    task = Task.objects.get(id=payload.task_id)

    already_finished = {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Failed}
    if task.status not in already_finished:
        failure_message = (
            "The scenario CSV import could not be started. "
            "Retry the import to queue a new task."
        )
        task.status = TaskStatus.Failed
        task.completed_time = timezone.now()
        task.error = {
            "message": failure_message,
            "cause": "scenario_csv_import_delivery_failed",
        }
        task.save(update_fields=["status", "completed_time", "error"])
        _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)