코루틴

[코루틴 - flow] Flow 처리 (중간 연산자)

kancho 2024. 12. 9. 09:27
반응형

 

Flow 처리 (Flow Processing)

Flow 생성과 최종 연산 사이의 연산

 

플로우 처리를 위해 사용하는 함수들

변환 연산자, 필터링 연산자, 결합 연산자, 컬렉션 연산자 등등..

 

 

변환 연산자

map

transform 사용 / 항상 하나의 값만 emit

public inline fun <T, R> Flow<T>.map(
    crossinline transform: suspend (value: T) -> R
): Flow<R> = transform { value ->
    return@transform emit(transform(value))
}

 

사용 예시

suspend fun main() {
    flowOf(1, 2, 3)
        .map { it * it }
        .collect { print(it) }
}
// 1 4 9

 

 

fold

초기값부터 시작해서 각 요소에 대해 왼쪽에서 오른쪽으로 2개의 값을 하나로 합치는 최종 연산자

public inline fun <T, R> Iterable<T>.fold(
    initial: R,
    operation: (acc: R, T) -> R
): R {
    var accumulator = initial
    for (element in this) accumulator = operation(accumulator, element)
    return accumulator
}

 

사용 예시

suspend fun main() {
    val list = listOf(1, 2, 3, 4)
    val res = list.fold(0) { acc, i -> acc + i }
    println(res)
    val res2 = list.fold(1) { acc , i -> acc * i }
    println(res2)

    val flowList = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    val flowRes = flowList.fold(0) { acc, i -> acc + i }
    println(flowRes
}
//
10
24
(4초 후)
10

 

 

scan

누적되는 과정의 모든 값을 생성하는 중간 연산자

fold와 비슷하지만, 중간 결과를 모두 emit

public fun <T, R> Flow<T>.scan(
    initial: R,
    @BuilderInference operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = runningFold(initial, operation)

public fun <T, R> Flow<T>.runningFold(
    initial: R, 
    @BuilderInference operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = flow {
    var accumulator: R = initial
    emit(accumulator)
    collect { value ->
        accumulator = operation(accumulator, value)
        emit(accumulator)
    }
}

 

사용 예시

suspend fun main() {
    val list = listOf(1, 2, 3, 4)
    val res = list.scan(0) { acc, i -> acc + i }
    println(res)

    flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
        .scan(0) { acc, v -> acc + v }
        .collect { println(it) }
}
//
[0, 1, 3, 6, 10]
0
(1초 후)
1
(1초 후)
3
(1초 후)
6
(1초 후)
10

 

 

Flow에서는 동시성 문제로 인해 flatMap 함수 대신 상황에 맞는 구체적인 변환 연산자 제공

 

flatMapConcat

생성된 플로우를 하나씩 처리

두 번째 Flow는 첫 번째 플로우가 완료되었을 때 시작

 

Flow의 각 값을 다른 Flow로 변환하고, 그 Flow들을 순차적으로 연결하여 평탄화

순차적으로 실행되어 이전 Flow가 완료될 때까지 다음 Flow 시작 안함

Flow 원소의 순서가 보장된다

public fun <T, R> Flow<T>.flatMapConcat(
    transform: suspend (value: T) -> Flow<R>
): Flow<R> = map(transform).flattenConcat()

public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
    collect { value -> emitAll(value) }
}

 

사용 예시

fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapConcat { flowFrom(it) }
        .collect { println(it) }
}
//
(1초 후)
1_A 
(1초 후)
2_A 
(1초 후)
3_A 
(1초 후)
1_B 
(1초 후)
2_B 
(1초 후)
3_B 
(1초 후)
1_C 
(1초 후)
2_C 
(1초 후)
3_C

 

 

flatMapMerge

만들어진 플로우를 동시에 처리.

 

Flow의 각 값을 다른 Flow로 변환하고, 이 Flow들을 동시에(concurrent) 실행하여 결과를 병합

동시성 제한 가능 (concurrency 파라미터)

순서 미보장

// concurrency로 동시 처리 가능한 플로우 수 설정 가능
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)
    
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T> {
    require(concurrency > 0) { "Expected positive concurrency level, but had $concurrency" }
    return if (concurrency == 1) flattenConcat() else ChannelFlowMerge(this, concurrency)
}

 

사용 예시

fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapMerge { flowFrom(it) }
        .collect { println(it) }
}
//
(1초 후)
1_A 
1_B 
1_C 
(1초 후)
2_A 
2_B 
2_C 
(1초 후)
3_A 
3_C 
3_B

 

 

