Coverage for backend/django/core/auxiliary/views/UploadSessionViews.py: 89%
207 statements
« prev ^ index » next — coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
1import logging
3from django.views.decorators.csrf import csrf_exempt
4from drf_spectacular.utils import OpenApiParameter, OpenApiTypes, extend_schema
5from pydantic import ValidationError as PydanticValidationError
6from rest_framework import serializers, status
7from rest_framework.decorators import api_view, authentication_classes, permission_classes
8from rest_framework.permissions import IsAuthenticated
9from rest_framework.response import Response
11from authentication.custom_drf_authentication import DaprApiTokenAuthentication
12from common.models.scenario_import import ScenarioCsvImportEvent
13from common.models.pinch_import import PinchUtilityCsvImportEvent
14from core.auxiliary.serializers import TaskSerializer
15from core.auxiliary.services.scenario_import import (
16 enqueue_scenario_import,
17 mark_scenario_import_delivery_failure,
18 process_scenario_import,
19)
20from core.auxiliary.services.pinch_import import (
21 enqueue_pinch_utility_import,
22 mark_pinch_utility_import_delivery_failure,
23 process_pinch_utility_import,
24)
25from core.auxiliary.services.uploads import (
26 MULTIPART_PART_SIZE_BYTES,
27 abort_upload,
28 complete_upload,
29 get_upload_session,
30 get_part_urls,
31 get_upload_status,
32 initiate_upload_session,
33 inspect_upload_session,
34)
35from core.validation import api_view_ignore_access_control, api_view_validate
38logger = logging.getLogger(__name__)
def _parse_scenario_import_event(data) -> ScenarioCsvImportEvent | None:
    """Parse a CloudEvent payload into a ``ScenarioCsvImportEvent``.

    Returns ``None`` for malformed envelopes instead of raising: Dapr
    retries failed deliveries aggressively, so the caller acknowledges
    bad messages with a 200 rather than looping them through pub/sub.
    """
    try:
        event = ScenarioCsvImportEvent.model_validate(data)
    except PydanticValidationError:
        logger.warning(
            "Discarding malformed scenario CSV import event.",
            exc_info=True,
        )
        return None
    return event
def _parse_pinch_utility_import_event(data) -> PinchUtilityCsvImportEvent | None:
    """Parse a CloudEvent payload into a ``PinchUtilityCsvImportEvent``.

    A ``None`` return signals a malformed envelope; the caller responds
    with a 200 so the message is acknowledged and does not loop forever
    through pub/sub retries.
    """
    try:
        event = PinchUtilityCsvImportEvent.model_validate(data)
    except PydanticValidationError:
        logger.warning(
            "Discarding malformed Pinch utility CSV import event.",
            exc_info=True,
        )
        return None
    return event
class InitiateMultipartUploadSerializer(serializers.Serializer):
    """Request body for creating a multipart upload session."""

    # Upload kind; one of the three CSV flows handled by this module.
    purpose = serializers.ChoiceField(choices=["ml_training_csv", "scenario_csv", "pinch_utility_csv"])
    flowsheet_id = serializers.IntegerField()
    scenario_id = serializers.IntegerField(required=False, allow_null=True)
    # camelCase on the wire for frontend compatibility; mapped to snake_case internally.
    simulationObject_id = serializers.IntegerField(required=False, allow_null=True, source="simulation_object_id")
    original_filename = serializers.CharField(max_length=255)
    content_type = serializers.CharField(max_length=255, required=False, allow_blank=True)
    # Total file size; must be positive.
    size_bytes = serializers.IntegerField(min_value=1)
class InitiateMultipartUploadResponseSerializer(serializers.Serializer):
    """Response body describing the newly created multipart upload session."""

    upload_session_id = serializers.IntegerField()
    bucket = serializers.CharField()
    object_key = serializers.CharField()
    # S3 multipart UploadId the browser uses for subsequent part uploads.
    s3_upload_id = serializers.CharField()
    part_size_bytes = serializers.IntegerField()
    expires_in_seconds = serializers.IntegerField()
