Coverage for backend/ahuora-builder/src/ahuora_builder/ml_wizard.py: 98%

54 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import contextlib 

2import json 

3from dataclasses import dataclass 

4from io import StringIO 

5from typing import Any 

6 

7import pandas as pd 

8import numpy as np 

9from sklearn.model_selection import train_test_split 

10from sklearn.metrics import mean_squared_error, r2_score 

11from idaes.core.surrogate.pysmo_surrogate import PysmoRBFTrainer, PysmoSurrogate 

12 

13from ahuora_builder_types.payloads.ml_request_schema import ( 

14 MLTrainingChartPayload as MLChart, 

15 MLTrainingRegressionMetricPayload as RegressionMetric, 

16 MLTrainingResultPayload as MLResult, 

17) 

18 

19 

# Upper bound on the number of quantile points emitted per QQ chart
# (applied in compute_qq_coordinates).
MAX_QQ_CHART_POINTS = 250

21 

22 

@dataclass
class TrainingOutput:
    """Intermediate ML training artefacts before object-storage upload.

    Attributes:
        result_payload: The training result payload (surrogate model JSON,
            charts and regression metrics).
        test_inputs_df: Input columns of the held-out test rows, as
            populated by ``train_dataframe``.
        test_outputs_df: Output columns evaluated by the surrogate on those
            test rows, as populated by ``train_dataframe``.
    """

    result_payload: MLResult
    test_inputs_df: pd.DataFrame
    test_outputs_df: pd.DataFrame

30 

31 

32def _json_indexed_records(frame: pd.DataFrame) -> dict[str, dict[str, float]]: 

33 """Convert a dataframe to a JSON-safe indexed mapping with string row keys.""" 

34 return { 

35 str(index): { 

36 str(column): float(value) 

37 for column, value in row.items() 

38 } 

39 for index, row in frame.to_dict(orient="index").items() 

40 } 

41 

42 

def train_dataframe(
    df: pd.DataFrame,
    *,
    input_labels: list[str],
    output_labels: list[str],
) -> TrainingOutput:
    """Train a PySMO surrogate model from a dataframe and return upload-ready artefacts.

    The dataframe is split 80/20 into train and test rows with a fixed seed,
    a gaussian RBF surrogate is trained on the train rows, and the surrogate
    is then evaluated on the test rows to produce one chart and one
    regression metric per output label.

    Args:
        df: Dataset containing every column named in ``input_labels`` and
            ``output_labels``.
        input_labels: Column names used as surrogate inputs.
        output_labels: Column names the surrogate predicts.

    Returns:
        A ``TrainingOutput`` holding the result payload (serialized surrogate,
        charts, metrics), the test-split input columns, and the surrogate's
        evaluated output columns for those rows. The storage bucket/key and
        timing fields of the payload are left empty here.
    """
    train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

    trainer = PysmoRBFTrainer(
        input_labels=input_labels,
        output_labels=output_labels,
        training_dataframe=train_df,
    )
    trainer.config.basis_function = "gaussian"

    # Train surrogate (calls PySMO through IDAES Python wrapper). Anything the
    # trainer prints to stdout is captured and discarded.
    with contextlib.redirect_stdout(StringIO()):
        rbf_train = trainer.train_surrogate()

    # Create the callable surrogate model and serialize it to a JSON object.
    rbf_surr = PysmoSurrogate(
        rbf_train, input_labels, output_labels, input_bounds=None
    )
    buffer = StringIO()
    rbf_surr.save(buffer)
    json_data = json.loads(buffer.getvalue())

    df_evaluate = rbf_surr.evaluate_surrogate(test_df)

    metrics: list[RegressionMetric] = []
    charts: list[MLChart] = []

    for output_label in output_labels:
        y_true = test_df[output_label]
        y_pred = df_evaluate[output_label]

        charts.append(compute_chart(y_true, y_pred, output_label))
        # scikit-learn metrics take (y_true, y_pred). R^2 is not symmetric:
        # it is normalised by the variance of y_true, so the ground truth
        # must come first.
        metrics.append(
            RegressionMetric(
                mean_squared_error=round(mean_squared_error(y_true, y_pred), 4),
                r2_score=round(r2_score(y_true, y_pred), 4),
            )
        )

    return TrainingOutput(
        result_payload=MLResult(
            surrogate_model=json_data,
            charts=charts,
            metrics=metrics,
            test_results_bucket="",
            test_results_key="",
            timing={},
        ),
        test_inputs_df=test_df[input_labels].copy(),
        test_outputs_df=df_evaluate[output_labels].copy(),
    )

103 

104 

def compute_chart(
    test_data_df: pd.Series,
    eval_data_df: pd.Series,
    output_label: str,
) -> MLChart:
    """Assemble the chart payload for a single output label.

    ``min`` and ``max`` are the overall extremes across both series, rounded
    to four decimal places; ``qq_plot_data`` is the JSON string produced by
    ``compute_qq_coordinates`` for the same two series.
    """
    lows = [np.min(test_data_df), np.min(eval_data_df)]
    highs = [np.max(test_data_df), np.max(eval_data_df)]

    return MLChart(
        min=round(np.min(lows), 4),
        max=round(np.max(highs), 4),
        qq_plot_data=compute_qq_coordinates(test_data_df, eval_data_df),
        output_label=output_label,
    )

121 

122 

def compute_qq_coordinates(
    test_data: pd.Series,
    eval_data: pd.Series,
    max_points: "int | None" = None,
) -> str:
    """Compute QQ plot coordinates for two datasets as a frontend-ready JSON string.

    Both series are sampled at the same evenly spaced quantile levels between
    0 and 1, and each pair of quantiles becomes one ``{"x": ..., "y": ...}``
    point rounded to four decimal places.

    Args:
        test_data: Values whose quantiles become the ``x`` coordinates.
        eval_data: Values whose quantiles become the ``y`` coordinates.
        max_points: Maximum number of points to emit. Defaults to the
            module-level ``MAX_QQ_CHART_POINTS`` when omitted.

    Returns:
        A JSON array string of points. The number of points is the length of
        ``test_data`` capped at ``max_points``; ``"[]"`` is returned when
        either series is empty or the cap is not positive.
    """
    limit = MAX_QQ_CHART_POINTS if max_points is None else max_points

    # flatten() copies, so the caller's data is never touched. No explicit
    # sort is needed: np.quantile does not require sorted input.
    test_values = test_data.to_numpy().flatten()
    eval_values = eval_data.to_numpy().flatten()

    n_points = min(len(test_values), limit)
    if n_points < 1 or len(eval_values) == 0:
        # Nothing to plot; avoid asking numpy for quantiles of an empty array.
        return json.dumps([])

    quantiles = np.linspace(0, 1, n_points)
    test_quantiles = np.quantile(test_values, quantiles)
    eval_quantiles = np.quantile(eval_values, quantiles)

    # Prepare JSON response for frontend
    return json.dumps(
        [
            {"x": round(float(t), 4), "y": round(float(e), 4)}
            for t, e in zip(test_quantiles, eval_quantiles)
        ]
    )