flatMapLatest

새로운 플로우가 나타나면 이전 플로우의 처리는 무시

Flow가 새 값을 emit할 때마다 이전에 진행 중이던 Flow를 취소하고 새로운 Flow로 전환

항상 최신 값에 대한 결과만 유지

 

실시간 검색이나 자동완성, 위치 업데이트, 최신 데이터만 필요한 경우에 사용

public inline fun <T, R> Flow<T>.flatMapLatest(
    @BuilderInference crossinline transform: suspend (value: T) -> Flow<R>
): Flow<R> = transformLatest { emitAll(transform(it)) }

 

사용 예시 1

fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}
//
(1초 후)
1_C
(1초 후)
2_C 
(1초 후)
3_C

 

사용 예시 2

fun flowFrom(element: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${element} " }

suspend fun main() {
    flowOf("A", "B", "C")
        .onEach { delay(1200) }
        .flatMapLatest { flowFrom(it) }
        .collect { println(it) }
}
//
(2.2초 후)
1_A 
(1.2초 후)
1_B 
(1.2초 후)
1_C 
(1초 후)
2_C 
(1초 후)
3_C

 

 

 

필터링 연산자

filter

주어진 조건에 맞는 값을 가진 Flow 반환

public inline fun <T> Flow<T>.filter(
    crossinline predicate: suspend (T) -> Boolean
): Flow<T> = transform { value ->
    if (predicate(value)) return@transform emit(value)
}

 

사용 예시

fun isEven(num: Int): Boolean = num % 2 == 0

suspend fun main() {
    (1..6).asFlow()             // [1, 2, 3, 4, 5, 6]
        .filter { it <= 3 }     // [1, 2, 3]
        .filter { isEven(it) }  // [2]
        .collect { print(it) }  // [2]
}
// 2

 

 

take

지정된 개수만큼만 값을 가져오고 나머지는 취소하는 연산자

// count만큼 collect하고 기존 Flow 취소
public fun <T> Flow<T>.take(count: Int): Flow<T> {
    require(count > 0) { "Requested element count $count should be positive" }
    return flow {
        var consumed = 0
        try {
            collect { value ->
                if (++consumed < count) {
                    return@collect emit(value)
                } else {
                    return@collect emitAbort(value)
                }
            }
        } catch (e: AbortFlowException) {
            e.checkOwnership(owner = this)
        }
    }
}

 

사용 예시

suspend fun main() {
    ('A'..'Z').asFlow()
        .take(3)
        .collect { print(it) }
}
// ABC

 

 

drop

특정 수의 원소를 무시하는 연산자

public fun <T> Flow<T>.drop(count: Int): Flow<T> {
    require(count >= 0) { "Drop count should be non-negative, but had $count" }
    return flow {
        var skipped = 0
        collect { value ->
            if (skipped >= count) emit(value) else ++skipped
        }
    }
}

 

사용 예시

suspend fun main() {
    ('A'..'Z').asFlow()
        .drop(23)
        .collect { print(it) }
}
// XYZ

 

 

distinctUntilChanged (중복 제거 함수 )

연속된 중복 값을 필터링하는 연산자. 바로 이전의 원소와 동일한 원소만 제거

StateFlow에는 이미 이 기능이 내장되어 있다

public fun <T> Flow<T>.distinctUntilChanged(): Flow<T> =
    when (this) {
        is StateFlow<*> -> this // state flows are always distinct
        else -> distinctUntilChangedBy(keySelector = defaultKeySelector, areEquivalent = defaultAreEquivalent)
    }

 

사용 예시 1

suspend fun main() {
    flowOf(1, 2, 2, 3, 2, 1, 1, 3)
        .distinctUntilChanged()
        .collect { print(it) }
}
// 123213

 

사용 예시 2

// data class 라 equals로 인해 동일한 객체로 판별
data class User(val id: Int, val name: String)
suspend fun main() {
    flowOf(
        User(1, "Park"),
        User(1, "Park"),
        User(1, "Kim"),
        User(2, "Kim")
    )
        .distinctUntilChanged()
        .collect { println(it) }
}
//
User(id=1, name=Park)
User(id=1, name=Kim)
User(id=2, name=Kim)

 

 

distinctUntilChangedBy

특정 키를 기준으로 연속된 중복을 필터링하는 연산자

public fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T> =
    distinctUntilChangedBy(keySelector = keySelector, areEquivalent = defaultAreEquivalent)

 

사용 예시

data class User(val id: Int, val name: String) {
    override fun toString(): String = "[$id] $name"
}

suspend fun main() {
    val users = flowOf(
        User(1, "Park"),
        User(1, "Kim"),
        User(2, "Kim"),
        User(2, "Lee")
    )

    println(users.distinctUntilChangedBy { it.id }.toList())
    println(users.distinctUntilChangedBy { it.name }.toList())
    println(users.distinctUntilChanged { prev, next ->
        prev.id == next.id || prev.name == next.name
    }.toList())
}
//
[[1] Park, [2] Kim]
[[1] Park, [1] Kim, [2] Lee]
[[1] Park, [2] Kim]

 

 

 

결합 연산자

merge

여러 Flow를 하나의 Flow로 합치는 연산자

동시에 모든 flow를 수집하고 순서는 보장하지 않음

public fun <T> merge(vararg flows: Flow<T>): Flow<T> = 
   flows.asIterable().merge()

 

사용 예시 1

suspend fun main() {
    val together: Flow<Number> = merge(
        flowOf(1, 2, 3),
        flowOf(0.1, 0.2, 0.3)
    )
    print(together.toList())
}
//
[1, 2, 3, 0.1, 0.2, 0.3]
[1, 0.1, 0.2, 0.3, 2, 3]
등의 가능한 조합 중 하나

 

사용 예시 2

suspend fun main() {
    val together: Flow<Number> = merge(
        flowOf(1, 2, 3).onEach { delay(100) },
        flowOf(0.1, 0.2, 0.3)
    )
    print(together.toList())
}
// 
[0.1, 0.2, 0.3, 1, 2, 3]

 

 

zip

두 Flow로부터 쌍을 만드는 연산자

하나의 Flow가 완료되면 전체 Flow가 완료되고 남은 Flow 원소는 유실

public fun <T1, T2, R> Flow<T1>.zip(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R> = zipImpl(this, other, transform)

 

사용 예시

suspend fun main() {
    val flow1 = flowOf("A", "B", "C").onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4).onEach { delay(1000) }
    flow1.zip(flow2) { f1, f2 ->
        "${f1}_${f2}"
    }.collect { println(it) }
}
// 
(1초 후)
A_1
(1초 후)
B_2
(1초 후)
C_3

 

 

combine

여러 Flow의 가장 최근 값들을 결합하는 연산자 (각 flow에서 최근 값만을 유지)

한 Flow의 값이 새로 emit되면 다른 Flow의 최신 값과 결합

순서나 쌍이 정해져 있지 않음

모든 flow에서 최소 하나의 값이 emit되어야 결합 시작

public fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (a: T1, b: T2) -> R
): Flow<R> = flow {
    combineInternal(
        arrayOf(this@combine, flow),
        nullArrayFactory(),
        { emit(transform(it[0] as T1, it[1] as T2)) }
    )
}

 

