Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions Sources/AsyncAlgorithms/Channels/AsyncChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
/// If the channel is already finished then this returns immediately.
/// If the task is cancelled, this function will resume without sending the element.
/// Other sending operations from other tasks will remain active.
public func send(_ element: Element) async {
public func send(
isolation: isolated (any Actor)? = #isolation,
_ element: Element
) async {
await self.storage.send(element: element)
}

Expand All @@ -55,7 +58,15 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
public mutating func next() async -> Element? {
// Although the storage can throw, its usage in the context of an `AsyncChannel` guarantees it cannot.
// There is no public way of sending a failure to it.
try! await self.storage.next()
try! await self.storage.next(isolation: nil)
}

public mutating func next(
isolation actor: isolated (any Actor)?
) async -> Element? {
// Although the storage can throw, its usage in the context of an `AsyncChannel` guarantees it cannot.
// There is no public way of sending a failure to it.
try! await self.storage.next(isolation: actor)
}
}
}
Expand Down
17 changes: 15 additions & 2 deletions Sources/AsyncAlgorithms/Channels/AsyncThrowingChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
/// If the channel is already finished then this returns immediately.
/// If the task is cancelled, this function will resume without sending the element.
/// Other sending operations from other tasks will remain active.
public func send(_ element: Element) async {
public func send(
isolation: isolated (any Actor)? = #isolation,
_ element: Element
) async {
await self.storage.send(element: element)
}

Expand All @@ -58,7 +61,17 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
let storage: ChannelStorage<Element, Failure>

public mutating func next() async throws -> Element? {
try await self.storage.next()
try await self.storage.next(isolation: nil)
}

public mutating func next(
isolation actor: isolated (any Actor)?
) async throws(Failure) -> Element? {
do {
return try await self.storage.next(isolation: actor)
} catch {
throw error as! Failure
}
}
}
}
Expand Down
13 changes: 11 additions & 2 deletions Sources/AsyncAlgorithms/Channels/ChannelStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ struct ChannelStorage<Element: Sendable, Failure: Error>: Sendable {
}
}

func send(element: Element) async {
func send(
isolation: isolated (any Actor)? = #isolation,
element: Element
) async {
// check if a suspension is needed
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.send()
Expand All @@ -44,6 +47,8 @@ struct ChannelStorage<Element: Sendable, Failure: Error>: Sendable {
await withTaskCancellationHandler {
// a suspension is needed
await withUnsafeContinuation { (continuation: UnsafeContinuation<Void, Never>) in
// ensure isolated param is transitively captured so we don't hop executors unnecessarily
assert(isolation === #isolation)
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.sendSuspended(continuation: continuation, element: element, producerID: producerID)
}
Expand Down Expand Up @@ -90,7 +95,9 @@ struct ChannelStorage<Element: Sendable, Failure: Error>: Sendable {
}
}

func next() async throws -> Element? {
func next(
isolation: isolated (any Actor)? = #isolation
) async throws -> Element? {
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.next()
}
Expand All @@ -108,6 +115,8 @@ struct ChannelStorage<Element: Sendable, Failure: Error>: Sendable {

return try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation<Element?, any Error>) in
// ensure isolated param is transitively captured so we don't hop executors unnecessarily
assert(isolation === #isolation)
let action = self.stateMachine.withCriticalRegion { stateMachine in
stateMachine.nextSuspended(
continuation: continuation,
Expand Down
32 changes: 32 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -173,4 +173,36 @@ final class TestChannel: XCTestCase {
let collected2 = await task2.value
XCTAssertEqual(collected2, 1)
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
@MainActor
func test_asyncChannel_preserves_order_when_isolated() async {
// Given: an AsyncChannel
let sut = AsyncChannel<Int>()

let iterationRange = 1...10
var taskSendOrder = [Int]()
var channelIterationOrder = [Int]()

// When: Consuming from one isolated task
let consumingTask = Task { @MainActor in
for await value in sut {
channelIterationOrder.append(value)
if value == iterationRange.upperBound { break }
}
}

// When: Producing from several other similarly-isolated tasks
for element in iterationRange {
Task { @MainActor in
taskSendOrder.append(element)
await sut.send(element)
}
}

await consumingTask.value

// Then: Elements are received in the order that the producer tasks were started.
XCTAssertEqual(channelIterationOrder, taskSendOrder)
}
}
32 changes: 32 additions & 0 deletions Tests/AsyncAlgorithmsTests/TestThrowingChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,36 @@ final class TestThrowingChannel: XCTestCase {
let collected2 = try await task2.value
XCTAssertEqual(collected2, 1)
}

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
@MainActor
func test_asyncThrowingChannel_preserves_order_when_isolated() async throws {
// Given: an AsyncChannel
let sut = AsyncThrowingChannel<Int, Error>()

let iterationRange = 1...10
var taskSendOrder = [Int]()
var channelIterationOrder = [Int]()

// When: Consuming from one isolated task
let consumingTask = Task { @MainActor in
for try await value in sut {
channelIterationOrder.append(value)
if value == iterationRange.upperBound { break }
}
}

// When: Producing from several other similarly-isolated tasks
for element in iterationRange {
Task { @MainActor in
taskSendOrder.append(element)
await sut.send(element)
}
}

try await consumingTask.value

// Then: Elements are received in the order that the producer tasks were started.
XCTAssertEqual(channelIterationOrder, taskSendOrder)
}
}