Skip to content

Commit 13f6f22

Browse files
committed
Merge branch 'main' into multi-tenancy
2 parents b568d17 + 92b0ed0 commit 13f6f22

File tree

9 files changed

+193
-16
lines changed

9 files changed

+193
-16
lines changed

src/handlers/http/ingest.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ pub async fn ingest(
120120
vec![log_source_entry.clone()],
121121
telemetry_type,
122122
&tenant_id,
123+
None
123124
)
124125
.await
125126
.map_err(|e| {
@@ -238,6 +239,7 @@ pub async fn setup_otel_stream(
238239
vec![log_source_entry.clone()],
239240
telemetry_type,
240241
&tenant_id,
242+
None
241243
)
242244
.await?;
243245
let mut time_partition = None;

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@
1919
use crate::{
2020
event::format::LogSource,
2121
handlers::{
22-
CUSTOM_PARTITION_KEY, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY,
23-
TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, TelemetryType,
24-
UPDATE_STREAM_KEY,
22+
CUSTOM_PARTITION_KEY, DATASET_TAG_KEY, DatasetTag, LOG_SOURCE_KEY, STATIC_SCHEMA_FLAG,
23+
STREAM_TYPE_KEY, TELEMETRY_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
24+
TelemetryType, UPDATE_STREAM_KEY,
2525
},
2626
storage::StreamType,
2727
};
2828
use actix_web::http::header::HeaderMap;
29+
use tracing::warn;
2930

3031
#[derive(Debug, Default)]
3132
pub struct PutStreamHeaders {
@@ -37,6 +38,7 @@ pub struct PutStreamHeaders {
3738
pub stream_type: StreamType,
3839
pub log_source: LogSource,
3940
pub telemetry_type: TelemetryType,
41+
pub dataset_tag: Option<DatasetTag>,
4042
}
4143

4244
impl From<&HeaderMap> for PutStreamHeaders {
@@ -70,6 +72,16 @@ impl From<&HeaderMap> for PutStreamHeaders {
7072
.get(TELEMETRY_TYPE_KEY)
7173
.and_then(|v| v.to_str().ok())
7274
.map_or(TelemetryType::Logs, TelemetryType::from),
75+
dataset_tag: headers
76+
.get(DATASET_TAG_KEY)
77+
.and_then(|v| v.to_str().ok())
78+
.and_then(|v| match DatasetTag::try_from(v) {
79+
Ok(tag) => Some(tag),
80+
Err(err) => {
81+
warn!("Invalid dataset tag '{v}': {err}");
82+
None
83+
}
84+
}),
7385
}
7486
}
7587
}

src/handlers/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub const AUTHORIZATION_KEY: &str = "authorization";
3535
pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
3636
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
3737
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
38+
pub const DATASET_TAG_KEY: &str = "x-p-dataset-tag";
3839
const COOKIE_AGE_DAYS: usize = 7;
3940
const SESSION_COOKIE_NAME: &str = "session";
4041
const USER_COOKIE_NAME: &str = "username";
@@ -81,3 +82,37 @@ impl Display for TelemetryType {
8182
})
8283
}
8384
}
85+
86+
/// Tag for categorizing datasets/streams by observability domain
87+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
88+
#[serde(rename_all = "kebab-case")]
89+
pub enum DatasetTag {
90+
AgentObservability,
91+
K8sObservability,
92+
DatabaseObservability,
93+
}
94+
95+
impl TryFrom<&str> for DatasetTag {
96+
type Error = &'static str;
97+
98+
fn try_from(s: &str) -> Result<Self, Self::Error> {
99+
match s.to_lowercase().as_str() {
100+
"agent-observability" => Ok(DatasetTag::AgentObservability),
101+
"k8s-observability" => Ok(DatasetTag::K8sObservability),
102+
"database-observability" => Ok(DatasetTag::DatabaseObservability),
103+
_ => Err(
104+
"Invalid dataset tag. Supported values: agent-observability, k8s-observability, database-observability",
105+
),
106+
}
107+
}
108+
}
109+
110+
impl Display for DatasetTag {
111+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112+
f.write_str(match self {
113+
DatasetTag::AgentObservability => "agent-observability",
114+
DatasetTag::K8sObservability => "k8s-observability",
115+
DatasetTag::DatabaseObservability => "database-observability",
116+
})
117+
}
118+
}

src/metadata.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use std::sync::Arc;
2525