class MultipartPartsSerializer(serializers.Serializer):
    """Request body asking for presigned URLs for specific part numbers."""

    upload_session_id = serializers.IntegerField()
    # S3 part numbers start at 1; at least one must be requested.
    part_numbers = serializers.ListField(child=serializers.IntegerField(min_value=1), allow_empty=False)
class MultipartPartsResponseSerializer(serializers.Serializer):
    """Response body mapping part numbers to presigned upload URLs."""

    urls = serializers.DictField(child=serializers.CharField())
class MultipartStatusSerializer(serializers.Serializer):
    """Query parameters identifying an upload session (also reused by CSV inspection)."""

    upload_session_id = serializers.IntegerField()
class UploadedPartSerializer(serializers.Serializer):
    """A part already present in S3, as reported by the status endpoint."""

    part_number = serializers.IntegerField()
    etag = serializers.CharField()
    size_bytes = serializers.IntegerField()
class CompletedPartSerializer(serializers.Serializer):
    """A part the client finished uploading, submitted on completion."""

    part_number = serializers.IntegerField()
    etag = serializers.CharField()
class MultipartStatusResponseSerializer(serializers.Serializer):
    """Response shape used by the resumable-upload frontend reconciliation flow."""

    status = serializers.CharField()
    # Whether the frontend may continue uploading remaining parts.
    can_resume = serializers.BooleanField()
    expires_at = serializers.DateTimeField(required=False, allow_null=True)
    uploaded_parts = UploadedPartSerializer(many=True)
    part_size_bytes = serializers.IntegerField()
    bucket = serializers.CharField()
    object_key = serializers.CharField()
    s3_upload_id = serializers.CharField()
class CompleteMultipartUploadSerializer(serializers.Serializer):
    """Request body finalizing a multipart upload with the uploaded parts' ETags."""

    upload_session_id = serializers.IntegerField()
    parts = CompletedPartSerializer(many=True)
class CompleteMultipartUploadResponseSerializer(serializers.Serializer):
    """Response body describing the completed object in S3."""

    bucket = serializers.CharField()
    object_key = serializers.CharField()
    size_bytes = serializers.IntegerField()
    # Composite ETag of the completed multipart object; may be absent.
    etag = serializers.CharField(required=False, allow_null=True)
class AbortMultipartUploadSerializer(serializers.Serializer):
    """Request body aborting an in-progress multipart upload session."""

    upload_session_id = serializers.IntegerField()
class CsvInspectResponseSerializer(serializers.Serializer):
    """Response body with header/preview metadata for a completed CSV upload."""

    headers = serializers.ListField(child=serializers.CharField())
    detected_delimiter = serializers.CharField()
    preview_rows = serializers.ListField(child=serializers.DictField(), required=False)
    warnings = serializers.ListField(child=serializers.CharField(), required=False)
class ImportScenarioFromUploadSerializer(serializers.Serializer):
    """Request body enqueueing a scenario import from a completed upload."""

    scenario_id = serializers.IntegerField()
    upload_session_id = serializers.IntegerField()
class ImportPinchUtilitiesFromUploadSerializer(serializers.Serializer):
    """Request body enqueueing a Pinch utility import from a completed upload."""

    project_id = serializers.IntegerField()
    upload_session_id = serializers.IntegerField()
class UploadCleanupSummarySerializer(serializers.Serializer):
    """Summary counts from an upload cleanup run (presumably a periodic job — no caller visible here)."""

    aborted_stale_uploads = serializers.IntegerField()
    expired_ml_uploads = serializers.IntegerField()
