malbox_plugin_sdk/runtime/guest/
stream.rs1use crate::context::message::{ResultFormat, ResultKind, TaskResultMessage};
9use crate::context::{Context, ResultSender};
10use crate::log::LogEntry;
11use crate::plugin::guest::{GuestPlugin, LaunchResult};
12use crate::stash::ResultStash;
13use malbox_plugin_transport::grpc::proto;
14use malbox_plugin_transport::plugin::GrpcEmitter;
15use malbox_plugin_transport::traits::TransportEmitter;
16use prost::Message;
17use std::collections::HashMap;
18use std::path::PathBuf;
19use std::sync::Arc;
20use tokio::sync::mpsc;
21use tonic::Status;
22use tracing::{debug, error, info, instrument};
23
24use super::collector::{self, AutoCollectSection};
25
26pub(super) struct TaskExecution {
29 pub task_id: i32,
30 pub sample_path: PathBuf,
31 pub config: HashMap<String, String>,
32 pub result_tx: ResultSender,
33 pub proto_tx: mpsc::Sender<std::result::Result<proto::TaskResult, Status>>,
34 pub stash: Arc<ResultStash>,
35 pub artifact_dir: PathBuf,
36 pub external_log_dir: PathBuf,
37 pub auto_collect_artifacts: AutoCollectSection,
38 pub auto_collect_external_logs: AutoCollectSection,
39 pub default_timeout: u64,
40}
41
42pub(super) fn message_to_proto(msg: TaskResultMessage, _stash: &ResultStash) -> proto::TaskResult {
44 match msg.kind {
45 ResultKind::ResultRef => {
46 let ref_msg = proto::ResultRef {
47 handle: msg.stash_handle,
48 result_name: msg.result_name,
49 format: result_format_to_proto(msg.stash_format).into(),
50 size_bytes: msg.stash_size,
51 };
52 proto::TaskResult {
53 task_id: msg.task_id,
54 result_name: String::new(),
55 data: ref_msg.encode_to_vec(),
56 format: proto::ResultFormat::Unspecified.into(),
57 is_final: msg.is_final,
58 kind: proto::ResultKind::ResultRef.into(),
59 }
60 }
61 ResultKind::Progress => proto::TaskResult {
62 task_id: msg.task_id,
63 result_name: msg.result_name,
64 data: msg.data,
65 format: result_format_to_proto(msg.format).into(),
66 is_final: msg.is_final,
67 kind: proto::ResultKind::Progress.into(),
68 },
69 ResultKind::Result => proto::TaskResult {
70 task_id: msg.task_id,
71 result_name: msg.result_name,
72 data: msg.data,
73 format: result_format_to_proto(msg.format).into(),
74 is_final: msg.is_final,
75 kind: proto::ResultKind::Result.into(),
76 },
77 }
78}
79
80fn result_format_to_proto(format: ResultFormat) -> proto::ResultFormat {
81 match format {
82 ResultFormat::Json => proto::ResultFormat::Json,
83 ResultFormat::Bytes => proto::ResultFormat::Bytes,
84 ResultFormat::Unspecified => proto::ResultFormat::Unspecified,
85 }
86}
87
88fn send_final_marker(
90 task_id: i32,
91 proto_tx: &mpsc::Sender<std::result::Result<proto::TaskResult, Status>>,
92) {
93 let final_marker = proto::TaskResult {
94 task_id,
95 result_name: String::new(),
96 data: vec![],
97 format: proto::ResultFormat::Unspecified.into(),
98 is_final: true,
99 kind: proto::ResultKind::Result.into(),
100 };
101 let _ = proto_tx.blocking_send(Ok(final_marker));
102}
103
104#[instrument(skip_all, fields(task_id = exec.task_id))]
116pub(super) fn guest_linear_task<P: GuestPlugin>(plugin: Arc<P>, exec: TaskExecution) {
117 let TaskExecution {
118 task_id,
119 sample_path,
120 config,
121 result_tx,
122 proto_tx,
123 stash,
124 artifact_dir,
125 external_log_dir,
126 auto_collect_artifacts,
127 auto_collect_external_logs,
128 default_timeout,
129 } = exec;
130
131 let ready_result = proto::TaskResult {
132 task_id,
133 result_name: String::new(),
134 data: vec![],
135 format: proto::ResultFormat::Unspecified.into(),
136 is_final: false,
137 kind: proto::ResultKind::Ready.into(),
138 };
139 if let Err(e) = proto_tx.blocking_send(Ok(ready_result)) {
140 error!(error = %e, "Failed to send READY signal");
141 return;
142 }
143
144 let emitter: Arc<dyn TransportEmitter + Send + Sync> =
145 Arc::new(GrpcEmitter::with_task(task_id, proto_tx.clone()));
146
147 let ctx = Context::new(
148 task_id,
149 sample_path.clone(),
150 config.clone(),
151 emitter,
152 Some(result_tx.clone()),
153 Some(Arc::clone(&stash)),
154 );
155
156 if let Err(e) = plugin.on_start(&ctx) {
157 error!(error = %e, "GuestPlugin on_start failed");
158 send_final_marker(task_id, &proto_tx);
159 return;
160 }
161
162 info!(path = %sample_path.display(), "Calling execute_sample");
163 match plugin.execute_sample(&sample_path) {
164 Ok(LaunchResult::Launched) => {
165 info!(path = %sample_path.display(), "Sample launched successfully");
166 }
167 Ok(LaunchResult::UseDefault) => {
168 info!(path = %sample_path.display(), "Plugin returned UseDefault, using default launcher");
169 let result = crate::plugin::guest::default_launch(&sample_path);
170 match result {
171 LaunchResult::Launched => {
172 info!(path = %sample_path.display(), "Default launch succeeded");
173 }
174 LaunchResult::UseDefault => {
175 error!(path = %sample_path.display(), "Default launch also returned UseDefault");
176 }
177 }
178 }
179 Err(e) => {
180 error!(path = %sample_path.display(), error = %e, "execute_sample failed");
181 }
182 }
183
184 let timeout = config
185 .get("analysis_timeout")
186 .and_then(|v| v.parse::<u64>().ok())
187 .unwrap_or(default_timeout);
188 info!(timeout_secs = timeout, "Waiting for analysis timeout");
189 std::thread::sleep(std::time::Duration::from_secs(timeout));
190
191 if let Err(e) = plugin.on_stop(&ctx) {
192 error!(error = %e, "GuestPlugin on_stop failed");
193 }
194
195 if auto_collect_artifacts.enabled {
196 debug!("auto-collecting artifacts from {}", artifact_dir.display());
197 let claimed = ctx.claimed_paths();
198 collector::auto_collect(
199 &ctx,
200 &artifact_dir,
201 &auto_collect_artifacts,
202 Some(&claimed),
203 "artifacts",
204 );
205 }
206
207 if auto_collect_external_logs.enabled {
208 debug!(
209 "auto-collecting external logs from {}",
210 external_log_dir.display()
211 );
212 collector::auto_collect(
213 &ctx,
214 &external_log_dir,
215 &auto_collect_external_logs,
216 None,
217 "ext-logs",
218 );
219 }
220
221 drop(result_tx);
223
224 send_final_marker(task_id, &proto_tx);
225}
226
227pub(super) fn log_entry_to_proto(entry: LogEntry) -> proto::LogEntry {
229 proto::LogEntry {
230 timestamp_ns: entry.timestamp_ns,
231 level: entry.level as i32,
232 target: entry.target,
233 message: entry.message,
234 fields: entry.fields,
235 }
236}