Skip to content

Commit 1545af7

Browse files
committed
fix: proper listing of hottier
1 parent f7b01e3 commit 1545af7

2 files changed

Lines changed: 53 additions & 39 deletions

File tree

src/hottier.rs

Lines changed: 50 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ impl HotTierManager {
199199
.await?;
200200

201201
let mut stream_hot_tier: StreamHotTier = serde_json::from_slice(&bytes)?;
202-
stream_hot_tier.oldest_date_time_entry = self.get_oldest_date_time_entry(stream).await?;
202+
stream_hot_tier.oldest_date_time_entry =
203+
self.get_oldest_date_time_entry(stream, tenant_id).await?;
203204

204205
Ok(stream_hot_tier)
205206
}
@@ -450,15 +451,15 @@ impl HotTierManager {
450451
self.put_hot_tier(stream, &mut stream_hot_tier, tenant_id)
451452
.await?;
452453
file_processed = true;
453-
let path = self.get_stream_path_for_date(stream, &date);
454+
let path = self.get_stream_path_for_date(stream, &date, tenant_id);
454455
let mut hot_tier_manifest = HotTierManager::get_hot_tier_manifest_from_path(path).await?;
455456
hot_tier_manifest.files.push(parquet_file.clone());
456457
hot_tier_manifest
457458
.files
458459
.sort_by_key(|file| file.file_path.clone());
459460
// write the manifest file to the hot tier directory
460461
let manifest_path = self
461-
.get_stream_path_for_date(stream, &date)
462+
.get_stream_path_for_date(stream, &date, tenant_id)
462463
.join("hottier.manifest.json");
463464
fs::create_dir_all(manifest_path.parent().unwrap()).await?;
464465
fs::write(manifest_path, serde_json::to_vec(&hot_tier_manifest)?).await?;
@@ -467,9 +468,18 @@ impl HotTierManager {
467468
}
468469

469470
///fetch the list of dates available in the hot tier directory for the stream and sort them
470-
pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result<Vec<NaiveDate>, HotTierError> {
471+
pub async fn fetch_hot_tier_dates(
472+
&self,
473+
stream: &str,
474+
tenant_id: &Option<String>,
475+
) -> Result<Vec<NaiveDate>, HotTierError> {
471476
let mut date_list = Vec::new();
472-
let path = self.hot_tier_path.join(stream);
477+
let path = if let Some(tenant) = tenant_id.as_ref() {
478+
self.hot_tier_path.join(tenant).join(stream)
479+
} else {
480+
self.hot_tier_path.join(stream)
481+
};
482+
// let path = self.hot_tier_path.join(stream);
473483
if !path.exists() {
474484
return Ok(date_list);
475485
}
@@ -524,37 +534,43 @@ impl HotTierManager {
524534
}
525535

526536
/// get hot tier path for the stream and date
527-
pub fn get_stream_path_for_date(&self, stream: &str, date: &NaiveDate) -> PathBuf {
528-
self.hot_tier_path.join(stream).join(format!("date={date}"))
537+
pub fn get_stream_path_for_date(
538+
&self,
539+
stream: &str,
540+
date: &NaiveDate,
541+
tenant_id: &Option<String>,
542+
) -> PathBuf {
543+
if let Some(tenant) = tenant_id.as_ref() {
544+
self.hot_tier_path
545+
.join(tenant)
546+
.join(stream)
547+
.join(format!("date={date}"))
548+
} else {
549+
self.hot_tier_path.join(stream).join(format!("date={date}"))
550+
}
529551
}
530552

531553
/// Returns the list of manifest files present in hot tier directory for the stream
532554
pub async fn get_hot_tier_manifest_files(
533555
&self,
534-
stream: &str,
535556
manifest_files: &mut Vec<File>,
536557
) -> Result<Vec<File>, HotTierError> {
537-
// Fetch the list of hot tier parquet files for the given stream.
538-
let mut hot_tier_files = self.get_hot_tier_parquet_files(stream).await?;
539-
540-
// Retain only the files in `hot_tier_files` that also exist in `manifest_files`.
541-
hot_tier_files.retain(|file| {
542-
manifest_files
543-
.iter()
544-
.any(|manifest_file| manifest_file.file_path.eq(&file.file_path))
558+
// Instead of reading all hot tier manifests from disk (expensive I/O on every query),
559+
// check which query-relevant files exist locally in the hot tier directory.
560+
let mut hot_tier_files = Vec::new();
561+
562+
manifest_files.retain(|file| {
563+
let hot_tier_path = self.hot_tier_path.join(&file.file_path);
564+
if hot_tier_path.exists() {
565+
hot_tier_files.push(file.clone());
566+
false // remove from manifest_files, will be served from hot tier
567+
} else {
568+
true // keep in manifest_files, will be fetched from object store
569+
}
545570
});
546571

547-
// Sort `hot_tier_files` in descending order by file path.
572+
// Sort both lists in descending order by file path.
548573
hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));
549-
550-
// Update `manifest_files` to exclude files that are present in the filtered `hot_tier_files`.
551-
manifest_files.retain(|manifest_file| {
552-
hot_tier_files
553-
.iter()
554-
.all(|file| !file.file_path.eq(&manifest_file.file_path))
555-
});
556-
557-
// Sort `manifest_files` in descending order by file path.
558574
manifest_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path));
559575

560576
Ok(hot_tier_files)
@@ -564,16 +580,17 @@ impl HotTierManager {
564580
pub async fn get_hot_tier_parquet_files(
565581
&self,
566582
stream: &str,
583+
tenant_id: &Option<String>,
567584
) -> Result<Vec<File>, HotTierError> {
568585
// Fetch list of dates for the given stream
569-
let date_list = self.fetch_hot_tier_dates(stream).await?;
586+
let date_list = self.fetch_hot_tier_dates(stream, tenant_id).await?;
570587

571588
// Create an unordered iter of futures to async collect files
572589
let mut tasks = FuturesUnordered::new();
573590

574591
// For each date, fetch the manifest and extract parquet files
575592
for date in date_list {
576-
let path = self.get_stream_path_for_date(stream, &date);
593+
let path = self.get_stream_path_for_date(stream, &date, tenant_id);
577594
tasks.push(async move {
578595
HotTierManager::get_hot_tier_manifest_from_path(path)
579596
.await
@@ -621,9 +638,9 @@ impl HotTierManager {
621638
tenant_id: &Option<String>,
622639
) -> Result<bool, HotTierError> {
623640
let mut delete_successful = false;
624-
let dates = self.fetch_hot_tier_dates(stream).await?;
641+
let dates = self.fetch_hot_tier_dates(stream, tenant_id).await?;
625642
'loop_dates: for date in dates {
626-
let path = self.get_stream_path_for_date(stream, &date);
643+
let path = self.get_stream_path_for_date(stream, &date, tenant_id);
627644
if !path.exists() {
628645
continue;
629646
}
@@ -712,14 +729,15 @@ impl HotTierManager {
712729
pub async fn get_oldest_date_time_entry(
713730
&self,
714731
stream: &str,
732+
tenant_id: &Option<String>,
715733
) -> Result<Option<String>, HotTierError> {
716-
let date_list = self.fetch_hot_tier_dates(stream).await?;
734+
let date_list = self.fetch_hot_tier_dates(stream, tenant_id).await?;
717735
if date_list.is_empty() {
718736
return Ok(None);
719737
}
720738

721739
for date in date_list {
722-
let path = self.get_stream_path_for_date(stream, &date);
740+
let path = self.get_stream_path_for_date(stream, &date, tenant_id);
723741
let hours_dir = ReadDirStream::new(fs::read_dir(&path).await?);
724742
let mut hours: Vec<DirEntry> = hours_dir.try_collect().await?;
725743
hours.retain(|entry| {

src/query/stream_schema_provider.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -202,11 +202,11 @@ impl StandardTableProvider {
202202
time_partition: Option<String>,
203203
) -> Result<(), DataFusionError> {
204204
let hot_tier_files = hot_tier_manager
205-
.get_hot_tier_manifest_files(&self.stream, manifest_files)
205+
.get_hot_tier_manifest_files(manifest_files)
206206
.await
207207
.map_err(|err| DataFusionError::External(Box::new(err)))?;
208208

209-
let hot_tier_files = hot_tier_files
209+
let hot_tier_files: Vec<File> = hot_tier_files
210210
.into_iter()
211211
.map(|mut file| {
212212
let path = PARSEABLE
@@ -221,11 +221,7 @@ impl StandardTableProvider {
221221
.collect();
222222

223223
let (partitioned_files, statistics) = self.partitioned_files(hot_tier_files);
224-
// let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
225-
// &format!("file:///{tenant_id}/")
226-
// } else {
227-
// "file:///"
228-
// };
224+
229225
let object_store_url = "file:///";
230226
self.create_parquet_physical_plan(
231227
execution_plans,

0 commit comments

Comments
 (0)