Skip to main content

malbox_plugin_sdk/
stash.rs

1//! Disk-backed result stash for large guest-plugin results.
2//!
3//! Large results (above a configurable threshold) are written to a temp
4//! file under `<work_dir>/_stash/`, keyed by an opaque handle. The daemon
5//! pulls them via the `PullResult` gRPC RPC. SDK-written temp files are
6//! deleted after a successful pull; plugin-owned files (from
7//! `PluginResult::File`) are referenced by path and left alone.
8
9use crate::error::{Result, SdkError};
10use std::collections::HashMap;
11use std::path::{Path, PathBuf};
12use std::sync::Mutex;
13use std::time::{Duration, Instant};
14
/// Opaque key for one stashed result.
///
/// Handles are freshly generated UUID v4 strings; callers should treat them
/// as opaque tokens and never parse them.
pub type Handle = String;
17
/// Encoding hint attached to a stashed payload.
///
/// Mirrors `proto::ResultFormat` on the wire; keep the two enums in sync.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum StashFormat {
    /// Payload is JSON text.
    Json,
    /// Payload is opaque binary data.
    Bytes,
}

/// Bookkeeping for one stashed result: where it lives on disk, who owns the
/// file, and when it was recorded.
// `result_name` and `format` are never read inside this module; the allow
// keeps the lint quiet if nothing else in the crate reads them either.
#[allow(dead_code)]
#[derive(Clone, Debug)]
pub struct StashEntry {
    /// Location of the payload on disk.
    pub path: PathBuf,
    /// Result name supplied when the entry was inserted.
    pub result_name: String,
    /// Encoding hint for the payload.
    pub format: StashFormat,
    /// Payload size in bytes, captured at insert time.
    pub size_bytes: u64,
    /// `true` when the SDK wrote this temp file and must delete it after a
    /// pull; `false` for plugin-provided (`PluginResult::File`) paths, which
    /// are never deleted by the stash.
    pub sdk_owned: bool,
    /// Task the result belongs to; used when sweeping a finished task.
    pub task_id: i32,
    /// Insertion time; used by the TTL sweep.
    pub created_at: Instant,
}
39
/// Tuning knobs for the result stash: when a result goes to disk, and how
/// long an un-pulled entry may stay there.
#[derive(Clone, Debug)]
pub struct StashConfig {
    /// Size cutoff in bytes. Results above it are written to disk and passed
    /// by reference; smaller ones stay inline.
    pub threshold_bytes: usize,
    /// Longest an entry may sit un-pulled before the TTL sweep reclaims it.
    pub ttl: Duration,
}

