use crate::runner::log_reporter::LogReporter;
use std::collections::HashMap;
use std::path::Path;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
const DEFAULT_TIMEOUT_SECS: u64 = 600;
pub struct ExecResult {
pub exit_code: i32,
}
pub async fn execute(
command: &str,
image: Option<&str>,
workspace: &Path,
env: &HashMap<String, String>,
network: Option<&str>,
timeout_seconds: Option<u64>,
log_reporter: &LogReporter,
) -> Result<ExecResult, Box<dyn std::error::Error + Send + Sync>> {
let timeout = timeout_seconds.unwrap_or(DEFAULT_TIMEOUT_SECS);
match image {
Some(img) => {
eprintln!("Executing (docker): {img} — {command}");
execute_docker(command, img, workspace, env, network, timeout, log_reporter).await
}
None => {
eprintln!("Executing (bare): {command}");
execute_bare(command, workspace, env, timeout, log_reporter).await
}
}
}
async fn execute_docker(
command: &str,
image: &str,
workspace: &Path,
env: &HashMap<String, String>,
network: Option<&str>,
timeout: u64,
log_reporter: &LogReporter,
) -> Result<ExecResult, Box<dyn std::error::Error + Send + Sync>> {
if !image.starts_with("anvil-prepared:") {
ensure_registry_login(image, env, log_reporter).await;
eprintln!("Pulling image: {image}");
log_reporter
.append(&format!("Pulling image: {image}"))
.await;
let pull_output = std::process::Command::new("docker")
.args(["pull", image])
.output();
match pull_output {
Ok(output) if output.status.success() => {
eprintln!("Image ready: {image}");
}
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr);
let msg = format!("Warning: docker pull failed: {stderr}");
eprintln!("{msg}");
log_reporter.append(&msg).await;
}
Err(e) => {
let msg = format!("Warning: docker pull error: {e}");
eprintln!("{msg}");
log_reporter.append(&msg).await;
}
}
} else {
eprintln!("Using local prepared image: {image}");
}
let workspace_str = workspace.to_str().unwrap_or(".");
let mut args = vec![
"run".to_string(),
"--rm".into(),
"--add-host=host.docker.internal:host-gateway".into(),
];
if let Some(net) = network {
args.push("--network".into());
args.push(net.to_string());
}
for (k, v) in env {
args.push("-e".into());
args.push(format!("{k}={v}"));
}
args.push("-e".into());
args.push("CI=true".into());
args.push("-e".into());
args.push("ANVIL_CI=true".into());
args.push("-e".into());
args.push("ANVIL_WORKSPACE=/workspace".into());
args.push("-v".into());
args.push(format!("{workspace_str}:/workspace"));
args.push("-w".into());
args.push("/workspace".into());
let uid = unsafe { libc::getuid() };
let gid = unsafe { libc::getgid() };
let wrapped_command = format!(
"({command})\n_exit_code=$?\nchown -R {uid}:{gid} /workspace 2>/dev/null || true\nexit $_exit_code"
);
args.push(image.to_string());
args.push("/bin/sh".into());
args.push("-c".into());
args.push(wrapped_command);
run_and_stream(
"docker",
&args,
workspace,
&HashMap::new(),
timeout,
log_reporter,
)
.await
}
async fn execute_bare(
command: &str,
workspace: &Path,
env: &HashMap<String, String>,
timeout: u64,
log_reporter: &LogReporter,
) -> Result<ExecResult, Box<dyn std::error::Error + Send + Sync>> {
let mut full_env = env.clone();
full_env.insert(
"ANVIL_WORKSPACE".into(),
workspace.to_str().unwrap_or(".").into(),
);
full_env.insert("CI".into(), "true".into());
full_env.insert("ANVIL_CI".into(), "true".into());
run_and_stream(
"/bin/sh",
&["-c".to_string(), command.to_string()],
workspace,
&full_env,
timeout,
log_reporter,
)
.await
}
async fn ensure_registry_login(
image: &str,
env: &HashMap<String, String>,
log_reporter: &LogReporter,
) {
let registry = match env.get("ANVIL_REGISTRY") {
Some(r) if !r.is_empty() => r,
_ => return,
};
let token = match env.get("ANVIL_REGISTRY_TOKEN") {
Some(t) if !t.is_empty() => t,
_ => return,
};
let user = env
.get("ANVIL_REGISTRY_USER")
.map(|s| s.as_str())
.unwrap_or("anvil");
if !image.starts_with(registry.as_str()) {
return;
}
eprintln!("Logging in to registry: {registry}");
log_reporter
.append(&format!("Logging in to registry: {registry}"))
.await;
let result = std::process::Command::new("docker")
.args(["login", registry, "-u", user, "--password-stdin"])
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.and_then(|mut child| {
use std::io::Write;
if let Some(ref mut stdin) = child.stdin {
stdin.write_all(token.as_bytes())?;
}
child.wait_with_output()
});
match result {
Ok(output) if output.status.success() => {
eprintln!("Registry login succeeded");
}
Ok(output) => {
let stderr = String::from_utf8_lossy(&output.stderr);
let msg = format!("Warning: registry login failed: {stderr}");
eprintln!("{msg}");
log_reporter.append(&msg).await;
}
Err(e) => {
let msg = format!("Warning: registry login error: {e}");
eprintln!("{msg}");
log_reporter.append(&msg).await;
}
}
}
async fn run_and_stream(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
timeout: u64,
log_reporter: &LogReporter,
) -> Result<ExecResult, Box<dyn std::error::Error + Send + Sync>> {
let mut cmd = Command::new(program);
cmd.args(args)
.current_dir(cwd)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
for (k, v) in env {
cmd.env(k, v);
}
let mut child = cmd.spawn()?;
let reporter_out = log_reporter.clone();
let stdout = child.stdout.take();
let stdout_task = tokio::spawn(async move {
if let Some(stdout) = stdout {
let mut lines = BufReader::new(stdout).lines();
while let Ok(Some(line)) = lines.next_line().await {
reporter_out.append(&line).await;
}
}
});
let reporter_err = log_reporter.clone();
let stderr = child.stderr.take();
let stderr_task = tokio::spawn(async move {
if let Some(stderr) = stderr {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(line)) = lines.next_line().await {
reporter_err.append(&line).await;
}
}
});
let result = tokio::time::timeout(std::time::Duration::from_secs(timeout), async {
let _ = stdout_task.await;
let _ = stderr_task.await;
child.wait().await
})
.await;
log_reporter.flush().await;
match result {
Ok(Ok(status)) => Ok(ExecResult {
exit_code: status.code().unwrap_or(1),
}),
Ok(Err(e)) => Err(format!("process error: {e}").into()),
Err(_) => {
let _ = child.kill().await;
log_reporter.append("Job timed out, killed").await;
log_reporter.flush().await;
Ok(ExecResult { exit_code: 1 })
}
}
}