Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions Sources/ScreenStatetKit/AsyncStream/StreamProducerType.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ public actor StreamProducer<Element>: StreamProducerType where Element: Sendable

typealias Continuation = AsyncStream<Element>.Continuation

public let withLatest: Bool
private var continuations: [String:Continuation] = [:]
private let storage = StreamStorage()
private var latestElement: Element?

public let withLatest: Bool

/// Events stream
public var stream: AsyncStream<Element> {
AsyncStream { continuation in
Expand All @@ -46,24 +47,23 @@ public actor StreamProducer<Element>: StreamProducerType where Element: Sendable
if withLatest {
latestElement = element
}
continuations.values.forEach({ $0.yield(element) })
storage.emit(element: element)
}

public func finish() {
continuations.values.forEach({ $0.finish() })
continuations.removeAll()
storage.finish()
}

private func append(_ continuation: Continuation) {
let key = UUID().uuidString
continuation.onTermination = {[weak self] _ in
self?.onTermination(forKey: key)
}
continuations.updateValue(continuation, forKey: key)
storage.update(continuation, forKey: key)
}

private func removeContinuation(forKey key: String) {
continuations.removeValue(forKey: key)
storage.removeContinuation(forKey: key)
}

nonisolated private func onTermination(forKey key: String) {
Expand All @@ -72,6 +72,7 @@ public actor StreamProducer<Element>: StreamProducerType where Element: Sendable
}
}

@available(*, deprecated, renamed: "finish", message: "The Stream will be automatically finished when deallocated. No need to call it manually.")
public nonisolated func nonIsolatedFinish() {
Task(priority: .high) {
await finish()
Expand All @@ -84,3 +85,33 @@ public actor StreamProducer<Element>: StreamProducerType where Element: Sendable
}
}
}

//MARK: - Storage
extension StreamProducer {
private final class StreamStorage {

private var continuations: [String:Continuation] = [:]

func emit(element: Element) {
continuations.values.forEach({ $0.yield(element) })
}

func update(_ continuation: Continuation, forKey key: String) {
continuations.updateValue(continuation, forKey: key)
}

func removeContinuation(forKey key: String) {
continuations.removeValue(forKey: key)
}

func finish() {
continuations.values.forEach({ $0.finish() })
continuations.removeAll()
}

deinit {
finish()
}
}
}

136 changes: 117 additions & 19 deletions Sources/ScreenStatetKit/Helpers/CancelBag.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,55 +5,146 @@
// Created by Anthony on 4/12/25.
//



import Foundation
import SwiftUI

