본문 바로가기

코루틴

[코루틴] Flow 생성

 

Flow를 시작하는 방법

flowOf 사용

public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}

suspend fun main() {
    flowOf(1, 2, 3, 4, 5)
        .collect { print(it) }
}
// 12345

 

 

emptyFlow 사용

값이 없는 플로우

public fun <T> emptyFlow(): Flow<T> = EmptyFlow

private object EmptyFlow : Flow<Nothing> {
    override suspend fun collect(collector: FlowCollector<Nothing>) = Unit
}

suspend fun main() {
    emptyFlow<Int>()
        .collect { print(it) }
}
// Nothing

 

 

컨버터 사용

Iterable, Iterator, Sequence 등을 Flow로 변경

public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
    forEach { value ->
        emit(value)
    }
}

suspend fun main() {
    listOf(1, 2, 3, 4, 5)
        .asFlow()
        .collect { print(it) }
}
// 12345

 

중단함수를 플로우로 변경

public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
    emit(invoke())
}

suspend fun main() {
    val function = suspend {
        delay(1000)
        "Kancho"
    }

    function.asFlow()
        .collect { println(it) }
}
// Kancho

 

일반함수를 플로우로 변경 (함수 참조값 필요)

fun getUserName(): String {
    return "Kancho"
}

suspend fun main() {
   ::getUserName
       .asFlow()
       .collect { println(it) }
}
// Kancho

 

 

플로우 빌더

플로우 생성 시 가장 많이 사용되는 방법

// flow 빌더
public fun <T> flow(
    @BuilderInference block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = SafeFlow(block)

// SafeFlow 클래스
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

// Flow 인터페이스
public interface Flow<out T> {
    public suspend fun collect(collector: FlowCollector<T>)
}

// FlowCollector 함수형 인터페이스
public fun interface FlowCollector<in T> {
    public suspend fun emit(value: T)
}

public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
    public final override suspend fun collect(collector: FlowCollector<T>) {
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

 

 

1. flow 빌더 생성

2. collect 호출

3. 람다 { value -> println(value) } 가 내부적으로 FlowCollector 구현체로 변환

4. collector.block 함수 실행

suspend fun main() {
   // block 함수
   flow {
       emit("A")
       emit("B")
       emit("C")
   }.collect { value ->  // FlowCollector 구현체
       println(value)
   }
}
// A B C

 

 

채널플로우 (ChannelFlow)

여러 개의 값을 독립적으로 계산해야 할 경우 주로 사용
ProducerScope<T>를 리시버로 하는 suspend 람다를 파라미터로 받음
ProducerScope -> CoroutineScope, SendChannel 상속 (launch, send 사용 가능)
public fun <T> channelFlow(
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow<T> = ChannelFlowBuilder(block)

public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    public val channel: SendChannel<E>
}

 

사용 예시

suspend fun main() {
    val flow = channelFlow {
        launch {
            for (i in 1..5) {
                delay(100)
                send("Value $i")
            }
        }
        launch {
            for (i in 'A'..'E') {
                delay(150)
                send("Letter $i")
            }
        }
    }
    
    flow.collect { value ->
        println("Received: $value")
    }
}
// 결과
Received: Value 1
Received: Letter A
Received: Value 2
Received: Letter B
Received: Value 3
Received: Value 4
Received: Letter C
Received: Value 5
Received: Letter D
Received: Letter E

 

순차적으로 page를 받아오는 코드 (channelFlow 사용 전)

data class User(val name: String)

interface UserApi {
    suspend fun takePage(pageNum: Int): List<User>
}

class FakeUserApi : UserApi {
    private val users = List(10) {
        User("User$it")
    }
    private val pageSize = 3

    override suspend fun takePage(pageNum: Int): List<User> {
        delay(1000)
        return users
            .drop(pageSize * pageNum)
            .take(pageSize)
    }
}

fun allUsersFlow(api: UserApi): Flow<User> = flow {
    var page = 0
    do {
        println("Fetching page $page")
        val users = api.takePage(page++) // 중단함수
        emitAll(users.asFlow())
    } while (users.isNotEmpty())
}

suspend fun main() {
    val users = allUsersFlow(FakeUserApi())
    val user = users.first {
        println("Checking $it")
        delay(1000) // 중단함수
        it.name == "User3"
    }
}
// 결과
Fetching page 0
Checking User(name=User0)
Checking User(name=User1)
Checking User(name=User2)
Fetching page 1
Checking User(name=User3)

 

 

원소를 처리하는 도중 미리 페이지를 받아오는 코드 (channelFlow 사용 후)

데이터 생성과 소비하는 과정이 별개로 진행

fun allUsersFlow(api: UserApi): Flow<User> = channelFlow {
    var page = 0
    do {
        println("Fetching page $page")
        val users = api.takePage(page++) // 중단함수
        users.forEach { send(it) }
    } while (users.isNotEmpty())
}

suspend fun main() {
    val users = allUsersFlow(FakeUserApi())
    val user = users.first {
        println("Checking $it")
        delay(1000) // 중단함수
        it.name == "User3"
    }
}
// 결과
Fetching page 0
Checking User(name=User0)
Fetching page 1
Checking User(name=User1)
Fetching page 2
Checking User(name=User2)
Fetching page 3
Checking User(name=User3)
Fetching page 4

 

 

콜백플로우 (callbackFlow)

콜백 기반의 API를 Flow로 변환할 때 사용하는 플로우 빌더

주로 이벤트 리스너(클릭)나 콜백 기반 API를 리액티브 스트림으로 변환할 때 유용

public fun <T> callbackFlow(
    @BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow<T> = CallbackFlowBuilder(block)

 

네트워크 상태 모니터링하는 코드

fun networkStateFlow(context: Context) = callbackFlow {
    val networkCallback = object : ConnectivityManager.NetworkCallback() {
        override fun onAvailable(network: Network) {
            trySend(true)
        }
        override fun onLost(network: Network) {
            trySend(false)
        }
    }

    val connectivityManager =
        context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager

    connectivityManager.registerDefaultNetworkCallback(networkCallback)

    awaitClose {
        connectivityManager.unregisterNetworkCallback(networkCallback)
    }
}

 

callbackFlow의 주요한 함수

 

awaitClose

채널이 닫힐 때까지 중단되는 함수

코루틴이 종료되는 것을 막는다
리소스 정리

 

trySend

비차단 방식으로 값 전송

 

close

채널을 닫고 에러 전파

 

cancel

채널을 종료하고 플로우에 예외를 던진다