Coverage for backend/django/core/auxiliary/services/scenario_import.py: 80%

146 statements  

coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import traceback 

2from itertools import chain 

3 

4from django.db import transaction 

5from django.utils import timezone 

6from rest_framework.exceptions import NotFound, ValidationError 

7 

8from common.models.notifications.payloads import NotificationServiceMessageType 

9from common.models.scenario_import import ScenarioCsvImportRequestPayload 

10from common.services import messaging 

11from core.auxiliary.enums.generalEnums import TaskStatus 

12from core.auxiliary.managers.TaskManager import handle_task_running_event 

13from core.auxiliary.models.DataCell import DataCell 

14from core.auxiliary.models.DataColumn import DataColumn 

15from core.auxiliary.models.DataRow import DataRow 

16from core.auxiliary.models.Scenario import Scenario, ScenarioTabTypeEnum 

17from core.auxiliary.models.Task import Task, TaskType 

18from core.auxiliary.models.UploadSession import ( 

19 UploadSession, 

20 UploadSessionPurpose, 

21 UploadSessionStatus, 

22) 

23from core.auxiliary.serializers import TaskSerializer 

24from core.auxiliary.services.csv_inspect import ( 

25 CsvInspectionError, 

26 filter_numeric_headers_by_first_row, 

27 parse_numeric_row, 

28 stream_csv_rows, 

29) 

30from core.auxiliary.services.object_storage import s3 as s3_storage 

31from core.auxiliary.services.uploads import inspect_upload_session 

32 

33 

34ROW_BATCH_SIZE = 1000 

35PROGRESS_UPDATE_INTERVAL = 1000 

36 

37 

38def _send_task_update(task: Task, message_type: NotificationServiceMessageType): 

39 messaging.send_flowsheet_notification_message( 

40 task.flowsheet_id, 

41 TaskSerializer(task).data, 

42 message_type, 

43 ) 

44 

45 

46def _get_completed_scenario_upload( 

47 upload_session_id: int, 

48 user_id: int, 

49 scenario_id: int, 

50) -> UploadSession: 

51 try: 

52 upload_session = UploadSession.objects.get(id=upload_session_id) 

53 except UploadSession.DoesNotExist as exc: 

54 raise ValidationError({"upload_session_id": "Upload session not found."}) from exc 

55 

56 if upload_session.created_by_id != user_id:    56 ↛ 57 (the condition on line 56 was never true)

57 raise ValidationError({"upload_session_id": "You do not own this upload session."}) 

58 if upload_session.status != UploadSessionStatus.COMPLETED:    58 ↛ 59 (the condition on line 58 was never true)

59 raise ValidationError({"upload_session_id": "The upload session must be completed before import."}) 

60 if upload_session.purpose != UploadSessionPurpose.SCENARIO_CSV:    60 ↛ 61 (the condition on line 60 was never true)

61 raise ValidationError({"upload_session_id": "The upload session purpose must be scenario_csv."}) 

62 if upload_session.scenario_id and upload_session.scenario_id != scenario_id:    62 ↛ 63 (the condition on line 62 was never true)

63 raise ValidationError({"upload_session_id": "The upload session belongs to a different scenario."}) 

64 

65 return upload_session 

66 

67 

68def _get_importable_scenario(scenario_id: int) -> Scenario: 

69 """Resolve and validate the target scenario before queueing a scenario CSV import.""" 

70 try: 

71 scenario = Scenario.objects.get(id=scenario_id) 

72 except Scenario.DoesNotExist as exc: 

73 raise NotFound({"scenario_id": "Scenario not found."}) from exc 

74 

75 if scenario.state_name not in {    75 ↛ 79 (the condition on line 75 was never true)

76 ScenarioTabTypeEnum.MultiSteadyState, 

77 ScenarioTabTypeEnum.Dynamic, 

78 }: 

79 raise ValidationError( 

80 { 

81 "scenario_id": ( 

82 "Only multi steady-state and dynamic scenarios support scenario CSV import." 

83 ) 

84 } 

85 ) 

86 

87 return scenario 

88 

89 

90def enqueue_scenario_import(*, user, scenario_id: int, upload_session_id: int) -> Task: 

91 """Queue a scenario CSV import unless an active task already owns that upload. 

92 

93 Reusing non-terminal tasks prevents duplicate pub/sub deliveries for the same 

94 upload session, while failed and cancelled tasks remain retryable. 

95 """ 

96 scenario = _get_importable_scenario(scenario_id) 

97 upload_session = _get_completed_scenario_upload(upload_session_id, user.id, scenario_id) 

98 if upload_session.flowsheet_id != scenario.flowsheet_id: 

99 raise ValidationError({"upload_session_id": "The upload session belongs to a different flowsheet."}) 

100 

101 existing_task = ( 

102 Task.objects.filter( 

103 task_type=TaskType.CSV_IMPORT_SCENARIO, 

104 flowsheet_id=scenario.flowsheet_id, 

105 debug__scenario_id=scenario_id, 

106 debug__upload_session_id=upload_session_id, 

107 status__in=[TaskStatus.Pending, TaskStatus.Running, TaskStatus.Cancelling], 

108 ) 

109 .order_by("-start_time") 

110 .first() 

111 ) 

112 if existing_task is not None: 

113 return existing_task 

114 

115 task = Task.create( 

116 user, 

117 scenario.flowsheet_id, 

118 task_type=TaskType.CSV_IMPORT_SCENARIO, 

119 status=TaskStatus.Pending, 

120 save=True, 

121 ) 

122 task.debug = { 

123 "scenario_id": scenario_id, 

124 "upload_session_id": upload_session_id, 

125 "rows_imported": 0, 

126 } 

127 task.save(update_fields=["debug"]) 

128 

129 messaging.send_scenario_csv_import_message( 

130 ScenarioCsvImportRequestPayload( 

131 task_id=task.id, 

132 flowsheet_id=scenario.flowsheet_id, 

133 scenario_id=scenario_id, 

134 upload_session_id=upload_session_id, 

135 bucket=upload_session.bucket, 

136 object_key=upload_session.object_key, 

137 csv_delimiter=upload_session.csv_delimiter or None, 

138 requested_by_user_id=user.id, 

139 ) 

140 ) 

141 return task 

142 

143 
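Not part of the covered module: a minimal sketch of how a DRF view might call enqueue_scenario_import above. The view class, route wiring, and request field name are assumptions for illustration only.

from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from core.auxiliary.serializers import TaskSerializer
from core.auxiliary.services.scenario_import import enqueue_scenario_import


class ScenarioCsvImportView(APIView):  # hypothetical view name
    def post(self, request, scenario_id: int):
        task = enqueue_scenario_import(
            user=request.user,
            scenario_id=scenario_id,
            upload_session_id=request.data["upload_session_id"],
        )
        # 202 because the import runs asynchronously; the frontend follows the task
        # through the TASK_UPDATED / TASK_COMPLETED notifications sent by this service.
        return Response(TaskSerializer(task).data, status=status.HTTP_202_ACCEPTED)

Because enqueue_scenario_import returns any existing non-terminal task for the same upload session, repeated POSTs stay idempotent from the client's point of view.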

144def _flush_rows( 

145 *, 

146 task: Task, 

147 scenario: Scenario, 

148 headers: list[str], 

149 columns_by_name: dict[str, DataColumn], 

150 row_buffer: list[DataRow], 

151 value_buffer: list[list[float]], 

152 imported_count: int, 

153) -> int: 

154 if not row_buffer:    154 ↛ 155 (the condition on line 154 was never true)

155 return imported_count 

156 

157 DataRow.objects.bulk_create(row_buffer) 

158 cells: list[DataCell] = [] 

159 for data_row, row_values in zip(row_buffer, value_buffer): 

160 for header, value in zip(headers, row_values): 

161 cells.append( 

162 DataCell( 

163 flowsheet_id=scenario.flowsheet_id, 

164 data_column=columns_by_name[header], 

165 data_row=data_row, 

166 value=value, 

167 ) 

168 ) 

169 DataCell.objects.bulk_create(cells) 

170 

171 imported_count += len(row_buffer) 

172 if imported_count % PROGRESS_UPDATE_INTERVAL == 0:    172 ↛ 173 (the condition on line 172 was never true)

173 task.debug = {**(task.debug or {}), "rows_imported": imported_count} 

174 task.save(update_fields=["debug"]) 

175 _send_task_update(task, NotificationServiceMessageType.TASK_UPDATED) 

176 

177 row_buffer.clear() 

178 value_buffer.clear() 

179 return imported_count 

180 

181 

182def process_scenario_import(payload: ScenarioCsvImportRequestPayload): 

183 """Import scenario CSV data from object storage into scenario data tables. 

184 

185 The task is moved from `pending` to `running` by the task manager, then the 

186 CSV is streamed from object storage so large uploads do not need to fit in 

187 memory. Any import failure is recorded on the task and broadcast to the UI. 

188 """ 

189 task = Task.objects.get(id=payload.task_id) 

190 if task.status in {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Running, TaskStatus.Failed}:    190 ↛ 191 (the condition on line 190 was never true)

191 return 

192 

193 handle_task_running_event(task.id) 

194 try: 

195 scenario = _get_importable_scenario(payload.scenario_id) 

196 try: 

197 upload_session = UploadSession.objects.get(id=payload.upload_session_id) 

198 except UploadSession.DoesNotExist as exc: 

199 raise CsvInspectionError("The upload session was deleted before the import could run.") from exc 

200 

201 if upload_session.status != UploadSessionStatus.COMPLETED:    201 ↛ 202 (the condition on line 201 was never true)

202 raise CsvInspectionError("The upload session must be completed before scenario CSV import can run.") 

203 if upload_session.purpose != UploadSessionPurpose.SCENARIO_CSV:    203 ↛ 204 (the condition on line 203 was never true)

204 raise CsvInspectionError("The upload session purpose must be scenario_csv for scenario CSV import.") 

205 if upload_session.flowsheet_id != scenario.flowsheet_id:    205 ↛ 206 (the condition on line 205 was never true)

206 raise CsvInspectionError("The upload session belongs to a different flowsheet.") 

207 if upload_session.scenario_id and upload_session.scenario_id != scenario.id:    207 ↛ 208 (the condition on line 207 was never true)

208 raise CsvInspectionError("The upload session belongs to a different scenario.") 

209 

210 delimiter = payload.csv_delimiter or upload_session.csv_delimiter 

211 if not delimiter: 

212 delimiter = inspect_upload_session(upload_session).delimiter 

213 

214 with transaction.atomic(): 

215 DataRow.objects.filter(scenario_id=scenario.id).delete() 

216 

217 body = s3_storage.stream_object(payload.bucket, payload.object_key) 

218 headers, reader = stream_csv_rows(body, delimiter) 

219 first_row = next(reader, None) 

220 if first_row is None:    220 ↛ 221 (the condition on line 220 was never true)

221 raise CsvInspectionError("The uploaded CSV must include at least one data row.") 

222 

223 numeric_headers, skipped_headers = filter_numeric_headers_by_first_row(headers, first_row) 

224 if not numeric_headers:    224 ↛ 225 (the condition on line 224 was never true)

225 raise CsvInspectionError( 

226 "The uploaded CSV does not contain any numeric columns in its first data row." 

227 ) 

228 

229 existing_columns = { 

230 column.name: column 

231 for column in DataColumn.objects.filter(scenario_id=scenario.id) 

232 } 

233 missing_headers = [header for header in numeric_headers if header not in existing_columns] 

234 if missing_headers:    234 ↛ 251 (the condition on line 234 was always true)

235 DataColumn.objects.bulk_create( 

236 [ 

237 DataColumn( 

238 flowsheet_id=scenario.flowsheet_id, 

239 scenario_id=scenario.id, 

240 name=header, 

241 value="", 

242 ) 

243 for header in missing_headers 

244 ] 

245 ) 

246 existing_columns = { 

247 column.name: column 

248 for column in DataColumn.objects.filter(scenario_id=scenario.id) 

249 } 

250 

251 row_buffer: list[DataRow] = [] 

252 value_buffer: list[list[float]] = [] 

253 imported_count = 0 

254 

255 for row_index, row in enumerate(chain([first_row], reader)): 

256 row_number = row_index + 2 

257 parsed_values = parse_numeric_row(row_number, row, numeric_headers) 

258 row_buffer.append( 

259 DataRow( 

260 flowsheet_id=scenario.flowsheet_id, 

261 scenario_id=scenario.id, 

262 index=row_index, 

263 ) 

264 ) 

265 value_buffer.append(parsed_values) 

266 

267 if len(row_buffer) >= ROW_BATCH_SIZE:    267 ↛ 268 (the condition on line 267 was never true)

268 imported_count = _flush_rows( 

269 task=task, 

270 scenario=scenario, 

271 headers=numeric_headers, 

272 columns_by_name=existing_columns, 

273 row_buffer=row_buffer, 

274 value_buffer=value_buffer, 

275 imported_count=imported_count, 

276 ) 

277 

278 imported_count = _flush_rows( 

279 task=task, 

280 scenario=scenario, 

281 headers=numeric_headers, 

282 columns_by_name=existing_columns, 

283 row_buffer=row_buffer, 

284 value_buffer=value_buffer, 

285 imported_count=imported_count, 

286 ) 

287 

288 task.status = TaskStatus.Completed 

289 task.completed_time = timezone.now() 

290 task.debug = { 

291 **(task.debug or {}), 

292 "rows_imported": imported_count, 

293 "skipped_non_numeric_headers": skipped_headers, 

294 } 

295 task.error = None 

296 task.save(update_fields=["status", "completed_time", "debug", "error"]) 

297 except Exception as exc: 

298 task.status = TaskStatus.Failed 

299 task.completed_time = timezone.now() 

300 task.error = { 

301 "message": str(exc), 

302 "cause": "scenario_csv_import", 

303 "traceback": traceback.format_exc(), 

304 } 

305 task.save(update_fields=["status", "completed_time", "error"]) 

306 _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED) 

307 return 

308 

309 _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED) 

310 

311 
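Not part of the covered module: a rough sketch of the streaming behaviour the docstring of process_scenario_import relies on. The real stream_csv_rows lives in core.auxiliary.services.csv_inspect and is not shown here; this hypothetical version only illustrates wrapping an object-storage byte stream so rows are decoded lazily instead of loading the whole CSV into memory.

import codecs
import csv
from typing import IO, Iterator


def stream_csv_rows_sketch(body: IO[bytes], delimiter: str) -> tuple[list[str], Iterator[list[str]]]:
    # Decode the byte stream incrementally; csv.reader then pulls one line at a time.
    text_stream = codecs.getreader("utf-8")(body)
    reader = csv.reader(text_stream, delimiter=delimiter)
    headers = next(reader)  # the first line is treated as the header row
    return headers, reader

Keeping the reader lazy is what lets the ROW_BATCH_SIZE batching above bound memory use: only one batch of DataRow/DataCell objects is held at a time before _flush_rows clears the buffers.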

312def mark_scenario_import_delivery_failure(payload: ScenarioCsvImportRequestPayload): 

313 """Fail a queued scenario CSV import when Dapr cannot deliver the callback. 

314 

315 This is distinct from `process_scenario_import`, which only runs after the 

316 callback arrives successfully. Delivery failures need their own task update 

317 or the frontend will continue showing the task as pending. 

318 """ 

319 task = Task.objects.get(id=payload.task_id) 

320 if task.status in {TaskStatus.Completed, TaskStatus.Cancelled, TaskStatus.Failed}:    320 ↛ 321 (the condition on line 320 was never true)

321 return 

322 

323 task.status = TaskStatus.Failed 

324 task.completed_time = timezone.now() 

325 task.error = { 

326 "message": ( 

327 "The scenario CSV import could not be started. " 

328 "Retry the import to queue a new task." 

329 ), 

330 "cause": "scenario_csv_import_delivery_failed", 

331 } 

332 task.save(update_fields=["status", "completed_time", "error"]) 

333 _send_task_update(task, NotificationServiceMessageType.TASK_COMPLETED)
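Not part of the covered module: a sketch of how a pub/sub callback handler might dispatch between the two entry points in this file. The handler name and the shape of the delivery-status flag are assumptions; the actual subscription wiring (for example, the Dapr route) is not part of this module.

from common.models.scenario_import import ScenarioCsvImportRequestPayload
from core.auxiliary.services import scenario_import


def handle_scenario_csv_import_event(raw_payload: dict, delivered: bool) -> None:
    payload = ScenarioCsvImportRequestPayload(**raw_payload)  # assumes a keyword-constructible payload model
    if delivered:
        # Normal path: the callback arrived, so run the import itself.
        scenario_import.process_scenario_import(payload)
    else:
        # Delivery failure: fail the queued task so the UI stops showing it as
        # pending and the user can retry the import.
        scenario_import.mark_scenario_import_delivery_failure(payload)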