본문 바로가기

코루틴

[코루틴 - flow] Flow 처리 (중간 연산자)

 

Flow 처리 (Flow Processing)

Flow 생성과 최종 연산 사이의 연산

 

플로우 처리를 위해 사용하는 함수들

변환 연산자, 필터링 연산자, 결합 연산자, 컬렉션 연산자 등등..

 

 

변환 연산자

map

transform 사용 / 항상 하나의 값만 emit

public inline fun <T, R> Flow<T>.map(
    crossinline transform: suspend (value: T) -> R
): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}

 

사용 예시

suspend fun main() {
    flowOf(1, 2, 3)
        .map { it * it }
        .collect { print(it) }
}
// 1 4 9

 

 

fold

초기값부터 시작해서 각 요소에 대해 왼쪽에서 오른쪽으로 2개의 값을 하나로 합치는 최종 연산자

public inline fun <T, R> Iterable<T>.fold(
    initial: R,
    operation: (acc: R, T) -> R
): R {
    var accumulator = initial
    for (element in this) accumulator = operation(accumulator, element)
    return accumulator
}

 

사용 예시

suspend fun main() {
    val list = listOf(1, 2, 3, 4)
    val res = list.fold(0) { acc, i -> acc + i }
    println(res)
    val res2 = list.fold(1) { acc , i -> acc * i }
    println(res2)

    val flowList = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    val flowRes = flowList.fold(0) { acc, i -> acc + i }
    println(flowRes
}
//
10
24
(4초 후)
10

 

 

scan

누적되는 과정의 모든 값을 생성하는 중간 연산자

fold와 비슷하지만, 중간 결과를 모두 emit

public fun <T, R> Flow<T>.scan(
    initial: R,
    @BuilderInference operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = runningFold(initial, operation)

public fun <T, R> Flow<T>.runningFold(
    initial: R, 
    @BuilderInference operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = flow {
    var accumulator: R = initial
    emit(accumulator)
    collect { value ->
        accumulator = operation(accumulator, value)
        emit(accumulator)
    }
}

 

사용 예시

suspend fun main() {
    val list = listOf(1, 2, 3, 4)
    val res = list.scan(0) { acc, i -> acc + i }
    println(res)

    flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
        .scan(0) { acc, v -> acc + v }
        .collect { println(it) }
}
//
[0, 1, 3, 6, 10]
0
(1초 후)
1
(1초 후)
3
(1초 후)
6
(1초 후)
10

 

 

Flow에서는 동시성 문제로 인해 flatMap 함수 대신 상황에 맞는 구체적인 변환 연산자 제공

 

flatMapConcat

생성된 플로우를 하나씩 처리

두 번째 Flow는 첫 번째 플로우가 완료되었을 때 시작

 

Flow의 각 값을 다른 Flow로 변환하고, 그 Flow들을 순차적으로 연결하여 평탄화

순차적으로 실행되어 이전 Flow가 완료될 때까지 다음 Flow 시작 안함

Flow 원소의 순서가 보장된다

public fun <T, R> Flow<T>.flatMapConcat(
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenConcat()

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}

 

사용 예시

fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapConcat { flowFrom(it) }
        .collect { println(it) }
}
//
(1초 후)
1_A 
(1초 후)
2_A 
(1초 후)
3_A 
(1초 후)
1_B 
(1초 후)
2_B 
(1초 후)
3_B 
(1초 후)
1_C 
(1초 후)
2_C 
(1초 후)
3_C

 

 

flatMapMerge

만들어진 플로우를 동시에 처리.

 

Flow의 각 값을 다른 Flow로 변환하고, 이 Flow들을 동시에(concurrent) 실행하여 결과를 병합

동시성 제한 가능 (concurrency 파라미터)

순서 미보장

// concurrency로 동시 처리 가능한 플로우 수 설정 가능
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)
    
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

 

사용 예시

fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapMerge { flowFrom(it) }
        .collect { println(it) }
}
//
(1초 후)
1_A 
1_B 
1_C 
(1초 후)
2_A 
2_B 
2_C 
(1초 후)
3_A 
3_C 
3_B

 

 

flatMapLatest

새로운 플로우가 나타나면 이전 플로우의 처리는 무시