impl Default for StashConfig {
    /// 1 MiB inline threshold and a two-minute TTL.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;
        StashConfig {
            threshold_bytes: ONE_MIB,
            ttl: Duration::from_secs(2 * 60),
        }
    }
}
57
58/// Disk-backed result stash.
59pub struct ResultStash {
60    stash_dir: PathBuf,
61    config: StashConfig,
62    entries: Mutex<HashMap<Handle, StashEntry>>,
63}
64
65impl ResultStash {
66    /// Create a new stash rooted at `stash_dir`. Creates the directory if
67    /// missing.
68    pub fn new(stash_dir: PathBuf, config: StashConfig) -> Result<Self> {
69        std::fs::create_dir_all(&stash_dir).map_err(SdkError::Io)?;
70        Ok(Self {
71            stash_dir,
72            config,
73            entries: Mutex::new(HashMap::new()),
74        })
75    }
76
77    /// Return the stash configuration.
78    pub fn config(&self) -> &StashConfig {
79        &self.config
80    }
81
82    /// Insert an in-memory payload. Writes it to a new temp file in the
83    /// stash directory and returns the handle.
84    pub fn insert_bytes(
85        &self,
86        task_id: i32,
87        result_name: String,
88        format: StashFormat,
89        data: Vec<u8>,
90    ) -> Result<Handle> {
91        let handle = uuid::Uuid::new_v4().to_string();
92        let path = self.stash_dir.join(format!("{handle}.bin"));
93        std::fs::write(&path, &data).map_err(SdkError::Io)?;
94        let size_bytes = data.len() as u64;
95
96        let entry = StashEntry {
97            path,
98            result_name,
99            format,
100            size_bytes,
101            sdk_owned: true,
102            task_id,
103            created_at: Instant::now(),
104        };
105        self.entries
106            .lock()
107            .map_err(|_| SdkError::Channel("stash mutex poisoned".into()))?
108            .insert(handle.clone(), entry);
109        Ok(handle)
110    }
111
112    /// Insert a reference to an existing file (plugin-owned). The file is
113    /// not copied or touched, and is never deleted by the stash.
114    pub fn insert_file(
115        &self,
116        task_id: i32,
117        result_name: String,
118        format: StashFormat,
119        path: PathBuf,
120    ) -> Result<Handle> {
121        let size_bytes = std::fs::metadata(&path).map_err(SdkError::Io)?.len();
122        let handle = uuid::Uuid::new_v4().to_string();
123        let entry = StashEntry {
124            path,
125            result_name,
126            format,
127            size_bytes,
128            sdk_owned: false,
129            task_id,
130            created_at: Instant::now(),
131        };
132        self.entries
133            .lock()
134            .map_err(|_| SdkError::Channel("stash mutex poisoned".into()))?
135            .insert(handle.clone(), entry);
136        Ok(handle)
137    }
138
139    /// Take the entry for `handle`, removing it from the map.
140    ///
141    /// The returned entry's file has **not** been deleted - the caller
142    /// (typically `on_pull_result`) reads it, then calls
143    /// [`Self::cleanup_after_pull`] to delete SDK-owned files.
144    pub fn take(&self, handle: &str) -> Option<StashEntry> {
145        self.entries
146            .lock()
147            .unwrap_or_else(|e| e.into_inner())
148            .remove(handle)
149    }
150
151    /// Return `size_bytes` for `handle` without removing the entry.
152    pub fn peek_size(&self, handle: &str) -> Option<u64> {
153        self.entries
154            .lock()
155            .unwrap_or_else(|e| e.into_inner())
156            .get(handle)
157            .map(|e| e.size_bytes)
158    }
159
160    /// Delete the SDK-written temp file for a pulled entry. No-op for
161    /// plugin-owned files.
162    pub fn cleanup_after_pull(&self, entry: &StashEntry) {
163        if entry.sdk_owned {
164            let _ = std::fs::remove_file(&entry.path);
165        }
166    }
167
168    /// Remove all entries for a finished task. SDK-owned files are deleted;
169    /// plugin-owned files are left alone. Returns the number of entries
170    /// reclaimed.
171    #[allow(dead_code)]
172    pub fn sweep_task(&self, task_id: i32) -> usize {
173        self.sweep_where(|e| e.task_id == task_id)
174    }
175
176    /// Remove entries older than `self.config.ttl`. SDK-owned files deleted;
177    /// plugin-owned files left alone. Returns the number of entries
178    /// reclaimed.
179    pub fn sweep_expired(&self) -> usize {
180        let ttl = self.config.ttl;
181        let now = Instant::now();
182        self.sweep_where(|e| now.saturating_duration_since(e.created_at) > ttl)
183    }
184
185    fn sweep_where(&self, predicate: impl Fn(&StashEntry) -> bool) -> usize {
186        let mut entries = self.entries.lock().unwrap_or_else(|e| e.into_inner());
187        let handles: Vec<Handle> = entries
188            .iter()
189            .filter_map(|(h, e)| if predicate(e) { Some(h.clone()) } else { None })
190            .collect();
191        let n = handles.len();
192        for h in handles {
193            if let Some(entry) = entries.remove(&h)
194                && entry.sdk_owned
195            {
196                let _ = std::fs::remove_file(&entry.path);
197            }
198        }
199        n
200    }
201
202    /// Sweep any files in `stash_dir` that are not tracked in `entries`.
203    /// Called once on startup to clean up orphans from a previous crashed
204    /// run. Does not touch files referenced by live entries (which is
205    /// always empty on startup).
206    pub fn sweep_orphans_on_startup(stash_dir: &Path) -> std::io::Result<usize> {
207        if !stash_dir.exists() {
208            return Ok(0);
209        }
210        let mut n = 0;
211        for entry in std::fs::read_dir(stash_dir)? {
212            let entry = entry?;
213            let path = entry.path();
214            if path.is_file() {
215                let _ = std::fs::remove_file(&path);
216                n += 1;
217            }
218        }
219        Ok(n)
220    }
221}
222
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a stash under a fresh temp dir; the stash lives in `<tmp>/_stash`.
    fn temp_stash() -> (tempfile::TempDir, ResultStash) {
        let dir = tempfile::tempdir().unwrap();
        let stash_path = dir.path().join("_stash");
        let stash = ResultStash::new(stash_path, StashConfig::default()).unwrap();
        (dir, stash)
    }

    /// Count the directory entries in `<dir>/_stash`.
    fn stash_file_count(dir: &tempfile::TempDir) -> usize {
        std::fs::read_dir(dir.path().join("_stash")).unwrap().count()
    }

    #[test]
    fn insert_bytes_writes_file_and_returns_handle() {
        let (_dir, stash) = temp_stash();
        let handle = stash
            .insert_bytes(1, "r".into(), StashFormat::Bytes, vec![1, 2, 3])
            .unwrap();
        assert!(!handle.is_empty());
        let entry = stash.take(&handle).expect("entry should exist");
        assert_eq!(entry.size_bytes, 3);
        assert!(entry.sdk_owned);
        let contents = std::fs::read(&entry.path).unwrap();
        assert_eq!(contents, vec![1, 2, 3]);
    }

    #[test]
    fn insert_file_records_path_without_copy() {
        let dir = tempfile::tempdir().unwrap();
        let plugin_file = dir.path().join("artifact.bin");
        std::fs::write(&plugin_file, b"hello world").unwrap();

        let stash_dir = dir.path().join("_stash");
        let stash = ResultStash::new(stash_dir, StashConfig::default()).unwrap();
        let handle = stash
            .insert_file(7, "cap".into(), StashFormat::Bytes, plugin_file.clone())
            .unwrap();

        let entry = stash.take(&handle).unwrap();
        assert_eq!(entry.path, plugin_file);
        assert_eq!(entry.size_bytes, b"hello world".len() as u64);
        assert!(!entry.sdk_owned);
    }

    #[test]
    fn insert_file_missing_path_errors() {
        let (dir, stash) = temp_stash();
        let missing = dir.path().join("does-not-exist.bin");
        assert!(
            stash
                .insert_file(1, "r".into(), StashFormat::Bytes, missing)
                .is_err()
        );
    }

    #[test]
    fn peek_size_does_not_remove_entry() {
        let (_dir, stash) = temp_stash();
        let handle = stash
            .insert_bytes(1, "r".into(), StashFormat::Bytes, vec![0; 4])
            .unwrap();
        assert_eq!(stash.peek_size(&handle), Some(4));
        // Peeking twice must still find it.
        assert_eq!(stash.peek_size(&handle), Some(4));
        assert!(stash.take(&handle).is_some());
        assert_eq!(stash.peek_size(&handle), None);
        assert_eq!(stash.peek_size("no-such-handle"), None);
    }

    #[test]
    fn cleanup_after_pull_deletes_only_sdk_owned_files() {
        let (_dir, stash) = temp_stash();
        let h1 = stash
            .insert_bytes(1, "r1".into(), StashFormat::Json, vec![9])
            .unwrap();
        let e1 = stash.take(&h1).unwrap();
        assert!(e1.path.exists());
        stash.cleanup_after_pull(&e1);
        assert!(!e1.path.exists());

        // Plugin-owned file: untouched.
        let outer = tempfile::tempdir().unwrap();
        let plugin_file = outer.path().join("keep.bin");
        std::fs::write(&plugin_file, b"x").unwrap();
        let h2 = stash
            .insert_file(2, "r2".into(), StashFormat::Bytes, plugin_file.clone())
            .unwrap();
        let e2 = stash.take(&h2).unwrap();
        stash.cleanup_after_pull(&e2);
        assert!(
            plugin_file.exists(),
            "plugin-owned file must not be deleted"
        );
    }

    #[test]
    fn sweep_task_removes_all_entries_for_task() {
        let (dir, stash) = temp_stash();
        let h1 = stash
            .insert_bytes(5, "a".into(), StashFormat::Bytes, vec![1])
            .unwrap();
        let _ = stash
            .insert_bytes(5, "b".into(), StashFormat::Bytes, vec![2])
            .unwrap();
        let h3 = stash
            .insert_bytes(6, "c".into(), StashFormat::Bytes, vec![3])
            .unwrap();
        assert_eq!(stash_file_count(&dir), 3);

        let reclaimed = stash.sweep_task(5);
        assert_eq!(reclaimed, 2);

        // Task 5's handles and temp files are gone; task 6 is untouched.
        assert!(stash.take(&h1).is_none());
        assert_eq!(stash_file_count(&dir), 1);
        assert!(stash.take(&h3).is_some());
    }

    #[test]
    fn sweeps_leave_plugin_owned_files_alone() {
        let (dir, stash) = temp_stash();
        let plugin_file = dir.path().join("keep.bin");
        std::fs::write(&plugin_file, b"x").unwrap();
        let handle = stash
            .insert_file(3, "r".into(), StashFormat::Bytes, plugin_file.clone())
            .unwrap();

        assert_eq!(stash.sweep_task(3), 1);
        assert!(stash.take(&handle).is_none());
        assert!(
            plugin_file.exists(),
            "plugin-owned file must survive a sweep"
        );
    }

    #[test]
    fn sweep_expired_reclaims_old_entries() {
        let dir = tempfile::tempdir().unwrap();
        let stash = ResultStash::new(
            dir.path().join("_stash"),
            StashConfig {
                threshold_bytes: 1024,
                ttl: Duration::from_millis(1),
            },
        )
        .unwrap();

        let _ = stash
            .insert_bytes(1, "r".into(), StashFormat::Bytes, vec![0])
            .unwrap();
        std::thread::sleep(Duration::from_millis(10));

        let reclaimed = stash.sweep_expired();
        assert_eq!(reclaimed, 1);
        // The SDK-owned temp file is deleted along with the entry.
        assert_eq!(stash_file_count(&dir), 0);
    }

    #[test]
    fn sweep_expired_keeps_fresh_entries() {
        let (_dir, stash) = temp_stash();
        let handle = stash
            .insert_bytes(1, "r".into(), StashFormat::Bytes, vec![0])
            .unwrap();
        // Default TTL is far longer than this test runs.
        assert_eq!(stash.sweep_expired(), 0);
        assert!(stash.take(&handle).is_some());
    }

    #[test]
    fn sweep_orphans_deletes_loose_files() {
        let dir = tempfile::tempdir().unwrap();
        let stash_dir = dir.path().join("_stash");
        std::fs::create_dir_all(&stash_dir).unwrap();
        std::fs::write(stash_dir.join("orphan1.bin"), b"x").unwrap();
        std::fs::write(stash_dir.join("orphan2.bin"), b"y").unwrap();

        let n = ResultStash::sweep_orphans_on_startup(&stash_dir).unwrap();
        assert_eq!(n, 2);
        assert!(stash_dir.read_dir().unwrap().next().is_none());
    }

    #[test]
    fn sweep_orphans_missing_dir_is_noop() {
        let dir = tempfile::tempdir().unwrap();
        let missing = dir.path().join("never-created");
        assert_eq!(ResultStash::sweep_orphans_on_startup(&missing).unwrap(), 0);
    }
}