Coverage for backend/flowsheetInternals/graphicData/viewsets/GroupingViewSet.py: 29%

202 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-06 23:27 +0000

1import traceback 

2from core.viewset import ModelViewSet 

3from rest_framework import serializers 

4from rest_framework.response import Response 

5from flowsheetInternals.graphicData.models.groupingModel import Breadcrumbs, Connection, Grouping, GraphicObject, AbstractionType 

6from flowsheetInternals.graphicData.serializers.groupingSerializer import GroupingSerializer 

7from drf_spectacular.utils import extend_schema, OpenApiParameter, OpenApiTypes 

8from rest_framework.decorators import action 

9from flowsheetInternals.unitops.models.SimulationObject import SimulationObject 

10from core.auxiliary.models import PropertyInfo # Import at top of file 

11from flowsheetInternals.unitops.models.delete_factory import DeleteFactory 

12from pydantic import RootModel 

13from typing import List 

14from flowsheetInternals.graphicData.logic.make_group import make_group 

15from flowsheetInternals.graphicData.logic.ungroup import ungroup 

16from core.auxiliary.models.Flowsheet import Flowsheet 

class MakeGroupSerializer(serializers.Serializer):
    # Request body of the `make-group` action: the list of integer ids that
    # the view hands straight to `make_group(...)`.
    containedObjects = serializers.ListField(
        child=serializers.IntegerField(),
    )

19 

class DeleteSelectedObjects(serializers.Serializer):
    # Request body of the `delete-selected-objects` action: primary keys of
    # the SimulationObject rows the view passes to DeleteFactory.
    containedObjects = serializers.ListField(
        child=serializers.IntegerField(),
    )

22 

class SelectionRectangleSerializer(serializers.Serializer):
    # Request body of the `move-selection` action.
    # containedObjects: SimulationObject ids whose graphic objects are moved.
    containedObjects = serializers.ListField(
        child=serializers.IntegerField(),
    )
    # deltaX / deltaY: amounts added to each graphic object's x and y.
    deltaX = serializers.FloatField()
    deltaY = serializers.FloatField()

27 

class UngroupSerializer(serializers.Serializer):
    # Request body of the `ungroup` action: primary key of the Grouping that
    # the view loads and passes to `ungroup(...)`.
    parentGroup = serializers.IntegerField()

30 

31# Workaround for DRF Spectacular not supporting lists: https://github.com/tfranzel/drf-spectacular/issues/1232 

class BreadcrumbsList(RootModel):
    # Pydantic root model wrapping a plain list of Breadcrumbs; used only as
    # the documented response schema of the `breadcrumbs` action.
    root: List[Breadcrumbs]

34 

class GetConnectionsList(RootModel):
    # Pydantic root model wrapping a plain list of Connection; used only as
    # the documented response schema of the `connections` action.
    root: List[Connection]

37 

class CustomGroupingSerializer(serializers.Serializer):
    # Request body of the `create-custom-group` action.
    # group: componentName of the parent group the new group is created under.
    group = serializers.CharField(required=True)
    # componentName: name given to the newly created group.
    componentName = serializers.CharField(required=True)
    # abstractionType: optional; the view falls back to AbstractionType.Zone
    # when the key is absent from the validated data.
    abstractionType = serializers.CharField(
        required=False,
        allow_blank=True,
    )

42 

