Flow는
요청이 한쪽 방향으로 흐르고 요청에 의해 생성된 값이 다른 방향으로 흐르는 파이프
Flow의 확장함수를 통해 특정 이벤트를 감지할 수 있다
대표적인 Flow의 생명주기 함수는
onEach, onStart, onCompletion 등이 있다.
onEach
Flow의 값을 하나씩 받기 위해 사용
upstream flow의 각 값이 downstream으로 emit되기 전에 지정된 작업을 수행하는 연산자
public fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T> = transform { value ->
action(value)
return@transform emit(value)
}
사용 예시
suspend fun main0() {
flowOf(1, 2, 3, 4)
.onEach {
delay(1000) // onEach 람다 -> 중단함수. delay 사용 가능
print(it)
}
.collect()
}
// (1초후) 1 (1초후) 2 (1초후) 3 (1초후) 4
onStart
Flow의 수집이 시작되기 전에 실행되는 작업을 정의하는 연산자. 시작 시 호출되는 리스너 설정
onStart 내부에서 emit 함수 사용 가능
// action이 FlowCollector라 emit 사용 가능
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { ... }
사용 예시
suspend fun main1() {
flowOf(1, 2)
.onStart {
println("Before")
emit(0)
}
.onEach {
println("onEach")
delay(1000)
}
.collect {
println(it)
}
}
//
Before
onEach
0
onEach
1
onEach
2
onCompletion
Flow가 완료되거나 취소될 때 실행되는 작업을 정의하는 연산자
try-catch-finally의 finally와 비슷한 개념
catch와는 다르게 upstream, downstream의 모든 예외를 감지
cause가 null인 경우 정상 완료
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { ... }
사용 예시 1
suspend fun main() {
flowOf(1, 2, 3)
.onEach { delay(1000) }
.onCompletion { println("Completed") }
.collect { print(it) }
}
// 1 2 3 Completed
사용 예시 2
(코루틴 취소 시)
suspend fun main() = coroutineScope {
val job = launch {
flowOf(1, 2, 3)
.onEach {
println("onEach")
delay(1000)
}
.onCompletion { println("Completed") }
.collect { println(it) }
}
delay(1100)
job.cancel()
}
//
onEach
1
onEach
Completed
사용 예시 3
suspend fun main() {
flow {
emit(1)
throw RuntimeException("Upstream error")
}
.catch { e ->
println("Catch: $e") // upstream 에러만 catch
}
.onCompletion { e ->
println("Completion: $e") // upstream, downstream 에러 모두 감지
}
.collect { value ->
throw RuntimeException("Downstream error $value") // catch에서 못 잡음
}
}
//
Completion: java.lang.RuntimeException: Downstream error 1
Exception in thread "main" java.lang.RuntimeException: Downstream error 1 에러 발생
사용 예시 4
(cause가 null인 경우)
suspend fun main() {
flowOf(1)
.onCompletion {
if (it == null) {
println("Completion: $it")
}
}
.collect { value ->
println(value)
}
}
//
1
Completion: null
onEmpty
어떤 값도 emit하지 않고 완료될 때 실행되는 작업을 정의하는 연산자
public fun <T> Flow<T>.onEmpty(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { ... }
사용 예시 1
suspend fun main() {
flow<List<Int>> { delay(1000) }
.onEmpty { emit(emptyList()) }
.collect { println(it) }
}
// []
사용 예시 2
suspend fun main() {
flow<Int> {
throw Exception("Error")
}.onEmpty {
println("not called") // 에러 때문에 실행되지 않음
}.catch {
println("catch called")
}.collect()
}
//
catch called
catch
Flow 실행 중 발생하는 예외를 처리하는 연산자
upstream 예외만 처리하고 downstream 예외는 처리하지 못한다
Flow 취소로 인한 예외는 잡지 않는다
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
flow {
val exception = catchImpl(this)
if (exception != null) action(exception)
}
사용 예시 1
onEach는 예외에 반응 X
onCompletion만 예외 발생 시 호출
class MyError : Throwable("My error")
suspend fun main() {
val f = flow {
emit(1)
emit(2)
throw MyError()
}
f.onEach { println("Got $it") }
.catch { println("Caught $it") }
.onCompletion { println("Completion") }
.collect { println("Collected $it") }
}
//
Got 1
Collected 1
Got 2
Collected 2
Caught MyError: My error
Completion
사용 예시 2
suspend fun main() {
flowOf("Message1")
.catch { emit("Error") }
.collect { println("Collected $it") }
}
// Collected Error
사용 예시 3
catch 함수는 upstream 에서의 예외만 반응
suspend fun main5() {
flowOf("Message1")
.catch { emit("Error") }
.onEach { throw Error(it) }
.collect { println("Collected $it") }
}
//
Exception in thread "main" java.lang.Error: Message1
잡히지 않은 예외 처리
Flow에서 catch되지 않은 예외는 Flow를 즉시 취소하고
collect는 다시 예외를 던진다
try - catch 사용
suspend fun main() {
val flow = flow {
emit("Message1")
throw MyError()
}
try {
flow.collect { println("Collected $it") }
} catch (e: MyError) {
println("Caught")
}
}
//
Collected Message1
Caught
collect 마지막 연산 이후 catch가 올 수 없기 때문에
최종 연산에서 발생한 예외는 되도록 catch 이전에 두어 발생할 수 있도록 해야 한다
suspend fun main() {
val flow = flow {
emit("Message1")
emit("Message2")
}
flow
.onStart { println("Before") }
.onEach { throw MyError() }
.catch { println("Caught $it") }
.collect()
}
//
Before
Caught MyError: My error
flowOn 연산자
플로우 연산은 모두 중단 함수라 Context가 필요하다
플로우가 Collect 되는 시점의 코루틴 Context를 가져온다
fun usersFlow(): Flow<String> = flow {
repeat(2) {
val ctx = currentCoroutineContext()
val name = ctx[CoroutineName]?.name
emit("User$it in $name")
}
}
suspend fun main() {
val users = usersFlow()
withContext(CoroutineName("Name1")) {
users.collect { println(it) }
}
withContext(CoroutineName("Name2")) {
users.collect { println(it) }
}
}
//
User0 in Name1
User1 in Name1
User0 in Name2
User1 in Name2
flowOn
flow가 실행되는 context를 변경하는 연산자
upstream 연산자에만 영향 / downstream으로 context가 전달 X
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
checkFlowContext(context)
return when {
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
}
}
사용 예시
upstream 연산자에만 동작
suspend fun present(
place: String,
message: String
) {
val ctx = currentCoroutineContext()
val name = ctx[CoroutineName]?.name
println("[$name] $message on $place")
}
fun messagesFlow(): Flow<String> = flow {
present("flow builder", "Message")
emit("Message")
}
suspend fun main() {
val users = messagesFlow()
withContext(CoroutineName("Name1")) {
users
.flowOn(CoroutineName("Name3"))
.onEach { present("onEach", it) }
.flowOn(CoroutineName("Name2"))
.collect { present("collect", it) }
}
}
//
[Name3] Message on flow builder
[Name2] Message on onEach
[Name1] Message on collect
launchIn
별도의 코루틴에서 Flow를 시작하기 위해 사용
Flow를 수집하는 새로운 코루틴을 시작하는 최종 연산자
non-blocking 함수
// Job 타입 리턴
// non-blocking
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch {
collect()
}
// 아래 코드의 축약형이 launchIn
scope.launch {
flow.collect() // blocking
}
launchIn 사용 시
suspend fun main10() = coroutineScope {
val flow = flowOf(1, 2)
val job2 = flow
.onEach { value ->
delay(1000)
println(value)
}
.launchIn(this)
println("이 코드는 즉시 실행됨")
}
//
이 코드는 즉시 실행됨
(1초 후)
1
(1초 후)
2
scope.launch 사용 시
suspend fun main() = coroutineScope {
val flow = flowOf(1, 2)
launch {
flow
.onEach { delay(1000) }
.collect { value ->
println(value)
}
println("flow가 완료될 때까지 실행되지 않음")
}
}
//
(1초 후)
1
(1초 후)
2
flow가 완료될 때까지 실행되지 않음
'코루틴' 카테고리의 다른 글
[코루틴-Flow] SharedFlow (공유플로우) (0) | 2024.12.12 |
---|---|
[코루틴 - flow] Flow 처리 (중간 연산자) (1) | 2024.12.09 |
[코루틴] Flow 생성 (0) | 2024.12.01 |
[코루틴] Flow 원리 (0) | 2024.11.28 |
[코루틴] Flow란 - (1) (0) | 2024.11.24 |