본문 바로가기

코루틴

[코루틴] Flow 원리

 

실제 Flow 구현 코드

제네릭 타입 T
FlowCollector<T> 리시버 suspend 람다 파라미터
Flow<T> 반환
safeFlow 생성

 

public fun <T> flow(
    @BuilderInference block: suspend FlowCollector<T>.() -> Unit
): Flow<T> = SafeFlow(block)

private class SafeFlow<T>(
    private val block: suspend FlowCollector<T>.() -> Unit
) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

 

SafeFlow 클래스

AbstractFlow 상속
collect 시점에 block 실행
안전한 수집 보장
private class SafeFlow<T>(
    private val block: suspend FlowCollector<T>.() -> Unit
) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

 

 

AbstractFlow 클래스

Flow 인터페이스의 collect 메소드 구현
CancellableFlow도 구현하여 취소 가능
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

    public final override suspend fun collect(collector: FlowCollector<T>) {
        // SafeCollector로 래핑
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            // 안전한 수집
            collectSafely(safeCollector)
        } finally {
            safeCollector.releaseIntercepted()
        }
    }

    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

 

 

FlowCollector 인터페이스

단일 추상 메서드(SAM)를 가진 함수형 인터페이스
public fun interface FlowCollector<in T> {
    // not thread-safe
    public suspend fun emit(value: T)
}

 

 

 

Flow 구현 단계별 코드

람다식부터 시작해서 변화하는 코드

 

람다식 f
파라미터, 리턴 타입 X 
fun main() {
    val f: () -> Unit = {
        print("A")
        print("B")
        print("C")
    }
    f()
    f()
}
// 결과
ABCABC

 

suspend 람다식 f
파라미터, 리턴 타입 X
suspend fun main() {
    val f: suspend () -> Unit = {
        println("A")
        delay(1000)
        println("B")
        delay(1000)
        println("C")
    }
    f()
    f()
}
// 결과
A
(1초 후)
B
(1초 후)
C
A
(1초 후)
B
(1초 후)
C

 

suspend 람다식 f
매개변수로 emit 이름을 가진 (String) -> Unit 타입, Unit 리턴
suspend fun main() {
    val f: suspend ((String) -> Unit) -> Unit = { emit ->
        emit("A")
        emit("B")
        emit("C")
    }
    f { print(it)} 
    f { print(it)}
}
// 결과
ABCABC

 

파라미터 타입을 FlowCollector 함수형 인터페이스로 분리
fun interface FlowCollector {
    suspend fun emit(value: String)
}

suspend fun main() {
    val f: suspend (FlowCollector) -> Unit = {
        it.emit("A")
        it.emit("B")
        it.emit("C")
    }
    f { print(it) } // ABC
    f { print(it) } // ABC
}
// 결과
ABCABC

 

FlowCollector 리시버로 변경 (this 참조)
fun interface FlowCollector {
    suspend fun emit(value: String)
}

suspend fun main() {
    val f: suspend FlowCollector.() -> Unit = {
        emit("A")
        emit("B")
        emit("C")
    }
    f { print(it) }
    f { print(it) }
}
// 결과
ABCABC

 

Flow 인터페이스 추가해서 객체 표현식으로 래핑
fun interface FlowCollector {
    suspend fun emit(value: String)
}

interface Flow {
    suspend fun collect(collector: FlowCollector)
}

suspend fun main() {
    val builder: suspend FlowCollector.() -> Unit = {
        emit("A")
        emit("B")
        emit("C")
    }
    val flow: Flow = object : Flow {
        override suspend fun collect(collector: FlowCollector) {
            collector.builder()
        }
    }
    flow.collect { print(it) }
    flow.collect { print(it) }
}
// 결과
ABCABC

 

Flow 생성을 간단하게 하기 위해 빌더 정의
fun interface FlowCollector {
    suspend fun emit(value: String)
}

interface Flow {
    suspend fun collect(collector: FlowCollector)
}

fun flow(
    builder: suspend FlowCollector.() -> Unit
) = object : Flow {
    override suspend fun collect(collector: FlowCollector) {
        collector.builder()
    }
}

suspend fun main() {
    val f: Flow = flow {
        emit("A")
        emit("B")
        emit("C")
    }
    f.collect { print(it) }
    f.collect { print(it) }
}
// 결과
ABCABC

 

제네릭 타입 사용
fun interface FlowCollector<T> {
    suspend fun emit(value: T)
}

interface Flow<T> {
    suspend fun collect(collector: FlowCollector<T>)
}

fun <T> flow(
    builder: suspend FlowCollector<T>.() -> Unit
) = object : Flow<T> {
    override suspend fun collect(collector: FlowCollector<T>) {
        collector.builder()
    }
}

suspend fun main() {
    val f: Flow<String> = flow {
        emit("A")
        emit("B")
        emit("C")
    }
    f.collect { print(it) }
    f.collect { print(it) }
}
// 결과
ABCABC

 

 

실행 순서

1. collect를 호출

2. flow 빌더 호출

3. flow 함수에 람다식 전달

{
    emit("A")
    emit("B")
    emit("C")
}

 

4. Flow<String> 타입 객체 생성 (객체 표현식)

5. collect 메서드가 FlowCollector 구현체 생성

{ print(it) } -> FlowCollector 구현체로 생성

object : FlowCollector<String> {
    override suspend fun emit(value: String) {
        print(value)
    }
}

 

6. collector.builder() 호출

builder()는 FlowCollector를 리시버로 가지는 람다식
빌더 내부에서 emit이 순차적으로 실행
emit 값이 print(it)로 전달

 

 

'코루틴' 카테고리의 다른 글

[코루틴-Flow] 생명주기 함수, empty, catch, flowOn, launchIn  (0) 2024.12.06
[코루틴] Flow 생성  (0) 2024.12.01
[코루틴] Flow란 - (1)  (0) 2024.11.24
[코루틴] 핫 데이터와 콜드 데이터  (0) 2024.11.23
[코루틴] select  (0) 2024.11.18