// Source: malbox_plugin_sdk/runtime/guest.rs

//! Guest plugin runtime - exposes a [`GuestPlugin`] implementation as a gRPC server.
//!
//! Guest plugins run inside a VM. The daemon is the gRPC client; this
//! runtime starts the matching server and dispatches each RPC to the
//! plugin's [`GuestPlugin`] trait implementation.
//!
//! Module layout:
//! - [`mod@self`] - `GuestRuntime`, `GuestRuntimeConfig` (public API)
//! - [`files`] - file push/pull and path-traversal protection
//! - [`stream`] - result streaming + log entry conversion
//! - [`collector`] - auto-collection logic
12
13pub(crate) mod collector;
14mod files;
15mod stream;
16
17use collector::AutoCollectSection;
18
19use crate::error::{Result, SdkError};
20use crate::log::LogBus;
21use crate::plugin::guest::GuestPlugin;
22use crate::stash::{ResultStash, StashConfig};
23
24use malbox_plugin_transport::grpc::proto;
25use malbox_plugin_transport::plugin::{
26    FileChunkStream, GuestPluginService, GuestPluginServiceServer, LogEntryStream,
27    ResultChunkStream, TaskResultStream,
28};
29
30use std::net::SocketAddr;
31use std::path::PathBuf;
32use std::sync::Arc;
33
34use tokio::sync::{mpsc, oneshot};
35use tokio_stream::wrappers::ReceiverStream;
36use tonic::{Request, Response, Status};
37use tracing::{info, warn};
38
/// Auto-collection settings for a single directory (artifacts or external logs).
///
/// These values are baked into the plugin binary at compile time by the
/// `#[guest_plugin]` macro, which reads them from `plugin.toml`. Every field
/// is `Copy` / `'static`, so the struct can be part of a `const`
/// [`GuestRuntimeConfig`]. When the runtime starts, the glob lists are copied
/// into an owned `AutoCollectSection` before being handed to tasks.
#[derive(Debug, Clone, Copy)]
pub struct AutoCollectRuntimeConfig {
    /// Whether auto-collection is enabled for this directory.
    pub enabled: bool,
    /// Glob patterns for files to include (e.g. `["**/*.log"]`).
    pub include: &'static [&'static str],
    /// Glob patterns for files to exclude.
    pub exclude: &'static [&'static str],
    /// Files larger than this many bytes are skipped, so huge dumps are not
    /// collected.
    pub max_file_size: u64,
}
54
/// Configuration for the guest plugin runtime.
///
/// All values are baked into the plugin binary at compile time by the
/// `#[guest_plugin]` macro (sourced from `plugin.toml`). Paths use
/// `&'static str` so the config can be a `const`. Every directory listed
/// here is created (recursively) when [`GuestRuntime::run`] starts.
#[derive(Debug, Clone, Copy)]
pub struct GuestRuntimeConfig {
    /// Address the gRPC server binds to (e.g. `0.0.0.0:50100`).
    pub listen_addr: SocketAddr,
    /// Directory where the daemon pushes sample files. Relative sample paths
    /// in task requests are resolved against it.
    pub sample_dir: &'static str,
    /// Directory where plugins write artifact files for collection; also the
    /// root that `PullFile` requests are served from.
    pub artifact_dir: &'static str,
    /// Directory used for the disk-backed result stash.
    pub stash_dir: &'static str,
    /// Directory for log overflow files when the ring buffer is full.
    pub log_dir: &'static str,
    /// Directory for external log files collected after analysis.
    pub external_log_dir: &'static str,
    /// Results larger than this (bytes) are stashed to disk instead of sent inline.
    pub stash_threshold_bytes: usize,
    /// How long (seconds) a stashed result can remain un-pulled before the
    /// background sweeper reclaims it.
    pub stash_ttl_secs: u64,
    /// `tracing` filter directive (e.g. `"info"` or `"my_plugin=debug"`).
    /// Only applied when the runtime installs tracing itself (no external log bus).
    pub log_filter: &'static str,
    /// Default analysis timeout in seconds if the daemon does not specify one.
    pub analysis_timeout: u64,
    /// Auto-collection settings for the artifact directory.
    pub auto_collect_artifacts: AutoCollectRuntimeConfig,
    /// Auto-collection settings for the external log directory.
    pub auto_collect_external_logs: AutoCollectRuntimeConfig,
}
87
/// Remove stale log-overflow files left in `log_dir` by earlier runs.
///
/// Only entries whose name ends in `.overflow.jsonl` and whose modification
/// time lies more than 10 minutes in the past are deleted. Meant to be called
/// once when the runtime starts, to clean up orphans from crashed runs.
///
/// Everything is best-effort. An unreadable directory, a non-UTF-8 file name,
/// or a failed delete is silently skipped. An entry whose modification time
/// cannot be read is treated as infinitely old, so it is removed. An entry
/// stamped in the future is kept.
pub fn sweep_log_overflow_orphans(log_dir: &std::path::Path) {
    use std::time::{Duration, SystemTime};

    const SUFFIX: &str = ".overflow.jsonl";
    const GRACE: Duration = Duration::from_secs(10 * 60);

    let started_at = SystemTime::now();
    let Ok(entries) = std::fs::read_dir(log_dir) else {
        return;
    };

    let is_overflow_file = |entry: &std::fs::DirEntry| {
        entry
            .file_name()
            .to_str()
            .is_some_and(|name| name.ends_with(SUFFIX))
    };
    let is_stale = |entry: &std::fs::DirEntry| {
        let modified = entry
            .metadata()
            .and_then(|meta| meta.modified())
            .unwrap_or(SystemTime::UNIX_EPOCH);
        started_at.duration_since(modified).unwrap_or(Duration::ZERO) > GRACE
    };

    for entry in entries.flatten().filter(is_overflow_file).filter(is_stale) {
        let _ = std::fs::remove_file(entry.path());
    }
}
120
// ---------------------------------------------------------------------------
// GuestService - implements GuestPluginService (tonic trait) directly
// ---------------------------------------------------------------------------
124
/// Internal gRPC service implementation that delegates RPCs to a [`GuestPlugin`].
///
/// Built once by [`GuestRuntime::run`]. The paths and auto-collect settings
/// are snapshots of the runtime configuration taken at startup.
struct GuestService<P: Send + Sync + 'static> {
    /// The plugin, shared with the blocking tasks that call into it.
    plugin: Arc<P>,
    /// Fires the server's graceful-shutdown future. Wrapped in
    /// `Mutex<Option<_>>` so the `shutdown` RPC can take it (at most once)
    /// through `&self`.
    shutdown_tx: std::sync::Mutex<Option<oneshot::Sender<()>>>,
    /// Destination root for `push_file`, and base for relative task sample paths.
    sample_dir: PathBuf,
    /// Root that `pull_file` serves from; also passed to each task for
    /// artifact auto-collection.
    artifact_dir: PathBuf,
    /// Passed to each task for external-log auto-collection.
    external_log_dir: PathBuf,
    /// Log bus read by the `stream_logs` RPC.
    log_bus: Arc<LogBus>,
    /// Disk-backed stash for oversized results, served by `pull_result`.
    stash: Arc<ResultStash>,
    /// Auto-collection rules applied to `artifact_dir`.
    auto_collect_artifacts: AutoCollectSection,
    /// Auto-collection rules applied to `external_log_dir`.
    auto_collect_external_logs: AutoCollectSection,
    /// Default analysis timeout in seconds (from
    /// `GuestRuntimeConfig::analysis_timeout`), forwarded to each task.
    default_timeout: u64,
}
138
139#[tonic::async_trait]
140impl<P: GuestPlugin> GuestPluginService for GuestService<P> {
141    type ExecuteTaskStream = TaskResultStream;
142    type PullFileStream = FileChunkStream;
143    type StreamLogsStream = LogEntryStream;
144    type PullResultStream = ResultChunkStream;
145
146    // -- Lifecycle --------------------------------------------------------
147
148    async fn initialize(
149        &self,
150        _request: Request<proto::InitializeRequest>,
151    ) -> std::result::Result<Response<proto::InitializeResponse>, Status> {
152        Ok(Response::new(proto::InitializeResponse {
153            success: true,
154            error_message: String::new(),
155            capabilities: vec![],
156        }))
157    }
158
159    async fn health_check(
160        &self,
161        _request: Request<proto::HealthCheckRequest>,
162    ) -> std::result::Result<Response<proto::HealthCheckResponse>, Status> {
163        let plugin = self.plugin.clone();
164        let (ready, reason) = tokio::task::spawn_blocking(move || {
165            let status = plugin.health_check();
166            (status.is_ready(), status.reason().to_owned())
167        })
168        .await
169        .unwrap_or((false, "health check panicked".to_string()));
170        Ok(Response::new(proto::HealthCheckResponse { ready, reason }))
171    }
172
173    async fn shutdown(
174        &self,
175        _request: Request<proto::ShutdownRequest>,
176    ) -> std::result::Result<Response<proto::ShutdownResponse>, Status> {
177        if let Ok(mut guard) = self.shutdown_tx.lock()
178            && let Some(tx) = guard.take()
179        {
180            let _ = tx.send(());
181        }
182        Ok(Response::new(proto::ShutdownResponse {
183            acknowledged: true,
184        }))
185    }
186
187    // -- Task execution ---------------------------------------------------
188
189    async fn execute_task(
190        &self,
191        request: Request<proto::TaskRequest>,
192    ) -> std::result::Result<Response<Self::ExecuteTaskStream>, Status> {
193        let req = request.into_inner();
194        let (proto_tx, proto_rx) =
195            mpsc::channel::<std::result::Result<proto::TaskResult, Status>>(32);
196        let (result_tx, mut result_rx) =
197            mpsc::channel::<crate::context::message::TaskResultMessage>(32);
198
199        let plugin = self.plugin.clone();
200        let full_sample_path = if std::path::Path::new(&req.sample_path).is_relative()
201            && !req.sample_path.is_empty()
202        {
203            self.sample_dir.join(&req.sample_path)
204        } else {
205            PathBuf::from(&req.sample_path)
206        };
207
208        let stash = Arc::clone(&self.stash);
209        let artifact_dir = self.artifact_dir.clone();
210        let external_log_dir = self.external_log_dir.clone();
211        let auto_collect_artifacts = self.auto_collect_artifacts.clone();
212        let auto_collect_external_logs = self.auto_collect_external_logs.clone();
213        let default_timeout = self.default_timeout;
214
215        // Relay: convert internal messages to proto and forward to tonic stream
216        let relay_stash = Arc::clone(&stash);
217        let relay_tx = proto_tx.clone();
218        tokio::spawn(async move {
219            while let Some(msg) = result_rx.recv().await {
220                let proto_msg = stream::message_to_proto(msg, &relay_stash);
221                if relay_tx.send(Ok(proto_msg)).await.is_err() {
222                    break;
223                }
224            }
225        });
226
227        tokio::task::spawn_blocking(move || {
228            stream::guest_linear_task(
229                plugin,
230                stream::TaskExecution {
231                    task_id: req.task_id,
232                    sample_path: full_sample_path,
233                    config: req.config,
234                    result_tx,
235                    proto_tx,
236                    stash,
237                    artifact_dir,
238                    external_log_dir,
239                    auto_collect_artifacts,
240                    auto_collect_external_logs,
241                    default_timeout,
242                },
243            );
244        });
245
246        let stream: TaskResultStream = Box::pin(ReceiverStream::new(proto_rx));
247        Ok(Response::new(stream))
248    }
249
250    // -- File transfer ----------------------------------------------------
251
252    async fn push_file(
253        &self,
254        request: Request<tonic::Streaming<proto::FileChunk>>,
255    ) -> std::result::Result<Response<proto::FileTransferResponse>, Status> {
256        let mut streaming = request.into_inner();
257        let mut dest = String::new();
258        let mut data = Vec::new();
259        while let Some(chunk) = streaming
260            .message()
261            .await
262            .map_err(|e| Status::internal(format!("stream error: {}", e)))?
263        {
264            if dest.is_empty() {
265                dest.clone_from(&chunk.path);
266            }
267            data.extend_from_slice(&chunk.data);
268        }
269        match files::push_file(&self.sample_dir, &dest, data).await {
270            Ok(()) => Ok(Response::new(proto::FileTransferResponse {
271                success: true,
272                error_message: String::new(),
273            })),
274            Err(error_message) => Ok(Response::new(proto::FileTransferResponse {
275                success: false,
276                error_message,
277            })),
278        }
279    }
280
281    async fn pull_file(
282        &self,
283        request: Request<proto::PullFileRequest>,
284    ) -> std::result::Result<Response<Self::PullFileStream>, Status> {
285        let req = request.into_inner();
286        match files::pull_file(&self.artifact_dir, &req.path).await {
287            Ok(data) => {
288                let path = req.path;
289                let chunks: Vec<std::result::Result<proto::FileChunk, Status>> = data
290                    .chunks(64 * 1024)
291                    .enumerate()
292                    .map(|(i, chunk)| {
293                        let remaining = data.len() - (i * 64 * 1024) - chunk.len();
294                        Ok(proto::FileChunk {
295                            path: path.clone(),
296                            data: chunk.to_vec(),
297                            is_last: remaining == 0,
298                        })
299                    })
300                    .collect();
301                let stream: Self::PullFileStream = Box::pin(tokio_stream::iter(chunks));
302                Ok(Response::new(stream))
303            }
304            Err(e) => Err(Status::internal(e)),
305        }
306    }
307
308    // -- Command execution (no-op) ----------------------------------------
309
310    async fn execute_command(
311        &self,
312        _request: Request<proto::ExecRequest>,
313    ) -> std::result::Result<Response<proto::ExecResponse>, Status> {
314        Ok(Response::new(proto::ExecResponse {
315            exit_code: None,
316            stdout: vec![],
317            stderr: vec![],
318            pid: None,
319        }))
320    }
321
322    // -- Log streaming ----------------------------------------------------
323
324    async fn stream_logs(
325        &self,
326        request: Request<proto::LogStreamRequest>,
327    ) -> std::result::Result<Response<Self::StreamLogsStream>, Status> {
328        let req = request.into_inner();
329        let log_bus = Arc::clone(&self.log_bus);
330        let s = async_stream::stream! {
331            if req.include_buffered {
332                for entry in log_bus.drain_atomic() {
333                    yield Ok(stream::log_entry_to_proto(entry));
334                }
335            }
336            loop {
337                let entries = log_bus.recv_atomic().await;
338                if entries.is_empty() { break; }
339                for entry in entries {
340                    yield Ok(stream::log_entry_to_proto(entry));
341                }
342            }
343        };
344        Ok(Response::new(Box::pin(s)))
345    }
346
347    // -- Large result pull ------------------------------------------------
348
349    async fn pull_result(
350        &self,
351        request: Request<proto::PullResultRequest>,
352    ) -> std::result::Result<Response<Self::PullResultStream>, Status> {
353        let req = request.into_inner();
354        let entry = self
355            .stash
356            .take(&req.handle)
357            .ok_or_else(|| Status::not_found(format!("stash handle not found: {}", req.handle)))?;
358        let file = tokio::fs::File::open(&entry.path)
359            .await
360            .map_err(|e| Status::internal(format!("failed to open stashed result: {e}")))?;
361        let stash = Arc::clone(&self.stash);
362        let s = async_stream::stream! {
363            use tokio::io::AsyncReadExt;
364            let mut file = file;
365            let mut buf = vec![0u8; 64 * 1024];
366            let mut index: u32 = 0;
367            loop {
368                let n = match file.read(&mut buf).await {
369                    Ok(n) => n,
370                    Err(e) => {
371                        yield Err(tonic::Status::internal(format!("read error: {e}")));
372                        stash.cleanup_after_pull(&entry);
373                        return;
374                    }
375                };
376                if n == 0 {
377                    yield Ok(proto::ResultChunk { data: vec![], index, is_last: true });
378                    break;
379                }
380                yield Ok(proto::ResultChunk { data: buf[..n].to_vec(), index, is_last: false });
381                index += 1;
382            }
383            stash.cleanup_after_pull(&entry);
384        };
385        Ok(Response::new(Box::pin(s)))
386    }
387}
388
// ---------------------------------------------------------------------------
// GuestRuntime - public API
// ---------------------------------------------------------------------------
392
/// Runtime for [`GuestPlugin`] implementations.
///
/// Runs a linear lifecycle per task: `on_start` -> `execute_sample` -> wait
/// -> `on_stop`, with auto-collection of artifacts and external logs.
///
/// Construct with [`GuestRuntime::with_config`], optionally attach a shared
/// log bus with [`GuestRuntime::with_log_bus`], then start serving with
/// [`GuestRuntime::run`] or [`GuestRuntime::run_blocking`].
pub struct GuestRuntime<P: Send + Sync + 'static> {
    /// The plugin, handed to the gRPC service when the server starts.
    plugin: Arc<P>,
    /// Compile-time configuration (listen address, directories, limits).
    config: GuestRuntimeConfig,
    /// Externally supplied log bus. When `None`, `run` creates its own bus
    /// and installs the tracing layer itself.
    log_bus: Option<Arc<LogBus>>,
}
402
403impl<P: GuestPlugin> GuestRuntime<P> {
404    /// Create a guest runtime with custom configuration.
405    pub fn with_config(plugin: P, config: GuestRuntimeConfig) -> Self {
406        Self {
407            plugin: Arc::new(plugin),
408            config,
409            log_bus: None,
410        }
411    }
412
413    /// Attach an externally-created [`LogBus`] so the gRPC log stream uses
414    /// the same bus that the tracing layer / C++ FFI already writes to.
415    pub fn with_log_bus(mut self, bus: Arc<LogBus>) -> Self {
416        self.log_bus = Some(bus);
417        self
418    }
419
420    /// Start the gRPC server and block until shutdown.
421    pub async fn run(self) -> Result<()> {
422        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
423
424        let sample_dir: PathBuf = PathBuf::from(self.config.sample_dir);
425        let artifact_dir: PathBuf = PathBuf::from(self.config.artifact_dir);
426        let stash_dir: PathBuf = PathBuf::from(self.config.stash_dir);
427        let log_dir: PathBuf = PathBuf::from(self.config.log_dir);
428        let external_log_dir: PathBuf = PathBuf::from(self.config.external_log_dir);
429
430        // Ensure all runtime directories exist.
431        for (label, dir) in [
432            ("sample_dir", &sample_dir),
433            ("artifact_dir", &artifact_dir),
434            ("stash_dir", &stash_dir),
435            ("log_dir", &log_dir),
436            ("external_log_dir", &external_log_dir),
437        ] {
438            tokio::fs::create_dir_all(dir)
439                .await
440                .map_err(|e| SdkError::Init(format!("failed to create {}: {}", label, e)))?;
441        }
442
443        // Set up the result stash for large payloads. On startup, sweep any
444        // orphan files left behind by a prior crashed run.
445        if let Err(e) = ResultStash::sweep_orphans_on_startup(&stash_dir) {
446            warn!(error = %e, "failed to sweep result stash orphans on startup");
447        }
448        let stash_config = StashConfig {
449            threshold_bytes: self.config.stash_threshold_bytes,
450            ttl: std::time::Duration::from_secs(self.config.stash_ttl_secs),
451        };
452        let stash = Arc::new(
453            ResultStash::new(stash_dir, stash_config)
454                .map_err(|e| SdkError::Init(format!("failed to create stash: {}", e)))?,
455        );
456
457        // Spawn a background TTL sweeper: purges stash entries that were
458        // never pulled by the daemon within the configured TTL window.
459        {
460            let sweep_stash = Arc::clone(&stash);
461            let sweep_interval = sweep_stash.config().ttl / 2;
462            tokio::spawn(async move {
463                let mut ticker = tokio::time::interval(sweep_interval);
464                // Skip the immediate first tick.
465                ticker.tick().await;
466                loop {
467                    ticker.tick().await;
468                    let n = sweep_stash.sweep_expired();
469                    if n > 0 {
470                        warn!(
471                            reclaimed = n,
472                            "TTL sweep reclaimed stale result stash entries"
473                        );
474                    }
475                }
476            });
477        }
478
479        // Use the externally-provided LogBus (C++ FFI path where tracing
480        // was already initialised with this bus), or create a fresh one and
481        // install the tracing layer ourselves (Rust SDK path).
482        let log_bus = match self.log_bus {
483            Some(bus) => bus,
484            None => {
485                sweep_log_overflow_orphans(&log_dir);
486                let overflow_path =
487                    log_dir.join(format!("run-{}.overflow.jsonl", std::process::id()));
488
489                let bus = Arc::new(LogBus::with_overflow(1024, overflow_path));
490                crate::internal::init_tracing(self.config.log_filter, Some(Arc::clone(&bus)));
491                bus
492            }
493        };
494
495        let to_section = |cfg: &AutoCollectRuntimeConfig| AutoCollectSection {
496            enabled: cfg.enabled,
497            include: cfg.include.iter().map(|s| (*s).to_string()).collect(),
498            exclude: cfg.exclude.iter().map(|s| (*s).to_string()).collect(),
499            max_file_size: cfg.max_file_size,
500        };
501
502        let service = GuestService {
503            plugin: self.plugin,
504            shutdown_tx: std::sync::Mutex::new(Some(shutdown_tx)),
505            sample_dir,
506            artifact_dir,
507            external_log_dir,
508            log_bus: Arc::clone(&log_bus),
509            stash: Arc::clone(&stash),
510            auto_collect_artifacts: to_section(&self.config.auto_collect_artifacts),
511            auto_collect_external_logs: to_section(&self.config.auto_collect_external_logs),
512            default_timeout: self.config.analysis_timeout,
513        };
514
515        let addr = self.config.listen_addr;
516
517        info!(address = %addr, "Guest plugin gRPC server starting");
518
519        tonic::transport::Server::builder()
520            .add_service(GuestPluginServiceServer::new(service))
521            .serve_with_shutdown(addr, async {
522                let _ = shutdown_rx.await;
523            })
524            .await
525            .map_err(|e| {
526                let mut msg = format!("gRPC server error on {addr}: {e}");
527                let mut source = std::error::Error::source(&e);
528                while let Some(cause) = source {
529                    msg.push_str(&format!("\n  caused by: {cause}"));
530                    source = std::error::Error::source(cause);
531                }
532                SdkError::Init(msg)
533            })?;
534
535        info!("Guest plugin gRPC server stopped");
536        Ok(())
537    }
538
539    /// Start the gRPC server, creating a tokio runtime if needed.
540    ///
541    /// This is the entry point for `fn main()` style plugins that don't
542    /// already have an async runtime.
543    pub fn run_blocking(self) -> Result<()> {
544        let rt = tokio::runtime::Runtime::new()
545            .map_err(|e| SdkError::Init(format!("failed to create tokio runtime: {}", e)))?;
546        rt.block_on(self.run())
547    }
548}
549
#[cfg(test)]
mod guest_runtime_config_tests {
    use super::*;
    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