class GroupingViewSet(ModelViewSet):
    """REST endpoints for Grouping objects.

    Besides the standard list/create/partial_update/destroy operations this
    viewset exposes collection-level actions for creating custom groups,
    grouping/ungrouping objects, moving or deleting a selection, and reading
    a group's breadcrumbs, connections and the available zone names.
    """

    serializer_class = GroupingSerializer

    def get_queryset(self):
        """Return all groupings, restricted to one flowsheet when the
        ``flowsheet`` query parameter is supplied."""
        queryset = Grouping.objects.all().prefetch_related("propertyInfos")
        flowsheetId = self.request.query_params.get("flowsheet")
        if flowsheetId is not None:
            queryset = queryset.filter(simulationObject__flowsheet=flowsheetId)
        return queryset

    @extend_schema(
        parameters=[
            OpenApiParameter(name="flowsheet", required=True, type=OpenApiTypes.INT),
        ]
    )
    def list(self, request):
        # Overridden only to document the `flowsheet` query parameter.
        return super().list(request)

    def create(self, request):
        """Create a grouping from the request payload.

        Returns 400 when the payload has no ``_flowsheet`` key, otherwise
        201 with the serialized grouping. Serializer validation errors are
        raised and rendered by the framework.
        """
        flowsheetId = request.data.get("_flowsheet")
        if flowsheetId is None:
            return Response({"error": "flowsheet is required"}, status=400)

        serializer = GroupingSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response(serializer.data, status=201)

    @extend_schema(request=CustomGroupingSerializer, responses=None)
    @action(detail=False, methods=['post'], url_path='create-custom-group')
    def create_custom_group(self, request):
        """Create an invisible group under a parent group identified by name.

        The target flowsheet comes from the ``flowsheet`` query parameter and
        the parent group is resolved by ``componentName`` *within that
        flowsheet*. Returns 400 when the flowsheet parameter is missing or the
        parent name is ambiguous, 404 when the flowsheet or the parent group
        cannot be found, and 201 with the serialized group on success.
        """
        serializer = CustomGroupingSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)

        flowsheetId = request.query_params.get("flowsheet")
        if flowsheetId is None:
            return Response({"error": "flowsheet is required"}, status=400)

        parentGroupName = serializer.validated_data.get("group")
        abstractionType = serializer.validated_data.get("abstractionType", AbstractionType.Zone)

        try:
            flowsheet = Flowsheet.objects.get(pk=flowsheetId)
        except (Flowsheet.DoesNotExist, ValueError):
            # ValueError covers a non-numeric id in the query string.
            return Response({"error": "Flowsheet not found"}, status=404)

        try:
            # Scope the lookup to the flowsheet: component names are not
            # unique across flowsheets, so an unscoped get() could raise
            # MultipleObjectsReturned or pick another flowsheet's group.
            parentGroup = Grouping.objects.get(
                simulationObject__componentName=parentGroupName,
                simulationObject__flowsheet=flowsheet,
            )
        except Grouping.DoesNotExist:
            return Response({"error": "Parent group not found"}, status=404)
        except Grouping.MultipleObjectsReturned:
            return Response({"error": "Parent group name is ambiguous"}, status=400)

        group = Grouping.create(
            flowsheet=flowsheet,
            group=parentGroup,
            componentName=serializer.validated_data.get("componentName"),
            visible=False
        )

        group.abstractionType = abstractionType
        group.save()

        return Response(GroupingSerializer(group).data, status=201)

    def partial_update(self, request, pk=None):
        """Partially update a grouping and its graphic object.

        Geometry (``x``/``y``, ``width``/``height``, ``offsetX``/``offsetY``),
        ``containedObjects``, ``propertyInfos`` and ``abstractionType`` are
        handled by hand; every other key goes through GroupingSerializer.
        Each geometry pair is only applied when both of its values are given.
        Returns 400 with the serializer errors when the remaining payload is
        invalid (nothing is modified in that case), otherwise 206 with the
        serialized grouping.
        """
        # Strip the hand-processed keys so the serializer never sees them.
        offsetX = request.data.pop('offsetX', None)
        offsetY = request.data.pop('offsetY', None)
        x = request.data.pop('x', None)
        y = request.data.pop('y', None)
        width = request.data.pop('width', None)
        height = request.data.pop('height', None)
        containedObjects = request.data.pop('containedObjects', None)
        propertyInfos = request.data.pop('propertyInfos', None)
        abstractionType = request.data.pop('abstractionType', None)

        try:
            instance: Grouping = self.get_object()
        except Grouping.DoesNotExist:
            # NOTE(review): DRF's stock get_object() raises Http404 instead;
            # kept in case core.viewset.ModelViewSet behaves differently.
            return Response({"error": "Object not found"}, status=404)

        # Validate before touching anything so that a rejected payload does
        # not leave relations or abstractionType half-updated.
        serializer = GroupingSerializer(instance, data=request.data, partial=True)
        if not serializer.is_valid():
            return Response(serializer.errors, status=400)

        if containedObjects is not None:
            instance.containedObjects.set(SimulationObject.objects.filter(pk__in=containedObjects))

        if propertyInfos is not None:
            instance.propertyInfos.set(PropertyInfo.objects.filter(pk__in=propertyInfos))

        if abstractionType is not None:
            instance.abstractionType = abstractionType

        # One save covers all manual changes made above.
        if containedObjects is not None or propertyInfos is not None or abstractionType is not None:
            instance.save()

        # Apply the serializer-managed fields.
        serializer.save()

        # Position and size live on the group's graphic object.
        gObj = instance.get_graphic_object()
        if x is not None and y is not None:
            if containedObjects is None:
                # The group was only moved: express the move as an offset
                # from the stored position (this overrides any offset sent
                # by the client) and let the offset branch below apply it.
                offsetX = float(x) - float(gObj.x)
                offsetY = float(y) - float(gObj.y)
            else:
                gObj.x = x
                gObj.y = y
        if width is not None and height is not None:
            gObj.width = width
            gObj.height = height
        if offsetX is not None and offsetY is not None:
            gObj.x = str(float(offsetX) + float(gObj.x))
            gObj.y = str(float(offsetY) + float(gObj.y))
        GraphicObject.objects.bulk_update([gObj], ["x", "y", "width", "height"])

        return Response(serializer.data, status=206)

    def destroy(self, request, *args, **kwargs):
        """
        Delete a group by delegating to ``Grouping.clear_group()``, which is
        expected to also remove the unit operations and material streams
        inside it. Responds with an empty 200.
        """
        instance: Grouping = self.get_object()
        instance.clear_group()
        return Response(status=200)

    @extend_schema(request=MakeGroupSerializer, responses=None)
    @action(detail=False, methods=['post'], url_path='make-group')
    def make_group(self, request):
        """Group the objects listed in ``containedObjects``.

        Any failure, including payload validation, is reported as a 400 with
        the exception text; the traceback is printed for debugging.
        """
        try:
            serializer = MakeGroupSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            validated_data = serializer.validated_data
            contained_objects_id = validated_data.get("containedObjects")
            # Resolves to the module-level make_group helper, not this method.
            make_group(contained_objects_id)

            return Response({'status': 'success'}, status=200)
        except Exception as e:
            print(traceback.format_exc())
            return Response({'status': 'error', 'message': str(e)}, status=400)

    @extend_schema(request=SelectionRectangleSerializer, responses=None)
    @action(detail=False, methods=['post'], url_path='move-selection')
    def move_selection(self, request):
        """Shift the graphic objects of the selected simulation objects by
        ``deltaX``/``deltaY``.

        Any failure, including payload validation, is reported as a 400 with
        the exception text.
        """
        try:
            serializer = SelectionRectangleSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            validated_data = serializer.validated_data

            contained_objects_id = validated_data.get("containedObjects")
            graphic_objects = GraphicObject.objects.filter(
                simulationObject__id__in=contained_objects_id,
                simulationObject__in=SimulationObject.objects.all()
            )

            deltaX = validated_data.get("deltaX")
            deltaY = validated_data.get("deltaY")

            # Iterating evaluates the queryset once; bulk_update below then
            # receives the same, already-modified instances.
            for graphic_object in graphic_objects:
                graphic_object.x = float(graphic_object.x) + deltaX
                graphic_object.y = float(graphic_object.y) + deltaY

            GraphicObject.objects.bulk_update(graphic_objects, ["x", "y"])

            return Response({'status': 'success'}, status=200)
        except Exception as e:
            return Response({'status': 'error', 'message': str(e)}, status=400)

    @extend_schema(request=DeleteSelectedObjects, responses=None)
    @action(detail=False, methods=['post'], url_path='delete-selected-objects')
    def delete_selected_objects(self, request):
        """Delete the simulation objects listed in ``containedObjects`` via
        DeleteFactory.

        Any failure, including payload validation, is reported as a 400 with
        the exception text.
        """
        try:
            serializer = DeleteSelectedObjects(data=request.data)
            serializer.is_valid(raise_exception=True)
            validated_data = serializer.validated_data

            contained_objects_id = validated_data.get("containedObjects")
            DeleteFactory.delete_multiple_objects(SimulationObject.objects.filter(pk__in=contained_objects_id))

            return Response({'status': 'success'}, status=200)
        except Exception as e:
            return Response({'status': 'error', 'message': str(e)}, status=400)

    # The ungroup action receives the id of the parent group and hands that
    # group to the ungroup() helper, which dissolves it.
    @extend_schema(request=UngroupSerializer, responses=None)
    @action(detail=False, methods=['post'], url_path='ungroup')
    def ungroup(self, request):
        """Dissolve the group whose id is given as ``parentGroup``.

        Any failure, including payload validation or an unknown group id, is
        reported as a 400 with the exception text; the traceback is printed.
        """
        try:
            serializer = UngroupSerializer(data=request.data)
            serializer.is_valid(raise_exception=True)
            validated_data = serializer.validated_data
            group_id = validated_data.get("parentGroup")
            group = Grouping.objects.get(pk=group_id)
            # Resolves to the module-level ungroup helper, not this method.
            ungroup(group)

            return Response({'status': 'success'}, status=200)
        except Exception as e:
            traceback.print_exc()
            return Response({'status': 'error', 'message': str(e)}, status=400)

    @extend_schema(
        request=None,
        responses=BreadcrumbsList,
        parameters=[
            OpenApiParameter(name="group", required=True, type=OpenApiTypes.INT),
        ])
    @action(detail=False, methods=['get'], url_path='breadcrumbs')
    def breadcrumbs(self, request):
        """
        This method will return the bread crumbs for the group that the user is currently in
        """
        try:
            group_id = request.query_params.get("group")
            # -1 is the id of the root group, which has no breadcrumbs.
            # Query parameters arrive as strings, hence the string comparison.
            if group_id == "-1":
                return Response([], status=200)
            group = Grouping.objects.get(pk=group_id)
            breadcrumbs = group.get_breadcrumbs_trail()
            breadcrumbs_data = [crumb.model_dump() for crumb in breadcrumbs]
            return Response(breadcrumbs_data, status=200)
        except Exception as e:
            return Response({'status': 'error', 'message': str(e)}, status=400)

    @extend_schema(
        request=None,
        responses=GetConnectionsList,
        parameters=[
            OpenApiParameter(name="group", required=True, type=OpenApiTypes.INT),
        ])
    @action(detail=False, methods=['get'], url_path='connections')
    def get_connections(self, request):
        """
        This method will return the connections for the group that the user is currently in
        """
        try:
            group_id = request.query_params.get("group")
            group = Grouping.objects.get(pk=group_id)
            connections = group.get_connections()
            connections_data = [connection.model_dump() for connection in connections]
            return Response(connections_data, status=200)
        except Exception as e:
            return Response({'status': 'error', 'message': str(e)}, status=400)

    @extend_schema(
        request=None,
        responses=List[OpenApiTypes.STR],
    )
    @action(detail=False, methods=['get'], url_path='zones')
    def zones(self, request):
        """
        This method will return the component name of every group whose simulation object has is_deleted=False
        """
        # NOTE(review): the result is not restricted to a flowsheet even
        # though the endpoint was originally described as per-flowsheet;
        # confirm whether a flowsheet filter is wanted.
        try:
            # select_related loads each group's simulation object in the
            # same query instead of one extra query per group.
            groups = Grouping.objects.filter(
                simulationObject__is_deleted=False
            ).select_related("simulationObject")
            zones = [group.simulationObject.componentName for group in groups if group.simulationObject]
            return Response(zones, status=200)
        except Exception as e:
            return Response({'status': 'error', 'message': str(e)}, status=400)