본문 바로가기

코루틴

[코루틴-Flow] 생명주기 함수, empty, catch, flowOn, launchIn

 

Flow는

요청이 한쪽 방향으로 흐르고 요청에 의해 생성된 값이 다른 방향으로 흐르는 파이프

 

Flow의 확장함수를 통해 특정 이벤트를 감지할 수 있다

 

대표적인 Flow의 생명주기 함수는

onEach, onStart, onCompletion 등이 있다.

 

onEach

Flow의 값을 하나씩 받기 위해 사용

upstream flow의 각 값이 downstream으로 emit되기 전에 지정된 작업을 수행하는 연산자

public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
    action(value)
    return@transform emit(value)
}

 

사용 예시

suspend fun main0() {
    flowOf(1, 2, 3, 4)
        .onEach { 
            delay(1000)  // onEach 람다 -> 중단함수. delay 사용 가능
            print(it)
        }
        .collect()
}
// (1초후) 1 (1초후) 2 (1초후) 3 (1초후) 4

 

 

onStart

Flow의 수집이 시작되기 전에 실행되는 작업을 정의하는 연산자. 시작 시 호출되는 리스너 설정

onStart 내부에서 emit 함수 사용 가능

// action이 FlowCollector라 emit 사용 가능
public fun <T> Flow<T>.onStart(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { ... }

 

사용 예시

suspend fun main1() {
    flowOf(1, 2)
        .onStart {
            println("Before")
            emit(0)
        }
        .onEach {
            println("onEach")
            delay(1000)
        }
        .collect {
            println(it)
        }
}
// 
Before
onEach
0
onEach
1
onEach
2

 

 

onCompletion

Flow가 완료되거나 취소될 때 실행되는 작업을 정의하는 연산자

try-catch-finally의 finally와 비슷한 개념

catch와는 다르게 upstream, downstream의 모든 예외를 감지

cause가 null인 경우 정상 완료

public fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { ... }

 

사용 예시 1

suspend fun main() {
    flowOf(1, 2, 3)
        .onEach { delay(1000) }
        .onCompletion { println("Completed") }
        .collect { print(it) }
}
// 1 2 3 Completed

 

사용 예시 2

(코루틴 취소 시)

suspend fun main() = coroutineScope {
    val job = launch {
        flowOf(1, 2, 3)
            .onEach {
                println("onEach")
                delay(1000)
            }
            .onCompletion { println("Completed") }
            .collect { println(it) }
    }
    delay(1100)
    job.cancel()
}
// 
onEach
1
onEach
Completed

 

사용 예시 3

suspend fun main() {
    flow {
        emit(1)
        throw RuntimeException("Upstream error")
    }
        .catch { e ->
            println("Catch: $e")  // upstream 에러만 catch
        }
        .onCompletion { e ->
            println("Completion: $e")  // upstream, downstream 에러 모두 감지
        }
        .collect { value ->
            throw RuntimeException("Downstream error $value")  // catch에서 못 잡음
        }
}
// 
Completion: java.lang.RuntimeException: Downstream error 1
Exception in thread "main" java.lang.RuntimeException: Downstream error 1 에러 발생

 

사용 예시 4

(cause가 null인 경우)

suspend fun main() {
    flowOf(1)
        .onCompletion {
            if (it == null) {
                println("Completion: $it")
            }
        }
        .collect { value ->
            println(value)
        }
}
//
1
Completion: null

 

 

onEmpty

어떤 값도 emit하지 않고 완료될 때 실행되는 작업을 정의하는 연산자

public fun <T> Flow<T>.onEmpty(
    action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { ... }

 

사용 예시 1

suspend fun main() {
    flow<List<Int>> { delay(1000) }
        .onEmpty { emit(emptyList()) }
        .collect { println(it) }
}
// []

 

사용 예시 2

suspend fun main() {
    flow<Int> {
        throw Exception("Error")
    }.onEmpty {
        println("not called") // 에러 때문에 실행되지 않음
    }.catch {
        println("catch called")
    }.collect()
}
// 
catch called

 

 

catch

Flow 실행 중 발생하는 예외를 처리하는 연산자
upstream 예외만 처리하고 downstream 예외는 처리하지 못한다

Flow 취소로 인한 예외는 잡지 않는다

public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
    flow {
        val exception = catchImpl(this)
        if (exception != null) action(exception)
    }

 

사용 예시 1

onEach는 예외에 반응 X

onCompletion만 예외 발생 시 호출

class MyError : Throwable("My error")

suspend fun main() {
    val f = flow {
        emit(1)
        emit(2)
        throw MyError()
    }

    f.onEach { println("Got $it") }
        .catch { println("Caught $it") }
        .onCompletion { println("Completion") }
        .collect { println("Collected $it") }
}
// 
Got 1
Collected 1
Got 2
Collected 2
Caught MyError: My error
Completion

 

사용 예시 2

suspend fun main() {
    flowOf("Message1")
        .catch { emit("Error") }
        .collect { println("Collected $it") }
}
// Collected Error

 

사용 예시 3

catch 함수는 upstream 에서의 예외만 반응

suspend fun main5() {
    flowOf("Message1")
        .catch { emit("Error") }
        .onEach { throw Error(it) }
        .collect { println("Collected $it") }
}
// 
Exception in thread "main" java.lang.Error: Message1

 

 

잡히지 않은 예외 처리

Flow에서 catch되지 않은 예외는 Flow를 즉시 취소하고

collect는 다시 예외를 던진다

 

try - catch 사용

suspend fun main() {
    val flow = flow {
        emit("Message1")
        throw MyError()
    }

    try {
        flow.collect { println("Collected $it") }
    } catch (e: MyError) {
        println("Caught")
    }
}
//
Collected Message1
Caught

 

collect 마지막 연산 이후 catch가 올 수 없기 때문에

최종 연산에서 발생한 예외는 되도록 catch 이전에 두어 발생할 수 있도록 해야 한다

suspend fun main() {
    val flow = flow {
        emit("Message1")
        emit("Message2")
    }

    flow
        .onStart { println("Before") }
        .onEach { throw MyError() }
        .catch { println("Caught $it") }
        .collect()
}
//
Before
Caught MyError: My error

 

 

flowOn 연산자

플로우 연산은 모두 중단 함수라 Context가 필요하다

플로우가 Collect 되는 시점의 코루틴 Context를 가져온다

fun usersFlow(): Flow<String> = flow {
    repeat(2) {
        val ctx = currentCoroutineContext()
        val name = ctx[CoroutineName]?.name
        emit("User$it in $name")
    }
}

suspend fun main() {
    val users = usersFlow()
    withContext(CoroutineName("Name1")) {
        users.collect { println(it) }
    }
    withContext(CoroutineName("Name2")) {
        users.collect { println(it) }
    }
}
//
User0 in Name1
User1 in Name1
User0 in Name2
User1 in Name2

 

 

flowOn

flow가 실행되는 context를 변경하는 연산자

upstream 연산자에만 영향 / downstream으로 context가 전달 X

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

 

사용 예시

upstream 연산자에만 동작

suspend fun present(
    place: String,
    message: String
) {
    val ctx = currentCoroutineContext()
    val name = ctx[CoroutineName]?.name
    println("[$name] $message on $place")
}

fun messagesFlow(): Flow<String> = flow {
    present("flow builder", "Message")
    emit("Message")
}

suspend fun main() {
    val users = messagesFlow()
    withContext(CoroutineName("Name1")) {
        users
            .flowOn(CoroutineName("Name3"))
            .onEach { present("onEach", it) }
            .flowOn(CoroutineName("Name2"))
            .collect { present("collect", it) }
    }
}
// 
[Name3] Message on flow builder
[Name2] Message on onEach
[Name1] Message on collect

 

 

launchIn

별도의 코루틴에서 Flow를 시작하기 위해 사용

Flow를 수집하는 새로운 코루틴을 시작하는 최종 연산자

non-blocking 함수

// Job 타입 리턴
// non-blocking
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
    collect()
}

// 아래 코드의 축약형이 launchIn
scope.launch { 
    flow.collect()  // blocking
}

 

launchIn 사용 시

suspend fun main10() = coroutineScope {
    val flow = flowOf(1, 2)
    val job2 = flow
        .onEach { value ->
            delay(1000)
            println(value)
        }
        .launchIn(this)
    println("이 코드는 즉시 실행됨")
}
// 
이 코드는 즉시 실행됨
(1초 후)
1
(1초 후)
2

 

scope.launch 사용 시

suspend fun main() = coroutineScope {
    val flow = flowOf(1, 2)
    launch {
        flow
            .onEach { delay(1000) }
            .collect { value ->
                println(value)
            }
        println("flow가 완료될 때까지 실행되지 않음")
    }
}
//
(1초 후)
1
(1초 후)
2
flow가 완료될 때까지 실행되지 않음

 

'코루틴' 카테고리의 다른 글

[코루틴-Flow] SharedFlow (공유플로우)  (0) 2024.12.12
[코루틴 - flow] Flow 처리 (중간 연산자)  (1) 2024.12.09
[코루틴] Flow 생성  (0) 2024.12.01
[코루틴] Flow 원리  (0) 2024.11.28
[코루틴] Flow란 - (1)  (0) 2024.11.24