1pub(crate) mod collector;
14mod files;
15mod stream;
16
17use collector::AutoCollectSection;
18
19use crate::error::{Result, SdkError};
20use crate::log::LogBus;
21use crate::plugin::guest::GuestPlugin;
22use crate::stash::{ResultStash, StashConfig};
23
24use malbox_plugin_transport::grpc::proto;
25use malbox_plugin_transport::plugin::{
26 FileChunkStream, GuestPluginService, GuestPluginServiceServer, LogEntryStream,
27 ResultChunkStream, TaskResultStream,
28};
29
30use std::net::SocketAddr;
31use std::path::PathBuf;
32use std::sync::Arc;
33
34use tokio::sync::{mpsc, oneshot};
35use tokio_stream::wrappers::ReceiverStream;
36use tonic::{Request, Response, Status};
37use tracing::{info, warn};
38
/// Settings for one automatic-collection pass, expressed entirely in
/// `'static` data so a value can be declared as a `const`.
///
/// The runtime converts this into an owned `AutoCollectSection` (copying the
/// pattern lists into `String`s) before handing it to task execution.
#[derive(Clone, Copy, Debug)]
pub struct AutoCollectRuntimeConfig {
    /// Switches this collection pass on or off.
    pub enabled: bool,
    /// Patterns naming what to collect.
    // NOTE(review): presumably glob patterns (the in-file test uses "**/*");
    // the matching rules live in the collector module — confirm there.
    pub include: &'static [&'static str],
    /// Patterns naming what to leave out.
    pub exclude: &'static [&'static str],
    /// Upper bound on the size of a single collected file.
    // NOTE(review): presumably bytes — confirm against the collector.
    pub max_file_size: u64,
}
54
55#[derive(Debug, Clone, Copy)]
61pub struct GuestRuntimeConfig {
62 pub listen_addr: SocketAddr,
64 pub sample_dir: &'static str,
66 pub artifact_dir: &'static str,
68 pub stash_dir: &'static str,
70 pub log_dir: &'static str,
72 pub external_log_dir: &'static str,
74 pub stash_threshold_bytes: usize,
76 pub stash_ttl_secs: u64,
78 pub log_filter: &'static str,
80 pub analysis_timeout: u64,
82 pub auto_collect_artifacts: AutoCollectRuntimeConfig,
84 pub auto_collect_external_logs: AutoCollectRuntimeConfig,
86}
87
/// Deletes stale `*.overflow.jsonl` files directly inside `log_dir`.
///
/// A file is stale when its modification time lies more than ten minutes in
/// the past. The sweep is best-effort: an unreadable directory ends the sweep
/// silently, unreadable entries are skipped, and removal failures are ignored.
///
/// Two edge cases follow from the fallbacks used below:
/// * a file whose modification time cannot be read is treated as dating from
///   the Unix epoch, and is therefore considered stale;
/// * a modification time in the future counts as an age of zero, so the file
///   is kept.
pub fn sweep_log_overflow_orphans(log_dir: &std::path::Path) {
    use std::time::{Duration, SystemTime};

    const OVERFLOW_SUFFIX: &str = ".overflow.jsonl";

    let max_age = Duration::from_secs(10 * 60);
    let started = SystemTime::now();

    let Ok(entries) = std::fs::read_dir(log_dir) else {
        return;
    };

    for entry in entries.flatten() {
        // Names that are not valid UTF-8 can never match the suffix test.
        let file_name = entry.file_name();
        let is_overflow = file_name
            .to_str()
            .is_some_and(|name| name.ends_with(OVERFLOW_SUFFIX));
        if !is_overflow {
            continue;
        }

        let last_write = match entry.metadata().and_then(|meta| meta.modified()) {
            Ok(timestamp) => timestamp,
            Err(_) => SystemTime::UNIX_EPOCH,
        };
        let is_stale = match started.duration_since(last_write) {
            Ok(age) => age > max_age,
            // `last_write` is later than `started`: age zero, never stale.
            Err(_) => false,
        };

        if is_stale {
            let _ = std::fs::remove_file(entry.path());
        }
    }
}
120
121struct GuestService<P: Send + Sync + 'static> {
127 plugin: Arc<P>,
128 shutdown_tx: std::sync::Mutex<Option<oneshot::Sender<()>>>,
129 sample_dir: PathBuf,
130 artifact_dir: PathBuf,
131 external_log_dir: PathBuf,
132 log_bus: Arc<LogBus>,
133 stash: Arc<ResultStash>,
134 auto_collect_artifacts: AutoCollectSection,
135 auto_collect_external_logs: AutoCollectSection,
136 default_timeout: u64,
137}
138
139#[tonic::async_trait]
140impl<P: GuestPlugin> GuestPluginService for GuestService<P> {
141 type ExecuteTaskStream = TaskResultStream;
142 type PullFileStream = FileChunkStream;
143 type StreamLogsStream = LogEntryStream;
144 type PullResultStream = ResultChunkStream;
145
146 async fn initialize(
149 &self,
150 _request: Request<proto::InitializeRequest>,
151 ) -> std::result::Result<Response<proto::InitializeResponse>, Status> {
152 Ok(Response::new(proto::InitializeResponse {
153 success: true,
154 error_message: String::new(),
155 capabilities: vec![],
156 }))
157 }
158
159 async fn health_check(
160 &self,
161 _request: Request<proto::HealthCheckRequest>,
162 ) -> std::result::Result<Response<proto::HealthCheckResponse>, Status> {
163 let plugin = self.plugin.clone();
164 let (ready, reason) = tokio::task::spawn_blocking(move || {
165 let status = plugin.health_check();
166 (status.is_ready(), status.reason().to_owned())
167 })
168 .await
169 .unwrap_or((false, "health check panicked".to_string()));
170 Ok(Response::new(proto::HealthCheckResponse { ready, reason }))
171 }
172
173 async fn shutdown(
174 &self,
175 _request: Request<proto::ShutdownRequest>,
176 ) -> std::result::Result<Response<proto::ShutdownResponse>, Status> {
177 if let Ok(mut guard) = self.shutdown_tx.lock()
178 && let Some(tx) = guard.take()
179 {
180 let _ = tx.send(());
181 }
182 Ok(Response::new(proto::ShutdownResponse {
183 acknowledged: true,
184 }))
185 }
186
187 async fn execute_task(
190 &self,
191 request: Request<proto::TaskRequest>,
192 ) -> std::result::Result<Response<Self::ExecuteTaskStream>, Status> {
193 let req = request.into_inner();
194 let (proto_tx, proto_rx) =
195 mpsc::channel::<std::result::Result<proto::TaskResult, Status>>(32);
196 let (result_tx, mut result_rx) =
197 mpsc::channel::<crate::context::message::TaskResultMessage>(32);
198
199 let plugin = self.plugin.clone();
200 let full_sample_path = if std::path::Path::new(&req.sample_path).is_relative()
201 && !req.sample_path.is_empty()
202 {
203 self.sample_dir.join(&req.sample_path)
204 } else {
205 PathBuf::from(&req.sample_path)
206 };
207
208 let stash = Arc::clone(&self.stash);
209 let artifact_dir = self.artifact_dir.clone();
210 let external_log_dir = self.external_log_dir.clone();
211 let auto_collect_artifacts = self.auto_collect_artifacts.clone();
212 let auto_collect_external_logs = self.auto_collect_external_logs.clone();
213 let default_timeout = self.default_timeout;
214
215 let relay_stash = Arc::clone(&stash);
217 let relay_tx = proto_tx.clone();
218 tokio::spawn(async move {
219 while let Some(msg) = result_rx.recv().await {
220 let proto_msg = stream::message_to_proto(msg, &relay_stash);
221 if relay_tx.send(Ok(proto_msg)).await.is_err() {
222 break;
223 }
224 }
225 });
226
227 tokio::task::spawn_blocking(move || {
228 stream::guest_linear_task(
229 plugin,
230 stream::TaskExecution {
231 task_id: req.task_id,
232 sample_path: full_sample_path,
233 config: req.config,
234 result_tx,
235 proto_tx,
236 stash,
237 artifact_dir,
238 external_log_dir,
239 auto_collect_artifacts,
240 auto_collect_external_logs,
241 default_timeout,
242 },
243 );
244 });
245
246 let stream: TaskResultStream = Box::pin(ReceiverStream::new(proto_rx));
247 Ok(Response::new(stream))
248 }
249
250 async fn push_file(
253 &self,
254 request: Request<tonic::Streaming<proto::FileChunk>>,
255 ) -> std::result::Result<Response<proto::FileTransferResponse>, Status> {
256 let mut streaming = request.into_inner();
257 let mut dest = String::new();
258 let mut data = Vec::new();
259 while let Some(chunk) = streaming
260 .message()
261 .await
262 .map_err(|e| Status::internal(format!("stream error: {}", e)))?
263 {
264 if dest.is_empty() {
265 dest.clone_from(&chunk.path);
266 }
267 data.extend_from_slice(&chunk.data);
268 }
269 match files::push_file(&self.sample_dir, &dest, data).await {
270 Ok(()) => Ok(Response::new(proto::FileTransferResponse {
271 success: true,
272 error_message: String::new(),
273 })),
274 Err(error_message) => Ok(Response::new(proto::FileTransferResponse {
275 success: false,
276 error_message,
277 })),
278 }
279 }
280
281 async fn pull_file(
282 &self,
283 request: Request<proto::PullFileRequest>,
284 ) -> std::result::Result<Response<Self::PullFileStream>, Status> {
285 let req = request.into_inner();
286 match files::pull_file(&self.artifact_dir, &req.path).await {
287 Ok(data) => {
288 let path = req.path;
289 let chunks: Vec<std::result::Result<proto::FileChunk, Status>> = data
290 .chunks(64 * 1024)
291 .enumerate()
292 .map(|(i, chunk)| {
293 let remaining = data.len() - (i * 64 * 1024) - chunk.len();
294 Ok(proto::FileChunk {
295 path: path.clone(),
296 data: chunk.to_vec(),
297 is_last: remaining == 0,
298 })
299 })
300 .collect();
301 let stream: Self::PullFileStream = Box::pin(tokio_stream::iter(chunks));
302 Ok(Response::new(stream))
303 }
304 Err(e) => Err(Status::internal(e)),
305 }
306 }
307
308 async fn execute_command(
311 &self,
312 _request: Request<proto::ExecRequest>,
313 ) -> std::result::Result<Response<proto::ExecResponse>, Status> {
314 Ok(Response::new(proto::ExecResponse {
315 exit_code: None,
316 stdout: vec![],
317 stderr: vec![],
318 pid: None,
319 }))
320 }
321
322 async fn stream_logs(
325 &self,
326 request: Request<proto::LogStreamRequest>,
327 ) -> std::result::Result<Response<Self::StreamLogsStream>, Status> {
328 let req = request.into_inner();
329 let log_bus = Arc::clone(&self.log_bus);
330 let s = async_stream::stream! {
331 if req.include_buffered {
332 for entry in log_bus.drain_atomic() {
333 yield Ok(stream::log_entry_to_proto(entry));
334 }
335 }
336 loop {
337 let entries = log_bus.recv_atomic().await;
338 if entries.is_empty() { break; }
339 for entry in entries {
340 yield Ok(stream::log_entry_to_proto(entry));
341 }
342 }
343 };
344 Ok(Response::new(Box::pin(s)))
345 }
346
347 async fn pull_result(
350 &self,
351 request: Request<proto::PullResultRequest>,
352 ) -> std::result::Result<Response<Self::PullResultStream>, Status> {
353 let req = request.into_inner();
354 let entry = self
355 .stash
356 .take(&req.handle)
357 .ok_or_else(|| Status::not_found(format!("stash handle not found: {}", req.handle)))?;
358 let file = tokio::fs::File::open(&entry.path)
359 .await
360 .map_err(|e| Status::internal(format!("failed to open stashed result: {e}")))?;
361 let stash = Arc::clone(&self.stash);
362 let s = async_stream::stream! {
363 use tokio::io::AsyncReadExt;
364 let mut file = file;
365 let mut buf = vec![0u8; 64 * 1024];
366 let mut index: u32 = 0;
367 loop {
368 let n = match file.read(&mut buf).await {
369 Ok(n) => n,
370 Err(e) => {
371 yield Err(tonic::Status::internal(format!("read error: {e}")));
372 stash.cleanup_after_pull(&entry);
373 return;
374 }
375 };
376 if n == 0 {
377 yield Ok(proto::ResultChunk { data: vec![], index, is_last: true });
378 break;
379 }
380 yield Ok(proto::ResultChunk { data: buf[..n].to_vec(), index, is_last: false });
381 index += 1;
382 }
383 stash.cleanup_after_pull(&entry);
384 };
385 Ok(Response::new(Box::pin(s)))
386 }
387}
388
389pub struct GuestRuntime<P: Send + Sync + 'static> {
398 plugin: Arc<P>,
399 config: GuestRuntimeConfig,
400 log_bus: Option<Arc<LogBus>>,
401}
402
403impl<P: GuestPlugin> GuestRuntime<P> {
404 pub fn with_config(plugin: P, config: GuestRuntimeConfig) -> Self {
406 Self {
407 plugin: Arc::new(plugin),
408 config,
409 log_bus: None,
410 }
411 }
412
413 pub fn with_log_bus(mut self, bus: Arc<LogBus>) -> Self {
416 self.log_bus = Some(bus);
417 self
418 }
419
420 pub async fn run(self) -> Result<()> {
422 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
423
424 let sample_dir: PathBuf = PathBuf::from(self.config.sample_dir);
425 let artifact_dir: PathBuf = PathBuf::from(self.config.artifact_dir);
426 let stash_dir: PathBuf = PathBuf::from(self.config.stash_dir);
427 let log_dir: PathBuf = PathBuf::from(self.config.log_dir);
428 let external_log_dir: PathBuf = PathBuf::from(self.config.external_log_dir);
429
430 for (label, dir) in [
432 ("sample_dir", &sample_dir),
433 ("artifact_dir", &artifact_dir),
434 ("stash_dir", &stash_dir),
435 ("log_dir", &log_dir),
436 ("external_log_dir", &external_log_dir),
437 ] {
438 tokio::fs::create_dir_all(dir)
439 .await
440 .map_err(|e| SdkError::Init(format!("failed to create {}: {}", label, e)))?;
441 }
442
443 if let Err(e) = ResultStash::sweep_orphans_on_startup(&stash_dir) {
446 warn!(error = %e, "failed to sweep result stash orphans on startup");
447 }
448 let stash_config = StashConfig {
449 threshold_bytes: self.config.stash_threshold_bytes,
450 ttl: std::time::Duration::from_secs(self.config.stash_ttl_secs),
451 };
452 let stash = Arc::new(
453 ResultStash::new(stash_dir, stash_config)
454 .map_err(|e| SdkError::Init(format!("failed to create stash: {}", e)))?,
455 );
456
457 {
460 let sweep_stash = Arc::clone(&stash);
461 let sweep_interval = sweep_stash.config().ttl / 2;
462 tokio::spawn(async move {
463 let mut ticker = tokio::time::interval(sweep_interval);
464 ticker.tick().await;
466 loop {
467 ticker.tick().await;
468 let n = sweep_stash.sweep_expired();
469 if n > 0 {
470 warn!(
471 reclaimed = n,
472 "TTL sweep reclaimed stale result stash entries"
473 );
474 }
475 }
476 });
477 }
478
479 let log_bus = match self.log_bus {
483 Some(bus) => bus,
484 None => {
485 sweep_log_overflow_orphans(&log_dir);
486 let overflow_path =
487 log_dir.join(format!("run-{}.overflow.jsonl", std::process::id()));
488
489 let bus = Arc::new(LogBus::with_overflow(1024, overflow_path));
490 crate::internal::init_tracing(self.config.log_filter, Some(Arc::clone(&bus)));
491 bus
492 }
493 };
494
495 let to_section = |cfg: &AutoCollectRuntimeConfig| AutoCollectSection {
496 enabled: cfg.enabled,
497 include: cfg.include.iter().map(|s| (*s).to_string()).collect(),
498 exclude: cfg.exclude.iter().map(|s| (*s).to_string()).collect(),
499 max_file_size: cfg.max_file_size,
500 };
501
502 let service = GuestService {
503 plugin: self.plugin,
504 shutdown_tx: std::sync::Mutex::new(Some(shutdown_tx)),
505 sample_dir,
506 artifact_dir,
507 external_log_dir,
508 log_bus: Arc::clone(&log_bus),
509 stash: Arc::clone(&stash),
510 auto_collect_artifacts: to_section(&self.config.auto_collect_artifacts),
511 auto_collect_external_logs: to_section(&self.config.auto_collect_external_logs),
512 default_timeout: self.config.analysis_timeout,
513 };
514
515 let addr = self.config.listen_addr;
516
517 info!(address = %addr, "Guest plugin gRPC server starting");
518
519 tonic::transport::Server::builder()
520 .add_service(GuestPluginServiceServer::new(service))
521 .serve_with_shutdown(addr, async {
522 let _ = shutdown_rx.await;
523 })
524 .await
525 .map_err(|e| {
526 let mut msg = format!("gRPC server error on {addr}: {e}");
527 let mut source = std::error::Error::source(&e);
528 while let Some(cause) = source {
529 msg.push_str(&format!("\n caused by: {cause}"));
530 source = std::error::Error::source(cause);
531 }
532 SdkError::Init(msg)
533 })?;
534
535 info!("Guest plugin gRPC server stopped");
536 Ok(())
537 }
538
539 pub fn run_blocking(self) -> Result<()> {
544 let rt = tokio::runtime::Runtime::new()
545 .map_err(|e| SdkError::Init(format!("failed to create tokio runtime: {}", e)))?;
546 rt.block_on(self.run())
547 }
548}
549
#[cfg(test)]
mod guest_runtime_config_tests {
    use super::*;
    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

    /// Collect-everything section, shared by both auto-collect slots below.
    const COLLECT_ALL: AutoCollectRuntimeConfig = AutoCollectRuntimeConfig {
        enabled: true,
        include: &["**/*"],
        exclude: &[],
        max_file_size: 50 * 1024 * 1024,
    };

    /// Being a `const` is the point: this item only compiles while the
    /// configuration type stays constructible in a const context.
    const STATIC_SANITY: GuestRuntimeConfig = GuestRuntimeConfig {
        listen_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 50100)),
        sample_dir: "/opt/malbox/samples",
        artifact_dir: "/opt/malbox/artifacts",
        stash_dir: "/opt/malbox/stash",
        log_dir: "/opt/malbox/logs",
        external_log_dir: "/opt/malbox/ext-logs",
        stash_threshold_bytes: 1_048_576,
        stash_ttl_secs: 120,
        log_filter: "info",
        analysis_timeout: 120,
        auto_collect_artifacts: COLLECT_ALL,
        auto_collect_external_logs: COLLECT_ALL,
    };

    #[test]
    fn guest_runtime_config_is_const_constructible() {
        let port = STATIC_SANITY.listen_addr.port();
        assert_eq!(port, 50100);
    }
}