Coverage for backend/django/core/auxiliary/services/pinch_import.py: 78%
147 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1import traceback
3from django.db import transaction
4from django.utils import timezone
5from rest_framework.exceptions import NotFound, ValidationError
7from PinchAnalysis.models.InputModels import PinchUtility
8from PinchAnalysis.models.StreamDataProject import StreamDataProject
9from common.models.notifications.payloads import NotificationServiceMessageType
10from common.models.pinch_import import PinchUtilityCsvImportRequestPayload
11from common.services import messaging
12from core.auxiliary.enums import pinchEnums
13from core.auxiliary.enums.generalEnums import TaskStatus
14from core.auxiliary.managers.TaskManager import handle_task_running_event
15from core.auxiliary.models.Task import Task, TaskType
16from core.auxiliary.models.UploadSession import (
17 UploadSession,
18 UploadSessionPurpose,
19 UploadSessionStatus,
20)
21from core.auxiliary.serializers import TaskSerializer
22from core.auxiliary.services.csv_inspect import CsvInspectionError, parse_float_cell, stream_csv_rows
23from core.auxiliary.services.object_storage import s3 as s3_storage
24from core.auxiliary.services.uploads import inspect_upload_session
# Number of parsed utilities inserted per bulk_create call.
ROW_BATCH_SIZE = 1000
# A task progress notification is emitted whenever the running total of
# imported rows lands on a multiple of this interval.
PROGRESS_UPDATE_INTERVAL = 1000
# Columns that must carry a value on every data row.
REQUIRED_COLUMNS = ("name", "t_supply", "t_target")
# Columns that may be left blank; blanks become None (or the default type).
OPTIONAL_COLUMNS = ("heat_flow", "dt_cont", "htc", "price", "type")
SUPPORTED_COLUMNS = REQUIRED_COLUMNS + OPTIONAL_COLUMNS
# Non-empty choice values of the project's StreamType enum, accepted in the
# CSV 'type' column.
VALID_UTILITY_TYPES = {choice for choice, _ in pinchEnums.StreamType.choices if choice}
def _send_task_update(task: Task, message_type: NotificationServiceMessageType):
    """Push the serialized task state onto the task's flowsheet notification channel."""
    serialized_task = TaskSerializer(task).data
    messaging.send_flowsheet_notification_message(task.flowsheet_id, serialized_task, message_type)
def _get_pinch_project(project_id: int) -> StreamDataProject:
    """Load the Pinch project (with flowsheet and Inputs joined) or raise 404.

    Raises NotFound so API callers get a proper 404 rather than a bare DoesNotExist.
    """
    queryset = StreamDataProject.objects.select_related("flowsheet", "Inputs")
    try:
        return queryset.get(id=project_id)
    except StreamDataProject.DoesNotExist as exc:
        raise NotFound({"project_id": "Pinch project not found."}) from exc
def _get_completed_pinch_upload(upload_session_id: int, user_id: int) -> UploadSession:
    """Fetch an upload session and validate ownership, lifecycle state, and purpose.

    Raises ValidationError (keyed on `upload_session_id`) for any check that fails;
    checks run in order: existence, ownership, completion, purpose.
    """
    try:
        session = UploadSession.objects.get(id=upload_session_id)
    except UploadSession.DoesNotExist as exc:
        raise ValidationError({"upload_session_id": "Upload session not found."}) from exc

    if session.created_by_id != user_id:
        raise ValidationError({"upload_session_id": "You do not own this upload session."})
    if session.status != UploadSessionStatus.COMPLETED:
        raise ValidationError({"upload_session_id": "The upload session must be completed before import."})
    if session.purpose != UploadSessionPurpose.PINCH_UTILITY_CSV:
        raise ValidationError({"upload_session_id": "The upload session purpose must be pinch_utility_csv."})
    return session
