Coverage for backend/django/core/auxiliary/views/UploadSessionViews.py: 89%
207 statements (coverage.py v7.10.7, created at 2026-05-13 02:47 +0000)

import logging

from django.views.decorators.csrf import csrf_exempt
from drf_spectacular.utils import OpenApiParameter, OpenApiTypes, extend_schema
from pydantic import ValidationError as PydanticValidationError
from rest_framework import serializers, status
from rest_framework.decorators import api_view, authentication_classes, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response

from authentication.custom_drf_authentication import DaprApiTokenAuthentication
from common.models.pinch_import import PinchUtilityCsvImportEvent
from common.models.scenario_import import ScenarioCsvImportEvent
from core.auxiliary.serializers import TaskSerializer
from core.auxiliary.services.pinch_import import (
    enqueue_pinch_utility_import,
    mark_pinch_utility_import_delivery_failure,
    process_pinch_utility_import,
)
from core.auxiliary.services.scenario_import import (
    enqueue_scenario_import,
    mark_scenario_import_delivery_failure,
    process_scenario_import,
)
from core.auxiliary.services.uploads import (
    abort_upload,
    complete_upload,
    get_part_urls,
    get_upload_session,
    get_upload_status,
    initiate_upload_session,
    inspect_upload_session,
)
from core.validation import api_view_ignore_access_control, api_view_validate


logger = logging.getLogger(__name__)


def _parse_scenario_import_event(data) -> ScenarioCsvImportEvent | None:
    """Best-effort CloudEvent parsing for scenario imports.

    Dapr retries malformed deliveries aggressively if the app returns an error,
    so invalid envelopes are logged and acknowledged instead of raising.
    """
    try:
        return ScenarioCsvImportEvent.model_validate(data)
    except PydanticValidationError:
        logger.warning(
            "Discarding malformed scenario CSV import event.",
            exc_info=True,
        )
        return None


def _parse_pinch_utility_import_event(data) -> PinchUtilityCsvImportEvent | None:
    """Best-effort CloudEvent parsing for Pinch utility imports.

    Returning `None` allows the caller to acknowledge malformed messages with a
    200 so they do not loop indefinitely through pub/sub retries.
    """
    try:
        return PinchUtilityCsvImportEvent.model_validate(data)
    except PydanticValidationError:
        logger.warning(
            "Discarding malformed Pinch utility CSV import event.",
            exc_info=True,
        )
        return None
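
# Both parse helpers receive the Dapr CloudEvent envelope that arrives as
# `request.data` on the pub/sub endpoints further below, and the Pydantic
# event models validate the full envelope, exposing the import payload on
# `.data`. A minimal sketch of that envelope (the outer fields are standard
# CloudEvents/Dapr attributes; the inner payload shape is an assumption, as
# its authoritative schema is the event model itself):
#
#   {
#       "id": "…",
#       "source": "…",
#       "type": "com.dapr.event.sent",
#       "specversion": "1.0",
#       "datacontenttype": "application/json",
#       "data": {…},  # carried through as scenario_import_event.data etc.
#   }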


class InitiateMultipartUploadSerializer(serializers.Serializer):
    purpose = serializers.ChoiceField(choices=["ml_training_csv", "scenario_csv", "pinch_utility_csv"])
    flowsheet_id = serializers.IntegerField()
    scenario_id = serializers.IntegerField(required=False, allow_null=True)
    simulationObject_id = serializers.IntegerField(required=False, allow_null=True, source="simulation_object_id")
    original_filename = serializers.CharField(max_length=255)
    content_type = serializers.CharField(max_length=255, required=False, allow_blank=True)
    size_bytes = serializers.IntegerField(min_value=1)


class InitiateMultipartUploadResponseSerializer(serializers.Serializer):
    upload_session_id = serializers.IntegerField()
    bucket = serializers.CharField()
    object_key = serializers.CharField()
    s3_upload_id = serializers.CharField()
    part_size_bytes = serializers.IntegerField()
    expires_in_seconds = serializers.IntegerField()


class MultipartPartsSerializer(serializers.Serializer):
    upload_session_id = serializers.IntegerField()
    part_numbers = serializers.ListField(child=serializers.IntegerField(min_value=1), allow_empty=False)


class MultipartPartsResponseSerializer(serializers.Serializer):
    urls = serializers.DictField(child=serializers.CharField())


class MultipartStatusSerializer(serializers.Serializer):
    upload_session_id = serializers.IntegerField()


class UploadedPartSerializer(serializers.Serializer):
    part_number = serializers.IntegerField()
    etag = serializers.CharField()
    size_bytes = serializers.IntegerField()


class CompletedPartSerializer(serializers.Serializer):
    part_number = serializers.IntegerField()
    etag = serializers.CharField()


class MultipartStatusResponseSerializer(serializers.Serializer):
    """Response shape used by the resumable-upload frontend reconciliation flow."""

    status = serializers.CharField()
    can_resume = serializers.BooleanField()
    expires_at = serializers.DateTimeField(required=False, allow_null=True)
    uploaded_parts = UploadedPartSerializer(many=True)
    part_size_bytes = serializers.IntegerField()
    bucket = serializers.CharField()
    object_key = serializers.CharField()
    s3_upload_id = serializers.CharField()
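
# Sketch of the intended client-side reconciliation against this status
# payload (illustrative, not normative; `local_size_bytes` stands in for the
# size of the file being resumed):
#
#   payload = status_response.json()
#   uploaded = {p["part_number"] for p in payload["uploaded_parts"]}
#   total = math.ceil(local_size_bytes / payload["part_size_bytes"])
#   missing = [n for n in range(1, total + 1) if n not in uploaded]
#   # re-request presigned URLs only for `missing`, then complete as usual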


class CompleteMultipartUploadSerializer(serializers.Serializer):
    upload_session_id = serializers.IntegerField()
    parts = CompletedPartSerializer(many=True)


class CompleteMultipartUploadResponseSerializer(serializers.Serializer):
    bucket = serializers.CharField()
    object_key = serializers.CharField()
    size_bytes = serializers.IntegerField()
    etag = serializers.CharField(required=False, allow_null=True)


class AbortMultipartUploadSerializer(serializers.Serializer):
    upload_session_id = serializers.IntegerField()


class CsvInspectResponseSerializer(serializers.Serializer):
    headers = serializers.ListField(child=serializers.CharField())
    detected_delimiter = serializers.CharField()
    preview_rows = serializers.ListField(child=serializers.DictField(), required=False)
    warnings = serializers.ListField(child=serializers.CharField(), required=False)


class ImportScenarioFromUploadSerializer(serializers.Serializer):
    scenario_id = serializers.IntegerField()
    upload_session_id = serializers.IntegerField()


class ImportPinchUtilitiesFromUploadSerializer(serializers.Serializer):
    project_id = serializers.IntegerField()
    upload_session_id = serializers.IntegerField()


class UploadCleanupSummarySerializer(serializers.Serializer):
    aborted_stale_uploads = serializers.IntegerField()
    expired_ml_uploads = serializers.IntegerField()


@api_view_validate
@extend_schema(
    request=InitiateMultipartUploadSerializer,
    responses=InitiateMultipartUploadResponseSerializer,
)
@api_view(["POST"])
def initiate_multipart_upload(request) -> Response:
    """Create a multipart upload session for a browser-managed CSV upload."""
    serializer = InitiateMultipartUploadSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    query_flowsheet_id = int(request.query_params["flowsheet"])
    # coverage: the mismatch guard below is never exercised by the test suite.
    if serializer.validated_data["flowsheet_id"] != query_flowsheet_id:
        raise serializers.ValidationError({"flowsheet_id": "flowsheet_id must match the flowsheet query parameter."})
    result = initiate_upload_session(user=request.user, **serializer.validated_data)
    return Response(
        InitiateMultipartUploadResponseSerializer(result.model_dump()).data,
        status=status.HTTP_200_OK,
    )
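
# Illustrative round trip (all values invented; the response mirrors
# InitiateMultipartUploadResponseSerializer):
#
#   POST …?flowsheet=42
#   {"purpose": "scenario_csv", "flowsheet_id": 42,
#    "original_filename": "inputs.csv", "size_bytes": 734003200}
#
#   200 OK
#   {"upload_session_id": 7, "bucket": "uploads", "object_key": "…",
#    "s3_upload_id": "…", "part_size_bytes": 8388608, "expires_in_seconds": 3600}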


@api_view_validate
@extend_schema(request=MultipartPartsSerializer, responses=MultipartPartsResponseSerializer)
@api_view(["POST"])
def get_multipart_part_urls(request) -> Response:
    """Return presigned upload URLs for the requested multipart part numbers."""
    serializer = MultipartPartsSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    urls = get_part_urls(user_id=request.user.id, **serializer.validated_data)
    return Response(
        MultipartPartsResponseSerializer(urls.model_dump()).data,
        status=status.HTTP_200_OK,
    )
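
# Each presigned URL is PUT directly to object storage by the client, which
# must record the ETag response header per part for the completion call.
# Minimal sketch, assuming the response `urls` maps part number to URL and
# `read_part` is a hypothetical helper slicing the local file:
#
#   etags = {}
#   for part_number, url in parts_response["urls"].items():
#       resp = requests.put(url, data=read_part(int(part_number)))
#       resp.raise_for_status()
#       etags[int(part_number)] = resp.headers["ETag"]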


@api_view_validate
@extend_schema(
    parameters=[OpenApiParameter(name="upload_session_id", required=True, type=OpenApiTypes.INT)],
    responses=MultipartStatusResponseSerializer,
)
@api_view(["GET"])
def get_multipart_upload_status(request) -> Response:
    """Return multipart-upload progress for resume and reconciliation flows."""
    serializer = MultipartStatusSerializer(data=request.query_params)
    serializer.is_valid(raise_exception=True)
    upload_status = get_upload_status(user_id=request.user.id, **serializer.validated_data)
    return Response(
        MultipartStatusResponseSerializer(upload_status.model_dump()).data,
        status=status.HTTP_200_OK,
    )


@api_view_validate
@extend_schema(request=CompleteMultipartUploadSerializer, responses=CompleteMultipartUploadResponseSerializer)
@api_view(["POST"])
def complete_multipart_upload(request) -> Response:
    """Complete a multipart upload after the browser has uploaded all parts."""
    serializer = CompleteMultipartUploadSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    completion = complete_upload(user_id=request.user.id, **serializer.validated_data)
    return Response(
        CompleteMultipartUploadResponseSerializer(
            completion.model_dump(exclude={"upload_session_id", "location"})
        ).data,
        status=status.HTTP_200_OK,
    )
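
# The request must list every uploaded part, pairing each part number with
# the ETag object storage returned for it, mirroring S3's
# CompleteMultipartUpload contract. Illustrative payload:
#
#   {"upload_session_id": 7,
#    "parts": [{"part_number": 1, "etag": "…"}, {"part_number": 2, "etag": "…"}]}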


@api_view_validate
@extend_schema(request=AbortMultipartUploadSerializer, responses=None)
@api_view(["POST"])
def abort_multipart_upload(request) -> Response:
    """Abort an in-progress multipart upload session."""
    serializer = AbortMultipartUploadSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    abort_upload(user_id=request.user.id, **serializer.validated_data)
    return Response(status=status.HTTP_200_OK)


@api_view_validate
@extend_schema(
    parameters=[OpenApiParameter(name="upload_session_id", required=True, type=OpenApiTypes.INT)],
    responses=CsvInspectResponseSerializer,
)
@api_view(["GET"])
def inspect_uploaded_csv(request) -> Response:
    """Inspect a completed CSV upload and return typed header/preview metadata."""
    serializer = MultipartStatusSerializer(data=request.query_params)
    serializer.is_valid(raise_exception=True)
    upload_session = get_upload_session(user_id=request.user.id, **serializer.validated_data)
    inspection = inspect_upload_session(upload_session)
    return Response(
        CsvInspectResponseSerializer(
            {
                "headers": inspection.headers,
                "detected_delimiter": inspection.delimiter,
                "preview_rows": inspection.preview_rows,
                "warnings": inspection.warnings,
            }
        ).data,
        status=status.HTTP_200_OK,
    )
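
# Illustrative response (shape from CsvInspectResponseSerializer; header
# names and values are invented):
#
#   {"headers": ["stream", "temperature"], "detected_delimiter": ";",
#    "preview_rows": [{"stream": "H1", "temperature": "180"}],
#    "warnings": []}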


@api_view_validate
@extend_schema(request=ImportScenarioFromUploadSerializer, responses=TaskSerializer)
@api_view(["POST"])
def import_scenario_from_upload(request) -> Response:
    """Create a task that imports a completed CSV upload into a scenario input table."""
    serializer = ImportScenarioFromUploadSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    task = enqueue_scenario_import(user=request.user, **serializer.validated_data)
    return Response(TaskSerializer(task).data, status=status.HTTP_200_OK)


@api_view_validate
@extend_schema(request=ImportPinchUtilitiesFromUploadSerializer, responses=TaskSerializer)
@api_view(["POST"])
def import_pinch_utilities_from_upload(request) -> Response:
    """Create a task that imports a completed CSV upload into a Pinch utility table."""
    serializer = ImportPinchUtilitiesFromUploadSerializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    task = enqueue_pinch_utility_import(user=request.user, **serializer.validated_data)
    return Response(TaskSerializer(task).data, status=status.HTTP_200_OK)


@extend_schema(exclude=True)
@api_view(["POST"])
@authentication_classes([DaprApiTokenAuthentication])
@permission_classes([IsAuthenticated])
@csrf_exempt
@api_view_ignore_access_control
def process_scenario_import_event(request) -> Response:
    """Handle Dapr-delivered scenario CSV import events inside the Django worker process."""
    scenario_import_event = _parse_scenario_import_event(request.data)
    # coverage: the malformed-event branch below is never exercised by the test suite.
    if scenario_import_event is None:
        return Response(status=status.HTTP_200_OK)

    process_scenario_import(scenario_import_event.data)
    return Response(status=status.HTTP_200_OK)


@extend_schema(exclude=True)
@api_view(["POST"])
@authentication_classes([DaprApiTokenAuthentication])
@permission_classes([IsAuthenticated])
@csrf_exempt
@api_view_ignore_access_control
def process_scenario_import_error_event(request) -> Response:
    """Handle dead-lettered scenario CSV import events and fail the queued task.

    This endpoint is wired to the subscription dead-letter topic so delivery
    failures become visible in the task stream and import UI instead of leaving
    the original task stuck in `pending`.
    """
    scenario_import_event = _parse_scenario_import_event(request.data)
    if scenario_import_event is None:
        return Response(status=status.HTTP_200_OK)

    mark_scenario_import_delivery_failure(scenario_import_event.data)
    return Response(status=status.HTTP_200_OK)


@extend_schema(exclude=True)
@api_view(["POST"])
@authentication_classes([DaprApiTokenAuthentication])
@permission_classes([IsAuthenticated])
@csrf_exempt
@api_view_ignore_access_control
def process_pinch_utility_import_event(request) -> Response:
    """Handle Dapr-delivered Pinch utility CSV import events inside the Django worker process."""
    pinch_import_event = _parse_pinch_utility_import_event(request.data)
    # coverage: the malformed-event branch below is never exercised by the test suite.
    if pinch_import_event is None:
        return Response(status=status.HTTP_200_OK)

    process_pinch_utility_import(pinch_import_event.data)
    return Response(status=status.HTTP_200_OK)


@extend_schema(exclude=True)
@api_view(["POST"])
@authentication_classes([DaprApiTokenAuthentication])
@permission_classes([IsAuthenticated])
@csrf_exempt
@api_view_ignore_access_control
def process_pinch_utility_import_error_event(request) -> Response:
    """Handle dead-lettered Pinch utility CSV import events and fail the queued task.

    Dapr forwards undeliverable messages here after the main import callback
    fails delivery, allowing the queued task to transition cleanly to `failed`.
    """
    pinch_import_event = _parse_pinch_utility_import_event(request.data)
    if pinch_import_event is None:
        return Response(status=status.HTTP_200_OK)

    mark_pinch_utility_import_delivery_failure(pinch_import_event.data)
    return Response(status=status.HTTP_200_OK)
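
# These four endpoints are reached through Dapr pub/sub subscriptions rather
# than user-facing routes. A sketch of the declarative wiring (component,
# topic, and route names are assumptions; the point is `deadLetterTopic`
# steering failed deliveries to the *_error_event handlers above):
#
#   apiVersion: dapr.io/v2alpha1
#   kind: Subscription
#   metadata:
#     name: scenario-csv-import
#   spec:
#     pubsubname: pubsub
#     topic: scenario-csv-import
#     deadLetterTopic: scenario-csv-import-error
#     routes:
#       default: /…/process_scenario_import_event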