diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml index 190f7b85ba524..a55c73ebf64ae 100644 --- a/ballista/rust/core/Cargo.toml +++ b/ballista/rust/core/Cargo.toml @@ -42,6 +42,8 @@ tokio = "1.0" tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } chrono = { version = "0.4", default-features = false } +clap = "2" +parse_arg = "0.1.3" arrow-flight = { version = "7.0.0" } datafusion = { path = "../../../datafusion", version = "6.0.0" } diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto index a0bb84193346f..b3adb3668597e 100644 --- a/ballista/rust/core/proto/ballista.proto +++ b/ballista/rust/core/proto/ballista.proto @@ -838,6 +838,7 @@ message ExecutorMetadata { string id = 1; string host = 2; uint32 port = 3; + uint32 grpc_port = 4; } message ExecutorRegistration { @@ -848,12 +849,46 @@ message ExecutorRegistration { string host = 2; } uint32 port = 3; + uint32 grpc_port = 4; } message ExecutorHeartbeat { ExecutorMetadata meta = 1; // Unix epoch-based timestamp in seconds uint64 timestamp = 2; + ExecutorState state = 3; +} + +message ExecutorState { + repeated ExecutorMetric metrics = 1; +} + +message ExecutorMetric { + // TODO add more metrics + oneof metric { + uint64 available_memory = 1; + } +} + +message ExecutorSpecification { + repeated ExecutorResource resources = 1; +} + +message ExecutorResource { + // TODO add more resources + oneof resource { + uint32 task_slots = 1; + } +} + +message ExecutorData { + string executor_id = 1; + repeated ExecutorResourcePair resources = 2; +} + +message ExecutorResourcePair { + ExecutorResource total = 1; + ExecutorResource available = 2; } message RunningTask { @@ -906,6 +941,41 @@ message PollWorkResult { TaskDefinition task = 1; } +message RegisterExecutorParams { + ExecutorRegistration metadata = 1; + ExecutorSpecification specification = 2; +} + +message RegisterExecutorResult { + bool success = 1; +} + +message SendHeartBeatParams { + ExecutorRegistration metadata = 1; + ExecutorState state = 2; +} + +message SendHeartBeatResult { + // TODO it's from Spark for BlockManager + bool reregister = 1; +} + +message StopExecutorParams { +} + +message StopExecutorResult { +} + +message UpdateTaskStatusParams { + ExecutorRegistration metadata = 1; + // All tasks must be reported until they reach the failed or completed state + repeated TaskStatus task_status = 2; +} + +message UpdateTaskStatusResult { + bool success = 1; +} + message ExecuteQueryParams { oneof query { LogicalPlanNode logical_plan = 1; @@ -965,10 +1035,28 @@ message FilePartitionMetadata { repeated string filename = 1; } +message LaunchTaskParams { + // Allow to launch a task set to an executor at once + repeated TaskDefinition task = 1; +} + +message LaunchTaskResult { + bool success = 1; + // TODO when part of the task set are scheduled successfully +} + service SchedulerGrpc { // Executors must poll the scheduler for heartbeat and to receive tasks rpc PollWork (PollWorkParams) returns (PollWorkResult) {} + rpc RegisterExecutor(RegisterExecutorParams) returns (RegisterExecutorResult) {} + + // Push-based task scheduler will only leverage this interface + // rather than the PollWork interface to report executor states + rpc SendHeartBeat (SendHeartBeatParams) returns (SendHeartBeatResult) {} + + rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {} + rpc GetFileMetadata (GetFileMetadataParams) returns (GetFileMetadataResult) {} rpc ExecuteQuery (ExecuteQueryParams) returns (ExecuteQueryResult) {} @@ -976,6 +1064,12 @@ service SchedulerGrpc { rpc GetJobStatus (GetJobStatusParams) returns (GetJobStatusResult) {} } +service ExecutorGrpc { + rpc LaunchTask (LaunchTaskParams) returns (LaunchTaskResult) {} + + rpc StopExecutor (StopExecutorParams) returns (StopExecutorResult) {} +} + /////////////////////////////////////////////////////////////////////////////////////////////////// // Arrow Data Types /////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ballista/rust/core/src/config.rs b/ballista/rust/core/src/config.rs index 2256808fa34e2..12e50e2af9e51 100644 --- a/ballista/rust/core/src/config.rs +++ b/ballista/rust/core/src/config.rs @@ -18,6 +18,7 @@ //! Ballista configuration +use clap::arg_enum; use core::fmt; use std::collections::HashMap; use std::result; @@ -196,6 +197,22 @@ impl BallistaConfig { } } +// an enum used to configure the scheduler policy +// needs to be visible to code generated by configure_me +arg_enum! { + #[derive(Clone, Copy, Debug, serde::Deserialize)] + pub enum TaskSchedulingPolicy { + PullStaged, + PushStaged, + } +} + +impl parse_arg::ParseArgFromStr for TaskSchedulingPolicy { + fn describe_type(mut writer: W) -> fmt::Result { + write!(writer, "The scheduler policy for the scheduler") + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs index 8c13c3210eefc..43438e29dbe2e 100644 --- a/ballista/rust/core/src/serde/scheduler/mod.rs +++ b/ballista/rust/core/src/serde/scheduler/mod.rs @@ -77,6 +77,7 @@ pub struct ExecutorMeta { pub id: String, pub host: String, pub port: u16, + pub grpc_port: u16, } #[allow(clippy::from_over_into)] @@ -86,6 +87,7 @@ impl Into for ExecutorMeta { id: self.id, host: self.host, port: self.port as u32, + grpc_port: self.grpc_port as u32, } } } @@ -96,10 +98,149 @@ impl From for ExecutorMeta { id: meta.id, host: meta.host, port: meta.port as u16, + grpc_port: meta.grpc_port as u16, } } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +pub struct ExecutorSpecification { + pub task_slots: u32, +} + +#[allow(clippy::from_over_into)] +impl Into for ExecutorSpecification { + fn into(self) -> protobuf::ExecutorSpecification { + protobuf::ExecutorSpecification { + resources: vec![protobuf::executor_resource::Resource::TaskSlots( + self.task_slots, + )] + .into_iter() + .map(|r| protobuf::ExecutorResource { resource: Some(r) }) + .collect(), + } + } +} + +impl From for ExecutorSpecification { + fn from(input: protobuf::ExecutorSpecification) -> Self { + let mut ret = Self { task_slots: 0 }; + for resource in input.resources { + if let Some(protobuf::executor_resource::Resource::TaskSlots(task_slots)) = + resource.resource + { + ret.task_slots = task_slots + } + } + ret + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct ExecutorData { + pub executor_id: String, + pub total_task_slots: u32, + pub available_task_slots: u32, +} + +struct ExecutorResourcePair { + total: protobuf::executor_resource::Resource, + available: protobuf::executor_resource::Resource, +} + +#[allow(clippy::from_over_into)] +impl Into for ExecutorData { + fn into(self) -> protobuf::ExecutorData { + protobuf::ExecutorData { + executor_id: self.executor_id, + resources: vec![ExecutorResourcePair { + total: protobuf::executor_resource::Resource::TaskSlots( + self.total_task_slots, + ), + available: protobuf::executor_resource::Resource::TaskSlots( + self.available_task_slots, + ), + }] + .into_iter() + .map(|r| protobuf::ExecutorResourcePair { + total: Some(protobuf::ExecutorResource { + resource: Some(r.total), + }), + available: Some(protobuf::ExecutorResource { + resource: Some(r.available), + }), + }) + .collect(), + } + } +} + +impl From for ExecutorData { + fn from(input: protobuf::ExecutorData) -> Self { + let mut ret = Self { + executor_id: input.executor_id, + total_task_slots: 0, + available_task_slots: 0, + }; + for resource in input.resources { + if let Some(task_slots) = resource.total { + if let Some(protobuf::executor_resource::Resource::TaskSlots( + task_slots, + )) = task_slots.resource + { + ret.total_task_slots = task_slots + } + }; + if let Some(task_slots) = resource.available { + if let Some(protobuf::executor_resource::Resource::TaskSlots( + task_slots, + )) = task_slots.resource + { + ret.available_task_slots = task_slots + } + }; + } + ret + } +} + +#[derive(Debug, Clone, Copy, Serialize)] +pub struct ExecutorState { + // in bytes + pub available_memory_size: u64, +} + +#[allow(clippy::from_over_into)] +impl Into for ExecutorState { + fn into(self) -> protobuf::ExecutorState { + protobuf::ExecutorState { + metrics: vec![protobuf::executor_metric::Metric::AvailableMemory( + self.available_memory_size, + )] + .into_iter() + .map(|m| protobuf::ExecutorMetric { metric: Some(m) }) + .collect(), + } + } +} + +impl From for ExecutorState { + fn from(input: protobuf::ExecutorState) -> Self { + let mut ret = Self { + available_memory_size: u64::MAX, + }; + for metric in input.metrics { + if let Some(protobuf::executor_metric::Metric::AvailableMemory( + available_memory_size, + )) = metric.metric + { + ret.available_memory_size = available_memory_size + } + } + ret + } +} + /// Summary of executed partition #[derive(Debug, Copy, Clone, Default)] pub struct PartitionStats { diff --git a/ballista/rust/executor/Cargo.toml b/ballista/rust/executor/Cargo.toml index c01bb20681dbd..1f1625f8a9f2c 100644 --- a/ballista/rust/executor/Cargo.toml +++ b/ballista/rust/executor/Cargo.toml @@ -45,6 +45,7 @@ tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.6" uuid = { version = "0.8", features = ["v4"] } +hyper = "0.14.4" [dev-dependencies] diff --git a/ballista/rust/executor/executor_config_spec.toml b/ballista/rust/executor/executor_config_spec.toml index 6f170c85e8234..1dd3de99012c1 100644 --- a/ballista/rust/executor/executor_config_spec.toml +++ b/ballista/rust/executor/executor_config_spec.toml @@ -54,6 +54,12 @@ type = "u16" default = "50051" doc = "bind port" +[[param]] +name = "bind_grpc_port" +type = "u16" +default = "50052" +doc = "bind grpc service port" + [[param]] name = "work_dir" type = "String" @@ -65,3 +71,10 @@ name = "concurrent_tasks" type = "usize" default = "4" doc = "Max concurrent tasks." + +[[param]] +abbr = "s" +name = "task_scheduling_policy" +type = "ballista_core::config::TaskSchedulingPolicy" +doc = "The task scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged" +default = "ballista_core::config::TaskSchedulingPolicy::PullStaged" diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs index 4d12dfc1c7551..69bc8380c2c2e 100644 --- a/ballista/rust/executor/src/execution_loop.rs +++ b/ballista/rust/executor/src/execution_loop.rs @@ -26,12 +26,11 @@ use tonic::transport::Channel; use ballista_core::serde::protobuf::ExecutorRegistration; use ballista_core::serde::protobuf::{ - self, scheduler_grpc_client::SchedulerGrpcClient, task_status, FailedTask, - PartitionId, PollWorkParams, PollWorkResult, ShuffleWritePartition, TaskDefinition, - TaskStatus, + scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult, + TaskDefinition, TaskStatus, }; -use protobuf::CompletedTask; +use crate::as_task_status; use crate::executor::Executor; use ballista_core::error::BallistaError; use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning; @@ -144,37 +143,6 @@ async fn run_received_tasks( Ok(()) } -fn as_task_status( - execution_result: ballista_core::error::Result>, - executor_id: String, - task_id: PartitionId, -) -> TaskStatus { - match execution_result { - Ok(partitions) => { - info!("Task {:?} finished", task_id); - - TaskStatus { - partition_id: Some(task_id), - status: Some(task_status::Status::Completed(CompletedTask { - executor_id, - partitions, - })), - } - } - Err(e) => { - let error_msg = e.to_string(); - info!("Task {:?} failed: {}", task_id, error_msg); - - TaskStatus { - partition_id: Some(task_id), - status: Some(task_status::Status::Failed(FailedTask { - error: format!("Task failed due to Tokio error: {}", error_msg), - })), - } - } - } -} - async fn sample_tasks_status( task_status_receiver: &mut Receiver, ) -> Vec { diff --git a/ballista/rust/executor/src/executor.rs b/ballista/rust/executor/src/executor.rs index ff2f08faafd48..e7479bd5fe9fc 100644 --- a/ballista/rust/executor/src/executor.rs +++ b/ballista/rust/executor/src/executor.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use ballista_core::error::BallistaError; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf; +use ballista_core::serde::scheduler::ExecutorSpecification; use datafusion::error::DataFusionError; use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::display::DisplayableExecutionPlan; @@ -31,13 +32,27 @@ use datafusion::physical_plan::{ExecutionPlan, Partitioning}; pub struct Executor { /// Directory for storing partial results work_dir: String, + + /// Specification like total task slots + pub specification: ExecutorSpecification, } impl Executor { /// Create a new executor instance pub fn new(work_dir: &str) -> Self { + Executor::new_with_specification( + work_dir, + ExecutorSpecification { task_slots: 4 }, + ) + } + + pub fn new_with_specification( + work_dir: &str, + specification: ExecutorSpecification, + ) -> Self { Self { work_dir: work_dir.to_owned(), + specification, } } } diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs new file mode 100644 index 0000000000000..3f220ea83a391 --- /dev/null +++ b/ballista/rust/executor/src/executor_server.rs @@ -0,0 +1,291 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::convert::TryInto; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use tokio::sync::mpsc; + +use log::{debug, info}; +use tonic::transport::{Channel, Server}; +use tonic::{Request, Response, Status}; + +use ballista_core::error::BallistaError; +use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning; +use ballista_core::serde::protobuf::executor_grpc_server::{ + ExecutorGrpc, ExecutorGrpcServer, +}; +use ballista_core::serde::protobuf::executor_registration::OptionalHost; +use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient; +use ballista_core::serde::protobuf::{ + ExecutorRegistration, LaunchTaskParams, LaunchTaskResult, RegisterExecutorParams, + SendHeartBeatParams, StopExecutorParams, StopExecutorResult, TaskDefinition, + UpdateTaskStatusParams, +}; +use ballista_core::serde::scheduler::{ExecutorSpecification, ExecutorState}; +use datafusion::physical_plan::ExecutionPlan; + +use crate::as_task_status; +use crate::executor::Executor; + +pub async fn startup( + mut scheduler: SchedulerGrpcClient, + executor: Arc, + executor_meta: ExecutorRegistration, +) { + // TODO make the buffer size configurable + let (tx_task, rx_task) = mpsc::channel::(1000); + + let executor_server = ExecutorServer::new( + scheduler.clone(), + executor.clone(), + executor_meta.clone(), + ExecutorEnv { tx_task }, + ); + + // 1. Start executor grpc service + { + let executor_meta = executor_meta.clone(); + let addr = format!( + "{}:{}", + executor_meta + .optional_host + .map(|h| match h { + OptionalHost::Host(host) => host, + }) + .unwrap_or_else(|| String::from("127.0.0.1")), + executor_meta.grpc_port + ); + let addr = addr.parse().unwrap(); + info!("Setup executor grpc service for {:?}", addr); + + let server = ExecutorGrpcServer::new(executor_server.clone()); + let grpc_server_future = Server::builder().add_service(server).serve(addr); + tokio::spawn(async move { grpc_server_future.await }); + } + + let executor_server = Arc::new(executor_server); + + // 2. Do executor registration + match register_executor(&mut scheduler, &executor_meta, &executor.specification).await + { + Ok(_) => { + info!("Executor registration succeed"); + } + Err(error) => { + panic!("Executor registration failed due to: {}", error); + } + }; + + // 3. Start Heartbeater + { + let heartbeater = Heartbeater::new(executor_server.clone()); + heartbeater.start().await; + } + + // 4. Start TaskRunnerPool + { + let task_runner_pool = TaskRunnerPool::new(executor_server.clone()); + task_runner_pool.start(rx_task).await; + } +} + +#[allow(clippy::clone_on_copy)] +async fn register_executor( + scheduler: &mut SchedulerGrpcClient, + executor_meta: &ExecutorRegistration, + specification: &ExecutorSpecification, +) -> Result<(), BallistaError> { + let result = scheduler + .register_executor(RegisterExecutorParams { + metadata: Some(executor_meta.clone()), + specification: Some(specification.clone().into()), + }) + .await?; + if result.into_inner().success { + Ok(()) + } else { + Err(BallistaError::General( + "Executor registration failed!!!".to_owned(), + )) + } +} + +#[derive(Clone)] +pub struct ExecutorServer { + _start_time: u128, + executor: Arc, + executor_meta: ExecutorRegistration, + scheduler: SchedulerGrpcClient, + executor_env: ExecutorEnv, +} + +#[derive(Clone)] +struct ExecutorEnv { + tx_task: mpsc::Sender, +} + +unsafe impl Sync for ExecutorEnv {} + +impl ExecutorServer { + fn new( + scheduler: SchedulerGrpcClient, + executor: Arc, + executor_meta: ExecutorRegistration, + executor_env: ExecutorEnv, + ) -> Self { + Self { + _start_time: SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(), + executor, + executor_meta, + scheduler, + executor_env, + } + } + + async fn heartbeat(&self) { + // TODO Error handling + self.scheduler + .clone() + .send_heart_beat(SendHeartBeatParams { + metadata: Some(self.executor_meta.clone()), + state: Some(self.get_executor_state().await.into()), + }) + .await + .unwrap(); + } + + async fn run_task(&self, task: TaskDefinition) -> Result<(), BallistaError> { + let task_id = task.task_id.unwrap(); + let task_id_log = format!( + "{}/{}/{}", + task_id.job_id, task_id.stage_id, task_id.partition_id + ); + info!("Start to run task {}", task_id_log); + + let plan: Arc = (&task.plan.unwrap()).try_into().unwrap(); + let shuffle_output_partitioning = + parse_protobuf_hash_partitioning(task.output_partitioning.as_ref())?; + + let execution_result = self + .executor + .execute_shuffle_write( + task_id.job_id.clone(), + task_id.stage_id as usize, + task_id.partition_id as usize, + plan, + shuffle_output_partitioning, + ) + .await; + info!("Done with task {}", task_id_log); + debug!("Statistics: {:?}", execution_result); + + // TODO use another channel to update the status of a task set + self.scheduler + .clone() + .update_task_status(UpdateTaskStatusParams { + metadata: Some(self.executor_meta.clone()), + task_status: vec![as_task_status( + execution_result, + self.executor_meta.id.clone(), + task_id, + )], + }) + .await?; + + Ok(()) + } + + // TODO with real state + async fn get_executor_state(&self) -> ExecutorState { + ExecutorState { + available_memory_size: u64::MAX, + } + } +} + +struct Heartbeater { + executor_server: Arc, +} + +impl Heartbeater { + fn new(executor_server: Arc) -> Self { + Self { executor_server } + } + + async fn start(&self) { + let executor_server = self.executor_server.clone(); + tokio::spawn(async move { + info!("Starting heartbeater to send heartbeat the scheduler periodically"); + loop { + executor_server.heartbeat().await; + tokio::time::sleep(Duration::from_millis(60000)).await; + } + }); + } +} + +struct TaskRunnerPool { + executor_server: Arc, +} + +impl TaskRunnerPool { + fn new(executor_server: Arc) -> Self { + Self { executor_server } + } + + async fn start(&self, mut rx_task: mpsc::Receiver) { + let executor_server = self.executor_server.clone(); + tokio::spawn(async move { + info!("Starting the task runner pool"); + loop { + let task = rx_task.recv().await.unwrap(); + info!("Received task {:?}", task); + + let server = executor_server.clone(); + tokio::spawn(async move { + server.run_task(task).await.unwrap(); + }); + } + }); + } +} + +#[tonic::async_trait] +impl ExecutorGrpc for ExecutorServer { + async fn launch_task( + &self, + request: Request, + ) -> Result, Status> { + let tasks = request.into_inner().task; + let task_sender = self.executor_env.tx_task.clone(); + for task in tasks { + task_sender.send(task).await.unwrap(); + } + Ok(Response::new(LaunchTaskResult { success: true })) + } + + async fn stop_executor( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } +} diff --git a/ballista/rust/executor/src/lib.rs b/ballista/rust/executor/src/lib.rs index 714698b357f6c..a2711da08cc4f 100644 --- a/ballista/rust/executor/src/lib.rs +++ b/ballista/rust/executor/src/lib.rs @@ -20,7 +20,46 @@ pub mod collect; pub mod execution_loop; pub mod executor; +pub mod executor_server; pub mod flight_service; mod standalone; pub use standalone::new_standalone_executor; + +use log::info; + +use ballista_core::serde::protobuf::{ + task_status, CompletedTask, FailedTask, PartitionId, ShuffleWritePartition, + TaskStatus, +}; + +pub fn as_task_status( + execution_result: ballista_core::error::Result>, + executor_id: String, + task_id: PartitionId, +) -> TaskStatus { + match execution_result { + Ok(partitions) => { + info!("Task {:?} finished", task_id); + + TaskStatus { + partition_id: Some(task_id), + status: Some(task_status::Status::Completed(CompletedTask { + executor_id, + partitions, + })), + } + } + Err(e) => { + let error_msg = e.to_string(); + info!("Task {:?} failed: {}", task_id, error_msg); + + TaskStatus { + partition_id: Some(task_id), + status: Some(task_status::Status::Failed(FailedTask { + error: format!("Task failed due to Tokio error: {}", error_msg), + })), + } + } + } +} diff --git a/ballista/rust/executor/src/main.rs b/ballista/rust/executor/src/main.rs index b411a776f8291..2321ce338eb58 100644 --- a/ballista/rust/executor/src/main.rs +++ b/ballista/rust/executor/src/main.rs @@ -21,16 +21,18 @@ use std::sync::Arc; use anyhow::{Context, Result}; use arrow_flight::flight_service_server::FlightServiceServer; -use ballista_executor::execution_loop; +use ballista_executor::{execution_loop, executor_server}; use log::info; use tempfile::TempDir; use tonic::transport::Server; use uuid::Uuid; +use ballista_core::config::TaskSchedulingPolicy; use ballista_core::serde::protobuf::{ executor_registration, scheduler_grpc_client::SchedulerGrpcClient, ExecutorRegistration, }; +use ballista_core::serde::scheduler::ExecutorSpecification; use ballista_core::{print_version, BALLISTA_VERSION}; use ballista_executor::executor::Executor; use ballista_executor::flight_service::BallistaFlightService; @@ -67,6 +69,7 @@ async fn main() -> Result<()> { let external_host = opt.external_host; let bind_host = opt.bind_host; let port = opt.bind_port; + let grpc_port = opt.bind_grpc_port; let addr = format!("{}:{}", bind_host, port); let addr = addr @@ -94,32 +97,54 @@ async fn main() -> Result<()> { .clone() .map(executor_registration::OptionalHost::Host), port: port as u32, + grpc_port: grpc_port as u32, }; + let executor_specification = ExecutorSpecification { + task_slots: opt.concurrent_tasks as u32, + }; + let executor = Arc::new(Executor::new_with_specification( + &work_dir, + executor_specification, + )); let scheduler = SchedulerGrpcClient::connect(scheduler_url) .await .context("Could not connect to scheduler")?; - let executor = Arc::new(Executor::new(&work_dir)); - - let service = BallistaFlightService::new(executor.clone()); + let scheduler_policy = opt.task_scheduling_policy; + match scheduler_policy { + TaskSchedulingPolicy::PushStaged => { + tokio::spawn(executor_server::startup( + scheduler, + executor.clone(), + executor_meta, + )); + } + _ => { + tokio::spawn(execution_loop::poll_loop( + scheduler, + executor.clone(), + executor_meta, + opt.concurrent_tasks, + )); + } + } - let server = FlightServiceServer::new(service); - info!( - "Ballista v{} Rust Executor listening on {:?}", - BALLISTA_VERSION, addr - ); - let server_future = tokio::spawn(Server::builder().add_service(server).serve(addr)); - tokio::spawn(execution_loop::poll_loop( - scheduler, - executor, - executor_meta, - opt.concurrent_tasks, - )); + // Arrow flight service + { + let service = BallistaFlightService::new(executor.clone()); + let server = FlightServiceServer::new(service); + info!( + "Ballista v{} Rust Executor listening on {:?}", + BALLISTA_VERSION, addr + ); + let server_future = + tokio::spawn(Server::builder().add_service(server).serve(addr)); + server_future + .await + .context("Tokio error")? + .context("Could not start executor server")?; + } - server_future - .await - .context("Tokio error")? - .context("Could not start executor server")?; Ok(()) } diff --git a/ballista/rust/executor/src/standalone.rs b/ballista/rust/executor/src/standalone.rs index 04174d4de2147..03f47bb77b0cf 100644 --- a/ballista/rust/executor/src/standalone.rs +++ b/ballista/rust/executor/src/standalone.rs @@ -62,6 +62,8 @@ pub async fn new_standalone_executor( id: Uuid::new_v4().to_string(), // assign this executor a unique ID optional_host: Some(OptionalHost::Host("localhost".to_string())), port: addr.port() as u32, + // TODO Make it configurable + grpc_port: 50020, }; tokio::spawn(execution_loop::poll_loop( scheduler, diff --git a/ballista/rust/scheduler/scheduler_config_spec.toml b/ballista/rust/scheduler/scheduler_config_spec.toml index 81e77d31b0a00..cf03fc08a72a9 100644 --- a/ballista/rust/scheduler/scheduler_config_spec.toml +++ b/ballista/rust/scheduler/scheduler_config_spec.toml @@ -57,4 +57,11 @@ abbr = "p" name = "bind_port" type = "u16" default = "50050" -doc = "bind port. Default: 50050" \ No newline at end of file +doc = "bind port. Default: 50050" + +[[param]] +abbr = "s" +name = "scheduler_policy" +type = "ballista_core::config::TaskSchedulingPolicy" +doc = "The scheduing policy for the scheduler, see TaskSchedulingPolicy::variants() for options. Default: PullStaged" +default = "ballista_core::config::TaskSchedulingPolicy::PullStaged" \ No newline at end of file diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index 61da0d9cbf3e2..4b2f17859151d 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#![doc = include_str!("../README.md")] +#![doc = include_str ! ("../README.md")] pub mod api; pub mod planner; @@ -41,21 +41,27 @@ pub mod externalscaler { include!(concat!(env!("OUT_DIR"), "/externalscaler.rs")); } +use std::collections::{HashMap, HashSet}; +use std::fmt; use std::{convert::TryInto, sync::Arc}; -use std::{fmt, net::IpAddr}; use ballista_core::serde::protobuf::{ execute_query_params::Query, executor_registration::OptionalHost, job_status, scheduler_grpc_server::SchedulerGrpc, task_status, ExecuteQueryParams, ExecuteQueryResult, FailedJob, FileType, GetFileMetadataParams, GetFileMetadataResult, GetJobStatusParams, GetJobStatusResult, JobStatus, - PartitionId, PollWorkParams, PollWorkResult, QueuedJob, RunningJob, TaskDefinition, - TaskStatus, + LaunchTaskParams, PartitionId, PollWorkParams, PollWorkResult, QueuedJob, + RegisterExecutorParams, RegisterExecutorResult, RunningJob, SendHeartBeatParams, + SendHeartBeatResult, TaskDefinition, TaskStatus, UpdateTaskStatusParams, + UpdateTaskStatusResult, +}; +use ballista_core::serde::scheduler::{ + ExecutorData, ExecutorMeta, ExecutorSpecification, }; -use ballista_core::serde::scheduler::ExecutorMeta; use clap::arg_enum; use datafusion::physical_plan::ExecutionPlan; + #[cfg(feature = "sled")] extern crate sled_package as sled; @@ -81,29 +87,51 @@ use crate::externalscaler::{ }; use crate::planner::DistributedPlanner; -use log::{debug, error, info, warn}; +use log::{debug, error, info, trace, warn}; use rand::{distributions::Alphanumeric, thread_rng, Rng}; use tonic::{Request, Response, Status}; use self::state::{ConfigBackendClient, SchedulerState}; -use ballista_core::config::BallistaConfig; +use anyhow::Context; +use ballista_core::config::{BallistaConfig, TaskSchedulingPolicy}; +use ballista_core::error::BallistaError; use ballista_core::execution_plans::ShuffleWriterExec; +use ballista_core::serde::protobuf::executor_grpc_client::ExecutorGrpcClient; use ballista_core::serde::scheduler::to_proto::hash_partitioning_to_proto; use datafusion::prelude::{ExecutionConfig, ExecutionContext}; -use std::time::{Instant, SystemTime, UNIX_EPOCH}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use tokio::sync::{mpsc, RwLock}; +use tonic::transport::Channel; #[derive(Clone)] pub struct SchedulerServer { - caller_ip: IpAddr, pub(crate) state: Arc, start_time: u128, + policy: TaskSchedulingPolicy, + scheduler_env: Option, + executors_client: Arc>>>, +} + +#[derive(Clone)] +pub struct SchedulerEnv { + pub tx_job: mpsc::Sender, } impl SchedulerServer { - pub fn new( + pub fn new(config: Arc, namespace: String) -> Self { + SchedulerServer::new_with_policy( + config, + namespace, + TaskSchedulingPolicy::PullStaged, + None, + ) + } + + pub fn new_with_policy( config: Arc, namespace: String, - caller_ip: IpAddr, + policy: TaskSchedulingPolicy, + scheduler_env: Option, ) -> Self { let state = Arc::new(SchedulerState::new(config, namespace)); let state_clone = state.clone(); @@ -112,17 +140,178 @@ impl SchedulerServer { tokio::spawn(async move { state_clone.synchronize_job_status_loop().await }); Self { - caller_ip, state, start_time: SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis(), + policy, + scheduler_env, + executors_client: Arc::new(RwLock::new(HashMap::new())), + } + } + + async fn schedule_job(&self, job_id: String) -> Result<(), BallistaError> { + let alive_executors = self + .state + .get_alive_executors_metadata_within_one_minute() + .await?; + let alive_executors: HashMap = alive_executors + .into_iter() + .map(|e| (e.id.clone(), e)) + .collect(); + let available_executors = self.state.get_available_executors_data().await?; + let mut available_executors: Vec = available_executors + .into_iter() + .filter(|e| alive_executors.contains_key(&e.executor_id)) + .collect(); + + // In case of there's no enough resources, reschedule the tasks of the job + if available_executors.is_empty() { + let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone(); + // TODO Maybe it's better to use an exclusive runtime for this kind task scheduling + tokio::spawn(async move { + warn!("Not enough available executors for task running"); + tokio::time::sleep(Duration::from_millis(100)).await; + tx_job.send(job_id).await.unwrap(); + }); + return Ok(()); + } + + let (tasks_assigment, num_tasks) = + self.fetch_tasks(&mut available_executors, &job_id).await?; + if num_tasks > 0 { + for (idx_executor, tasks) in tasks_assigment.into_iter().enumerate() { + if !tasks.is_empty() { + let executor_data = &available_executors[idx_executor]; + debug!( + "Start to launch tasks {:?} to executor {:?}", + tasks, executor_data.executor_id + ); + let mut client = { + let clients = self.executors_client.read().await; + info!("Size of executor clients: {:?}", clients.len()); + clients.get(&executor_data.executor_id).unwrap().clone() + }; + // Update the resources first + self.state.save_executor_data(executor_data.clone()).await?; + // TODO check whether launching task is successful or not + client.launch_task(LaunchTaskParams { task: tasks }).await?; + } else { + // Since the task assignment policy is round robin, + // if find tasks for one executor is empty, just break fast + break; + } + } + return Ok(()); + } + + Ok(()) + } + + async fn fetch_tasks( + &self, + available_executors: &mut Vec, + job_id: &str, + ) -> Result<(Vec>, usize), BallistaError> { + let mut ret: Vec> = + Vec::with_capacity(available_executors.len()); + for _idx in 0..available_executors.len() { + ret.push(Vec::new()); + } + let mut num_tasks = 0; + loop { + info!("Go inside fetching task loop"); + let mut has_tasks = true; + for (idx, executor) in available_executors.iter_mut().enumerate() { + if executor.available_task_slots == 0 { + break; + } + let plan = self + .state + .assign_next_schedulable_job_task(&executor.executor_id, job_id) + .await + .map_err(|e| { + let msg = format!("Error finding next assignable task: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + if let Some((task, _plan)) = &plan { + let partition_id = task.partition_id.as_ref().unwrap(); + info!( + "Sending new task to {}: {}/{}/{}", + executor.executor_id, + partition_id.job_id, + partition_id.stage_id, + partition_id.partition_id + ); + } + match plan { + Some((status, plan)) => { + let plan_clone = plan.clone(); + let output_partitioning = if let Some(shuffle_writer) = + plan_clone.as_any().downcast_ref::() + { + shuffle_writer.shuffle_output_partitioning() + } else { + return Err(BallistaError::General(format!( + "Task root plan was not a ShuffleWriterExec: {:?}", + plan_clone + ))); + }; + + ret[idx].push(TaskDefinition { + plan: Some(plan.try_into().unwrap()), + task_id: status.partition_id, + output_partitioning: hash_partitioning_to_proto( + output_partitioning, + ) + .map_err(|_| Status::internal("TBD".to_string()))?, + }); + executor.available_task_slots -= 1; + num_tasks += 1; + } + _ => { + // Indicate there's no more tasks to be scheduled + has_tasks = false; + break; + } + } + } + if !has_tasks { + break; + } + let has_executors = + available_executors.get(0).unwrap().available_task_slots > 0; + if !has_executors { + break; + } } + Ok((ret, num_tasks)) } +} - pub fn set_caller_ip(&mut self, ip: IpAddr) { - self.caller_ip = ip; +pub struct TaskScheduler { + scheduler_server: Arc, +} + +impl TaskScheduler { + pub fn new(scheduler_server: Arc) -> Self { + Self { scheduler_server } + } + + pub fn start(&self, mut rx_job: mpsc::Receiver) { + let scheduler_server = self.scheduler_server.clone(); + tokio::spawn(async move { + info!("Starting the task scheduler"); + loop { + let job_id = rx_job.recv().await.unwrap(); + info!("Fetch job {:?} to be scheduled", job_id.clone()); + + let server = scheduler_server.clone(); + server.schedule_job(job_id).await.unwrap(); + } + }); } } @@ -181,6 +370,13 @@ impl SchedulerGrpc for SchedulerServer { &self, request: Request, ) -> std::result::Result, tonic::Status> { + if let TaskSchedulingPolicy::PushStaged = self.policy { + error!("Poll work interface is not supported for push-based task scheduling"); + return Err(tonic::Status::failed_precondition( + "Bad request because poll work is not supported for push-based task scheduling", + )); + } + let remote_addr = request.remote_addr(); if let PollWorkParams { metadata: Some(metadata), can_accept_task, @@ -195,8 +391,9 @@ impl SchedulerGrpc for SchedulerServer { .map(|h| match h { OptionalHost::Host(host) => host, }) - .unwrap_or_else(|| self.caller_ip.to_string()), + .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()), port: metadata.port as u16, + grpc_port: metadata.grpc_port as u16, }; let mut lock = self.state.lock().await.map_err(|e| { let msg = format!("Could not lock the state: {}", e); @@ -278,6 +475,195 @@ impl SchedulerGrpc for SchedulerServer { } } + async fn register_executor( + &self, + request: Request, + ) -> Result, Status> { + let remote_addr = request.remote_addr(); + if let RegisterExecutorParams { + metadata: Some(metadata), + specification: Some(specification), + } = request.into_inner() + { + info!("Received register executor request for {:?}", metadata); + let metadata: ExecutorMeta = ExecutorMeta { + id: metadata.id, + host: metadata + .optional_host + .map(|h| match h { + OptionalHost::Host(host) => host, + }) + .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()), + port: metadata.port as u16, + grpc_port: metadata.grpc_port as u16, + }; + // Check whether the executor starts the grpc service + { + let executor_url = + format!("http://{}:{}", metadata.host, metadata.grpc_port); + info!("Connect to executor {:?}", executor_url); + let executor_client = ExecutorGrpcClient::connect(executor_url) + .await + .context("Could not connect to executor") + .map_err(|e| tonic::Status::internal(format!("{:?}", e)))?; + let mut clients = self.executors_client.write().await; + // TODO check duplicated registration + clients.insert(metadata.id.clone(), executor_client); + info!("Size of executor clients: {:?}", clients.len()); + } + let mut lock = self.state.lock().await.map_err(|e| { + let msg = format!("Could not lock the state: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + self.state + .save_executor_metadata(metadata.clone()) + .await + .map_err(|e| { + let msg = format!("Could not save executor metadata: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + let executor_spec: ExecutorSpecification = specification.into(); + let executor_data = ExecutorData { + executor_id: metadata.id.clone(), + total_task_slots: executor_spec.task_slots, + available_task_slots: executor_spec.task_slots, + }; + self.state + .save_executor_data(executor_data) + .await + .map_err(|e| { + let msg = format!("Could not save executor data: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + lock.unlock().await; + Ok(Response::new(RegisterExecutorResult { success: true })) + } else { + warn!("Received invalid register executor request"); + Err(tonic::Status::invalid_argument( + "Missing metadata in request", + )) + } + } + + async fn send_heart_beat( + &self, + request: Request, + ) -> Result, Status> { + let remote_addr = request.remote_addr(); + if let SendHeartBeatParams { + metadata: Some(metadata), + state: Some(state), + } = request.into_inner() + { + debug!("Received heart beat request for {:?}", metadata); + trace!("Related executor state is {:?}", state); + let metadata: ExecutorMeta = ExecutorMeta { + id: metadata.id, + host: metadata + .optional_host + .map(|h| match h { + OptionalHost::Host(host) => host, + }) + .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()), + port: metadata.port as u16, + grpc_port: metadata.grpc_port as u16, + }; + { + let mut lock = self.state.lock().await.map_err(|e| { + let msg = format!("Could not lock the state: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + self.state + .save_executor_state(metadata, Some(state)) + .await + .map_err(|e| { + let msg = format!("Could not save executor metadata: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + lock.unlock().await; + } + Ok(Response::new(SendHeartBeatResult { reregister: false })) + } else { + warn!("Received invalid executor heart beat request"); + Err(tonic::Status::invalid_argument( + "Missing metadata or metrics in request", + )) + } + } + + async fn update_task_status( + &self, + request: Request, + ) -> Result, Status> { + if let UpdateTaskStatusParams { + metadata: Some(metadata), + task_status, + } = request.into_inner() + { + debug!("Received task status update request for {:?}", metadata); + trace!("Related task status is {:?}", task_status); + let mut jobs = HashSet::new(); + { + let mut lock = self.state.lock().await.map_err(|e| { + let msg = format!("Could not lock the state: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + let num_tasks = task_status.len(); + for task_status in task_status { + self.state + .save_task_status(&task_status) + .await + .map_err(|e| { + let msg = format!("Could not save task status: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + if task_status.partition_id.is_some() { + jobs.insert(task_status.partition_id.unwrap().job_id.clone()); + } + } + let mut executor_data = self + .state + .get_executor_data(&metadata.id) + .await + .map_err(|e| { + let msg = format!( + "Could not get metadata data for id {:?}: {}", + &metadata.id, e + ); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + executor_data.available_task_slots += num_tasks as u32; + self.state + .save_executor_data(executor_data) + .await + .map_err(|e| { + let msg = format!("Could not save metadata data: {}", e); + error!("{}", msg); + tonic::Status::internal(msg) + })?; + lock.unlock().await; + } + let tx_job = self.scheduler_env.as_ref().unwrap().tx_job.clone(); + for job_id in jobs { + tx_job.send(job_id).await.unwrap(); + } + Ok(Response::new(UpdateTaskStatusResult { success: true })) + } else { + warn!("Received invalid task status update request"); + Err(tonic::Status::invalid_argument( + "Missing metadata or task status in request", + )) + } + } + async fn get_file_metadata( &self, request: Request, @@ -390,6 +776,12 @@ impl SchedulerGrpc for SchedulerServer { let state = self.state.clone(); let job_id_spawn = job_id.clone(); + let tx_job: Option> = match self.policy { + TaskSchedulingPolicy::PullStaged => None, + TaskSchedulingPolicy::PushStaged => { + Some(self.scheduler_env.as_ref().unwrap().tx_job.clone()) + } + }; tokio::spawn(async move { // create physical plan using DataFusion let datafusion_ctx = create_datafusion_context(&config); @@ -503,6 +895,11 @@ impl SchedulerGrpc for SchedulerServer { )); } } + + if let Some(tx_job) = tx_job { + // Send job_id to the scheduler channel + tx_job.send(job_id_spawn).await.unwrap(); + } }); Ok(Response::new(ExecuteQueryResult { job_id })) @@ -537,10 +934,7 @@ pub fn create_datafusion_context(config: &BallistaConfig) -> ExecutionContext { #[cfg(all(test, feature = "sled"))] mod test { - use std::{ - net::{IpAddr, Ipv4Addr}, - sync::Arc, - }; + use std::sync::Arc; use tonic::Request; @@ -558,16 +952,13 @@ mod test { async fn test_poll_work() -> Result<(), BallistaError> { let state = Arc::new(StandaloneClient::try_new_temporary()?); let namespace = "default"; - let scheduler = SchedulerServer::new( - state.clone(), - namespace.to_owned(), - IpAddr::V4(Ipv4Addr::LOCALHOST), - ); + let scheduler = SchedulerServer::new(state.clone(), namespace.to_owned()); let state = SchedulerState::new(state, namespace.to_string()); let exec_meta = ExecutorRegistration { id: "abc".to_owned(), optional_host: Some(OptionalHost::Host("".to_owned())), port: 0, + grpc_port: 0, }; let request: Request = Request::new(PollWorkParams { metadata: Some(exec_meta.clone()), diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs index 23a0386419879..5da5bbef6197d 100644 --- a/ballista/rust/scheduler/src/main.rs +++ b/ballista/rust/scheduler/src/main.rs @@ -22,8 +22,8 @@ use ballista_scheduler::externalscaler::external_scaler_server::ExternalScalerSe use futures::future::{self, Either, TryFutureExt}; use hyper::{server::conn::AddrStream, service::make_service_fn, Server}; use std::convert::Infallible; -use std::net::{IpAddr, Ipv4Addr}; use std::{net::SocketAddr, sync::Arc}; +use tonic::transport::server::Connected; use tonic::transport::Server as TonicServer; use tower::Service; @@ -36,9 +36,14 @@ use ballista_scheduler::api::{get_routes, EitherBody, Error}; use ballista_scheduler::state::EtcdClient; #[cfg(feature = "sled")] use ballista_scheduler::state::StandaloneClient; -use ballista_scheduler::{state::ConfigBackendClient, ConfigBackend, SchedulerServer}; +use ballista_scheduler::{ + state::ConfigBackendClient, ConfigBackend, SchedulerEnv, SchedulerServer, + TaskScheduler, +}; +use ballista_core::config::TaskSchedulingPolicy; use log::info; +use tokio::sync::mpsc; #[macro_use] extern crate configure_me; @@ -52,29 +57,43 @@ mod config { "/scheduler_configure_me_config.rs" )); } + use config::prelude::*; async fn start_server( config_backend: Arc, namespace: String, addr: SocketAddr, + policy: TaskSchedulingPolicy, ) -> Result<()> { info!( "Ballista v{} Scheduler listening on {:?}", BALLISTA_VERSION, addr ); //should only call SchedulerServer::new() once in the process - let scheduler_server_without_caller_ip = SchedulerServer::new( - config_backend.clone(), - namespace.clone(), - IpAddr::V4(Ipv4Addr::UNSPECIFIED), + info!( + "Starting Scheduler grpc server with task scheduling policy of {:?}", + policy ); + let scheduler_server = match policy { + TaskSchedulingPolicy::PushStaged => { + // TODO make the buffer size configurable + let (tx_job, rx_job) = mpsc::channel::(10000); + let scheduler_server = SchedulerServer::new_with_policy( + config_backend.clone(), + namespace.clone(), + policy, + Some(SchedulerEnv { tx_job }), + ); + let task_scheduler = TaskScheduler::new(Arc::new(scheduler_server.clone())); + task_scheduler.start(rx_job); + scheduler_server + } + _ => SchedulerServer::new(config_backend.clone(), namespace.clone()), + }; Ok(Server::bind(&addr) .serve(make_service_fn(move |request: &AddrStream| { - let mut scheduler_server = scheduler_server_without_caller_ip.clone(); - scheduler_server.set_caller_ip(request.remote_addr().ip()); - let scheduler_grpc_server = SchedulerGrpcServer::new(scheduler_server.clone()); @@ -84,10 +103,16 @@ async fn start_server( .add_service(scheduler_grpc_server) .add_service(keda_scaler) .into_service(); - let mut warp = warp::service(get_routes(scheduler_server)); + let mut warp = warp::service(get_routes(scheduler_server.clone())); + let connect_info = request.connect_info(); future::ok::<_, Infallible>(tower::service_fn( move |req: hyper::Request| { + // Set the connect info from hyper to tonic + let (mut parts, body) = req.into_parts(); + parts.extensions.insert(connect_info.clone()); + let req = http::Request::from_parts(parts, body); + let header = req.headers().get(hyper::header::ACCEPT); if header.is_some() && header.unwrap().eq("application/json") { return Either::Left( @@ -163,6 +188,8 @@ async fn main() -> Result<()> { ) } }; - start_server(client, namespace, addr).await?; + + let policy: TaskSchedulingPolicy = opt.scheduler_policy; + start_server(client, namespace, addr, policy).await?; Ok(()) } diff --git a/ballista/rust/scheduler/src/standalone.rs b/ballista/rust/scheduler/src/standalone.rs index 6ab5bd62a8f03..55239d8b5a5ee 100644 --- a/ballista/rust/scheduler/src/standalone.rs +++ b/ballista/rust/scheduler/src/standalone.rs @@ -20,10 +20,7 @@ use ballista_core::{ BALLISTA_VERSION, }; use log::info; -use std::{ - net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::Arc, -}; +use std::{net::SocketAddr, sync::Arc}; use tokio::net::TcpListener; use tonic::transport::Server; @@ -35,7 +32,6 @@ pub async fn new_standalone_scheduler() -> Result { let server = SchedulerGrpcServer::new(SchedulerServer::new( Arc::new(client), "ballista".to_string(), - IpAddr::V4(Ipv4Addr::LOCALHOST), )); // Let the OS assign a random, free port let listener = TcpListener::bind("localhost:0").await?; diff --git a/ballista/rust/scheduler/src/state/mod.rs b/ballista/rust/scheduler/src/state/mod.rs index ef6de83127027..fb45579628690 100644 --- a/ballista/rust/scheduler/src/state/mod.rs +++ b/ballista/rust/scheduler/src/state/mod.rs @@ -31,7 +31,7 @@ use ballista_core::serde::protobuf::{ ExecutorMetadata, FailedJob, FailedTask, JobStatus, PhysicalPlanNode, RunningJob, RunningTask, TaskStatus, }; -use ballista_core::serde::scheduler::PartitionStats; +use ballista_core::serde::scheduler::{ExecutorData, PartitionStats}; use ballista_core::{error::BallistaError, serde::scheduler::ExecutorMeta}; use ballista_core::{error::Result, execution_plans::UnresolvedShuffleExec}; @@ -118,6 +118,13 @@ impl SchedulerState { Ok(result) } + pub async fn get_alive_executors_metadata_within_one_minute( + &self, + ) -> Result> { + self.get_alive_executors_metadata(Duration::from_secs(60)) + .await + } + pub async fn get_alive_executors_metadata( &self, last_seen_threshold: Duration, @@ -133,6 +140,14 @@ impl SchedulerState { } pub async fn save_executor_metadata(&self, meta: ExecutorMeta) -> Result<()> { + self.save_executor_state(meta, None).await + } + + pub async fn save_executor_state( + &self, + meta: ExecutorMeta, + state: Option, + ) -> Result<()> { let key = get_executor_key(&self.namespace, &meta.id); let meta: ExecutorMetadata = meta.into(); let timestamp = SystemTime::now() @@ -142,11 +157,57 @@ impl SchedulerState { let heartbeat = ExecutorHeartbeat { meta: Some(meta), timestamp, + state, }; let value: Vec = encode_protobuf(&heartbeat)?; self.config_client.put(key, value).await } + pub async fn save_executor_data(&self, executor_data: ExecutorData) -> Result<()> { + let key = get_executor_data_key(&self.namespace, &executor_data.executor_id); + let executor_data: protobuf::ExecutorData = executor_data.into(); + let value: Vec = encode_protobuf(&executor_data)?; + self.config_client.put(key, value).await + } + + pub async fn get_executors_data(&self) -> Result> { + let mut result = vec![]; + + let entries = self + .config_client + .get_from_prefix(&get_executors_data_prefix(&self.namespace)) + .await?; + for (_key, entry) in entries { + let executor_data: protobuf::ExecutorData = decode_protobuf(&entry)?; + result.push(executor_data.into()); + } + Ok(result) + } + + pub async fn get_available_executors_data(&self) -> Result> { + let mut res = self + .get_executors_data() + .await? + .into_iter() + .filter_map(|exec| (exec.available_task_slots > 0).then(|| exec)) + .collect::>(); + res.sort_by(|a, b| Ord::cmp(&b.available_task_slots, &a.available_task_slots)); + Ok(res) + } + + pub async fn get_executor_data(&self, executor_id: &str) -> Result { + let key = get_executor_data_key(&self.namespace, executor_id); + let value = &self.config_client.get(&key).await?; + if value.is_empty() { + return Err(BallistaError::General(format!( + "No executor data found for {}", + key + ))); + } + let value: protobuf::ExecutorData = decode_protobuf(value)?; + Ok(value.into()) + } + pub async fn save_job_metadata( &self, job_id: &str, @@ -233,6 +294,18 @@ impl SchedulerState { Ok((&value).try_into()?) } + pub async fn get_job_tasks( + &self, + job_id: &str, + ) -> Result> { + self.config_client + .get_from_prefix(&get_task_prefix_for_job(&self.namespace, job_id)) + .await? + .into_iter() + .map(|(key, bytes)| Ok((key, decode_protobuf(&bytes)?))) + .collect() + } + pub async fn get_all_tasks(&self) -> Result> { self.config_client .get_from_prefix(&get_task_prefix(&self.namespace)) @@ -281,6 +354,42 @@ impl SchedulerState { executor_id: &str, ) -> Result)>> { let tasks = self.get_all_tasks().await?; + self.assign_next_schedulable_task_inner(executor_id, tasks) + .await + } + + pub async fn assign_next_schedulable_job_task( + &self, + executor_id: &str, + job_id: &str, + ) -> Result)>> { + let job_tasks = self.get_job_tasks(job_id).await?; + self.assign_next_schedulable_task_inner(executor_id, job_tasks) + .await + } + + async fn assign_next_schedulable_task_inner( + &self, + executor_id: &str, + tasks: HashMap, + ) -> Result)>> { + match self.get_next_schedulable_task(tasks).await? { + Some((status, plan)) => { + let mut status = status.clone(); + status.status = Some(task_status::Status::Running(RunningTask { + executor_id: executor_id.to_owned(), + })); + self.save_task_status(&status).await?; + Ok(Some((status, plan))) + } + _ => Ok(None), + } + } + + async fn get_next_schedulable_task( + &self, + tasks: HashMap, + ) -> Result)>> { // TODO: Make the duration a configurable parameter let executors = self .get_alive_executors_metadata(Duration::from_secs(60)) @@ -385,12 +494,7 @@ impl SchedulerState { remove_unresolved_shuffles(plan.as_ref(), &partition_locations)?; // If we get here, there are no more unresolved shuffled and the task can be run - let mut status = status.clone(); - status.status = Some(task_status::Status::Running(RunningTask { - executor_id: executor_id.to_owned(), - })); - self.save_task_status(&status).await?; - return Ok(Some((status, plan))); + return Ok(Some((status.clone(), plan))); } } Ok(None) @@ -583,6 +687,14 @@ fn get_executor_key(namespace: &str, id: &str) -> String { format!("{}/{}", get_executors_prefix(namespace), id) } +fn get_executors_data_prefix(namespace: &str) -> String { + format!("/ballista/{}/resources/executors", namespace) +} + +fn get_executor_data_key(namespace: &str, id: &str) -> String { + format!("{}/{}", get_executors_data_prefix(namespace), id) +} + fn get_job_prefix(namespace: &str) -> String { format!("/ballista/{}/jobs", namespace) } @@ -670,6 +782,7 @@ mod test { id: "123".to_owned(), host: "localhost".to_owned(), port: 123, + grpc_port: 124, }; state.save_executor_metadata(meta.clone()).await?; let result: Vec<_> = state