Coverage for backend/django/core/auxiliary/methods/export_scenario_data.py: 81%
48 statements
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2026-05-13 02:47 +0000
"""Legacy scenario export helpers.

These functions still export the older Solution/PropertyValue data model and are
not part of the object-storage CSV import pipeline used by DataColumn/DataRow/DataCell.
That separation is intentional for now and round-tripping through CSV import/export
should not be assumed here.
"""
from core.auxiliary.models.Scenario import Scenario
from core.auxiliary.models.Flowsheet import Flowsheet
from core.auxiliary.models.PropertyValue import PropertyValue
from core.auxiliary.models.Solution import Solution
def values_per_index(scenario: Scenario):
    """Return how many values each indexed item holds for *scenario*.

    Dynamic scenarios store one value per time step; a steady-state
    scenario stores a single value.
    """
    return scenario.num_time_steps if scenario.enable_dynamics else 1
# Tested in test_mss.py
def export_scenario_data(flowsheet: Flowsheet, scenario: Scenario, properties: list[int] | None = None):
    """Export solution values for *scenario* on *flowsheet* as columns.

    Returns a dict mapping a human-readable column name — built as
    ``"<unit op> - <property>"`` plus an optional ``"(<index names>)"``
    suffix — to the flat list of values for that property value. When
    *properties* is given, only solutions whose property id is in it are
    exported. Gaps in the solve-index sequence are padded with ``None``
    so every column stays aligned.
    """
    # Solutions ordered oldest-first; optionally restricted to a set of
    # property ids.
    if properties:
        solutions = Solution.objects.filter(scenario=scenario, property__property_id__in=properties).order_by("solve_index")
    else:
        solutions = Solution.objects.filter(scenario=scenario).order_by("solve_index")

    # One row per (property value, indexed item) combination.
    property_values = (
        PropertyValue.objects.filter(flowsheet=flowsheet, solutions__in=solutions)
        .distinct()
        .values(
            'id',
            'property__displayName',
            'property__set__simulationObject__componentName',
            'indexedItems__displayName'
        )
    )

    # Collect the index display names under each property value id.
    grouped_data = {}
    for row in property_values:
        entry = grouped_data.setdefault(row['id'], {
            'uo_name': row['property__set__simulationObject__componentName'],
            'prop_name': row['property__displayName'],
            'indices': [],
        })
        if row['indexedItems__displayName']:
            entry['indices'].append(row['indexedItems__displayName'])

    # Padding used when a solve index produced no solutions.
    blanks = [None] * values_per_index(scenario)

    columns = {}  # property value id -> column name
    data = {}     # column name -> flat list of values
    for pv_id, info in grouped_data.items():
        column_name = f"{info['uo_name']} - {info['prop_name']}"
        if info['indices']:
            # e.g Phases -> Liquid water, Phases -> Vapor water
            column_name = f"{column_name} ({' - '.join(info['indices'])})"
        columns[pv_id] = column_name
        data[column_name] = []

    # Walk the solutions in solve order, padding every column with blanks
    # whenever one or more solve indices were skipped entirely.
    current_solve_index = 0
    for solution in solutions:
        if solution.solve_index > current_solve_index:
            for _ in range(solution.solve_index - current_solve_index - 1):
                for column_name in data:
                    data[column_name].extend(blanks)
            current_solve_index = solution.solve_index

        # The values are an array, so they are flattened into the column.
        data[columns[solution.property_id]].extend(solution.values)

    return data
def collate(data: dict[str,list]):
    """
    Transpose column-oriented *data* into a list of row dicts for CSV
    export. Each row maps column name -> value; columns shorter than the
    longest one are padded with ``None``.
    """
    longest = max(map(len, data.values()), default=0)
    return [
        {name: (column[i] if i < len(column) else None) for name, column in data.items()}
        for i in range(longest)
    ]