Coverage for backend/flowsheetInternals/unitops/viewsets/PortViewSet.py: 57%

160 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes 

2from core.viewset import ModelViewSet 

3from flowsheetInternals.unitops.models.Port import Port 

4from flowsheetInternals.unitops.serializers.PortSerializer import PortSerializer 

5from rest_framework.response import Response 

6from rest_framework import serializers 

7import traceback 

8from rest_framework.decorators import action 

9from django.db import transaction 

10 

11from core.auxiliary.models.PropertyInfo import PropertyInfo 

12from core.auxiliary.models.PropertySet import PropertySet 

13 

14from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

15from flowsheetInternals.unitops.models.simulation_object_factory import SimulationObjectFactory 

16from flowsheetInternals.graphicData.models.graphicObjectModel import GraphicObject 

17from common.config_types import * 

18from ..config.config_methods import * 

19from flowsheetInternals.unitops.models.delete_factory import DeleteFactory 

20 

21class MergeStreamsSerializer(serializers.Serializer): 

22 stream1 = serializers.IntegerField(required=True) # material stream 

23 stream2 = serializers.IntegerField(required=True) # material stream 

24 

25class SplitStreamSerializer(serializers.Serializer): 

26 stream = serializers.IntegerField(required=True) 

27 

28class AddStreamSerializer(serializers.Serializer): 

29 port = serializers.IntegerField(required=True) 

30 

31class CreateDNSerializer(serializers.Serializer): 

32 stream = serializers.IntegerField(required=True) 

33 

34class ConvertToDNSerializer(serializers.Serializer): 

35 stream = serializers.IntegerField(required=True) 

36 

37class RestoreConnectionsSerializer(serializers.Serializer): 

38 connections = serializers.ListField( 

39 child=serializers.DictField(child=serializers.JSONField()), 

40 required=True 

41 ) 

42 

43class PortViewSet(ModelViewSet): 

44 serializer_class = PortSerializer 

45 

46 def get_queryset(self): 

47 queryset = Port.objects.all() 

48 queryset = queryset.filter(unitOp__is_deleted=False) 

49 return queryset 

50 

51 @extend_schema( 

52 parameters=[ 

53 OpenApiParameter(name="flowsheet", required=True, type=OpenApiTypes.INT), 

54 ] 

55 ) 

56 def list(self, request): 

57 return super().list(request) 

58 

59 

60 def error_response(self, e): 

61 tb_info = traceback.format_exc() 

62 error_message = str(e) 

63 response_data = {'status': 'error', 'message': error_message, 'traceback': tb_info} 

64 return Response(response_data, status=400) 

65 

66 

67 @extend_schema(request=MergeStreamsSerializer, responses=None) 

68 @action(methods=['post'], detail=False, url_path='merge-streams') 

69 def merge_streams(self, request): 

70 """ 

71 Connects two material streams together, managing the connection/disconnection of UnitOperations. 

72 Logic: keep the outlet stream and keep the stream 2 position. 

73 For Product/Feed -> Intermediary, Keep stream 1&2, create new outlet stream 

74 """ 

75 try: 

76 serializer = MergeStreamsSerializer(data=request.data) 

77 serializer.is_valid(raise_exception=True) 

78 validated_data = serializer.validated_data 

79 

80 ids = [validated_data.get('stream1'), validated_data.get('stream2')] 

81 

82 # Fetch both streams in a single query with related data 

83 streams = SimulationObject.objects.filter(id__in=ids).select_related( 

84 "flowsheet" 

85 ).prefetch_related( 

86 "connectedPorts__unitOp", 

87 "graphicObject__group" 

88 ) 

89 

90 stream_map = {stream.id: stream for stream in streams} 

91 stream1 = stream_map[ids[0]] 

92 stream2 = stream_map[ids[1]] 

93 

94 # Wrap the complex merge operation in a transaction to reduce serialization conflicts 

95 with transaction.atomic(): 

96 stream1.merge_stream(stream2) 

97 

98 return Response({'status': 'success'}, status=200) 

99 except Exception as e: 

100 return self.error_response(e) 

101 

102 

103 

104 @extend_schema(request=SplitStreamSerializer, responses=None) 

105 @action(methods=['post'], detail=False, url_path='split-stream') 

106 def split_stream(self, request): 

107 """ 

108 Splits a stream into two separate streams (one inlet and one outlet - disconnected). 

109 """ 

110 try: 

111 serializer = SplitStreamSerializer(data=request.data) 

112 serializer.is_valid(raise_exception=True) 

113 validated_data = serializer.validated_data 

114 

115 stream = SimulationObject.objects.get(id=validated_data.get('stream')) 

116 stream.split_stream() 

117 return Response({'status': 'success'}, status=200) 

118 except Exception as e: 

119 return self.error_response(e) 

120 

121 

122 

123 @extend_schema(request=AddStreamSerializer, responses=None) 

124 @action(methods=['post'], detail=False, url_path='add-stream') 

125 def add_stream(self, request) -> Response: 

126 """ 

127 Creates a new stream for this port (previously with no stream attached). 

128 """ 

129 try: 

130 port = Port.objects.get(id=request.data.get('port')) 

131 SimulationObjectFactory.create_stream_at_port(port) 

132 return Response({'status': 'success'}, status=200) 

133 

134 except Exception as e: 

135 return self.error_response(e) 

136 

137 

138 

139 @extend_schema(request=ConvertToDNSerializer, responses=None ) 

140 @action(methods=['post'], detail=False, url_path='convert-to-dn') 

