Coverage for backend/common/src/common/services/csv.py: 84%

91 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

1import csv 

2import io 

3from dataclasses import dataclass 

4 

5 

# Field separators this module accepts when parsing uploads.
SUPPORTED_DELIMITERS = (",", ";")

# Separators that are not accepted; their consistent presence in the leading
# lines of a file is treated as evidence of an unsupported format.
UNSUPPORTED_DELIMITER_HINTS = ("|", "\t")

# Delimiter reported for single-column files, which contain no separator.
DEFAULT_SINGLE_COLUMN_DELIMITER = ","

# Default number of data rows returned in an inspection preview.
DEFAULT_PREVIEW_ROWS = 10

# 1 MiB. Not referenced in this module's visible code; presumably the size of
# the leading sample callers read for inspection -- confirm against callers.
DEFAULT_INSPECTION_BYTES = 1 << 20

11 

12 

class CsvInspectionError(ValueError):
    """Raised when an uploaded CSV cannot be decoded, sniffed, or parsed."""

15 

16 

17@dataclass(slots=True) 

18class CsvInspectionResult: 

19 headers: list[str] 

20 delimiter: str 

21 preview_rows: list[dict[str, str]] 

22 warnings: list[str] 

23 

24 

def decode_csv_bytes(sample_bytes: bytes) -> str:
    """Decode raw CSV bytes as UTF-8, dropping a leading BOM if present.

    Raises:
        CsvInspectionError: If the bytes are not valid UTF-8.
    """
    try:
        decoded = sample_bytes.decode("utf-8-sig")
    except UnicodeDecodeError as exc:
        raise CsvInspectionError("CSV files must be UTF-8 encoded text.") from exc
    else:
        return decoded

30 

31 

def detect_delimiter(sample_text: str) -> str:
    """
    Work out which supported delimiter a CSV sample uses.

    Blank lines are ignored and at most the first ten remaining lines are
    handed to ``csv.Sniffer``, restricted to the supported delimiters. When
    the sniffer gives up, the header line is inspected directly: the supported
    delimiter occurring most often wins. A header with no supported delimiter
    is treated as a single-column file (comma by default), unless every one of
    the first two lines contains the same unsupported delimiter hint (tab or
    pipe), in which case the file is rejected.

    Raises:
        CsvInspectionError: If the sample has no non-blank lines, or the
            delimiter is not one of the supported ones.
    """
    non_blank = [text for text in sample_text.splitlines() if text.strip()]
    if not non_blank:
        raise CsvInspectionError("The uploaded CSV is empty.")

    sniff_input = "\n".join(non_blank[:10])
    allowed = "".join(SUPPORTED_DELIMITERS)
    try:
        chosen = csv.Sniffer().sniff(sniff_input, delimiters=allowed).delimiter
    except csv.Error:
        # Sniffer could not decide; fall back to counting on the header line.
        first_line = non_blank[0]
        occurrences = {
            candidate: first_line.count(candidate)
            for candidate in SUPPORTED_DELIMITERS
        }
        if any(occurrences.values()):
            chosen = max(occurrences, key=occurrences.get)
        else:
            # No supported delimiter at all: either a single-column file or a
            # file that uses a separator we do not accept.
            leading = non_blank[:2]
            for hint in UNSUPPORTED_DELIMITER_HINTS:
                if all(hint in text for text in leading):
                    raise CsvInspectionError("CSV delimiter must be either ',' or ';'.")
            chosen = DEFAULT_SINGLE_COLUMN_DELIMITER

    if chosen in SUPPORTED_DELIMITERS:
        return chosen
    raise CsvInspectionError("CSV delimiter must be either ',' or ';'.")

67 

68 

