Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 71 additions & 55 deletions Sources/CollectionConcurrencyKit.swift
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,14 @@ public extension Sequence {
/// - parameter operation: The closure to run for each element.
func concurrentForEach(
withPriority priority: TaskPriority? = nil,
_ operation: @escaping (Element) async -> Void
_ operation: (Element) async -> Void
) async {
await withTaskGroup(of: Void.self) { group in
for element in self {
group.addTask(priority: priority) {
await operation(element)
await withoutActuallyEscaping(operation) { escapableOperation in
await withTaskGroup(of: Void.self) { group in
for element in self {
group.addTask(priority: priority) {
await escapableOperation(element)
}
}
}
}
Expand All @@ -62,17 +64,19 @@ public extension Sequence {
/// - throws: Rethrows any error thrown by the passed closure.
func concurrentForEach(
withPriority priority: TaskPriority? = nil,
_ operation: @escaping (Element) async throws -> Void
_ operation: (Element) async throws -> Void
) async throws {
try await withThrowingTaskGroup(of: Void.self) { group in
for element in self {
group.addTask(priority: priority) {
try await operation(element)
try await withoutActuallyEscaping(operation) { escapableOperation in
try await withThrowingTaskGroup(of: Void.self) { group in
for element in self {
group.addTask(priority: priority) {
try await escapableOperation(element)
}
}
}

// Propagate any errors thrown by the group's tasks:
for try await _ in group {}
// Propagate any errors thrown by the group's tasks:
for try await _ in group {}
}
}
}
}
Expand Down Expand Up @@ -119,16 +123,18 @@ public extension Sequence {
/// the transformed values will match the original sequence.
func concurrentMap<T>(
withPriority priority: TaskPriority? = nil,
_ transform: @escaping (Element) async -> T
_ transform: (Element) async -> T
) async -> [T] {
let tasks = map { element in
Task(priority: priority) {
await transform(element)
await withoutActuallyEscaping(transform) { escapableTransform in
let tasks = map { element in
Task(priority: priority) {
await escapableTransform(element)
}
}
}

return await tasks.asyncMap { task in
await task.value
return await tasks.asyncMap { task in
await task.value
}
}
}

Expand All @@ -150,16 +156,18 @@ public extension Sequence {
/// - throws: Rethrows any error thrown by the passed closure.
func concurrentMap<T>(
withPriority priority: TaskPriority? = nil,
_ transform: @escaping (Element) async throws -> T
_ transform: (Element) async throws -> T
) async throws -> [T] {
let tasks = map { element in
Task(priority: priority) {
try await transform(element)
try await withoutActuallyEscaping(transform) { escapableTransform in
let tasks = map { element in
Task(priority: priority) {
try await escapableTransform(element)
}
}
}

return try await tasks.asyncMap { task in
try await task.value
return try await tasks.asyncMap { task in
try await task.value
}
}
}
}
Expand Down Expand Up @@ -214,16 +222,18 @@ public extension Sequence {
/// except for the values that were transformed into `nil`.
func concurrentCompactMap<T>(
withPriority priority: TaskPriority? = nil,
_ transform: @escaping (Element) async -> T?
_ transform: (Element) async -> T?
) async -> [T] {
let tasks = map { element in
Task(priority: priority) {
await transform(element)
await withoutActuallyEscaping(transform) { escapableTransform in
let tasks = map { element in
Task(priority: priority) {
await escapableTransform(element)
}
}
}

return await tasks.asyncCompactMap { task in
await task.value
return await tasks.asyncCompactMap { task in
await task.value
}
}
}

Expand All @@ -247,16 +257,18 @@ public extension Sequence {
/// - throws: Rethrows any error thrown by the passed closure.
func concurrentCompactMap<T>(
withPriority priority: TaskPriority? = nil,
_ transform: @escaping (Element) async throws -> T?
_ transform: (Element) async throws -> T?
) async throws -> [T] {
let tasks = map { element in
Task(priority: priority) {
try await transform(element)
try await withoutActuallyEscaping(transform) { escapableTransform in
let tasks = map { element in
Task(priority: priority) {
try await escapableTransform(element)
}
}
}

return try await tasks.asyncCompactMap { task in
try await task.value
return try await tasks.asyncCompactMap { task in
try await task.value
}
}
}
}
Expand Down Expand Up @@ -309,16 +321,18 @@ public extension Sequence {
/// within the returned array.
func concurrentFlatMap<T: Sequence>(
withPriority priority: TaskPriority? = nil,
_ transform: @escaping (Element) async -> T
_ transform: (Element) async -> T
) async -> [T.Element] {
let tasks = map { element in
Task(priority: priority) {
await transform(element)
await withoutActuallyEscaping(transform) { escapableTransform in
let tasks = map { element in
Task(priority: priority) {
await escapableTransform(element)
}
}
}

return await tasks.asyncFlatMap { task in
await task.value
return await tasks.asyncFlatMap { task in
await task.value
}
}
}

Expand All @@ -343,16 +357,18 @@ public extension Sequence {
/// - throws: Rethrows any error thrown by the passed closure.
func concurrentFlatMap<T: Sequence>(
withPriority priority: TaskPriority? = nil,
_ transform: @escaping (Element) async throws -> T
_ transform: (Element) async throws -> T
) async throws -> [T.Element] {
let tasks = map { element in
Task(priority: priority) {
try await transform(element)
try await withoutActuallyEscaping(transform) { escapableTransform in
let tasks = map { element in
Task(priority: priority) {
try await escapableTransform(element)
}
}
}

return try await tasks.asyncFlatMap { task in
try await task.value
return try await tasks.asyncFlatMap { task in
try await task.value
}
}
}
}