141 def convert_to_dn(self, request) -> Response: 

142 """ 

143 Turns an Intermediary stream into a decision node 

144 """ 

145 try: 

146 serializer = CreateDNSerializer(data=request.data) 

147 serializer.is_valid(raise_exception=True) 

148 validated_data = serializer.validated_data 

149 

150 # Get stream and outlets 

151 id = validated_data.get('stream', None) 

152 stream = SimulationObject.objects.get(id=id) 

153 stream_port = stream.connectedPorts.first() 

154 

155 num_inlets, num_outlets = 1, 1 

156 if stream.connectedPorts.count() <= 1: 156 ↛ 163line 156 didn't jump to line 163 because the condition on line 156 was always true

157 if stream_port.direction == ConType.Inlet: 157 ↛ 160line 157 didn't jump to line 160 because the condition on line 157 was always true

158 num_inlets, num_outlets = 0, 1 

159 else: 

160 num_inlets, num_outlets = 1, 0 

161 

162 # Create decision node 

163 stream.make_decision_node(num_inlets=num_inlets, num_outlets=num_outlets) 

164 

165 return Response({'status': 'success'}, status=200) 

166 except Exception as e: 

167 return self.error_response(e) 

168 

169 

170 def destroy(self, request, *args, **kwargs): 

171 """ 

172 Override the destroy method to allow for deleting attached streams and updating port indexes. 

173 """ 

174 try: 

175 port: Port = self.get_object() 

176 stream = port.stream 

177 

178 port.reindex_port_on_delete() 

179 port.unitOp.update_height() 

180 

181 if stream and stream.connectedPorts.count() == 0: 

182 # If the stream has no connected ports, delete it 

183 DeleteFactory.delete_object(stream) 

184 

185 return Response({'status': 'success'}, status=204) 

186 except Exception as e: 

187 return self.error_response(e) 

188 

189 @extend_schema(request=RestoreConnectionsSerializer, responses=None) 

190 @action(methods=['post'], detail=False, url_path='restore-connections') 

191 def restore_connections(self, request): 

192 """ 

193 Enhanced restoration with detailed error tracking and dependency handling. 

194 Restores port connections during undo operations without creating recycle tears. 

195 This endpoint bypasses the normal merge_stream logic that can create recycle blocks. 

196 """ 

197 try: 

198 with transaction.atomic(): # Ensure all-or-nothing operation 

199 serializer = RestoreConnectionsSerializer(data=request.data) 

200 serializer.is_valid(raise_exception=True) 

201 validated_data = serializer.validated_data 

202 

203 connections = validated_data.get('connections', []) 

204 

205 # Bulk-fetch DB objects to reduce queries 

206 port_ids = {c.get('portId') for c in connections if c.get('portId') is not None} 

207 stream_ids = {c.get('streamId') for c in connections if c.get('streamId') is not None} 

208 

209 # Fetch all ports and streams in single queries 

210 port_map = Port.objects.filter(id__in=port_ids).select_related('unitOp__flowsheet').in_bulk() 

211 stream_map = ( 

212 SimulationObject.objects.filter(id__in=stream_ids) 

213 .select_related('flowsheet') 

214 .in_bulk() 

215 ) 

216 

217 updated_ports = [] 

218 restored_count = 0 

219 failed_connections = [] 

220 skipped_connections = [] 

221 

222 for connection in connections: 

223 port_id = connection.get('portId') 

224 stream_id = connection.get('streamId') 

225 

226 if port_id is None: 

227 skipped_connections.append({ 

228 'portId': port_id, 

229 'streamId': stream_id, 

230 'reason': 'Invalid port ID' 

231 }) 

232 continue 

233 

234 port = port_map.get(port_id) 

235 if not port: 

236 failed_connections.append({ 

237 'portId': port_id, 

238 'streamId': stream_id, 

239 'error': f'Port {port_id} not found', 

240 'retry_recommended': False # Port truly doesn't exist 

241 }) 

242 continue 

243 

244 # Handle disconnection 

245 if stream_id is None: 

246 port.stream = None 

247 updated_ports.append(port) 

248 restored_count += 1 

249 continue 

250 

251 # Handle connection 

252 stream = stream_map.get(stream_id) 

253 if not stream: 

254 failed_connections.append({ 

255 'portId': port_id, 

256 'streamId': stream_id, 

257 'error': f'Stream {stream_id} not found', 

258 'retry_recommended': True 

259 }) 

260 continue 

261 

262 # Ensure same flowsheet (security check) 

263 if port.unitOp.flowsheet_id != stream.flowsheet_id: 

264 failed_connections.append({ 

265 'portId': port_id, 

266 'streamId': stream_id, 

267 'error': 'Port and Stream belong to different flowsheets', 

268 'retry_recommended': False 

269 }) 

270 continue 

271 

272 port.stream = stream 

273 updated_ports.append(port) 

274 restored_count += 1 

275 

276 # Bulk update all ports in single query 

277 if updated_ports: 

278 Port.objects.bulk_update(updated_ports, ['stream']) 

279 

280 response_data = { 

281 'status': 'success' if restored_count else 'failed', 

282 'restored_count': restored_count, 

283 'total_connections': len(connections), 

284 'failed_connections': failed_connections, 

285 'skipped_connections': skipped_connections, 

286 'retry_candidates': [f for f in failed_connections if f.get('retry_recommended')] 

287 } 

288 

289 return Response(response_data, status=200) 

290 

291 except Exception as e: 

292 return self.error_response(e)