Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 81 additions & 2 deletions src/carnot/planner/logical_planner_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,85 @@ TEST_F(LogicalPlannerTest, PlanWithExecFuncs) {
EXPECT_OK(plan->ToProto());
}

constexpr char kBPFTraceProgramMaxKernel[] = R"bpftrace(
kprobe:tcp_drop
{
...
}
)bpftrace";

constexpr char kBPFTraceProgramMinKernel[] = R"bpftrace(
tracepoint:skb:kfree_skb
{
...
}
)bpftrace";

constexpr char kTwoTraceProgramsPxl[] = R"pxl(
import pxtrace
import px

before_518_trace_program = pxtrace.TraceProgram(
program="""$0""",
max_kernel='5.18',
)

after_519_trace_program = pxtrace.TraceProgram(
program="""$1""",
min_kernel='5.19',
)

table_name = 'tcp_drop_table'
pxtrace.UpsertTracepoint('tcp_drop_tracer',
table_name,
[before_518_trace_program, after_519_trace_program],
pxtrace.kprobe(),
'10m')
)pxl";

constexpr char kBPFTwoTraceProgramsPb[] = R"proto(
name: "tcp_drop_tracer"
ttl {
seconds: 600
}
programs {
table_name: "tcp_drop_table"
bpftrace {
program: "\nkprobe:tcp_drop\n{\n ...\n}\n"
}
selectors {
selector_type: MAX_KERNEL
value: "5.18"
}
}
programs {
table_name: "tcp_drop_table"
bpftrace {
program: "\ntracepoint:skb:kfree_skb\n{\n ...\n}\n"
}
selectors {
selector_type: MIN_KERNEL
value: "5.19"
}
}
)proto";

TEST_F(LogicalPlannerTest, CompileTwoTracePrograms) {
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
plannerpb::CompileMutationsRequest req;
req.set_query_str(
absl::Substitute(kTwoTraceProgramsPxl, kBPFTraceProgramMaxKernel, kBPFTraceProgramMinKernel));
*req.mutable_logical_planner_state() =
testutils::CreateTwoPEMsOneKelvinPlannerState(testutils::kHttpEventsSchema);
auto trace_ir_or_s = planner->CompileTrace(req);
ASSERT_OK(trace_ir_or_s);
auto trace_ir = trace_ir_or_s.ConsumeValueOrDie();
plannerpb::CompileMutationsResponse resp;
ASSERT_OK(trace_ir->ToProto(&resp));
ASSERT_EQ(resp.mutations_size(), 1);
EXPECT_THAT(resp.mutations()[0].trace(), EqualsProto(kBPFTwoTraceProgramsPb));
}

constexpr char kSingleProbePxl[] = R"pxl(
import pxtrace
import px
Expand All @@ -391,7 +470,7 @@ pxtrace.UpsertTracepoint('http_return',
"5m")
)pxl";

constexpr char kSingleProbeProgramPb[] = R"pxl(
constexpr char kSingleProbeProgramPb[] = R"proto(
name: "http_return"
ttl {
seconds: 300
Expand Down Expand Up @@ -435,7 +514,7 @@ programs {
}
}
}
)pxl";
)proto";

