Skip to content

How to use AsyncStream in Swift | Modern Swift Concurrency #18

When values arrive over time — from a timer, a delegate callback, a notification, or a socket — AsyncStream lets you wrap that push-based source into a pull-based AsyncSequence that any for await loop can consume, bridging callback-based APIs cleanly into Swift's structured concurrency model.

What You'll Learn

  • What AsyncStream is and why it exists as the bridge between callback/delegate APIs and async/await
  • How the continuation object works as the producer-side handle for yielding and finishing the stream
  • Why onTermination is critical for preventing resource leaks when the consumer cancels
  • How structured task cancellation propagates from a for await loop into the AsyncStream producer
  • When to use AsyncStream vs. AsyncThrowingStream and when simpler async functions suffice

Mental Model

AsyncStream is like a radio broadcast. The broadcaster (the producer inside the AsyncStream closure) pushes values onto the air whenever they are ready — continuation.yield(value) is the transmission. The listener (the consumer with for await) tunes in and receives transmissions one at a time, in order, without polling or callbacks.

The broadcaster doesn't know how many listeners there are, and the listener doesn't know when the next transmission will arrive — they simply suspend and wait. When the broadcaster finishes the show, they call continuation.finish() and go off the air. The listener's for await loop ends naturally.

The important twist: if the listener walks away and turns off their radio (task cancelled), the broadcast should also stop — there is no point continuing to produce values nobody is receiving. onTermination is the broadcaster's notification that the last listener has tuned out, so they can shut down their transmitter (cancel the internal task, close the socket, invalidate the timer).

Detailed Explanation

AsyncStream<Element> is a concrete implementation of AsyncSequence designed specifically as a bridge between imperative, callback-based code and Swift's structured concurrency. Before AsyncStream, wrapping a delegate-based API (like CLLocationManagerDelegate) for use with async/await required custom AsyncSequence implementations with actors, continuation storage, and careful cancellation handling. AsyncStream provides all of that infrastructure in one type.

An AsyncStream is created by passing a closure that receives a Continuation. The continuation has three key methods: yield(_:) sends a new value to any waiting consumer; finish() signals that no more values will be produced and terminates the for await loop; and onTermination is a property you set to receive a callback when the stream is cancelled or finished from the consumer side.

The producer and consumer live on different sides of a buffer. The continuation buffers values until the consumer is ready to receive them. The default buffer policy is .unbounded, meaning an arbitrarily fast producer can enqueue an arbitrary number of values. For sensors or timers this is fine, but for a high-frequency data source you should specify a bounded buffer with a dropping or buffering policy.

Task cancellation integrates cleanly: when the Task running the for await loop is cancelled, the loop exits at the next suspension point and the AsyncStream's onTermination closure fires. This is your signal to clean up the producer side — cancel timers, close connections, invalidate delegates. Without onTermination, the producer continues producing values nobody receives, wasting resources.

AsyncStream is for non-throwing sequences. If the producer can fail, use AsyncThrowingStream<Element, Error> instead. The for await loop then requires tryfor try await value in stream. For a sequence that emits exactly one value and then finishes, a plain async function is simpler than AsyncStream and should be preferred.

Code Structure

The sample file (18-how-to-use-asyncstream-in-swift.swift) contains AsyncStreamCounter (the producer: creates a stream that emits integers up to a limit, one every 400ms), AsyncStreamLessonViewModel (the consumer: iterates the stream, collects values in a @Published array, and manages task cancellation), and AsyncStreamLessonView (the SwiftUI view with Start/Stop toolbar buttons).

Complete Code

18-how-to-use-asyncstream-in-swift.swift

swift
import SwiftUI

// Producer type: wraps the asynchronous counting logic in an AsyncStream.
struct AsyncStreamCounter {
    // Returns a new AsyncStream each time it is called — each call starts a fresh sequence.
    func values(upTo limit: Int) -> AsyncStream<Int> {
        // The AsyncStream closure runs immediately when the stream is created.
        // `continuation` is the handle for emitting values and signaling completion.
        AsyncStream { continuation in
            // Internal Task drives the production of values asynchronously.
            // This Task is NOT structured — it is the producer's own lifecycle management.
            let task = Task {
                for value in 1...limit {
                    guard !Task.isCancelled else { break } // stop producing if cancelled
                    try? await Task.sleep(nanoseconds: 400_000_000) // 400ms between each value
                    continuation.yield(value) // push this value to any waiting consumer
                }
                continuation.finish() // signal end of sequence; the for await loop will exit
            }

            // onTermination fires when the consumer cancels or the stream finishes.
            // We cancel the internal Task to stop producing values nobody is listening to.
            continuation.onTermination = { _ in
                task.cancel() // clean up: stop the internal counting task
            }
        }
    }
}

