Flow(플로우)
코루틴을 기반으로 동작하는 코틀린의 비동기 스트림을 다루기 위한 API
순차적으로 값을 방출
Cold Stream으로 collect 함수 호출 시 데이터 발행
Flow 인터페이스
/**
* 값을 순차적으로 내보내고 정상적으로 또는 예외와 함께 완료되는 비동기 데이터 스트림
*/
public interface Flow<out T> {
// Flow의 값을 수집하는 함수
// 일시중단 함수로 코루틴 내에서 실행되어야 한다
public suspend fun collect(collector: FlowCollector<T>)
}
List vs Sequence vs Flow
List
모든 원소의 계산이 완료된 Collection
작은 데이터 크기에 적합
// 한 번에 모든 값을 만드는 List
fun allUsers(): List<User> =
api.getAllUsers().map { it.toUser() }
// 모든 값이 생성되길 기다림
fun getList(): List<String> = List(3) {
Thread.sleep(1000)
"User$it"
}
fun main() {
val list = getList()
println("Function Started")
list.forEach { println(it) }
}
// 결과
(3초 후)
Function Started
User0
User1
User2
Sequence
Cold Stream으로 필요할 때마다 값을 계산
큰 데이터 크기에 적합
하나씩 원소를 계산할 때 유용하며 동기적으로 처리
fun getSequence(): Sequence<String> = sequence {
repeat(3) {
Thread.sleep(1000)
yield("User$it")
}
}
fun main() {
val sequence = getSequence()
println("Function Started")
sequence.forEach { println(it) }
}
// 결과
Function Started
(1초 후)
User0
(1초 후)
User1
(1초 후)
User2
// 동기적 처리
val sequence = sequence {
yield(1)
Thread.sleep(1000) // 동기적 처리 - 스레드 블로킹
yield(2)
Thread.sleep(1000)
yield(3)
}
데이터 소스의 갯수가 많을 때
원소를 필요할 때만 계산하거나 읽는 지연 연산을 하게 되는 상황에서 사용하면 유용
val fibonacci: Sequence<BigInteger> = sequence {
var first = 0.toBigInteger()
var second = 1.toBigInteger()
while (true) {
yield(first)
val temp = first
first += second
second = temp
}
}
fun main() {
// 무한 sequence에서 필요한 만큼만 계산
val take5 = fibonacci.take(5).toList()
val take10 = fibonacci.take(10).toList()
val take10To20 = fibonacci.drop(10).take(10).toList()
val takeEven = fibonacci.filter { it % 2.toBigInteger() == 0.toBigInteger() }
println(take5)
println(take10)
println(take10To20)
println(takeEven.take(5).toList())
}
// 결과
[0, 1, 1, 2, 3]
[0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
[55, 89, 144, 233, 377, 610, 987, 1597, 2584, 4181]
[0, 2, 8, 34, 144]
sequence에서는
빌더 내부에 중단점이 있으면 값을 기다리는 스레드가 블로킹된다 (blocking)
SequenceScope의 리시버에서 호출되는 함수 (yield, yieldAll) 외에 다른 중단 함수를 사용할 수 없다
fun getSequence(): Sequence<String> = sequence {
repeat(3) {
delay(1000)
yield("User$it")
}
}
// 컴파일 에러
Restricted suspending functions can only invoke member or
extension suspending functions on their restricted coroutine scope
sequence를 잘못 사용한 경우
fun getSequence(): Sequence<String> = sequence {
repeat(3) {
Thread.sleep(1000) // 스레드를 1초간 블록 -> 같은 스레드에서 launch로 시작된 코루틴이 대기
yield("User$it")
}
}
suspend fun main() {
withContext(newSingleThreadContext("main")) {
// 코루틴 시작
launch {
repeat(3) {
delay(100) // non-blocking delay
println("Processing on coroutine")
}
}
// 블로킹 시퀀스 처리 시작
val list = getSequence()
list.forEach { println(it) } // blocking 연산이 된다
}
}
// 결과
(1초 후)
User0
(1초 후)
User1
(1초 후)
User2
Processing on coroutine
(0.1초 후)
Processing on coroutine
(0.1초 후)
Processing on coroutine
위 상황 같은 경우 Flow 사용
fun getFlow(): Flow<String> = flow {
repeat(3) {
delay(1000) // non-blocking delay, 중단점
emit("User$it")
}
}
suspend fun main() {
withContext(newSingleThreadContext("main")) {
// 1. launch 코루틴 시작
launch {
repeat(3) {
delay(100) // 중단점 (다른 코루틴이 실행 가능)
println("Processing on coroutine")
}
}
// 2. Flow 수집 시작
val list = getFlow()
// collect시 getFlow 함수의 delay 일시중단 함수로 인해
// main의 launch 코루틴이 실행될 기회를 얻음
list.collect { println(it) }
}
}
// 결과
(0.1초 후)
Processing on coroutine
(0.1초 후)
Processing on coroutine
(0.1초 후)
Processing on coroutine
(1 - 3 * 0.1 = 0.7초 후)
User0
(1초 후)
User1
(1초 후)
User2
Flow의 특징
코루틴 지원, 비동기적으로 계산
Flow 빌더와 연산은 중단 함수
Flow는 취소 가능하다
Flow 최종 연산은 중단 가능하며 CoroutineContext의 전파가 가능하다
fun usersFlow(): Flow<String> = flow {
repeat(3) {
delay(1000)
val ctx = currentCoroutineContext()
val name = ctx[CoroutineName]?.name // 상위 코루틴의 이름이 전파됨
emit("User$it in $name")
}
}
suspend fun main() {
val users = usersFlow()
withContext(CoroutineName("Name")) { // 코루틴 컨텍스트 설정
val job = launch {
users.collect { println(it) }
}
launch {
delay(2100)
println("I got enough")
job.cancel() // Flow collect 취소
}
}
}
// 결과
(1초 후)
User0 in Name
(1초 후)
User1 in Name
(0.1초 후)
I got enough
Flow 구성 요소
시작연산
Flow 빌더, 다른 객체에서의 변환, 헬퍼 함수로부터 시작
중간연산
플로우를 변경
최종연산
중단 가능하거나 scope를 필요로 하는 유일한 연산
주로 collect가 최종연산 (다른 최종연산도 존재)
suspend fun main() {
flow { emit("Message 1") } // Flow Builder
.onEach { println(it) } // 중간 연산 (intermediate operation)
.onStart { println("Do Before") } // 중간 연산 (intermediate operation)
.onCompletion { println("Do After") } // 중간 연산 (intermediate operation)
.catch { emit("Error") } // 중간 연산 (intermediate operation)
.collect { println("Collected $it") } // 최종 연산
}
'코루틴' 카테고리의 다른 글
[코루틴] Flow 생성 (0) | 2024.12.01 |
---|---|
[코루틴] Flow 원리 (0) | 2024.11.28 |
[코루틴] 핫 데이터와 콜드 데이터 (0) | 2024.11.23 |
[코루틴] select (0) | 2024.11.18 |
[코루틴] Channel (채널) (0) | 2024.11.18 |