Skip to content

Commit be52dbc

Browse files
committed
ARROW-18383: [C++] Avoid global variables for thread pools and at-fork handlers
Initialization order of module globals is undefined. In one particular case, the IO thread pool was instantiated first, at library load, and registered an at-fork handler. Only afterwards were the global variables holding the at-fork handlers initialized, which discarded the handler that had just been registered.
1 parent 21309ea commit be52dbc

3 files changed

Lines changed: 97 additions & 78 deletions

File tree

cpp/src/arrow/io/interfaces.cc

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ using internal::ThreadPool;
5050

5151
namespace io {
5252

53-
static IOContext g_default_io_context{};
54-
5553
IOContext::IOContext(MemoryPool* pool, StopToken stop_token)
5654
: IOContext(pool, internal::GetIOThreadPool(), std::move(stop_token)) {}
5755

58-
const IOContext& default_io_context() { return g_default_io_context; }
56+
const IOContext& default_io_context() {
57+
// Avoid using a global variable because of initialization order issues (ARROW-18383)
58+
static IOContext g_default_io_context{};
59+
return g_default_io_context;
60+
}
5961

6062
int GetIOThreadPoolCapacity() { return internal::GetIOThreadPool()->GetCapacity(); }
6163

@@ -103,7 +105,7 @@ class InputStreamBlockIterator {
103105

104106
} // namespace
105107

106-
const IOContext& Readable::io_context() const { return g_default_io_context; }
108+
const IOContext& Readable::io_context() const { return default_io_context(); }
107109

108110
Status InputStream::Advance(int64_t nbytes) { return Read(nbytes).status(); }
109111

cpp/src/arrow/util/atfork_internal.cc

Lines changed: 90 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -34,104 +34,120 @@ namespace internal {
3434

3535
namespace {
3636

37-
struct RunningHandler {
38-
// A temporary owning copy of a handler, to make sure that a handler
39-
// that runs before fork can still run after fork.
40-
std::shared_ptr<AtForkHandler> handler;
41-
// The token returned by the before-fork handler, to pass to after-fork handlers.
42-
std::any token;
43-
44-
explicit RunningHandler(std::shared_ptr<AtForkHandler> handler)
45-
: handler(std::move(handler)) {}
46-
};
37+
// Singleton state for at-fork management.
38+
// We do not use global variables because of initialization order issues (ARROW-18383).
39+
// Instead, a function-local static ensures the state is initialized
40+
// opportunistically (see GetAtForkState()).
41+
struct AtForkState {
42+
struct RunningHandler {
43+
// A temporary owning copy of a handler, to make sure that a handler
44+
// that runs before fork can still run after fork.
45+
std::shared_ptr<AtForkHandler> handler;
46+
// The token returned by the before-fork handler, to pass to after-fork handlers.
47+
std::any token;
48+
49+
explicit RunningHandler(std::shared_ptr<AtForkHandler> handler)
50+
: handler(std::move(handler)) {}
51+
};
52+
53+
void MaintainHandlersUnlocked() {
54+
auto it = std::remove_if(
55+
handlers_.begin(), handlers_.end(),
56+
[](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
57+
handlers_.erase(it, handlers_.end());
58+
}
4759

48-
std::mutex g_mutex;
49-
std::vector<std::weak_ptr<AtForkHandler>> g_handlers;
50-
std::vector<RunningHandler> g_handlers_while_forking;
60+
void BeforeFork() {
61+
// Lock the mutex and keep it locked until the end of AfterForkParent(),
62+
// to avoid multiple concurrent forks and atforks.
63+
mutex_.lock();
5164

52-
void MaintainHandlersUnlocked() {
53-
auto it = std::remove_if(
54-
g_handlers.begin(), g_handlers.end(),
55-
[](const std::weak_ptr<AtForkHandler>& ptr) { return ptr.expired(); });
56-
g_handlers.erase(it, g_handlers.end());
57-
}
65+
DCHECK(handlers_while_forking_.empty()); // AfterForkParent clears it
5866

59-
void BeforeFork() {
60-
// Lock the mutex and keep it locked until the end of AfterForkParent(),
61-
// to avoid multiple concurrent forks and atforks.
62-
g_mutex.lock();
63-
64-
DCHECK(g_handlers_while_forking.empty()); // AfterForkParent clears it
67+
for (const auto& weak_handler : handlers_) {
68+
if (auto handler = weak_handler.lock()) {
69+
handlers_while_forking_.emplace_back(std::move(handler));
70+
}
71+
}
6572

66-
for (const auto& weak_handler : g_handlers) {
67-
if (auto handler = weak_handler.lock()) {
68-
g_handlers_while_forking.emplace_back(std::move(handler));
73+
// XXX can the handler call RegisterAtFork()?
74+
for (auto&& handler : handlers_while_forking_) {
75+
if (handler.handler->before) {
76+
handler.token = handler.handler->before();
77+
}
6978
}
7079
}
7180

72-
// XXX can the handler call RegisterAtFork()?
73-
for (auto&& handler : g_handlers_while_forking) {
74-
if (handler.handler->before) {
75-
handler.token = handler.handler->before();
81+
void AfterForkParent() {
82+
// The mutex was locked by BeforeFork()
83+
auto handlers = std::move(handlers_while_forking_);
84+
handlers_while_forking_.clear();
85+
86+
// Execute handlers in reverse order
87+
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
88+
auto&& handler = *it;
89+
if (handler.handler->parent_after) {
90+
handler.handler->parent_after(std::move(handler.token));
91+
}
7692
}
77-
}
78-
}
7993

80-
void AfterForkParent() {
81-
// The mutex was locked by BeforeFork()
94+
mutex_.unlock();
95+
// handlers will be destroyed here without the mutex locked, so that
96+
// any action taken by destructors might call RegisterAtFork
97+
}
8298

83-
auto handlers = std::move(g_handlers_while_forking);
84-
g_handlers_while_forking.clear();
85-
// Execute handlers in reverse order
86-
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
87-
auto&& handler = *it;
88-
if (handler.handler->parent_after) {
89-
handler.handler->parent_after(std::move(handler.token));
99+
void AfterForkChild() {
100+
// Need to reinitialize the mutex as it is probably invalid. Also, the
101+
// old mutex destructor may fail.
102+
// Fortunately, we are a single thread in the child process by now, so no
103+
// additional synchronization is needed.
104+
new (&mutex_) std::mutex;
105+
106+
auto handlers = std::move(handlers_while_forking_);
107+
handlers_while_forking_.clear();
108+
109+
// Execute handlers in reverse order
110+
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
111+
auto&& handler = *it;
112+
if (handler.handler->child_after) {
113+
handler.handler->child_after(std::move(handler.token));
114+
}
90115
}
91116
}
92117

93-
g_mutex.unlock();
94-
// handlers will be destroyed here without the mutex locked, so that
95-
// any action taken by destructors might call RegisterAtFork
96-
}
97-
98-
void AfterForkChild() {
99-
// Need to reinitialize the mutex as it is probably invalid. Also, the
100-
// old mutex destructor may fail.
101-
// Fortunately, we are a single thread in the child process by now, so no
102-
// additional synchronization is needed.
103-
new (&g_mutex) std::mutex;
104-
105-
auto handlers = std::move(g_handlers_while_forking);
106-
g_handlers_while_forking.clear();
107-
// Execute handlers in reverse order
108-
for (auto it = handlers.rbegin(); it != handlers.rend(); ++it) {
109-
auto&& handler = *it;
110-
if (handler.handler->child_after) {
111-
handler.handler->child_after(std::move(handler.token));
112-
}
118+
void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
119+
std::lock_guard<std::mutex> lock(mutex_);
120+
// This is O(n) for each at-fork registration. We assume that n remains
121+
// typically low and calls to this function are not performance-critical.
122+
MaintainHandlersUnlocked();
123+
handlers_.push_back(std::move(weak_handler));
113124
}
114-
}
115125

116-
struct AtForkInitializer {
117-
AtForkInitializer() {
126+
std::mutex mutex_;
127+
std::vector<std::weak_ptr<AtForkHandler>> handlers_;
128+
std::vector<RunningHandler> handlers_while_forking_;
129+
};
130+
131+
AtForkState* GetAtForkState() {
132+
static std::unique_ptr<AtForkState> state = []() {
133+
auto state = std::make_unique<AtForkState>();
118134
#ifndef _WIN32
119-
int r = pthread_atfork(&BeforeFork, &AfterForkParent, &AfterForkChild);
135+
int r = pthread_atfork(/*prepare=*/[] { GetAtForkState()->BeforeFork(); },
136+
/*parent=*/[] { GetAtForkState()->AfterForkParent(); },
137+
/*child=*/[] { GetAtForkState()->AfterForkChild(); });
120138
if (r != 0) {
121139
IOErrorFromErrno(r, "Error when calling pthread_atfork: ").Abort();
122140
}
123141
#endif
124-
}
125-
};
142+
return state;
143+
}();
144+
return state.get();
145+
}
126146

127147
}; // namespace
128148

129149
void RegisterAtFork(std::weak_ptr<AtForkHandler> weak_handler) {
130-
static AtForkInitializer initializer;
131-
132-
std::lock_guard<std::mutex> lock(g_mutex);
133-
MaintainHandlersUnlocked();
134-
g_handlers.push_back(std::move(weak_handler));
150+
GetAtForkState()->RegisterAtFork(std::move(weak_handler));
135151
}
136152

137153
} // namespace internal

cpp/src/arrow/util/thread_pool.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ std::shared_ptr<ThreadPool> ThreadPool::MakeCpuThreadPool() {
535535
}
536536

537537
ThreadPool* GetCpuThreadPool() {
538+
// Avoid using a global variable because of initialization order issues (ARROW-18383)
538539
static std::shared_ptr<ThreadPool> singleton = ThreadPool::MakeCpuThreadPool();
539540
return singleton.get();
540541
}

0 commit comments

Comments
 (0)