Skip to content

Commit 274d09b

Browse files
committed
Move IPC threading decision from receiver to sender side (picking channel implies intended threading)
1 parent 9efafc0 commit 274d09b

File tree

7 files changed

+26
-97
lines changed

7 files changed

+26
-97
lines changed

IPC/ARAIPCAudioUnit_v3.mm

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -222,28 +222,13 @@ void sendMessage (MessageID messageID, MessageEncoder * encoder) override
222222
~AUProxyPlugIn () override
223223
{
224224
#if !__has_feature(objc_arc)
225-
dispatch_release (_readAudioQueue);
226225
[_initAU release];
227226
#endif
228227
}
229228

230-
protected:
231-
DispatchTarget getDispatchTargetForIncomingTransaction (MessageID messageID) override
232-
{
233-
// AUMessageChannel cannot be called back from the same thread it receives the message,
234-
// so we dispatch audio requests to a dedicated read samples queue and let the inherited
235-
// dispatch all other calls to the main thread.
236-
if (messageID == ARA_IPC_HOST_METHOD_ID (ARAAudioAccessControllerInterface, readAudioSamples))
237-
return _readAudioQueue;
238-
return ProxyPlugIn::getDispatchTargetForIncomingTransaction (messageID);
239-
}
240-
241229
private:
242230
AUProxyPlugIn (NSObject<AUMessageChannel> * _Nonnull mainChannel, NSObject<AUMessageChannel> * _Nonnull otherChannel, AUAudioUnit * _Nonnull initAU)
243231
: ProxyPlugIn { this },
244-
// \todo maybe we should make this configurable, so hosts can set this queue if it already has an appropriate one?
245-
// \todo there's also QOS_CLASS_USER_INTERACTIVE which seems more appropriate but is undocumented...
246-
_readAudioQueue { dispatch_queue_create ("ARA read audio samples", dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_USER_INITIATED, -1)) },
247232
_initAU { initAU }
248233
{
249234
setMainThreadChannel (new ProxyPlugInMessageChannel { mainChannel });
@@ -255,7 +240,6 @@ DispatchTarget getDispatchTargetForIncomingTransaction (MessageID messageID) ove
255240
}
256241

257242
private:
258-
const dispatch_queue_t _readAudioQueue;
259243
const AUAudioUnit * __strong _initAU; // workaround for macOS 14: keep the AU that vends the message channels alive, otherwise the channels will eventually stop working
260244
// \todo once this is fixed in macOS, we only need to store this on older macOS versions
261245
};

