ref:main
use crate::runner::artifacts;
use crate::runner::executor::{self, ExecResult};
use crate::runner::heartbeat;
use crate::runner::log_reporter::LogReporter;
use crate::runner::prepare;
use crate::runner::service_manager;
use crate::runner::workspace;
use crate::runner::RunnerConfig;
use reqwest::header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Notify;
/// Delay, in milliseconds, a polling slot sleeps after its first empty or failed
/// poll; also the value the backoff is reset to once a job has been claimed.
const MIN_BACKOFF_MS: u64 = 5_000;
/// Upper bound, in milliseconds, for the doubling poll backoff.
const MAX_BACKOFF_MS: u64 = 30_000;
/// Sticky shutdown request shared by [`start`] and every polling slot.
///
/// `Notify::notify_waiters` only wakes tasks that are waiting at the moment it is
/// called and stores no permit, so on its own it is lost by a slot that is busy
/// running a job. Pairing it with a flag lets such a slot observe the request the
/// next time it looks.
struct ShutdownSignal {
    /// Set once shutdown has been requested; never cleared.
    requested: std::sync::atomic::AtomicBool,
    /// Wakes tasks that are currently parked in [`ShutdownSignal::wait`].
    wakeup: Notify,
}

impl ShutdownSignal {
    /// Create a signal in the "not requested" state.
    fn new() -> Self {
        Self {
            requested: std::sync::atomic::AtomicBool::new(false),
            wakeup: Notify::new(),
        }
    }

    /// Request shutdown: record it, then wake every task currently waiting.
    fn trigger(&self) {
        // Store the flag first so a woken (or late) waiter always sees it.
        self.requested
            .store(true, std::sync::atomic::Ordering::SeqCst);
        self.wakeup.notify_waiters();
    }

    /// Whether shutdown has already been requested.
    fn is_requested(&self) -> bool {
        self.requested.load(std::sync::atomic::Ordering::SeqCst)
    }

    /// Resolve once shutdown has been requested (immediately if it already was).
    async fn wait(&self) {
        // Create the waiter before reading the flag: a `Notified` future receives
        // `notify_waiters` wake-ups from the moment it is created, so a trigger
        // that lands between the check and the await is not missed.
        let notified = self.wakeup.notified();
        if self.is_requested() {
            return;
        }
        notified.await;
    }
}

/// Start the runner main loop.
///
/// Sends one heartbeat to validate the connection, prunes prepared images, starts
/// the background heartbeat, spawns `config.parallel` polling slots and then blocks
/// until Ctrl+C. After the signal, slots get up to 30 seconds to finish their
/// current job before the heartbeat is stopped and a final heartbeat is sent.
///
/// # Errors
/// Returns the error from the initial heartbeat if the server cannot be reached.
///
/// # Panics
/// The spawned signal task panics if the Ctrl+C listener cannot be installed.
pub async fn start(config: RunnerConfig) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Validate connection
    eprintln!("Connecting to {}...", config.server_url);
    heartbeat::send_once(&config).await?;
    eprintln!("Connected. Runner '{}' ready.", config.name);

    // Prune prepared images accumulated from past runs — keep 10 most recent,
    // drop anything older than 30 days.
    prepare::prune_prepared_images(30, 10);

    // Start heartbeat
    let heartbeat_handle = heartbeat::start(&config);

    // Shutdown signal
    let shutdown = Arc::new(ShutdownSignal::new());
    let signal = Arc::clone(&shutdown);
    tokio::spawn(async move {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to listen for ctrl+c");
        eprintln!("\nShutting down...");
        signal.trigger();
    });

    // Run polling slots
    let mut handles = Vec::new();
    for slot in 1..=config.parallel {
        let cfg = config.clone();
        let stop = Arc::clone(&shutdown);
        handles.push(tokio::spawn(async move {
            poller_loop(cfg, slot, stop).await;
        }));
    }

    // Wait for shutdown signal
    shutdown.wait().await;

    // Grace period: wait for active jobs (up to 30s). Idle slots return at once;
    // busy slots return as soon as their current job has been reported.
    eprintln!("Waiting for active jobs to finish (30s grace period)...");
    let grace = tokio::time::timeout(
        std::time::Duration::from_secs(30),
        futures::future::join_all(handles),
    )
    .await;
    if grace.is_err() {
        eprintln!("Grace period expired, forcing shutdown.");
    }

    // Stop heartbeat
    heartbeat_handle.abort();

    // Final heartbeat (best effort)
    let _ = heartbeat::send_once(&config).await;
    eprintln!("Runner stopped.");
    Ok(())
}

