Coverage for backend/django/Economics/studies/services/flowsheet_copy.py: 92%
161 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-06-23 21:51 +0000
1from __future__ import annotations
3from typing import TypeVar
5from django.db import transaction
6from django.db.models import Model
7from pydantic import BaseModel, ConfigDict
9from core.auxiliary.methods.copy_flowsheet.copy_caching import ModelLookupDict
10from core.auxiliary.models import Flowsheet, PropertyInfo
11from core.auxiliary.models.PropertyValue import PropertyValue
12from Economics.costing.models import (
13 CapitalCostLine,
14 CostCurve,
15 CostDriver,
16 CostableItem,
17 EquipmentMapping,
18 OperatingCostLine,
19)
20from Economics.settings_profiles.models import EconomicsAssumptions, EconomicsBaseline, EconomicsSettingsProfile
21from Economics.formulas.models import EconomicsLineFormula, EconomicsMetricFormula
22from Economics.studies.models import EconomicsStudy
23from flowsheetInternals.unitops.models.SimulationObject import SimulationObject
# Type variable bound to Django's ``Model``: lets the remap helpers below be
# annotated as returning the same model type they were asked to resolve.
ModelT = TypeVar("ModelT", bound=Model)
class EconomicsCopyContract(BaseModel):
    """Base pydantic model for the economics copy contracts in this module.

    Configured with ``frozen=True`` so instances are read-only once built.
    """

    model_config = ConfigDict(frozen=True)
class EconomicsFlowsheetCopyResult(EconomicsCopyContract):
    """Typed copy summary used by tests and future diagnostics.

    Every field is a row counter that defaults to ``0``; the copy entry point
    fills each one with the number of rows of that kind it created.
    """

    # Study-level configuration.
    studies: int = 0
    assumptions: int = 0
    baselines: int = 0
    settings_profiles: int = 0

    # Costing model rows.
    cost_curves: int = 0
    costable_items: int = 0
    cost_drivers: int = 0
    equipment_mappings: int = 0

    # Cost lines.
    capital_lines: int = 0
    operating_lines: int = 0

    # Formula rows.
    metric_formulas: int = 0
    line_formulas: int = 0
class EconomicsFlowsheetCopyError(ValueError):
    """Raised when an economics reference cannot be remapped during flowsheet copy.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    keep working.
    """
