Skip to content
This repository was archived by the owner on Feb 25, 2025. It is now read-only.

Commit fe0ab34

Browse files
committed
gave ownership of the concurrent task queue to the context
1 parent f339f1e commit fe0ab34

8 files changed

Lines changed: 41 additions & 58 deletions

File tree

fml/concurrent_message_loop.cc

Lines changed: 17 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ std::unique_ptr<ConcurrentMessageLoop> ConcurrentMessageLoop::Create(
1818
}
1919

2020
ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
21-
: worker_count_(std::max<size_t>(worker_count, 1ul)) {
21+
: worker_count_(std::max<size_t>(worker_count, 1ul)),
22+
task_runner_(new ConcurrentTaskRunner(this)) {
2223
for (size_t i = 0; i < worker_count_; ++i) {
2324
workers_.emplace_back([i, this]() {
2425
fml::Thread::SetCurrentThreadName(fml::Thread::ThreadConfig(
@@ -33,6 +34,10 @@ ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
3334
}
3435

3536
ConcurrentMessageLoop::~ConcurrentMessageLoop() {
37+
{
38+
std::scoped_lock lock(task_runner_->weak_loop_mutex_);
39+
task_runner_->weak_loop_ = nullptr;
40+
}
3641
Terminate();
3742
for (auto& worker : workers_) {
3843
worker.join();
@@ -43,33 +48,18 @@ size_t ConcurrentMessageLoop::GetWorkerCount() const {
4348
return worker_count_;
4449
}
4550

46-
std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
47-
return std::make_shared<ConcurrentTaskRunner>(weak_from_this());
48-
}
49-
5051
void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
5152
if (!task) {
5253
return;
5354
}
5455

55-
std::unique_lock lock(tasks_mutex_);
56-
57-
// Don't just drop tasks on the floor in case of shutdown.
58-
if (shutdown_) {
59-
FML_DLOG(WARNING)
60-
<< "Tried to post a task to shutdown concurrent message "
61-
"loop. The task will be executed on the callers thread.";
62-
lock.unlock();
63-
task();
64-
return;
65-
}
66-
67-
tasks_.push(task);
68-
6956
// Unlock the mutex before notifying the condition variable because that mutex
7057
// has to be acquired on the other thread anyway. Waiting in this scope till
7158
// it is acquired there is a pessimization.
72-
lock.unlock();
59+
{
60+
std::unique_lock lock(tasks_mutex_);
61+
tasks_.push(task);
62+
}
7363

7464
tasks_condition_.notify_one();
7565
}
@@ -148,9 +138,8 @@ std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
148138
return pending_tasks;
149139
}
150140

151-
ConcurrentTaskRunner::ConcurrentTaskRunner(
152-
std::weak_ptr<ConcurrentMessageLoop> weak_loop)
153-
: weak_loop_(std::move(weak_loop)) {}
141+
ConcurrentTaskRunner::ConcurrentTaskRunner(ConcurrentMessageLoop* weak_loop)
142+
: weak_loop_(weak_loop) {}
154143

155144
ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;
156145

@@ -159,15 +148,12 @@ void ConcurrentTaskRunner::PostTask(const fml::closure& task) {
159148
return;
160149
}
161150

162-
if (auto loop = weak_loop_.lock()) {
163-
loop->PostTask(task);
164-
return;
151+
{
152+
std::scoped_lock lock(weak_loop_mutex_);
153+
if (weak_loop_) {
154+
weak_loop_->PostTask(task);
155+
}
165156
}
166-
167-
FML_DLOG(WARNING)
168-
<< "Tried to post to a concurrent message loop that has already died. "
169-
"Executing the task on the callers thread.";
170-
task();
171157
}
172158

173159
bool ConcurrentMessageLoop::RunsTasksOnCurrentThread() {

fml/concurrent_message_loop.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ namespace fml {
1818

1919
class ConcurrentTaskRunner;
2020

21-
class ConcurrentMessageLoop
22-
: public std::enable_shared_from_this<ConcurrentMessageLoop> {
21+
class ConcurrentMessageLoop {
2322
public:
2423
static std::unique_ptr<ConcurrentMessageLoop> Create(
2524
size_t worker_count = std::thread::hardware_concurrency());
@@ -28,7 +27,7 @@ class ConcurrentMessageLoop
2827

2928
size_t GetWorkerCount() const;
3029

31-
std::shared_ptr<ConcurrentTaskRunner> GetTaskRunner();
30+
std::shared_ptr<ConcurrentTaskRunner> GetTaskRunner() { return task_runner_; }
3231

3332
void Terminate();
3433

@@ -47,6 +46,7 @@ class ConcurrentMessageLoop
4746
std::vector<std::thread::id> worker_thread_ids_;
4847
std::map<std::thread::id, std::vector<fml::closure>> thread_tasks_;
4948
bool shutdown_ = false;
49+
std::shared_ptr<ConcurrentTaskRunner> task_runner_;
5050

5151
explicit ConcurrentMessageLoop(size_t worker_count);
5252

@@ -63,7 +63,7 @@ class ConcurrentMessageLoop
6363

6464
class ConcurrentTaskRunner : public BasicTaskRunner {
6565
public:
66-
explicit ConcurrentTaskRunner(std::weak_ptr<ConcurrentMessageLoop> weak_loop);
66+
explicit ConcurrentTaskRunner(ConcurrentMessageLoop* weak_loop);
6767

6868
virtual ~ConcurrentTaskRunner();
6969

@@ -72,7 +72,9 @@ class ConcurrentTaskRunner : public BasicTaskRunner {
7272
private:
7373
friend ConcurrentMessageLoop;
7474

75-
std::weak_ptr<ConcurrentMessageLoop> weak_loop_;
75+
// Raw pointer that is cleared out in ~ConcurrentMessageLoop.
76+
ConcurrentMessageLoop* weak_loop_;
77+
std::mutex weak_loop_mutex_;
7678

7779
FML_DISALLOW_COPY_AND_ASSIGN(ConcurrentTaskRunner);
7880
};

impeller/playground/backend/vulkan/playground_impl_vk.cc

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,7 @@ void PlaygroundImplVK::DestroyWindowHandle(WindowHandle handle) {
4949
}
5050

5151
PlaygroundImplVK::PlaygroundImplVK(PlaygroundSwitches switches)
52-
: PlaygroundImpl(switches),
53-
concurrent_loop_(fml::ConcurrentMessageLoop::Create()),
54-
handle_(nullptr, &DestroyWindowHandle) {
52+
: PlaygroundImpl(switches), handle_(nullptr, &DestroyWindowHandle) {
5553
if (!::glfwVulkanSupported()) {
5654
#ifdef TARGET_OS_MAC
5755
VALIDATION_LOG << "Attempted to initialize a Vulkan playground on macOS "
@@ -83,7 +81,8 @@ PlaygroundImplVK::PlaygroundImplVK(PlaygroundSwitches switches)
8381
&::glfwGetInstanceProcAddress);
8482
context_settings.shader_libraries_data = ShaderLibraryMappingsForPlayground();
8583
context_settings.cache_directory = fml::paths::GetCachesDirectory();
86-
context_settings.worker_task_runner = concurrent_loop_->GetTaskRunner();
84+
context_settings.worker_concurrent_loop =
85+
fml::ConcurrentMessageLoop::Create();
8786
context_settings.enable_validation = switches_.enable_vulkan_validation;
8887

8988
auto context = ContextVK::Create(std::move(context_settings));
@@ -115,11 +114,7 @@ PlaygroundImplVK::PlaygroundImplVK(PlaygroundSwitches switches)
115114
context_ = std::move(context);
116115
}
117116

118-
PlaygroundImplVK::~PlaygroundImplVK() {
119-
// Make sure to kill the concurrent loop before the context so that we don't
120-
// have threads talking to a dead context.
121-
concurrent_loop_.reset();
122-
};
117+
PlaygroundImplVK::~PlaygroundImplVK() = default;
123118

124119
// |PlaygroundImpl|
125120
std::shared_ptr<Context> PlaygroundImplVK::GetContext() const {

impeller/renderer/backend/vulkan/context_vk.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -305,11 +305,12 @@ void ContextVK::Setup(Settings settings) {
305305
//----------------------------------------------------------------------------
306306
/// Setup the pipeline library.
307307
///
308+
worker_message_loop_ = std::move(settings.worker_concurrent_loop);
308309
auto pipeline_library = std::shared_ptr<PipelineLibraryVK>(
309-
new PipelineLibraryVK(device.value.get(), //
310-
caps, //
311-
std::move(settings.cache_directory), //
312-
settings.worker_task_runner //
310+
new PipelineLibraryVK(device.value.get(), //
311+
caps, //
312+
std::move(settings.cache_directory), //
313+
worker_message_loop_->GetTaskRunner() //
313314
));
314315

315316
if (!pipeline_library->IsValid()) {

impeller/renderer/backend/vulkan/context_vk.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class ContextVK final : public Context, public BackendCast<ContextVK, Context> {
3636
PFN_vkGetInstanceProcAddr proc_address_callback = nullptr;
3737
std::vector<std::shared_ptr<fml::Mapping>> shader_libraries_data;
3838
fml::UniqueFD cache_directory;
39-
std::shared_ptr<fml::ConcurrentTaskRunner> worker_task_runner;
39+
std::unique_ptr<fml::ConcurrentMessageLoop> worker_concurrent_loop;
4040
bool enable_validation = false;
4141

4242
Settings() = default;
@@ -137,6 +137,7 @@ class ContextVK final : public Context, public BackendCast<ContextVK, Context> {
137137
std::shared_ptr<FenceWaiterVK> fence_waiter_;
138138
std::string device_name_;
139139
const uint64_t hash_;
140+
std::unique_ptr<fml::ConcurrentMessageLoop> worker_message_loop_;
140141

141142
bool is_valid_ = false;
142143

impeller/renderer/backend/vulkan/test/mock_vulkan.cc

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,8 +272,7 @@ PFN_vkVoidFunction GetMockVulkanProcAddress(VkInstance instance,
272272

273273
std::shared_ptr<ContextVK> CreateMockVulkanContext(void) {
274274
ContextVK::Settings settings;
275-
auto message_loop = fml::ConcurrentMessageLoop::Create();
276-
settings.worker_task_runner = message_loop->GetTaskRunner();
275+
settings.worker_concurrent_loop = fml::ConcurrentMessageLoop::Create();
277276
settings.proc_address_callback = GetMockVulkanProcAddress;
278277
return ContextVK::Create(std::move(settings));
279278
}

shell/platform/android/android_surface_vulkan_impeller.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ namespace flutter {
2222

2323
static std::shared_ptr<impeller::Context> CreateImpellerContext(
2424
const fml::RefPtr<vulkan::VulkanProcTable>& proc_table,
25-
const std::shared_ptr<fml::ConcurrentMessageLoop>& concurrent_loop,
25+
std::unique_ptr<fml::ConcurrentMessageLoop> concurrent_loop,
2626
bool enable_vulkan_validation) {
2727
std::vector<std::shared_ptr<fml::Mapping>> shader_mappings = {
2828
std::make_shared<fml::NonOwnedMapping>(impeller_entity_shaders_vk_data,
@@ -40,7 +40,7 @@ static std::shared_ptr<impeller::Context> CreateImpellerContext(
4040
settings.proc_address_callback = instance_proc_addr;
4141
settings.shader_libraries_data = std::move(shader_mappings);
4242
settings.cache_directory = fml::paths::GetCachesDirectory();
43-
settings.worker_task_runner = concurrent_loop->GetTaskRunner();
43+
settings.worker_concurrent_loop = std::move(concurrent_loop);
4444
settings.enable_validation = enable_vulkan_validation;
4545
return impeller::ContextVK::Create(std::move(settings));
4646
}
@@ -50,10 +50,10 @@ AndroidSurfaceVulkanImpeller::AndroidSurfaceVulkanImpeller(
5050
const std::shared_ptr<PlatformViewAndroidJNI>& jni_facade,
5151
bool enable_vulkan_validation)
5252
: AndroidSurface(android_context),
53-
proc_table_(fml::MakeRefCounted<vulkan::VulkanProcTable>()),
54-
workers_(fml::ConcurrentMessageLoop::Create()) {
53+
proc_table_(fml::MakeRefCounted<vulkan::VulkanProcTable>()) {
5554
impeller_context_ =
56-
CreateImpellerContext(proc_table_, workers_, enable_vulkan_validation);
55+
CreateImpellerContext(proc_table_, fml::ConcurrentMessageLoop::Create(),
56+
enable_vulkan_validation);
5757
is_valid_ =
5858
proc_table_->HasAcquiredMandatoryProcAddresses() && impeller_context_;
5959
}

shell/platform/android/android_surface_vulkan_impeller.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ class AndroidSurfaceVulkanImpeller : public AndroidSurface {
5050
private:
5151
fml::RefPtr<vulkan::VulkanProcTable> proc_table_;
5252
fml::RefPtr<AndroidNativeWindow> native_window_;
53-
std::shared_ptr<fml::ConcurrentMessageLoop> workers_;
5453
std::shared_ptr<impeller::Context> impeller_context_;
5554
bool is_valid_ = false;
5655

0 commit comments

Comments
 (0)