Flow를 시작하는 방법
flowOf 사용
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
for (element in elements) {
emit(element)
}
}
suspend fun main() {
flowOf(1, 2, 3, 4, 5)
.collect { print(it) }
}
// 12345
emptyFlow 사용
값이 없는 플로우
public fun <T> emptyFlow(): Flow<T> = EmptyFlow
private object EmptyFlow : Flow<Nothing> {
override suspend fun collect(collector: FlowCollector<Nothing>) = Unit
}
suspend fun main() {
emptyFlow<Int>()
.collect { print(it) }
}
// Nothing
컨버터 사용
Iterable, Iterator, Sequence 등을 Flow로 변경
public fun <T> Iterable<T>.asFlow(): Flow<T> = flow {
forEach { value ->
emit(value)
}
}
suspend fun main() {
listOf(1, 2, 3, 4, 5)
.asFlow()
.collect { print(it) }
}
// 12345
중단함수를 플로우로 변경
public fun <T> (suspend () -> T).asFlow(): Flow<T> = flow {
emit(invoke())
}
suspend fun main() {
val function = suspend {
delay(1000)
"Kancho"
}
function.asFlow()
.collect { println(it) }
}
// Kancho
일반함수를 플로우로 변경 (함수 참조값 필요)
fun getUserName(): String {
return "Kancho"
}
suspend fun main() {
::getUserName
.asFlow()
.collect { println(it) }
}
// Kancho
플로우 빌더
플로우 생성 시 가장 많이 사용되는 방법
// flow 빌더
public fun <T> flow(
@BuilderInference block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = SafeFlow(block)
// SafeFlow 클래스
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
// Flow 인터페이스
public interface Flow<out T> {
public suspend fun collect(collector: FlowCollector<T>)
}
// FlowCollector 함수형 인터페이스
public fun interface FlowCollector<in T> {
public suspend fun emit(value: T)
}
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
1. flow 빌더 생성
2. collect 호출
3. 람다 { value -> println(value) } 가 내부적으로 FlowCollector 구현체로 변환
4. collector.block 함수 실행
suspend fun main() {
// block 함수
flow {
emit("A")
emit("B")
emit("C")
}.collect { value -> // FlowCollector 구현체
println(value)
}
}
// A B C
채널플로우 (ChannelFlow)
여러 개의 값을 독립적으로 계산해야 할 경우 주로 사용
ProducerScope<T>를 리시버로 하는 suspend 람다를 파라미터로 받음
ProducerScope -> CoroutineScope, SendChannel 상속 (launch, send 사용 가능)
public fun <T> channelFlow(
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow<T> = ChannelFlowBuilder(block)
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
public val channel: SendChannel<E>
}
사용 예시
suspend fun main() {
val flow = channelFlow {
launch {
for (i in 1..5) {
delay(100)
send("Value $i")
}
}
launch {
for (i in 'A'..'E') {
delay(150)
send("Letter $i")
}
}
}
flow.collect { value ->
println("Received: $value")
}
}
// 결과
Received: Value 1
Received: Letter A
Received: Value 2
Received: Letter B
Received: Value 3
Received: Value 4
Received: Letter C
Received: Value 5
Received: Letter D
Received: Letter E
순차적으로 page를 받아오는 코드 (channelFlow 사용 전)
data class User(val name: String)
interface UserApi {
suspend fun takePage(pageNum: Int): List<User>
}
class FakeUserApi : UserApi {
private val users = List(10) {
User("User$it")
}
private val pageSize = 3
override suspend fun takePage(pageNum: Int): List<User> {
delay(1000)
return users
.drop(pageSize * pageNum)
.take(pageSize)
}
}
fun allUsersFlow(api: UserApi): Flow<User> = flow {
var page = 0
do {
println("Fetching page $page")
val users = api.takePage(page++) // 중단함수
emitAll(users.asFlow())
} while (users.isNotEmpty())
}
suspend fun main() {
val users = allUsersFlow(FakeUserApi())
val user = users.first {
println("Checking $it")
delay(1000) // 중단함수
it.name == "User3"
}
}
// 결과
Fetching page 0
Checking User(name=User0)
Checking User(name=User1)
Checking User(name=User2)
Fetching page 1
Checking User(name=User3)
원소를 처리하는 도중 미리 페이지를 받아오는 코드 (channelFlow 사용 후)
데이터 생성과 소비하는 과정이 별개로 진행
fun allUsersFlow(api: UserApi): Flow<User> = channelFlow {
var page = 0
do {
println("Fetching page $page")
val users = api.takePage(page++) // 중단함수
users.forEach { send(it) }
} while (users.isNotEmpty())
}
suspend fun main() {
val users = allUsersFlow(FakeUserApi())
val user = users.first {
println("Checking $it")
delay(1000) // 중단함수
it.name == "User3"
}
}
// 결과
Fetching page 0
Checking User(name=User0)
Fetching page 1
Checking User(name=User1)
Fetching page 2
Checking User(name=User2)
Fetching page 3
Checking User(name=User3)
Fetching page 4
콜백플로우 (callbackFlow)
콜백 기반의 API를 Flow로 변환할 때 사용하는 플로우 빌더
주로 이벤트 리스너(클릭)나 콜백 기반 API를 리액티브 스트림으로 변환할 때 유용
public fun <T> callbackFlow(
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow<T> = CallbackFlowBuilder(block)
네트워크 상태 모니터링하는 코드
fun networkStateFlow(context: Context) = callbackFlow {
val networkCallback = object : ConnectivityManager.NetworkCallback() {
override fun onAvailable(network: Network) {
trySend(true)
}
override fun onLost(network: Network) {
trySend(false)
}
}
val connectivityManager =
context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
connectivityManager.registerDefaultNetworkCallback(networkCallback)
awaitClose {
connectivityManager.unregisterNetworkCallback(networkCallback)
}
}
callbackFlow의 주요한 함수
awaitClose
채널이 닫힐 때까지 중단되는 함수
코루틴이 종료되는 것을 막는다
리소스 정리
trySend
비차단 방식으로 값 전송
close
채널을 닫고 에러 전파
cancel
채널을 종료하고 플로우에 예외를 던진다
'코루틴' 카테고리의 다른 글
[코루틴 - flow] Flow 처리 (중간 연산자) (1) | 2024.12.09 |
---|---|
[코루틴-Flow] 생명주기 함수, empty, catch, flowOn, launchIn (0) | 2024.12.06 |
[코루틴] Flow 원리 (0) | 2024.11.28 |
[코루틴] Flow란 - (1) (0) | 2024.11.24 |
[코루틴] 핫 데이터와 콜드 데이터 (0) | 2024.11.23 |