// ref:main
use reqwest::header::{HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use std::sync::Arc;
use tokio::sync::Mutex;
/// Number of buffered lines at which `LogReporter::append` triggers an immediate flush.
const MAX_BUFFER_LINES: usize = 100;
/// Number of additional upload attempts made after the first one fails
/// (a batch is therefore tried at most `MAX_RETRIES + 1` times).
const MAX_RETRIES: usize = 2;
/// Buffers log lines in memory and uploads them in batches over HTTP.
///
/// Clones share the same line buffer (it lives behind an `Arc`), so a clone
/// handed to a background task flushes the lines appended through any other clone.
#[derive(Clone)]
pub struct LogReporter {
/// HTTP client used for every upload request.
client: reqwest::Client,
/// Full URL of the logs endpoint the batches are POSTed to.
url: String,
/// Complete `Authorization` header value (`Bearer <token>`).
auth: String,
/// Lines waiting to be uploaded, shared between all clones.
buffer: Arc<Mutex<Vec<String>>>,
}
impl LogReporter {
    /// Pause between two consecutive upload attempts for the same batch.
    const RETRY_DELAY: std::time::Duration = std::time::Duration::from_millis(500);
    /// Period of the background task started by [`Self::start_flush_timer`].
    const FLUSH_INTERVAL: std::time::Duration = std::time::Duration::from_millis(1000);
    /// Upper bound for a single upload request. `reqwest` applies no timeout by
    /// default, so without this a stalled server would block `flush` (and any
    /// `append` that reaches the flush threshold) indefinitely.
    const REQUEST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);

    /// Creates a reporter that uploads to
    /// `{server_url}/api/v1/runners/jobs/{job_id}/logs`.
    ///
    /// Trailing slashes on `server_url` are stripped before the path is appended.
    /// `runner_token` is sent as a bearer token in the `Authorization` header.
    /// It is not validated here: if it cannot be represented as an HTTP header
    /// value, uploads are skipped with an error message instead of panicking.
    pub fn new(server_url: &str, job_id: &str, runner_token: &str) -> Self {
        let base = server_url.trim_end_matches('/');
        Self {
            client: reqwest::Client::new(),
            url: format!("{base}/api/v1/runners/jobs/{job_id}/logs"),
            auth: format!("Bearer {runner_token}"),
            buffer: Arc::new(Mutex::new(Vec::new())),
        }
    }

    /// Append raw output data (will be split on newlines).
    ///
    /// Each call is split independently, so a line whose text is delivered
    /// across two calls is stored as two separate entries. When the buffer
    /// holds at least `MAX_BUFFER_LINES` lines after the append, it is flushed
    /// before this method returns.
    pub async fn append(&self, data: &str) {
        // Allocate the owned lines before locking so the critical section stays short.
        let lines: Vec<String> = data.lines().map(str::to_owned).collect();
        let should_flush = {
            let mut buf = self.buffer.lock().await;
            buf.extend(lines);
            buf.len() >= MAX_BUFFER_LINES
        };
        if should_flush {
            self.flush().await;
        }
    }

    /// Flush buffered lines to the server.
    ///
    /// Does nothing when the buffer is empty. The buffer lock is released
    /// before the upload starts, so `append` is never blocked on network I/O;
    /// as a consequence, two flushes running concurrently may upload their
    /// batches in either order. Lines whose upload ultimately fails are dropped.
    pub async fn flush(&self) {
        let lines = {
            let mut buf = self.buffer.lock().await;
            if buf.is_empty() {
                return;
            }
            std::mem::take(&mut *buf)
        };
        self.send_lines(&lines).await;
    }

    /// POSTs `lines` as `{"lines": [...]}`, making up to `MAX_RETRIES + 1`
    /// attempts separated by `RETRY_DELAY`.
    ///
    /// Any non-success status or transport error counts as a failed attempt.
    /// Failures are reported on stderr; after the last attempt the batch is dropped.
    async fn send_lines(&self, lines: &[String]) {
        // Validate the header once up front: the token is caller-supplied and may
        // contain bytes that are illegal in a header value (e.g. a trailing
        // newline). Retrying cannot fix that, and it must not panic the task.
        let mut auth = match HeaderValue::from_str(&self.auth) {
            Ok(value) => value,
            Err(e) => {
                eprintln!("log upload skipped: invalid authorization header ({e}), dropping lines");
                return;
            }
        };
        // Keep the credential out of debug output of the header.
        auth.set_sensitive(true);

        // The payload is identical for every attempt, so build it once.
        let payload = serde_json::json!({ "lines": lines });

        for attempt in 0..=MAX_RETRIES {
            let is_last = attempt == MAX_RETRIES;
            let result = self
                .client
                .post(&self.url)
                .header(AUTHORIZATION, auth.clone())
                .header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
                .timeout(Self::REQUEST_TIMEOUT)
                .json(&payload)
                .send()
                .await;

            match result {
                Ok(resp) if resp.status().is_success() => return,
                Ok(resp) if !is_last => {
                    eprintln!("log upload failed (status {}), retrying...", resp.status());
                }
                Ok(_) => {
                    eprintln!(
                        "log upload failed after {} retries, dropping lines",
                        MAX_RETRIES
                    );
                    return;
                }
                Err(e) if !is_last => {
                    eprintln!("log upload error: {e}, retrying...");
                }
                Err(e) => {
                    eprintln!("log upload error after {} retries: {e}", MAX_RETRIES);
                    return;
                }
            }

            // Only reached after a failed attempt that still has retries left.
            tokio::time::sleep(Self::RETRY_DELAY).await;
        }
    }

    /// Start a background flush timer. Cancel by aborting the returned handle.
    ///
    /// The spawned task works on a clone of this reporter (sharing its buffer)
    /// and calls [`Self::flush`] every `FLUSH_INTERVAL`. Aborting the handle
    /// while an upload is in flight discards that batch, and lines still in the
    /// buffer are not sent by the abort itself; call `flush` afterwards to
    /// upload whatever remains buffered.
    pub fn start_flush_timer(&self) -> tokio::task::JoinHandle<()> {
        let reporter = self.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(Self::FLUSH_INTERVAL);
            // A flush that is slow (retries, timeouts) can overrun several periods.
            // The default `Burst` behaviour would then fire the missed ticks
            // back-to-back; `Delay` simply resumes the regular cadence.
            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                interval.tick().await;
                reporter.flush().await;
            }
        })
    }
}