본문 바로가기

코루틴

Asynchronous Flow (비동기 플로우)

 

코틀린 공식 문서의 flow 문서를 번역해 보았습니다. 

https://kotlinlang.org/docs/flow.html

 

일시중단 함수는 비동기적으로 단일 값을 리턴하는데, 비동기적으로 계산된 복수의 값들은 어떻게 반환할 수 있을까?

코틀린 flow가 필요한 이유다

 

단일 값을 반환하는 일시 중단 함수

suspend fun getUser(): User {
    // 비동기 작업 후 단일 값(User) 반환
    return User()
}

 


복수의 값을 표현하는 방법

 

Collections 사용

fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    simple().forEach { value -> print(value) } 
}
//
123

 

Sequence 사용

CPU 리소스를 사용하고 blocking을 하는 코드로 연산 시 Sequence 사용

fun simple(): Sequence<Int> = sequence {
    for (i in 1..3) {
        Thread.sleep(100)
        yield(i)
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}
// 
1
2
3

 

일시중단 함수

위 sequence는 메인 스레드를 blocking 한다

값들이 비동기로 계산된다면 blocking 시키지 않고 결과값을 리스트로 반환할 수 있도록 suspend로 표현할 수 있다

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

 

Flows

List<Int> 타입의 결과값을 사용하면 값을 한 번에 반환해야 한다.

비동기적으로 계산되는 값들을 스트림으로 표현하기 위해서는 Flow 타입을 사용해야 한다

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    simple().collect { value -> println(value) } 
}
//
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

 

 

Flows are Cold (플로우는 차갑다)

Flow는 cold stream이다. 빌더 내부의 코드는 flow가 collect되기 전까지 실행되지 않는다

fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}
// 
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

 

simple 함수에 suspend가 표시되지 않는 이유는, 함수 자체는 바로 반환되고 어떤 것도 기다리지 않는다.

collect 될때마다 새로 시작된다

 

 

Flow 취소 기초 (Flow cancellation Basics)

Flow는 코루틴의 협력적 취소(cooperative cancellation) 원칙을 따른다

일반적으로 cancellable suspend 함수에서 Flow가 일시중단 될 때 Flow 수집이 취소될 수 있다 

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

// 취소 가능한 withTimeoutOrNull 함수
fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}
//
Emitting 1
1
Emitting 2
2
Done

 

 

Flow Builder (플로우 빌더)

 

flow 빌더

// 기본적인 빌더
flow { ... }

 

flowOf 빌더

// 정해진 값의 세트를 방출하는 Flow를 정의
flowOf { ... }

 

asFlow 확장 함수

확장함수를 사용해 다양한 Collection과 Sequence는 Flow로 변환

// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }

 

 

 

Flow 중간 연산자

Flow는 중간 연산자를 사용해 변환될 수 있다

Upstream Flow에 적용되어 downstream Flow를 반환한다

Flow 중간 연산자 내부 코드 블록에서는 일시중단 함수를 호출 할 수 있다

suspend fun performRequest(request: Int): String {
    delay(1000) // 일시중단 함수
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}
//
response 1
response 2
response 3

 

Transform Operator (변환 연산자)

임의의 횟수만큼 값을 emit할 수 있다

public inline fun <T, R> Flow<T>.transform(
    @BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow {
    collect { value ->
        return@collect transform(value)
    }
}

 

예시

(오래걸리는 비동기 요청 전에 문자열 방출하는 코드)

suspend fun performRequest(request: Int): String {
    delay(1000)
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow()
        .transform { request ->
            emit("Making request $request")
            emit(performRequest(request))
        }
        .collect { response -> println(response) }
}
//
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

 

 

Size-limiting operators (크기 한정 연산자)

take 같은 크기 한정 연산자들은 해당 조건에 도달했을 때 flow의 실행을 취소

코루틴에서 취소는 Exception을 던지는 것으로 수행되므로 try - finally 같은 기능들은 취소에서 정상적으로 동작한다

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
// 
1
2
Finally in numbers

 

 

Terminal Flow operators (Flow 최종 연산자)

최종 연산자는 Flow 수집을 시작하는 일시중단 함수이다

collect 연산자가 가장 기본 연산자이며 더 쉽게 사용할 수 있는 최종 연산자들도 있다

 

toList, toSet 연산자

다양한 Coollection으로 변환 수행

 

first 연산자

첫 값만 가져오기 위한 연산자

 

single 연산자

하나의 값만 방출되는 것을 확인

 

fold, reduce 연산자

flow를 값으로 줄임

 

fun main() = runBlocking<Unit> {
    val sum = (1..5).asFlow()
        .map { it * it } 
        .reduce { a, b -> a + b }
    println(sum)
}
// 55

 

 

Flows are sequential (Flow는 순차적이다)

여러 Flow에서 특수한 연산자를 사용하지 않는 한, 각 Flow는 순차적으로 동작한다

기본적으로 collection은 최종 연산자를 호출하는 Coroutine에서 동작한다

emit된 각 값들은 중간 연산자들에 의해 upstream에서 downstream으로 처리된 후, 최종 연산자에게 전달

fun main() = runBlocking<Unit> {
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }.collect {
            println("Collect $it")
        }
}
//
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

 

Flow Context

Flow 수집은 코루틴을 호출하는 Context에서 일어난다

withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}

 

Flow의 이런 성질을 context preservation(문맥 보존)이라고 한다 

 

기본적으로 flow 빌더 내부의 코드는 해당 Flow의 collector가 제공하는 context에서 실행된다

simple 함수를 collect 하는 부분이 Main Thread에서 호출되기에, simple 함수의 flow 또한 MainThread에서 호출된다

즉, 어디서 collect하던지 그 context를 그대로 사용한다는 것이다

fun simple(): Flow<Int> = flow {
    println("${Thread.currentThread().name} Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> 
        println("${Thread.currentThread().name} Collected $value")
    }
}
// 
main Started simple flow
main Collected 1
main Collected 2
main Collected 3

 

 

WithContext를 사용하면 주의해야 할 점

WithContext는 코루틴을 사용하는 코드의 Context를 변경하는데 사용된다

CPU를 사용하는 오래 걸리는 작업의 경우 Default 디스패처 Context, UI 업데이트는 Main에서 실행되어야 할 수 있다

 

하지만 flow 빌더 코드는 Context 보존 성질로 인해 다른 Context에서 emit하는 것이 허용되지 않는다

fun simple(): Flow<Int> = flow {
    // flow builder에서의 잘못된 context change 방법
    withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // CPU를 사용하는 계산인 것처럼 가정
            emit(i)
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) }
}
// 
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated
Please refer to 'flow' documentation or use 'flowOn' instead

 

 

flowOn 연산자

위 예제에서 flowOn 연산자를 사용하는 것을 권장한다

flowOn은 값 방출을 위한 Context를 변경하는데 사용된다

flowOn 연산자는 Flow의 기본적인 순차처리 성질을 변경하고 Context에서 CoroutineDispatcher를 변경해야 할 때 upstream Flow를 위한 다른 코루틴을 생성한다

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100)
        println("${Thread.currentThread().name} Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default) // flow 빌더에서 context를 변경하는 올바른 방법

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        println("${Thread.currentThread().name} Collected $value")
    }
}
//
DefaultDispatcher-worker-1 Emitting 1
main Collected 1
DefaultDispatcher-worker-1 Emitting 2
main Collected 2
DefaultDispatcher-worker-1 Emitting 3
main Collected 3