/// Poll the server for jobs on behalf of one slot until shutdown is requested.
///
/// Each claimed job is executed and its result reported. Empty polls and poll
/// errors sleep for a backoff that doubles from `MIN_BACKOFF_MS` up to
/// `MAX_BACKOFF_MS` and resets after a successful claim. With `config.once` or
/// `config.ephemeral` the process exits after the first job.
async fn poller_loop(config: RunnerConfig, slot: u32, shutdown: Arc<ShutdownSignal>) {
    let client = reqwest::Client::new();
    let mut backoff_ms = MIN_BACKOFF_MS;
    loop {
        // Do not even start a claim request once shutdown has been requested;
        // this is what stops a slot that was busy when the signal arrived.
        if shutdown.is_requested() {
            return;
        }
        let poll_result = tokio::select! {
            _ = shutdown.wait() => return,
            result = claim_job(&client, &config) => result,
        };
        match poll_result {
            Ok(Some(job)) => {
                backoff_ms = MIN_BACKOFF_MS; // Reset backoff
                let job_id = job
                    .get("id")
                    .and_then(|v| v.as_str())
                    .unwrap_or("unknown")
                    .to_string();
                // `get(..8)` is a checked slice: it falls back to the full id when
                // the id is shorter than 8 bytes or byte 8 is not a char boundary.
                eprintln!(
                    "[slot {slot}] Claimed job {}",
                    job_id.get(..8).unwrap_or(job_id.as_str())
                );
                let result = execute_job(&config, &job, slot).await;
                report_result(&client, &config, &job_id, result).await;

                // Handle --once / --ephemeral
                if config.once || config.ephemeral {
                    if config.ephemeral {
                        let _ = deregister(&client, &config).await;
                    }
                    eprintln!(
                        "[slot {slot}] Exiting ({})",
                        if config.ephemeral {
                            "ephemeral"
                        } else {
                            "once"
                        }
                    );
                    // NOTE(review): this ends the whole process immediately,
                    // including any job still running in another slot.
                    std::process::exit(0);
                }
                continue;
            }
            // No jobs available — fall through to the backoff below.
            Ok(None) => {}
            Err(e) => eprintln!("[slot {slot}] Poll error: {e}"),
        }

        // Back off before the next poll, but stay responsive to shutdown.
        tokio::select! {
            _ = shutdown.wait() => return,
            _ = tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)) => {},
        }
        backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
    }
}
/// Ask the server for the next job for this runner.
///
/// Returns `Ok(Some(job))` on HTTP 200, where `job` is the response's `"job"`
/// member if present and the whole body otherwise. Returns `Ok(None)` on HTTP 204,
/// and also when a 200 response carries a JSON `null` job.
///
/// # Errors
/// Fails when the authorization header is not a valid header value, when the
/// request or body decoding fails, or when the server answers with any other
/// status (the status and response text are included in the message).
async fn claim_job(
    client: &reqwest::Client,
    config: &RunnerConfig,
) -> Result<Option<serde_json::Value>, Box<dyn std::error::Error + Send + Sync>> {
    let url = config.api_url(&format!("/runners/{}/jobs/claim", config.runner_id));
    // Propagate a malformed token as an error instead of panicking the slot task.
    let auth = HeaderValue::from_str(&config.auth_header())?;
    let resp = client
        .post(&url)
        .header(AUTHORIZATION, auth)
        .send()
        .await?;
    match resp.status().as_u16() {
        200 => {
            let body: serde_json::Value = resp.json().await?;
            let job = body.get("job").cloned().unwrap_or(body);
            // A null job is "nothing to do", not a job with every field missing.
            Ok(if job.is_null() { None } else { Some(job) })
        }
        204 => Ok(None),
        status => {
            let text = resp.text().await.unwrap_or_default();
            Err(format!("claim failed ({status}): {text}").into())
        }
    }
}
/// Run one claimed job end to end and return the executor's result.
///
/// In order: marks the job `running`, starts the periodic log flush, prepares a
/// workspace (only when both `repo_clone_url` and `commit_sha` are present, else the
/// current directory is used), starts any requested services, assembles the
/// environment from the job's `env` plus the services' variables, builds or reuses a
/// prepared image when a `prepare` block is given, executes the command, uploads
/// artifacts, then tears down services and cleans the workspace per `config.cleanup`.
///
/// # Errors
/// Fails when the job has no string `command`. Also fails when workspace
/// preparation, service startup or image preparation fails, or when `prepare` is
/// given without an `image`; in those cases the reason is appended to the job log
/// and flushed before returning. Otherwise returns whatever `executor::execute`
/// returned.
async fn execute_job(
    config: &RunnerConfig,
    job: &serde_json::Value,
    slot: u32,
) -> Result<ExecResult, Box<dyn std::error::Error + Send + Sync>> {
    let job_id = job.get("id").and_then(|v| v.as_str()).unwrap_or("unknown");
    let command = job
        .get("command")
        .and_then(|v| v.as_str())
        .ok_or("job missing 'command'")?;
    let image = job.get("image").and_then(|v| v.as_str());
    let prepare_cmds: Vec<String> = job
        .get("prepare")
        .and_then(|v| v.as_array())
        .map(|arr| {
            arr.iter()
                .filter_map(|v| v.as_str().map(String::from))
                .collect()
        })
        .unwrap_or_default();
    let repo_url = job.get("repo_clone_url").and_then(|v| v.as_str());
    let commit_sha = job.get("commit_sha").and_then(|v| v.as_str());
    let timeout_seconds = job.get("timeout_seconds").and_then(|v| v.as_u64());

    // Update status to running
    let client = reqwest::Client::new();
    update_job_status(&client, config, job_id, "running", None).await;

    // Set up log reporter
    let log_reporter = LogReporter::new(&config.server_url, job_id, &config.runner_token);
    let flush_handle = log_reporter.start_flush_timer();

    // Prepare workspace
    let workspace_path = if let (Some(url), Some(sha)) = (repo_url, commit_sha) {
        match workspace::prepare(&config.work_dir_path(), url, sha, slot) {
            Ok(ws) => Some(ws),
            Err(e) => {
                log_reporter
                    .append(&format!("Workspace preparation failed: {e}"))
                    .await;
                log_reporter.flush().await;
                flush_handle.abort();
                return Err(e);
            }
        }
    } else {
        None
    };
    let ws = workspace_path
        .as_deref()
        .unwrap_or_else(|| std::path::Path::new("."));

    // Start services — only when `services` is a non-empty JSON object.
    let svc_context = match job.get("services") {
        Some(svcs) if svcs.as_object().map_or(false, |m| !m.is_empty()) => {
            match service_manager::start_services(job_id, svcs).await {
                Ok(ctx) => Some(ctx),
                Err(e) => {
                    log_reporter
                        .append(&format!("Service startup failed: {e}"))
                        .await;
                    log_reporter.flush().await;
                    flush_handle.abort();
                    return Err(e);
                }
            }
        }
        _ => None,
    };

    // Build env vars from job + services
    let mut env = HashMap::new();
    if let Some(job_env) = job.get("env").and_then(|v| v.as_object()) {
        for (key, value) in job_env {
            // Strings are used verbatim and null becomes empty. Any other JSON
            // value (number, bool, ...) is passed as its JSON text rather than
            // being silently replaced by an empty string.
            let rendered = match value {
                serde_json::Value::String(s) => s.clone(),
                serde_json::Value::Null => String::new(),
                other => other.to_string(),
            };
            env.insert(key.clone(), rendered);
        }
    }
    if let Some(ref ctx) = svc_context {
        // Service-provided variables override job variables of the same name.
        env.extend(ctx.env_vars.clone());
    }
    let network = svc_context.as_ref().and_then(|ctx| {
        if ctx.network.is_empty() {
            None
        } else {
            Some(ctx.network.as_str())
        }
    });

    // Build (or reuse) a prepared image if the pipeline specified a prepare block.
    // The prepared image bakes apt-get/rustup/etc. so each step doesn't re-install
    // its toolchain. Fail loudly if prepare is requested but can't complete —
    // silently falling back to the bare base image is what caused issue #49.
    let prepared_tag: Option<String> = if !prepare_cmds.is_empty() {
        match image {
            Some(base) => match prepare::prepare_image(base, &prepare_cmds, &log_reporter).await {
                Ok(tag) => Some(tag),
                Err(e) => {
                    log_reporter.append(&format!("Prepare failed: {e}")).await;
                    log_reporter.flush().await;
                    if let Some(ctx) = &svc_context {
                        service_manager::cleanup_services(&ctx.containers, &ctx.network);
                    }
                    flush_handle.abort();
                    return Err(e);
                }
            },
            None => {
                let msg =
                    "Prepare block was provided but step has no 'image' — prepare requires a container image".to_string();
                log_reporter.append(&msg).await;
                log_reporter.flush().await;
                if let Some(ctx) = &svc_context {
                    service_manager::cleanup_services(&ctx.containers, &ctx.network);
                }
                flush_handle.abort();
                return Err(msg.into());
            }
        }
    } else {
        None
    };
    let effective_image = prepared_tag.as_deref().or(image);

    // Execute
    let result = executor::execute(
        command,
        effective_image,
        ws,
        &env,
        network,
        timeout_seconds,
        &log_reporter,
    )
    .await;

    // Upload artifacts (before cleanup, while workspace still exists)
    let artifact_specs = artifacts::parse_specs(job);
    if !artifact_specs.is_empty() {
        log_reporter
            .append(&format!(
                "Uploading {} artifact(s)...",
                artifact_specs.len()
            ))
            .await;
        let count = artifacts::upload_artifacts(config, job_id, ws, &artifact_specs).await;
        log_reporter
            .append(&format!("Uploaded {count} artifact(s)"))
            .await;
    }

    // Cleanup services
    if let Some(ctx) = &svc_context {
        service_manager::cleanup_services(&ctx.containers, &ctx.network);
    }

    // Cleanup workspace
    if let Some(ref ws_path) = workspace_path {
        match config.cleanup.as_str() {
            "always" => workspace::cleanup(ws_path),
            "on-success" => {
                if let Ok(ref r) = result {
                    if r.exit_code == 0 {
                        workspace::cleanup(ws_path);
                    }
                }
            }
            _ => {} // "never" — keep for caching
        }
    }

    // Flush explicitly before stopping the timer, as every error path above does;
    // otherwise lines appended since the last tick (e.g. the artifact messages)
    // could be dropped when the timer task is aborted.
    log_reporter.flush().await;
    flush_handle.abort();
    result
}
/// Log a job's outcome and report its final status to the server.
///
/// An `Ok` result is reported as `passed` when its exit code is 0 and `failed`
/// otherwise, with that exit code. An `Err` result is logged and reported as
/// `failed` with exit code 1.
async fn report_result(
    client: &reqwest::Client,
    config: &RunnerConfig,
    job_id: &str,
    result: Result<ExecResult, Box<dyn std::error::Error + Send + Sync>>,
) {
    let (status, exit_code) = match result {
        Ok(r) => {
            let s = if r.exit_code == 0 { "passed" } else { "failed" };
            (s, r.exit_code)
        }
        Err(e) => {
            eprintln!("Job execution error: {e}");
            ("failed", 1)
        }
    };
    // Checked slice: falls back to the full id when it is shorter than 8 bytes or
    // byte 8 is not a char boundary, where plain `[..8]` indexing would panic.
    let short_id = job_id.get(..8).unwrap_or(job_id);
    eprintln!("Job {short_id} {status} (exit_code: {exit_code})");
    update_job_status(client, config, job_id, status, Some(exit_code)).await;
}
/// Best-effort PATCH of a job's status (and optional exit code) to the server.
///
/// Never fails the caller: an invalid authorization header, a transport error, or
/// a non-success HTTP status is written to stderr and otherwise ignored.
async fn update_job_status(
    client: &reqwest::Client,
    config: &RunnerConfig,
    job_id: &str,
    status: &str,
    exit_code: Option<i32>,
) {
    let url = config.api_url(&format!("/runners/jobs/{job_id}/status"));
    let mut body = serde_json::json!({ "status": status });
    if let Some(code) = exit_code {
        body["exit_code"] = serde_json::json!(code);
    }
    // A malformed token must not panic the task that is reporting the job.
    let auth = match HeaderValue::from_str(&config.auth_header()) {
        Ok(value) => value,
        Err(e) => {
            eprintln!("failed to update job status: invalid authorization header: {e}");
            return;
        }
    };
    let result = client
        .patch(&url)
        .header(AUTHORIZATION, auth)
        .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
        .json(&body)
        .send()
        .await
        // A 4xx/5xx reply means the update was rejected; surface it like any
        // other failure instead of treating the mere response as success.
        .and_then(|resp| resp.error_for_status());
    if let Err(e) = result {
        eprintln!("failed to update job status: {e}");
    }
}
/// Remove this runner's registration from the server.
///
/// # Errors
/// Fails when the authorization header is not a valid header value, when the
/// request cannot be sent, or when the server answers with a non-success status.
async fn deregister(
    client: &reqwest::Client,
    config: &RunnerConfig,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let url = config.api_url(&format!("/runners/{}", config.runner_id));
    let auth = HeaderValue::from_str(&config.auth_header())?;
    client
        .delete(&url)
        .header(AUTHORIZATION, auth)
        .send()
        .await?
        // Report a rejected request as an error rather than as success.
        .error_for_status()?;
    Ok(())
}