Skip to content

Commit 3186ae1

Browse files
committed
coderabbit suggestions
1 parent 13f6f22 commit 3186ae1

File tree

12 files changed

+109
-47
lines changed

12 files changed

+109
-47
lines changed

src/connectors/kafka/processor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ impl ParseableSinkProcessor {
6464
vec![log_source_entry],
6565
TelemetryType::default(),
6666
tenant_id,
67+
None,
6768
)
6869
.await?;
6970

src/correlation.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,11 @@ impl Correlations {
139139
.await?;
140140
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
141141
// Update in memory
142-
if let Some(corrs) = self.write().await.get_mut(tenant) {
143-
corrs.insert(correlation.id.to_owned(), correlation.clone());
144-
}
142+
self.write()
143+
.await
144+
.entry(tenant.to_string())
145+
.or_default()
146+
.insert(correlation.id.to_owned(), correlation.clone());
145147

146148
Ok(correlation)
147149
}
@@ -208,7 +210,11 @@ impl Correlations {
208210
.await?;
209211

210212
// Delete from memory
211-
self.write().await.remove(&correlation.id);
213+
self.write()
214+
.await
215+
.entry(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT).to_owned())
216+
.or_default()
217+
.remove(&correlation.id);
212218

213219
Ok(())
214220
}

src/handlers/http/ingest.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ pub async fn ingest(
120120
vec![log_source_entry.clone()],
121121
telemetry_type,
122122
&tenant_id,
123-
None
123+
None,
124124
)
125125
.await
126126
.map_err(|e| {
@@ -239,7 +239,7 @@ pub async fn setup_otel_stream(
239239
vec![log_source_entry.clone()],
240240
telemetry_type,
241241
&tenant_id,
242-
None
242+
None,
243243
)
244244
.await?;
245245
let mut time_partition = None;

src/handlers/http/middleware.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ where
168168
// if action is Ingest and multi-tenancy is on, then request MUST have tenant id
169169
// else check for the presence of tenant id using other details
170170

171-
// an optional error to track the presence of CORRECTtenant header in case of ingestion
171+
// an optional error to track the presence of CORRECT tenant header in case of ingestion
172172
let mut header_error = None;
173173
let user_and_tenant_id: Result<(Result<String, RBACError>, Option<String>), RBACError> =
174174
if PARSEABLE.options.is_multi_tenant() {
@@ -203,14 +203,11 @@ where
203203
HeaderValue::from_str(tid).unwrap(),
204204
);
205205
t = tenant;
206+
} else {
207+
// remove the header if already present
208+
req.headers_mut().remove("tenant");
206209
}
207210
t
208-
// else {
209-
// header_error = Some(actix_web::Error::from(PostError::Header(
210-
// crate::utils::header_parsing::ParseHeaderError::InvalidTenantId,
211-
// )));
212-
// None
213-
// }
214211
};
215212
let userid = get_user_from_request(req.request());
216213
Ok((userid, tenant))