IPC/ARAIPCConnection.cpp

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -80,22 +80,20 @@
8080
//
8181
// Another challenge when injecting IPC into the ARA communication is that for each
8282
// message that comes in, an appropriate thread has to be selected to process it.
83-
// For the main thread channel, this is trivial.
83+
// When a new transaction is initiated, the implementation checks whether this is happening
84+
// on the main thread or on any other thread, and chooses the appropriate channel accordingly.
85+
// On the receiving side, calls coming in on the main thread channel are forwarded to the main
86+
// thread (unless the receive code already runs there), and for the other threads the code is
87+
// executed directly on the receive thread.
8488
//
85-
// For the channel that handles all communication on other threads, the implementation
86-
// adds a token to each call message that identifies the sending thread. The reply message
87-
// or any stacked call the receiver might make in response to the message will pass back
88-
// this thread token, enabling proper routing back to the original sending thread.
89+
// Replies or callbacks are routed back to the originating thread in the sender. This is done
90+
// by adding a token when sending a message that identifies the sending thread, and replies and
91+
// callbacks pass this token back to allow for proper dispatching from the receive thread to the
92+
// thread that initiated the transaction.
8993
// Since the actual ARA code is agnostic to IPC being used, the receiving side uses
9094
// thread local storage to make the sender's thread token available for all stacked calls
9195
// that the ARA code might make in response to the message.
9296
//
93-
// If a message is received that does not contain the token, this message indicates
94-
// the start of a transaction that was initiated on the other side. For this new
95-
// transaction, a proper thread has to be selected. This is done based on the message ID -
96-
// the current implementation dispatches audio reading to a dedicated dispatch queue and
97-
// handles the remaining calls directly on the IPC receive thread.
98-
//
9997
// Note that there is a crucial difference between the dispatch of a new transaction and
10098
// the dispatch of any follow-up messages in the transaction: the initial message is dispatched
10199
// to a thread that is potentially executing other code as well in some form of run loop,
@@ -298,15 +296,16 @@ void MessageDispatcher::routeReceivedMessage (MessageID messageID, const Message
298296
else
299297
{
300298
ARA_INTERNAL_ASSERT (messageID != 0);
301-
if (const auto dispatchTarget { _connection->getMessageHandler ()->getDispatchTargetForIncomingTransaction (messageID) })
299+
if (!_sendLock &&
300+
!_connection->wasCreatedOnCurrentThread ())
302301
{
303302
ARA_IPC_LOG ("dispatches received message with ID %i (new transaction)", messageID);
304303
#if defined (_WIN32)
305304
auto params { new APCProcessReceivedMessageParams { this, messageID, decoder } };
306-
const auto result { ::QueueUserAPC (APCRouteNewTransactionFunc, dispatchTarget, reinterpret_cast<ULONG_PTR> (params)) };
305+
const auto result { ::QueueUserAPC (APCRouteNewTransactionFunc, _connection->getCreationThreadDispatchTarget (), reinterpret_cast<ULONG_PTR> (params)) };
307306
ARA_INTERNAL_ASSERT (result != 0);
308307
#elif defined (__APPLE__)
309-
dispatch_async (dispatchTarget,
308+
dispatch_async (_connection->getCreationThreadDispatchTarget (),
310309
^{
311310
_handleReceivedMessage (messageID, decoder);
312311
});
@@ -381,7 +380,7 @@ HANDLE _GetRealCurrentThread ()
381380
#endif
382381

383382

384-
MessageHandler::MessageHandler ()
383+
Connection::Connection ()
385384
: _creationThreadID { std::this_thread::get_id () },
386385
#if defined (_WIN32)
387386
_creationThreadDispatchTarget { _GetRealCurrentThread () }
@@ -399,16 +398,6 @@ MessageHandler::MessageHandler ()
399398
#endif
400399
}
401400

402-
MessageHandler::DispatchTarget MessageHandler::getDispatchTargetForIncomingTransaction (MessageID /*messageID*/)
403-
{
404-
return (std::this_thread::get_id () == _creationThreadID) ? nullptr : _creationThreadDispatchTarget;
405-
}
406-
407-
408-
Connection::Connection ()
409-
: _creationThreadID { std::this_thread::get_id () }
410-
{}
411-
412401
Connection::~Connection ()
413402
{
414403
delete _otherDispatcher;
@@ -436,7 +425,7 @@ void Connection::setMessageHandler (MessageHandler* messageHandler)
436425
void Connection::sendMessage (MessageID messageID, MessageEncoder* encoder, ReplyHandler replyHandler, void* replyHandlerUserData)
437426
{
438427
ARA_INTERNAL_ASSERT ((_mainDispatcher != nullptr) && (_otherDispatcher != nullptr) && (_messageHandler != nullptr));
439-
if (std::this_thread::get_id () == _creationThreadID)
428+
if (wasCreatedOnCurrentThread ())
440429
_mainDispatcher->sendMessage (messageID, encoder, replyHandler, replyHandlerUserData);
441430
else
442431
_otherDispatcher->sendMessage (messageID, encoder, replyHandler, replyHandlerUserData);

IPC/ARAIPCConnection.h

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -51,35 +51,12 @@ class MessageDispatcher;
5151
class MessageHandler
5252
{
5353
public:
54-
MessageHandler ();
5554
virtual ~MessageHandler () = default;
5655

57-
//! type returned from getDispatchTargetForIncomingTransaction()
58-
#if defined (_WIN32)
59-
using DispatchTarget = HANDLE;
60-
friend void APCProcessReceivedMessageFunc (ULONG_PTR parameter);
61-
#elif defined (__APPLE__)
62-
using DispatchTarget = dispatch_queue_t;
63-
#else
64-
#error "not yet implemented on this platform"
65-
#endif
66-
67-
//! IPC connections will call this method to determine which thread should be
68-
//! used for handling an incoming transaction. Returning nullptr results in the
69-
//! current thread being used, otherwise the call will be forwarded to the
70-
//! returned target thread.
71-
//! The default implementation dispatches to the thread where the MessageHandler
72-
//! was created, which can be utilized by overrides of this method.
73-
virtual DispatchTarget getDispatchTargetForIncomingTransaction (MessageID messageID);
74-
7556
//! IPC connections will call this method for incoming messages after
7657
//! after filtering replies and routing them to the correct thread.
7758
virtual void handleReceivedMessage (const MessageID messageID, const MessageDecoder* const decoder,
7859
MessageEncoder* const replyEncoder) = 0;
79-
80-
private:
81-
std::thread::id const _creationThreadID;
82-
DispatchTarget const _creationThreadDispatchTarget;
8360
};
8461

8562

@@ -132,15 +109,25 @@ class Connection
132109

133110
MessageHandler* getMessageHandler () { return _messageHandler; }
134111

135-
#if ARA_ENABLE_INTERNAL_ASSERTS
136112
bool wasCreatedOnCurrentThread () const { return std::this_thread::get_id () == _creationThreadID; }
113+
114+
//! type returned from getDispatchTargetForIncomingTransaction()
115+
#if defined (_WIN32)
116+
using DispatchTarget = HANDLE;
117+
friend void APCProcessReceivedMessageFunc (ULONG_PTR parameter);
118+
#elif defined (__APPLE__)
119+
using DispatchTarget = dispatch_queue_t;
120+
#else
121+
#error "not yet implemented on this platform"
137122
#endif
123+
DispatchTarget getCreationThreadDispatchTarget () { return _creationThreadDispatchTarget; }
138124

139125
private:
140126
MessageDispatcher* _mainDispatcher {};
141127
MessageDispatcher* _otherDispatcher {};
142128
MessageHandler* _messageHandler {};
143129
std::thread::id const _creationThreadID;
130+
DispatchTarget const _creationThreadDispatchTarget;
144131
};
145132
//! @}
146133

IPC/ARAIPCProxyHost.cpp

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -661,16 +661,6 @@ ProxyHost::ProxyHost (Connection* connection)
661661
: RemoteCaller (connection)
662662
{}
663663

664-
ProxyHost::DispatchTarget ProxyHost::getDispatchTargetForIncomingTransaction (MessageID messageID)
665-
{
666-
// getPlaybackRegionHeadAndTailTime() is valid on any thread, so we can directly serve it from
667-
// the current IPC thread. For all other calls, we call the inherited default implementation to
668-
// dispatch to the creation thread.
669-
if (messageID == ARA_IPC_PLUGIN_METHOD_ID (ARADocumentControllerInterface, getPlaybackRegionHeadAndTailTime))
670-
return nullptr;
671-
return MessageHandler::getDispatchTargetForIncomingTransaction (messageID);
672-
}
673-
674664
void ProxyHost::handleReceivedMessage (const MessageID messageID, const MessageDecoder* const decoder,
675665
MessageEncoder* const replyEncoder)
676666
{

IPC/ARAIPCProxyHost.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ class ProxyHost : public MessageHandler, public RemoteCaller
4545
explicit ProxyHost (Connection* connection);
4646

4747
public:
48-
DispatchTarget getDispatchTargetForIncomingTransaction (MessageID messageID) override;
49-
5048
void handleReceivedMessage (const MessageID messageID, const MessageDecoder* const decoder,
5149
MessageEncoder* const replyEncoder) override;
5250
};

IPC/ARAIPCProxyPlugIn.cpp

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1431,23 +1431,6 @@ void ARAIPCProxyPlugInUninitializeARA (ARAIPCConnectionRef connectionRef, const
14311431

14321432
/*******************************************************************************/
14331433

1434-
ProxyPlugIn::DispatchTarget ProxyPlugIn::getDispatchTargetForIncomingTransaction (MessageID messageID)
1435-
{
1436-
// The archiving controller interface must never be called without the host first starting
1437-
// the transaction via some (un)archiving call.
1438-
ARA_INTERNAL_ASSERT (!MethodID::isMessageToHostInterface<ARAArchivingControllerInterface> (messageID));
1439-
1440-
// The model update controller interface must never be called without the host first starting
1441-
// the transaction via notifyModelUpdates().
1442-
ARA_INTERNAL_ASSERT (!MethodID::isMessageToHostInterface<ARAModelUpdateControllerInterface> (messageID));
1443-
1444-
// readAudioSamples() is valid on any thread, so we can directly serve it from the current IPC thread.
1445-
// For all other calls, we call the inherited default implementation to dispatch to the creation thread.
1446-
if (messageID == ARA_IPC_HOST_METHOD_ID (ARAAudioAccessControllerInterface, readAudioSamples))
1447-
return nullptr;
1448-
return MessageHandler::getDispatchTargetForIncomingTransaction (messageID);
1449-
}
1450-
14511434
void ProxyPlugIn::handleReceivedMessage (const MessageID messageID, const MessageDecoder* const decoder,
14521435
MessageEncoder* const replyEncoder)
14531436
{

IPC/ARAIPCProxyPlugIn.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ class ProxyPlugIn : public MessageHandler, public RemoteCaller
4545
public:
4646
using RemoteCaller::RemoteCaller;
4747

48-
DispatchTarget getDispatchTargetForIncomingTransaction (MessageID messageID) override;
49-
5048
void handleReceivedMessage (const MessageID messageID, const MessageDecoder* const decoder,
5149
MessageEncoder* const replyEncoder) override;
5250
};

0 commit comments

Comments
 (0)