본문 바로가기

코루틴

[코루틴] Flow란 - (1)

 

Flow(플로우)

코루틴을 기반으로 동작하는 코틀린의 비동기 스트림을 다루기 위한 API
순차적으로 값을 방출
Cold Stream으로 collect 함수 호출 시 데이터 발행

 

 

Flow 인터페이스

/**
* 값을 순차적으로 내보내고 정상적으로 또는 예외와 함께 완료되는 비동기 데이터 스트림
*/
public interface Flow<out T> {
    // Flow의 값을 수집하는 함수
    // 일시중단 함수로 코루틴 내에서 실행되어야 한다
    public suspend fun collect(collector: FlowCollector<T>)
}

 

 

 

List vs Sequence vs Flow

List

모든 원소의 계산이 완료된 Collection

작은 데이터 크기에 적합

// 한 번에 모든 값을 만드는 List
fun allUsers(): List<User> = 
    api.getAllUsers().map { it.toUser() }


// 모든 값이 생성되길 기다림
fun getList(): List<String> = List(3) {
    Thread.sleep(1000)
    "User$it"
}

fun main() {
    val list = getList()
    println("Function Started")
    list.forEach { println(it) }
}
// 결과
(3초 후)
Function Started
User0
User1
User2

 

 

Sequence

Cold Stream으로 필요할 때마다 값을 계산

큰 데이터 크기에 적합

하나씩 원소를 계산할 때 유용하며 동기적으로 처리

fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        Thread.sleep(1000)
        yield("User$it")
    }
}

fun main() {
    val sequence = getSequence()
    println("Function Started")
    sequence.forEach { println(it) }
}
// 결과
Function Started
(1초 후)
User0
(1초 후)
User1
(1초 후)
User2


// 동기적 처리
val sequence = sequence {
    yield(1)
    Thread.sleep(1000)    // 동기적 처리 - 스레드 블로킹
    yield(2)
    Thread.sleep(1000)
    yield(3)
}

 

 

데이터 소스의 갯수가 많을 때

원소를 필요할 때만 계산하거나 읽는 지연 연산을 하게 되는 상황에서 사용하면 유용

val fibonacci: Sequence<BigInteger> = sequence {
    var first = 0.toBigInteger()
    var second = 1.toBigInteger()
    while (true) {
        yield(first)
        val temp = first
        first += second
        second = temp
    }
}

fun main() {
    // 무한 sequence에서 필요한 만큼만 계산
    val take5 = fibonacci.take(5).toList()
    val take10 = fibonacci.take(10).toList()
    val take10To20 = fibonacci.drop(10).take(10).toList()
    val takeEven = fibonacci.filter { it % 2.toBigInteger() == 0.toBigInteger() }
    println(take5)
    println(take10)
    println(take10To20)
    println(takeEven.take(5).toList())
}
// 결과
[0, 1, 1, 2, 3]
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181]
[0, 2, 8, 34, 144]

 

 

sequence에서는

빌더 내부에 중단점이 있으면 값을 기다리는 스레드가 블로킹된다 (blocking)

SequenceScope의 리시버에서 호출되는 함수 (yield, yieldAll) 외에 다른 중단 함수를 사용할 수 없다

fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        delay(1000)
        yield("User$it")
    }
}
// 컴파일 에러
Restricted suspending functions can only invoke member or 
extension suspending functions on their restricted coroutine scope

 

 

sequence를 잘못 사용한 경우

fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        Thread.sleep(1000) // 스레드를 1초간 블록 -> 같은 스레드에서 launch로 시작된 코루틴이 대기
        yield("User$it")
    }
}

suspend fun main() {
    withContext(newSingleThreadContext("main")) {
        // 코루틴 시작
        launch {
            repeat(3) {
                delay(100) // non-blocking delay
                println("Processing on coroutine")
            }
        }

        // 블로킹 시퀀스 처리 시작
        val list = getSequence()
        list.forEach { println(it) } // blocking 연산이 된다
    }
}
// 결과
(1초 후)
User0
(1초 후)
User1
(1초 후)
User2
Processing on coroutine
(0.1초 후)
Processing on coroutine
(0.1초 후)
Processing on coroutine

 

 

위 상황 같은 경우 Flow 사용

fun getFlow(): Flow<String> = flow {
    repeat(3) {
        delay(1000) // non-blocking delay, 중단점
        emit("User$it")
    }
}

suspend fun main() {
    withContext(newSingleThreadContext("main")) {
        // 1. launch 코루틴 시작
        launch {
            repeat(3) {
                delay(100) // 중단점 (다른 코루틴이 실행 가능)
                println("Processing on coroutine")
            }
        }

        // 2. Flow 수집 시작
        val list = getFlow()
        // collect시 getFlow 함수의 delay 일시중단 함수로 인해 
        // main의 launch 코루틴이 실행될 기회를 얻음
        list.collect { println(it) }
    }
}
// 결과
(0.1초 후)
Processing on coroutine
(0.1초 후)
Processing on coroutine
(0.1초 후)
Processing on coroutine
(1 - 3 * 0.1 = 0.7초 후)
User0
(1초 후)
User1
(1초 후)
User2

 

 

Flow의 특징

코루틴 지원, 비동기적으로 계산
Flow 빌더와 연산은 중단 함수
Flow는 취소 가능하다
Flow 최종 연산은 중단 가능하며 CoroutineContext의 전파가 가능하다

 

fun usersFlow(): Flow<String> = flow {
    repeat(3) {
        delay(1000)
        val ctx = currentCoroutineContext()
        val name = ctx[CoroutineName]?.name // 상위 코루틴의 이름이 전파됨
        emit("User$it in $name")
    }
}

suspend fun main() {
    val users = usersFlow()

    withContext(CoroutineName("Name")) { // 코루틴 컨텍스트 설정
        val job = launch {
            users.collect { println(it) }
        }
        launch {
            delay(2100)
            println("I got enough")
            job.cancel() // Flow collect 취소
        }
    }
}
// 결과
(1초 후)
User0 in Name
(1초 후)
User1 in Name
(0.1초 후)
I got enough

 

 

Flow 구성 요소

 

시작연산

Flow 빌더, 다른 객체에서의 변환, 헬퍼 함수로부터 시작

 

중간연산

플로우를 변경

 

최종연산

중단 가능하거나 scope를 필요로 하는 유일한 연산

주로 collect가 최종연산 (다른 최종연산도 존재)

 

suspend fun main() {
    flow { emit("Message 1") }                  // Flow Builder
        .onEach { println(it) }                 // 중간 연산 (intermediate operation)
        .onStart { println("Do Before") }       // 중간 연산 (intermediate operation)
        .onCompletion { println("Do After") }   // 중간 연산 (intermediate operation)
        .catch { emit("Error") }                // 중간 연산 (intermediate operation)
        .collect { println("Collected $it") }   // 최종 연산
}

 

 

'코루틴' 카테고리의 다른 글

[코루틴] Flow 생성  (0) 2024.12.01
[코루틴] Flow 원리  (0) 2024.11.28
[코루틴] 핫 데이터와 콜드 데이터  (0) 2024.11.23
[코루틴] select  (0) 2024.11.18
[코루틴] Channel (채널)  (0) 2024.11.18