사용 예시

suspend fun main() {
    val flow1 = flowOf("A", "B", "C").onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4).onEach { delay(1000) }
    flow1.combine(flow2) { f1, f2 ->
        "${f1}_${f2}"
    }.collect { println(it) }
}
// 
(1초 후)
B_1
(0.2초 후)
C_1
(0.8초 후)
C_2
(1초 후)
C_3
(1초 후)
C_4

 

 

예외 연산자 

retry

업스트림 Flow에서 예외가 발생했을 때 지정된 횟수만큼 재시도를 하는 연산자

업스트림 예외만 재시도 (다운스트림 예외는 처리 안함)

public fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T> {
    require(retries > 0) { "Expected positive amount of retries, but had $retries" }
    return retryWhen { cause, attempt -> attempt < retries && predicate(cause) }
}

public fun <T> Flow<T>.retryWhen(
    predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
    flow {
        var attempt = 0L
        var shallRetry: Boolean
        do {
            shallRetry = false
            val cause = catchImpl(this)
            if (cause != null) {
                if (predicate(cause, attempt)) {
                    shallRetry = true
                    attempt++
                } else {
                    throw cause
                }
            }
        } while (shallRetry)
    }

 

사용 예시 1

suspend fun main() {
    flow {
        emit(1)
        emit(2)
        error("E")
        emit(3)
    }.retry(3) {
        print(it.message)
        true
    }.collect { print(it) }
}
//
12E12E12E12 Exception in thread "main" java.lang.IllegalStateException: E

 

사용 예시 2

flow {
    emit(api.fetchData())
}.retry(retries = 3) { cause ->
    // NetworkException일 때만 재시도
    cause is NetworkException
}

 

반응형