def inspect_csv_sample(sample_bytes: bytes, preview_rows: int = DEFAULT_PREVIEW_ROWS) -> CsvInspectionResult:
    """Decode a CSV sample and report its headers, delimiter and first rows.

    Args:
        sample_bytes: Leading bytes of the uploaded file, UTF-8 encoded
            (a BOM is tolerated).
        preview_rows: Maximum number of data rows to include in the preview.
            Zero or a negative value yields an empty preview.

    Returns:
        A ``CsvInspectionResult`` with the non-empty header names, the
        detected delimiter, up to ``preview_rows`` rows keyed by header, and
        an empty warnings list.

    Raises:
        CsvInspectionError: If the sample is not UTF-8, is empty, uses an
            unsupported delimiter, or has no usable header row.
    """
    sample_text = decode_csv_bytes(sample_bytes)
    delimiter = detect_delimiter(sample_text)
    reader = csv.DictReader(io.StringIO(sample_text), delimiter=delimiter)
    headers = [header for header in (reader.fieldnames or []) if header]
    if not headers:
        raise CsvInspectionError("The uploaded CSV must include a header row.")

    preview: list[dict[str, str]] = []
    # Guard first so a non-positive limit returns no rows instead of one.
    if preview_rows > 0:
        for row in reader:
            # DictReader pads short rows with None (restval), so the key is
            # present and a .get() default would not apply; normalise to "".
            preview.append({header: row.get(header) or "" for header in headers})
            if len(preview) >= preview_rows:
                break

    return CsvInspectionResult(
        headers=headers,
        delimiter=delimiter,
        preview_rows=preview,
        warnings=[],
    )

89 

90 

def stream_csv_rows(binary_stream, delimiter: str):
    """Wrap a binary stream in a lazy CSV row reader.

    Args:
        binary_stream: Readable binary file-like object holding UTF-8 CSV
            data (a BOM is tolerated). It is wrapped in an
            ``io.TextIOWrapper``, which closes the underlying stream when the
            wrapper itself is closed or finalised.
        delimiter: Field separator; must be one of ``SUPPORTED_DELIMITERS``.

    Returns:
        A ``(headers, reader)`` tuple: the non-empty header names and the
        ``csv.DictReader`` positioned at the first data row.

    Raises:
        CsvInspectionError: If the delimiter is unsupported, the start of the
            file is not valid UTF-8, or there is no usable header row.
            Invalid bytes further into the file still surface as
            ``UnicodeDecodeError`` while iterating the returned reader.
    """
    if delimiter not in SUPPORTED_DELIMITERS:
        raise CsvInspectionError("CSV delimiter must be either ',' or ';'.")

    text_stream = io.TextIOWrapper(binary_stream, encoding="utf-8-sig", newline="")
    reader = csv.DictReader(text_stream, delimiter=delimiter)
    try:
        # Accessing fieldnames reads and decodes the first chunk of the file.
        raw_headers = reader.fieldnames
    except UnicodeDecodeError as exc:
        # Report encoding problems the same way decode_csv_bytes does.
        raise CsvInspectionError("CSV files must be UTF-8 encoded text.") from exc

    headers = [header for header in (raw_headers or []) if header]
    if not headers:
        raise CsvInspectionError("The uploaded CSV must include a header row.")

    return headers, reader

102 

103 

104def parse_float_cell( 

105 row_number: int, 

106 row: dict[str, str | None], 

107 column_name: str, 

108 *, 

109 required: bool, 

110) -> float | None: 

111 raw_value = (row.get(column_name) or "").strip() 

112 if not raw_value: 

113 if required: 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true

114 raise CsvInspectionError(f"Row {row_number}, column '{column_name}' is empty.") 

115 return None 

116 

117 try: 

118 return float(raw_value) 

119 except ValueError as exc: 

120 raise CsvInspectionError( 

121 f"Row {row_number}, column '{column_name}' must contain a numeric value." 

122 ) from exc 

123 

124 

def parse_numeric_row(row_number: int, row: dict[str, str | None], headers: list[str]) -> list[float]:
    """Parse every listed column of ``row`` as a required float.

    Values are returned in the order of ``headers``. Any blank or
    non-numeric cell raises ``CsvInspectionError`` via ``parse_float_cell``.
    """
    values = []
    for column in headers:
        values.append(parse_float_cell(row_number, row, column, required=True))
    return values

130 

131 

132def filter_numeric_headers_by_first_row( 

133 headers: list[str], 

134 row: dict[str, str | None], 

135) -> tuple[list[str], list[str]]: 

136 numeric_headers: list[str] = [] 

137 skipped_headers: list[str] = [] 

138 

139 for header in headers: 

140 raw_value = (row.get(header) or "").strip() 

141 if not raw_value: 141 ↛ 142line 141 didn't jump to line 142 because the condition on line 141 was never true

142 skipped_headers.append(header) 

143 continue 

144 

145 try: 

146 float(raw_value) 

147 except ValueError: 

148 skipped_headers.append(header) 

149 continue 

150 

151 numeric_headers.append(header) 

152 

153 return numeric_headers, skipped_headers