Coverage for backend/django/core/auxiliary/methods/export_scenario_data.py: 81%

48 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1"""Legacy scenario export helpers. 

2 

3These functions still export the older Solution/PropertyValue data model and are 

4not part of the object-storage CSV import pipeline used by DataColumn/DataRow/DataCell. 

5That separation is intentional for now and round-tripping through CSV import/export 

6should not be assumed here. 

7""" 

8 

9from core.auxiliary.models.Scenario import Scenario 

10from core.auxiliary.models.Flowsheet import Flowsheet 

11from core.auxiliary.models.PropertyValue import PropertyValue 

12from core.auxiliary.models.Solution import Solution 

13 

14 

def values_per_index(scenario: Scenario):
    """Return how many values a single solve index contributes to a column.

    Args:
        scenario: Object exposing ``enable_dynamics`` and ``num_time_steps``.

    Returns:
        ``scenario.num_time_steps`` when dynamics are enabled, otherwise 1.
    """
    if scenario.enable_dynamics:
        return scenario.num_time_steps
    # Without dynamics there is exactly one value per solve index.
    return 1

20 

21# Tested in test_mss.py 

def export_scenario_data(flowsheet: Flowsheet, scenario: Scenario, properties: list[int] | None = None):
    """Build a column-oriented export of the solution values stored for a scenario.

    Args:
        flowsheet: Flowsheet used to select the PropertyValue rows that label
            the columns.
        scenario: Scenario whose Solution rows are exported.
        properties: Optional ids matched against ``property__property_id``.
            When empty or None, every solution of the scenario is exported.

    Returns:
        A dict mapping each column label to a flat list of values, appended in
        ascending ``solve_index`` order.
    """
    # Only narrow the solutions when the caller asked for specific properties.
    solution_filter = {"scenario": scenario}
    if properties:
        solution_filter["property__property_id__in"] = properties
    solutions = Solution.objects.filter(**solution_filter).order_by("solve_index")

    label_rows = (
        PropertyValue.objects.filter(flowsheet=flowsheet, solutions__in=solutions)
        .distinct()
        .values(
            'id',
            'property__displayName',
            'property__set__simulationObject__componentName',
            'indexedItems__displayName',
        )
    )

    # Rows sharing a property value id are folded together, collecting the
    # indexed item names they carry.
    labels_by_pv = {}
    for entry in label_rows:
        bucket = labels_by_pv.setdefault(
            entry['id'],
            {
                'uo_name': entry['property__set__simulationObject__componentName'],
                'prop_name': entry['property__displayName'],
                'indices': [],
            },
        )
        item_name = entry['indexedItems__displayName']
        if item_name:
            bucket['indices'].append(item_name)

    # One run of blanks stands in for a solve index that has no solutions.
    blanks = [None] * values_per_index(scenario)

    # columns: property value id -> column label; data: column label -> values.
    columns = {}
    data = {}
    for pv_id, info in labels_by_pv.items():
        label = f"{info['uo_name']} - {info['prop_name']}"
        joined_indices = " - ".join(info['indices'])  # e.g. "Liquid water - Vapor water"
        if joined_indices:
            label = f"{label} ({joined_indices})"
        columns[pv_id] = label
        data[label] = []

    # Walk the solutions in solve_index order. The counter starts at 0, so a
    # first solve index of 0 or 1 inserts no blanks.
    last_index = 0
    for solution in solutions:
        if solution.solve_index > last_index:
            # Pad every column once per solve index skipped entirely.
            skipped = solution.solve_index - last_index - 1
            padding = blanks * skipped
            for series in data.values():
                series.extend(padding)
            last_index = solution.solve_index

        # solution.values is a sequence; flatten it into the column.
        data[columns[solution.property_id]].extend(solution.values)

    return data

86 

def collate(data: dict[str,list]):
    """Turn column-oriented data into a list of row dicts for CSV export.

    Args:
        data: Mapping of column name to that column's list of values.

    Returns:
        One dict per row index, keyed by column name. Columns shorter than the
        longest one contribute None for the rows they do not reach. An empty
        mapping, or one whose columns are all empty, yields an empty list.
    """
    # The longest column decides how many rows are produced.
    row_count = max(map(len, data.values()), default=0)
    return [
        {
            name: (series[position] if position < len(series) else None)
            for name, series in data.items()
        }
        for position in range(row_count)
    ]

101 

102