diff --git a/Sources/AsyncAlgorithms/Channels/AsyncChannel.swift b/Sources/AsyncAlgorithms/Channels/AsyncChannel.swift index c94ab57b..d23aac49 100644 --- a/Sources/AsyncAlgorithms/Channels/AsyncChannel.swift +++ b/Sources/AsyncAlgorithms/Channels/AsyncChannel.swift @@ -35,7 +35,10 @@ public final class AsyncChannel: AsyncSequence, Sendable { /// If the channel is already finished then this returns immediately. /// If the task is cancelled, this function will resume without sending the element. /// Other sending operations from other tasks will remain active. - public func send(_ element: Element) async { + public func send( + isolation: isolated (any Actor)? = #isolation, + _ element: Element + ) async { await self.storage.send(element: element) } @@ -55,7 +58,15 @@ public final class AsyncChannel: AsyncSequence, Sendable { public mutating func next() async -> Element? { // Although the storage can throw, its usage in the context of an `AsyncChannel` guarantees it cannot. // There is no public way of sending a failure to it. - try! await self.storage.next() + try! await self.storage.next(isolation: nil) + } + + public mutating func next( + isolation actor: isolated (any Actor)? + ) async -> Element? { + // Although the storage can throw, its usage in the context of an `AsyncChannel` guarantees it cannot. + // There is no public way of sending a failure to it. + try! await self.storage.next(isolation: actor) } } } diff --git a/Sources/AsyncAlgorithms/Channels/AsyncThrowingChannel.swift b/Sources/AsyncAlgorithms/Channels/AsyncThrowingChannel.swift index 622cdc4d..1e042077 100644 --- a/Sources/AsyncAlgorithms/Channels/AsyncThrowingChannel.swift +++ b/Sources/AsyncAlgorithms/Channels/AsyncThrowingChannel.swift @@ -34,7 +34,10 @@ public final class AsyncThrowingChannel: Asyn /// If the channel is already finished then this returns immediately. /// If the task is cancelled, this function will resume without sending the element. /// Other sending operations from other tasks will remain active. - public func send(_ element: Element) async { + public func send( + isolation: isolated (any Actor)? = #isolation, + _ element: Element + ) async { await self.storage.send(element: element) } @@ -58,7 +61,17 @@ public final class AsyncThrowingChannel: Asyn let storage: ChannelStorage public mutating func next() async throws -> Element? { - try await self.storage.next() + try await self.storage.next(isolation: nil) + } + + public mutating func next( + isolation actor: isolated (any Actor)? + ) async throws(Failure) -> Element? { + do { + return try await self.storage.next(isolation: actor) + } catch { + throw error as! Failure + } } } } diff --git a/Sources/AsyncAlgorithms/Channels/ChannelStorage.swift b/Sources/AsyncAlgorithms/Channels/ChannelStorage.swift index ad180fac..8fc526a3 100644 --- a/Sources/AsyncAlgorithms/Channels/ChannelStorage.swift +++ b/Sources/AsyncAlgorithms/Channels/ChannelStorage.swift @@ -24,7 +24,10 @@ struct ChannelStorage: Sendable { } } - func send(element: Element) async { + func send( + isolation: isolated (any Actor)? = #isolation, + element: Element + ) async { // check if a suspension is needed let action = self.stateMachine.withCriticalRegion { stateMachine in stateMachine.send() @@ -44,6 +47,8 @@ struct ChannelStorage: Sendable { await withTaskCancellationHandler { // a suspension is needed await withUnsafeContinuation { (continuation: UnsafeContinuation) in + // ensure isolated param is transitively captured so we don't hop executors unnecessarily + assert(isolation === #isolation) let action = self.stateMachine.withCriticalRegion { stateMachine in stateMachine.sendSuspended(continuation: continuation, element: element, producerID: producerID) } @@ -90,7 +95,9 @@ struct ChannelStorage: Sendable { } } - func next() async throws -> Element? { + func next( + isolation: isolated (any Actor)? = #isolation + ) async throws -> Element? { let action = self.stateMachine.withCriticalRegion { stateMachine in stateMachine.next() } @@ -108,6 +115,8 @@ struct ChannelStorage: Sendable { return try await withTaskCancellationHandler { try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in + // ensure isolated param is transitively captured so we don't hop executors unnecessarily + assert(isolation === #isolation) let action = self.stateMachine.withCriticalRegion { stateMachine in stateMachine.nextSuspended( continuation: continuation, diff --git a/Tests/AsyncAlgorithmsTests/TestChannel.swift b/Tests/AsyncAlgorithmsTests/TestChannel.swift index b5d28cd9..d3222e2b 100644 --- a/Tests/AsyncAlgorithmsTests/TestChannel.swift +++ b/Tests/AsyncAlgorithmsTests/TestChannel.swift @@ -173,4 +173,36 @@ final class TestChannel: XCTestCase { let collected2 = await task2.value XCTAssertEqual(collected2, 1) } + + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + @MainActor + func test_asyncChannel_preserves_order_when_isolated() async { + // Given: an AsyncChannel + let sut = AsyncChannel() + + let iterationRange = 1...10 + var taskSendOrder = [Int]() + var channelIterationOrder = [Int]() + + // When: Consuming from one isolated task + let consumingTask = Task { @MainActor in + for await value in sut { + channelIterationOrder.append(value) + if value == iterationRange.upperBound { break } + } + } + + // When: Producing from several other similarly-isolated tasks + for element in iterationRange { + Task { @MainActor in + taskSendOrder.append(element) + await sut.send(element) + } + } + + await consumingTask.value + + // Then: Elements are received in the order that the producer tasks were started. + XCTAssertEqual(channelIterationOrder, taskSendOrder) + } } diff --git a/Tests/AsyncAlgorithmsTests/TestThrowingChannel.swift b/Tests/AsyncAlgorithmsTests/TestThrowingChannel.swift index 20a0ddc6..46b7a6cd 100644 --- a/Tests/AsyncAlgorithmsTests/TestThrowingChannel.swift +++ b/Tests/AsyncAlgorithmsTests/TestThrowingChannel.swift @@ -311,4 +311,36 @@ final class TestThrowingChannel: XCTestCase { let collected2 = try await task2.value XCTAssertEqual(collected2, 1) } + + @available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) + @MainActor + func test_asyncThrowingChannel_preserves_order_when_isolated() async throws { + // Given: an AsyncChannel + let sut = AsyncThrowingChannel() + + let iterationRange = 1...10 + var taskSendOrder = [Int]() + var channelIterationOrder = [Int]() + + // When: Consuming from one isolated task + let consumingTask = Task { @MainActor in + for try await value in sut { + channelIterationOrder.append(value) + if value == iterationRange.upperBound { break } + } + } + + // When: Producing from several other similarly-isolated tasks + for element in iterationRange { + Task { @MainActor in + taskSendOrder.append(element) + await sut.send(element) + } + } + + try await consumingTask.value + + // Then: Elements are received in the order that the producer tasks were started. + XCTAssertEqual(channelIterationOrder, taskSendOrder) + } }