/// A container that manages the lifetime of `Task`s.
///
/// Tasks stored in a ``CancelBag`` can be cancelled individually using an
/// identifier or all at once using ``cancelAll()``.
///
/// When a task finishes (successfully or with cancellation), it is automatically
/// removed from the bag.
///
/// If the ``CancelBag`` is tied to the lifetime of a view or object, all stored
/// tasks will be cancelled when the bag is deallocated.
public actor CancelBag {

private let storage: CancelBagStorage

public init() {
storage = .init()
public var isEmpty: Bool {
storage.isEmpty
}

public var count: Int {
storage.count
}

public var policy: CancelStrategy {
storage.duplicatePolicy
}

public init(duplicate policy: CancelStrategy) {
self.storage = .init(duplicatePolicy: policy)
}

/// Cancels all stored tasks and clears the bag.
public func cancelAll() {
storage.cancelAll()
}

public func cancel(forIdentifier identifier: String) {
@available(*, deprecated, renamed: "cancelAll", message: "CancelBag will automatically cancel all tasks when deallocated. No need call this method directly.")
nonisolated public func cancelAllInTask() {
Task(priority: .high) {
await cancelAll()
}
}

/// Cancels the task associated with the given identifier.
///
/// - Parameter identifier: The identifier used when storing the task.
public func cancel(forIdentifier identifier: AnyHashable) {
storage.cancel(forIdentifier: identifier)
}

/// Appends a canceller to the bag.
///
/// This method is nonisolated so tasks can store themselves without
/// requiring the caller to `await`.
private func insert(_ canceller: Canceller) {
storage.insert(canceller: canceller)
}

/// This ensures completed tasks do not remain in the bag.
/// - Parameter watchId: ``Canceller``'s `watchId`
private func removeCanceller(by watchId: UUID) async {
storage.remove(by: watchId)
}

nonisolated fileprivate func append(canceller: Canceller) {
Task(priority: .high) {
await insert(canceller)
Task {[weak self] in
await self?.insert(canceller)
await canceller.waitResult()
await self?.removeCanceller(by: canceller.watchId)
}
}
}

extension CancelBag {

/// Defines how `CancelBag` handles tasks with the same identifier.
public enum CancelStrategy: Int8, Sendable {

//// Cancel the currently executing task if a new task with the same identifier is added.
case cancelExisting

/// Cancel the newly added task if a task with the same identifier already exists.
case cancelNew
}
}

//MARK: - Storage
private final class CancelBagStorage {

private var cancellers: [String: Canceller] = [:]
private var cancellers: [AnyHashable: Canceller]
let duplicatePolicy: CancelBag.CancelStrategy

var isEmpty: Bool {
cancellers.isEmpty
}

var count: Int {
cancellers.count
}

init(duplicatePolicy: CancelBag.CancelStrategy) {
self.cancellers = .init()
self.duplicatePolicy = duplicatePolicy
}

func cancelAll() {
let runningTasks = cancellers.values.filter({ !$0.isCancelled })
runningTasks.forEach{ $0.cancel() }
cancellers.removeAll()
}

func cancel(forIdentifier identifier: String) {
func cancel(forIdentifier identifier: AnyHashable) {
guard let task = cancellers[identifier] else { return }
task.cancel()
cancellers.removeValue(forKey: identifier)
}

func remove(by watchId: UUID) {
guard let key = cancellers.first(where: { $0.value.watchId == watchId })?.key else { return }
cancellers.removeValue(forKey: key)
}

func insert(canceller: Canceller) {
cancel(forIdentifier: canceller.id)
guard let existing = cancellers[canceller.id] else {
_insert(canceller: canceller)
return
}
switch duplicatePolicy {
case .cancelExisting:
existing.cancel()
cancellers.removeValue(forKey: existing.id)
_insert(canceller: canceller)
case .cancelNew:
canceller.cancel()
}
}

private func _insert(canceller: Canceller) {
guard !canceller.isCancelled else { return }
cancellers.updateValue(canceller, forKey: canceller.id)
}
Expand All @@ -63,30 +154,37 @@ private final class CancelBagStorage {
}
}

private struct Canceller: Identifiable, Sendable {

//MARK: - Canceller
private struct Canceller {

let cancel: @Sendable () -> Void
let id: String
let waitResult: @Sendable () async -> Void
let id: AnyHashable
let watchId: UUID
var isCancelled: Bool { isCancelledBock() }

private let isCancelledBock: @Sendable () -> Bool

init<S,E>(_ task: Task<S,E>, identifier: String = UUID().uuidString) {
init<S,E>(_ task: Task<S,E>, identifier: AnyHashable) {
cancel = { task.cancel() }
waitResult = { _ = await task.result }
isCancelledBock = { task.isCancelled }
id = identifier
watchId = .init()
}
}

//MARK: - Short Path
extension Task {

public func store(in bag: CancelBag) {
let canceller = Canceller(self)
bag.append(canceller: canceller)
public func store(in bag: CancelBag?) {
let canceller = Canceller(self, identifier: .init(UUID()))
bag?.append(canceller: canceller)
}

public func store(in bag: CancelBag, withIdentifier identifier: String) {
let canceller = Canceller(self, identifier: identifier)
bag.append(canceller: canceller)
public func store(in bag: CancelBag?, withIdentifier identifier: any Hashable) {
let canceller = Canceller(self, identifier: .init(identifier))
bag?.append(canceller: canceller)
}
}
1 change: 0 additions & 1 deletion Sources/ScreenStatetKit/Helpers/LoadingTrackable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// Created by Anthony on 4/12/25.
//


import Foundation

public protocol LoadingTrackable {
Expand Down
43 changes: 40 additions & 3 deletions Sources/ScreenStatetKit/Store/ScreenActionStore.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,47 @@ import Foundation

public protocol ScreenActionStore: TypeNamed, Actor {

associatedtype AScreenState: ScreenState
associatedtype Action: Sendable & ActionLockable

func binding(state: AScreenState)
/// Handles the given action and performs the corresponding logic.
///
/// - Parameter action: The action to process.
func receive(action: Action) async
}

extension ScreenActionStore {

nonisolated func receive(action: Action)
/// `ActionStore` receive an action from a nonisolated context.
///
/// This method allows dispatching an `Action` to the actor without requiring
/// the caller to `await`. Internally it creates a `Task` that forwards the
/// action to `receive(action:)`.
///
/// If a `CancelBag` is provided, the created task will be stored in the bag
/// using the `action` as its identifier. This allows the task to be cancelled
/// later or automatically replaced if another task with the same identifier
/// is stored.
///
/// - Parameters:
/// - action: The action to send to the receiver.
/// - canceller: An optional `CancelBag` used to manage the lifetime of the
/// created task. If provided, the task will be stored using `action`
/// as its identifier.
///
/// - Tip: If the ``CancelBag`` is tied to the lifetime of a view, its tasks will be
/// cancelled automatically when the view is destroyed. Otherwise, the tasks
/// are guaranteed to complete before the action store is deallocated.
///
/// - Note: The `Action` type must conform to `Hashable` so it can be used
/// as a unique identifier for task cancellation.
nonisolated
public func nonisolatedReceive(
action: Action,
canceller: CancelBag? = .none
)
where Action: Hashable {
Task {
await receive(action: action)
}.store(in: canceller, withIdentifier: action)
}
}
Loading
Loading