54class _EconomicsCopyContext:
55 """Strict old-to-new economics remapper for the existing flowsheet copy flow.
57 Core flowsheet copy owns the low-level copy of simulation
58 objects, property rows, and property formulas. This context uses those
59 lookup tables to copy only economics configuration. Historical result runs,
60 result lines, dependencies, and chart datasets are deliberately excluded.
61 """
63 def __init__(
64 self,
65 *,
66 source_flowsheet: Flowsheet,
67 target_flowsheet: Flowsheet,
68 model_lookups: ModelLookupDict,
69 ):
70 self.source_flowsheet = source_flowsheet
71 self.target_flowsheet = target_flowsheet
72 self.model_lookups = model_lookups
73 self.studies: dict[int, EconomicsStudy] = {}
74 self.settings_profiles: dict[int, EconomicsSettingsProfile] = {}
75 self.cost_curves: dict[int, CostCurve] = {}
76 self.costable_items: dict[int, CostableItem] = {}
77 self.capital_lines: dict[int, CapitalCostLine] = {}
78 self.operating_lines: dict[int, OperatingCostLine] = {}
80 def core(self, model_type: type[ModelT], old_pk: int | None, field_name: str) -> ModelT | None:
81 """Resolve a copied core model or fail the copy with a field-specific error."""
83 if old_pk is None:
84 return None
85 model_lookup = self.model_lookups.get(model_type)
86 if model_lookup is None:
87 raise EconomicsFlowsheetCopyError(f"Missing copy lookup for {field_name} ({model_type.__name__}).")
88 copied = model_lookup.get_model(old_pk)
89 if copied is None: 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true
90 raise EconomicsFlowsheetCopyError(f"Could not remap economics {field_name} with source id {old_pk}.")
91 return copied
93 def economics(self, mapping: dict[int, ModelT], old_pk: int | None, field_name: str) -> ModelT | None:
94 if old_pk is None:
95 return None
96 copied = mapping.get(old_pk)
97 if copied is None: 97 ↛ 98line 97 didn't jump to line 98 because the condition on line 97 was never true
98 raise EconomicsFlowsheetCopyError(f"Could not remap economics {field_name} with source id {old_pk}.")
99 return copied
def copy_economics_configuration_for_flowsheet(
    *,
    source_flowsheet: Flowsheet,
    target_flowsheet: Flowsheet,
    model_lookups: ModelLookupDict,
) -> EconomicsFlowsheetCopyResult:
    """Copy economics configuration into a copied flowsheet.

    The operation is intentionally all-or-fail: everything runs inside one
    ``transaction.atomic()`` block. Every simulation object, property, study,
    costable item, and curve reference must resolve to the row created for
    the new flowsheet. Result snapshots are not copied because copied studies
    must be recalculated from the copied flowsheet state.

    Returns:
        A summary holding the number of rows created per economics model.
    """
    # Result field name -> copy step. The order is significant: each step
    # resolves references against rows registered by the steps before it.
    ordered_steps = (
        ("cost_curves", _copy_cost_curves),
        ("settings_profiles", _copy_settings_profiles),
        ("studies", _copy_studies),
        ("assumptions", _copy_assumptions),
        ("baselines", _copy_baselines),
        ("costable_items", _copy_costable_items),
        ("cost_drivers", _copy_cost_drivers),
        ("equipment_mappings", _copy_equipment_mappings),
        ("capital_lines", _copy_capital_lines),
        ("operating_lines", _copy_operating_lines),
        ("metric_formulas", _copy_metric_formulas),
        ("line_formulas", _copy_line_formulas),
    )

    with transaction.atomic():
        context = _EconomicsCopyContext(
            source_flowsheet=source_flowsheet,
            target_flowsheet=target_flowsheet,
            model_lookups=model_lookups,
        )
        counts = {result_field: run_step(context) for result_field, run_step in ordered_steps}
        return EconomicsFlowsheetCopyResult(**counts)
