How to use AsyncStream in Swift | Modern Swift Concurrency #18
When values arrive over time — from a timer, a delegate callback, a notification, or a socket — AsyncStream lets you wrap that push-based source into a pull-based AsyncSequence that any for await loop can consume, bridging callback-based APIs cleanly into Swift's structured concurrency model.
What You'll Learn
- What AsyncStream is and why it exists as the bridge between callback/delegate APIs and async/await
- How the continuation object works as the producer-side handle for yielding and finishing the stream
- Why onTermination is critical for preventing resource leaks when the consumer cancels
- How structured task cancellation propagates from a for await loop into the AsyncStream producer
- When to use AsyncStream vs. AsyncThrowingStream and when simpler async functions suffice
Mental Model
AsyncStream is like a radio broadcast. The broadcaster (the producer inside the AsyncStream closure) pushes values onto the air whenever they are ready — continuation.yield(value) is the transmission. The listener (the consumer with for await) tunes in and receives transmissions one at a time, in order, without polling or callbacks.
The broadcaster doesn't know whether the listener is ready, and the listener doesn't know when the next transmission will arrive; the listener simply suspends and waits. The analogy has one limit: unlike a real broadcast, an AsyncStream is meant to be iterated by a single consumer, so values are not copied to multiple listeners. When the broadcaster finishes the show, it calls continuation.finish() and goes off the air. The listener's for await loop ends naturally.
The important twist: if the listener walks away and turns off their radio (task cancelled), the broadcast should also stop, because there is no point producing values nobody is receiving. onTermination is the broadcaster's notification that the listener has tuned out, so they can shut down their transmitter (cancel the internal task, close the socket, invalidate the timer).
Detailed Explanation
AsyncStream<Element> is a concrete implementation of AsyncSequence designed specifically as a bridge between imperative, callback-based code and Swift's structured concurrency. Before AsyncStream, wrapping a delegate-based API (like CLLocationManagerDelegate) for use with async/await required custom AsyncSequence implementations with actors, continuation storage, and careful cancellation handling. AsyncStream provides all of that infrastructure in one type.
An AsyncStream is created by passing a closure that receives a Continuation. The continuation has three key members: the yield(_:) method sends a new value to the consumer (or buffers it if the consumer is not waiting); the finish() method signals that no more values will be produced, which ends the for await loop once buffered values are drained; and onTermination is a property you set to receive a callback when the stream terminates, whether because the consuming task was cancelled or because finish() was called.
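A minimal sketch of those three members in use. The names countdown(from:) and collectCountdown() are hypothetical, invented for this illustration. The producer here yields synchronously inside the closure to keep the example short; the values are buffered until the consumer pulls them.

```swift
// Hypothetical producer: emits start, start-1, ..., 1 and then finishes.
func countdown(from start: Int) -> AsyncStream<Int> {
    AsyncStream { continuation in
        // Set the termination hook first so it is in place before finish() runs.
        continuation.onTermination = { reason in
            print("stream terminated: \(reason)")
        }
        for n in stride(from: start, through: 1, by: -1) {
            continuation.yield(n)   // buffered until the consumer asks for it
        }
        continuation.finish()       // no more values; the consumer's loop will end
    }
}

// Hypothetical consumer: gathers every value the stream produces.
func collectCountdown() async -> [Int] {
    var received: [Int] = []
    for await n in countdown(from: 3) {
        received.append(n)
    }
    return received
}
```

Calling await collectCountdown() returns the values in the order they were yielded, and the loop ends only because finish() was called.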
The producer and consumer live on different sides of a buffer. The stream buffers yielded values until the consumer is ready to receive them. The default buffering policy is .unbounded, meaning an arbitrarily fast producer can enqueue an arbitrary number of values, and memory use grows for as long as the consumer falls behind. For low-rate sources such as sensors or timers this is fine, but for a high-frequency data source you should pass a bounded policy, .bufferingOldest(n) or .bufferingNewest(n), which keeps at most n values and drops the rest.
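To make the difference between the bounded policies concrete, here is a small sketch. The helper drain(policy:) is a hypothetical name; it yields five values before the consumer starts reading, so the buffer limit decides which ones survive.

```swift
// Hypothetical helper: yields 1...5 up front, then reads back whatever the buffer kept.
func drain(policy: AsyncStream<Int>.Continuation.BufferingPolicy) async -> [Int] {
    let stream = AsyncStream(Int.self, bufferingPolicy: policy) { continuation in
        for n in 1...5 {
            continuation.yield(n)   // nobody is consuming yet, so the policy applies
        }
        continuation.finish()
    }
    var received: [Int] = []
    for await n in stream {
        received.append(n)
    }
    return received
}
```

With .bufferingNewest(2) the two most recent values (4 and 5) are kept; with .bufferingOldest(2) the first two (1 and 2) are kept and later ones are dropped. Keeping only the newest values usually suits state-like data such as sensor readings, while keeping the oldest suits cases where early events must not be lost.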
Task cancellation integrates cleanly: when the Task running the for await loop is cancelled, the loop exits at the next suspension point and the AsyncStream's onTermination closure fires. This is your signal to clean up the producer side — cancel timers, close connections, invalidate delegates. Without onTermination, the producer continues producing values nobody receives, wasting resources.
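The cancellation path can be observed in isolation with a sketch like the following. TerminationFlag and cancelConsumer() are hypothetical names; the flag is a small lock-protected box so that the onTermination closure, which may run on any thread, can record that it fired.

```swift
import Foundation

// Hypothetical helper: a thread-safe Boolean the termination handler can set.
final class TerminationFlag: @unchecked Sendable {
    private let lock = NSLock()
    private var value = false
    func set() { lock.lock(); value = true; lock.unlock() }
    var isSet: Bool { lock.lock(); defer { lock.unlock() }; return value }
}

// Cancels a consumer that is waiting on a silent stream and reports whether cleanup ran.
func cancelConsumer() async -> Bool {
    let cleanedUp = TerminationFlag()
    let stream = AsyncStream<Int> { continuation in
        // This producer never yields or finishes; it stands in for an open-ended source.
        continuation.onTermination = { _ in
            cleanedUp.set()   // real code would stop a timer or close a socket here
        }
    }
    let consumer = Task {
        for await _ in stream {}   // suspends, waiting for a value that never comes
    }
    consumer.cancel()              // cancellation ends the loop and fires onTermination
    await consumer.value
    return cleanedUp.isSet
}
```

Without the cancel() call the consumer would stay suspended indefinitely, because this producer never calls finish().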
AsyncStream is for non-throwing sequences. If the producer can fail, use AsyncThrowingStream<Element, Error> instead. The for await loop then requires try — for try await value in stream. For a sequence that emits exactly one value and then finishes, a plain async function is simpler than AsyncStream and should be preferred.
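As a sketch of the throwing variant, the example below parses string tokens and fails the stream on the first token that is not an integer. ParseError, parsedNumbers(from:), and collectNumbers(_:) are hypothetical names made up for this illustration.

```swift
// Hypothetical error type carrying the token that could not be parsed.
struct ParseError: Error {
    let token: String
}

// Emits each token as an Int; ends the stream with an error on the first bad token.
func parsedNumbers(from tokens: [String]) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        for token in tokens {
            guard let number = Int(token) else {
                continuation.finish(throwing: ParseError(token: token))
                return
            }
            continuation.yield(number)
        }
        continuation.finish()
    }
}

// Consumer: note the `try` in the loop and the surrounding do/catch.
func collectNumbers(_ tokens: [String]) async -> (values: [Int], failedToken: String?) {
    var values: [Int] = []
    do {
        for try await number in parsedNumbers(from: tokens) {
            values.append(number)
        }
        return (values, nil)
    } catch let error as ParseError {
        return (values, error.token)
    } catch {
        return (values, nil)
    }
}
```

Values yielded before the failure are still delivered; the error surfaces only after the consumer has drained them.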
Code Structure
The sample file (18-how-to-use-asyncstream-in-swift.swift) contains AsyncStreamCounter (the producer: creates a stream that emits integers up to a limit, one every 400ms), AsyncStreamLessonViewModel (the consumer: iterates the stream, collects values in a @Published array, and manages task cancellation), and AsyncStreamLessonView (the SwiftUI view with Start/Stop toolbar buttons).
Complete Code
18-how-to-use-asyncstream-in-swift.swift
    import SwiftUI

    // Producer type: wraps the asynchronous counting logic in an AsyncStream.
    struct AsyncStreamCounter {
        // Returns a new AsyncStream each time it is called; each call starts a fresh sequence.
        func values(upTo limit: Int) -> AsyncStream<Int> {
            // The AsyncStream closure runs immediately when the stream is created.
            // `continuation` is the handle for emitting values and signaling completion.
            AsyncStream { continuation in
                // Internal Task drives the production of values asynchronously.
                // This Task is NOT structured; it is the producer's own lifecycle management.
                let task = Task {
                    for value in 1...limit {
                        guard !Task.isCancelled else { break } // stop producing if cancelled
                        try? await Task.sleep(nanoseconds: 400_000_000) // 400ms between each value
                        continuation.yield(value) // push this value to any waiting consumer
                    }
                    continuation.finish() // signal end of sequence; the for await loop will exit
                }

                // onTermination fires when the consumer cancels or the stream finishes.
                // We cancel the internal Task to stop producing values nobody is listening to.
                continuation.onTermination = { _ in
                    task.cancel() // clean up: stop the internal counting task
                }
            }
        }
    }

    // @MainActor: all @Published mutations are on the main thread automatically.
    @MainActor
    final class AsyncStreamLessonViewModel: ObservableObject {
        @Published var values: [Int] = [] // displayed in the list; grows as new values arrive
        private let counter = AsyncStreamCounter() // the producer
        private var task: Task<Void, Never>? // retained so we can cancel it

        // Starts a new count from the beginning, cancelling any previous in-progress count.
        func start() {
            task?.cancel() // cancel previous stream before starting a new one
            values = [] // reset displayed values
            // This Task owns the for await loop. Cancelling this task
            // propagates into the AsyncStream via onTermination.
            task = Task {
                for await value in counter.values(upTo: 5) {
                    values.append(value) // @MainActor ensures this is a safe main-thread update
                }
            }
        }

        // Cancels the in-progress count mid-stream.
        func stop() {
            task?.cancel() // cancels the for await loop → triggers onTermination → cancels producer task
            task = nil
        }
    }

    struct AsyncStreamLessonView: View {
        @StateObject private var viewModel = AsyncStreamLessonViewModel()

        var body: some View {
            List(viewModel.values, id: \.self) { value in
                Text("Value \(value)")
            }
            .toolbar {
                Button("Start") { viewModel.start() } // cancels previous stream, starts fresh
                Button("Stop") { viewModel.stop() } // halts current stream mid-count
            }
        }
    }

Code Walkthrough
- AsyncStream { continuation in ... }: The closure body runs synchronously when the stream is created (i.e., when counter.values(upTo: 5) is called). Inside, you set up the production infrastructure. This sample does not call continuation.yield synchronously in the closure (doing so is allowed, and the values would simply be buffered); instead, it launches an asynchronous producer (here a Task) that calls yield over time.
- The internal let task = Task { ... }: This task is the actual producer. It runs independently of the consumer's task. The Task.isCancelled check inside the loop ensures that if task.cancel() is called (via onTermination), the loop exits cleanly at the next iteration rather than producing more values.
- continuation.yield(value): Deposits a value into the stream's buffer. If the consumer's for await is currently suspended waiting for the next value, the consumer is resumed. If the consumer is busy, the value sits in the buffer until the consumer is ready.
- continuation.finish(): Sends an end-of-sequence signal. The consumer's for await loop will exit naturally after processing all buffered values. If you omit finish(), the loop stays suspended forever, waiting for a value that never comes. This is not a deadlock in the strict sense, but the consuming task never completes.
- continuation.onTermination = { _ in task.cancel() }: This is the resource cleanup hook. It fires when: (a) the consumer cancels its task, (b) finish() is called, or (c) the stream is deallocated. By cancelling the internal producer task here, you ensure the producer and consumer lifecycles are coupled: the producer stops when the consumer stops.
- task?.cancel() at the start of start(): If the user taps "Start" while a previous count is in progress, the previous task is cancelled before starting a new one. This prevents two concurrent for await loops over two different streams both trying to append to the same values array, which would produce interleaved, out-of-order results.
Common Mistakes
Mistake: Forgetting to call continuation.finish() and wondering why the for await loop never exits.
AsyncStream does not know when you are done producing values unless you tell it. Without finish(), the for await loop suspends indefinitely after the last value, waiting for a value that never arrives. This hangs the consuming task permanently. Always call finish() when the producer's work is done, either at the end of a finite loop or when a termination condition is met.
Mistake: Not implementing onTermination and allowing a resource (timer, socket, delegate) to keep running after the consumer cancels.
If the consumer's task is cancelled mid-stream and onTermination is not set, the internal producer task continues running, the timer keeps firing, or the socket stays open. For a counter this is just wasted CPU; for a network socket or hardware sensor, it is a resource and battery drain. Always set onTermination to cancel or clean up anything the producer started.
Mistake: Using AsyncStream for a function that only returns one value.
AsyncStream is for sequences of values over time. If your producer calls yield exactly once and then finish, a plain async function that returns the value directly is simpler, more readable, and more efficient. Reserve AsyncStream for genuinely ongoing sequences: timers, location updates, socket messages, Bluetooth events, notification streams.
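For the single-value case, one possible shape is a plain async function built on withCheckedContinuation rather than a stream. The callback API fetchGreeting(completion:) below is hypothetical and stands in for any one-shot completion-handler API; greeting() is likewise an invented name.

```swift
// Hypothetical one-shot callback API: calls its completion handler exactly once.
func fetchGreeting(completion: @escaping @Sendable (String) -> Void) {
    completion("hello")
}

// Async wrapper: one value in, one value out, no stream needed.
func greeting() async -> String {
    await withCheckedContinuation { continuation in
        fetchGreeting { value in
            continuation.resume(returning: value)   // must be resumed exactly once
        }
    }
}
```

The caller writes let text = await greeting() and gets the value directly, with no loop and no finish() to remember.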
Key Takeaways
- AsyncStream bridges push-based, callback-style APIs into async/await by providing a continuation object whose yield and finish methods drive the sequence from the producer side.
- onTermination is not optional in production code: it is the mechanism that couples the producer's lifecycle to the consumer's, preventing resource leaks when the consumer cancels.
- Task cancellation propagates correctly through AsyncStream: cancelling the consumer's Task triggers onTermination, which should cancel the producer's internal task, closing the resource loop cleanly.
Last updated: June 27, 2026