Skip to content

Commit 43f6941

Browse files
committed
threading cleanup
1 parent 1f15489 commit 43f6941

31 files changed

Lines changed: 318 additions & 251 deletions

File tree

src/libraries/System.Runtime.InteropServices.JavaScript/src/System/Runtime/InteropServices/JavaScript/JSSynchronizationContext.cs

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,26 @@
1111
using System.Runtime.CompilerServices;
1212
using QueueType = System.Threading.Channels.Channel<System.Runtime.InteropServices.JavaScript.JSSynchronizationContext.WorkItem>;
1313

14-
namespace System.Runtime.InteropServices.JavaScript {
14+
namespace System.Runtime.InteropServices.JavaScript
15+
{
1516
/// <summary>
1617
/// Provides a thread-safe default SynchronizationContext for the browser that will automatically
1718
/// route callbacks to the main browser thread where they can interact with the DOM and other
1819
/// thread-affinity-having APIs like WebSockets, fetch, WebGL, etc.
1920
/// Callbacks are processed during event loop turns via the runtime's background job system.
2021
/// </summary>
21-
internal sealed unsafe class JSSynchronizationContext : SynchronizationContext {
22+
internal sealed unsafe class JSSynchronizationContext : SynchronizationContext
23+
{
2224
public readonly Thread MainThread;
2325

24-
internal readonly struct WorkItem {
26+
internal readonly struct WorkItem
27+
{
2528
public readonly SendOrPostCallback Callback;
2629
public readonly object? Data;
2730
public readonly ManualResetEventSlim? Signal;
2831

29-
public WorkItem (SendOrPostCallback callback, object? data, ManualResetEventSlim? signal) {
32+
public WorkItem(SendOrPostCallback callback, object? data, ManualResetEventSlim? signal)
33+
{
3034
Callback = callback;
3135
Data = data;
3236
Signal = signal;
@@ -35,31 +39,33 @@ public WorkItem (SendOrPostCallback callback, object? data, ManualResetEventSlim
3539

3640
private static JSSynchronizationContext? MainThreadSynchronizationContext;
3741
private readonly QueueType Queue;
38-
private readonly Action _DataIsAvailable;
3942

40-
private JSSynchronizationContext (Thread mainThread)
41-
: this (
42-
mainThread,
43+
private JSSynchronizationContext()
44+
: this(
45+
Thread.CurrentThread,
4346
Channel.CreateUnbounded<WorkItem>(
4447
new UnboundedChannelOptions { SingleWriter = false, SingleReader = true, AllowSynchronousContinuations = true }
4548
)
4649
)
4750
{
4851
}
4952

50-
private JSSynchronizationContext (Thread mainThread, QueueType queue) {
53+
private JSSynchronizationContext(Thread mainThread, QueueType queue)
54+
{
5155
MainThread = mainThread;
5256
Queue = queue;
53-
_DataIsAvailable = DataIsAvailable;
5457
}
5558

56-
public override SynchronizationContext CreateCopy () {
59+
public override SynchronizationContext CreateCopy()
60+
{
5761
return new JSSynchronizationContext(MainThread, Queue);
5862
}
5963

60-
private void AwaitNewData () {
64+
private void AwaitNewData()
65+
{
6166
var vt = Queue.Reader.WaitToReadAsync();
62-
if (vt.IsCompleted) {
67+
if (vt.IsCompleted)
68+
{
6369
DataIsAvailable();
6470
return;
6571
}
@@ -69,31 +75,34 @@ private void AwaitNewData () {
6975
// fire a callback that will schedule a background job to pump the queue on the main thread.
7076
var awaiter = vt.AsTask().ConfigureAwait(false).GetAwaiter();
7177
// UnsafeOnCompleted avoids spending time flowing the execution context (we don't need it.)
72-
awaiter.UnsafeOnCompleted(_DataIsAvailable);
78+
awaiter.UnsafeOnCompleted(DataIsAvailable);
7379
}
7480

75-
private void DataIsAvailable () {
76-
// While we COULD pump here, we don't want to. We want the pump to happen on the next event loop turn.
77-
// Otherwise we could get a chain where a pump generates a new work item and that makes us pump again, forever.
78-
ScheduleBackgroundJob((void*)(delegate* unmanaged[Cdecl]<void>)&BackgroundJobHandler);
81+
private void DataIsAvailable()
82+
{
83+
MainThreadScheduleBackgroundJob((void*)(delegate* unmanaged[Cdecl]<void>)&BackgroundJobHandler);
7984
}
8085

81-
public override void Post (SendOrPostCallback d, object? state) {
86+
public override void Post(SendOrPostCallback d, object? state)
87+
{
8288
var workItem = new WorkItem(d, state, null);
8389
if (!Queue.Writer.TryWrite(workItem))
8490
throw new Exception("Internal error");
8591
}
8692

8793
// This path can only run when threading is enabled
88-
#pragma warning disable CA1416
94+
#pragma warning disable CA1416
8995

90-
public override void Send (SendOrPostCallback d, object? state) {
91-
if (Thread.CurrentThread == MainThread) {
96+
public override void Send(SendOrPostCallback d, object? state)
97+
{
98+
if (Thread.CurrentThread == MainThread)
99+
{
92100
d(state);
93101
return;
94102
}
95103

96-
using (var signal = new ManualResetEventSlim(false)) {
104+
using (var signal = new ManualResetEventSlim(false))
105+
{
97106
var workItem = new WorkItem(d, state, signal);
98107
if (!Queue.Writer.TryWrite(workItem))
99108
throw new Exception("Internal error");
@@ -102,40 +111,51 @@ public override void Send (SendOrPostCallback d, object? state) {
102111
}
103112
}
104113

105-
internal static void Install () {
106-
MainThreadSynchronizationContext ??= new JSSynchronizationContext(Thread.CurrentThread);
107-
108-
SynchronizationContext.SetSynchronizationContext(MainThreadSynchronizationContext);
109-
MainThreadSynchronizationContext.AwaitNewData();
114+
internal static void Install()
115+
{
116+
var ctx = Current as JSSynchronizationContext;
117+
if (ctx == null)
118+
{
119+
ctx = new JSSynchronizationContext();
120+
MainThreadSynchronizationContext = ctx;
121+
SetSynchronizationContext(ctx);
122+
}
123+
ctx.AwaitNewData();
110124
}
111125

112126
[MethodImplAttribute(MethodImplOptions.InternalCall)]
113-
internal static extern unsafe void ScheduleBackgroundJob(void* callback);
127+
internal static extern unsafe void MainThreadScheduleBackgroundJob(void* callback);
114128

115129
#pragma warning disable CS3016 // Arrays as attribute arguments is not CLS-compliant
116130
[UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })]
117-
private static unsafe void BackgroundJobHandler () {
131+
#pragma warning restore CS3016
132+
// this callback will arrive on the bound thread
133+
private static unsafe void BackgroundJobHandler()
134+
{
118135
MainThreadSynchronizationContext!.Pump();
119136
}
120137

121-
[UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })]
122-
private static unsafe void RequestPumpCallback () {
123-
ScheduleBackgroundJob((void*)(delegate* unmanaged[Cdecl]<void>)&BackgroundJobHandler);
124-
}
125-
126-
private void Pump () {
127-
try {
128-
while (Queue.Reader.TryRead(out var item)) {
129-
try {
138+
private void Pump()
139+
{
140+
try
141+
{
142+
while (Queue.Reader.TryRead(out var item))
143+
{
144+
try
145+
{
130146
item.Callback(item.Data);
131147
// While we would ideally have a catch block here and do something to dispatch/forward unhandled
132148
// exceptions, the standard threadpool (and thus standard synchronizationcontext) have zero
133149
// error handling, so for consistency with them we do nothing. Don't throw in SyncContext callbacks.
134-
} finally {
150+
}
151+
finally
152+
{
135153
item.Signal?.Set();
136154
}
137155
}
138-
} finally {
156+
}
157+
finally
158+
{
139159
// If an item throws, we want to ensure that the next pump gets scheduled appropriately regardless.
140160
AwaitNewData();
141161
}

src/mono/System.Private.CoreLib/src/System/Threading/ThreadPool.Browser.Mono.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public bool Unregister(WaitHandle? waitObject)
2828
}
2929
}
3030

31-
public static partial class ThreadPool
31+
public static unsafe partial class ThreadPool
3232
{
3333
// Indicates whether the thread pool should yield the thread from the dispatch loop to the runtime periodically so that
3434
// the runtime may use the thread for processing other work
@@ -79,7 +79,7 @@ internal static void RequestWorkerThread()
7979
if (_callbackQueued)
8080
return;
8181
_callbackQueued = true;
82-
QueueCallback();
82+
MainThreadScheduleBackgroundJob((void*)(delegate* unmanaged[Cdecl]<void>)&BackgroundJobHandler);
8383
}
8484

8585
internal static void NotifyWorkItemProgress()
@@ -110,12 +110,13 @@ private static RegisteredWaitHandle RegisterWaitForSingleObject(
110110
throw new PlatformNotSupportedException();
111111
}
112112

113-
[DynamicDependency("Callback")]
114113
[MethodImplAttribute(MethodImplOptions.InternalCall)]
115-
private static extern void QueueCallback();
114+
internal static extern unsafe void MainThreadScheduleBackgroundJob(void* callback);
116115

117-
private static void Callback()
118-
{
116+
#pragma warning disable CS3016 // Arrays as attribute arguments is not CLS-compliant
117+
[UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })]
118+
#pragma warning restore CS3016
119+
private static unsafe void BackgroundJobHandler () {
119120
_callbackQueued = false;
120121
ThreadPoolWorkQueue.Dispatch();
121122
}

src/mono/System.Private.CoreLib/src/System/Threading/TimerQueue.Browser.Mono.cs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Diagnostics;
66
using System.Runtime.CompilerServices;
77
using System.Diagnostics.CodeAnalysis;
8+
using System.Runtime.InteropServices;
89

910
namespace System.Threading
1011
{
@@ -13,7 +14,7 @@ namespace System.Threading
1314
// Based on TimerQueue.Portable.cs
1415
// Not thread safe
1516
//
16-
internal partial class TimerQueue
17+
internal unsafe partial class TimerQueue
1718
{
1819
private static List<TimerQueue>? s_scheduledTimers;
1920
private static List<TimerQueue>? s_scheduledTimersToFire;
@@ -27,19 +28,20 @@ private TimerQueue(int _)
2728
{
2829
}
2930

30-
[DynamicDependency("TimeoutCallback")]
3131
// This replaces the current pending setTimeout with shorter one
3232
[MethodImplAttribute(MethodImplOptions.InternalCall)]
33-
private static extern void SetTimeout(int timeout);
33+
private static extern unsafe void MainThreadScheduleTimer(void* callback, int shortestDueTimeMs);
3434

35-
// Called by mini-wasm.c:mono_set_timeout_exec
36-
private static void TimeoutCallback()
37-
{
35+
#pragma warning disable CS3016 // Arrays as attribute arguments is not CLS-compliant
36+
[UnmanagedCallersOnly(CallConvs = new[] { typeof(CallConvCdecl) })]
37+
#pragma warning restore CS3016
38+
// Called by mini-wasm.c:mono_wasm_execute_timer
39+
private static unsafe void TimerHandler () {
3840
// always only have one scheduled at a time
3941
s_shortestDueTimeMs = long.MaxValue;
4042

4143
long currentTimeMs = TickCount64;
42-
ReplaceNextSetTimeout(PumpTimerQueue(currentTimeMs), currentTimeMs);
44+
ReplaceNextTimer(PumpTimerQueue(currentTimeMs), currentTimeMs);
4345
}
4446

4547
// this is called with shortest of timers scheduled on the particular TimerQueue
@@ -57,13 +59,13 @@ private bool SetTimer(uint actualDuration)
5759

5860
_scheduledDueTimeMs = currentTimeMs + (int)actualDuration;
5961

60-
ReplaceNextSetTimeout(ShortestDueTime(), currentTimeMs);
62+
ReplaceNextTimer(ShortestDueTime(), currentTimeMs);
6163

6264
return true;
6365
}
6466

6567
// shortest time of all TimerQueues
66-
private static void ReplaceNextSetTimeout(long shortestDueTimeMs, long currentTimeMs)
68+
private static void ReplaceNextTimer(long shortestDueTimeMs, long currentTimeMs)
6769
{
6870
if (shortestDueTimeMs == long.MaxValue)
6971
{
@@ -77,7 +79,7 @@ private static void ReplaceNextSetTimeout(long shortestDueTimeMs, long currentTi
7779
int shortestWait = Math.Max((int)(shortestDueTimeMs - currentTimeMs), 0);
7880
// this would cancel the previous schedule and create shorter one
7981
// it is expensive call
80-
SetTimeout(shortestWait);
82+
MainThreadScheduleTimer((void*)(delegate* unmanaged[Cdecl]<void>)&TimerHandler, shortestWait);
8183
}
8284
}
8385

src/mono/mono/metadata/gc.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ mono_wasm_gc_finalize_notify (void)
695695
/* use this if we are going to start the finalizer thread on wasm. */
696696
mono_coop_sem_post (&finalizer_sem);
697697
#else
698-
mono_threads_schedule_background_job (mono_runtime_do_background_work);
698+
mono_main_thread_schedule_background_job (mono_runtime_do_background_work);
699699
#endif
700700
}
701701

src/mono/mono/metadata/sgen-mono.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2897,7 +2897,7 @@ sgen_client_binary_protocol_collection_end (int minor_gc_count, int generation,
28972897
void
28982898
sgen_client_schedule_background_job (void (*cb)(void))
28992899
{
2900-
mono_threads_schedule_background_job (cb);
2900+
mono_main_thread_schedule_background_job (cb);
29012901
}
29022902

29032903
#endif

0 commit comments

Comments
 (0)