Skip to main content

malbox_plugin_sdk/context/
result_sink.rs

1//! Result pushing during task execution.
2//!
3//! [`ResultSink`] is the plugin-facing API for sending results back to
4//! the daemon. Small results travel inline on the control stream; large
5//! ones are stashed to disk and replaced with a reference the daemon pulls
6//! separately (guest plugins only).
7
8use crate::error::{Result, SdkError};
9use crate::result::PluginResult;
10use std::path::{Path, PathBuf};
11
12use super::message::{ResultFormat, ResultKind, TaskResultMessage};
13use super::{ContextInner, lock_or_err};
14
15#[cfg(feature = "guest")]
16use crate::stash::StashFormat;
17
18/// Handle for pushing results back to the daemon during task execution.
19///
20/// Obtained from [`Context::results()`](super::Context::results). Only
21/// usable inside an `on_task` handler; calling [`push`](ResultSink::push)
22/// outside of one returns an error.
23pub struct ResultSink<'a> {
24    pub(super) inner: &'a ContextInner,
25}
26
27impl ResultSink<'_> {
28    /// Push a single result back to the daemon.
29    ///
30    /// Small results (below the stash threshold) travel inline on the
31    /// control stream. Large byte/JSON payloads are written to the disk
32    /// stash and replaced with a reference marker that the daemon pulls
33    /// via the `PullResult` RPC (guest plugins only).
34    ///
35    /// # Panics
36    ///
37    /// Uses `blocking_send` internally. Must be called from a blocking
38    /// context (e.g. `spawn_blocking`), **not** from within an async task.
39    ///
40    /// # Errors
41    ///
42    /// - Returns `SdkError::InvalidContext` if called outside an `on_task` handler
43    ///   (when the context has no result channel).
44    /// - Returns `SdkError::Channel` if the result channel is closed (typically
45    ///   because the daemon disconnected).
46    /// - Returns `SdkError::Io` if the result is a `File` variant and the file
47    ///   cannot be read.
48    pub fn push(&self, result: PluginResult) -> Result<()> {
49        let tx = {
50            let guard = lock_or_err(&self.inner.result_tx)?;
51            guard
52                .as_ref()
53                .ok_or(SdkError::InvalidContext("push called outside active task"))?
54                .clone()
55        };
56
57        let msg = self.classify_and_build(result)?;
58
59        tx.blocking_send(msg)
60            .map_err(|e| SdkError::Channel(format!("push: {e}")))?;
61        Ok(())
62    }
63
64    /// Push a JSON result (raw bytes that are already JSON-encoded).
65    pub fn push_json(&self, name: &str, data: &[u8]) -> Result<()> {
66        self.push(PluginResult::Json {
67            name: name.into(),
68            data: data.to_vec(),
69        })
70    }
71
72    /// Push a raw byte result.
73    pub fn push_bytes(&self, name: &str, data: &[u8]) -> Result<()> {
74        self.push(PluginResult::Bytes {
75            name: name.into(),
76            data: data.to_vec(),
77        })
78    }
79
80    /// Push a file result by path.
81    pub fn push_file(&self, name: &str, path: impl AsRef<Path>) -> Result<()> {
82        self.push(PluginResult::File {
83            name: name.into(),
84            path: path.as_ref().to_path_buf(),
85        })
86    }
87
88    /// Push a batch of results. Results are sent one at a time in order;
89    /// if one fails, the remaining results are not sent and the error is
90    /// returned. Results sent before the failure are not rolled back.
91    pub fn push_all(&self, results: Vec<PluginResult>) -> Result<()> {
92        for result in results {
93            self.push(result)?;
94        }
95        Ok(())
96    }
97
98    #[cfg(feature = "guest")]
99    fn classify_and_build(&self, result: PluginResult) -> Result<TaskResultMessage> {
100        let threshold = self
101            .inner
102            .stash
103            .as_ref()
104            .map(|s| s.config().threshold_bytes)
105            .unwrap_or(usize::MAX);
106
107        match classify_result(result)? {
108            Classified::Inline { name, data, format } if data.len() < threshold => {
109                Ok(TaskResultMessage {
110                    task_id: self.inner.task_id,
111                    result_name: name,
112                    data,
113                    format,
114                    is_final: false,
115                    kind: ResultKind::Result,
116                    stash_handle: String::new(),
117                    stash_format: ResultFormat::Unspecified,
118                    stash_size: 0,
119                })
120            }
121            Classified::Inline { name, data, format } => {
122                let stash = self.inner.stash.as_ref().ok_or(SdkError::InvalidContext(
123                    "push: no stash configured for large payloads",
124                ))?;
125                let stash_format = result_format_to_stash(format);
126                let size_bytes = data.len() as u64;
127                let handle =
128                    stash.insert_bytes(self.inner.task_id, name.clone(), stash_format, data)?;
129                Ok(build_ref_message(
130                    self.inner.task_id,
131                    handle,
132                    name,
133                    format,
134                    size_bytes,
135                ))
136            }
137            Classified::File { name, path, format } => {
138                let stash = self.inner.stash.as_ref().ok_or(SdkError::InvalidContext(
139                    "push: no stash configured for file results",
140                ))?;
141                if let Ok(canonical) = std::fs::canonicalize(&path)
142                    && let Ok(mut guard) = self.inner.claimed_paths.lock()
143                {
144                    guard.insert(canonical);
145                }
146                let stash_format = result_format_to_stash(format);
147                let handle =
148                    stash.insert_file(self.inner.task_id, name.clone(), stash_format, path)?;
149                let size_bytes = stash.peek_size(&handle).unwrap_or(0);
150                Ok(build_ref_message(
151                    self.inner.task_id,
152                    handle,
153                    name,
154                    format,
155                    size_bytes,
156                ))
157            }
158        }
159    }
160
161    #[cfg(not(feature = "guest"))]
162    fn classify_and_build(&self, result: PluginResult) -> Result<TaskResultMessage> {
163        let classified = classify_result(result)?;
164        match classified {
165            Classified::Inline { name, data, format } => Ok(TaskResultMessage {
166                task_id: self.inner.task_id,
167                result_name: name,
168                data,
169                format,
170                is_final: false,
171                kind: ResultKind::Result,
172                stash_handle: String::new(),
173                stash_format: ResultFormat::Unspecified,
174                stash_size: 0,
175            }),
176            Classified::File { .. } => Err(SdkError::InvalidContext(
177                "push: file results require guest feature (stash not available)",
178            )),
179        }
180    }
181}
182
183/// Result of classifying a `PluginResult` for transport.
184#[allow(dead_code)]
185pub(crate) enum Classified {
186    Inline {
187        name: String,
188        data: Vec<u8>,
189        format: ResultFormat,
190    },
191    File {
192        name: String,
193        path: PathBuf,
194        format: ResultFormat,
195    },
196}
197
198/// Classify a `PluginResult` into inline bytes vs a file path.
199pub(crate) fn classify_result(result: PluginResult) -> Result<Classified> {
200    match result {
201        PluginResult::Json { name, data } => Ok(Classified::Inline {
202            name,
203            data,
204            format: ResultFormat::Json,
205        }),
206        PluginResult::Bytes { name, data } => Ok(Classified::Inline {
207            name,
208            data,
209            format: ResultFormat::Bytes,
210        }),
211        PluginResult::File { name, path } => Ok(Classified::File {
212            name,
213            path,
214            format: ResultFormat::Bytes,
215        }),
216    }
217}
218
/// Map a wire `ResultFormat` to the stash's format: `Json` stays JSON and
/// every other format is stored as plain bytes.
#[cfg(feature = "guest")]
fn result_format_to_stash(format: ResultFormat) -> StashFormat {
    if matches!(format, ResultFormat::Json) {
        StashFormat::Json
    } else {
        StashFormat::Bytes
    }
}
226
/// Assemble the `ResultRef` message that announces a stashed payload.
///
/// The inline fields (`result_name`, `data`, `format`) are left empty or
/// unspecified; only the stash handle, format and size are filled in.
/// `_result_name` is accepted but not placed in the message.
#[cfg(feature = "guest")]
fn build_ref_message(
    task_id: i32,
    handle: String,
    _result_name: String,
    format: ResultFormat,
    size_bytes: u64,
) -> TaskResultMessage {
    // Inline part of the message: intentionally blank for references.
    let result_name = String::new();
    let data = Vec::new();

    TaskResultMessage {
        kind: ResultKind::ResultRef,
        task_id,
        is_final: false,
        result_name,
        data,
        format: ResultFormat::Unspecified,
        stash_handle: handle,
        stash_format: format,
        stash_size: size_bytes,
    }
}
247
248#[cfg(test)]
249mod tests {
250    use super::*;
251    use crate::context::Context;
252    use malbox_plugin_transport::traits::TransportEmitter;
253    use std::collections::HashMap;
254    use std::sync::Arc;
255
256    fn noop_emitter() -> Arc<dyn TransportEmitter + Send + Sync> {
257        Arc::new(())
258    }
259
260    #[test]
261    fn push_outside_active_task_returns_invalid_context() {
262        let ctx = Context::new(
263            0,
264            PathBuf::new(),
265            HashMap::new(),
266            noop_emitter(),
267            None,
268            #[cfg(feature = "guest")]
269            None,
270        );
271        let result = ctx.results().push(PluginResult::bytes("x", vec![1, 2]));
272        assert!(matches!(
273            result,
274            Err(SdkError::InvalidContext("push called outside active task"))
275        ));
276    }
277
278    #[test]
279    fn push_inside_active_task_sends_to_channel() {
280        let (tx, mut rx) = tokio::sync::mpsc::channel(4);
281        let ctx = Context::new(
282            42,
283            PathBuf::new(),
284            HashMap::new(),
285            noop_emitter(),
286            Some(tx),
287            #[cfg(feature = "guest")]
288            None,
289        );
290
291        ctx.results()
292            .push(PluginResult::bytes("r1", vec![1, 2, 3]))
293            .expect("push should succeed");
294
295        let msg = rx.try_recv().expect("channel should have one item");
296        assert_eq!(msg.task_id, 42);
297        assert_eq!(msg.result_name, "r1");
298        assert_eq!(msg.data, vec![1, 2, 3]);
299        assert!(!msg.is_final);
300    }
301
302    #[cfg(feature = "guest")]
303    #[test]
304    fn push_large_bytes_goes_to_stash_and_emits_ref() {
305        use crate::stash::{ResultStash, StashConfig};
306
307        let tmp = tempfile::tempdir().unwrap();
308        let stash = Arc::new(
309            ResultStash::new(
310                tmp.path().join("_stash"),
311                StashConfig {
312                    threshold_bytes: 10,
313                    ttl: std::time::Duration::from_secs(60),
314                },
315            )
316            .unwrap(),
317        );
318
319        let (tx, mut rx) = tokio::sync::mpsc::channel(4);
320        let ctx = Context::new(
321            7,
322            PathBuf::new(),
323            HashMap::new(),
324            noop_emitter(),
325            Some(tx),
326            Some(Arc::clone(&stash)),
327        );
328
329        ctx.results()
330            .push(PluginResult::bytes("big", vec![0u8; 64]))
331            .expect("push should succeed");
332
333        let msg = rx.try_recv().unwrap();
334        assert_eq!(msg.task_id, 7);
335        assert_eq!(msg.kind, ResultKind::ResultRef);
336        assert!(
337            msg.result_name.is_empty(),
338            "result_name lives in stash metadata, not the ref message"
339        );
340        assert!(!msg.stash_handle.is_empty());
341        assert_eq!(msg.stash_size, 64);
342    }
343
344    #[cfg(feature = "guest")]
345    #[test]
346    fn push_small_bytes_stays_inline() {
347        use crate::stash::{ResultStash, StashConfig};
348
349        let tmp = tempfile::tempdir().unwrap();
350        let stash = Arc::new(
351            ResultStash::new(
352                tmp.path().join("_stash"),
353                StashConfig {
354                    threshold_bytes: 1024,
355                    ttl: std::time::Duration::from_secs(60),
356                },
357            )
358            .unwrap(),
359        );
360
361        let (tx, mut rx) = tokio::sync::mpsc::channel(4);
362        let ctx = Context::new(
363            8,
364            PathBuf::new(),
365            HashMap::new(),
366            noop_emitter(),
367            Some(tx),
368            Some(Arc::clone(&stash)),
369        );
370
371        ctx.results()
372            .push(PluginResult::bytes("small", vec![1, 2, 3]))
373            .expect("push should succeed");
374
375        let msg = rx.try_recv().unwrap();
376        assert_eq!(msg.kind, ResultKind::Result);
377        assert_eq!(msg.result_name, "small");
378        assert_eq!(msg.data, vec![1, 2, 3]);
379    }
380
381    #[cfg(feature = "guest")]
382    #[test]
383    fn push_file_variant_always_goes_to_stash() {
384        use crate::stash::{ResultStash, StashConfig};
385
386        let tmp = tempfile::tempdir().unwrap();
387        let plugin_file = tmp.path().join("artifact.bin");
388        std::fs::write(&plugin_file, b"plugin-owned").unwrap();
389
390        let stash = Arc::new(
391            ResultStash::new(
392                tmp.path().join("_stash"),
393                StashConfig {
394                    threshold_bytes: 1024 * 1024,
395                    ttl: std::time::Duration::from_secs(60),
396                },
397            )
398            .unwrap(),
399        );
400
401        let (tx, mut rx) = tokio::sync::mpsc::channel(4);
402        let ctx = Context::new(
403            9,
404            PathBuf::new(),
405            HashMap::new(),
406            noop_emitter(),
407            Some(tx),
408            Some(Arc::clone(&stash)),
409        );
410
411        ctx.results()
412            .push(PluginResult::file("cap", plugin_file.clone()))
413            .expect("push should succeed");
414
415        let msg = rx.try_recv().unwrap();
416        assert_eq!(msg.kind, ResultKind::ResultRef);
417        assert!(plugin_file.exists());
418    }
419
420    #[test]
421    fn close_result_channel_makes_push_fail() {
422        let (tx, _rx) = tokio::sync::mpsc::channel(4);
423        let ctx = Context::new(
424            0,
425            PathBuf::new(),
426            HashMap::new(),
427            noop_emitter(),
428            Some(tx),
429            #[cfg(feature = "guest")]
430            None,
431        );
432
433        assert!(
434            ctx.results()
435                .push(PluginResult::bytes("ok", vec![1]))
436                .is_ok()
437        );
438
439        ctx.close_result_channel();
440
441        let result = ctx.results().push(PluginResult::bytes("fail", vec![2]));
442        assert!(matches!(
443            result,
444            Err(SdkError::InvalidContext("push called outside active task"))
445        ));
446    }
447
448    #[test]
449    fn result_sink_convenience_methods() {
450        let (tx, mut rx) = tokio::sync::mpsc::channel(16);
451        let ctx = Context::new(
452            1,
453            PathBuf::new(),
454            HashMap::new(),
455            noop_emitter(),
456            Some(tx),
457            #[cfg(feature = "guest")]
458            None,
459        );
460
461        ctx.results().push_json("j", b"{}").unwrap();
462        ctx.results().push_bytes("b", &[1, 2]).unwrap();
463
464        let msg1 = rx.try_recv().unwrap();
465        assert_eq!(msg1.result_name, "j");
466        assert_eq!(msg1.format, ResultFormat::Json);
467
468        let msg2 = rx.try_recv().unwrap();
469        assert_eq!(msg2.result_name, "b");
470        assert_eq!(msg2.format, ResultFormat::Bytes);
471    }
472}