1use crate::error::{Result, SdkError};
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12use std::sync::Mutex;
13use std::time::{Duration, Instant};
14
/// Opaque key identifying one stashed result.
///
/// Handles produced by `ResultStash::insert_bytes` and `ResultStash::insert_file`
/// are the string form of a freshly generated v4 UUID.
pub type Handle = String;
17
/// Format tag recorded alongside a stashed result.
///
/// NOTE(review): presumably describes how the stashed payload is encoded so the
/// consumer knows how to decode it — confirm against the code that pulls results.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum StashFormat {
    /// Payload tagged as JSON.
    Json,
    /// Payload tagged as raw bytes.
    Bytes,
}
24
25#[derive(Debug, Clone)]
27#[allow(dead_code)]
28pub struct StashEntry {
29 pub path: PathBuf,
30 pub result_name: String,
31 pub format: StashFormat,
32 pub size_bytes: u64,
33 pub sdk_owned: bool,
36 pub task_id: i32,
37 pub created_at: Instant,
38}
39
/// Tunables for a result stash.
#[derive(Clone, Debug)]
pub struct StashConfig {
    /// Size threshold in bytes.
    ///
    /// NOTE(review): presumably results larger than this are stashed instead of
    /// being returned inline — the stash itself never reads it; confirm with callers.
    pub threshold_bytes: usize,
    /// Maximum age of an entry before an expiry sweep reclaims it.
    pub ttl: Duration,
}

impl Default for StashConfig {
    /// Defaults to a 1 MiB threshold and a 120-second time-to-live.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        const TWO_MINUTES: Duration = Duration::from_secs(2 * 60);