def enqueue_pinch_utility_import(*, user, project_id: int, upload_session_id: int) -> Task:
    """Queue a Pinch utility CSV import unless the same upload already has an active task.

    Validates that the upload session and project share a flowsheet, deduplicates
    against the most recent non-failed/non-cancelled task for the same
    (project, upload) pair, then creates a pending task and publishes the import
    request message. Returns either the reused or the newly created task.
    """
    project = _get_pinch_project(project_id)
    upload_session = _get_completed_pinch_upload(upload_session_id, user.id)

    if upload_session.flowsheet_id != project.flowsheet_id:
        raise ValidationError({"upload_session_id": "The upload session belongs to a different flowsheet."})

    # Reuse the newest live task for this exact (project, upload) combination.
    live_tasks = Task.objects.filter(
        task_type=TaskType.CSV_IMPORT_PINCH_UTILITIES,
        flowsheet_id=project.flowsheet_id,
        debug__project_id=project_id,
        debug__upload_session_id=upload_session_id,
    ).exclude(status__in=[TaskStatus.Failed, TaskStatus.Cancelled])
    duplicate = live_tasks.order_by("-start_time").first()
    if duplicate is not None:
        return duplicate

    task = Task.create(
        user,
        project.flowsheet_id,
        task_type=TaskType.CSV_IMPORT_PINCH_UTILITIES,
        status=TaskStatus.Pending,
        save=True,
    )
    # `debug` doubles as the dedupe key (queried above) and the progress store.
    task.debug = {
        "project_id": project_id,
        "upload_session_id": upload_session_id,
        "utilities_imported": 0,
    }
    task.save(update_fields=["debug"])

    messaging.send_pinch_utility_csv_import_message(
        PinchUtilityCsvImportRequestPayload(
            task_id=task.id,
            flowsheet_id=project.flowsheet_id,
            project_id=project_id,
            upload_session_id=upload_session_id,
            bucket=upload_session.bucket,
            object_key=upload_session.object_key,
            csv_delimiter=upload_session.csv_delimiter or None,
            requested_by_user_id=user.id,
        )
    )
    return task
def _row_has_any_supported_value(row: dict[str, str | None]) -> bool:
    """Return True when at least one supported column holds a non-blank value."""
    for column in SUPPORTED_COLUMNS:
        if (row.get(column) or "").strip():
            return True
    return False
def _parse_float(
    row_number: int,
    row: dict[str, str],
    column_name: str,
    *,
    required: bool,
) -> float | None:
    """Parse one numeric cell via `parse_float_cell`, re-asserting the required contract.

    The extra None check is defensive: a required column must never yield None,
    even if the underlying helper lets one slip through.
    """
    value = parse_float_cell(row_number, row, column_name, required=required)
    if required and value is None:
        raise CsvInspectionError(f"Row {row_number}, column '{column_name}' is required.")
    return value
def _parse_type(row_number: int, row: dict[str, str]) -> str:
    """Return the utility type for a row; a blank cell defaults to StreamType.Both.

    Raises CsvInspectionError when the cell holds a value outside VALID_UTILITY_TYPES.
    """
    candidate = (row.get("type") or "").strip()
    if not candidate:
        return pinchEnums.StreamType.Both
    if candidate in VALID_UTILITY_TYPES:
        return candidate
    raise CsvInspectionError(
        f"Row {row_number}, column 'type' must be one of: {', '.join(sorted(VALID_UTILITY_TYPES))}."
    )
def _build_utility(row_number: int, row: dict[str, str], project: StreamDataProject) -> PinchUtility | None:
    """Translate one CSV row into an unsaved PinchUtility, or None for a blank row.

    Raises CsvInspectionError when `name` is missing or any numeric/type cell is invalid.
    """
    if not _row_has_any_supported_value(row):
        return None  # fully blank row: skip silently rather than error

    name = (row.get("name") or "").strip()
    if not name:
        raise CsvInspectionError(f"Row {row_number}, column 'name' is required.")

    def number(column: str, *, required: bool = False) -> float | None:
        # Thin closure over the row context to keep the constructor call readable.
        return _parse_float(row_number, row, column, required=required)

    return PinchUtility(
        flowsheet_id=project.flowsheet_id,
        input_owner=project.Inputs,
        name=name,
        t_supply=number("t_supply", required=True),
        t_target=number("t_target", required=True),
        heat_flow=number("heat_flow"),
        dt_cont=number("dt_cont"),
        htc=number("htc"),
        price=number("price"),
        type=_parse_type(row_number, row),
    )
def _flush_utilities(
    *,
    task: Task,
    utility_buffer: list[PinchUtility],
    imported_count: int,
) -> int:
    """Persist the buffered utilities and return the updated running total.

    The buffer is cleared in place so the caller can keep reusing the same list.
    A progress notification is emitted only when the new total lands exactly on
    a PROGRESS_UPDATE_INTERVAL boundary, which full ROW_BATCH_SIZE batches do.
    """
    if not utility_buffer:
        return imported_count

    PinchUtility.objects.bulk_create(utility_buffer)
    new_total = imported_count + len(utility_buffer)
    utility_buffer.clear()

    if new_total % PROGRESS_UPDATE_INTERVAL == 0:
        task.debug = {**(task.debug or {}), "utilities_imported": new_total}
        task.save(update_fields=["debug"])
        _send_task_update(task, NotificationServiceMessageType.TASK_UPDATED)
    return new_total
