Flow 처리 (Flow Processing)
Flow 생성과 최종 연산 사이의 연산
플로우 처리를 위해 사용하는 함수들
변환 연산자, 필터링 연산자, 결합 연산자, 컬렉션 연산자 등등..
변환 연산자
map
transform 사용 / 항상 하나의 값만 emit
public inline fun <T, R> Flow<T>.map(
crossinline transform: suspend (value: T) -> R
): Flow<R> = transform { value ->
return@transform emit(transform(value))
}
사용 예시
suspend fun main() {
flowOf(1, 2, 3)
.map { it * it }
.collect { print(it) }
}
// 1 4 9
fold
초기값부터 시작해서 각 요소에 대해 왼쪽에서 오른쪽으로 2개의 값을 하나로 합치는 최종 연산자
public inline fun <T, R> Iterable<T>.fold(
initial: R,
operation: (acc: R, T) -> R
): R {
var accumulator = initial
for (element in this) accumulator = operation(accumulator, element)
return accumulator
}
사용 예시
suspend fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.fold(0) { acc, i -> acc + i }
println(res)
val res2 = list.fold(1) { acc , i -> acc * i }
println(res2)
val flowList = flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
val flowRes = flowList.fold(0) { acc, i -> acc + i }
println(flowRes
}
//
10
24
(4초 후)
10
scan
누적되는 과정의 모든 값을 생성하는 중간 연산자
fold와 비슷하지만, 중간 결과를 모두 emit
public fun <T, R> Flow<T>.scan(
initial: R,
@BuilderInference operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = runningFold(initial, operation)
public fun <T, R> Flow<T>.runningFold(
initial: R,
@BuilderInference operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = flow {
var accumulator: R = initial
emit(accumulator)
collect { value ->
accumulator = operation(accumulator, value)
emit(accumulator)
}
}
사용 예시
suspend fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.scan(0) { acc, i -> acc + i }
println(res)
flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
.scan(0) { acc, v -> acc + v }
.collect { println(it) }
}
//
[0, 1, 3, 6, 10]
0
(1초 후)
1
(1초 후)
3
(1초 후)
6
(1초 후)
10
Flow에서는 동시성 문제로 인해 flatMap 함수 대신 상황에 맞는 구체적인 변환 연산자 제공
flatMapConcat
생성된 플로우를 하나씩 처리
두 번째 Flow는 첫 번째 플로우가 완료되었을 때 시작
Flow의 각 값을 다른 Flow로 변환하고, 그 Flow들을 순차적으로 연결하여 평탄화
순차적으로 실행되어 이전 Flow가 완료될 때까지 다음 Flow 시작 안함
Flow 원소의 순서가 보장된다
public fun <T, R> Flow<T>.flatMapConcat(
transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenConcat()
public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
collect { value -> emitAll(value) }
}
사용 예시
fun flowFrom(element: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${element} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapConcat { flowFrom(it) }
.collect { println(it) }
}
//
(1초 후)
1_A
(1초 후)
2_A
(1초 후)
3_A
(1초 후)
1_B
(1초 후)
2_B
(1초 후)
3_B
(1초 후)
1_C
(1초 후)
2_C
(1초 후)
3_C
flatMapMerge
만들어진 플로우를 동시에 처리.
Flow의 각 값을 다른 Flow로 변환하고, 이 Flow들을 동시에(concurrent) 실행하여 결과를 병합
동시성 제한 가능 (concurrency 파라미터)
순서 미보장
// concurrency로 동시 처리 가능한 플로우 수 설정 가능
public fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R> =
map(transform).flattenMerge(concurrency)
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}
사용 예시
fun flowFrom(element: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${element} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapMerge { flowFrom(it) }
.collect { println(it) }
}
//
(1초 후)
1_A
1_B
1_C
(1초 후)
2_A
2_B
2_C
(1초 후)
3_A
3_C
3_B
flatMapLatest
새로운 플로우가 나타나면 이전 플로우의 처리는 무시
Flow가 새 값을 emit할 때마다 이전에 진행 중이던 Flow를 취소하고 새로운 Flow로 전환
항상 최신 값에 대한 결과만 유지
실시간 검색이나 자동완성, 위치 업데이트, 최신 데이터만 필요한 경우에 사용
public inline fun <T, R> Flow<T>.flatMapLatest(
@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> = transformLatest { emitAll(transform(it)) }
사용 예시 1
fun flowFrom(element: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${element} " }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapLatest { flowFrom(it) }
.collect { println(it) }
}
//
(1초 후)
1_C
(1초 후)
2_C
(1초 후)
3_C
사용 예시 2
fun flowFrom(element: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${element} " }
suspend fun main() {
flowOf("A", "B", "C")
.onEach { delay(1200) }
.flatMapLatest { flowFrom(it) }
.collect { println(it) }
}
//
(2.2초 후)
1_A
(1.2초 후)
1_B
(1.2초 후)
1_C
(1초 후)
2_C
(1초 후)
3_C
필터링 연산자
filter
주어진 조건에 맞는 값을 가진 Flow 반환
public inline fun <T> Flow<T>.filter(
crossinline predicate: suspend (T) -> Boolean
): Flow<T> = transform { value ->
if (predicate(value)) return@transform emit(value)
}
사용 예시
fun isEven(num: Int): Boolean = num % 2 == 0
suspend fun main() {
(1..6).asFlow() // [1, 2, 3, 4, 5, 6]
.filter { it <= 3 } // [1, 2, 3]
.filter { isEven(it) } // [2]
.collect { print(it) } // [2]
}
// 2
take
지정된 개수만큼만 값을 가져오고 나머지는 취소하는 연산자
// count만큼 collect하고 기존 Flow 취소
public fun <T> Flow<T>.take(count: Int): Flow<T> {
require(count > 0) { "Requested element count $count should be positive" }
return flow {
var consumed = 0
try {
collect { value ->
if (++consumed < count) {
return@collect emit(value)
} else {
return@collect emitAbort(value)
}
}
} catch (e: AbortFlowException) {
e.checkOwnership(owner = this)
}
}
}
사용 예시
suspend fun main() {
('A'..'Z').asFlow()
.take(3)
.collect { print(it) }
}
// ABC
drop
특정 수의 원소를 무시하는 연산자
public fun <T> Flow<T>.drop(count: Int): Flow<T> {
require(count >= 0) { "Drop count should be non-negative, but had $count" }
return flow {
var skipped = 0
collect { value ->
if (skipped >= count) emit(value) else ++skipped
}
}
}
사용 예시
suspend fun main() {
('A'..'Z').asFlow()
.drop(23)
.collect { print(it) }
}
// XYZ
distinctUntilChanged (중복 제거 함수 )
연속된 중복 값을 필터링하는 연산자. 바로 이전의 원소와 동일한 원소만 제거
StateFlow에는 이미 이 기능이 내장되어 있다
public fun <T> Flow<T>.distinctUntilChanged(): Flow<T> =
when (this) {
is StateFlow<*> -> this // state flows are always distinct
else -> distinctUntilChangedBy(keySelector = defaultKeySelector, areEquivalent = defaultAreEquivalent)
}
사용 예시 1
suspend fun main() {
flowOf(1, 2, 2, 3, 2, 1, 1, 3)
.distinctUntilChanged()
.collect { print(it) }
}
// 123213
사용 예시 2
// data class 라 equals로 인해 동일한 객체로 판별
data class User(val id: Int, val name: String)
suspend fun main() {
flowOf(
User(1, "Park"),
User(1, "Park"),
User(1, "Kim"),
User(2, "Kim")
)
.distinctUntilChanged()
.collect { println(it) }
}
//
User(id=1, name=Park)
User(id=1, name=Kim)
User(id=2, name=Kim)
distinctUntilChangedBy
특정 키를 기준으로 연속된 중복을 필터링하는 연산자
public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> =
distinctUntilChangedBy(keySelector = keySelector, areEquivalent = defaultAreEquivalent)
사용 예시
data class User(val id: Int, val name: String) {
override fun toString(): String = "[$id] $name"
}
suspend fun main() {
val users = flowOf(
User(1, "Park"),
User(1, "Kim"),
User(2, "Kim"),
User(2, "Lee")
)
println(users.distinctUntilChangedBy { it.id }.toList())
println(users.distinctUntilChangedBy { it.name }.toList())
println(users.distinctUntilChanged { prev, next ->
prev.id == next.id || prev.name == next.name
}.toList())
}
//
[[1] Park, [2] Kim]
[[1] Park, [1] Kim, [2] Lee]
[[1] Park, [2] Kim]
결합 연산자
merge
여러 Flow를 하나의 Flow로 합치는 연산자
동시에 모든 flow를 수집하고 순서는 보장하지 않음
public fun <T> merge(vararg flows: Flow<T>): Flow<T> =
flows.asIterable().merge()
사용 예시 1
suspend fun main() {
val together: Flow<Number> = merge(
flowOf(1, 2, 3),
flowOf(0.1, 0.2, 0.3)
)
print(together.toList())
}
//
[1, 2, 3, 0.1, 0.2, 0.3]
[1, 0.1, 0.2, 0.3, 2, 3]
등의 가능한 조합 중 하나
사용 예시 2
suspend fun main() {
val together: Flow<Number> = merge(
flowOf(1, 2, 3).onEach { delay(100) },
flowOf(0.1, 0.2, 0.3)
)
print(together.toList())
}
//
[0.1, 0.2, 0.3, 1, 2, 3]
zip
두 Flow로부터 쌍을 만드는 연산자
하나의 Flow가 완료되면 전체 Flow가 완료되고 남은 Flow 원소는 유실
public fun <T1, T2, R> Flow<T1>.zip(
other: Flow<T2>,
transform: suspend (T1, T2) -> R
): Flow<R> = zipImpl(this, other, transform)
사용 예시
suspend fun main() {
val flow1 = flowOf("A", "B", "C").onEach { delay(400) }
val flow2 = flowOf(1, 2, 3, 4).onEach { delay(1000) }
flow1.zip(flow2) { f1, f2 ->
"${f1}_${f2}"
}.collect { println(it) }
}
//
(1초 후)
A_1
(1초 후)
B_2
(1초 후)
C_3
combine
여러 Flow의 가장 최근 값들을 결합하는 연산자 (각 flow에서 최근 값만을 유지)
한 Flow의 값이 새로 emit되면 다른 Flow의 최신 값과 결합
순서나 쌍이 정해져 있지 않음
모든 flow에서 최소 하나의 값이 emit되어야 결합 시작
public fun <T1, T2, R> Flow<T1>.combine(
flow: Flow<T2>,
transform: suspend (a: T1, b: T2) -> R
): Flow<R> = flow {
combineInternal(
arrayOf(this@combine, flow),
nullArrayFactory(),
{ emit(transform(it[0] as T1, it[1] as T2)) }
)
}
사용 예시
suspend fun main() {
val flow1 = flowOf("A", "B", "C").onEach { delay(400) }
val flow2 = flowOf(1, 2, 3, 4).onEach { delay(1000) }
flow1.combine(flow2) { f1, f2 ->
"${f1}_${f2}"
}.collect { println(it) }
}
//
(1초 후)
B_1
(0.2초 후)
C_1
(0.8초 후)
C_2
(1초 후)
C_3
(1초 후)
C_4
예외 연산자
retry
업스트림 Flow에서 예외가 발생했을 때 지정된 횟수만큼 재시도를 하는 연산자
업스트림 예외만 재시도 (다운스트림 예외는 처리 안함)
public fun <T> Flow<T>.retry(
retries: Long = Long.MAX_VALUE,
predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
require(retries > 0) { "Expected positive amount of retries, but had $retries" }
return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}
public fun <T> Flow<T>.retryWhen(
predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
flow {
var attempt = 0L
var shallRetry: Boolean
do {
shallRetry = false
val cause = catchImpl(this)
if (cause != null) {
if (predicate(cause, attempt)) {
shallRetry = true
attempt++
} else {
throw cause
}
}
} while (shallRetry)
}
사용 예시 1
suspend fun main() {
flow {
emit(1)
emit(2)
error("E")
emit(3)
}.retry(3) {
print(it.message)
true
}.collect { print(it) }
}
//
12E12E12E12 Exception in thread "main" java.lang.IllegalStateException: E
사용 예시 2
flow {
emit(api.fetchData())
}.retry(retries = 3) { cause ->
// NetworkException일 때만 재시도
cause is NetworkException
}
'코루틴' 카테고리의 다른 글
[코루틴-Flow] StateFlow (상태플로우) (1) | 2024.12.15 |
---|---|
[코루틴-Flow] SharedFlow (공유플로우) (0) | 2024.12.12 |
[코루틴-Flow] 생명주기 함수, empty, catch, flowOn, launchIn (0) | 2024.12.06 |
[코루틴] Flow 생성 (0) | 2024.12.01 |
[코루틴] Flow 원리 (0) | 2024.11.28 |