Coverage for backend/django/Economics/costing/cost_curves/viewsets.py: 100%

77 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-06-23 21:51 +0000

1from django.db import transaction 

2from drf_spectacular.utils import OpenApiParameter, extend_schema 

3from rest_framework import status 

4from rest_framework.decorators import action 

5from rest_framework.response import Response 

6 

7from core.viewset import ModelViewSet 

8from Economics.costing.models import CostCurve 

9from Economics.costing.cost_curves.catalog import cost_curve_equipment_categories 

10from Economics.costing.cost_curves.serializers import ( 

11 CostCurveAuthoringSerializer, 

12 CostCurveEquipmentCategorySerializer, 

13 CostCurveSerializer, 

14) 

15from Economics.costing.capital.generated_lines import _sync_generated_capital_lines 

16from Economics.shared.access import _require_write_access 

17from Economics.studies.api_mutations import _mark_study_stale 

18from Economics.studies.models import EconomicsStudy 

19 

20 

21class CostCurveViewSet(ModelViewSet): 

22 serializer_class = CostCurveSerializer 

23 

24 def get_serializer_class(self): 

25 if self.action in {"create", "update", "partial_update"}: 

26 return CostCurveAuthoringSerializer 

27 return CostCurveSerializer 

28 

29 @extend_schema( 

30 parameters=[ 

31 OpenApiParameter("flowsheet", int, OpenApiParameter.QUERY, required=True), 

32 OpenApiParameter("active", bool, OpenApiParameter.QUERY, required=False), 

33 ], 

34 responses=CostCurveSerializer(many=True), 

35 ) 

36 def list(self, request, *args, **kwargs): 

37 return super().list(request, *args, **kwargs) 

38 

39 @extend_schema(request=CostCurveAuthoringSerializer, responses={status.HTTP_201_CREATED: CostCurveSerializer}) 

40 def create(self, request, *args, **kwargs): 

41 _require_write_access(request.user) 

42 response = super().create(request, *args, **kwargs) 

43 instance = CostCurve.objects.get(pk=response.data["id"]) 

44 response.data = CostCurveSerializer(instance, context=self.get_serializer_context()).data 

45 return response 

46 

47 @extend_schema(request=CostCurveAuthoringSerializer, responses=CostCurveSerializer) 

48 def update(self, request, *args, **kwargs): 

49 _require_write_access(request.user) 

50 with transaction.atomic(): 

51 response = super().update(request, *args, **kwargs) 

52 instance = self.get_object() 

53 _refresh_dependent_studies_for_curve(instance) 

54 response.data = CostCurveSerializer(instance, context=self.get_serializer_context()).data 

55 return response 

56 

57 @extend_schema(request=CostCurveAuthoringSerializer(partial=True), responses=CostCurveSerializer) 

58 def partial_update(self, request, *args, **kwargs): 

59 _require_write_access(request.user) 

60 with transaction.atomic(): 

61 response = super().partial_update(request, *args, **kwargs) 

62 instance = self.get_object() 

63 _refresh_dependent_studies_for_curve(instance) 

64 response.data = CostCurveSerializer(instance, context=self.get_serializer_context()).data 

65 return response 

66 

67 def destroy(self, request, *args, **kwargs): 

68 _require_write_access(request.user) 

69 instance = self.get_object() 

70 affected_study_ids = set( 

71 instance.equipment_mappings.values_list( 

72 "costable_item__study_id", flat=True 

73 ) 

74 ) 

75 affected_study_ids.update( 

76 instance.capital_lines.values_list("study_id", flat=True) 

77 ) 

78 affected_studies = list( 

79 EconomicsStudy.objects.filter(pk__in=affected_study_ids) 

80 ) 

81 with transaction.atomic(): 

82 response = super().destroy(request, *args, **kwargs) 

83 for study in affected_studies: 

84 _sync_generated_capital_lines(study) 

85 _mark_study_stale(study, reason="cost_curve_changed") 

86 return response 

87 

88 @extend_schema( 

89 parameters=[ 

90 OpenApiParameter("flowsheet", int, OpenApiParameter.QUERY, required=True), 

91 OpenApiParameter("active", bool, OpenApiParameter.QUERY, required=False), 

92 ], 

93 responses=CostCurveEquipmentCategorySerializer(many=True), 

94 ) 

95 @action(detail=False, methods=["get"], url_path="equipment-options") 

96 def equipment_options(self, request): 

97 options = cost_curve_equipment_categories(self.get_queryset()) 

98 serializer = CostCurveEquipmentCategorySerializer(options, many=True) 

99 return Response(serializer.data) 

100 

101 def get_queryset(self): 

102 queryset = CostCurve.objects.all() 

103 active = self.request.query_params.get("active") 

104 return queryset.filter(active=active.lower() == "true") if active is not None else queryset 

105 

106 

107def _dependent_studies_for_curve(curve: CostCurve) -> list[EconomicsStudy]: 

108 affected_study_ids = set( 

109 curve.equipment_mappings.values_list( 

110 "costable_item__study_id", flat=True 

111 ) 

112 ) 

113 affected_study_ids.update(curve.capital_lines.values_list("study_id", flat=True)) 

114 return list(EconomicsStudy.objects.filter(pk__in=affected_study_ids).order_by("pk")) 

115 

116 

117def _refresh_dependent_studies_for_curve(curve: CostCurve) -> None: 

118 for study in _dependent_studies_for_curve(curve): 

119 _sync_generated_capital_lines(study) 

120 _mark_study_stale(study, reason="cost_curve_changed")