Coverage for backend/django/core/auxiliary/services/object_storage/s3.py: 77%

98 statements  

coverage.py v7.10.7, created at 2026-05-13 02:47 +0000

import hashlib
import re
from datetime import UTC, datetime
from functools import lru_cache

import boto3
from botocore.client import BaseClient
from botocore.config import Config
from botocore.exceptions import ClientError
from botocore.response import StreamingBody
from django.conf import settings

from core.auxiliary.services.upload_types import (
    CompletedMultipartPart,
    CompletedMultipartUploadStorageResult,
    UploadedPart,
)


DEFAULT_REGION = "us-east-1"


def get_bucket_name() -> str:
    """Return the configured default object-storage bucket name."""
    bucket = settings.SEAWEED_S3_BUCKET
    if not bucket:  # coverage: partial branch, condition was never true
        raise RuntimeError("SEAWEED_S3_BUCKET is not configured.")
    return bucket


def _client_config() -> Config:
    """Build a botocore config for the configured S3-compatible object store."""
    config_kwargs = {"signature_version": "s3v4"}
    if settings.SEAWEED_S3_FORCE_PATH_STYLE:  # coverage: partial branch, condition was always true
        config_kwargs["s3"] = {"addressing_style": "path"}
    return Config(**config_kwargs)


def _build_client(endpoint_url: str) -> BaseClient:
    """Create an S3 client for the provided internal or public endpoint URL."""
    return boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=settings.SEAWEED_S3_ACCESS_KEY,
        aws_secret_access_key=settings.SEAWEED_S3_SECRET_KEY,
        region_name=settings.SEAWEED_S3_REGION or DEFAULT_REGION,
        config=_client_config(),
    )


@lru_cache(maxsize=1)
def get_s3_client() -> BaseClient:
    """Return the cached internal S3 client used for storage operations."""
    endpoint_url = settings.SEAWEED_S3_ENDPOINT
    if not endpoint_url:  # coverage: partial branch, condition was never true
        raise RuntimeError("SEAWEED_S3_ENDPOINT is not configured.")
    return _build_client(endpoint_url)


@lru_cache(maxsize=1)
def get_presign_client() -> BaseClient:
    """Return the cached S3 client used to generate browser-facing presigned URLs."""
    endpoint_url = settings.SEAWEED_S3_PUBLIC_ENDPOINT or settings.SEAWEED_S3_ENDPOINT
    if not endpoint_url:  # coverage: partial branch, condition was never true
        raise RuntimeError("SEAWEED_S3_PUBLIC_ENDPOINT or SEAWEED_S3_ENDPOINT must be configured.")
    return _build_client(endpoint_url)
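
# The client builders above read their connection details from Django settings. An
# illustrative settings fragment follows: the setting names are the ones this module
# references; the values are hypothetical.
#
#   SEAWEED_S3_ENDPOINT = "http://seaweedfs:8333"             # internal endpoint
#   SEAWEED_S3_PUBLIC_ENDPOINT = "https://files.example.com"  # browser-facing endpoint
#   SEAWEED_S3_BUCKET = "uploads"
#   SEAWEED_S3_ACCESS_KEY = "<access-key>"
#   SEAWEED_S3_SECRET_KEY = "<secret-key>"
#   SEAWEED_S3_REGION = ""          # empty falls back to DEFAULT_REGION
#   SEAWEED_S3_FORCE_PATH_STYLE = True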



def _lifecycle_rule_id(prefix: str) -> str:
    """Build a deterministic lifecycle rule identifier for a prefix."""
    return f"expire-{hashlib.sha1(prefix.encode('utf-8')).hexdigest()[:16]}"


def _get_lifecycle_rules(bucket: str) -> list[dict]:
    """Fetch lifecycle rules for a bucket, tolerating an empty configuration."""
    try:
        response = get_s3_client().get_bucket_lifecycle_configuration(Bucket=bucket)
    except ClientError as exc:
        error_code = exc.response.get("Error", {}).get("Code")
        if error_code == "NoSuchLifecycleConfiguration":  # coverage: partial branch, condition was always true
            return []
        raise
    return list(response.get("Rules", []))


def _put_lifecycle_rule(bucket: str, rule: dict) -> None:
    """Upsert a lifecycle rule on the bucket."""
    rules = [existing for existing in _get_lifecycle_rules(bucket) if existing.get("ID") != rule["ID"]]
    rules.append(rule)
    get_s3_client().put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={"Rules": rules},
    )



def _sanitize_content_disposition_filename(filename: str) -> str:
    """Replace characters that could break or smuggle headers in a Content-Disposition filename."""
    sanitized = re.sub(r'[\r\n"\\;]', "_", filename).strip()
    return sanitized or "download"
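
# Illustration (hypothetical input): _sanitize_content_disposition_filename('june;"report".csv')
# returns 'june__report_.csv'; an empty or whitespace-only name falls back to "download".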



def schedule_object_expiration(bucket: str, key: str, expires_at: datetime) -> None:
    """Schedule expiration for a single object using an exact-key prefix rule."""
    # S3-compatible lifecycle prefix filters are starts-with matches rather than strict
    # equality checks. We use the full object key here; because upload keys include a
    # UUID segment, the effective match remains unique for our generated objects.
    _put_lifecycle_rule(
        bucket,
        {
            "ID": _lifecycle_rule_id(key),
            "Status": "Enabled",
            "Filter": {"Prefix": key},
            "Expiration": {"Date": expires_at.astimezone(UTC).replace(microsecond=0)},
        },
    )
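
# Illustrative call (the key layout and seven-day window are hypothetical):
#
#   schedule_object_expiration(
#       get_bucket_name(),
#       "uploads/3f2a.../export.csv",
#       expires_at=datetime.now(UTC) + timedelta(days=7),
#   )
#
# S3-compatible stores typically evaluate lifecycle rules asynchronously (often about once a
# day), so the object is removed on or after the given date rather than at the exact timestamp.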



def create_multipart_upload(bucket: str, key: str, content_type: str) -> str:
    """Start a multipart upload and return the storage upload id."""
    response = get_s3_client().create_multipart_upload(
        Bucket=bucket,
        Key=key,
        ContentType=content_type or "text/csv",
    )
    return response["UploadId"]


def presign_upload_part(
    bucket: str,
    key: str,
    upload_id: str,
    part_number: int,
    expires_seconds: int,
) -> str:
    """Generate a presigned URL for one multipart upload part."""
    return get_presign_client().generate_presigned_url(
        "upload_part",
        Params={
            "Bucket": bucket,
            "Key": key,
            "UploadId": upload_id,
            "PartNumber": part_number,
        },
        ExpiresIn=expires_seconds,
    )


def presign_download_url(
    bucket: str,
    key: str,
    filename: str,
    expires_seconds: int = 3600,
) -> str:
    """Generate a presigned URL for downloading an object from S3-compatible storage."""
    safe_filename = _sanitize_content_disposition_filename(filename)
    return get_presign_client().generate_presigned_url(
        "get_object",
        Params={
            "Bucket": bucket,
            "Key": key,
            "ResponseContentDisposition": f'attachment; filename="{safe_filename}"',
        },
        ExpiresIn=expires_seconds,
    )
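
# Illustrative use: a view can hand the returned URL straight to the browser, e.g.
# presign_download_url(get_bucket_name(), key, "results.csv", expires_seconds=300); the
# ResponseContentDisposition parameter makes the object download under the sanitized filename.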



def list_uploaded_parts(bucket: str, key: str, upload_id: str) -> list[UploadedPart]:
    """Return all uploaded multipart parts currently stored for an upload id."""
    client = get_s3_client()
    parts: list[UploadedPart] = []
    part_number_marker = 0

    while True:
        response = client.list_parts(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            PartNumberMarker=part_number_marker,
        )
        parts.extend(
            UploadedPart(
                part_number=part["PartNumber"],
                etag=part["ETag"],
                size_bytes=part["Size"],
            )
            for part in response.get("Parts", [])
        )
        if not response.get("IsTruncated"):  # coverage: partial branch, condition was always true
            break
        part_number_marker = response.get("NextPartNumberMarker", 0)

    return parts


def complete_multipart_upload(
    bucket: str,
    key: str,
    upload_id: str,
    parts: list[CompletedMultipartPart],
) -> CompletedMultipartUploadStorageResult:
    """Finalize a multipart upload and return the storage completion metadata."""
    sorted_parts = sorted(parts, key=lambda part: part.part_number)
    response = get_s3_client().complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        MultipartUpload={
            "Parts": [
                {"PartNumber": part.part_number, "ETag": part.etag}
                for part in sorted_parts
            ]
        },
    )
    return CompletedMultipartUploadStorageResult(
        bucket=bucket,
        object_key=key,
        etag=response.get("ETag"),
        location=response.get("Location"),
    )
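
# Hypothetical sketch (not part of the measured module): one way a caller might drive a
# browser multipart upload with the helpers above. The content type, part handling, and the
# keyword construction of CompletedMultipartPart are assumptions for illustration.
def _example_multipart_flow(key: str, browser_etags: list[str]) -> CompletedMultipartUploadStorageResult:  # pragma: no cover
    """Hypothetical driver: create, presign, verify, then complete a multipart upload."""
    bucket = get_bucket_name()
    upload_id = create_multipart_upload(bucket, key, "text/csv")
    for part_number in range(1, len(browser_etags) + 1):
        # Each URL is handed to the browser, which PUTs its part and reports back the ETag.
        presign_upload_part(bucket, key, upload_id, part_number, expires_seconds=900)
    # Cross-check what storage actually received before finalizing.
    if len(list_uploaded_parts(bucket, key, upload_id)) != len(browser_etags):
        abort_multipart_upload(bucket, key, upload_id)
        raise RuntimeError("Upload incomplete; aborted.")
    completed = [
        CompletedMultipartPart(part_number=number, etag=etag)
        for number, etag in enumerate(browser_etags, start=1)
    ]
    return complete_multipart_upload(bucket, key, upload_id, completed)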



def abort_multipart_upload(bucket: str, key: str, upload_id: str) -> None:
    """Abort a multipart upload that should no longer accept browser parts."""
    get_s3_client().abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)


def get_range(bucket: str, key: str, byte_range: str) -> bytes:
    """Read a byte range from an object for lightweight CSV inspection."""
    response = get_s3_client().get_object(Bucket=bucket, Key=key, Range=f"bytes={byte_range}")
    return response["Body"].read()
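
# Illustrative use: get_range(bucket, key, "0-65535") fetches the first 64 KiB of the object
# (or less, if the object is smaller), enough to sniff a CSV header row without downloading
# the whole file.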



def stream_object(bucket: str, key: str) -> StreamingBody:
    """Open a streaming reader for the full object body."""
    response = get_s3_client().get_object(Bucket=bucket, Key=key)
    return response["Body"]


def object_exists(bucket: str, key: str) -> bool:
    """Return whether the configured object is still available in object storage."""
    try:
        get_s3_client().head_object(Bucket=bucket, Key=key)
    except ClientError as exc:
        error_code = exc.response.get("Error", {}).get("Code")
        if error_code in {"404", "NoSuchKey", "NotFound"}:
            return False
        raise
    return True


def delete_object(bucket: str, key: str) -> None:
    """Delete an object from S3-compatible object storage."""
    get_s3_client().delete_object(Bucket=bucket, Key=key)