Coverage for backend/django/core/auxiliary/services/pinch_import.py: 78%

147 statements  

coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

import traceback

from django.db import transaction
from django.utils import timezone
from rest_framework.exceptions import NotFound, ValidationError

from PinchAnalysis.models.InputModels import PinchUtility
from PinchAnalysis.models.StreamDataProject import StreamDataProject
from common.models.notifications.payloads import NotificationServiceMessageType
from common.models.pinch_import import PinchUtilityCsvImportRequestPayload
from common.services import messaging
from core.auxiliary.enums import pinchEnums
from core.auxiliary.enums.generalEnums import TaskStatus
from core.auxiliary.managers.TaskManager import handle_task_running_event
from core.auxiliary.models.Task import Task, TaskType
from core.auxiliary.models.UploadSession import (
    UploadSession,
    UploadSessionPurpose,
    UploadSessionStatus,
)
from core.auxiliary.serializers import TaskSerializer
from core.auxiliary.services.csv_inspect import CsvInspectionError, parse_float_cell, stream_csv_rows
from core.auxiliary.services.object_storage import s3 as s3_storage
from core.auxiliary.services.uploads import inspect_upload_session


# Rows are inserted in batches of ROW_BATCH_SIZE; _flush_utilities reports
# progress at multiples of PROGRESS_UPDATE_INTERVAL.
ROW_BATCH_SIZE = 1000
PROGRESS_UPDATE_INTERVAL = 1000
REQUIRED_COLUMNS = ("name", "t_supply", "t_target")
OPTIONAL_COLUMNS = ("heat_flow", "dt_cont", "htc", "price", "type")
SUPPORTED_COLUMNS = REQUIRED_COLUMNS + OPTIONAL_COLUMNS
# Only non-blank StreamType choices are accepted in the 'type' column.
VALID_UTILITY_TYPES = {choice for choice, _ in pinchEnums.StreamType.choices if choice}
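
# Illustrative CSV layout (header plus one data row). The 'type' cell may be
# left blank, in which case _parse_type falls back to StreamType.Both; valid
# type values are whatever pinchEnums.StreamType.choices defines.
#
#   name,t_supply,t_target,heat_flow,dt_cont,htc,price,type
#   LP Steam,160.0,159.0,,2.5,,0.02,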


def _send_task_update(task: Task, message_type: NotificationServiceMessageType):
    messaging.send_flowsheet_notification_message(
        task.flowsheet_id,
        TaskSerializer(task).data,
        message_type,
    )


def _get_pinch_project(project_id: int) -> StreamDataProject:
    try:
        return StreamDataProject.objects.select_related("flowsheet", "Inputs").get(id=project_id)
    except StreamDataProject.DoesNotExist as exc:
        raise NotFound({"project_id": "Pinch project not found."}) from exc


def _get_completed_pinch_upload(upload_session_id: int, user_id: int) -> UploadSession:
    try:
        upload_session = UploadSession.objects.get(id=upload_session_id)
    except UploadSession.DoesNotExist as exc:
        raise ValidationError({"upload_session_id": "Upload session not found."}) from exc

    if upload_session.created_by_id != user_id:  # coverage: branch never taken
        raise ValidationError({"upload_session_id": "You do not own this upload session."})
    if upload_session.status != UploadSessionStatus.COMPLETED:  # coverage: branch never taken
        raise ValidationError({"upload_session_id": "The upload session must be completed before import."})
    if upload_session.purpose != UploadSessionPurpose.PINCH_UTILITY_CSV:  # coverage: branch never taken
        raise ValidationError({"upload_session_id": "The upload session purpose must be pinch_utility_csv."})
    return upload_session


def enqueue_pinch_utility_import(*, user, project_id: int, upload_session_id: int) -> Task:
    """Queue a Pinch utility CSV import unless the same upload already has an active task."""
    project = _get_pinch_project(project_id)
    upload_session = _get_completed_pinch_upload(upload_session_id, user.id)

    if upload_session.flowsheet_id != project.flowsheet_id:
        raise ValidationError({"upload_session_id": "The upload session belongs to a different flowsheet."})

    # Deduplicate: if a task for the same project/upload pair is still pending,
    # running, or already completed, return it instead of queueing a duplicate.
    existing_task = (
        Task.objects.filter(
            task_type=TaskType.CSV_IMPORT_PINCH_UTILITIES,
            flowsheet_id=project.flowsheet_id,
            debug__project_id=project_id,
            debug__upload_session_id=upload_session_id,
        )
        .exclude(status__in=[TaskStatus.Failed, TaskStatus.Cancelled])
        .order_by("-start_time")
        .first()
    )
    if existing_task is not None:
        return existing_task

    task = Task.create(
        user,
        project.flowsheet_id,
        task_type=TaskType.CSV_IMPORT_PINCH_UTILITIES,
        status=TaskStatus.Pending,
        save=True,
    )
    task.debug = {
        "project_id": project_id,
        "upload_session_id": upload_session_id,
        "utilities_imported": 0,
    }
    task.save(update_fields=["debug"])

    messaging.send_pinch_utility_csv_import_message(
        PinchUtilityCsvImportRequestPayload(
            task_id=task.id,
            flowsheet_id=project.flowsheet_id,
            project_id=project_id,
            upload_session_id=upload_session_id,
            bucket=upload_session.bucket,
            object_key=upload_session.object_key,
            csv_delimiter=upload_session.csv_delimiter or None,
            requested_by_user_id=user.id,
        )
    )
    return task
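
# Illustrative call site (the actual view wiring lives outside this module):
#
#     task = enqueue_pinch_utility_import(
#         user=request.user,
#         project_id=project_id,
#         upload_session_id=upload_session_id,
#     )
#     return Response(TaskSerializer(task).data, status=status.HTTP_202_ACCEPTED)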


def _row_has_any_supported_value(row: dict[str, str | None]) -> bool:
    # A row counts as non-blank if any supported column holds a non-whitespace value.
    return any((row.get(column) or "").strip() for column in SUPPORTED_COLUMNS)


def _parse_float(
    row_number: int,
    row: dict[str, str],
    column_name: str,
    *,
    required: bool,
) -> float | None:
    parsed = parse_float_cell(row_number, row, column_name, required=required)
    # Defensive guard: a required cell must yield a value.
    if parsed is None and required:  # coverage: branch never taken
        raise CsvInspectionError(f"Row {row_number}, column '{column_name}' is required.")
    return parsed


def _parse_type(row_number: int, row: dict[str, str]) -> str:
    raw_value = (row.get("type") or "").strip()
    if not raw_value:
        # A blank 'type' cell falls back to the default.
        return pinchEnums.StreamType.Both
    if raw_value not in VALID_UTILITY_TYPES:  # coverage: branch never taken
        raise CsvInspectionError(
            f"Row {row_number}, column 'type' must be one of: {', '.join(sorted(VALID_UTILITY_TYPES))}."
        )
    return raw_value


def _build_utility(row_number: int, row: dict[str, str], project: StreamDataProject) -> PinchUtility | None:
    if not _row_has_any_supported_value(row):  # coverage: branch never taken
        return None

    name = (row.get("name") or "").strip()
    if not name:  # coverage: branch never taken
        raise CsvInspectionError(f"Row {row_number}, column 'name' is required.")

    return PinchUtility(
        flowsheet_id=project.flowsheet_id,
        input_owner=project.Inputs,
        name=name,
        t_supply=_parse_float(row_number, row, "t_supply", required=True),
        t_target=_parse_float(row_number, row, "t_target", required=True),
        heat_flow=_parse_float(row_number, row, "heat_flow", required=False),
        dt_cont=_parse_float(row_number, row, "dt_cont", required=False),
        htc=_parse_float(row_number, row, "htc", required=False),
        price=_parse_float(row_number, row, "price", required=False),
        type=_parse_type(row_number, row),
    )


def _flush_utilities(
    *,
    task: Task,
    utility_buffer: list[PinchUtility],
    imported_count: int,
) -> int:
    if not utility_buffer:  # coverage: branch never taken
        return imported_count

    PinchUtility.objects.bulk_create(utility_buffer)
    imported_count += len(utility_buffer)
    # Progress is reported only when the total lands on an exact multiple of the
    # interval, so short imports may finish without an intermediate update.
    if imported_count % PROGRESS_UPDATE_INTERVAL == 0:  # coverage: branch never taken
        task.debug = {**(task.debug or {}), "utilities_imported": imported_count}
        task.save(update_fields=["debug"])
        _send_task_update(task, NotificationServiceMessageType.TASK_UPDATED)

    utility_buffer.clear()
    return imported_count
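
# Illustrative: with ROW_BATCH_SIZE = PROGRESS_UPDATE_INTERVAL = 1000, a
# 2,500-row file yields bulk_create calls of 1000, 1000, and 500 rows, with
# TASK_UPDATED notifications after the first two flushes only.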


def process_pinch_utility_import(payload: PinchUtilityCsvImportRequestPayload):
    """Import Pinch utility rows from object storage into the target project.

    The payload must reference an existing pending task, a completed upload session,
    and a Pinch project owned by the same flowsheet. The CSV is streamed from object
    storage, validated row by row, and inserted in batches while task progress
    notifications are emitted. Invalid CSV structure or row values raise
    `CsvInspectionError` or `ValueError`, which are captured on the task as a failure.
    """
    task = Task.objects.get(id=payload.task_id)
    if task.status in {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Running, TaskStatus.Failed}:  # coverage: branch never taken
        return

    handle_task_running_event(task.id)
    try:
        project = _get_pinch_project(payload.project_id)
        try:
            upload_session = UploadSession.objects.get(id=payload.upload_session_id)
        except UploadSession.DoesNotExist as exc:
            raise CsvInspectionError("The upload session was deleted before the import could run.") from exc

        if upload_session.status != UploadSessionStatus.COMPLETED:  # coverage: branch never taken
            raise CsvInspectionError("The upload session must be completed before Pinch utility import can run.")
        if upload_session.purpose != UploadSessionPurpose.PINCH_UTILITY_CSV:  # coverage: branch never taken
            raise CsvInspectionError("The upload session purpose must be pinch_utility_csv for Pinch import.")
        if upload_session.flowsheet_id != project.flowsheet_id:  # coverage: branch never taken
            raise CsvInspectionError("The upload session belongs to a different flowsheet.")

        # Prefer the delimiter recorded on the payload, then on the session;
        # inspect the uploaded object only if neither recorded one.
        delimiter = payload.csv_delimiter or upload_session.csv_delimiter
        if not delimiter:
            delimiter = inspect_upload_session(upload_session).delimiter

        with transaction.atomic():
            # Pinch CSV imports replace the current utility table for the project,
            # so the delete and the batched inserts commit (or roll back) together.
            PinchUtility.objects.filter(input_owner=project.Inputs).delete()

            body = s3_storage.stream_object(payload.bucket, payload.object_key)
            _, reader = stream_csv_rows(body, delimiter)

            utility_buffer: list[PinchUtility] = []
            imported_count = 0

            for row_index, row in enumerate(reader):
                # Row numbers reported to users are 1-based and the header
                # occupies line 1, so the first data row is row 2.
                row_number = row_index + 2
                utility = _build_utility(row_number, row, project)
                if utility is None:  # coverage: branch never taken
                    continue
                utility_buffer.append(utility)

                if len(utility_buffer) >= ROW_BATCH_SIZE:  # coverage: branch never taken
                    imported_count = _flush_utilities(
                        task=task,
                        utility_buffer=utility_buffer,
                        imported_count=imported_count,
                    )

            imported_count = _flush_utilities(
                task=task,
                utility_buffer=utility_buffer,
                imported_count=imported_count,
            )

            if imported_count == 0:  # coverage: branch never taken
                raise CsvInspectionError("The uploaded CSV must include at least one utility row.")

        task.status = TaskStatus.Completed
        task.completed_time = timezone.now()
        task.debug = {**(task.debug or {}), "utilities_imported": imported_count}
        task.error = None
        task.save(update_fields=["status", "completed_time", "debug", "error"])
    except Exception as exc:
        task.status = TaskStatus.Failed
        task.completed_time = timezone.now()
        task.error = {
            "message": str(exc),
            "cause": "pinch_utility_csv_import",
            "traceback": traceback.format_exc(),
        }
        task.save(update_fields=["status", "completed_time", "error"])
        _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
        return

    # Success path: the matching failure notification is sent in the except block,
    # so exactly one TASK_COMPLETED message is emitted either way.
    _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)


def mark_pinch_utility_import_delivery_failure(payload: PinchUtilityCsvImportRequestPayload):
    """Fail a queued Pinch utility CSV import when Dapr cannot deliver the callback."""
    task = Task.objects.get(id=payload.task_id)
    if task.status in {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Failed}:  # coverage: branch never taken
        return

    task.status = TaskStatus.Failed
    task.completed_time = timezone.now()
    task.error = {
        "message": (
            "The utility CSV import could not be started. "
            "Retry the import to queue a new task."
        ),
        "cause": "pinch_utility_csv_import_delivery_failed",
    }
    task.save(update_fields=["status", "completed_time", "error"])
    _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
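
The report flags several guard branches above as never taken. A minimal sketch
of how the ownership guard in _get_completed_pinch_upload could be exercised,
assuming pytest-django and hypothetical user_factory / upload_session_factory
fixtures (neither exists in this module):

import pytest
from rest_framework.exceptions import ValidationError

from core.auxiliary.models.UploadSession import UploadSessionPurpose, UploadSessionStatus
from core.auxiliary.services.pinch_import import _get_completed_pinch_upload


@pytest.mark.django_db
def test_upload_session_owned_by_someone_else_is_rejected(user_factory, upload_session_factory):
    # Both factories are assumed fixtures; any equivalent object setup works.
    owner = user_factory()
    other_user = user_factory()
    upload_session = upload_session_factory(
        created_by=owner,
        status=UploadSessionStatus.COMPLETED,
        purpose=UploadSessionPurpose.PINCH_UTILITY_CSV,
    )

    # The ownership check raises before the status and purpose guards run.
    with pytest.raises(ValidationError):
        _get_completed_pinch_upload(upload_session.id, other_user.id)

The sibling status and purpose guards follow the same pattern, using a
non-completed or differently-purposed session.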