Coverage for backend/django/core/auxiliary/services/uploads.py: 80%
223 statements
coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
import logging
import os
import re
import uuid
from datetime import timedelta

from botocore.exceptions import ClientError
from django.conf import settings
from django.utils import timezone
from rest_framework.exceptions import NotFound, PermissionDenied, ValidationError

from core.auxiliary.models.MLModel import MLModel
from core.auxiliary.models.Scenario import Scenario, ScenarioTabTypeEnum
from core.auxiliary.models.UploadSession import (
    UploadSession,
    UploadSessionPurpose,
    UploadSessionStatus,
)
from core.auxiliary.services.csv_inspect import (
    DEFAULT_INSPECTION_BYTES,
    CsvInspectionResult,
    inspect_csv_sample,
)
from core.auxiliary.services.csv_lifecycle import (
    SHORT_CSV_LIFECYCLE_TTL,
    completed_csv_lifecycle_ttl,
    expires_at_from_ttl,
)
from core.auxiliary.services.object_storage import s3 as s3_storage
from core.auxiliary.services.upload_types import (
    CompletedMultipartPart,
    CompleteUploadResult,
    MultipartInitiationResult,
    MultipartPartUrlsResult,
    MultipartUploadStatusResult,
    UploadCleanupSummary,
)
from flowsheetInternals.unitops.models import SimulationObject

MULTIPART_PART_SIZE_BYTES = 8 * 1024 * 1024
PRESIGNED_PART_URL_EXPIRY_SECONDS = 60 * 60
MAX_NORMALIZED_FILENAME_LENGTH = 128
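
# Sizing note (illustrative, not from the original source): with 8 MiB parts, a
# client derives the part count as ceil(size_bytes / MULTIPART_PART_SIZE_BYTES).
# S3-compatible stores cap a multipart upload at 10,000 parts, so this part size
# supports objects up to roughly 78 GiB.
#
#     import math
#     part_count = math.ceil(size_bytes / MULTIPART_PART_SIZE_BYTES)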

logger = logging.getLogger(__name__)


def _schedule_csv_expiration(
    *,
    bucket: str,
    object_key: str,
    ttl: timedelta,
    reference_time=None,
):
    """Mirror the application TTL policy into the bucket lifecycle configuration."""
    expires_at = expires_at_from_ttl(ttl, reference_time=reference_time)
    s3_storage.schedule_object_expiration(
        bucket=bucket,
        key=object_key,
        expires_at=expires_at,
    )
    return expires_at


def _can_resume_upload_session(upload_session: UploadSession, *, now=None) -> bool:
    """Return whether an upload session is still eligible for browser resume calls."""
    current_time = now or timezone.now()
    return (
        upload_session.status in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING}
        and (upload_session.expires_at is None or upload_session.expires_at > current_time)
    )


def _get_in_progress_upload_session(*, upload_session_id: int, user_id: int) -> UploadSession:
    """Load, reconcile, and validate an upload session before accepting more browser parts."""
    upload_session = _synchronise_upload_session_state(
        _get_owned_upload_session(upload_session_id, user_id)
    )
    if upload_session.status == UploadSessionStatus.EXPIRED:
        raise ValidationError(
            {
                "upload_session_id": (
                    "This upload session expired and cannot be resumed. Choose the CSV again to start over."
                )
            }
        )
    return upload_session


def _validate_flowsheet_linkage(
    *,
    flowsheet_id: int,
    scenario: Scenario | None = None,
    simulation_object: SimulationObject | None = None,
) -> None:
    """Ensure the provided flowsheet id matches the linked scenario or simulation object."""
    linked_flowsheet_id = None
    if scenario is not None:
        linked_flowsheet_id = scenario.flowsheet_id
    elif simulation_object is not None:
        linked_flowsheet_id = simulation_object.flowsheet_id

    if linked_flowsheet_id is not None and linked_flowsheet_id != flowsheet_id:
        raise ValidationError({"flowsheet_id": "flowsheet_id does not match the referenced scenario or simulation object."})


def _normalise_filename(filename: str) -> str:
    """Sanitize a client-supplied filename so it is safe to embed into an object key."""
    base_name = filename.rsplit("/", 1)[-1].rsplit("\\", 1)[-1]
    cleaned = re.sub(r"[^A-Za-z0-9._-]", "_", base_name).strip("._")
    if not cleaned:
        return "upload.csv"

    stem, suffix = os.path.splitext(cleaned)
    suffix = suffix[:16]
    max_stem_length = max(1, MAX_NORMALIZED_FILENAME_LENGTH - len(suffix))
    truncated_stem = stem[:max_stem_length].rstrip("._-") or "upload"
    return f"{truncated_stem}{suffix}"
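
# Illustrative behaviour of the sanitizer above (the inputs are made up; outputs
# follow from the regex and truncation rules):
#
#     >>> _normalise_filename("../reports/Q1 results (final).csv")
#     'Q1_results__final.csv'
#     >>> _normalise_filename("???")
#     'upload.csv'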


def _build_object_key(purpose: str, flowsheet_id: int, original_filename: str) -> str:
    """Build a unique object key for a CSV upload scoped to a flowsheet and purpose."""
    safe_name = _normalise_filename(original_filename)
    return f"csv-uploads/{purpose}/flowsheet-{flowsheet_id}/{uuid.uuid4().hex}-{safe_name}"
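
# Illustrative key shape, assuming purpose "ml_training_csv" and flowsheet 42
# (both made up for this example; the hex component is random):
#
#     csv-uploads/ml_training_csv/flowsheet-42/3f2a...9c1d-training_data.csv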


def _get_simulation_object(simulation_object_id: int) -> SimulationObject:
    try:
        return SimulationObject.objects.get(id=simulation_object_id)
    except SimulationObject.DoesNotExist as exc:
        raise NotFound({"simulationObject_id": "SimulationObject not found."}) from exc


def _get_scenario(scenario_id: int) -> Scenario:
    try:
        return Scenario.objects.get(id=scenario_id)
    except Scenario.DoesNotExist as exc:
        raise NotFound({"scenario_id": "Scenario not found."}) from exc


def _get_owned_upload_session(upload_session_id: int, user_id: int) -> UploadSession:
    """Load an upload session and enforce ownership checks for the calling user."""
    try:
        upload_session = UploadSession.objects.select_related("scenario", "simulationObject").get(id=upload_session_id)
    except UploadSession.DoesNotExist as exc:
        raise NotFound({"upload_session_id": "Upload session not found."}) from exc

    if upload_session.created_by_id != user_id:
        raise PermissionDenied("You do not own this upload session.")

    return upload_session


def initiate_upload_session(
    *,
    user,
    flowsheet_id: int,
    purpose: str,
    original_filename: str,
    content_type: str,
    size_bytes: int,
    scenario_id: int | None = None,
    simulation_object_id: int | None = None,
) -> MultipartInitiationResult:
    """Create an upload session and the matching multipart upload in object storage."""
    if size_bytes <= 0:
        raise ValidationError({"size_bytes": "size_bytes must be greater than zero."})

    scenario = None
    simulation_object = None
    if purpose == UploadSessionPurpose.ML_TRAINING_CSV:
        if simulation_object_id is None:
            raise ValidationError({"simulationObject_id": "simulationObject_id is required for ML uploads."})
        simulation_object = _get_simulation_object(simulation_object_id)
    elif purpose == UploadSessionPurpose.SCENARIO_CSV:
        if scenario_id is None:
            raise ValidationError({"scenario_id": "scenario_id is required for scenario CSV uploads."})
        scenario = _get_scenario(scenario_id)
        if scenario.state_name not in {
            ScenarioTabTypeEnum.MultiSteadyState,
            ScenarioTabTypeEnum.Dynamic,
        }:
            raise ValidationError(
                {
                    "scenario_id": (
                        "Only multi steady-state and dynamic scenarios support scenario CSV uploads."
                    )
                }
            )
    elif purpose == UploadSessionPurpose.PINCH_UTILITY_CSV:
        if simulation_object_id is not None or scenario_id is not None:
            raise ValidationError(
                {
                    "purpose": "Pinch utility uploads do not accept scenario_id or simulationObject_id linkage."
                }
            )
    else:
        raise ValidationError({"purpose": "Unsupported upload purpose."})

    _validate_flowsheet_linkage(
        flowsheet_id=flowsheet_id,
        scenario=scenario,
        simulation_object=simulation_object,
    )

    bucket = s3_storage.get_bucket_name()
    object_key = _build_object_key(purpose, flowsheet_id, original_filename)
    upload_id = s3_storage.create_multipart_upload(bucket, object_key, content_type)
    initial_expires_at = _schedule_csv_expiration(
        bucket=bucket,
        object_key=object_key,
        ttl=SHORT_CSV_LIFECYCLE_TTL,
    )

    upload_session = UploadSession.objects.create(
        created_by=user,
        flowsheet_id=flowsheet_id,
        purpose=purpose,
        scenario=scenario,
        simulationObject=simulation_object,
        bucket=bucket,
        object_key=object_key,
        s3_upload_id=upload_id,
        original_filename=_normalise_filename(original_filename),
        content_type=content_type or "text/csv",
        size_bytes=size_bytes,
        status=UploadSessionStatus.INITIATED,
        expires_at=initial_expires_at,
    )

    return MultipartInitiationResult(
        upload_session_id=upload_session.id,
        bucket=upload_session.bucket,
        object_key=upload_session.object_key,
        s3_upload_id=upload_session.s3_upload_id,
        part_size_bytes=MULTIPART_PART_SIZE_BYTES,
        expires_in_seconds=PRESIGNED_PART_URL_EXPIRY_SECONDS,
    )
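
# Note the two-phase TTL visible in this module: initiation schedules the object
# for the short lifecycle TTL, and complete_upload later re-schedules it with the
# purpose-specific completed-CSV TTL. An upload that is never completed therefore
# expires on the short window without any further action.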


def get_part_urls(*, upload_session_id: int, part_numbers: list[int], user_id: int) -> MultipartPartUrlsResult:
    """Generate presigned URLs for the requested multipart part numbers."""
    upload_session = _get_in_progress_upload_session(
        upload_session_id=upload_session_id,
        user_id=user_id,
    )
    if upload_session.status not in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING}:
        raise ValidationError({"upload_session_id": "Part URLs can only be requested while an upload is in progress."})

    if upload_session.status == UploadSessionStatus.INITIATED:
        upload_session.status = UploadSessionStatus.UPLOADING
        upload_session.save(update_fields=["status"])

    return MultipartPartUrlsResult(
        urls={
            str(part_number): s3_storage.presign_upload_part(
                upload_session.bucket,
                upload_session.object_key,
                upload_session.s3_upload_id,
                part_number,
                PRESIGNED_PART_URL_EXPIRY_SECONDS,
            )
            for part_number in part_numbers
        }
    )
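
# A minimal client-side sketch (not part of this module): each presigned URL is
# consumed with a plain HTTP PUT of the raw part bytes, and S3-compatible stores
# return the part's ETag in the response headers. The `requests` usage below is
# illustrative only.
#
#     import requests
#     response = requests.put(url, data=chunk)
#     etag = response.headers["ETag"]  # echoed back in complete_upload's parts payload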


def get_upload_session(*, upload_session_id: int, user_id: int) -> UploadSession:
    """Return an owned upload session after reconciling any stale object-storage state."""
    upload_session = _get_owned_upload_session(upload_session_id, user_id)
    return _synchronise_upload_session_state(upload_session)


def get_upload_status(*, upload_session_id: int, user_id: int) -> MultipartUploadStatusResult:
    """Return the current upload-session status and any uploaded multipart parts."""
    now = timezone.now()
    upload_session = _synchronise_upload_session_state(
        _get_owned_upload_session(upload_session_id, user_id)
    )
    if upload_session.status in {
        UploadSessionStatus.COMPLETED,
        UploadSessionStatus.ABORTED,
        UploadSessionStatus.FAILED,
        UploadSessionStatus.EXPIRED,
    }:
        return MultipartUploadStatusResult(
            upload_session_id=upload_session.id,
            status=upload_session.status,
            can_resume=_can_resume_upload_session(upload_session, now=now),
            expires_at=upload_session.expires_at,
            uploaded_parts=[],
            part_size_bytes=MULTIPART_PART_SIZE_BYTES,
            bucket=upload_session.bucket,
            object_key=upload_session.object_key,
            s3_upload_id=upload_session.s3_upload_id,
        )

    uploaded_parts = s3_storage.list_uploaded_parts(
        upload_session.bucket,
        upload_session.object_key,
        upload_session.s3_upload_id,
    )
    return MultipartUploadStatusResult(
        upload_session_id=upload_session.id,
        status=upload_session.status,
        can_resume=_can_resume_upload_session(upload_session, now=now),
        expires_at=upload_session.expires_at,
        uploaded_parts=uploaded_parts,
        part_size_bytes=MULTIPART_PART_SIZE_BYTES,
        bucket=upload_session.bucket,
        object_key=upload_session.object_key,
        s3_upload_id=upload_session.s3_upload_id,
    )


def _validate_completed_parts(parts: list[dict[str, object]]) -> list[CompletedMultipartPart]:
    """Reject malformed multipart completion payloads before sending them to S3."""
    if not parts:
        raise ValidationError({"parts": "At least one uploaded part is required."})

    seen_part_numbers: set[int] = set()
    validated_parts: list[CompletedMultipartPart] = []
    for raw_part in parts:
        part = CompletedMultipartPart.model_validate(raw_part)
        part_number = part.part_number
        etag = part.etag.strip()
        if part_number in seen_part_numbers:
            raise ValidationError({"parts": "Duplicate part numbers are not allowed in multipart completion."})
        if not etag:
            raise ValidationError({"parts": f"Part {part_number} is missing an etag."})

        seen_part_numbers.add(part_number)
        validated_parts.append(CompletedMultipartPart(part_number=part_number, etag=etag))

    return validated_parts
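
# An illustrative completion payload, assuming CompletedMultipartPart accepts
# the field names used above (part_number, etag) as input keys; the etag values
# are fabricated:
#
#     parts = [
#         {"part_number": 1, "etag": '"9bb58f26192e4ba00f01e2e7b136bbd8"'},
#         {"part_number": 2, "etag": '"5d41402abc4b2a76b9719d911017c592"'},
#     ]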


def complete_upload(*, upload_session_id: int, parts: list[dict[str, object]], user_id: int) -> CompleteUploadResult:
    """Finalize a multipart upload session and persist any expiry metadata."""
    upload_session = _get_in_progress_upload_session(
        upload_session_id=upload_session_id,
        user_id=user_id,
    )
    if upload_session.status == UploadSessionStatus.COMPLETED:
        return CompleteUploadResult(
            upload_session_id=upload_session.id,
            bucket=upload_session.bucket,
            object_key=upload_session.object_key,
            size_bytes=upload_session.size_bytes,
            etag=None,
            location=None,
        )

    if upload_session.status not in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING}:
        raise ValidationError({"upload_session_id": "Only in-progress uploads can be completed."})

    validated_parts = _validate_completed_parts(parts)
    completion = s3_storage.complete_multipart_upload(
        upload_session.bucket,
        upload_session.object_key,
        upload_session.s3_upload_id,
        validated_parts,
    )

    upload_session.status = UploadSessionStatus.COMPLETED
    upload_session.completed_at = timezone.now()
    upload_session.expires_at = _schedule_csv_expiration(
        bucket=upload_session.bucket,
        object_key=upload_session.object_key,
        ttl=completed_csv_lifecycle_ttl(upload_session.purpose),
        reference_time=upload_session.completed_at,
    )
    upload_session.save(update_fields=["status", "completed_at", "expires_at"])
    return CompleteUploadResult(
        upload_session_id=upload_session.id,
        bucket=completion.bucket,
        object_key=completion.object_key,
        size_bytes=upload_session.size_bytes,
        etag=completion.etag,
        location=completion.location,
    )
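
# End-to-end service-layer flow, as a hedged sketch; `request`, `file_size`,
# `part_count`, and `collected_parts` are made-up names, and error handling is
# omitted:
#
#     init = initiate_upload_session(
#         user=request.user,
#         flowsheet_id=flowsheet_id,
#         purpose=UploadSessionPurpose.PINCH_UTILITY_CSV,
#         original_filename="utilities.csv",
#         content_type="text/csv",
#         size_bytes=file_size,
#     )
#     urls = get_part_urls(
#         upload_session_id=init.upload_session_id,
#         part_numbers=list(range(1, part_count + 1)),
#         user_id=request.user.id,
#     )
#     # ...browser PUTs each part against its presigned URL, collecting ETags...
#     result = complete_upload(
#         upload_session_id=init.upload_session_id,
#         parts=collected_parts,
#         user_id=request.user.id,
#     )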


def _abort_multipart_upload_best_effort(upload_session: UploadSession) -> None:
    try:
        s3_storage.abort_multipart_upload(
            upload_session.bucket,
            upload_session.object_key,
            upload_session.s3_upload_id,
        )
    except ClientError:
        logger.warning(
            "Failed to abort multipart upload during session reconciliation.",
            extra={"upload_session_id": upload_session.id},
            exc_info=True,
        )


def _synchronise_upload_session_state(upload_session: UploadSession) -> UploadSession:
    """Update persisted session state from age-based and object-storage-derived facts."""
    now = timezone.now()

    if upload_session.status in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING}:
        # The bucket lifecycle eventually removes abandoned parts, but the API must
        # stop resumptions as soon as the one-hour window has elapsed.
        if upload_session.expires_at is not None and upload_session.expires_at <= now:
            _abort_multipart_upload_best_effort(upload_session)
            upload_session.status = UploadSessionStatus.EXPIRED
            upload_session.save(update_fields=["status"])
            return upload_session

        stale_cutoff = now - timedelta(hours=settings.SEAWEED_STALE_MULTIPART_UPLOAD_HOURS)
        if upload_session.created_at < stale_cutoff:
            _abort_multipart_upload_best_effort(upload_session)
            upload_session.status = UploadSessionStatus.ABORTED
            upload_session.save(update_fields=["status"])
            return upload_session

    if (
        upload_session.status == UploadSessionStatus.COMPLETED
        and upload_session.expires_at is not None
        and upload_session.expires_at <= now
        and not s3_storage.object_exists(upload_session.bucket, upload_session.object_key)
    ):
        upload_session.status = UploadSessionStatus.EXPIRED
        upload_session.save(update_fields=["status"])

    return upload_session
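
# Reconciliation summary for the function above, derived from its branches:
#
#     INITIATED/UPLOADING, past expires_at          -> EXPIRED (multipart aborted)
#     INITIATED/UPLOADING, older than stale cutoff  -> ABORTED (multipart aborted)
#     COMPLETED, past expires_at, object gone       -> EXPIRED
#     anything else                                 -> unchanged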


def abort_upload(*, upload_session_id: int, user_id: int) -> UploadSession:
    """Abort an in-progress multipart upload session."""
    upload_session = _synchronise_upload_session_state(
        _get_owned_upload_session(upload_session_id, user_id)
    )
    if upload_session.status == UploadSessionStatus.ABORTED:
        return upload_session

    if upload_session.status not in {UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING}:
        raise ValidationError({"upload_session_id": "Only in-progress uploads can be aborted."})

    s3_storage.abort_multipart_upload(
        upload_session.bucket,
        upload_session.object_key,
        upload_session.s3_upload_id,
    )
    upload_session.status = UploadSessionStatus.ABORTED
    upload_session.save(update_fields=["status"])
    return upload_session


def inspect_upload_session(upload_session: UploadSession) -> CsvInspectionResult:
    """Inspect an uploaded CSV object and cache its header metadata on the session."""
    if upload_session.status != UploadSessionStatus.COMPLETED:
        raise ValidationError({"upload_session_id": "The upload session must be completed before CSV inspection can run."})

    if upload_session.csv_headers and upload_session.csv_delimiter:
        sample_bytes = s3_storage.get_range(
            upload_session.bucket,
            upload_session.object_key,
            f"0-{DEFAULT_INSPECTION_BYTES - 1}",
        )
        inspection = inspect_csv_sample(sample_bytes)
        return CsvInspectionResult(
            headers=list(upload_session.csv_headers),
            delimiter=upload_session.csv_delimiter,
            preview_rows=inspection.preview_rows,
            warnings=list(upload_session.csv_warnings or inspection.warnings),
        )

    sample_bytes = s3_storage.get_range(
        upload_session.bucket,
        upload_session.object_key,
        f"0-{DEFAULT_INSPECTION_BYTES - 1}",
    )
    inspection = inspect_csv_sample(sample_bytes)

    upload_session.csv_headers = inspection.headers
    upload_session.csv_delimiter = inspection.delimiter
    upload_session.csv_warnings = inspection.warnings
    upload_session.csv_inspected_at = timezone.now()
    upload_session.save(
        update_fields=[
            "csv_headers",
            "csv_delimiter",
            "csv_warnings",
            "csv_inspected_at",
        ]
    )
    return inspection
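
# The "0-{N-1}" argument above appears to follow HTTP Range byte-range syntax,
# so inspection reads only the first DEFAULT_INSPECTION_BYTES of the object
# rather than downloading the whole CSV. For example, if DEFAULT_INSPECTION_BYTES
# were 65536 (an assumed value, not taken from csv_inspect), the range string
# would be "0-65535".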


def attach_upload_to_ml_model(*, upload_session_id: int, simulation_object_id: int, user_id: int) -> MLModel:
    """Create or reuse an ML model bound to a completed CSV upload session."""
    upload_session = _get_owned_upload_session(upload_session_id, user_id)
    if upload_session.status != UploadSessionStatus.COMPLETED:
        raise ValidationError({"upload_session_id": "The upload session must be completed before it can be attached to an ML model."})
    if upload_session.purpose != UploadSessionPurpose.ML_TRAINING_CSV:
        raise ValidationError({"upload_session_id": "The upload session purpose must be ml_training_csv."})

    simulation_object = _get_simulation_object(simulation_object_id)
    if upload_session.simulationObject_id and upload_session.simulationObject_id != simulation_object.id:
        raise ValidationError({"upload_session_id": "This upload session belongs to a different simulation object."})
    if upload_session.flowsheet_id != simulation_object.flowsheet_id:
        raise ValidationError({"upload_session_id": "This upload session belongs to a different flowsheet."})

    existing_model = MLModel.objects.filter(
        simulationObject=simulation_object,
        csv_upload_session=upload_session,
    ).order_by("-id").first()
    if existing_model is not None:
        return existing_model

    inspection = inspect_upload_session(upload_session)
    if not inspection.preview_rows:
        raise ValidationError({"upload_session_id": "The uploaded CSV must include at least one data row."})

    return MLModel.objects.create(
        flowsheet_id=simulation_object.flowsheet_id,
        simulationObject=simulation_object,
        csv_file_name=upload_session.original_filename,
        csv_bucket=upload_session.bucket,
        csv_object_key=upload_session.object_key,
        csv_headers=inspection.headers,
        csv_delimiter=inspection.delimiter,
        csv_upload_session=upload_session,
        surrogate_model={},
        progress=1,
    )


def cleanup_upload_sessions() -> UploadCleanupSummary:
    """Abort stale multipart uploads that have not been reconciled through normal retrieval."""
    now = timezone.now()
    stale_cutoff = now - timedelta(hours=settings.SEAWEED_STALE_MULTIPART_UPLOAD_HOURS)
    aborted_count = 0

    stale_uploads = UploadSession.objects.filter(
        status__in=[UploadSessionStatus.INITIATED, UploadSessionStatus.UPLOADING],
        created_at__lt=stale_cutoff,
    )
    for upload_session in stale_uploads:
        s3_storage.abort_multipart_upload(
            upload_session.bucket,
            upload_session.object_key,
            upload_session.s3_upload_id,
        )
        upload_session.status = UploadSessionStatus.ABORTED
        upload_session.save(update_fields=["status"])
        aborted_count += 1

    return UploadCleanupSummary(
        aborted_stale_uploads=aborted_count,
        expired_ml_uploads=0,
    )
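
# Hedged scheduling sketch: this module does not say how cleanup_upload_sessions
# is invoked. If the project runs Celery beat, an hourly entry in Django settings
# could look like the following (the task path is hypothetical):
#
#     from celery.schedules import crontab
#
#     CELERY_BEAT_SCHEDULE = {
#         "cleanup-upload-sessions": {
#             "task": "core.auxiliary.tasks.cleanup_upload_sessions",  # hypothetical path
#             "schedule": crontab(minute=0),  # top of every hour
#         },
#     }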