ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Swift Concurrency - Async sequence & stream
    Concurrency 2023. 3. 9. 09:10

    안녕하세요. 그린입니다🍏

    이번 포스팅에서는 비동기 시퀀스와 스트림에 대해 알아보고 기존 컴바인 코드에 어떻게 녹여내는지 학습해보겠습니다🙋🏻

     

    일반적으로 for문 같이 루프를 돌며 Swift 컬렉션을 반복할 때 반복할 코드에 전달될 요소를 결정하는데에는 두가지 핵심 요소가 있습니다.

    sequence와 iterator⭐️

    뭔가 한번씩은 다 들어봤고, 어 어디서 봤는데? 싶으실거에요!

    예를들면 Swift의 array 타입은 기본적으로 sequence 프로토콜을 준수하며 iterator 타입으로 IndexingIterator를 사용합니다.

     

    우리는 Swift 코드 작성 시 시퀀스와 직접 상호 작용하는 경우가 매우 많지만 for 루프를 사용할 때마다 Swift 언어 자체가 이러한 인스턴스를 자동으로 관리해주기 때문에 직접 다뤄볼 일이 없었을거에요.

     

    자 그런데 오늘 한번 같이 해보자구요!🙌

     

    이번 학습에서는 Swift의 동시성 세계에서 시간 경과에 따라 값을 제공하는 완전한 비동기식 시퀀스 및 스트림을 생성하는 방법을 배워볼거에요🎉🎉🎉

     

    Sequences & Iterators

    struct RemoteDataSequence: Sequence {
        var urls: [URL]
    
        func makeIterator() -> RemoteDataIterator {
            RemoteDataIterator(urls: urls)
        }
    }

    일련의 URL에서 데이터를 다운받을 수 있는 사용자 지정 시퀀스를 위처럼 만든다고 생각해볼께요.

    먼저 Sequence 프로토콜을 당연하게 채택해야 하겠죠?

    그 다음 makeIterator이라는 필수 메서드를 구현해줘야 합니다.

     

    그럼 여기서 툭 튀어나온 RemoteDataIterator이라는 타입도 만들어 볼까요?

     

    struct RemoteDataIterator: IteratorProtocol {
        var urls: [URL]
        fileprivate var index = 0
    
        mutating func next() -> Data? {
            guard index < urls.count else {
                return nil
            }
    
            let url = urls[index]
            index += 1
    
            // If a download fails, we simply move on to
            // the next URL in this case:
            guard let data = try? Data(contentsOf: url) else {
                return next()
            }
    
            return data
        }
    }

    이렇게 IteratorProtocol을 채택하고 next 메서드를 통해 새 반복을 돌려 줄 수 있어요.

    시스템에서 새 요소가 요청될 때 index를 증가시키는 방법이죠.

    새 for 루프가 시작될 때 마다 시스템이 자동으로 새로운 Iterator(시퀀스의 makeIterator 메서드를 이용해)를 생성하기에 여러번의 동시 반복을 관리할 필요는 없습니다.

     

    for data in RemoteDataSequence(urls: urls) {
        ...
    }

    이제 위 시퀀스 구현을 통해 이렇게 사용할 수 있습니다.

    커스텀한 시퀀스가 하나 만들어졌어요 짜잔!⭐️

     

    근데 위처럼 모든 데이터를 동기적으로 다운받는건 좋지 않겠죠? 

    동시성과 병렬을 중요시하는 Concurrency 세계에서는..??

    모든 다운로드가 완료될 때 까지 현재 스레드를 완전히 막아버리기 때문에도 말이죠ㅎㅎ..

     

    그렇기에 우리는 이것을 비동기 시퀀스로 변경해볼께요!

     

    비동기식 반복

    우선 반복부터 비동기로 처리할 수 있어야합니다.

    위에서 구현한 Iterator 부분 말이죠.

    struct RemoteDataSequence: AsyncSequence {
        typealias Element = Data
    
        var urls: [URL]
    
        func makeAsyncIterator() -> RemoteDataIterator {
            RemoteDataIterator(urls: urls)
        }
    }

    Swift 5.5에서는 위에서 보시는것처럼 AsyncSequence라는 것과 같이 비동기 시퀀스와 Iterator의 개념을 도입했어요.

    그렇기에 단순하게 비동기로 변경할 수 있습니다.

    우선 AsyncSequence 비동기 시퀀스 프로토콜을 채택하고 필수 메서드인 makeAsyncIterator을 구현해줍니다.

    코드에서 쓰인 Element는 컴파일러가 시퀀스 요소 유형을 추론할 수 있어야 되기 때문에 실제로 사용하진 않지만 넣어줬어요.

    그런데 Xcode 13.0 이상부터는 잘 추론하는걸로 보입니다🙋🏻

     

    그 다음으로, RemoteDataIterator에 비동기식 전환을 구현해줍니다.

    struct RemoteDataIterator: AsyncIteratorProtocol {
        var urls: [URL]
        fileprivate var urlSession = URLSession.shared
        fileprivate var index = 0
    
        mutating func next() async throws -> Data? {
            guard index < urls.count else {
                return nil
            }
    
            let url = urls[index]
            index += 1
    
            let (data, _) = try await urlSession.data(from: url)
            return data
        }
    }

    ASyncIteratorProtocol을 채택하고 next 메서드를 비동기 메서드로 전환합니다.

    그런데 데이터 다운 중 오류가 발생할 수 있음으로 throws를 붙여줍니다.

    마지막으로 urlSession 부분에서 네트워킹하는 쪽이기에 데이터를 완전히 비동기식으로 다운로드 할 수 있도록 처리해줍니다.

     

    자 이제 적용해볼까요?

    for try await data in RemoteDataSequence(urls: urls) {
        ...
    }

    for try await 구문을 통해 적용하면 끝!

    이렇게 된다면 데이터가 백그라운드에서 다운로드되고 준비가 완료되면 루프에 전달됩니다.

    여기서 오류가 나면 자동으로 for 루프를 벗어날것이기에 동기화에서도 아주 유용합니다.

    물론 모든 비동기 시퀀스가 throws를 붙여줘 이런 동작을 해줘야하진 않습니다.

    확실히 필요없다면 안쓰는게 더 좋죠 사실!

     

    자, 그 다음으로 이제 비동기 스트림에 대해 알아보시죠🙌

     

    비동기 스트림

    AsyncStreamAsyncThrowingStream이라는 유형을 통해 우리는 완전한 비동기 스트림을 구현해줄 수 있습니다.

    func remoteDataStream(
        forURLs urls: [URL],
        urlSession: URLSession = .shared
    ) -> AsyncThrowingStream<Data, Error> {
        AsyncThrowingStream { continuation in
            Task {
                do {
                    for url in urls {
                        let (data, _) = try await urlSession.data(from: url)
                        continuation.yield(data)
                    }
    
                    continuation.finish(throwing: nil)
                } catch {
                    continuation.finish(throwing: error)
                }
            }
        }
    }

    위 코드처럼 AsynThrowingStream 타입으로 지정합니다.

    내부를 감싸는데 continuation 보이시죠?

    우리가 Combine에서도 publisher를 send하여 데이터를 보내거나 complete시키거나 했습니다.

    이것처럼 사용될 친구에요!

    urls를 반복하며 데이터를 통신하여 불러오고 하나씩 들어올때마다 데이터를 yield를 통해 담아 보내줍니다.

    그럼 모든걸 기다렸다 보내는게 아닌 하나씩 들어오는걸 스트림으로 방출해줄 수 있습니다.

    완료가 되었다면 finish로 쏴주는데 에러가 없으니 nil을 넣어줍니다.

    만약 catch에서 에러가 발생하여 걸리면 finish에 error를 담아 던져줍니다.

     

    이러한 구현을 조금 더 간편하게 아래와 같이 사용할 수 있어요.

     

    func remoteDataStream(
        forURLs urls: [URL],
        urlSession: URLSession = .shared
    ) -> AsyncThrowingStream<Data, Error> {
        var index = 0
    
        return AsyncThrowingStream {
            guard index < urls.count else {
                return nil
            }
    
            let url = urls[index]
            index += 1
    
            let (data, _) = try await urlSession.data(from: url)
            return data
        }
    }

    보시면 continuation으로 yield하거나 finish하는 부분은 생략되있고 마지막에 data만 반환해줍니다.

    다만 반복 상태를 추적하기 위해 index 변수를 사용합니다.

     

    이러한 구현을 통해 실제론 아래처럼 비동기 스트림을 호출해 사용할 수 있습니다.

    for try await data in remoteDataStream(forURLs: urls) {
        ...
    }

     

    자 이제 이것들이 Combine과 어떤 관련이 있을까요?

     

    Combine과의 연관성

    Combine을 사용하신분이라면 아시겠지만 시간 흐름에 따라 값을 방출하고 관찰할 수 있습니다.

    이러한 개념과 위에서 다룬 Swift Concurrency의 비동기 스트림의 연관을 지어보죠.

    그런데 짚고 넘어가야할건 분명 Combine 자체도 충분히 괜찮고 Swift Concurrency라고해서 더 완벽하다 이런건 전혀 아닙니다.

    장점으로 들고갈건 비동기 시퀀스를 구성하기 위한 공수에 더 낮은 수준의 API를 제공해주기에 조금 더 사용이 쉽다는것이죠.

     

    자, 그럼 만약 여러분들의 기존 코드가 다 Combine으로 작업되어 있다고 가정해봅시다.
    그럴때 Swift Concurrency를 도입하고 싶다면 어떻게 변환할 수 있을까요?

     

    Combine이 이제는 완전히 호환되기에 어떤 값이든 AsyncSequence로 변환할 수 있습니다.

    func remoteDataPublisher(
        forURLs urls: [URL],
        urlSession: URLSession = .shared
    ) -> AnyPublisher<Data, URLError> {
        urls.publisher
            .setFailureType(to: URLError.self)
            .flatMap(maxPublishers: .max(1)) {
                urlSession.dataTaskPublisher(for: $0)
            }
            .map(\.data)
            .eraseToAnyPublisher()
    }

    보시면 기존 익숙한 AnyPublisher를 반환하는 Combine 코드가 있습니다.

    위 메서드가 반환하는 AnyPublisher를 단순히 AsyncSequence로 변환하려면 값 속성에 접근하기만 하면 됩니다.

    나머지는 시스템이 다 알아서 처리해줍니다👏

     

    let publisher = remoteDataPublisher(forURLs: urls)
    
    for try await data in publisher.values {
        ...
    }

    짜잔~~

    매우 깔끔해졌죠?

    단순히 값 속성인 values로 접근만하면 AsyncSequence로 변환되어 사용할 수 있습니다.

    그렇기에 기존 Combine 코드도 이렇게 붙여주어 Swift Concurrency의 비동기 스트림으로 변환해 편리하게 사용할 수 있죠!

     

    결론

    AsyncStream과 AsyncSequence를 기존 Combine과 잘 혼합해봐요🙌

     

    마무리

    그저 갓.... values로 값 속성만 엑세스 해주면 변한다니..

    애플 갓입니다.

     

    [참고 자료]

    https://www.swiftbysundell.com/articles/async-sequences-streams-and-combine/

     

    Async sequences, streams, and Combine | Swift by Sundell

    When iterating over any Swift collection using a standard for loop, there are two key components that decide what elements that will be passed into our iteration code — a sequence, and an iterator. For example, Swift’s standard Array type is a sequence

    www.swiftbysundell.com

Designed by Tistory.