Skip to main content

malbox_plugin_sdk/runtime/guest/
stream.rs

//! Result streaming + log entry conversion for the guest runtime.
//!
//! The guest linear task runs a [`GuestPlugin`]'s lifecycle sequence
//! (`on_start` -> `execute_sample` -> wait -> `on_stop`) and streams
//! results back over an mpsc channel. Auto-collection of artifacts and
//! external logs is handled after `on_stop`.
7
8use crate::context::message::{ResultFormat, ResultKind, TaskResultMessage};
9use crate::context::{Context, ResultSender};
10use crate::log::LogEntry;
11use crate::plugin::guest::{GuestPlugin, LaunchResult};
12use crate::stash::ResultStash;
13use malbox_plugin_transport::grpc::proto;
14use malbox_plugin_transport::plugin::GrpcEmitter;
15use malbox_plugin_transport::traits::TransportEmitter;
16use prost::Message;
17use std::collections::HashMap;
18use std::path::PathBuf;
19use std::sync::Arc;
20use tokio::sync::mpsc;
21use tonic::Status;
22use tracing::{debug, error, info, instrument};
23
24use super::collector::{self, AutoCollectSection};
25
26/// All parameters for a single guest task execution (everything except the
27/// generic plugin instance).
28pub(super) struct TaskExecution {
29    pub task_id: i32,
30    pub sample_path: PathBuf,
31    pub config: HashMap<String, String>,
32    pub result_tx: ResultSender,
33    pub proto_tx: mpsc::Sender<std::result::Result<proto::TaskResult, Status>>,
34    pub stash: Arc<ResultStash>,
35    pub artifact_dir: PathBuf,
36    pub external_log_dir: PathBuf,
37    pub auto_collect_artifacts: AutoCollectSection,
38    pub auto_collect_external_logs: AutoCollectSection,
39    pub default_timeout: u64,
40}
41
42/// Convert a [`TaskResultMessage`] to the protobuf `TaskResult` representation.
43pub(super) fn message_to_proto(msg: TaskResultMessage, _stash: &ResultStash) -> proto::TaskResult {
44    match msg.kind {
45        ResultKind::ResultRef => {
46            let ref_msg = proto::ResultRef {
47                handle: msg.stash_handle,
48                result_name: msg.result_name,
49                format: result_format_to_proto(msg.stash_format).into(),
50                size_bytes: msg.stash_size,
51            };
52            proto::TaskResult {
53                task_id: msg.task_id,
54                result_name: String::new(),
55                data: ref_msg.encode_to_vec(),
56                format: proto::ResultFormat::Unspecified.into(),
57                is_final: msg.is_final,
58                kind: proto::ResultKind::ResultRef.into(),
59            }
60        }
61        ResultKind::Progress => proto::TaskResult {
62            task_id: msg.task_id,
63            result_name: msg.result_name,
64            data: msg.data,
65            format: result_format_to_proto(msg.format).into(),
66            is_final: msg.is_final,
67            kind: proto::ResultKind::Progress.into(),
68        },
69        ResultKind::Result => proto::TaskResult {
70            task_id: msg.task_id,
71            result_name: msg.result_name,
72            data: msg.data,
73            format: result_format_to_proto(msg.format).into(),
74            is_final: msg.is_final,
75            kind: proto::ResultKind::Result.into(),
76        },
77    }
78}
79
80fn result_format_to_proto(format: ResultFormat) -> proto::ResultFormat {
81    match format {
82        ResultFormat::Json => proto::ResultFormat::Json,
83        ResultFormat::Bytes => proto::ResultFormat::Bytes,
84        ResultFormat::Unspecified => proto::ResultFormat::Unspecified,
85    }
86}
87
88/// Send the final-marker `TaskResult` that signals end-of-stream to the daemon.
89fn send_final_marker(
90    task_id: i32,
91    proto_tx: &mpsc::Sender<std::result::Result<proto::TaskResult, Status>>,
92) {
93    let final_marker = proto::TaskResult {
94        task_id,
95        result_name: String::new(),
96        data: vec![],
97        format: proto::ResultFormat::Unspecified.into(),
98        is_final: true,
99        kind: proto::ResultKind::Result.into(),
100    };
101    let _ = proto_tx.blocking_send(Ok(final_marker));
102}
103
104/// Run a [`GuestPlugin`]'s linear lifecycle and stream results back over
105/// the task's result channel.
106///
107/// The sequence is:
108/// 1. Send READY signal
109/// 2. `on_start` - plugin sets up monitoring
110/// 3. `execute_sample` - SDK launches the sample
111/// 4. Wait for analysis timeout
112/// 5. `on_stop` - plugin flushes results
113/// 6. Auto-collect artifacts and external logs
114/// 7. Send final marker
115#[instrument(skip_all, fields(task_id = exec.task_id))]
116pub(super) fn guest_linear_task<P: GuestPlugin>(plugin: Arc<P>, exec: TaskExecution) {
117    let TaskExecution {
118        task_id,
119        sample_path,
120        config,
121        result_tx,
122        proto_tx,
123        stash,
124        artifact_dir,
125        external_log_dir,
126        auto_collect_artifacts,
127        auto_collect_external_logs,
128        default_timeout,
129    } = exec;
130
131    let ready_result = proto::TaskResult {
132        task_id,
133        result_name: String::new(),
134        data: vec![],
135        format: proto::ResultFormat::Unspecified.into(),
136        is_final: false,
137        kind: proto::ResultKind::Ready.into(),
138    };
139    if let Err(e) = proto_tx.blocking_send(Ok(ready_result)) {
140        error!(error = %e, "Failed to send READY signal");
141        return;
142    }
143
144    let emitter: Arc<dyn TransportEmitter + Send + Sync> =
145        Arc::new(GrpcEmitter::with_task(task_id, proto_tx.clone()));
146
147    let ctx = Context::new(
148        task_id,
149        sample_path.clone(),
150        config.clone(),
151        emitter,
152        Some(result_tx.clone()),
153        Some(Arc::clone(&stash)),
154    );
155
156    if let Err(e) = plugin.on_start(&ctx) {
157        error!(error = %e, "GuestPlugin on_start failed");
158        send_final_marker(task_id, &proto_tx);
159        return;
160    }
161
162    info!(path = %sample_path.display(), "Calling execute_sample");
163    match plugin.execute_sample(&sample_path) {
164        Ok(LaunchResult::Launched) => {
165            info!(path = %sample_path.display(), "Sample launched successfully");
166        }
167        Ok(LaunchResult::UseDefault) => {
168            info!(path = %sample_path.display(), "Plugin returned UseDefault, using default launcher");
169            let result = crate::plugin::guest::default_launch(&sample_path);
170            match result {
171                LaunchResult::Launched => {
172                    info!(path = %sample_path.display(), "Default launch succeeded");
173                }
174                LaunchResult::UseDefault => {
175                    error!(path = %sample_path.display(), "Default launch also returned UseDefault");
176                }
177            }
178        }
179        Err(e) => {
180            error!(path = %sample_path.display(), error = %e, "execute_sample failed");
181        }
182    }
183
184    let timeout = config
185        .get("analysis_timeout")
186        .and_then(|v| v.parse::<u64>().ok())
187        .unwrap_or(default_timeout);
188    info!(timeout_secs = timeout, "Waiting for analysis timeout");
189    std::thread::sleep(std::time::Duration::from_secs(timeout));
190
191    if let Err(e) = plugin.on_stop(&ctx) {
192        error!(error = %e, "GuestPlugin on_stop failed");
193    }
194
195    if auto_collect_artifacts.enabled {
196        debug!("auto-collecting artifacts from {}", artifact_dir.display());
197        let claimed = ctx.claimed_paths();
198        collector::auto_collect(
199            &ctx,
200            &artifact_dir,
201            &auto_collect_artifacts,
202            Some(&claimed),
203            "artifacts",
204        );
205    }
206
207    if auto_collect_external_logs.enabled {
208        debug!(
209            "auto-collecting external logs from {}",
210            external_log_dir.display()
211        );
212        collector::auto_collect(
213            &ctx,
214            &external_log_dir,
215            &auto_collect_external_logs,
216            None,
217            "ext-logs",
218        );
219    }
220
221    // Drain the internal result channel and convert to proto
222    drop(result_tx);
223
224    send_final_marker(task_id, &proto_tx);
225}
226
227/// Convert a SDK [`LogEntry`] to the protobuf [`LogEntry`](proto::LogEntry) representation.
228pub(super) fn log_entry_to_proto(entry: LogEntry) -> proto::LogEntry {
229    proto::LogEntry {
230        timestamp_ns: entry.timestamp_ns,
231        level: entry.level as i32,
232        target: entry.target,
233        message: entry.message,
234        fields: entry.fields,
235    }
236}