diff --git a/crates/openshell-cli/src/run.rs b/crates/openshell-cli/src/run.rs index f1a44ad31..bba22efe9 100644 --- a/crates/openshell-cli/src/run.rs +++ b/crates/openshell-cli/src/run.rs @@ -219,6 +219,8 @@ struct ProvisioningDisplay { completed_steps: Vec, /// Progress bars for completed steps (so they can be cleared). completed_bars: Vec, + /// The currently active provisioning step. + active_step: Option, /// The currently active step label (shown on the spinner). active_label: String, /// Detail text shown next to the active step (e.g. image name). @@ -253,6 +255,7 @@ impl ProvisioningDisplay { spacer, completed_steps: Vec::new(), completed_bars: Vec::new(), + active_step: None, active_label: ProvisioningStep::RequestingSandbox .active_label() .to_string(), @@ -290,11 +293,15 @@ impl ProvisioningDisplay { self.step_start = Instant::now(); self.spinner.reset_elapsed(); self.active_detail.clear(); + if self.active_step == Some(step) { + self.active_step = None; + } } /// Set the active (in-progress) step shown on the spinner. - fn set_active(&mut self, label: &str) { - self.active_label = label.to_string(); + fn set_active(&mut self, step: ProvisioningStep) { + self.active_step = Some(step); + self.active_label = step.active_label().to_string(); self.active_detail.clear(); // Reset the spinner's elapsed time for the new step. self.spinner.reset_elapsed(); @@ -304,11 +311,17 @@ impl ProvisioningDisplay { /// Set the active step from a known provisioning step enum. fn set_active_step(&mut self, step: ProvisioningStep) { - self.set_active(step.active_label()); + if self.active_step == Some(step) { + return; + } + self.set_active(step); } /// Set detail text shown alongside the active step (e.g. image name). fn set_active_detail(&mut self, detail: &str) { + if self.active_detail == detail { + return; + } self.active_detail = detail.to_string(); self.update_spinner(); } @@ -6949,7 +6962,7 @@ fn format_timestamp_ms(ms: i64) -> String { #[cfg(test)] mod tests { use super::{ - ProvisioningStep, TlsOptions, build_sandbox_resource_limits, + ProvisioningDisplay, ProvisioningStep, TlsOptions, build_sandbox_resource_limits, dockerfile_sources_supported_for_gateway, format_endpoint, format_gateway_select_header, format_gateway_select_items, format_provider_attachment_table, gateway_add, gateway_auth_label, gateway_env_override_warning, gateway_select_with, gateway_type_label, @@ -6970,6 +6983,7 @@ mod tests { use std::path::{Path, PathBuf}; use std::process::Command; use std::thread; + use std::time::{Duration, Instant}; use tonic::Status; use openshell_bootstrap::GatewayMetadata; @@ -7178,6 +7192,48 @@ mod tests { assert_eq!(progress_step_from_metadata("driver-private-step"), None); } + #[test] + fn provisioning_display_ignores_repeated_active_step_updates() { + let mut display = ProvisioningDisplay::new(); + display.set_active_step(ProvisioningStep::PullingSandboxImage); + display.set_active_detail("Downloading layer-1 (1 MB/2 MB)"); + + let original_step_start = Instant::now() + .checked_sub(Duration::from_secs(5)) + .expect("test duration should be representable"); + display.step_start = original_step_start; + + display.set_active_step(ProvisioningStep::PullingSandboxImage); + display.set_active_detail("Downloading layer-1 (1 MB/2 MB)"); + + assert_eq!( + display.active_step, + Some(ProvisioningStep::PullingSandboxImage) + ); + assert_eq!(display.active_detail, "Downloading layer-1 (1 MB/2 MB)"); + assert_eq!(display.step_start, original_step_start); + display.clear(); + } + + #[test] + fn provisioning_display_resets_detail_on_active_step_transition() { + let mut display = ProvisioningDisplay::new(); + display.set_active_step(ProvisioningStep::PullingSandboxImage); + display.set_active_detail("Downloading layer-1 (1 MB/2 MB)"); + + let original_step_start = Instant::now() + .checked_sub(Duration::from_secs(5)) + .expect("test duration should be representable"); + display.step_start = original_step_start; + + display.set_active_step(ProvisioningStep::StartingSandbox); + + assert_eq!(display.active_step, Some(ProvisioningStep::StartingSandbox)); + assert!(display.active_detail.is_empty()); + assert!(display.step_start > original_step_start); + display.clear(); + } + #[test] fn refresh_status_table_includes_operational_fields() { let header = refresh_status_header(); diff --git a/crates/openshell-driver-docker/src/lib.rs b/crates/openshell-driver-docker/src/lib.rs index 1fdcea9cd..eec81cf59 100644 --- a/crates/openshell-driver-docker/src/lib.rs +++ b/crates/openshell-driver-docker/src/lib.rs @@ -8,9 +8,9 @@ use bollard::Docker; use bollard::errors::Error as BollardError; use bollard::models::{ - ContainerCreateBody, ContainerSummary, ContainerSummaryStateEnum, DeviceRequest, - EndpointSettings, HostConfig, NetworkCreateRequest, NetworkingConfig, RestartPolicy, - RestartPolicyNameEnum, SystemInfo, + ContainerCreateBody, ContainerSummary, ContainerSummaryStateEnum, CreateImageInfo, + DeviceRequest, EndpointSettings, HostConfig, NetworkCreateRequest, NetworkingConfig, + ProgressDetail, RestartPolicy, RestartPolicyNameEnum, SystemInfo, }; use bollard::query_parameters::{ CreateContainerOptionsBuilder, CreateImageOptions, DownloadFromContainerOptionsBuilder, @@ -24,14 +24,19 @@ use openshell_core::driver_utils::{ LABEL_SANDBOX_NAMESPACE, SUPERVISOR_IMAGE_BINARY_PATH, }; use openshell_core::gpu::cdi_gpu_device_ids; +use openshell_core::progress::{ + PROGRESS_STEP_PULLING_IMAGE, PROGRESS_STEP_REQUESTING_SANDBOX, PROGRESS_STEP_STARTING_SANDBOX, + mark_progress_active, mark_progress_complete, mark_progress_detail, +}; use openshell_core::proto::compute::v1::{ CreateSandboxRequest, CreateSandboxResponse, DeleteSandboxRequest, DeleteSandboxResponse, - DriverCondition, DriverSandbox, DriverSandboxStatus, DriverSandboxTemplate, - GetCapabilitiesRequest, GetCapabilitiesResponse, GetSandboxRequest, GetSandboxResponse, - ListSandboxesRequest, ListSandboxesResponse, StopSandboxRequest, StopSandboxResponse, - ValidateSandboxCreateRequest, ValidateSandboxCreateResponse, WatchSandboxesDeletedEvent, - WatchSandboxesEvent, WatchSandboxesRequest, WatchSandboxesSandboxEvent, - compute_driver_server::ComputeDriver, watch_sandboxes_event, + DriverCondition, DriverPlatformEvent, DriverSandbox, DriverSandboxStatus, + DriverSandboxTemplate, GetCapabilitiesRequest, GetCapabilitiesResponse, GetSandboxRequest, + GetSandboxResponse, ListSandboxesRequest, ListSandboxesResponse, StopSandboxRequest, + StopSandboxResponse, ValidateSandboxCreateRequest, ValidateSandboxCreateResponse, + WatchSandboxesDeletedEvent, WatchSandboxesEvent, WatchSandboxesPlatformEvent, + WatchSandboxesRequest, WatchSandboxesSandboxEvent, compute_driver_server::ComputeDriver, + watch_sandboxes_event, }; use openshell_core::{Config, Error, Result as CoreResult}; use std::collections::HashMap; @@ -41,7 +46,8 @@ use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{Mutex, broadcast, mpsc}; +use tokio::task::JoinHandle; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; use tracing::{info, warn}; @@ -222,9 +228,21 @@ pub struct DockerComputeDriver { docker: Arc, config: DockerDriverRuntimeConfig, events: broadcast::Sender, + pending: Arc>>, supervisor_readiness: Arc, } +struct PendingSandboxRecord { + sandbox: DriverSandbox, + task: Option>, +} + +#[derive(Debug, Clone)] +struct DockerProvisioningFailure { + reason: &'static str, + message: String, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] struct DockerResourceLimits { nano_cpus: Option, @@ -300,6 +318,7 @@ impl DockerComputeDriver { supports_gpu, }, events: broadcast::channel(WATCH_BUFFER).0, + pending: Arc::new(Mutex::new(HashMap::new())), supervisor_readiness, }; @@ -366,6 +385,20 @@ impl DockerComputeDriver { Ok(()) } + fn validate_sandbox_auth(sandbox: &DriverSandbox) -> Result<(), Status> { + let token_present = sandbox + .spec + .as_ref() + .is_some_and(|spec| !spec.sandbox_token.trim().is_empty()); + if token_present { + return Ok(()); + } + + Err(Status::failed_precondition( + "docker sandboxes require gateway JWT auth; configure [openshell.gateway.gateway_jwt]", + )) + } + fn validate_gpu_request(gpu: bool, supports_gpu: bool) -> Result<(), Status> { if gpu && !supports_gpu { return Err(Status::failed_precondition( @@ -383,25 +416,36 @@ impl DockerComputeDriver { let container = self .find_managed_container_summary(sandbox_id, sandbox_name) .await?; - Ok(container.and_then(|summary| { + if let Some(sandbox) = container.and_then(|summary| { sandbox_from_container_summary(&summary, self.supervisor_readiness.as_ref()) - })) + }) { + return Ok(Some(sandbox)); + } + + Ok(self.pending_snapshot(sandbox_id, sandbox_name).await) } async fn current_snapshots(&self) -> Result, Status> { let containers = self.list_managed_container_summaries().await?; - let mut sandboxes = containers + let container_sandboxes = containers .iter() .filter_map(|summary| { sandbox_from_container_summary(summary, self.supervisor_readiness.as_ref()) }) .collect::>(); + let mut by_id = self.pending_snapshot_map().await; + for sandbox in container_sandboxes { + by_id.insert(sandbox.id.clone(), sandbox); + } + let mut sandboxes = by_id.into_values().collect::>(); sandboxes.sort_by(|left, right| left.id.cmp(&right.id)); Ok(sandboxes) } async fn create_sandbox_inner(&self, sandbox: &DriverSandbox) -> Result<(), Status> { Self::validate_sandbox(sandbox, &self.config)?; + Self::validate_sandbox_auth(sandbox)?; + let _ = build_container_create_body(sandbox, &self.config)?; if self .find_managed_container_summary(&sandbox.id, &sandbox.name) @@ -411,16 +455,76 @@ impl DockerComputeDriver { return Err(Status::already_exists("sandbox already exists")); } + self.reserve_pending_sandbox(sandbox).await?; + let image = sandbox_image(sandbox).unwrap_or_default(); + self.publish_docker_progress( + &sandbox.id, + "Scheduled", + format!("Docker sandbox accepted for image \"{image}\""), + HashMap::from([("image_ref".to_string(), image)]), + ); + self.publish_sandbox_snapshot(pending_sandbox_snapshot( + sandbox, + &self.config.sandbox_namespace, + provisioning_condition(), + false, + )); + + let driver = self.clone(); + let sandbox_for_task = sandbox.clone(); + let sandbox_id = sandbox.id.clone(); + let task = tokio::spawn(async move { + driver.provision_sandbox(sandbox_for_task).await; + }); + + let mut pending = self.pending.lock().await; + if let Some(record) = pending.get_mut(&sandbox_id) { + record.task = Some(task); + } else { + task.abort(); + } + + Ok(()) + } + + async fn provision_sandbox(&self, sandbox: DriverSandbox) { + match self.provision_sandbox_inner(&sandbox).await { + Ok(()) => { + self.clear_pending_sandbox(&sandbox.id).await; + } + Err(failure) => { + self.fail_pending_sandbox(&sandbox, &failure).await; + } + } + } + + async fn provision_sandbox_inner( + &self, + sandbox: &DriverSandbox, + ) -> Result<(), DockerProvisioningFailure> { let template = sandbox .spec .as_ref() .and_then(|spec| spec.template.as_ref()) .expect("validated sandbox has template"); - self.ensure_image_available(&template.image).await?; - let token_file_created = write_sandbox_token_file(sandbox, &self.config).await?; + self.ensure_image_available(&sandbox.id, &template.image) + .await + .map_err(|status| { + DockerProvisioningFailure::new("ImagePullFailed", status.message()) + })?; + let token_file_created = write_sandbox_token_file(sandbox, &self.config) + .await + .map_err(|status| { + DockerProvisioningFailure::new("SandboxTokenWriteFailed", status.message()) + })?; let container_name = container_name_for_sandbox(sandbox); - let create_body = build_container_create_body(sandbox, &self.config)?; + let create_body = build_container_create_body(sandbox, &self.config).map_err(|status| { + if token_file_created { + cleanup_sandbox_token_file(sandbox, &self.config); + } + DockerProvisioningFailure::new("ContainerCreateFailed", status.message()) + })?; self.docker .create_container( Some( @@ -435,8 +539,17 @@ impl DockerComputeDriver { if token_file_created { cleanup_sandbox_token_file(sandbox, &self.config); } - create_status_from_docker_error("create docker sandbox container", err) + DockerProvisioningFailure::from_status( + "ContainerCreateFailed", + create_status_from_docker_error("create docker sandbox container", err), + ) })?; + self.publish_docker_progress( + &sandbox.id, + "Created", + format!("Created Docker container \"{container_name}\""), + HashMap::from([("container_name".to_string(), container_name.clone())]), + ); if let Err(err) = self.docker.start_container(&container_name, None).await { let cleanup = self @@ -457,11 +570,27 @@ impl DockerComputeDriver { if token_file_created { cleanup_sandbox_token_file(sandbox, &self.config); } - return Err(create_status_from_docker_error( - "start docker sandbox container", - err, + return Err(DockerProvisioningFailure::from_status( + "ContainerStartFailed", + create_status_from_docker_error("start docker sandbox container", err), )); } + self.publish_docker_progress( + &sandbox.id, + "Started", + format!("Started Docker container \"{container_name}\""), + HashMap::from([("container_name".to_string(), container_name)]), + ); + if let Err(err) = self + .publish_container_snapshot(&sandbox.id, &sandbox.name) + .await + { + warn!( + sandbox_id = %sandbox.id, + error = %err, + "Failed to publish Docker sandbox snapshot after start" + ); + } Ok(()) } @@ -471,14 +600,44 @@ impl DockerComputeDriver { sandbox_id: &str, sandbox_name: &str, ) -> Result { + let pending = self.remove_pending_sandbox(sandbox_id, sandbox_name).await; + if let Some(record) = pending.as_ref() + && let Some(task) = record.task.as_ref() + { + task.abort(); + } + let Some(container) = self .find_managed_container_summary(sandbox_id, sandbox_name) .await? else { + if let Some(record) = pending { + let container_name = container_name_for_sandbox(&record.sandbox); + match self + .docker + .remove_container( + &container_name, + Some(RemoveContainerOptionsBuilder::default().force(true).build()), + ) + .await + { + Ok(()) => { + cleanup_sandbox_token_file(&record.sandbox, &self.config); + return Ok(true); + } + Err(err) if is_not_found_error(&err) => { + cleanup_sandbox_token_file(&record.sandbox, &self.config); + return Ok(true); + } + Err(err) => { + return Err(internal_status("delete docker sandbox container", err)); + } + } + } return Ok(false); }; let Some(target) = summary_container_target(&container) else { - return Ok(false); + return Ok(pending.is_some()); }; match self @@ -490,12 +649,12 @@ impl DockerComputeDriver { .await { Ok(()) => { - cleanup_sandbox_token_file_by_id(sandbox_id, &self.config); + cleanup_sandbox_token_file_for_delete(sandbox_id, pending.as_ref(), &self.config); Ok(true) } Err(err) if is_not_found_error(&err) => { - cleanup_sandbox_token_file_by_id(sandbox_id, &self.config); - Ok(false) + cleanup_sandbox_token_file_for_delete(sandbox_id, pending.as_ref(), &self.config); + Ok(pending.is_some()) } Err(err) => Err(internal_status("delete docker sandbox container", err)), } @@ -506,6 +665,14 @@ impl DockerComputeDriver { .find_managed_container_summary(sandbox_id, sandbox_name) .await? else { + if let Some(record) = self.remove_pending_sandbox(sandbox_id, sandbox_name).await { + if let Some(task) = record.task { + task.abort(); + } + cleanup_sandbox_token_file(&record.sandbox, &self.config); + self.publish_deleted(record.sandbox.id); + return Ok(()); + } return Err(Status::not_found("sandbox not found")); }; let Some(target) = summary_container_target(&container) else { @@ -631,6 +798,167 @@ impl DockerComputeDriver { Ok(stopped) } + async fn reserve_pending_sandbox(&self, sandbox: &DriverSandbox) -> Result<(), Status> { + let mut pending = self.pending.lock().await; + if pending + .values() + .any(|record| record.sandbox.id == sandbox.id || record.sandbox.name == sandbox.name) + { + return Err(Status::already_exists("sandbox already exists")); + } + + pending.insert( + sandbox.id.clone(), + PendingSandboxRecord { + sandbox: pending_sandbox_snapshot( + sandbox, + &self.config.sandbox_namespace, + provisioning_condition(), + false, + ), + task: None, + }, + ); + Ok(()) + } + + async fn pending_snapshot( + &self, + sandbox_id: &str, + sandbox_name: &str, + ) -> Option { + let pending = self.pending.lock().await; + pending + .values() + .find(|record| pending_sandbox_matches(&record.sandbox, sandbox_id, sandbox_name)) + .map(|record| record.sandbox.clone()) + } + + async fn pending_snapshot_map(&self) -> HashMap { + let pending = self.pending.lock().await; + pending + .iter() + .map(|(sandbox_id, record)| (sandbox_id.clone(), record.sandbox.clone())) + .collect() + } + + async fn clear_pending_sandbox(&self, sandbox_id: &str) { + let mut pending = self.pending.lock().await; + pending.remove(sandbox_id); + } + + async fn remove_pending_sandbox( + &self, + sandbox_id: &str, + sandbox_name: &str, + ) -> Option { + let mut pending = self.pending.lock().await; + let id = pending.iter().find_map(|(id, record)| { + pending_sandbox_matches(&record.sandbox, sandbox_id, sandbox_name).then(|| id.clone()) + })?; + pending.remove(&id) + } + + async fn fail_pending_sandbox( + &self, + sandbox: &DriverSandbox, + failure: &DockerProvisioningFailure, + ) { + cleanup_sandbox_token_file(sandbox, &self.config); + let snapshot = pending_sandbox_snapshot( + sandbox, + &self.config.sandbox_namespace, + error_condition(failure.reason, &failure.message), + false, + ); + { + let mut pending = self.pending.lock().await; + if let Some(record) = pending.get_mut(&sandbox.id) { + record.sandbox = snapshot.clone(); + record.task = None; + } else { + return; + } + } + + self.publish_platform_event( + sandbox.id.clone(), + platform_event( + "docker", + "Warning", + failure.reason, + format!("Docker sandbox provisioning failed: {}", failure.message), + ), + ); + self.publish_sandbox_snapshot(snapshot); + } + + async fn publish_container_snapshot( + &self, + sandbox_id: &str, + sandbox_name: &str, + ) -> Result<(), Status> { + if let Some(summary) = self + .find_managed_container_summary(sandbox_id, sandbox_name) + .await? + && let Some(sandbox) = + sandbox_from_container_summary(&summary, self.supervisor_readiness.as_ref()) + { + self.publish_sandbox_snapshot(sandbox); + } + Ok(()) + } + + fn publish_sandbox_snapshot(&self, sandbox: DriverSandbox) { + let _ = self.events.send(WatchSandboxesEvent { + payload: Some(watch_sandboxes_event::Payload::Sandbox( + WatchSandboxesSandboxEvent { + sandbox: Some(sandbox), + }, + )), + }); + } + + fn publish_deleted(&self, sandbox_id: String) { + let _ = self.events.send(WatchSandboxesEvent { + payload: Some(watch_sandboxes_event::Payload::Deleted( + WatchSandboxesDeletedEvent { sandbox_id }, + )), + }); + } + + fn publish_platform_event(&self, sandbox_id: String, event: DriverPlatformEvent) { + let _ = self.events.send(WatchSandboxesEvent { + payload: Some(watch_sandboxes_event::Payload::PlatformEvent( + WatchSandboxesPlatformEvent { + sandbox_id, + event: Some(event), + }, + )), + }); + } + + fn publish_docker_progress( + &self, + sandbox_id: &str, + reason: &str, + message: String, + mut metadata: HashMap, + ) { + attach_docker_progress_metadata(&mut metadata, reason, &message); + self.publish_platform_event( + sandbox_id.to_string(), + DriverPlatformEvent { + timestamp_ms: openshell_core::time::now_ms(), + source: "docker".to_string(), + r#type: "Normal".to_string(), + reason: reason.to_string(), + message, + metadata, + }, + ); + } + async fn poll_loop(self) { let mut previous = match self.current_snapshot_map().await { Ok(snapshots) => snapshots, @@ -730,18 +1058,32 @@ impl DockerComputeDriver { })) } - async fn ensure_image_available(&self, image: &str) -> Result<(), Status> { + async fn ensure_image_available(&self, sandbox_id: &str, image: &str) -> Result<(), Status> { let policy = self.config.image_pull_policy.trim().to_ascii_lowercase(); match policy.as_str() { "" | "ifnotpresent" => { if self.docker.inspect_image(image).await.is_ok() { + self.publish_docker_progress( + sandbox_id, + "ImagePresent", + format!("Docker image \"{image}\" is already present"), + HashMap::from([("image_ref".to_string(), image.to_string())]), + ); return Ok(()); } - self.pull_image(image).await + self.pull_image(sandbox_id, image).await } - "always" => self.pull_image(image).await, + "always" => self.pull_image(sandbox_id, image).await, "never" => match self.docker.inspect_image(image).await { - Ok(_) => Ok(()), + Ok(_) => { + self.publish_docker_progress( + sandbox_id, + "ImagePresent", + format!("Docker image \"{image}\" is already present"), + HashMap::from([("image_ref".to_string(), image.to_string())]), + ); + Ok(()) + } Err(err) if is_not_found_error(&err) => Err(Status::failed_precondition(format!( "docker image '{image}' is not present locally and image_pull_policy=Never" ))), @@ -753,7 +1095,13 @@ impl DockerComputeDriver { } } - async fn pull_image(&self, image: &str) -> Result<(), Status> { + async fn pull_image(&self, sandbox_id: &str, image: &str) -> Result<(), Status> { + self.publish_docker_progress( + sandbox_id, + "Pulling", + format!("Pulling Docker image \"{image}\""), + HashMap::from([("image_ref".to_string(), image.to_string())]), + ); let mut stream = self.docker.create_image( Some(CreateImageOptions { from_image: Some(image.to_string()), @@ -763,8 +1111,26 @@ impl DockerComputeDriver { None, ); while let Some(result) = stream.next().await { - result.map_err(|err| internal_status("pull Docker image", err))?; + let info = result.map_err(|err| internal_status("pull Docker image", err))?; + if let Some(message) = info + .error_detail + .as_ref() + .and_then(|detail| detail.message.as_ref()) + { + return Err(Status::failed_precondition(format!( + "pull Docker image '{image}' failed: {message}" + ))); + } + if let Some(event) = docker_pull_progress_event(image, &info) { + self.publish_platform_event(sandbox_id.to_string(), event); + } } + self.publish_docker_progress( + sandbox_id, + "Pulled", + format!("Pulled Docker image \"{image}\""), + HashMap::from([("image_ref".to_string(), image.to_string())]), + ); Ok(()) } } @@ -918,6 +1284,235 @@ impl ComputeDriver for DockerComputeDriver { } } +impl DockerProvisioningFailure { + fn new(reason: &'static str, message: impl Into) -> Self { + Self { + reason, + message: message.into(), + } + } + + fn from_status(reason: &'static str, status: Status) -> Self { + Self::new(reason, status.message()) + } +} + +fn sandbox_image(sandbox: &DriverSandbox) -> Option { + sandbox + .spec + .as_ref() + .and_then(|spec| spec.template.as_ref()) + .map(|template| template.image.clone()) + .filter(|image| !image.trim().is_empty()) +} + +fn pending_sandbox_snapshot( + sandbox: &DriverSandbox, + namespace: &str, + condition: DriverCondition, + deleting: bool, +) -> DriverSandbox { + DriverSandbox { + id: sandbox.id.clone(), + name: sandbox.name.clone(), + namespace: namespace.to_string(), + spec: None, + status: Some(DriverSandboxStatus { + sandbox_name: sandbox.name.clone(), + instance_id: String::new(), + agent_fd: String::new(), + sandbox_fd: String::new(), + conditions: vec![condition], + deleting, + }), + } +} + +fn pending_sandbox_matches(sandbox: &DriverSandbox, sandbox_id: &str, sandbox_name: &str) -> bool { + (!sandbox_id.is_empty() && sandbox.id == sandbox_id) + || (!sandbox_name.is_empty() && sandbox.name == sandbox_name) +} + +fn provisioning_condition() -> DriverCondition { + DriverCondition { + r#type: "Ready".to_string(), + status: "False".to_string(), + reason: "Starting".to_string(), + message: "Docker container is starting".to_string(), + last_transition_time: String::new(), + } +} + +fn error_condition(reason: &str, message: &str) -> DriverCondition { + DriverCondition { + r#type: "Ready".to_string(), + status: "False".to_string(), + reason: reason.to_string(), + message: message.to_string(), + last_transition_time: String::new(), + } +} + +fn platform_event( + source: &str, + event_type: &str, + reason: &str, + message: String, +) -> DriverPlatformEvent { + DriverPlatformEvent { + timestamp_ms: openshell_core::time::now_ms(), + source: source.to_string(), + r#type: event_type.to_string(), + reason: reason.to_string(), + message, + metadata: HashMap::new(), + } +} + +fn docker_pull_progress_event(image: &str, info: &CreateImageInfo) -> Option { + let status = info.status.as_deref().map(str::trim)?; + if status.is_empty() { + return None; + } + + let mut metadata = HashMap::from([ + ("image_ref".to_string(), image.to_string()), + ("docker_status".to_string(), status.to_string()), + ]); + if let Some(layer_id) = info.id.as_deref().filter(|id| !id.is_empty()) { + metadata.insert("layer_id".to_string(), layer_id.to_string()); + } + if let Some(detail) = docker_pull_progress_detail(info) { + metadata.insert("detail".to_string(), detail); + } + attach_docker_progress_metadata(&mut metadata, "PullingLayer", status); + + Some(DriverPlatformEvent { + timestamp_ms: openshell_core::time::now_ms(), + source: "docker".to_string(), + r#type: "Normal".to_string(), + reason: "PullingLayer".to_string(), + message: docker_pull_message(info, status), + metadata, + }) +} + +fn docker_pull_message(info: &CreateImageInfo, status: &str) -> String { + info.id.as_deref().filter(|id| !id.is_empty()).map_or_else( + || format!("Docker image pull: {status}"), + |layer_id| format!("Docker image pull {layer_id}: {status}"), + ) +} + +fn docker_pull_progress_detail(info: &CreateImageInfo) -> Option { + let status = info.status.as_deref().unwrap_or("Pulling"); + let layer_id = info.id.as_deref().filter(|id| !id.is_empty()); + let progress = info + .progress_detail + .as_ref() + .and_then(format_progress_detail); + + match (layer_id, progress) { + (Some(layer_id), Some(progress)) => Some(format!("{status} {layer_id} ({progress})")), + (Some(layer_id), None) => Some(format!("{status} {layer_id}")), + (None, Some(progress)) => Some(format!("{status} ({progress})")), + (None, None) => (!status.is_empty()).then(|| status.to_string()), + } +} + +fn format_progress_detail(progress: &ProgressDetail) -> Option { + let current = progress.current.and_then(|value| u64::try_from(value).ok()); + let total = progress + .total + .and_then(|value| u64::try_from(value).ok()) + .filter(|value| *value > 0); + + match (current, total) { + (Some(current), Some(total)) => { + Some(format!("{}/{}", format_bytes(current), format_bytes(total))) + } + (Some(current), _) if current > 0 => Some(format_bytes(current)), + _ => None, + } +} + +fn format_bytes(bytes: u64) -> String { + const KB: u64 = 1024; + const MB: u64 = 1024 * KB; + const GB: u64 = 1024 * MB; + + if bytes >= GB { + #[allow(clippy::cast_precision_loss)] + let gb = bytes as f64 / GB as f64; + format!("{gb:.1} GB") + } else if bytes >= MB { + format!("{} MB", bytes / MB) + } else if bytes >= KB { + format!("{} KB", bytes / KB) + } else { + format!("{bytes} B") + } +} + +fn attach_docker_progress_metadata( + metadata: &mut HashMap, + reason: &str, + message: &str, +) { + match reason { + "Scheduled" => { + mark_progress_complete( + metadata, + PROGRESS_STEP_REQUESTING_SANDBOX, + "Sandbox allocated", + ); + mark_progress_active(metadata, PROGRESS_STEP_PULLING_IMAGE); + if let Some(image) = metadata.get("image_ref").cloned() { + mark_progress_detail(metadata, image); + } + } + "Pulling" => { + mark_progress_active(metadata, PROGRESS_STEP_PULLING_IMAGE); + if let Some(image) = metadata.get("image_ref").cloned() { + mark_progress_detail(metadata, image); + } + } + "PullingLayer" => { + mark_progress_active(metadata, PROGRESS_STEP_PULLING_IMAGE); + if let Some(detail) = metadata + .get("detail") + .cloned() + .filter(|detail| !detail.is_empty()) + { + mark_progress_detail(metadata, detail); + } else if !message.is_empty() { + mark_progress_detail(metadata, message); + } + } + "ImagePresent" => { + mark_progress_complete( + metadata, + PROGRESS_STEP_PULLING_IMAGE, + "Image already present", + ); + mark_progress_active(metadata, PROGRESS_STEP_STARTING_SANDBOX); + } + "Pulled" => { + mark_progress_complete(metadata, PROGRESS_STEP_PULLING_IMAGE, "Image pulled"); + mark_progress_active(metadata, PROGRESS_STEP_STARTING_SANDBOX); + } + "Created" => { + mark_progress_active(metadata, PROGRESS_STEP_STARTING_SANDBOX); + mark_progress_detail(metadata, "Container created"); + } + "Started" => { + mark_progress_active(metadata, PROGRESS_STEP_STARTING_SANDBOX); + mark_progress_detail(metadata, "Waiting for supervisor relay"); + } + _ => {} + } +} + fn build_binds( sandbox: &DriverSandbox, config: &DockerDriverRuntimeConfig, @@ -1013,6 +1608,18 @@ fn cleanup_sandbox_token_file(sandbox: &DriverSandbox, config: &DockerDriverRunt cleanup_sandbox_token_file_by_id(&sandbox.id, config); } +fn cleanup_sandbox_token_file_for_delete( + sandbox_id: &str, + pending: Option<&PendingSandboxRecord>, + config: &DockerDriverRuntimeConfig, +) { + if !sandbox_id.is_empty() { + cleanup_sandbox_token_file_by_id(sandbox_id, config); + } else if let Some(record) = pending { + cleanup_sandbox_token_file(&record.sandbox, config); + } +} + fn cleanup_sandbox_token_file_by_id(sandbox_id: &str, config: &DockerDriverRuntimeConfig) { let Ok(path) = sandbox_token_host_path_by_id(sandbox_id, config) else { return; diff --git a/crates/openshell-driver-docker/src/tests.rs b/crates/openshell-driver-docker/src/tests.rs index 9afec4be4..fad773521 100644 --- a/crates/openshell-driver-docker/src/tests.rs +++ b/crates/openshell-driver-docker/src/tests.rs @@ -7,6 +7,11 @@ use openshell_core::driver_utils::{ LABEL_MANAGED_BY, LABEL_MANAGED_BY_VALUE, LABEL_SANDBOX_ID, LABEL_SANDBOX_NAME, LABEL_SANDBOX_NAMESPACE, }; +use openshell_core::progress::{ + PROGRESS_ACTIVE_DETAIL_KEY, PROGRESS_ACTIVE_STEP_KEY, PROGRESS_COMPLETE_LABEL_KEY, + PROGRESS_COMPLETE_STEP_KEY, PROGRESS_STEP_PULLING_IMAGE, PROGRESS_STEP_REQUESTING_SANDBOX, + PROGRESS_STEP_STARTING_SANDBOX, +}; use openshell_core::proto::compute::v1::{ DriverResourceRequirements, DriverSandboxSpec, DriverSandboxTemplate, }; @@ -553,6 +558,28 @@ fn validate_sandbox_rejects_gpu_when_cdi_unavailable() { assert!(err.message().contains("Docker CDI")); } +#[test] +fn validate_sandbox_auth_requires_gateway_token() { + let mut sandbox = test_sandbox(); + sandbox.spec.as_mut().unwrap().sandbox_token.clear(); + + let err = DockerComputeDriver::validate_sandbox_auth(&sandbox).unwrap_err(); + + assert_eq!(err.code(), tonic::Code::FailedPrecondition); + assert_eq!( + err.message(), + "docker sandboxes require gateway JWT auth; configure [openshell.gateway.gateway_jwt]" + ); +} + +#[test] +fn validate_sandbox_auth_accepts_gateway_token() { + let mut sandbox = test_sandbox(); + sandbox.spec.as_mut().unwrap().sandbox_token = "secret.jwt.value".to_string(); + + DockerComputeDriver::validate_sandbox_auth(&sandbox).unwrap(); +} + #[test] fn build_container_create_body_maps_gpu_to_all_cdi_device() { let mut config = runtime_config(); @@ -736,6 +763,123 @@ fn driver_status_marks_restarting_sandboxes_as_error() { ); } +#[test] +fn docker_scheduled_event_adds_progress_metadata() { + let mut metadata = HashMap::from([( + "image_ref".to_string(), + "ghcr.io/acme/sandbox:latest".to_string(), + )]); + + attach_docker_progress_metadata( + &mut metadata, + "Scheduled", + "Docker sandbox accepted for image \"ghcr.io/acme/sandbox:latest\"", + ); + + assert_eq!( + metadata.get(PROGRESS_COMPLETE_STEP_KEY).map(String::as_str), + Some(PROGRESS_STEP_REQUESTING_SANDBOX) + ); + assert_eq!( + metadata + .get(PROGRESS_COMPLETE_LABEL_KEY) + .map(String::as_str), + Some("Sandbox allocated") + ); + assert_eq!( + metadata.get(PROGRESS_ACTIVE_STEP_KEY).map(String::as_str), + Some(PROGRESS_STEP_PULLING_IMAGE) + ); + assert_eq!( + metadata.get(PROGRESS_ACTIVE_DETAIL_KEY).map(String::as_str), + Some("ghcr.io/acme/sandbox:latest") + ); +} + +#[test] +fn docker_pulled_event_advances_to_starting_progress() { + let mut metadata = HashMap::new(); + + attach_docker_progress_metadata( + &mut metadata, + "Pulled", + "Pulled Docker image \"ghcr.io/acme/sandbox:latest\"", + ); + + assert_eq!( + metadata.get(PROGRESS_COMPLETE_STEP_KEY).map(String::as_str), + Some(PROGRESS_STEP_PULLING_IMAGE) + ); + assert_eq!( + metadata + .get(PROGRESS_COMPLETE_LABEL_KEY) + .map(String::as_str), + Some("Image pulled") + ); + assert_eq!( + metadata.get(PROGRESS_ACTIVE_STEP_KEY).map(String::as_str), + Some(PROGRESS_STEP_STARTING_SANDBOX) + ); +} + +#[test] +fn docker_pull_progress_event_adds_layer_detail_metadata() { + let event = docker_pull_progress_event( + "ghcr.io/acme/sandbox:latest", + &CreateImageInfo { + id: Some("layer-1".to_string()), + status: Some("Downloading".to_string()), + progress_detail: Some(ProgressDetail { + current: Some(42 * 1024 * 1024), + total: Some(84 * 1024 * 1024), + }), + ..Default::default() + }, + ) + .expect("pull progress event"); + + assert_eq!(event.source, "docker"); + assert_eq!(event.reason, "PullingLayer"); + assert_eq!( + event + .metadata + .get(PROGRESS_ACTIVE_STEP_KEY) + .map(String::as_str), + Some(PROGRESS_STEP_PULLING_IMAGE) + ); + assert_eq!( + event + .metadata + .get(PROGRESS_ACTIVE_DETAIL_KEY) + .map(String::as_str), + Some("Downloading layer-1 (42 MB/84 MB)") + ); +} + +#[test] +fn pending_sandbox_snapshot_uses_docker_namespace_and_starting_condition() { + let sandbox = test_sandbox(); + + let snapshot = + pending_sandbox_snapshot(&sandbox, "docker-dev", provisioning_condition(), false); + + assert_eq!(snapshot.id, "sbx-123"); + assert_eq!(snapshot.name, "demo"); + assert_eq!(snapshot.namespace, "docker-dev"); + assert!(snapshot.spec.is_none()); + assert!(pending_sandbox_matches(&snapshot, "sbx-123", "")); + assert!(pending_sandbox_matches(&snapshot, "", "demo")); + + let status = snapshot.status.expect("status"); + assert!(!status.deleting); + assert_eq!(status.sandbox_name, "demo"); + assert_eq!(status.conditions.len(), 1); + assert_eq!(status.conditions[0].r#type, "Ready"); + assert_eq!(status.conditions[0].status, "False"); + assert_eq!(status.conditions[0].reason, "Starting"); + assert_eq!(status.conditions[0].message, "Docker container is starting"); +} + #[test] fn validate_linux_elf_binary_rejects_non_elf_files() { let tempdir = TempDir::new().unwrap();