def process_pinch_utility_import(payload: PinchUtilityCsvImportRequestPayload):
    """Import Pinch utility rows from object storage into the target project.

    The payload must reference an existing pending task, a completed upload session,
    and a Pinch project owned by the same flowsheet. The CSV is streamed from object
    storage, validated row-by-row, and inserted in batches while task progress
    notifications are emitted. Invalid CSV structure or row values raise
    `CsvInspectionError` or `ValueError`, which are captured on the task as a failure.
    """
    task = Task.objects.get(id=payload.task_id)
    # Idempotency guard: only a still-pending task may be processed; redeliveries
    # of the same message become no-ops.
    if task.status in {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Running, TaskStatus.Failed}:
        return

    handle_task_running_event(task.id)
    try:
        project = _get_pinch_project(payload.project_id)
        try:
            upload_session = UploadSession.objects.get(id=payload.upload_session_id)
        except UploadSession.DoesNotExist as exc:
            raise CsvInspectionError("The upload session was deleted before the import could run.") from exc

        # Re-validate the session here: its state may have changed since enqueue.
        if upload_session.status != UploadSessionStatus.COMPLETED:
            raise CsvInspectionError("The upload session must be completed before Pinch utility import can run.")
        if upload_session.purpose != UploadSessionPurpose.PINCH_UTILITY_CSV:
            raise CsvInspectionError("The upload session purpose must be pinch_utility_csv for Pinch import.")
        if upload_session.flowsheet_id != project.flowsheet_id:
            raise CsvInspectionError("The upload session belongs to a different flowsheet.")

        # Delimiter preference: payload override, then the session's stored value,
        # then sniffing the uploaded object.
        delimiter = payload.csv_delimiter or upload_session.csv_delimiter
        if not delimiter:
            delimiter = inspect_upload_session(upload_session).delimiter

        with transaction.atomic():
            # Pinch CSV imports replace the current utility table for the project.
            PinchUtility.objects.filter(input_owner=project.Inputs).delete()

            body = s3_storage.stream_object(payload.bucket, payload.object_key)
            _, reader = stream_csv_rows(body, delimiter)

            utility_buffer: list[PinchUtility] = []
            imported_count = 0

            for row_index, row in enumerate(reader):
                # +2: CSV lines are 1-indexed and the header occupies line 1.
                row_number = row_index + 2
                utility = _build_utility(row_number, row, project)
                if utility is None:
                    continue  # fully blank row — skipped, not an error
                utility_buffer.append(utility)

                if len(utility_buffer) >= ROW_BATCH_SIZE:
                    imported_count = _flush_utilities(
                        task=task,
                        utility_buffer=utility_buffer,
                        imported_count=imported_count,
                    )

            # Final (possibly partial) batch.
            imported_count = _flush_utilities(
                task=task,
                utility_buffer=utility_buffer,
                imported_count=imported_count,
            )

            # Raising inside the atomic block rolls back the delete above, so an
            # empty CSV leaves the project's existing utilities untouched.
            if imported_count == 0:
                raise CsvInspectionError("The uploaded CSV must include at least one utility row.")

        task.status = TaskStatus.Completed
        task.completed_time = timezone.now()
        task.debug = {**(task.debug or {}), "utilities_imported": imported_count}
        task.error = None
        task.save(update_fields=["status", "completed_time", "debug", "error"])
    except Exception as exc:
        # Any failure (validation, storage, DB) is recorded on the task; partial
        # inserts were rolled back by transaction.atomic().
        task.status = TaskStatus.Failed
        task.completed_time = timezone.now()
        task.error = {
            "message": str(exc),
            "cause": "pinch_utility_csv_import",
            "traceback": traceback.format_exc(),
        }
        task.save(update_fields=["status", "completed_time", "error"])
        _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
        return

    _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
def mark_pinch_utility_import_delivery_failure(payload: PinchUtilityCsvImportRequestPayload):
    """Fail a queued Pinch utility CSV import when Dapr cannot deliver the callback."""
    task = Task.objects.get(id=payload.task_id)
    # Tasks already in a terminal state are left untouched.
    terminal_states = {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Failed}
    if task.status in terminal_states:
        return

    task.status = TaskStatus.Failed
    task.completed_time = timezone.now()
    task.error = {
        "message": (
            "The utility CSV import could not be started. "
            "Retry the import to queue a new task."
        ),
        "cause": "pinch_utility_csv_import_delivery_failed",
    }
    task.save(update_fields=["status", "completed_time", "error"])
    _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)