Coverage for backend/django/core/auxiliary/services/uploads.py: 80%

223 statements  

coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import logging 

2import os 

3import re 

4import uuid 

5from datetime import timedelta 

6from typing import Any 

7 

8from botocore.exceptions import ClientError 

9from django.conf import settings 

10from django.utils import timezone 

11from rest_framework.exceptions import NotFound, PermissionDenied, ValidationError 

12 

13from core.auxiliary.models.MLModel import MLModel 

14from core.auxiliary.models.Scenario import Scenario, ScenarioTabTypeEnum 

15from core.auxiliary.models.UploadSession import ( 

16 UploadSession, 

17 UploadSessionPurpose, 

18 UploadSessionStatus, 

19) 

20from core.auxiliary.services.csv_inspect import ( 

21 DEFAULT_INSPECTION_BYTES, 

22 CsvInspectionResult, 

23 inspect_csv_sample, 

24) 

25from core.auxiliary.services.csv_lifecycle import ( 

26 SHORT_CSV_LIFECYCLE_TTL, 

27 completed_csv_lifecycle_ttl, 

28 expires_at_from_ttl, 

29) 

30from core.auxiliary.services.object_storage import s3 as s3_storage 

31from core.auxiliary.services.upload_types import ( 

32 CompletedMultipartPart, 

33 CompleteUploadResult, 

34 MultipartInitiationResult, 

35 MultipartPartUrlsResult, 

36 MultipartUploadStatusResult, 

37 UploadCleanupSummary, 

38) 

39from flowsheetInternals.unitops.models import SimulationObject 

40 

41 

42MULTIPART_PART_SIZE_BYTES = 8 * 1024 * 1024 

43PRESIGNED_PART_URL_EXPIRY_SECONDS = 60 * 60 

44MAX_NORMALIZED_FILENAME_LENGTH = 128 

45 
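For orientation, a quick worked example of what the 8 MiB part size implies for a browser upload; this is a sketch using only the constant above, and the 100 MiB file size is an arbitrary assumption:

import math
file_size_bytes = 100 * 1024 * 1024  # hypothetical 100 MiB CSV
part_count = math.ceil(file_size_bytes / MULTIPART_PART_SIZE_BYTES)
# 104_857_600 / 8_388_608 = 12.5, so the client sends 12 full 8 MiB parts
# plus one final 4 MiB part: part_count == 13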

46 

47logger = logging.getLogger(__name__) 

48 

49 

50def _schedule_csv_expiration( 

51 *, 

52 bucket: str, 

53 object_key: str, 

54 ttl: timedelta, 

55 reference_time=None, 

56): 

57 """Mirror the application TTL policy into the bucket lifecycle configuration.""" 

58 expires_at = expires_at_from_ttl(ttl, reference_time=reference_time) 

59 s3_storage.schedule_object_expiration( 

60 bucket=bucket, 

61 key=object_key, 

62 expires_at=expires_at, 

63 ) 

64 return expires_at 

65 

66 

67def _can_resume_upload_session(upload_session: UploadSession, *, now=None) -> bool: 

68 """Return whether an upload session is still eligible for browser resume calls.""" 

69 current_time = now or timezone.now() 

70 return ( 

71 upload_session.status in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING} 

72 and (upload_session.expires_at is None or upload_session.expires_at > current_time) 

73 ) 

74 

75 

76def _get_in_progress_upload_session(*, upload_session_id: int, user_id: int) -> UploadSession: 

77 """Load, reconcile, and validate an upload session before accepting more browser parts.""" 

78 upload_session = _synchronise_upload_session_state( 

79 _get_owned_upload_session(upload_session_id, user_id) 

80 ) 

81 if upload_session.status == UploadSessionStatus.EXPIRED: 

82 raise ValidationError( 

83 { 

84 "upload_session_id": ( 

85 "This upload session expired and cannot be resumed. Choose the CSV again to start over." 

86 ) 

87 } 

88 ) 

89 return upload_session 

90 

91 

92def _validate_flowsheet_linkage( 

93 *, 

94 flowsheet_id: int, 

95 scenario: Scenario | None = None, 

96 simulation_object: SimulationObject | None = None, 

97) -> None: 

98 """Ensure the provided flowsheet id matches the linked scenario or simulation object.""" 

99 linked_flowsheet_id = None 

100 if scenario is not None: 

101 linked_flowsheet_id = scenario.flowsheet_id 

102 elif simulation_object is not None: 

103 linked_flowsheet_id = simulation_object.flowsheet_id 

104 

105 if linked_flowsheet_id is not None and linked_flowsheet_id != flowsheet_id:  [105 ↛ 106: condition was never true]

106 raise ValidationError({"flowsheet_id": "flowsheet_id does not match the referenced scenario or simulation object."}) 

107 

108 

109def _normalise_filename(filename: str) -> str: 

110 """Sanitize a client-supplied filename so it is safe to embed into an object key.""" 

111 base_name = filename.rsplit("/", 1)[-1].rsplit("\\", 1)[-1] 

112 cleaned = re.sub(r"[^A-Za-z0-9._-]", "_", base_name).strip("._") 

113 if not cleaned:  [113 ↛ 114: condition was never true]

114 return "upload.csv" 

115 

116 stem, suffix = os.path.splitext(cleaned) 

117 suffix = suffix[:16] 

118 max_stem_length = max(1, MAX_NORMALIZED_FILENAME_LENGTH - len(suffix)) 

119 truncated_stem = stem[:max_stem_length].rstrip("._-") or "upload" 

120 return f"{truncated_stem}{suffix}" 

121 

122 

123def _build_object_key(purpose: str, flowsheet_id: int, original_filename: str) -> str: 

124 """Build a unique object key for a CSV upload scoped to a flowsheet and purpose.""" 

125 safe_name = _normalise_filename(original_filename) 

126 return f"csv-uploads/{purpose}/flowsheet-{flowsheet_id}/{uuid.uuid4().hex}-{safe_name}" 

127 

128 
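A small sketch of what these two helpers produce for a messy client filename; the input is made up and the purpose string is a placeholder, since the concrete enum value lives in UploadSessionPurpose:

normalised = _normalise_filename("../reports/heat & mass (final).csv")
# path components are dropped, unsafe characters become "_", and trailing
# "._-" padding is stripped, giving "heat___mass__final.csv"

purpose = "scenario_csv"  # placeholder; the real value comes from UploadSessionPurpose
key = _build_object_key(purpose, 42, "heat & mass (final).csv")
# -> "csv-uploads/scenario_csv/flowsheet-42/<32-hex-uuid>-heat___mass__final.csv"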

129def _get_simulation_object(simulation_object_id: int) -> SimulationObject: 

130 try: 

131 return SimulationObject.objects.get(id=simulation_object_id) 

132 except SimulationObject.DoesNotExist as exc: 

133 raise NotFound({"simulationObject_id": "SimulationObject not found."}) from exc 

134 

135 

136def _get_scenario(scenario_id: int) -> Scenario: 

137 try: 

138 return Scenario.objects.get(id=scenario_id) 

139 except Scenario.DoesNotExist as exc: 

140 raise NotFound({"scenario_id": "Scenario not found."}) from exc 

141 

142 

143def _get_owned_upload_session(upload_session_id: int, user_id: int) -> UploadSession: 

144 """Load an upload session and enforce ownership checks for the calling user.""" 

145 try: 

146 upload_session = UploadSession.objects.select_related("scenario", "simulationObject").get(id=upload_session_id) 

147 except UploadSession.DoesNotExist as exc: 

148 raise NotFound({"upload_session_id": "Upload session not found."}) from exc 

149 

150 if upload_session.created_by_id != user_id:  [150 ↛ 151: condition was never true]

151 raise PermissionDenied("You do not own this upload session.") 

152 

153 return upload_session 

154 

155 

156def initiate_upload_session( 

157 *, 

158 user, 

159 flowsheet_id: int, 

160 purpose: str, 

161 original_filename: str, 

162 content_type: str, 

163 size_bytes: int, 

164 scenario_id: int | None = None, 

165 simulation_object_id: int | None = None, 

166) -> MultipartInitiationResult: 

167 """Create an upload session and the matching multipart upload in object storage.""" 

168 if size_bytes <= 0:  [168 ↛ 169: condition was never true]

169 raise ValidationError({"size_bytes": "size_bytes must be greater than zero."}) 

170 

171 scenario = None 

172 simulation_object = None 

173 if purpose == UploadSessionPurpose.ML_TRAINING_CSV: 

174 if simulation_object_id is None:  [174 ↛ 175: condition was never true]

175 raise ValidationError({"simulationObject_id": "simulationObject_id is required for ML uploads."}) 

176 simulation_object = _get_simulation_object(simulation_object_id) 

177 elif purpose == UploadSessionPurpose.SCENARIO_CSV: 

178 if scenario_id is None:  [178 ↛ 179: condition was never true]

179 raise ValidationError({"scenario_id": "scenario_id is required for scenario CSV uploads."}) 

180 scenario = _get_scenario(scenario_id) 

181 if scenario.state_name not in {  [181 ↛ 185: condition was never true]

182 ScenarioTabTypeEnum.MultiSteadyState, 

183 ScenarioTabTypeEnum.Dynamic, 

184 }: 

185 raise ValidationError( 

186 { 

187 "scenario_id": ( 

188 "Only multi steady-state and dynamic scenarios support scenario CSV uploads." 

189 ) 

190 } 

191 ) 

192 elif purpose == UploadSessionPurpose.PINCH_UTILITY_CSV:  [192 ↛ 200: condition was always true]

193 if simulation_object_id is not None or scenario_id is not None:  [193 ↛ 194: condition was never true]

194 raise ValidationError( 

195 { 

196 "purpose": "Pinch utility uploads do not accept scenario_id or simulationObject_id linkage." 

197 } 

198 ) 

199 else: 

200 raise ValidationError({"purpose": "Unsupported upload purpose."}) 

201 

202 _validate_flowsheet_linkage( 

203 flowsheet_id=flowsheet_id, 

204 scenario=scenario, 

205 simulation_object=simulation_object, 

206 ) 

207 

208 bucket = s3_storage.get_bucket_name() 

209 object_key = _build_object_key(purpose, flowsheet_id, original_filename) 

210 upload_id = s3_storage.create_multipart_upload(bucket, object_key, content_type) 

211 initial_expires_at = _schedule_csv_expiration( 

212 bucket=bucket, 

213 object_key=object_key, 

214 ttl=SHORT_CSV_LIFECYCLE_TTL, 

215 ) 

216 

217 upload_session = UploadSession.objects.create( 

218 created_by=user, 

219 flowsheet_id=flowsheet_id, 

220 purpose=purpose, 

221 scenario=scenario, 

222 simulationObject=simulation_object, 

223 bucket=bucket, 

224 object_key=object_key, 

225 s3_upload_id=upload_id, 

226 original_filename=_normalise_filename(original_filename), 

227 content_type=content_type or "text/csv", 

228 size_bytes=size_bytes, 

229 status=UploadSessionStatus.INITIATED, 

230 expires_at=initial_expires_at, 

231 ) 

232 

233 return MultipartInitiationResult( 

234 upload_session_id=upload_session.id, 

235 bucket=upload_session.bucket, 

236 object_key=upload_session.object_key, 

237 s3_upload_id=upload_session.s3_upload_id, 

238 part_size_bytes=MULTIPART_PART_SIZE_BYTES, 

239 expires_in_seconds=PRESIGNED_PART_URL_EXPIRY_SECONDS, 

240 ) 

241 

242 

243def get_part_urls(*, upload_session_id: int, part_numbers: list[int], user_id: int) -> MultipartPartUrlsResult: 

244 """Generate presigned URLs for the requested multipart part numbers.""" 

245 upload_session = _get_in_progress_upload_session( 

246 upload_session_id=upload_session_id, 

247 user_id=user_id, 

248 ) 

249 if upload_session.status not in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING}:  [249 ↛ 250: condition was never true]

250 raise ValidationError({"upload_session_id": "Part URLs can only be requested while an upload is in progress."}) 

251 

252 if upload_session.status == UploadSessionStatus.INITIATED:  [252 ↛ 256: condition was always true]

253 upload_session.status = UploadSessionStatus.UPLOADING 

254 upload_session.save(update_fields=["status"]) 

255 

256 return MultipartPartUrlsResult( 

257 urls={ 

258 str(part_number): s3_storage.presign_upload_part( 

259 upload_session.bucket, 

260 upload_session.object_key, 

261 upload_session.s3_upload_id, 

262 part_number, 

263 PRESIGNED_PART_URL_EXPIRY_SECONDS, 

264 ) 

265 for part_number in part_numbers 

266 } 

267 ) 

268 

269 

270def get_upload_session(*, upload_session_id: int, user_id: int) -> UploadSession: 

271 """Return an owned upload session after reconciling any stale object-storage state.""" 

272 upload_session = _get_owned_upload_session(upload_session_id, user_id) 

273 return _synchronise_upload_session_state(upload_session) 

274 

275 

276def get_upload_status(*, upload_session_id: int, user_id: int) -> MultipartUploadStatusResult: 

277 """Return the current upload-session status and any uploaded multipart parts.""" 

278 now = timezone.now() 

279 upload_session = _synchronise_upload_session_state( 

280 _get_owned_upload_session(upload_session_id, user_id) 

281 ) 

282 if upload_session.status in { 

283 UploadSessionStatus.COMPLETED, 

284 UploadSessionStatus.ABORTED, 

285 UploadSessionStatus.FAILED, 

286 UploadSessionStatus.EXPIRED, 

287 }: 

288 return MultipartUploadStatusResult( 

289 upload_session_id=upload_session.id, 

290 status=upload_session.status, 

291 can_resume=_can_resume_upload_session(upload_session, now=now), 

292 expires_at=upload_session.expires_at, 

293 uploaded_parts=[], 

294 part_size_bytes=MULTIPART_PART_SIZE_BYTES, 

295 bucket=upload_session.bucket, 

296 object_key=upload_session.object_key, 

297 s3_upload_id=upload_session.s3_upload_id, 

298 ) 

299 

300 uploaded_parts = s3_storage.list_uploaded_parts( 

301 upload_session.bucket, 

302 upload_session.object_key, 

303 upload_session.s3_upload_id, 

304 ) 

305 return MultipartUploadStatusResult( 

306 upload_session_id=upload_session.id, 

307 status=upload_session.status, 

308 can_resume=_can_resume_upload_session(upload_session, now=now), 

309 expires_at=upload_session.expires_at, 

310 uploaded_parts=uploaded_parts, 

311 part_size_bytes=MULTIPART_PART_SIZE_BYTES, 

312 bucket=upload_session.bucket, 

313 object_key=upload_session.object_key, 

314 s3_upload_id=upload_session.s3_upload_id, 

315 ) 

316 

317 
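A sketch of how a browser-facing caller might use the status result to resume an interrupted upload. Here session_id, user_id, and file_size_bytes stand in for values the caller already has, and the dict shape of uploaded_parts entries (S3 ListParts-style "PartNumber" keys) is an assumption:

import math

status = get_upload_status(upload_session_id=session_id, user_id=user_id)
if status.can_resume:
    total_parts = math.ceil(file_size_bytes / status.part_size_bytes)
    done = {part["PartNumber"] for part in status.uploaded_parts}  # key name assumed
    missing = [n for n in range(1, total_parts + 1) if n not in done]
    urls = get_part_urls(upload_session_id=session_id, part_numbers=missing, user_id=user_id)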

318def _validate_completed_parts(parts: list[dict[str, object]]) -> list[CompletedMultipartPart]: 

319 """Reject malformed multipart completion payloads before sending them to S3.""" 

320 if not parts: 

321 raise ValidationError({"parts": "At least one uploaded part is required."}) 

322 

323 seen_part_numbers: set[int] = set() 

324 validated_parts: list[CompletedMultipartPart] = [] 

325 for raw_part in parts: 

326 part = CompletedMultipartPart.model_validate(raw_part) 

327 part_number = part.part_number 

328 etag = part.etag.strip() 

329 if part_number in seen_part_numbers: 

330 raise ValidationError({"parts": "Duplicate part numbers are not allowed in multipart completion."}) 

331 if not etag:  [331 ↛ 332: condition was never true]

332 raise ValidationError({"parts": f"Part {part_number} is missing an etag."}) 

333 

334 seen_part_numbers.add(part_number) 

335 validated_parts.append(CompletedMultipartPart(part_number=part_number, etag=etag)) 

336 

337 return validated_parts 

338 

339 

340def complete_upload(*, upload_session_id: int, parts: list[dict[str, object]], user_id: int) -> CompleteUploadResult: 

341 """Finalize a multipart upload session and persist any expiry metadata.""" 

342 upload_session = _get_in_progress_upload_session( 

343 upload_session_id=upload_session_id, 

344 user_id=user_id, 

345 ) 

346 if upload_session.status == UploadSessionStatus.COMPLETED:  [346 ↛ 347: condition was never true]

347 return CompleteUploadResult( 

348 upload_session_id=upload_session.id, 

349 bucket=upload_session.bucket, 

350 object_key=upload_session.object_key, 

351 size_bytes=upload_session.size_bytes, 

352 etag=None, 

353 location=None, 

354 ) 

355 

356 if upload_session.status not in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING}:  [356 ↛ 357: condition was never true]

357 raise ValidationError({"upload_session_id": "Only in-progress uploads can be completed."}) 

358 

359 validated_parts = _validate_completed_parts(parts) 

360 completion = s3_storage.complete_multipart_upload( 

361 upload_session.bucket, 

362 upload_session.object_key, 

363 upload_session.s3_upload_id, 

364 validated_parts, 

365 ) 

366 

367 upload_session.status = UploadSessionStatus.COMPLETED 

368 upload_session.completed_at = timezone.now() 

369 upload_session.expires_at = _schedule_csv_expiration( 

370 bucket=upload_session.bucket, 

371 object_key=upload_session.object_key, 

372 ttl=completed_csv_lifecycle_ttl(upload_session.purpose), 

373 reference_time=upload_session.completed_at, 

374 ) 

375 upload_session.save(update_fields=["status", "completed_at", "expires_at"]) 

376 return CompleteUploadResult( 

377 upload_session_id=upload_session.id, 

378 bucket=completion.bucket, 

379 object_key=completion.object_key, 

380 size_bytes=upload_session.size_bytes, 

381 etag=completion.etag, 

382 location=completion.location, 

383 ) 

384 

385 
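Putting the three public steps together, a hedged end-to-end sketch of the happy path; request.user, the flowsheet and scenario ids, and the JSON key names accepted by CompletedMultipartPart are assumptions for illustration, and the part PUTs themselves happen in the browser:

init = initiate_upload_session(
    user=request.user,
    flowsheet_id=42,
    purpose=UploadSessionPurpose.SCENARIO_CSV,
    original_filename="runs.csv",
    content_type="text/csv",
    size_bytes=24_000_000,
    scenario_id=7,
)
urls = get_part_urls(
    upload_session_id=init.upload_session_id,
    part_numbers=[1, 2, 3],
    user_id=request.user.id,
)
# ...the client PUTs each part to its presigned URL and records the returned ETags...
result = complete_upload(
    upload_session_id=init.upload_session_id,
    parts=[
        {"part_number": 1, "etag": "abc123"},  # key names assumed to match the pydantic fields
        {"part_number": 2, "etag": "def456"},
        {"part_number": 3, "etag": "ghi789"},
    ],
    user_id=request.user.id,
)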

386def _abort_multipart_upload_best_effort(upload_session: UploadSession) -> None: 

387 try: 

388 s3_storage.abort_multipart_upload( 

389 upload_session.bucket, 

390 upload_session.object_key, 

391 upload_session.s3_upload_id, 

392 ) 

393 except ClientError: 

394 logger.warning( 

395 "Failed to abort multipart upload during session reconciliation.", 

396 extra={"upload_session_id": upload_session.id}, 

397 exc_info=True, 

398 ) 

399 

400 

401def _synchronise_upload_session_state(upload_session: UploadSession) -> UploadSession: 

402 """Update persisted session state from age-based and object-storage-derived facts.""" 

403 now = timezone.now() 

404 

405 if upload_session.status in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING}: 

406 # The bucket lifecycle eventually removes abandoned parts, but the API must 

407 # stop resumptions as soon as the one-hour window has elapsed. 

408 if upload_session.expires_at is not None and upload_session.expires_at <= now: 

409 _abort_multipart_upload_best_effort(upload_session) 

410 upload_session.status = UploadSessionStatus.EXPIRED 

411 upload_session.save(update_fields=["status"]) 

412 return upload_session 

413 

414 stale_cutoff = now - timedelta(hours=settings.SEAWEED_STALE_MULTIPART_UPLOAD_HOURS) 

415 if upload_session.created_at < stale_cutoff:  [415 ↛ 416: condition was never true]

416 _abort_multipart_upload_best_effort(upload_session) 

417 upload_session.status = UploadSessionStatus.ABORTED 

418 upload_session.save(update_fields=["status"]) 

419 return upload_session 

420 

421 if ( 

422 upload_session.status == UploadSessionStatus.COMPLETED 

423 and upload_session.expires_at is not None 

424 and upload_session.expires_at <= now 

425 and not s3_storage.object_exists(upload_session.bucket, upload_session.object_key) 

426 ): 

427 upload_session.status = UploadSessionStatus.EXPIRED 

428 upload_session.save(update_fields=["status"]) 

429 

430 return upload_session 

431 

432 

433def abort_upload(*, upload_session_id: int, user_id: int) -> UploadSession: 

434 """Abort an in-progress multipart upload session.""" 

435 upload_session = _synchronise_upload_session_state( 

436 _get_owned_upload_session(upload_session_id, user_id) 

437 ) 

438 if upload_session.status == UploadSessionStatus.ABORTED: 

439 return upload_session 

440 

441 if upload_session.status not in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING}: 

442 raise ValidationError({"upload_session_id": "Only in-progress uploads can be aborted."}) 

443 

444 s3_storage.abort_multipart_upload( 

445 upload_session.bucket, 

446 upload_session.object_key, 

447 upload_session.s3_upload_id, 

448 ) 

449 upload_session.status = UploadSessionStatus.ABORTED 

450 upload_session.save(update_fields=["status"]) 

451 return upload_session 

452 

453 

454def inspect_upload_session(upload_session: UploadSession) -> CsvInspectionResult: 

455 """Inspect an uploaded CSV object and cache its header metadata on the session.""" 

456 if upload_session.status != UploadSessionStatus.COMPLETED:  [456 ↛ 457: condition was never true]

457 raise ValidationError({"upload_session_id": "The upload session must be completed before CSV inspection can run."}) 

458 

459 if upload_session.csv_headers and upload_session.csv_delimiter: 

460 sample_bytes = s3_storage.get_range( 

461 upload_session.bucket, 

462 upload_session.object_key, 

463 f"0-{DEFAULT_INSPECTION_BYTES - 1}", 

464 ) 

465 inspection = inspect_csv_sample(sample_bytes) 

466 return CsvInspectionResult( 

467 headers=list(upload_session.csv_headers), 

468 delimiter=upload_session.csv_delimiter, 

469 preview_rows=inspection.preview_rows, 

470 warnings=list(upload_session.csv_warnings or inspection.warnings), 

471 ) 

472 

473 sample_bytes = s3_storage.get_range( 

474 upload_session.bucket, 

475 upload_session.object_key, 

476 f"0-{DEFAULT_INSPECTION_BYTES - 1}", 

477 ) 

478 inspection = inspect_csv_sample(sample_bytes) 

479 

480 upload_session.csv_headers = inspection.headers 

481 upload_session.csv_delimiter = inspection.delimiter 

482 upload_session.csv_warnings = inspection.warnings 

483 upload_session.csv_inspected_at = timezone.now() 

484 upload_session.save( 

485 update_fields=[ 

486 "csv_headers", 

487 "csv_delimiter", 

488 "csv_warnings", 

489 "csv_inspected_at", 

490 ] 

491 ) 

492 return inspection 

493 

494 

495def attach_upload_to_ml_model(*, upload_session_id: int, simulation_object_id: int, user_id: int) -> MLModel: 

496 """Create or reuse an ML model bound to a completed CSV upload session.""" 

497 upload_session = _get_owned_upload_session(upload_session_id, user_id) 

498 if upload_session.status != UploadSessionStatus.COMPLETED:  [498 ↛ 499: condition was never true]

499 raise ValidationError({"upload_session_id": "The upload session must be completed before it can be attached to an ML model."}) 

500 if upload_session.purpose != UploadSessionPurpose.ML_TRAINING_CSV:  [500 ↛ 501: condition was never true]

501 raise ValidationError({"upload_session_id": "The upload session purpose must be ml_training_csv."}) 

502 

503 simulation_object = _get_simulation_object(simulation_object_id) 

504 if upload_session.simulationObject_id and upload_session.simulationObject_id != simulation_object.id:  [504 ↛ 505: condition was never true]

505 raise ValidationError({"upload_session_id": "This upload session belongs to a different simulation object."}) 

506 if upload_session.flowsheet_id != simulation_object.flowsheet_id:  [506 ↛ 507: condition was never true]

507 raise ValidationError({"upload_session_id": "This upload session belongs to a different flowsheet."}) 

508 

509 existing_model = MLModel.objects.filter( 

510 simulationObject=simulation_object, 

511 csv_upload_session=upload_session, 

512 ).order_by("-id").first() 

513 if existing_model is not None: 

514 return existing_model 

515 

516 inspection = inspect_upload_session(upload_session) 

517 if not inspection.preview_rows: 

518 raise ValidationError({"upload_session_id": "The uploaded CSV must include at least one data row."}) 

519 

520 return MLModel.objects.create( 

521 flowsheet_id=simulation_object.flowsheet_id, 

522 simulationObject=simulation_object, 

523 csv_file_name=upload_session.original_filename, 

524 csv_bucket=upload_session.bucket, 

525 csv_object_key=upload_session.object_key, 

526 csv_headers=inspection.headers, 

527 csv_delimiter=inspection.delimiter, 

528 csv_upload_session=upload_session, 

529 surrogate_model={}, 

530 progress=1, 

531 ) 

532 

533 

534def cleanup_upload_sessions() -> UploadCleanupSummary: 

535 """Abort stale multipart uploads that have not been reconciled through normal retrieval.""" 

536 now = timezone.now() 

537 stale_cutoff = now - timedelta(hours=settings.SEAWEED_STALE_MULTIPART_UPLOAD_HOURS) 

538 aborted_count = 0 

539 

540 stale_uploads = UploadSession.objects.filter( 

541 status__in=[UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING], 

542 created_at__lt=stale_cutoff, 

543 ) 

544 for upload_session in stale_uploads: 

545 s3_storage.abort_multipart_upload( 

546 upload_session.bucket, 

547 upload_session.object_key, 

548 upload_session.s3_upload_id, 

549 ) 

550 upload_session.status = UploadSessionStatus.ABORTED 

551 upload_session.save(update_fields=["status"]) 

552 aborted_count += 1 

553 

554 return UploadCleanupSummary( 

555 aborted_stale_uploads=aborted_count, 

556 expired_ml_uploads=0, 

557 )
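
Finally, a hypothetical wiring example, not part of the measured module: cleanup_upload_sessions is designed to be invoked periodically, so a minimal Django management command such as the sketch below, run from cron or Celery beat, would be sufficient. The command name and module path are assumptions.

from django.core.management.base import BaseCommand

from core.auxiliary.services.uploads import cleanup_upload_sessions


class Command(BaseCommand):
    help = "Abort stale multipart upload sessions."

    def handle(self, *args, **options):
        # Sweep sessions that were never reconciled through normal retrieval.
        summary = cleanup_upload_sessions()
        self.stdout.write(f"Aborted {summary.aborted_stale_uploads} stale upload session(s).")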