def _copy_cost_curves(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's cost curves onto the target flowsheet.

    Curves are processed in ``created_at``/``pk`` order and each clone is
    registered in ``context.cost_curves`` under the source curve's pk.

    Returns:
        The number of curves copied.
    """
    # Columns carried over unchanged; only ``flowsheet`` is repointed.
    carried_fields = (
        "curve_key",
        "name",
        "equipment_category",
        "equipment_subtype",
        "cost_basis",
        "evaluation_kind",
        "output_unit",
        "expression_text",
        "required_driver_specs",
        "discrete_variants",
        "valid_min",
        "valid_max",
        "valid_range_note",
        "currency",
        "basis_date",
        "basis_index_name",
        "basis_index_value",
        "source_document_title",
        "source_page",
        "source_figure",
        "source_data_origin",
        "source_range_precision",
        "source_license_status",
        "source_reference",
        "source_note",
        "applicability_warning",
        "active",
    )
    source_curves = CostCurve.objects.filter(flowsheet=context.source_flowsheet).order_by("created_at", "pk")

    count = 0
    for count, source_curve in enumerate(source_curves, start=1):
        carried_values = {field: getattr(source_curve, field) for field in carried_fields}
        clone = CostCurve(flowsheet=context.target_flowsheet, **carried_values)
        context.cost_curves[source_curve.pk] = _save_copy(clone)
    return count
def _copy_settings_profiles(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's settings profiles onto the target flowsheet.

    Profiles are processed in ``created_at``/``pk`` order and each clone is
    registered in ``context.settings_profiles`` under the source profile's pk.

    Returns:
        The number of profiles copied.
    """
    # Columns carried over unchanged; only ``flowsheet`` is repointed.
    carried_fields = (
        "name",
        "is_default",
        "currency",
        "location",
        "basis_date",
        "discount_rate_percent",
        "project_lifetime_years",
        "inflation_method",
        "annual_operating_hours",
        "tax_rate_percent",
        "depreciation_enabled",
        "default_depreciation_life_years",
        "default_depreciation_salvage_percent",
        "contingency_percent",
        "electrical_upgrade_rate_amount",
        "default_lang_factor",
        "capital_index_series",
        "operating_index_series",
        "default_rate_overrides",
        "manual_capex",
        "manual_annual_opex",
        "annual_heat_basis_mode",
        "manual_annual_heat_basis",
        "manual_annual_heat_basis_unit",
        "average_power_input",
        "average_power_unit",
        "residual_value",
        "notes",
        "baseline_notes",
    )
    source_profiles = EconomicsSettingsProfile.objects.filter(
        flowsheet=context.source_flowsheet,
    ).order_by("created_at", "pk")

    count = 0
    for count, source_profile in enumerate(source_profiles, start=1):
        carried_values = {field: getattr(source_profile, field) for field in carried_fields}
        clone = EconomicsSettingsProfile(flowsheet=context.target_flowsheet, **carried_values)
        context.settings_profiles[source_profile.pk] = _save_copy(clone)
    return count
def _copy_studies(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's economics studies onto the target flowsheet.

    Each study's settings profile is remapped through
    ``context.settings_profiles``; the clone is registered in
    ``context.studies`` under the source study's pk.

    Returns:
        The number of studies copied.

    Raises:
        EconomicsFlowsheetCopyError: if a study's settings profile has no copy.
    """
    source_studies = EconomicsStudy.objects.filter(flowsheet=context.source_flowsheet).order_by("created_at", "pk")

    count = 0
    for count, source_study in enumerate(source_studies, start=1):
        remapped_profile = context.economics(
            context.settings_profiles,
            source_study.settings_profile_id,
            "study.settings_profile",
        )
        clone = EconomicsStudy(
            flowsheet=context.target_flowsheet,
            settings_profile=remapped_profile,
            name=source_study.name,
            description=source_study.description,
        )
        context.studies[source_study.pk] = _save_copy(clone)
    return count
def _copy_assumptions(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's economics assumptions onto the target flowsheet.

    The owning study is remapped through ``context.studies``. Clones are not
    registered in the context.

    Returns:
        The number of assumption rows copied.

    Raises:
        EconomicsFlowsheetCopyError: if a row's study has no copy.
    """
    # Columns carried over unchanged; ``flowsheet`` and ``study`` are repointed.
    carried_fields = (
        "currency",
        "location",
        "basis_date",
        "discount_rate_percent",
        "project_lifetime_years",
        "inflation_method",
        "annual_operating_hours",
        "tax_rate_percent",
        "depreciation_enabled",
        "default_depreciation_life_years",
        "default_depreciation_salvage_percent",
        "contingency_percent",
        "electrical_upgrade_rate_amount",
        "default_lang_factor",
        "capital_index_series",
        "operating_index_series",
        "default_rate_overrides",
        "notes",
    )
    source_rows = EconomicsAssumptions.objects.filter(flowsheet=context.source_flowsheet).order_by("created_at", "pk")

    count = 0
    for count, source_row in enumerate(source_rows, start=1):
        remapped_study = context.economics(context.studies, source_row.study_id, "assumptions.study")
        carried_values = {field: getattr(source_row, field) for field in carried_fields}
        _save_copy(
            EconomicsAssumptions(
                flowsheet=context.target_flowsheet,
                study=remapped_study,
                **carried_values,
            )
        )
    return count
def _copy_baselines(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's economics baselines onto the target flowsheet.

    The owning study is remapped through ``context.studies``. Clones are not
    registered in the context.

    Returns:
        The number of baselines copied.

    Raises:
        EconomicsFlowsheetCopyError: if a baseline's study has no copy.
    """
    # Columns carried over unchanged; ``flowsheet`` and ``study`` are repointed.
    carried_fields = (
        "manual_capex",
        "manual_annual_opex",
        "annual_heat_basis_mode",
        "manual_annual_heat_basis",
        "manual_annual_heat_basis_unit",
        "average_power_input",
        "average_power_unit",
        "residual_value",
        "notes",
    )
    source_baselines = EconomicsBaseline.objects.filter(flowsheet=context.source_flowsheet).order_by("created_at", "pk")

    count = 0
    for count, source_baseline in enumerate(source_baselines, start=1):
        remapped_study = context.economics(context.studies, source_baseline.study_id, "baseline.study")
        carried_values = {field: getattr(source_baseline, field) for field in carried_fields}
        _save_copy(
            EconomicsBaseline(
                flowsheet=context.target_flowsheet,
                study=remapped_study,
                **carried_values,
            )
        )
    return count
def _copy_costable_items(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's costable items onto the target flowsheet.

    The study is remapped through ``context.studies`` and the simulation
    object through the core ``SimulationObject`` lookup. Each clone is
    registered in ``context.costable_items`` under the source item's pk.

    Returns:
        The number of costable items copied.

    Raises:
        EconomicsFlowsheetCopyError: if the study or simulation object has no copy.
    """
    source_items = CostableItem.objects.filter(flowsheet=context.source_flowsheet).order_by("created_at", "pk")

    count = 0
    for count, source_item in enumerate(source_items, start=1):
        # Study first, then simulation object, so the first broken reference
        # in declaration order is the one reported.
        remapped_study = context.economics(context.studies, source_item.study_id, "costable_item.study")
        remapped_object = context.core(
            SimulationObject,
            source_item.simulation_object_id,
            "costable_item.simulation_object",
        )
        clone = CostableItem(
            flowsheet=context.target_flowsheet,
            study=remapped_study,
            item_type=source_item.item_type,
            simulation_object=remapped_object,
            name=source_item.name,
            included=source_item.included,
            manual=source_item.manual,
            notes=source_item.notes,
        )
        context.costable_items[source_item.pk] = _save_copy(clone)
    return count
def _copy_cost_drivers(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's cost drivers onto the target flowsheet.

    The costable item is remapped through ``context.costable_items``; both
    property references go through the core ``PropertyInfo`` lookup. Clones
    are not registered in the context.

    Returns:
        The number of cost drivers copied.

    Raises:
        EconomicsFlowsheetCopyError: if any of the three references has no copy.
    """
    # Columns carried over unchanged.
    carried_fields = (
        "source",
        "sizing_mode",
        "canonical_unit",
        "design_value",
        "unresolved_reason_code",
        "warning_payload",
    )
    source_drivers = CostDriver.objects.filter(flowsheet=context.source_flowsheet).order_by("created_at", "pk")

    count = 0
    for count, source_driver in enumerate(source_drivers, start=1):
        # Resolution order mirrors the model's keyword order so the first
        # broken reference is the one reported.
        remapped_item = context.economics(
            context.costable_items,
            source_driver.costable_item_id,
            "cost_driver.costable_item",
        )
        remapped_property = context.core(PropertyInfo, source_driver.property_info_id, "cost_driver.property_info")
        remapped_manual_property = context.core(
            PropertyInfo,
            source_driver.manual_property_info_id,
            "cost_driver.manual_property_info",
        )
        carried_values = {field: getattr(source_driver, field) for field in carried_fields}
        _save_copy(
            CostDriver(
                flowsheet=context.target_flowsheet,
                costable_item=remapped_item,
                property_info=remapped_property,
                manual_property_info=remapped_manual_property,
                **carried_values,
            )
        )
    return count
def _copy_equipment_mappings(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's equipment mappings onto the target flowsheet.

    The costable item and cost curve are remapped through
    ``context.costable_items`` and ``context.cost_curves``. Clones are not
    registered in the context.

    Returns:
        The number of equipment mappings copied.

    Raises:
        EconomicsFlowsheetCopyError: if the costable item or cost curve has no copy.
    """
    # Columns carried over unchanged.
    carried_fields = (
        "equipment_category",
        "equipment_subtype",
        "cost_basis",
        "install_factor_profile",
        "install_factor",
        "use_study_lang_factor",
        "applicability_notes",
    )
    source_mappings = EquipmentMapping.objects.filter(flowsheet=context.source_flowsheet).order_by("created_at", "pk")

    count = 0
    for count, source_mapping in enumerate(source_mappings, start=1):
        remapped_item = context.economics(
            context.costable_items,
            source_mapping.costable_item_id,
            "equipment_mapping.costable_item",
        )
        remapped_curve = context.economics(
            context.cost_curves,
            source_mapping.cost_curve_id,
            "equipment_mapping.cost_curve",
        )
        carried_values = {field: getattr(source_mapping, field) for field in carried_fields}
        _save_copy(
            EquipmentMapping(
                flowsheet=context.target_flowsheet,
                costable_item=remapped_item,
                cost_curve=remapped_curve,
                **carried_values,
            )
        )
    return count
def _copy_capital_lines(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's capital cost lines onto the target flowsheet.

    Study, costable item, and cost curve are remapped through the context's
    tables; the ``driver_inputs`` payload is rewritten by
    ``_remap_capital_line_driver_inputs``. Each clone is registered in
    ``context.capital_lines`` under the source line's pk.

    Returns:
        The number of capital lines copied.

    Raises:
        EconomicsFlowsheetCopyError: if the study, costable item, or cost
            curve has no copy.
    """
    # Columns carried over unchanged.
    carried_fields = (
        "label",
        "line_type",
        "calculation_basis",
        "amount",
        "basis_percent",
        "depreciation_mode",
        "depreciation_life_years",
        "depreciation_salvage_percent",
        "peak_demand_kw",
        "minimum_peak_demand_kw",
        "currency",
        "included",
        "manual",
        "source",
        "confidence",
        "warning_payload",
    )
    source_lines = CapitalCostLine.objects.filter(flowsheet=context.source_flowsheet).order_by("created_at", "pk")

    count = 0
    for count, source_line in enumerate(source_lines, start=1):
        # Foreign keys are resolved before the JSON payload so a broken
        # reference fails the copy before any payload rewriting happens.
        remapped_study = context.economics(context.studies, source_line.study_id, "capital_line.study")
        remapped_item = context.economics(
            context.costable_items,
            source_line.costable_item_id,
            "capital_line.costable_item",
        )
        remapped_curve = context.economics(context.cost_curves, source_line.cost_curve_id, "capital_line.cost_curve")
        remapped_inputs = _remap_capital_line_driver_inputs(context, source_line.driver_inputs)
        carried_values = {field: getattr(source_line, field) for field in carried_fields}
        clone = CapitalCostLine(
            flowsheet=context.target_flowsheet,
            study=remapped_study,
            costable_item=remapped_item,
            cost_curve=remapped_curve,
            driver_inputs=remapped_inputs,
            **carried_values,
        )
        context.capital_lines[source_line.pk] = _save_copy(clone)
    return count
389def _remap_capital_line_driver_inputs(context: _EconomicsCopyContext, driver_inputs) -> dict:
390 if not isinstance(driver_inputs, dict): 390 ↛ 391line 390 didn't jump to line 391 because the condition on line 390 was never true
391 return {}
392 remapped = {}
393 property_lookup = context.model_lookups.get(PropertyInfo)
394 for key, raw_input in driver_inputs.items():
395 if not isinstance(raw_input, dict): 395 ↛ 396line 395 didn't jump to line 396 because the condition on line 395 was never true
396 continue
397 next_input = dict(raw_input)
398 if raw_input.get("source") == "property": 398 ↛ 405line 398 didn't jump to line 405 because the condition on line 398 was always true
399 copied_property = property_lookup.get_model(raw_input.get("property_info")) if property_lookup else None
400 if copied_property is None: 400 ↛ 401line 400 didn't jump to line 401 because the condition on line 400 was never true
401 next_input["source"] = ""
402 next_input["property_info"] = None
403 else:
404 next_input["property_info"] = copied_property.pk
405 remapped[str(key)] = next_input
406 return remapped
def _copy_operating_lines(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's operating cost lines onto the target flowsheet.

    Study and costable item are remapped through the context's tables; the
    source property goes through the core ``PropertyInfo`` lookup. Each clone
    is registered in ``context.operating_lines`` under the source line's pk.

    Returns:
        The number of operating lines copied.

    Raises:
        EconomicsFlowsheetCopyError: if the study, costable item, or source
            property has no copy.
    """
    # Columns carried over unchanged.
    carried_fields = (
        "label",
        "line_type",
        "category",
        "economic_effect",
        "currency",
        "basis_quantity",
        "basis_unit",
        "basis_quantity_source",
        "rate_amount",
        "rate_unit",
        "rate_type",
        "rate_source_mode",
        "calculation_method",
        "source_default_rate",
        "outlet_stream_disposition",
        "included",
        "manual",
        "source",
        "warning_payload",
    )
    source_lines = OperatingCostLine.objects.filter(flowsheet=context.source_flowsheet).order_by("created_at", "pk")

    count = 0
    for count, source_line in enumerate(source_lines, start=1):
        remapped_study = context.economics(context.studies, source_line.study_id, "operating_line.study")
        remapped_item = context.economics(
            context.costable_items,
            source_line.costable_item_id,
            "operating_line.costable_item",
        )
        remapped_property = context.core(
            PropertyInfo,
            source_line.source_property_info_id,
            "operating_line.source_property_info",
        )
        carried_values = {field: getattr(source_line, field) for field in carried_fields}
        clone = OperatingCostLine(
            flowsheet=context.target_flowsheet,
            study=remapped_study,
            costable_item=remapped_item,
            source_property_info=remapped_property,
            **carried_values,
        )
        context.operating_lines[source_line.pk] = _save_copy(clone)
    return count
def _copy_metric_formulas(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's metric formulas onto the target flowsheet.

    The study is remapped through ``context.studies`` and the property value
    through the core ``PropertyValue`` lookup. Clones are not registered in
    the context.

    NOTE(review): ``property_formula`` is carried over as-is, without
    remapping. If it is a relation to a row owned by the source flowsheet,
    the copy would still point at the source; confirm against the model.

    Returns:
        The number of metric formulas copied.

    Raises:
        EconomicsFlowsheetCopyError: if the study or property value has no copy.
    """
    # Columns carried over unchanged.
    carried_fields = (
        "metric_key",
        "formula_key",
        "formula",
        "property_formula",
        "unit",
        "value",
        "status",
        "formula_audit",
        "blocked_reason",
    )
    source_formulas = EconomicsMetricFormula.objects.filter(
        flowsheet=context.source_flowsheet,
    ).order_by("created_at", "pk")

    count = 0
    for count, source_formula in enumerate(source_formulas, start=1):
        remapped_study = context.economics(context.studies, source_formula.study_id, "metric_formula.study")
        remapped_value = context.core(
            PropertyValue,
            source_formula.property_value_id,
            "metric_formula.property_value",
        )
        carried_values = {field: getattr(source_formula, field) for field in carried_fields}
        _save_copy(
            EconomicsMetricFormula(
                flowsheet=context.target_flowsheet,
                study=remapped_study,
                property_value=remapped_value,
                **carried_values,
            )
        )
    return count
def _copy_line_formulas(context: _EconomicsCopyContext) -> int:
    """Clone the source flowsheet's line formulas onto the target flowsheet.

    The study, capital line, and operating line are remapped through the
    context's tables; the property value goes through the core
    ``PropertyValue`` lookup. Clones are not registered in the context.

    NOTE(review): ``property_formula`` is carried over as-is, without
    remapping. If it is a relation to a row owned by the source flowsheet,
    the copy would still point at the source; confirm against the model.

    Returns:
        The number of line formulas copied.

    Raises:
        EconomicsFlowsheetCopyError: if any of the four references has no copy.
    """
    # Columns carried over unchanged.
    carried_fields = (
        "line_key",
        "formula_key",
        "formula",
        "property_formula",
        "unit",
        "value",
        "status",
        "formula_audit",
        "blocked_reason",
    )
    source_formulas = EconomicsLineFormula.objects.filter(
        flowsheet=context.source_flowsheet,
    ).order_by("created_at", "pk")

    count = 0
    for count, source_formula in enumerate(source_formulas, start=1):
        # Resolution order mirrors the model's keyword order so the first
        # broken reference is the one reported.
        remapped_study = context.economics(context.studies, source_formula.study_id, "line_formula.study")
        remapped_value = context.core(
            PropertyValue,
            source_formula.property_value_id,
            "line_formula.property_value",
        )
        remapped_capital = context.economics(
            context.capital_lines,
            source_formula.capital_line_id,
            "line_formula.capital_line",
        )
        remapped_operating = context.economics(
            context.operating_lines,
            source_formula.operating_line_id,
            "line_formula.operating_line",
        )
        carried_values = {field: getattr(source_formula, field) for field in carried_fields}
        _save_copy(
            EconomicsLineFormula(
                flowsheet=context.target_flowsheet,
                study=remapped_study,
                property_value=remapped_value,
                capital_line=remapped_capital,
                operating_line=remapped_operating,
                **carried_values,
            )
        )
    return count
496def _save_copy(instance: ModelT) -> ModelT:
497 instance.save()
498 return instance