Skip to content

Commit 54157cd

Browse files
committed
feat: support blob v2 compaction workflow
- update lance submodule to include blob v2 compaction and zero-copy support
- add sf-cli db commands for blob compaction e2e testing and audio verification
- enable songs table compaction in backend table compactor
- move blob storage optimization writeup to docs and add compaction deep dive
1 parent 925942e commit 54157cd

11 files changed

Lines changed: 985 additions & 12 deletions

File tree

backend/src/state.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use std::{
2-
collections::HashMap,
2+
collections::{HashMap, HashSet},
33
env,
44
sync::Arc,
55
time::{Duration, Instant},
@@ -406,9 +406,7 @@ fn spawn_table_compactor(
406406
let config = CompactConfig {
407407
fragment_threshold: threshold,
408408
prune_older_than_hours: 2,
409-
// songs table uses blob v2 encoding (data_storage_version=2.2) which
410-
// the current lance version cannot compact yet.
411-
skip_tables: ["songs"].iter().map(|s| s.to_string()).collect(),
409+
skip_tables: HashSet::new(),
412410
};
413411

414412
tokio::spawn(async move {

cli/src/cli.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,4 +524,22 @@ pub enum DbCommands {
524524
/// Target version number.
525525
version: u64,
526526
},
527+
/// Run blob v2 compaction e2e test with synthetic data.
528+
TestBlobCompact {
529+
/// Number of synthetic songs to insert (default: 5).
530+
#[arg(long, default_value = "5")]
531+
count: usize,
532+
/// Size of each synthetic audio blob in bytes (default: 5MB).
533+
#[arg(long, default_value = "5242880")]
534+
blob_size: usize,
535+
},
536+
/// Verify audio data retrieval for songs in the database.
537+
VerifyAudio {
538+
/// Only verify specific song IDs (comma-separated).
539+
#[arg(long)]
540+
ids: Option<String>,
541+
/// Maximum number of songs to verify (default: all).
542+
#[arg(long)]
543+
limit: Option<usize>,
544+
},
527545
}

cli/src/commands/db_manage.rs

Lines changed: 231 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1215,3 +1215,234 @@ fn downcast_timestamp_ms<'a>(
12151215
.downcast_ref::<TimestampMillisecondArray>()
12161216
.ok_or_else(|| anyhow!("column `{column}` is not TimestampMillisecondArray"))
12171217
}
1218+
1219+
// ---------------------------------------------------------------------------
1220+
// Blob V2 Compaction E2E Test
1221+
// ---------------------------------------------------------------------------
1222+
1223+
pub async fn test_blob_compact(db_path: &Path, count: usize, blob_size: usize) -> Result<()> {
1224+
use std::time::Instant;
1225+
1226+
use static_flow_shared::music_store::{MusicDataStore, SongRecord};
1227+
1228+
if count == 0 {
1229+
bail!("count must be at least 1");
1230+
}
1231+
1232+
let test_dir = db_path.join("_test_blob_compact");
1233+
if test_dir.exists() {
1234+
std::fs::remove_dir_all(&test_dir)?;
1235+
}
1236+
std::fs::create_dir_all(&test_dir)?;
1237+
let db_uri = test_dir.to_string_lossy().to_string();
1238+
1239+
tracing::info!(
1240+
"=== Blob V2 Compaction E2E Test ===\n songs: {count}\n blob_size: {} bytes\n test_db: \
1241+
{db_uri}",
1242+
blob_size
1243+
);
1244+
1245+
let store = MusicDataStore::connect(&db_uri).await?;
1246+
let now_ms = chrono::Utc::now().timestamp_millis();
1247+
1248+
// Step 1: insert N songs (each as separate fragment)
1249+
let start = Instant::now();
1250+
for i in 0..count {
1251+
let record = SongRecord {
1252+
id: format!("test-song-{i:04}"),
1253+
title: format!("Test Song {i}"),
1254+
artist: "CompactionTest".into(),
1255+
album: "BlobV2Test".into(),
1256+
album_id: None,
1257+
cover_image: None,
1258+
duration_ms: 180_000,
1259+
format: "mp3".into(),
1260+
bitrate: 320,
1261+
lyrics_lrc: None,
1262+
lyrics_translation: None,
1263+
audio_data: vec![(i as u8).wrapping_add(42); blob_size],
1264+
source: "test".into(),
1265+
source_id: None,
1266+
tags: "test,compaction".into(),
1267+
searchable_text: format!("Test Song {i} CompactionTest"),
1268+
vector_en: None,
1269+
vector_zh: None,
1270+
created_at: now_ms,
1271+
updated_at: now_ms,
1272+
};
1273+
store.upsert_song(&record).await?;
1274+
}
1275+
let insert_ms = start.elapsed().as_millis();
1276+
tracing::info!("Inserted {count} songs in {insert_ms}ms");
1277+
1278+
// Step 2: count fragments before compaction
1279+
let table = store.connection().open_table("songs").execute().await?;
1280+
let ds_before = table.dataset().context("no dataset")?;
1281+
let frags_before = ds_before.get().await?.get_fragments().len();
1282+
tracing::info!("Fragments before compact: {frags_before}");
1283+
1284+
// Step 3: compact
1285+
let compact_start = Instant::now();
1286+
table.optimize(OptimizeAction::All).await?;
1287+
let compact_ms = compact_start.elapsed().as_millis();
1288+
tracing::info!("Compact completed in {compact_ms}ms");
1289+
1290+
// Step 4: fragments after
1291+
let ds_after = table.dataset().context("no dataset")?;
1292+
let frags_after = ds_after.get().await?.get_fragments().len();
1293+
tracing::info!("Fragments after compact: {frags_after}");
1294+
1295+
// Step 5: prune
1296+
table
1297+
.optimize(OptimizeAction::Prune {
1298+
older_than: Some(ChronoDuration::zero()),
1299+
delete_unverified: Some(true),
1300+
error_if_tagged_old_versions: Some(false),
1301+
})
1302+
.await?;
1303+
tracing::info!("Prune completed");
1304+
1305+
// Step 6: verify audio data integrity
1306+
let mut pass = 0usize;
1307+
let mut fail = 0usize;
1308+
for i in 0..count {
1309+
let id = format!("test-song-{i:04}");
1310+
match store.get_song_audio(&id).await {
1311+
Ok(Some((data, fmt))) => {
1312+
let expected_byte = (i as u8).wrapping_add(42);
1313+
let size_ok = data.len() == blob_size;
1314+
let content_ok = data[0] == expected_byte && data[data.len() - 1] == expected_byte;
1315+
let fmt_ok = fmt == "mp3";
1316+
if size_ok && content_ok && fmt_ok {
1317+
pass += 1;
1318+
} else {
1319+
fail += 1;
1320+
tracing::error!(
1321+
"FAIL {id}: size={}/{blob_size} first_byte={}/{expected_byte} format={fmt}",
1322+
data.len(),
1323+
data[0]
1324+
);
1325+
}
1326+
},
1327+
Ok(None) => {
1328+
fail += 1;
1329+
tracing::error!("FAIL {id}: audio not found");
1330+
},
1331+
Err(err) => {
1332+
fail += 1;
1333+
tracing::error!("FAIL {id}: {err}");
1334+
},
1335+
}
1336+
}
1337+
1338+
// Step 7: verify metadata
1339+
for i in 0..count {
1340+
let id = format!("test-song-{i:04}");
1341+
match store.get_song(&id).await {
1342+
Ok(Some(detail)) => {
1343+
if detail.title != format!("Test Song {i}") || detail.artist != "CompactionTest" {
1344+
fail += 1;
1345+
tracing::error!(
1346+
"FAIL {id}: metadata mismatch title={} artist={}",
1347+
detail.title,
1348+
detail.artist
1349+
);
1350+
}
1351+
},
1352+
Ok(None) => {
1353+
fail += 1;
1354+
tracing::error!("FAIL {id}: metadata not found");
1355+
},
1356+
Err(err) => {
1357+
fail += 1;
1358+
tracing::error!("FAIL {id}: metadata error {err}");
1359+
},
1360+
}
1361+
}
1362+
1363+
// Cleanup
1364+
if let Err(err) = std::fs::remove_dir_all(&test_dir) {
1365+
tracing::warn!("Failed to cleanup test dir: {err}");
1366+
}
1367+
1368+
let total_ms = start.elapsed().as_millis();
1369+
tracing::info!(
1370+
"\n=== Result ===\n pass: {pass}/{count}\n fail: {fail}\n fragments: {frags_before} -> \
1371+
{frags_after}\n total: {total_ms}ms"
1372+
);
1373+
1374+
if fail > 0 {
1375+
bail!("{fail} verification(s) failed");
1376+
}
1377+
Ok(())
1378+
}
1379+
1380+
// ---------------------------------------------------------------------------
1381+
// Verify Audio Data Retrieval
1382+
// ---------------------------------------------------------------------------
1383+
1384+
pub async fn verify_audio(db_path: &Path, ids: Option<String>, limit: Option<usize>) -> Result<()> {
1385+
use static_flow_shared::music_store::MusicDataStore;
1386+
1387+
let db_uri = db_path.to_string_lossy().to_string();
1388+
let store = MusicDataStore::connect(&db_uri).await?;
1389+
1390+
let target_ids: Vec<String> = match ids {
1391+
Some(ref csv) => csv.split(',').map(|s| s.trim().to_string()).collect(),
1392+
None => {
1393+
// Query all song IDs from the table
1394+
let db = connect_db(db_path).await?;
1395+
let table = open_table(&db, "songs").await?;
1396+
let mut query = table.query();
1397+
if let Some(lim) = limit {
1398+
query = query.limit(lim);
1399+
}
1400+
let stream = query
1401+
.select(Select::columns(&["id"]))
1402+
.execute()
1403+
.await
1404+
.map_err(|err| anyhow!("failed to query songs: {err}"))?;
1405+
let batches = stream.try_collect::<Vec<_>>().await?;
1406+
let mut all_ids = Vec::new();
1407+
for batch in &batches {
1408+
let id_col = downcast_string(batch, "id")?;
1409+
for row in 0..batch.num_rows() {
1410+
all_ids.push(id_col.value(row).to_string());
1411+
}
1412+
}
1413+
all_ids
1414+
},
1415+
};
1416+
1417+
if target_ids.is_empty() {
1418+
tracing::info!("No songs to verify.");
1419+
return Ok(());
1420+
}
1421+
1422+
tracing::info!("Verifying audio for {} song(s)...", target_ids.len());
1423+
1424+
let mut ok = 0usize;
1425+
let mut err_count = 0usize;
1426+
for id in &target_ids {
1427+
match store.get_song_audio(id).await {
1428+
Ok(Some((data, fmt))) => {
1429+
tracing::info!(" ✓ {id}: {} bytes, format={fmt}", data.len());
1430+
ok += 1;
1431+
},
1432+
Ok(None) => {
1433+
tracing::error!(" ✗ {id}: audio not found");
1434+
err_count += 1;
1435+
},
1436+
Err(err) => {
1437+
tracing::error!(" ✗ {id}: {err}");
1438+
err_count += 1;
1439+
},
1440+
}
1441+
}
1442+
1443+
tracing::info!("{ok}/{} songs verified OK", target_ids.len());
1444+
if err_count > 0 {
1445+
bail!("{err_count} song(s) failed audio verification");
1446+
}
1447+
Ok(())
1448+
}

cli/src/commands/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,14 @@ pub async fn run(cli: Cli) -> Result<()> {
290290
table,
291291
version,
292292
} => db_manage::restore_table(&db_path, &table, version).await,
293+
DbCommands::TestBlobCompact {
294+
count,
295+
blob_size,
296+
} => db_manage::test_blob_compact(&db_path, count, blob_size).await,
297+
DbCommands::VerifyAudio {
298+
ids,
299+
limit,
300+
} => db_manage::verify_audio(&db_path, ids, limit).await,
293301
},
294302
}
295303
}

cli/src/db.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use std::{collections::HashSet, path::Path, sync::Arc};
22

33
use anyhow::{Context, Result};
4-
use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader, StringArray};
4+
use arrow_array::{
5+
new_null_array, Array, ArrayRef, RecordBatch, RecordBatchIterator, RecordBatchReader,
6+
StringArray,
7+
};
58
use arrow_schema::{DataType, Schema};
69
use futures::TryStreamExt;
710
use lancedb::{
@@ -33,7 +36,9 @@ pub async fn ensure_table(db: &Connection, name: &str, schema: Arc<Schema>) -> R
3336
Err(_) => {
3437
let batch = RecordBatch::new_empty(schema.clone());
3538
let batches = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone());
36-
db.create_table(name, Box::new(batches) as Box<dyn RecordBatchReader + Send>).execute().await?;
39+
db.create_table(name, Box::new(batches) as Box<dyn RecordBatchReader + Send>)
40+
.execute()
41+
.await?;
3742
db.open_table(name).execute().await?
3843
},
3944
};

0 commit comments

Comments (0)