TEST_F(LogicalPlannerTest, CompileTrace) {
auto planner = LogicalPlanner::Create(info_).ConsumeValueOrDie();
Expand Down
1 change: 1 addition & 0 deletions src/carnot/planner/objects/qlobject.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ enum class QLObjectType {
// General module type.
kModule,
kTraceModule,
kTraceProgram,
kDict,
kTracingVariable,
kProbe,
Expand Down
11 changes: 8 additions & 3 deletions src/carnot/planner/probes/probes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,17 @@ StatusOr<TracepointDeployment*> MutationsIR::CreateKProbeTracepointDeployment(
return raw;
}

Status TracepointDeployment::AddBPFTrace(const std::string& bpftrace,
const std::string& output_name) {
Status TracepointDeployment::AddBPFTrace(const std::string& bpftrace_str,
const std::string& output_name,
const std::vector<TracepointSelector>& selectors) {
carnot::planner::dynamic_tracing::ir::logical::TracepointDeployment::TracepointProgram
tracepoint_pb;
tracepoint_pb.mutable_bpftrace()->set_program(bpftrace);
tracepoint_pb.mutable_bpftrace()->set_program(bpftrace_str);
// set the output table to write program results to
tracepoint_pb.set_table_name(output_name);
for (const auto& selector : selectors) {
*tracepoint_pb.add_selectors() = selector;
}
tracepoints_.push_back(tracepoint_pb);
return Status::OK();
}
Expand Down
6 changes: 5 additions & 1 deletion src/carnot/planner/probes/probes.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ namespace carnot {
namespace planner {
namespace compiler {

using TracepointSelector = carnot::planner::dynamic_tracing::ir::logical::TracepointSelector;

class ProbeOutput {
public:
ProbeOutput() = delete;
Expand Down Expand Up @@ -192,9 +194,11 @@ class TracepointDeployment {
*
* @param bpftrace_program the program in string format.
* @param output_name the output table to write program results.
* @param selectors the selectors to use for the program.
* @return Status
*/
Status AddBPFTrace(const std::string& bpftrace_program, const std::string& output_name);
Status AddBPFTrace(const std::string& bpftrace_str, const std::string& output_name,
const std::vector<TracepointSelector>& selectors);

std::string name() const { return name_; }

Expand Down
213 changes: 213 additions & 0 deletions src/carnot/planner/probes/probes_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,219 @@ TEST_F(ProbeCompilerTest, parse_bpftrace) {
testing::proto::EqualsProto(absl::Substitute(kBPFTraceProgramPb, literal_bpf_trace)));
}

constexpr char kBPFTraceProgramMaxKernel[] = R"bpftrace(
kprobe:tcp_drop
{
...
}
)bpftrace";

constexpr char kBPFTraceProgramMinKernel[] = R"bpftrace(
tracepoint:skb:kfree_skb
{
...
}
)bpftrace";

// Test that we can compile/parse a single TraceProgram object with a valid selector
constexpr char kBPFSingleTraceProgramObjectPxl[] = R"pxl(
import pxtrace
import px

after_519_trace_program = pxtrace.TraceProgram(
program="""$0""",
min_kernel='5.19',
)

table_name = 'tcp_drop_table'
pxtrace.UpsertTracepoint('tcp_drop_tracer',
table_name,
after_519_trace_program,
pxtrace.kprobe(),
'10m')
)pxl";

constexpr char kBPFSingleTraceProgramObjectPb[] = R"proto(
name: "tcp_drop_tracer"
ttl {
seconds: 600
}
programs {
table_name: "tcp_drop_table"
bpftrace {
program: "$0"
}
selectors {
selector_type: MIN_KERNEL
value: "5.19"
}
}
)proto";

TEST_F(ProbeCompilerTest, parse_single_bpftrace_program_object) {
ASSERT_OK_AND_ASSIGN(auto probe_ir,
CompileProbeScript(absl::Substitute(kBPFSingleTraceProgramObjectPxl,
kBPFTraceProgramMinKernel)));
plannerpb::CompileMutationsResponse pb;
EXPECT_OK(probe_ir->ToProto(&pb));
ASSERT_EQ(pb.mutations_size(), 1);

std::string literal_bpf_trace_min = kBPFTraceProgramMinKernel;
literal_bpf_trace_min = std::regex_replace(literal_bpf_trace_min, std::regex("\n"), "\\n");

EXPECT_THAT(pb.mutations()[0].trace(),
testing::proto::EqualsProto(
absl::Substitute(kBPFSingleTraceProgramObjectPb, literal_bpf_trace_min)));
}

// Test that we can compile a list of TraceProgram objects with valid selectors
constexpr char kBPFTraceProgramObjectsPxl[] = R"pxl(
import pxtrace
import px

before_518_trace_program = pxtrace.TraceProgram(
program="""$0""",
max_kernel='5.18',
)

after_519_trace_program = pxtrace.TraceProgram(
program="""$1""",
min_kernel='5.19',
)

table_name = 'tcp_drop_table'
pxtrace.UpsertTracepoint('tcp_drop_tracer',
table_name,
[before_518_trace_program, after_519_trace_program],
pxtrace.kprobe(),
'10m')
)pxl";

constexpr char kBPFTraceProgramObjectsPb[] = R"proto(
name: "tcp_drop_tracer"
ttl {
seconds: 600
}
programs {
table_name: "tcp_drop_table"
bpftrace {
program: "$0"
}
selectors {
selector_type: MAX_KERNEL
value: "5.18"
}
}
programs {
table_name: "tcp_drop_table"
bpftrace {
program: "$1"
}
selectors {
selector_type: MIN_KERNEL
value: "5.19"
}
}
)proto";

TEST_F(ProbeCompilerTest, parse_multiple_bpftrace_program_objects) {
ASSERT_OK_AND_ASSIGN(auto probe_ir, CompileProbeScript(absl::Substitute(
kBPFTraceProgramObjectsPxl, kBPFTraceProgramMinKernel,
kBPFTraceProgramMaxKernel)));
plannerpb::CompileMutationsResponse pb;
EXPECT_OK(probe_ir->ToProto(&pb));
ASSERT_EQ(pb.mutations_size(), 1);

std::string literal_bpf_trace_min = kBPFTraceProgramMinKernel;
literal_bpf_trace_min = std::regex_replace(literal_bpf_trace_min, std::regex("\n"), "\\n");

std::string literal_bpf_trace_max = kBPFTraceProgramMaxKernel;
literal_bpf_trace_max = std::regex_replace(literal_bpf_trace_max, std::regex("\n"), "\\n");

EXPECT_THAT(pb.mutations()[0].trace(),
testing::proto::EqualsProto(absl::Substitute(
kBPFTraceProgramObjectsPb, literal_bpf_trace_min, literal_bpf_trace_max)));
}

// Test that passing an unsupported selector type to TraceProgram throws a compiler error
constexpr char kBPFUnsupportedTraceProgramObjectSelectorPxl[] = R"pxl(
import pxtrace
import px

after_519_trace_program = pxtrace.TraceProgram(
program="""$0""",
min_kernel='5.19',
my_unsupported_selector='12345',
)

table_name = 'tcp_drop_table'
pxtrace.UpsertTracepoint('tcp_drop_tracer',
table_name,
after_519_trace_program,
pxtrace.kprobe(),
'10m')
)pxl";