@api_view_validate
@extend_schema(
    request=InitiateMultipartUploadSerializer,
    responses=InitiateMultipartUploadResponseSerializer,
)
@api_view(["POST"])
def initiate_multipart_upload(request) -> Response:
    """Create a multipart upload session for a browser-managed CSV upload.

    The body's ``flowsheet_id`` must match the ``flowsheet`` query parameter
    so a caller cannot satisfy access control against one flowsheet while
    uploading to another.
    """
    serializer = InitiateMultipartUploadSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    # Previously `int(request.query_params["flowsheet"])`: a missing or
    # non-numeric parameter surfaced as a 500 (KeyError/ValueError).
    # Validate explicitly so the client gets a 400 instead.
    raw_flowsheet = request.query_params.get("flowsheet")
    try:
        query_flowsheet_id = int(raw_flowsheet)
    except (TypeError, ValueError):
        raise serializers.ValidationError({"flowsheet": "flowsheet query parameter must be an integer."})
    if serializer.validated_data["flowsheet_id"] != query_flowsheet_id:
        raise serializers.ValidationError({"flowsheet_id": "flowsheet_id must match the flowsheet query parameter."})
    result = initiate_upload_session(user=request.user, **serializer.validated_data)
    return Response(
        InitiateMultipartUploadResponseSerializer(result.model_dump()).data,
        status=status.HTTP_200_OK,
    )
@api_view_validate
@extend_schema(request=MultipartPartsSerializer, responses=MultipartPartsResponseSerializer)
@api_view(["POST"])
def get_multipart_part_urls(request) -> Response:
    """Return presigned upload URLs for the requested multipart part numbers."""
    payload = MultipartPartsSerializer(data=request.data)
    payload.is_valid(raise_exception=True)
    presigned = get_part_urls(user_id=request.user.id, **payload.validated_data)
    body = MultipartPartsResponseSerializer(presigned.model_dump()).data
    return Response(body, status=status.HTTP_200_OK)
@api_view_validate
@extend_schema(
    parameters=[OpenApiParameter(name="upload_session_id", required=True, type=OpenApiTypes.INT)],
    responses=MultipartStatusResponseSerializer,
)
@api_view(["GET"])
def get_multipart_upload_status(request) -> Response:
    """Return multipart-upload progress for resume and reconciliation flows."""
    params = MultipartStatusSerializer(data=request.query_params)
    params.is_valid(raise_exception=True)
    progress = get_upload_status(user_id=request.user.id, **params.validated_data)
    body = MultipartStatusResponseSerializer(progress.model_dump()).data
    return Response(body, status=status.HTTP_200_OK)
@api_view_validate
@extend_schema(request=CompleteMultipartUploadSerializer, responses=CompleteMultipartUploadResponseSerializer)
@api_view(["POST"])
def complete_multipart_upload(request) -> Response:
    """Complete a multipart upload after the browser has uploaded all parts."""
    payload = CompleteMultipartUploadSerializer(data=request.data)
    payload.is_valid(raise_exception=True)
    completion = complete_upload(user_id=request.user.id, **payload.validated_data)
    # The response omits internal fields the frontend does not need.
    body = completion.model_dump(exclude={"upload_session_id", "location"})
    return Response(
        CompleteMultipartUploadResponseSerializer(body).data,
        status=status.HTTP_200_OK,
    )
@api_view_validate
@extend_schema(request=AbortMultipartUploadSerializer, responses=None)
@api_view(["POST"])
def abort_multipart_upload(request) -> Response:
    """Abort an in-progress multipart upload session."""
    payload = AbortMultipartUploadSerializer(data=request.data)
    payload.is_valid(raise_exception=True)
    abort_upload(user_id=request.user.id, **payload.validated_data)
    return Response(status=status.HTTP_200_OK)
@api_view_validate
@extend_schema(
    parameters=[OpenApiParameter(name="upload_session_id", required=True, type=OpenApiTypes.INT)],
    responses=CsvInspectResponseSerializer,
)
@api_view(["GET"])
def inspect_uploaded_csv(request) -> Response:
    """Inspect a completed CSV upload and return typed header/preview metadata."""
    params = MultipartStatusSerializer(data=request.query_params)
    params.is_valid(raise_exception=True)
    session = get_upload_session(user_id=request.user.id, **params.validated_data)
    inspection = inspect_upload_session(session)
    body = {
        "headers": inspection.headers,
        "detected_delimiter": inspection.delimiter,
        "preview_rows": inspection.preview_rows,
        "warnings": inspection.warnings,
    }
    return Response(CsvInspectResponseSerializer(body).data, status=status.HTTP_200_OK)
