본문 바로가기

코루틴

[코루틴-Flow] StateFlow (상태플로우)

 

StateFlow (상태플로우)

상태플로우는 Hot flow이며 공유플로우의 개념을 확장

replay가 1인 공유플로우와 비슷하게 동작

접근 가능한 값 하나(value)를 가지고 있다

초기값은 생성자로 전달

안드로이드에서 LiveData를 대체하는 최신 방식

public interface StateFlow<out T> : SharedFlow<T> {
    // 현재 value
    public val value: T
}

public fun <T> MutableStateFlow(value: T): MutableStateFlow<T> = 
    StateFlowImpl(value ?: NULL)

// StateFlow, MutableSharedFlow 구현
public interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    // 현재 상태 값을 읽거나 변경
    // 이전 값과 동일한 값을 설정하면 아무런 동작도 하지 않는다
    // Thread-safe -> 동시성 환경에서 안전하게 사용 가능
    public override var value: T
    
    public fun compareAndSet(expect: T, update: T): Boolean
}

 

예시 1

suspend fun main5() = coroutineScope {
    val state = MutableStateFlow("A")
    println(state.value)
    launch {
        state.collect { println("Value changed to $it") }
    }
    delay(1000)
    state.value = "B"

    delay(1000)
    launch {
        state.collect { println("and now it is $it") }
    }

    delay(1000)
    state.value = "C"
}
//
A
Value changed to A
Value changed to B
and now it is B
Value changed to C
and now it is C

 

 

StateFlow는 MutableSharedFlow의 아래 구현처럼 동작한다

MutableSharedFlow<T>(
    replay = 1,                     // 최신 값 하나 유지
    onBufferOverflow = DROP_OLDEST  // 새 값이 들어오면 이전 값 삭제
)

 

예시 2

데이터를 덮어씌우기 때문에 관찰이 느린 경우 중간의 변화를 받을 수 없는 경우도 있다

suspend fun main(): Unit = coroutineScope {
    val state = MutableStateFlow('X')
    launch {
        for (c in 'A'..'E') {
            delay(300)
            state.value = c
        }
    }

    state.collect {
        delay(1000)
        println(it)
    }
}
//
X C E

 

 

stateIn

Flow를 StateFlow로 변환하는 함수

 

첫 번째 형태

중단함수이며 StateFlow는 항상 값을 가져야 해서 값을 명시하지 않았을 경우 첫 번째 값이 계산될 때 까지 기다려야 한다

첫 번째 값이 emit될 때까지 일시 중단

public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T> {
    val config = configureSharing(1)
    val result = CompletableDeferred<StateFlow<T>>()
    scope.launchSharingDeferred(config.context, config.upstream, result)
    return result.await()
}

 

예시

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }
        .onEach { println("Produced $it") }
        
    // flow가 시작되고 첫 번째 값을 기다림
    // 첫 번째 값을 받을 때까지 일시 중단
    val stateFlow: StateFlow<String> = flow.stateIn(this)

    println("Listening")
    println(stateFlow.value)
    stateFlow.collect { println("Received $it") }
}
//
(1초 후)
Produced A
Listening
A
Received A
(1초 후)
Produced B
Received B
(1초 후)
Produced C
Received C

 

2번째 형태

중단 함수가 아니며 초기값과 started 모드를 지정해야 한다 (SharedFlow의 shareIn과 동일)

초기값이 제공되어 즉시 StateFlow 생성 가능

public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T> {
    val config = configureSharing(1)
    val state = MutableStateFlow(initialValue)
    val job = scope.launchSharing(config.context, config.upstream, state, started, initialValue)
    return ReadonlyStateFlow(state, job)
}

 

예시

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B")
        .onEach { delay(1000) }
        .onEach { println("Produced $it") }

    val stateFlow: StateFlow<String> = flow.stateIn(
        scope = this,
        started = SharingStarted.Lazily,
        initialValue = "Empty"
    )
    println(stateFlow.value)

    delay(2000)
    stateFlow.collect { println("Received $it") }
}
//
Empty
(2초 후)
Received Empty
(1초 후)
Produced A
Received A
(1초 후)
Produced B
Received B