TEST_F(ProbeCompilerTest, parse_unsupported_selector_in_trace_program_object) {
auto probe_ir_or_s = CompileProbeScript(kBPFUnsupportedTraceProgramObjectSelectorPxl);
ASSERT_NOT_OK(probe_ir_or_s);
EXPECT_THAT(
probe_ir_or_s.status(),
HasCompilerError("Unsupported selector argument provided \'my_unsupported_selector\'"));
}

// Test that an invalid selector value throws a compiler error (currently needs to be a string)
constexpr char kBPFInvalidTraceProgramObjectSelectorPxl[] = R"pxl(
import pxtrace
import px

after_519_trace_program = pxtrace.TraceProgram(
program="""$0""",
min_kernel='5.19',
max_kernel=None,
)

table_name = 'tcp_drop_table'
pxtrace.UpsertTracepoint('tcp_drop_tracer',
table_name,
after_519_trace_program,
pxtrace.kprobe(),
'10m')
)pxl";

TEST_F(ProbeCompilerTest, parse_invalid_trace_program_object) {
auto probe_ir_or_s = CompileProbeScript(kBPFInvalidTraceProgramObjectSelectorPxl);
ASSERT_NOT_OK(probe_ir_or_s);
EXPECT_THAT(probe_ir_or_s.status(),
HasCompilerError("Expected \'String\' in arg \'max_kernel\', got \'none\'"));
}

// Test that an empty selector value throws a compiler error
constexpr char kBPFEmptyTraceProgramObjectSelectorPxl[] = R"pxl(
import pxtrace
import px

after_519_trace_program = pxtrace.TraceProgram(
program="""$0""",
min_kernel='5.19',
max_kernel='',
)

table_name = 'tcp_drop_table'
pxtrace.UpsertTracepoint('tcp_drop_tracer',
table_name,
after_519_trace_program,
pxtrace.kprobe(),
'10m')
)pxl";

TEST_F(ProbeCompilerTest, parse_empty_trace_program_object) {
auto probe_ir_or_s = CompileProbeScript(kBPFEmptyTraceProgramObjectSelectorPxl);
ASSERT_NOT_OK(probe_ir_or_s);
EXPECT_THAT(probe_ir_or_s.status(),
HasCompilerError("Empty selector value provided for \'max_kernel\'"));
}

constexpr char kConfigChangePxl[] = R"pxl(
import pxconfig
import px
Expand Down
Loading