Flow가 새 값을 emit할 때마다 이전에 진행 중이던 Flow를 취소하고 새로운 Flow로 전환

항상 최신 값에 대한 결과만 유지

 

실시간 검색이나 자동완성, 위치 업데이트, 최신 데이터만 필요한 경우에 사용

public inline fun <T, R> Flow<T>.flatMapLatest(
    @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> = transformLatest { emitAll(transform(it)) }

 

사용 예시 1

fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}
//
(1초 후)
1_C
(1초 후)
2_C 
(1초 후)
3_C

 

사용 예시 2

fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .onEach { delay(1200) }
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}
//
(2.2초 후)
1_A 
(1.2초 후)
1_B 
(1.2초 후)
1_C 
(1초 후)
2_C 
(1초 후)
3_C

 

 

 

필터링 연산자

filter

주어진 조건에 맞는 값을 가진 Flow 반환

public inline fun <T> Flow<T>.filter(
    crossinline predicate: suspend (T) -> Boolean
): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}

 

사용 예시

fun isEven(num: Int): Boolean = num % 2 == 0

suspend fun main() {
    (1..6).asFlow()             // [1, 2, 3, 4, 5, 6]
        .filter { it <= 3 }     // [1, 2, 3]
        .filter { isEven(it) }  // [2]
        .collect { print(it) }  // [2]
}
// 2

 

 

take

지정된 개수만큼만 값을 가져오고 나머지는 취소하는 연산자

// count만큼 collect하고 기존 Flow 취소
public fun <T> Flow<T>.take(count: Int): Flow<T> {
    require(count > 0) { "Requested element count $count should be positive" }
    return flow {
        var consumed = 0
        try {
            collect { value ->
                if (++consumed < count) {
                    return@collect emit(value)
                } else {
                    return@collect emitAbort(value)
                }
            }
        } catch (e: AbortFlowException) {
            e.checkOwnership(owner = this)
        }
    }
}

 

사용 예시

suspend fun main() {
    ('A'..'Z').asFlow()
        .take(3)
        .collect { print(it) }
}
// ABC

 

 

drop

특정 수의 원소를 무시하는 연산자

public fun <T> Flow<T>.drop(count: Int): Flow<T> {
    require(count >= 0) { "Drop count should be non-negative, but had $count" }
    return flow {
        var skipped = 0
        collect { value ->
            if (skipped >= count) emit(value) else ++skipped
        }
    }
}

 

사용 예시

suspend fun main() {
    ('A'..'Z').asFlow()
        .drop(23)
        .collect { print(it) }
}
// XYZ

 

 

distinctUntilChanged (중복 제거 함수 )

연속된 중복 값을 필터링하는 연산자. 바로 이전의 원소와 동일한 원소만 제거

StateFlow에는 이미 이 기능이 내장되어 있다

public fun <T> Flow<T>.distinctUntilChanged(): Flow<T> =
    when (this) {
        is StateFlow<*> -> this // state flows are always distinct
        else -> distinctUntilChangedBy(keySelector = defaultKeySelector, areEquivalent = defaultAreEquivalent)
    }

 

사용 예시 1

suspend fun main() {
    flowOf(1, 2, 2, 3, 2, 1, 1, 3)
        .distinctUntilChanged()
        .collect { print(it) }
}
// 123213

 

사용 예시 2

// data class 라 equals로 인해 동일한 객체로 판별
data class User(val id: Int, val name: String)
suspend fun main() {
    flowOf(
        User(1, "Park"),
        User(1, "Park"),
        User(1, "Kim"),
        User(2, "Kim")
    )
        .distinctUntilChanged()
        .collect { println(it) }
}
//
User(id=1, name=Park)
User(id=1, name=Kim)
User(id=2, name=Kim)

 

 

distinctUntilChangedBy

특정 키를 기준으로 연속된 중복을 필터링하는 연산자

public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> =
    distinctUntilChangedBy(keySelector = keySelector, areEquivalent = defaultAreEquivalent)

 

사용 예시

data class User(val id: Int, val name: String) {
    override fun toString(): String = "[$id] $name"
}