// @MainActor: all @Published mutations are on the main thread automatically.
@MainActor
final class AsyncStreamLessonViewModel: ObservableObject {
    @Published var values: [Int] = [] // displayed in the list; grows as new values arrive

    private let counter = AsyncStreamCounter() // the producer
    private var task: Task<Void, Never>? // retained so we can cancel it

    // Starts a new count from the beginning, cancelling any previous in-progress count.
    func start() {
        task?.cancel() // cancel previous stream before starting a new one
        values = []    // reset displayed values

        // This Task owns the for await loop. Cancelling this task
        // propagates into the AsyncStream via onTermination.
        task = Task {
            for await value in counter.values(upTo: 5) {
                values.append(value) // @MainActor ensures this is a safe main-thread update
            }
        }
    }

    // Cancels the in-progress count mid-stream.
    func stop() {
        task?.cancel() // cancels the for await loop → triggers onTermination → cancels producer task
        task = nil
    }
}

struct AsyncStreamLessonView: View {
    @StateObject private var viewModel = AsyncStreamLessonViewModel()

    var body: some View {
        List(viewModel.values, id: \.self) { value in
            Text("Value \(value)")
        }
        .toolbar {
            Button("Start") { viewModel.start() } // cancels previous stream, starts fresh
            Button("Stop") { viewModel.stop() }   // halts current stream mid-count
        }
    }
}

Code Walkthrough

  1. AsyncStream { continuation in ... } — The closure body runs synchronously when the stream is created (i.e., when counter.values(upTo: 5) is called). Inside, you set up the production infrastructure. You do not call continuation.yield synchronously in the closure — instead, you launch an asynchronous producer (here a Task) that calls yield over time.

  2. The internal let task = Task { ... } — This task is the actual producer. It runs independently of the consumer's task. The Task.isCancelled check inside the loop ensures that if task.cancel() is called (via onTermination), the loop exits cleanly at the next iteration rather than producing more values.

  3. continuation.yield(value) — Deposits a value into the stream's buffer. If the consumer's for await is currently suspended waiting for the next value, the consumer is resumed immediately. If the consumer is busy, the value sits in the buffer until the consumer is ready.

  4. continuation.finish() — Sends an end-of-sequence signal. The consumer's for await loop will exit naturally after processing all buffered values. If you omit finish(), the loop will hang forever waiting for a value that never comes — a deadlock.

  5. continuation.onTermination = { _ in task.cancel() } — This is the resource cleanup hook. It fires when: (a) the consumer cancels its task, (b) finish() is called, or (c) the stream is deallocated. By cancelling the internal producer task here, you ensure the producer and consumer lifecycles are coupled — the producer stops when the consumer stops.

  6. task?.cancel() at the start of start() — If the user taps "Start" while a previous count is in progress, the previous task is cancelled before starting a new one. This prevents two concurrent for await loops over two different streams both trying to append to the same values array, which would produce interleaved, out-of-order results.

Common Mistakes

Mistake: Forgetting to call continuation.finish() and wondering why the for await loop never exits.
AsyncStream does not know when you are done producing values unless you tell it. Without finish(), the for await loop suspends indefinitely after the last value, waiting for a value that never arrives. This hangs the consuming task permanently. Always call finish() when the producer's work is done — either at the end of a finite loop, or when a termination condition is met.

Mistake: Not implementing onTermination and allowing a resource (timer, socket, delegate) to keep running after the consumer cancels.
If the consumer's task is cancelled mid-stream and onTermination is not set, the internal producer task continues running, the timer keeps firing, or the socket stays open. For a counter this is just wasted CPU; for a network socket or hardware sensor, it is a resource and battery drain. Always set onTermination to cancel or clean up anything the producer started.

Mistake: Using AsyncStream for a function that only returns one value.
AsyncStream is for sequences of values over time. If your producer calls yield exactly once and then finish, a plain async function that returns the value directly is simpler, more readable, and more efficient. Reserve AsyncStream for genuinely ongoing sequences: timers, location updates, socket messages, Bluetooth events, notification streams.

Key Takeaways

  • AsyncStream bridges push-based, callback-style APIs into async/await by providing a continuation object whose yield and finish methods drive the sequence from the producer side.
  • onTermination is not optional in production code — it is the mechanism that couples the producer's lifecycle to the consumer's, preventing resource leaks when the consumer cancels.
  • Task cancellation propagates correctly through AsyncStream: cancelling the consumer's Task triggers onTermination, which should cancel the producer's internal task, closing the resource loop cleanly.

Last updated: June 27, 2026

Released under the MIT License.