        Self {
            threshold_bytes: ONE_MIB,
            ttl: TWO_MINUTES,
        }
    }
}
57
58pub struct ResultStash {
60 stash_dir: PathBuf,
61 config: StashConfig,
62 entries: Mutex<HashMap<Handle, StashEntry>>,
63}
64
65impl ResultStash {
66 pub fn new(stash_dir: PathBuf, config: StashConfig) -> Result<Self> {
69 std::fs::create_dir_all(&stash_dir).map_err(SdkError::Io)?;
70 Ok(Self {
71 stash_dir,
72 config,
73 entries: Mutex::new(HashMap::new()),
74 })
75 }
76
77 pub fn config(&self) -> &StashConfig {
79 &self.config
80 }
81
82 pub fn insert_bytes(
85 &self,
86 task_id: i32,
87 result_name: String,
88 format: StashFormat,
89 data: Vec<u8>,
90 ) -> Result<Handle> {
91 let handle = uuid::Uuid::new_v4().to_string();
92 let path = self.stash_dir.join(format!("{handle}.bin"));
93 std::fs::write(&path, &data).map_err(SdkError::Io)?;
94 let size_bytes = data.len() as u64;
95
96 let entry = StashEntry {
97 path,
98 result_name,
99 format,
100 size_bytes,
101 sdk_owned: true,
102 task_id,
103 created_at: Instant::now(),
104 };
105 self.entries
106 .lock()
107 .map_err(|_| SdkError::Channel("stash mutex poisoned".into()))?
108 .insert(handle.clone(), entry);
109 Ok(handle)
110 }
111
112 pub fn insert_file(
115 &self,
116 task_id: i32,
117 result_name: String,
118 format: StashFormat,
119 path: PathBuf,
120 ) -> Result<Handle> {
121 let size_bytes = std::fs::metadata(&path).map_err(SdkError::Io)?.len();
122 let handle = uuid::Uuid::new_v4().to_string();
123 let entry = StashEntry {
124 path,
125 result_name,
126 format,
127 size_bytes,
128 sdk_owned: false,
129 task_id,
130 created_at: Instant::now(),
131 };
132 self.entries
133 .lock()
134 .map_err(|_| SdkError::Channel("stash mutex poisoned".into()))?
135 .insert(handle.clone(), entry);
136 Ok(handle)
137 }
138
139 pub fn take(&self, handle: &str) -> Option<StashEntry> {
145 self.entries
146 .lock()
147 .unwrap_or_else(|e| e.into_inner())
148 .remove(handle)
149 }
150
151 pub fn peek_size(&self, handle: &str) -> Option<u64> {
153 self.entries
154 .lock()
155 .unwrap_or_else(|e| e.into_inner())
156 .get(handle)
157 .map(|e| e.size_bytes)
158 }
159
160 pub fn cleanup_after_pull(&self, entry: &StashEntry) {
163 if entry.sdk_owned {
164 let _ = std::fs::remove_file(&entry.path);
165 }
166 }
167
168 #[allow(dead_code)]
172 pub fn sweep_task(&self, task_id: i32) -> usize {
173 self.sweep_where(|e| e.task_id == task_id)
174 }
175
176 pub fn sweep_expired(&self) -> usize {
180 let ttl = self.config.ttl;
181 let now = Instant::now();
182 self.sweep_where(|e| now.saturating_duration_since(e.created_at) > ttl)
183 }
184
185 fn sweep_where(&self, predicate: impl Fn(&StashEntry) -> bool) -> usize {
186 let mut entries = self.entries.lock().unwrap_or_else(|e| e.into_inner());
187 let handles: Vec<Handle> = entries
188 .iter()
189 .filter_map(|(h, e)| if predicate(e) { Some(h.clone()) } else { None })
190 .collect();
191 let n = handles.len();
192 for h in handles {
193 if let Some(entry) = entries.remove(&h)
194 && entry.sdk_owned
195 {
196 let _ = std::fs::remove_file(&entry.path);
197 }
198 }
199 n
200 }
201
202 pub fn sweep_orphans_on_startup(stash_dir: &Path) -> std::io::Result<usize> {
207 if !stash_dir.exists() {
208 return Ok(0);
209 }
210 let mut n = 0;
211 for entry in std::fs::read_dir(stash_dir)? {
212 let entry = entry?;
213 let path = entry.path();
214 if path.is_file() {
215 let _ = std::fs::remove_file(&path);
216 n += 1;
217 }
218 }
219 Ok(n)
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Name of the stash subdirectory created inside each test's temp dir.
    const STASH_SUBDIR: &str = "_stash";

    /// Builds a stash with default config inside a fresh temp dir.
    ///
    /// The `TempDir` guard is returned so the directory outlives the test body.
    fn temp_stash() -> (tempfile::TempDir, ResultStash) {
        let dir = tempfile::tempdir().unwrap();
        let stash =
            ResultStash::new(dir.path().join(STASH_SUBDIR), StashConfig::default()).unwrap();
        (dir, stash)
    }

    /// Counts the directory entries directly inside `dir`.
    fn entry_count(dir: &Path) -> usize {
        std::fs::read_dir(dir).unwrap().count()
    }

    #[test]
    fn insert_bytes_writes_file_and_returns_handle() {
        let (_dir, stash) = temp_stash();
        let handle = stash
            .insert_bytes(1, "r".into(), StashFormat::Bytes, vec![1, 2, 3])
            .unwrap();
        assert!(!handle.is_empty());

        let entry = stash.take(&handle).expect("entry should exist");
        assert_eq!(entry.size_bytes, 3);
        assert_eq!(entry.task_id, 1);
        assert_eq!(entry.result_name, "r");
        assert_eq!(entry.format, StashFormat::Bytes);
        assert!(entry.sdk_owned);
        assert_eq!(std::fs::read(&entry.path).unwrap(), vec![1, 2, 3]);
    }

    #[test]
    fn insert_file_records_path_without_copy() {
        let dir = tempfile::tempdir().unwrap();
        let plugin_file = dir.path().join("artifact.bin");
        std::fs::write(&plugin_file, b"hello world").unwrap();

        let stash =
            ResultStash::new(dir.path().join(STASH_SUBDIR), StashConfig::default()).unwrap();
        let handle = stash
            .insert_file(7, "cap".into(), StashFormat::Bytes, plugin_file.clone())
            .unwrap();

        let entry = stash.take(&handle).unwrap();
        assert_eq!(entry.path, plugin_file);
        assert_eq!(entry.size_bytes, b"hello world".len() as u64);
        assert!(!entry.sdk_owned);
    }

    #[test]
    fn insert_file_rejects_missing_path() {
        let (dir, stash) = temp_stash();
        let missing = dir.path().join("does-not-exist.bin");
        assert!(stash
            .insert_file(1, "r".into(), StashFormat::Bytes, missing)
            .is_err());
    }

    #[test]
    fn peek_size_reports_size_without_removing_entry() {
        let (_dir, stash) = temp_stash();
        let handle = stash
            .insert_bytes(1, "r".into(), StashFormat::Bytes, vec![0; 4])
            .unwrap();

        assert_eq!(stash.peek_size(&handle), Some(4));
        // Peeking must not consume the entry.
        assert_eq!(stash.peek_size(&handle), Some(4));
        assert!(stash.take(&handle).is_some());
        assert_eq!(stash.peek_size(&handle), None);
        assert_eq!(stash.peek_size("unknown-handle"), None);
    }

    #[test]
    fn cleanup_after_pull_deletes_only_sdk_owned_files() {
        let (_dir, stash) = temp_stash();
        let h1 = stash
            .insert_bytes(1, "r1".into(), StashFormat::Json, vec![9])
            .unwrap();
        let e1 = stash.take(&h1).unwrap();
        assert!(e1.path.exists());
        stash.cleanup_after_pull(&e1);
        assert!(!e1.path.exists());

        let outer = tempfile::tempdir().unwrap();
        let plugin_file = outer.path().join("keep.bin");
        std::fs::write(&plugin_file, b"x").unwrap();
        let h2 = stash
            .insert_file(2, "r2".into(), StashFormat::Bytes, plugin_file.clone())
            .unwrap();
        let e2 = stash.take(&h2).unwrap();
        stash.cleanup_after_pull(&e2);
        assert!(
            plugin_file.exists(),
            "plugin-owned file must not be deleted"
        );
    }

    #[test]
    fn sweep_task_removes_all_entries_for_task() {
        let (dir, stash) = temp_stash();
        let h1 = stash
            .insert_bytes(5, "a".into(), StashFormat::Bytes, vec![1])
            .unwrap();
        let h2 = stash
            .insert_bytes(5, "b".into(), StashFormat::Bytes, vec![2])
            .unwrap();
        let h3 = stash
            .insert_bytes(6, "c".into(), StashFormat::Bytes, vec![3])
            .unwrap();

        assert_eq!(stash.sweep_task(5), 2);

        assert!(stash.take(&h1).is_none());
        assert!(stash.take(&h2).is_none());
        // Only the other task's file may remain on disk.
        assert_eq!(entry_count(&dir.path().join(STASH_SUBDIR)), 1);

        let survivor = stash
            .take(&h3)
            .expect("entry of another task must survive the sweep");
        assert_eq!(survivor.task_id, 6);
        assert!(survivor.path.exists());
    }

    #[test]
    fn sweep_expired_reclaims_old_entries() {
        let dir = tempfile::tempdir().unwrap();
        let stash = ResultStash::new(
            dir.path().join(STASH_SUBDIR),
            StashConfig {
                threshold_bytes: 1024,
                ttl: Duration::from_millis(1),
            },
        )
        .unwrap();

        let handle = stash
            .insert_bytes(1, "r".into(), StashFormat::Bytes, vec![0])
            .unwrap();
        std::thread::sleep(Duration::from_millis(10));

        assert_eq!(stash.sweep_expired(), 1);
        assert!(stash.take(&handle).is_none());
        // Nothing is left for a second sweep.
        assert_eq!(stash.sweep_expired(), 0);
    }

    #[test]
    fn sweep_expired_keeps_fresh_entries() {
        let (_dir, stash) = temp_stash();
        let handle = stash
            .insert_bytes(1, "r".into(), StashFormat::Bytes, vec![0])
            .unwrap();

        assert_eq!(stash.sweep_expired(), 0);
        assert!(stash.take(&handle).is_some());
    }

    #[test]
    fn sweep_orphans_deletes_loose_files() {
        let dir = tempfile::tempdir().unwrap();
        let stash_dir = dir.path().join(STASH_SUBDIR);
        std::fs::create_dir_all(&stash_dir).unwrap();
        std::fs::write(stash_dir.join("orphan1.bin"), b"x").unwrap();
        std::fs::write(stash_dir.join("orphan2.bin"), b"y").unwrap();

        let n = ResultStash::sweep_orphans_on_startup(&stash_dir).unwrap();
        assert_eq!(n, 2);
        assert!(stash_dir.read_dir().unwrap().next().is_none());
    }

    #[test]
    fn sweep_orphans_ignores_missing_dir_and_subdirectories() {
        let dir = tempfile::tempdir().unwrap();

        let missing = dir.path().join("never-created");
        assert_eq!(ResultStash::sweep_orphans_on_startup(&missing).unwrap(), 0);

        let stash_dir = dir.path().join(STASH_SUBDIR);
        let nested = stash_dir.join("nested");
        std::fs::create_dir_all(&nested).unwrap();
        std::fs::write(stash_dir.join("orphan.bin"), b"x").unwrap();

        assert_eq!(
            ResultStash::sweep_orphans_on_startup(&stash_dir).unwrap(),
            1
        );
        assert!(nested.is_dir(), "subdirectories must be left in place");
    }
}