suspend fun main() {
    val users = flowOf(
        User(1, "Park"),
        User(1, "Kim"),
        User(2, "Kim"),
        User(2, "Lee")
    )

    println(users.distinctUntilChangedBy { it.id }.toList())
    println(users.distinctUntilChangedBy { it.name }.toList())
    println(users.distinctUntilChanged { prev, next ->
        prev.id == next.id || prev.name == next.name
    }.toList())
}
//
[[1] Park, [2] Kim]
[[1] Park, [1] Kim, [2] Lee]
[[1] Park, [2] Kim]

 

 

 

결합 연산자

merge

여러 Flow를 하나의 Flow로 합치는 연산자

동시에 모든 flow를 수집하고 순서는 보장하지 않음

public fun <T> merge(vararg flows: Flow<T>): Flow<T> = 
   flows.asIterable().merge()

 

사용 예시 1

suspend fun main() {
    val together: Flow<Number> = merge(
        flowOf(1, 2, 3),
        flowOf(0.1, 0.2, 0.3)
    )
    print(together.toList())
}
//
[1, 2, 3, 0.1, 0.2, 0.3]
[1, 0.1, 0.2, 0.3, 2, 3]
등의 가능한 조합 중 하나

 

사용 예시 2

suspend fun main() {
    val together: Flow<Number> = merge(
        flowOf(1, 2, 3).onEach { delay(100) },
        flowOf(0.1, 0.2, 0.3)
    )
    print(together.toList())
}
// 
[0.1, 0.2, 0.3, 1, 2, 3]

 

 

zip

두 Flow로부터 쌍을 만드는 연산자

하나의 Flow가 완료되면 전체 Flow가 완료되고 남은 Flow 원소는 유실

public fun <T1, T2, R> Flow<T1>.zip(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R> = zipImpl(this, other, transform)

 

사용 예시

suspend fun main() {
    val flow1 = flowOf("A", "B", "C").onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4).onEach { delay(1000) }
    flow1.zip(flow2) { f1, f2 ->
        "${f1}_${f2}"
    }.collect { println(it) }
}
// 
(1초 후)
A_1
(1초 후)
B_2
(1초 후)
C_3

 

 

combine

여러 Flow의 가장 최근 값들을 결합하는 연산자 (각 flow에서 최근 값만을 유지)

한 Flow의 값이 새로 emit되면 다른 Flow의 최신 값과 결합

순서나 쌍이 정해져 있지 않음

모든 flow에서 최소 하나의 값이 emit되어야 결합 시작

public fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (a: T1, b: T2) -> R
): Flow<R> = flow {
    combineInternal(
        arrayOf(this@combine, flow),
        nullArrayFactory(),
        { emit(transform(it[0] as T1, it[1] as T2)) }
    )
}

 

사용 예시

suspend fun main() {
    val flow1 = flowOf("A", "B", "C").onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4).onEach { delay(1000) }
    flow1.combine(flow2) { f1, f2 ->
        "${f1}_${f2}"
    }.collect { println(it) }
}
// 
(1초 후)
B_1
(0.2초 후)
C_1
(0.8초 후)
C_2
(1초 후)
C_3
(1초 후)
C_4

 

 

예외 연산자 

retry

업스트림 Flow에서 예외가 발생했을 때 지정된 횟수만큼 재시도를 하는 연산자

업스트림 예외만 재시도 (다운스트림 예외는 처리 안함)

public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}

public fun <T> Flow<T>.retryWhen(
    predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
    flow {
        var attempt = 0L
        var shallRetry: Boolean
        do {
            shallRetry = false
            val cause = catchImpl(this)
            if (cause != null) {
                if (predicate(cause, attempt)) {
                    shallRetry = true
                    attempt++
                } else {
                    throw cause
                }
            }
        } while (shallRetry)
    }

 

사용 예시 1

suspend fun main() {
    flow {
        emit(1)
        emit(2)
        error("E")
        emit(3)
    }.retry(3) {
        print(it.message)
        true
    }.collect { print(it) }
}
//
12E12E12E12 Exception in thread "main" java.lang.IllegalStateException: E

 

사용 예시 2

flow {
    emit(api.fetchData())
}.retry(retries = 3) { cause ->
    // NetworkException일 때만 재시도
    cause is NetworkException
}