Skip to content

Commit fd3b33a

Browse files
author
rickymo
committed
- init scope synchronously
- cancel scope gracefully when close so that cancellation error can be thrown to running stream
1 parent 02aa39c commit fd3b33a

2 files changed

Lines changed: 58 additions & 63 deletions

File tree

Sources/SwiftTaskQueue/SwiftTaskQueue.swift

Lines changed: 19 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -32,32 +32,24 @@ public class TaskQueue{
3232

3333
public let label:String?
3434

35-
private var pendingTasksContinuation: AsyncStream<PendingTask>.Continuation?
35+
private var pendingTasksContinuation: AsyncStream<PendingTask>.Continuation
3636

37-
private var pendingTasks: AsyncStream<PendingTask>?
37+
private var pendingTasks: AsyncStream<PendingTask>
3838

3939
private var scope: Task<Void,Never>?
4040

41-
private let initTaskDispatchQueue = DispatchQueue(label: "initTask")
41+
private var isScopeCancelled = false
4242

43-
private var initTask: Task<Void,Error>?
4443

45-
private var preInitPendingTasks: [PendingTask] = []
46-
47-
private func initScope(initContinuation:CheckedContinuation<Void,Never>)
44+
private func initScope()
4845
{
49-
pendingTasks = AsyncStream{ continuation in
50-
pendingTasksContinuation = continuation
51-
// print("pendingTasksContinuation initialized")
52-
initContinuation.resume()
53-
}
5446
scope = Task{
55-
guard let pendingTasks = pendingTasks else { return }
5647
for await pendingTask in pendingTasks
5748
{
5849
// print("PendingTask \(pendingTask.label ?? "") received", label ?? "")
5950
// print("\(label ?? "TaskQueue"): scope isCancelled \(Task.isCancelled)")
6051
if(Task.isCancelled){ break }
52+
if(isScopeCancelled){ break }
6153
if(pendingTask.isCancelled) { continue }
6254
if let task = pendingTask as? AsyncTask
6355
{
@@ -80,6 +72,9 @@ public class TaskQueue{
8072
// print("StreamTask \(pendingTask.tag ?? "") start",source: tag)
8173
for try await value in AsyncThrowingStream(Any.self, task.block)
8274
{
75+
//check task cancelled
76+
// print("StreamTask cancelled=\(Task.isCancelled)")
77+
if(isScopeCancelled){ throw CancellationError() }
8378
// print("StreamTask \(pendingTask.tag ?? "") yield",source: tag)
8479
task.continuation.yield(value)
8580
}
@@ -88,6 +83,7 @@ public class TaskQueue{
8883
}
8984
catch
9085
{
86+
// print("StreamTask error \(error)")
9187
// log.error("StreamTask \(pendingTask.tag ?? "") error \(error)",source: tag)
9288
task.continuation.finish(throwing: error)
9389
}
@@ -98,6 +94,7 @@ public class TaskQueue{
9894
// print("PendingTask discard \(pendingTask)", label ?? "")
9995
}
10096
if(Task.isCancelled){ break }
97+
if(isScopeCancelled){ break }
10198
}
10299
for await pendingTask in pendingTasks
103100
{
@@ -115,65 +112,27 @@ public class TaskQueue{
115112

116113
public init(label: String? = nil){
117114
self.label = label
118-
initTask = Task{
119-
await withCheckedContinuation{ initScope(initContinuation: $0) }
120-
//pendingTasks should be available since here
121-
// print("initScope done. pendingTasksContinuation=\(pendingTasksContinuation)")
122-
try initTaskDispatchQueue.sync {
123-
if let pendingTasksContinuation = pendingTasksContinuation
124-
{
125-
126-
// print("yield preInitPendingTasks start \(preInitPendingTasks.count)")
127-
for pendingTask in preInitPendingTasks
128-
{
129-
pendingTasksContinuation.yield(pendingTask)
130-
}
131-
// print("yield preInitPendingTasks done \(preInitPendingTasks.count)")
132-
133-
}
134-
else
135-
{
136-
throw fatalError("pendingTasksContinuation not available after init")
137-
}
138-
}
139-
// print("initTask done")
140-
initTask = nil
141-
}
115+
(pendingTasks,pendingTasksContinuation) = AsyncStream.makeStream()
116+
initScope()
142117
}
143118

144119
public func close()
145120
{
146-
if let scope = scope,
147-
!scope.isCancelled
121+
if !isScopeCancelled
148122
{
149-
scope.cancel()
123+
isScopeCancelled = true
150124
}
151125
}
152126

153127
public func dispatch(label:String?=nil,block: @escaping () async throws -> Void)
154128
{
155-
if initTask == nil, let pendingTasksContinuation = pendingTasksContinuation
156-
{
129+
157130
// print("yield directly \(label)")
158-
pendingTasksContinuation.yield(AsyncTask(label: label, continuation: nil, block: block))
159-
}
160-
else
161-
{
162-
initTaskDispatchQueue.sync {
163-
// print("append preInitPendingTasks \(label) start \(preInitPendingTasks.count)")
164-
preInitPendingTasks.append(AsyncTask(label: label, continuation: nil, block: block))
165-
// print("append preInitPendingTasks \(label) done \(preInitPendingTasks.count)")
166-
}
167-
}
131+
pendingTasksContinuation.yield(AsyncTask(label: label, continuation: nil, block: block))
168132
}
169133

170134
public func dispatch<T>(label:String?=nil,block: @escaping () async throws -> T) async throws -> T
171135
{
172-
if let initTask = initTask
173-
{
174-
try await initTask.value
175-
}
176-
177136
var pendingTask : AsyncTask?
178137
let cancel = {
179138
pendingTask?.isCancelled = true
@@ -182,7 +141,7 @@ public class TaskQueue{
182141
return (try await withCheckedThrowingContinuation({ continuation in
183142
let task = AsyncTask(label: label, continuation: continuation, block: block)
184143
pendingTask = task
185-
pendingTasksContinuation?.yield(task)
144+
pendingTasksContinuation.yield(task)
186145
})) as! T
187146
} onCancel: {
188147
cancel()
@@ -192,7 +151,8 @@ public class TaskQueue{
192151
public func dispatchStream<T>( label:String?=nil, block:@escaping (AsyncThrowingStream<T,Error>.Continuation) -> Void) -> AsyncThrowingStream<T,Error>
193152
{
194153
let anyStream = AsyncThrowingStream<Any,Error> { continuation in
195-
pendingTasksContinuation?.yield(StreamTask(label: label, continuation: continuation, block: { anyContinuation in
154+
pendingTasksContinuation.yield(StreamTask(label: label, continuation: continuation, block: { anyContinuation in
155+
196156
Task{
197157
do
198158
{
@@ -212,10 +172,6 @@ public class TaskQueue{
212172
return AsyncThrowingStream<T,Error> { typedContinuation in
213173
Task
214174
{
215-
if let initTask = initTask
216-
{
217-
try await initTask.value
218-
}
219175
do{
220176
for try await element in anyStream
221177
{

Tests/SwiftTaskQueueTests/SwiftTaskQueueTests.swift

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,45 @@ final class SwiftTaskQueueTests: XCTestCase {
7777
XCTAssertFalse(resultStr.contains("4"))
7878
}
7979

80+
func testCloseWhileStreamTask() async throws{
81+
print("testCloseWhileStreamTask start")
82+
let taskQueue = TaskQueue()
83+
let stream = taskQueue.dispatchStream(label: "task") { continuation in
84+
print("\(Date()) dispatchStream start")
85+
Task{
86+
print("\(Date()) dispatchStream task start")
87+
for i in 3..<10
88+
{
89+
try? await Task.sleep(nanoseconds: 1000000000)
90+
print("\(Date()) dispatchStream task yield \(i)")
91+
continuation.yield(i)
92+
}
93+
continuation.finish()
94+
print("\(Date()) dispatchStream task end")
95+
}
96+
}
97+
print("testCloseWhileStreamTask do")
98+
Task{
99+
try? await Task.sleep(nanoseconds: 5000000000)
100+
taskQueue.close()
101+
}
102+
var hasThrown = false
103+
do{
104+
var results : String = ""
105+
for try await result in stream
106+
{
107+
print("task result \(result)")
108+
results += "\(result)"
109+
}
110+
print("task result \(results)")
111+
}
112+
catch
113+
{
114+
print("task error \(error)")
115+
hasThrown = true
116+
}
117+
XCTAssertTrue(hasThrown)
118+
}
80119

81120
actor TestResult{
82121
var result : String = ""

0 commit comments

Comments
 (0)