@api_view_validate
@extend_schema(request=ImportScenarioFromUploadSerializer, responses=TaskSerializer)
@api_view(["POST"])
def import_scenario_from_upload(request) -> Response:
    """Create a task that imports a completed CSV upload into a scenario input table."""
    payload = ImportScenarioFromUploadSerializer(data=request.data)
    payload.is_valid(raise_exception=True)
    queued_task = enqueue_scenario_import(user=request.user, **payload.validated_data)
    return Response(TaskSerializer(queued_task).data, status=status.HTTP_200_OK)
@api_view_validate
@extend_schema(request=ImportPinchUtilitiesFromUploadSerializer, responses=TaskSerializer)
@api_view(["POST"])
def import_pinch_utilities_from_upload(request) -> Response:
    """Create a task that imports a completed CSV upload into a Pinch utility table."""
    payload = ImportPinchUtilitiesFromUploadSerializer(data=request.data)
    payload.is_valid(raise_exception=True)
    queued_task = enqueue_pinch_utility_import(user=request.user, **payload.validated_data)
    return Response(TaskSerializer(queued_task).data, status=status.HTTP_200_OK)
@extend_schema(exclude=True)
@api_view(["POST"])
@authentication_classes([DaprApiTokenAuthentication])
@permission_classes([IsAuthenticated])
@csrf_exempt
@api_view_ignore_access_control
def process_scenario_import_event(request) -> Response:
    """Handle Dapr-delivered scenario CSV import events inside the Django worker process."""
    event = _parse_scenario_import_event(request.data)
    # Malformed envelopes are acknowledged (200) without processing so Dapr stops retrying.
    if event is not None:
        process_scenario_import(event.data)
    return Response(status=status.HTTP_200_OK)
@extend_schema(exclude=True)
@api_view(["POST"])
@authentication_classes([DaprApiTokenAuthentication])
@permission_classes([IsAuthenticated])
@csrf_exempt
@api_view_ignore_access_control
def process_scenario_import_error_event(request) -> Response:
    """Handle dead-lettered scenario CSV import events and fail the queued task.

    Wired to the subscription dead-letter topic: delivery failures become
    visible in the task stream and import UI instead of leaving the original
    task stuck in `pending`.
    """
    event = _parse_scenario_import_event(request.data)
    # Malformed envelopes are acknowledged (200) without processing.
    if event is not None:
        mark_scenario_import_delivery_failure(event.data)
    return Response(status=status.HTTP_200_OK)
@extend_schema(exclude=True)
@api_view(["POST"])
@authentication_classes([DaprApiTokenAuthentication])
@permission_classes([IsAuthenticated])
@csrf_exempt
@api_view_ignore_access_control
def process_pinch_utility_import_event(request) -> Response:
    """Handle Dapr-delivered Pinch utility CSV import events inside the Django worker process."""
    event = _parse_pinch_utility_import_event(request.data)
    # Malformed envelopes are acknowledged (200) without processing so Dapr stops retrying.
    if event is not None:
        process_pinch_utility_import(event.data)
    return Response(status=status.HTTP_200_OK)
@extend_schema(exclude=True)
@api_view(["POST"])
@authentication_classes([DaprApiTokenAuthentication])
@permission_classes([IsAuthenticated])
@csrf_exempt
@api_view_ignore_access_control
def process_pinch_utility_import_error_event(request) -> Response:
    """Handle dead-lettered Pinch utility CSV import events and fail the queued task.

    Dapr forwards undeliverable messages here after the main import callback
    fails delivery, allowing the queued task to transition cleanly to `failed`.
    """
    event = _parse_pinch_utility_import_event(request.data)
    # Malformed envelopes are acknowledged (200) without processing.
    if event is not None:
        mark_pinch_utility_import_delivery_failure(event.data)
    return Response(status=status.HTTP_200_OK)