2626
use crate::catalog::snapshot::ManifestItem;
2727
use crate::event::format::LogSourceEntry;
28-
use crate::handlers::TelemetryType;
28+
use crate::handlers::{DatasetTag, TelemetryType};
2929
use crate::hottier::StreamHotTier;
3030
use crate::metrics::{
3131
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
@@ -93,6 +93,7 @@ pub struct LogStreamMetadata {
9393
pub stream_type: StreamType,
9494
pub log_source: Vec<LogSourceEntry>,
9595
pub telemetry_type: TelemetryType,
96+
pub dataset_tag: Option<DatasetTag>,
9697
}
9798

9899
impl LogStreamMetadata {
@@ -108,6 +109,7 @@ impl LogStreamMetadata {
108109
schema_version: SchemaVersion,
109110
log_source: Vec<LogSourceEntry>,
110111
telemetry_type: TelemetryType,
112+
dataset_tag: Option<DatasetTag>,
111113
) -> Self {
112114
LogStreamMetadata {
113115
created_at: if created_at.is_empty() {
@@ -132,6 +134,7 @@ impl LogStreamMetadata {
132134
schema_version,
133135
log_source,
134136
telemetry_type,
137+
dataset_tag,
135138
..Default::default()
136139
}
137140
}

src/migration/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ async fn setup_logstream_metadata(
418418
stream_type,
419419
log_source,
420420
telemetry_type,
421+
dataset_tag,
421422
..
422423
} = serde_json::from_value(stream_metadata_value).unwrap_or_default();
423424

@@ -463,6 +464,7 @@ async fn setup_logstream_metadata(
463464
stream_type,
464465
log_source,
465466
telemetry_type,
467+
dataset_tag,
466468
};
467469

468470
Ok(metadata)

src/parseable/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ use crate::{
5252
format::{LogSource, LogSourceEntry},
5353
},
5454
handlers::{
55-
STREAM_TYPE_KEY, TelemetryType,
55+
DatasetTag, STREAM_TYPE_KEY, TelemetryType,
5656
http::{
5757
cluster::{PMETA_STREAM_NAME, sync_streams_with_ingestors},
5858
ingest::PostError,
@@ -415,6 +415,7 @@ impl Parseable {
415415
let schema_version = stream_metadata.schema_version;
416416
let log_source = stream_metadata.log_source;
417417
let telemetry_type = stream_metadata.telemetry_type;
418+
let dataset_tag = stream_metadata.dataset_tag;
418419
let mut metadata = LogStreamMetadata::new(
419420
created_at,
420421
time_partition,
@@ -426,6 +427,7 @@ impl Parseable {
426427
schema_version,
427428
log_source,
428429
telemetry_type,
430+
dataset_tag,
429431
);
430432

431433
// Set hot tier fields from the stored metadata
@@ -471,6 +473,7 @@ impl Parseable {
471473
vec![log_source_entry.clone()],
472474
TelemetryType::Logs,
473475
&tenant_id,
476+
None
474477
)
475478
.await;
476479

@@ -517,6 +520,7 @@ impl Parseable {
517520
log_source: Vec<LogSourceEntry>,
518521
telemetry_type: TelemetryType,
519522
tenant_id: &Option<String>,
523+
dataset_tag: Option<DatasetTag>,
520524
) -> Result<bool, PostError> {
521525
if self.streams.contains(stream_name, tenant_id) {
522526
return Ok(true);
@@ -549,6 +553,7 @@ impl Parseable {
549553
log_source,
550554
telemetry_type,
551555
tenant_id,
556+
dataset_tag,
552557
)
553558
.await?;
554559

@@ -625,6 +630,7 @@ impl Parseable {
625630
stream_type,
626631
log_source,
627632
telemetry_type,
633+
dataset_tag,
628634
} = headers.into();
629635

630636
let stream_in_memory_dont_update =
@@ -698,6 +704,7 @@ impl Parseable {
698704
vec![log_source_entry],
699705
telemetry_type,
700706
tenant_id,
707+
dataset_tag,
701708
)
702709
.await?;
703710

@@ -759,6 +766,7 @@ impl Parseable {
759766
log_source: Vec<LogSourceEntry>,
760767
telemetry_type: TelemetryType,
761768
tenant_id: &Option<String>,
769+
dataset_tag: Option<DatasetTag>,
762770
) -> Result<(), CreateStreamError> {
763771
// fail to proceed if invalid stream name
764772
if stream_type != StreamType::Internal {
@@ -783,6 +791,7 @@ impl Parseable {
783791
},
784792
log_source: log_source.clone(),
785793
telemetry_type,
794+
dataset_tag,
786795
..Default::default()
787796
};
788797

@@ -812,6 +821,7 @@ impl Parseable {
812821
SchemaVersion::V1, // New stream
813822
log_source,
814823
telemetry_type,
824+
dataset_tag,
815825
);
816826
let ingestor_id = INGESTOR_META
817827
.get()

0 commit comments

Comments
 (0)