src/metastore/metastores/object_store_metastore.rs

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -514,9 +514,9 @@ impl Metastore for ObjectStoreMetastore {
514514

515515
/// Fetch all dashboards
516516
async fn get_dashboards(&self) -> Result<HashMap<String, Vec<Bytes>>, MetastoreError> {
517-
let mut dashboards = HashMap::new();
517+
let mut dashboards: HashMap<String, Vec<Bytes>> = HashMap::new();
518518
let base_paths = PARSEABLE.list_tenants().unwrap_or_else(|| vec!["".into()]);
519-
for mut tenant in base_paths {
519+
for tenant in base_paths {
520520
let tenant_id = &Some(tenant.clone());
521521
let users_dir = RelativePathBuf::from_iter([&tenant, USERS_ROOT_DIR]);
522522
for user in self
@@ -533,11 +533,16 @@ impl Metastore for ObjectStoreMetastore {
533533
tenant_id,
534534
)
535535
.await?;
536-
if tenant.is_empty() {
537-
tenant.clone_from(&DEFAULT_TENANT.to_string());
538-
}
539-
dashboards.insert(tenant.to_owned(), dashboard_bytes);
540-
// dashboards.extend(dashboard_bytes);
536+
537+
let tenant_key = if tenant.is_empty() {
538+
DEFAULT_TENANT.to_string()
539+
} else {
540+
tenant.clone()
541+
};
542+
dashboards
543+
.entry(tenant_key)
544+
.or_default()
545+
.extend(dashboard_bytes);
541546
}
542547
}
543548

@@ -690,9 +695,21 @@ impl Metastore for ObjectStoreMetastore {
690695

691696
if version == Some("v1") {
692697
// delete older version of the filter
693-
self.storage.delete_object(&filters_path, tenant_id).await?;
698+
// get filter id to delete
699+
let filterid = meta
700+
.get("filter_id")
701+
.and_then(|filter_id| filter_id.as_str());
702+
if let Some(filterid) = filterid {
703+
self.storage
704+
.delete_object(
705+
&filters_path.join(format!("{filterid}.json")),
706+
tenant_id,
707+
)
708+
.await?;
709+
}
710+
// self.storage.delete_object(&filters_path, tenant_id).await?;
694711

695-
filter_value = migrate_v1_v2(filter_value);
712+
filter_value = migrate_v1_v2(filter_value, tenant_id);
696713
let user_id = filter_value
697714
.as_object()
698715
.unwrap()
@@ -717,6 +734,7 @@ impl Metastore for ObjectStoreMetastore {
717734
user_id,
718735
stream_name,
719736
&format!("{filter_id}.json"),
737+
tenant_id,
720738
);
721739
let filter_bytes = to_bytes(&filter_value);
722740
self.storage
@@ -933,7 +951,11 @@ impl Metastore for ObjectStoreMetastore {
933951
.collect::<Vec<_>>();
934952

935953
for date in dates {
936-
let date_path = object_store::path::Path::from(format!("{}/{}", stream_name, &date));
954+
let date_path = if let Some(tenant) = tenant_id {
955+
object_store::path::Path::from(format!("{}/{}/{}", tenant, stream_name, &date))
956+
} else {
957+
object_store::path::Path::from(format!("{}/{}", stream_name, &date))
958+
};
937959
let resp = self.storage.list_with_delimiter(Some(date_path)).await?;
938960

939961
let manifest_paths: Vec<String> = resp

src/parseable/mod.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ impl Parseable {
473473
vec![log_source_entry.clone()],
474474
TelemetryType::Logs,
475475
&tenant_id,
476-
None
476+
None,
477477
)
478478
.await;
479479

@@ -512,6 +512,7 @@ impl Parseable {
512512
}
513513

514514
// Check if the stream exists and create a new stream if doesn't exist
515+
#[allow(clippy::too_many_arguments)]
515516
pub async fn create_stream_if_not_exists(
516517
&self,
517518
stream_name: &str,
@@ -1045,14 +1046,15 @@ impl Parseable {
10451046
return Err(anyhow::Error::msg("P_MULTI_TENANCY is set to false"));
10461047
}
10471048

1048-
if self.tenants.read().unwrap().contains(&tenant_id) {
1049+
let mut tenants = self.tenants.write().unwrap();
1050+
if tenants.contains(&tenant_id) {
10491051
return Err(anyhow::Error::msg(format!(
10501052
"Tenant with id- {tenant_id} already exists"
10511053
)));
1052-
} else {
1053-
self.tenants.write().unwrap().push(tenant_id.clone());
1054-
TENANT_METADATA.insert_tenant(tenant_id, tenant_meta);
10551054
}
1055+
tenants.push(tenant_id.clone());
1056+
drop(tenants);
1057+
TENANT_METADATA.insert_tenant(tenant_id, tenant_meta);
10561058

10571059
Ok(())
10581060
}
@@ -1120,6 +1122,9 @@ impl Parseable {
11201122
// delete resources
11211123

11221124
// delete from in-mem
1125+
if let Ok(mut tenants) = self.tenants.write() {
1126+
tenants.retain(|t| t != tenant_id);
1127+
}
11231128
TENANT_METADATA.delete_tenant(tenant_id);
11241129
Ok(())
11251130
}
@@ -1154,7 +1159,9 @@ impl Parseable {
11541159
}
11551160
}
11561161

1157-
if let Ok(mut t) = self.tenants.write() {
1162+
if let Ok(mut t) = self.tenants.write()
1163+
&& is_multi_tenant
1164+
{
11581165
t.extend(dirs);
11591166
Ok(Some(()))
11601167
} else {

src/prism/home/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{
2727
event::format::{LogSource, LogSourceEntry},
2828
handlers::{TelemetryType, http::logstream::error::StreamError},
2929
metastore::MetastoreError,
30-
parseable::PARSEABLE,
30+
parseable::{DEFAULT_TENANT, PARSEABLE},
3131
rbac::{
3232
Users,
3333
map::{SessionKey, users},
@@ -155,7 +155,10 @@ pub async fn generate_home_response(
155155

156156
// Generate checklist and count triggered alerts
157157
let data_ingested = datasets.iter().any(|d| d.ingestion);
158-
let user_count = users().len();
158+
let user_count = users()
159+
.get(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT))
160+
.map(|m| m.len())
161+
.unwrap_or(0);
159162
let user_added = user_count > 1; // more than just the default admin user
160163

161164
// Calculate triggered alerts count

src/rbac/user.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,9 @@ impl UserGroup {
349349
pub fn validate(&self, tenant_id: &Option<String>) -> Result<(), RBACError> {
350350
let valid_name = is_valid_group_name(&self.name);
351351
let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT);
352-
if read_user_groups().contains_key(&self.name) {
352+
if let Some(tenant_ug) = read_user_groups().get(tenant)
353+
&& tenant_ug.contains_key(&self.name)
354+
{
353355
return Err(RBACError::UserGroupExists(self.name.clone()));
354356
}
355357
let mut non_existent_roles = Vec::new();
@@ -361,6 +363,8 @@ impl UserGroup {
361363
non_existent_roles.push(role.clone());
362364
}
363365
}
366+
} else {
367+
non_existent_roles.extend(self.roles.iter().cloned());
364368
}
365369
}
366370
let mut non_existent_users = Vec::new();
@@ -372,6 +376,8 @@ impl UserGroup {
372376
non_existent_users.push(group_user.userid().to_string());
373377
}
374378
}
379+
} else {
380+
non_existent_users.extend(self.users.iter().map(|u| u.userid().to_string()));
375381
}
376382
}
377383

@@ -496,12 +502,4 @@ impl UserGroup {
496502

497503
self.remove_users(users_to_remove)
498504
}
499-
500-
// pub async fn update_in_metadata(&self, tenant_id: &Option<String>) -> Result<(), RBACError> {
501-
// let mut metadata = get_metadata(tenant_id).await?;
502-
// metadata.user_groups.retain(|x| x.name != self.name);
503-
// metadata.user_groups.push(self.clone());
504-
// put_metadata(&metadata).await?;
505-
// Ok(())
506-
// }
507505
}

src/storage/object_storage.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1236,8 +1236,19 @@ pub fn stream_json_path(stream_name: &str, tenant_id: &Option<String>) -> Relati
12361236

12371237
/// if filter_id is an empty str it should not append it to the rel path
12381238
#[inline(always)]
1239-
pub fn filter_path(user_id: &str, stream_name: &str, filter_file_name: &str) -> RelativePathBuf {
1239+
pub fn filter_path(
1240+
user_id: &str,
1241+
stream_name: &str,
1242+
filter_file_name: &str,
1243+
tenant_id: &Option<String>,
1244+
) -> RelativePathBuf {
1245+
let root = if let Some(tenant) = tenant_id.as_ref() {
1246+
tenant
1247+
} else {
1248+
""
1249+
};
12401250
RelativePathBuf::from_iter([
1251+
root,
12411252
USERS_ROOT_DIR,
12421253
user_id,
12431254
FILTER_DIR,

src/storage/retention.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,13 @@ pub struct Task {
109109
}
110110

111111
impl Task {
112-
pub fn new(description: String, days: u32) -> Self {
112+
pub fn new(description: String, mut days: u32) -> Self {
113+
if days.eq(&0) {
114+
days = 7;
115+
tracing::warn!(
116+
"Using default 7 days for retention since an invalid value of 0 was provided by the user"
117+
);
118+
}
113119
let days = NonZeroU32::new(days).unwrap();
114120
Self {
115121
description,

0 commit comments

Comments
 (0)