mirror of
https://github.com/BetterCorp/BetterFrame.git
synced 2026-05-26 17:56:34 +00:00
feat(os-ota): resumable chunked download with Range header support
OS bundle download was buffering 1.2GB in RAM then writing → network timeout or memory pressure killed it. Now: Kiosk side: - Streams directly to /var/tmp/betterframe/ in 256KB chunks - On network error: resumes from last byte written (Range header) - Up to 5 retries with 10s backoff between attempts - Progress logged every ~50MB - sha256 verified on the complete file on disk (not in memory) Server side: - /api/kiosk/os/download/:id supports Range: bytes=N- header - Returns 206 Partial Content with Content-Range for resume - streamBundle accepts start/end for partial reads via createReadStream - Advertises Accept-Ranges: bytes on all responses
This commit is contained in:
parent
53739ada20
commit
595521db88
3 changed files with 159 additions and 41 deletions
|
|
@ -129,46 +129,132 @@ pub fn apply(server: &str, key: &str, info: &UpdateInfo) -> Result<(), String> {
|
||||||
info.version, info.size_bytes, info.release_id
|
info.version, info.size_bytes, info.release_id
|
||||||
);
|
);
|
||||||
|
|
||||||
// 1. Download
|
// 1. Download with chunked streaming + resume support.
|
||||||
|
// Streams directly to disk (no 1.2GB in RAM). On network failure,
|
||||||
|
// resumes from where it left off using Range header. Retries up to
|
||||||
|
// 5 times with 10s backoff between attempts.
|
||||||
let url = format!("{}{}", server, info.download_url);
|
let url = format!("{}{}", server, info.download_url);
|
||||||
let client = reqwest::blocking::Client::new();
|
|
||||||
let resp = client
|
|
||||||
.get(&url)
|
|
||||||
.header("Authorization", format!("Bearer {key}"))
|
|
||||||
.timeout(Duration::from_secs(600)) // OS bundles run hundreds of MB
|
|
||||||
.send()
|
|
||||||
.map_err(|e| format!("download request: {e}"))?;
|
|
||||||
if !resp.status().is_success() {
|
|
||||||
return Err(format!("download HTTP {}", resp.status()));
|
|
||||||
}
|
|
||||||
let bytes = resp.bytes().map_err(|e| format!("download body: {e}"))?;
|
|
||||||
if bytes.len() as u64 != info.size_bytes {
|
|
||||||
return Err(format!(
|
|
||||||
"size mismatch: expected {}, got {}",
|
|
||||||
info.size_bytes,
|
|
||||||
bytes.len()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. sha256 (catch transport corruption; RAUC will re-verify the CMS
|
|
||||||
// signature separately when it opens the bundle).
|
|
||||||
let mut hasher = Sha256::new();
|
|
||||||
hasher.update(&bytes);
|
|
||||||
let digest = hasher.finalize();
|
|
||||||
let got_sha = hex_lower(&digest);
|
|
||||||
if got_sha != info.sha256 {
|
|
||||||
return Err(format!(
|
|
||||||
"sha256 mismatch: expected {}, got {}",
|
|
||||||
info.sha256, got_sha
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Stage on disk for `rauc install` (it expects a file path, not a fd).
|
|
||||||
// /var/tmp survives /tmp's potential tmpfs size cap; bundles can be big.
|
|
||||||
let staging_dir = PathBuf::from("/var/tmp/betterframe");
|
let staging_dir = PathBuf::from("/var/tmp/betterframe");
|
||||||
fs::create_dir_all(&staging_dir).map_err(|e| format!("mkdir staging: {e}"))?;
|
fs::create_dir_all(&staging_dir).map_err(|e| format!("mkdir staging: {e}"))?;
|
||||||
let bundle_path = staging_dir.join(format!("os-{}.raucb", info.release_id));
|
let bundle_path = staging_dir.join(format!("os-{}.raucb", info.release_id));
|
||||||
fs::write(&bundle_path, &bytes).map_err(|e| format!("write bundle: {e}"))?;
|
|
||||||
|
let max_retries = 5;
|
||||||
|
for attempt in 1..=max_retries {
|
||||||
|
let existing_bytes = fs::metadata(&bundle_path)
|
||||||
|
.map(|m| m.len())
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
// If we already have the full file from a previous attempt, skip download.
|
||||||
|
if existing_bytes >= info.size_bytes {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"os-update: download attempt {attempt}/{max_retries} (resuming from {existing_bytes} / {} bytes)",
|
||||||
|
info.size_bytes
|
||||||
|
);
|
||||||
|
|
||||||
|
let client = reqwest::blocking::Client::new();
|
||||||
|
let mut req = client
|
||||||
|
.get(&url)
|
||||||
|
.header("Authorization", format!("Bearer {key}"));
|
||||||
|
if existing_bytes > 0 {
|
||||||
|
req = req.header("Range", format!("bytes={existing_bytes}-"));
|
||||||
|
}
|
||||||
|
|
||||||
|
let resp = match req.timeout(Duration::from_secs(300)).send() {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
warn!("os-update: download request failed (attempt {attempt}): {e}");
|
||||||
|
if attempt < max_retries {
|
||||||
|
std::thread::sleep(Duration::from_secs(10));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
return Err(format!("download failed after {max_retries} attempts: {e}"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let status = resp.status().as_u16();
|
||||||
|
if status != 200 && status != 206 {
|
||||||
|
return Err(format!("download HTTP {status}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stream chunks to disk.
|
||||||
|
use std::io::Write;
|
||||||
|
let mut file = fs::OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.append(true)
|
||||||
|
.open(&bundle_path)
|
||||||
|
.map_err(|e| format!("open bundle file: {e}"))?;
|
||||||
|
|
||||||
|
let mut reader = resp;
|
||||||
|
let mut buf = vec![0u8; 256 * 1024]; // 256KB chunks
|
||||||
|
let mut downloaded = existing_bytes;
|
||||||
|
let mut stream_ok = true;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match std::io::Read::read(&mut reader, &mut buf) {
|
||||||
|
Ok(0) => break, // EOF
|
||||||
|
Ok(n) => {
|
||||||
|
file.write_all(&buf[..n]).map_err(|e| format!("write chunk: {e}"))?;
|
||||||
|
downloaded += n as u64;
|
||||||
|
// Log progress every ~50MB
|
||||||
|
if downloaded % (50 * 1024 * 1024) < (256 * 1024) as u64 {
|
||||||
|
info!("os-update: {downloaded} / {} bytes ({:.0}%)",
|
||||||
|
info.size_bytes,
|
||||||
|
(downloaded as f64 / info.size_bytes as f64) * 100.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("os-update: stream error at {downloaded} bytes (attempt {attempt}): {e}");
|
||||||
|
stream_ok = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
file.sync_all().ok();
|
||||||
|
|
||||||
|
if stream_ok && downloaded >= info.size_bytes {
|
||||||
|
break; // Download complete
|
||||||
|
}
|
||||||
|
|
||||||
|
if attempt < max_retries {
|
||||||
|
info!("os-update: retrying in 10s...");
|
||||||
|
std::thread::sleep(Duration::from_secs(10));
|
||||||
|
} else {
|
||||||
|
return Err(format!("download incomplete after {max_retries} attempts ({downloaded}/{} bytes)", info.size_bytes));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. sha256 verify the complete file on disk.
|
||||||
|
let file_size = fs::metadata(&bundle_path)
|
||||||
|
.map(|m| m.len())
|
||||||
|
.unwrap_or(0);
|
||||||
|
if file_size != info.size_bytes {
|
||||||
|
let _ = fs::remove_file(&bundle_path);
|
||||||
|
return Err(format!("size mismatch: expected {}, got {file_size}", info.size_bytes));
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut hasher = Sha256::new();
|
||||||
|
let mut f = fs::File::open(&bundle_path).map_err(|e| format!("open for hash: {e}"))?;
|
||||||
|
let mut buf = vec![0u8; 256 * 1024];
|
||||||
|
loop {
|
||||||
|
match std::io::Read::read(&mut f, &mut buf) {
|
||||||
|
Ok(0) => break,
|
||||||
|
Ok(n) => hasher.update(&buf[..n]),
|
||||||
|
Err(e) => {
|
||||||
|
let _ = fs::remove_file(&bundle_path);
|
||||||
|
return Err(format!("read for hash: {e}"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drop(f);
|
||||||
|
let digest = hasher.finalize();
|
||||||
|
let got_sha = hex_lower(&digest);
|
||||||
|
if got_sha != info.sha256 {
|
||||||
|
let _ = fs::remove_file(&bundle_path);
|
||||||
|
return Err(format!("sha256 mismatch: expected {}, got {got_sha}"));
|
||||||
|
}
|
||||||
|
|
||||||
// 4. Hand off to rauc. `rauc install` blocks until the bundle is fully
|
// 4. Hand off to rauc. `rauc install` blocks until the bundle is fully
|
||||||
// copied into the inactive slot and bootloader is flipped. Exit code 0
|
// copied into the inactive slot and bootloader is flipped. Exit code 0
|
||||||
|
|
|
||||||
|
|
@ -858,11 +858,39 @@ function registerKioskRoutes(
|
||||||
}
|
}
|
||||||
|
|
||||||
const bundle = await osUpdates.streamBundle(release.artifact_path);
|
const bundle = await osUpdates.streamBundle(release.artifact_path);
|
||||||
|
const totalSize = bundle.size;
|
||||||
|
|
||||||
|
// Support Range requests for resumable downloads.
|
||||||
|
const rangeHeader = getRequestHeader(event, "range");
|
||||||
|
if (rangeHeader) {
|
||||||
|
const match = rangeHeader.match(/bytes=(\d+)-(\d*)/);
|
||||||
|
if (match) {
|
||||||
|
const start = Number(match[1]);
|
||||||
|
const end = match[2] ? Number(match[2]) : totalSize - 1;
|
||||||
|
if (start >= totalSize) {
|
||||||
|
return new Response(null, { status: 416, headers: { "content-range": `bytes */${totalSize}` } });
|
||||||
|
}
|
||||||
|
const rangeBundle = await osUpdates.streamBundle(release.artifact_path, start, end);
|
||||||
|
return new Response(rangeBundle.body, {
|
||||||
|
status: 206,
|
||||||
|
headers: {
|
||||||
|
"content-type": "application/vnd.rauc",
|
||||||
|
"content-length": String(end - start + 1),
|
||||||
|
"content-range": `bytes ${start}-${end}/${totalSize}`,
|
||||||
|
"accept-ranges": "bytes",
|
||||||
|
"x-bf-sha256": release.sha256,
|
||||||
|
"x-bf-version": release.version,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return new Response(bundle.body, {
|
return new Response(bundle.body, {
|
||||||
status: 200,
|
status: 200,
|
||||||
headers: {
|
headers: {
|
||||||
"content-type": "application/vnd.rauc",
|
"content-type": "application/vnd.rauc",
|
||||||
"content-length": String(bundle.size),
|
"content-length": String(totalSize),
|
||||||
|
"accept-ranges": "bytes",
|
||||||
"x-bf-sha256": release.sha256,
|
"x-bf-sha256": release.sha256,
|
||||||
"x-bf-version": release.version,
|
"x-bf-version": release.version,
|
||||||
"x-bf-compatibility": release.compatibility,
|
"x-bf-compatibility": release.compatibility,
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ export interface OsUpdateApi {
|
||||||
storeBuffer(bytes: Buffer, expectedSha256?: string | null): Promise<StoredOsBundle>;
|
storeBuffer(bytes: Buffer, expectedSha256?: string | null): Promise<StoredOsBundle>;
|
||||||
storeFromUrl(url: string, expectedSha256?: string | null): Promise<StoredOsBundle>;
|
storeFromUrl(url: string, expectedSha256?: string | null): Promise<StoredOsBundle>;
|
||||||
readBundle(path: string, expectedSha256: string): Promise<Buffer>;
|
readBundle(path: string, expectedSha256: string): Promise<Buffer>;
|
||||||
streamBundle(path: string): Promise<{ body: ReadableStream<Uint8Array>; size: number }>;
|
streamBundle(path: string, start?: number, end?: number): Promise<{ body: ReadableStream<Uint8Array>; size: number }>;
|
||||||
removeBundle(path: string): Promise<void>;
|
removeBundle(path: string): Promise<void>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -99,10 +99,14 @@ export function initOsUpdates(config: OsUpdateConfig): OsUpdateApi {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function streamBundle(path: string): Promise<{ body: ReadableStream<Uint8Array>; size: number }> {
|
async function streamBundle(path: string, start?: number, end?: number): Promise<{ body: ReadableStream<Uint8Array>; size: number }> {
|
||||||
const size = (await stat(path)).size;
|
const totalSize = (await stat(path)).size;
|
||||||
|
const opts = (start != null || end != null)
|
||||||
|
? { start: start ?? 0, end: end ?? totalSize - 1 }
|
||||||
|
: undefined;
|
||||||
|
const size = opts ? (opts.end - opts.start + 1) : totalSize;
|
||||||
return {
|
return {
|
||||||
body: Readable.toWeb(createReadStream(path)) as ReadableStream<Uint8Array>,
|
body: Readable.toWeb(createReadStream(path, opts)) as ReadableStream<Uint8Array>,
|
||||||
size,
|
size,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue