1use crate::error::{Result, SdkError};
9use crate::result::PluginResult;
10use std::path::{Path, PathBuf};
11
12use super::message::{ResultFormat, ResultKind, TaskResultMessage};
13use super::{ContextInner, lock_or_err};
14
15#[cfg(feature = "guest")]
16use crate::stash::StashFormat;
17
18pub struct ResultSink<'a> {
24 pub(super) inner: &'a ContextInner,
25}
26
27impl ResultSink<'_> {
28 pub fn push(&self, result: PluginResult) -> Result<()> {
49 let tx = {
50 let guard = lock_or_err(&self.inner.result_tx)?;
51 guard
52 .as_ref()
53 .ok_or(SdkError::InvalidContext("push called outside active task"))?
54 .clone()
55 };
56
57 let msg = self.classify_and_build(result)?;
58
59 tx.blocking_send(msg)
60 .map_err(|e| SdkError::Channel(format!("push: {e}")))?;
61 Ok(())
62 }
63
64 pub fn push_json(&self, name: &str, data: &[u8]) -> Result<()> {
66 self.push(PluginResult::Json {
67 name: name.into(),
68 data: data.to_vec(),
69 })
70 }
71
72 pub fn push_bytes(&self, name: &str, data: &[u8]) -> Result<()> {
74 self.push(PluginResult::Bytes {
75 name: name.into(),
76 data: data.to_vec(),
77 })
78 }
79
80 pub fn push_file(&self, name: &str, path: impl AsRef<Path>) -> Result<()> {
82 self.push(PluginResult::File {
83 name: name.into(),
84 path: path.as_ref().to_path_buf(),
85 })
86 }
87
88 pub fn push_all(&self, results: Vec<PluginResult>) -> Result<()> {
92 for result in results {
93 self.push(result)?;
94 }
95 Ok(())
96 }
97
98 #[cfg(feature = "guest")]
99 fn classify_and_build(&self, result: PluginResult) -> Result<TaskResultMessage> {
100 let threshold = self
101 .inner
102 .stash
103 .as_ref()
104 .map(|s| s.config().threshold_bytes)
105 .unwrap_or(usize::MAX);
106
107 match classify_result(result)? {
108 Classified::Inline { name, data, format } if data.len() < threshold => {
109 Ok(TaskResultMessage {
110 task_id: self.inner.task_id,
111 result_name: name,
112 data,
113 format,
114 is_final: false,
115 kind: ResultKind::Result,
116 stash_handle: String::new(),
117 stash_format: ResultFormat::Unspecified,
118 stash_size: 0,
119 })
120 }
121 Classified::Inline { name, data, format } => {
122 let stash = self.inner.stash.as_ref().ok_or(SdkError::InvalidContext(
123 "push: no stash configured for large payloads",
124 ))?;
125 let stash_format = result_format_to_stash(format);
126 let size_bytes = data.len() as u64;
127 let handle =
128 stash.insert_bytes(self.inner.task_id, name.clone(), stash_format, data)?;
129 Ok(build_ref_message(
130 self.inner.task_id,
131 handle,
132 name,
133 format,
134 size_bytes,
135 ))
136 }
137 Classified::File { name, path, format } => {
138 let stash = self.inner.stash.as_ref().ok_or(SdkError::InvalidContext(
139 "push: no stash configured for file results",
140 ))?;
141 if let Ok(canonical) = std::fs::canonicalize(&path)
142 && let Ok(mut guard) = self.inner.claimed_paths.lock()
143 {
144 guard.insert(canonical);
145 }
146 let stash_format = result_format_to_stash(format);
147 let handle =
148 stash.insert_file(self.inner.task_id, name.clone(), stash_format, path)?;
149 let size_bytes = stash.peek_size(&handle).unwrap_or(0);
150 Ok(build_ref_message(
151 self.inner.task_id,
152 handle,
153 name,
154 format,
155 size_bytes,
156 ))
157 }
158 }
159 }
160
161 #[cfg(not(feature = "guest"))]
162 fn classify_and_build(&self, result: PluginResult) -> Result<TaskResultMessage> {
163 let classified = classify_result(result)?;
164 match classified {
165 Classified::Inline { name, data, format } => Ok(TaskResultMessage {
166 task_id: self.inner.task_id,
167 result_name: name,
168 data,
169 format,
170 is_final: false,
171 kind: ResultKind::Result,
172 stash_handle: String::new(),
173 stash_format: ResultFormat::Unspecified,
174 stash_size: 0,
175 }),
176 Classified::File { .. } => Err(SdkError::InvalidContext(
177 "push: file results require guest feature (stash not available)",
178 )),
179 }
180 }
181}
182
183#[allow(dead_code)]
185pub(crate) enum Classified {
186 Inline {
187 name: String,
188 data: Vec<u8>,
189 format: ResultFormat,
190 },
191 File {
192 name: String,
193 path: PathBuf,
194 format: ResultFormat,
195 },
196}
197
198pub(crate) fn classify_result(result: PluginResult) -> Result<Classified> {
200 match result {
201 PluginResult::Json { name, data } => Ok(Classified::Inline {
202 name,
203 data,
204 format: ResultFormat::Json,
205 }),
206 PluginResult::Bytes { name, data } => Ok(Classified::Inline {
207 name,
208 data,
209 format: ResultFormat::Bytes,
210 }),
211 PluginResult::File { name, path } => Ok(Classified::File {
212 name,
213 path,
214 format: ResultFormat::Bytes,
215 }),
216 }
217}
218
219#[cfg(feature = "guest")]
220fn result_format_to_stash(format: ResultFormat) -> StashFormat {
221 match format {
222 ResultFormat::Json => StashFormat::Json,
223 _ => StashFormat::Bytes,
224 }
225}
226
227#[cfg(feature = "guest")]
228fn build_ref_message(
229 task_id: i32,
230 handle: String,
231 _result_name: String,
232 format: ResultFormat,
233 size_bytes: u64,
234) -> TaskResultMessage {
235 TaskResultMessage {
236 task_id,
237 result_name: String::new(),
238 data: Vec::new(),
239 format: ResultFormat::Unspecified,
240 is_final: false,
241 kind: ResultKind::ResultRef,
242 stash_handle: handle,
243 stash_format: format,
244 stash_size: size_bytes,
245 }
246}
247
248#[cfg(test)]
249mod tests {
250 use super::*;
251 use crate::context::Context;
252 use malbox_plugin_transport::traits::TransportEmitter;
253 use std::collections::HashMap;
254 use std::sync::Arc;
255
256 fn noop_emitter() -> Arc<dyn TransportEmitter + Send + Sync> {
257 Arc::new(())
258 }
259
260 #[test]
261 fn push_outside_active_task_returns_invalid_context() {
262 let ctx = Context::new(
263 0,
264 PathBuf::new(),
265 HashMap::new(),
266 noop_emitter(),
267 None,
268 #[cfg(feature = "guest")]
269 None,
270 );
271 let result = ctx.results().push(PluginResult::bytes("x", vec![1, 2]));
272 assert!(matches!(
273 result,
274 Err(SdkError::InvalidContext("push called outside active task"))
275 ));
276 }
277
278 #[test]
279 fn push_inside_active_task_sends_to_channel() {
280 let (tx, mut rx) = tokio::sync::mpsc::channel(4);
281 let ctx = Context::new(
282 42,
283 PathBuf::new(),
284 HashMap::new(),
285 noop_emitter(),
286 Some(tx),
287 #[cfg(feature = "guest")]
288 None,
289 );
290
291 ctx.results()
292 .push(PluginResult::bytes("r1", vec![1, 2, 3]))
293 .expect("push should succeed");
294
295 let msg = rx.try_recv().expect("channel should have one item");
296 assert_eq!(msg.task_id, 42);
297 assert_eq!(msg.result_name, "r1");
298 assert_eq!(msg.data, vec![1, 2, 3]);
299 assert!(!msg.is_final);
300 }
301
302 #[cfg(feature = "guest")]
303 #[test]
304 fn push_large_bytes_goes_to_stash_and_emits_ref() {
305 use crate::stash::{ResultStash, StashConfig};
306
307 let tmp = tempfile::tempdir().unwrap();
308 let stash = Arc::new(
309 ResultStash::new(
310 tmp.path().join("_stash"),
311 StashConfig {
312 threshold_bytes: 10,
313 ttl: std::time::Duration::from_secs(60),
314 },
315 )
316 .unwrap(),
317 );
318
319 let (tx, mut rx) = tokio::sync::mpsc::channel(4);
320 let ctx = Context::new(
321 7,
322 PathBuf::new(),
323 HashMap::new(),
324 noop_emitter(),
325 Some(tx),
326 Some(Arc::clone(&stash)),
327 );
328
329 ctx.results()
330 .push(PluginResult::bytes("big", vec![0u8; 64]))
331 .expect("push should succeed");
332
333 let msg = rx.try_recv().unwrap();
334 assert_eq!(msg.task_id, 7);
335 assert_eq!(msg.kind, ResultKind::ResultRef);
336 assert!(
337 msg.result_name.is_empty(),
338 "result_name lives in stash metadata, not the ref message"
339 );
340 assert!(!msg.stash_handle.is_empty());
341 assert_eq!(msg.stash_size, 64);
342 }
343
344 #[cfg(feature = "guest")]
345 #[test]
346 fn push_small_bytes_stays_inline() {
347 use crate::stash::{ResultStash, StashConfig};
348
349 let tmp = tempfile::tempdir().unwrap();
350 let stash = Arc::new(
351 ResultStash::new(
352 tmp.path().join("_stash"),
353 StashConfig {
354 threshold_bytes: 1024,
355 ttl: std::time::Duration::from_secs(60),
356 },
357 )
358 .unwrap(),
359 );
360
361 let (tx, mut rx) = tokio::sync::mpsc::channel(4);
362 let ctx = Context::new(
363 8,
364 PathBuf::new(),
365 HashMap::new(),
366 noop_emitter(),
367 Some(tx),
368 Some(Arc::clone(&stash)),
369 );
370
371 ctx.results()
372 .push(PluginResult::bytes("small", vec![1, 2, 3]))
373 .expect("push should succeed");
374
375 let msg = rx.try_recv().unwrap();
376 assert_eq!(msg.kind, ResultKind::Result);
377 assert_eq!(msg.result_name, "small");
378 assert_eq!(msg.data, vec![1, 2, 3]);
379 }
380
381 #[cfg(feature = "guest")]
382 #[test]
383 fn push_file_variant_always_goes_to_stash() {
384 use crate::stash::{ResultStash, StashConfig};
385
386 let tmp = tempfile::tempdir().unwrap();
387 let plugin_file = tmp.path().join("artifact.bin");
388 std::fs::write(&plugin_file, b"plugin-owned").unwrap();
389
390 let stash = Arc::new(
391 ResultStash::new(
392 tmp.path().join("_stash"),
393 StashConfig {
394 threshold_bytes: 1024 * 1024,
395 ttl: std::time::Duration::from_secs(60),
396 },
397 )
398 .unwrap(),
399 );
400
401 let (tx, mut rx) = tokio::sync::mpsc::channel(4);
402 let ctx = Context::new(
403 9,
404 PathBuf::new(),
405 HashMap::new(),
406 noop_emitter(),
407 Some(tx),
408 Some(Arc::clone(&stash)),
409 );
410
411 ctx.results()
412 .push(PluginResult::file("cap", plugin_file.clone()))
413 .expect("push should succeed");
414
415 let msg = rx.try_recv().unwrap();
416 assert_eq!(msg.kind, ResultKind::ResultRef);
417 assert!(plugin_file.exists());
418 }
419
420 #[test]
421 fn close_result_channel_makes_push_fail() {
422 let (tx, _rx) = tokio::sync::mpsc::channel(4);
423 let ctx = Context::new(
424 0,
425 PathBuf::new(),
426 HashMap::new(),
427 noop_emitter(),
428 Some(tx),
429 #[cfg(feature = "guest")]
430 None,
431 );
432
433 assert!(
434 ctx.results()
435 .push(PluginResult::bytes("ok", vec![1]))
436 .is_ok()
437 );
438
439 ctx.close_result_channel();
440
441 let result = ctx.results().push(PluginResult::bytes("fail", vec![2]));
442 assert!(matches!(
443 result,
444 Err(SdkError::InvalidContext("push called outside active task"))
445 ));
446 }
447
448 #[test]
449 fn result_sink_convenience_methods() {
450 let (tx, mut rx) = tokio::sync::mpsc::channel(16);
451 let ctx = Context::new(
452 1,
453 PathBuf::new(),
454 HashMap::new(),
455 noop_emitter(),
456 Some(tx),
457 #[cfg(feature = "guest")]
458 None,
459 );
460
461 ctx.results().push_json("j", b"{}").unwrap();
462 ctx.results().push_bytes("b", &[1, 2]).unwrap();
463
464 let msg1 = rx.try_recv().unwrap();
465 assert_eq!(msg1.result_name, "j");
466 assert_eq!(msg1.format, ResultFormat::Json);
467
468 let msg2 = rx.try_recv().unwrap();
469 assert_eq!(msg2.result_name, "b");
470 assert_eq!(msg2.format, ResultFormat::Bytes);
471 }
472}