Coverage for backend/django/Economics/costing/cost_curves/viewsets.py: 100%
77 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1from django.db import transaction
2from drf_spectacular.utils import OpenApiParameter, extend_schema
3from rest_framework import status
4from rest_framework.decorators import action
5from rest_framework.response import Response
7from core.viewset import ModelViewSet
8from Economics.costing.models import CostCurve
9from Economics.costing.cost_curves.catalog import cost_curve_equipment_categories
10from Economics.costing.cost_curves.serializers import (
11 CostCurveAuthoringSerializer,
12 CostCurveEquipmentCategorySerializer,
13 CostCurveSerializer,
14)
15from Economics.costing.capital.generated_lines import _sync_generated_capital_lines
16from Economics.shared.access import _require_write_access
17from Economics.studies.api_mutations import _mark_study_stale
18from Economics.studies.models import EconomicsStudy
21class CostCurveViewSet(ModelViewSet):
22 serializer_class = CostCurveSerializer
24 def get_serializer_class(self):
25 if self.action in {"create", "update", "partial_update"}:
26 return CostCurveAuthoringSerializer
27 return CostCurveSerializer
29 @extend_schema(
30 parameters=[
31 OpenApiParameter("flowsheet", int, OpenApiParameter.QUERY, required=True),
32 OpenApiParameter("active", bool, OpenApiParameter.QUERY, required=False),
33 ],
34 responses=CostCurveSerializer(many=True),
35 )
36 def list(self, request, *args, **kwargs):
37 return super().list(request, *args, **kwargs)
39 @extend_schema(request=CostCurveAuthoringSerializer, responses={status.HTTP_201_CREATED: CostCurveSerializer})
40 def create(self, request, *args, **kwargs):
41 _require_write_access(request.user)
42 response = super().create(request, *args, **kwargs)
43 instance = CostCurve.objects.get(pk=response.data["id"])
44 response.data = CostCurveSerializer(instance, context=self.get_serializer_context()).data
45 return response
47 @extend_schema(request=CostCurveAuthoringSerializer, responses=CostCurveSerializer)
48 def update(self, request, *args, **kwargs):
49 _require_write_access(request.user)
50 with transaction.atomic():
51 response = super().update(request, *args, **kwargs)
52 instance = self.get_object()
53 _refresh_dependent_studies_for_curve(instance)
54 response.data = CostCurveSerializer(instance, context=self.get_serializer_context()).data
55 return response
57 @extend_schema(request=CostCurveAuthoringSerializer(partial=True), responses=CostCurveSerializer)
58 def partial_update(self, request, *args, **kwargs):
59 _require_write_access(request.user)
60 with transaction.atomic():
61 response = super().partial_update(request, *args, **kwargs)
62 instance = self.get_object()
63 _refresh_dependent_studies_for_curve(instance)
64 response.data = CostCurveSerializer(instance, context=self.get_serializer_context()).data
65 return response
67 def destroy(self, request, *args, **kwargs):
68 _require_write_access(request.user)
69 instance = self.get_object()
70 affected_study_ids = set(
71 instance.equipment_mappings.values_list(
72 "costable_item__study_id", flat=True
73 )
74 )
75 affected_study_ids.update(
76 instance.capital_lines.values_list("study_id", flat=True)
77 )
78 affected_studies = list(
79 EconomicsStudy.objects.filter(pk__in=affected_study_ids)
80 )
81 with transaction.atomic():
82 response = super().destroy(request, *args, **kwargs)
83 for study in affected_studies:
84 _sync_generated_capital_lines(study)
85 _mark_study_stale(study, reason="cost_curve_changed")
86 return response
88 @extend_schema(
89 parameters=[
90 OpenApiParameter("flowsheet", int, OpenApiParameter.QUERY, required=True),
91 OpenApiParameter("active", bool, OpenApiParameter.QUERY, required=False),
92 ],
93 responses=CostCurveEquipmentCategorySerializer(many=True),
94 )
95 @action(detail=False, methods=["get"], url_path="equipment-options")
96 def equipment_options(self, request):
97 options = cost_curve_equipment_categories(self.get_queryset())
98 serializer = CostCurveEquipmentCategorySerializer(options, many=True)
99 return Response(serializer.data)
101 def get_queryset(self):
102 queryset = CostCurve.objects.all()
103 active = self.request.query_params.get("active")
104 return queryset.filter(active=active.lower() == "true") if active is not None else queryset
107def _dependent_studies_for_curve(curve: CostCurve) -> list[EconomicsStudy]:
108 affected_study_ids = set(
109 curve.equipment_mappings.values_list(
110 "costable_item__study_id", flat=True
111 )
112 )
113 affected_study_ids.update(curve.capital_lines.values_list("study_id", flat=True))
114 return list(EconomicsStudy.objects.filter(pk__in=affected_study_ids).order_by("pk"))
117def _refresh_dependent_studies_for_curve(curve: CostCurve) -> None:
118 for study in _dependent_studies_for_curve(curve):
119 _sync_generated_capital_lines(study)
120 _mark_study_stale(study, reason="cost_curve_changed")