    /// Collect-everything settings, shared by both auto-collect sections.
    const COLLECT_ALL: AutoCollectRuntimeConfig = AutoCollectRuntimeConfig {
        enabled: true,
        include: &["**/*"],
        exclude: &[],
        max_file_size: 50 * 1024 * 1024,
    };

    /// A fully `const` config, shaped like the one `#[guest_plugin]` emits.
    /// If this item stops compiling, `GuestRuntimeConfig` can no longer be
    /// built in a `const` context.
    const STATIC_SANITY: GuestRuntimeConfig = GuestRuntimeConfig {
        listen_addr: SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 50100)),
        sample_dir: "/opt/malbox/samples",
        artifact_dir: "/opt/malbox/artifacts",
        stash_dir: "/opt/malbox/stash",
        log_dir: "/opt/malbox/logs",
        external_log_dir: "/opt/malbox/ext-logs",
        stash_threshold_bytes: 1_048_576,
        stash_ttl_secs: 120,
        log_filter: "info",
        analysis_timeout: 120,
        auto_collect_artifacts: COLLECT_ALL,
        auto_collect_external_logs: COLLECT_ALL,
    };

    #[test]
    fn guest_runtime_config_is_const_constructible() {
        // The real check is that STATIC_SANITY compiled at all.
        let port = STATIC_SANITY.listen_addr.port();
        assert_eq!(port, 50100);
    }
}