본문 바로가기

코루틴

[코루틴-Flow] SharedFlow (공유플로우)

 

SharedFlow

여러 구독자에게 값을 브로드캐스팅하는 데 사용되는 Hot Flow

 

일반적인 Flow는 Cold Data라 요청 시 값이 계산되지만

SharedFlow는 여러 수신자가 동시에 값을 수집할 수 있다

// Read-only
public interface SharedFlow<out T> : Flow<T> {
    // 현재 replay 버퍼에 저장된 값들의 스냅샷
    public val replayCache: List<T>

    // 절대 완료되지 않는다, 취소되거나 예외가 발생하지 않는 한 영원히 실행
    override suspend fun collect(collector: FlowCollector<T>): Nothing
}

 

 

MutableSharedFlow

Broadcast Channel과 비슷

replay는 마지막으로 전송한 값들을 저장하는 갯수

// Read-Write
public fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> { ... }

// SharedFlow와 FlowCollector 모두 상속
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    override suspend fun emit(value: T)
    // non-suspend emit
    public fun tryEmit(value: T): Boolean
    // 현재 활성화된 구독자 수
    public val subscriptionCount: StateFlow<Int>
    // replayCache 초기화, 새 구독자는 초기화 이후의 값만 받는다
    @ExperimentalCoroutinesApi
    public fun resetReplayCache()
}

 

예시 1

대기하고 있는 모든 코루틴이 수신

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>()

    launch {
        mutableSharedFlow.collect { println("#1 Received $it") }
    }

    launch {
        mutableSharedFlow.collect { println("#2 Received $it") }
    }

    delay(1000)
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
}
// 
#1 Received Message1
#1 Received Message2
#2 Received Message1
#2 Received Message2

 

예시 2

replay 사용한 코드

suspend fun main(): Unit = coroutineScope {
    // 최근 2개의 값만 캐시에 저장
    val sharedFlow = MutableSharedFlow<String>(replay = 2)
    sharedFlow.emit("Message1")
    sharedFlow.emit("Message2")
    sharedFlow.emit("Message3")

    val snapshot = sharedFlow.replayCache
    println(snapshot)

    launch {
        sharedFlow.collect {
            println("#1 received $it")
        }
    }

    delay(100)
    sharedFlow.resetReplayCache()

    val snapshot2 = sharedFlow.replayCache
    println(snapshot2)
}
// 
[Message2, Message3]
#1 received Message2
#1 received Message3
[]

 

예시 3

SharedFlow, FlowCollector를 사용해 값을 내보내거나 collect 함수만 노출하기 위해 사용

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>()
    val sharedFlow: SharedFlow<String> = mutableSharedFlow
    val collector: FlowCollector<String> = mutableSharedFlow

    launch {
        mutableSharedFlow.collect { println("#1 received $it")}
    }
    launch {
        sharedFlow.collect { println("#2 received $it") }
    }

    delay(1000)
    mutableSharedFlow.emit("Message1")
    collector.emit("Message2")
}
//
(1초 후)
#1 received Message1
#1 received Message2
#2 received Message1
#2 received Message2

 

 

shareIn

coldFlow를 Hot SharedFlow로 만들고 Flow의 원소를 보낸다

scope는 Flow를 실행할 코루틴 스코프

started는 Flow 시작 전략

public fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    val config = configureSharing(replay)
    val shared = MutableSharedFlow<T>(
        replay = replay,
        extraBufferCapacity = config.extraBufferCapacity,
        onBufferOverflow = config.onBufferOverflow
    )
    @Suppress("UNCHECKED_CAST")
    val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
    return ReadonlySharedFlow(shared, job)
}

 

예시

suspend fun main(): Unit = coroutineScope {
    val f = flowOf("A", "B", "C")
        .onEach { delay(1000) }
    val sharedFlow: SharedFlow<String> = f.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        replay = 0
    )

    delay(500)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }
    delay(1000)
    launch {
        sharedFlow.collect { println("#2 $it") }
    }
    delay(1000)
    launch {
        sharedFlow.collect { println("#3 $it") }
    }
}
//
(1초 후)
#1 A
(1초 후)
#2 B
#1 B
(1초 후)
#1 C
#2 C
#3 C

 

 

SharingStarted

Flow 공유를 시작하고 중지하는 전략을 정의하는 인터페이스

public fun interface SharingStarted {
    public companion object {
        // 구독자 유무와 관계없이 즉시 시작하고 멈추지 않음
        public val Eagerly: SharingStarted = StartedEagerly()
        // 첫 구독자가 나타날 때 시작하고 이후 멈추지 않음
        public val Lazily: SharingStarted = StartedLazily()
        // 구독자가 있을 때만 활성화
        @Suppress("FunctionName")
        public fun WhileSubscribed(
            stopTimeoutMillis: Long = 0,  // 마지막 구독자가 사라진 후 얼마나 기다렸다가 중지할지
            replayExpirationMillis: Long = Long.MAX_VALUE  // replay 캐시를 얼마나 유지할지
        ): SharingStarted =
            StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
    }
    public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}

 

 

SharingStarted.Eagerly

즉시 값을 감지하기 시작하고 플로우로 값 전송

 

예시

(마지막 값을 받고 싶다면 replay 1 설정)

suspend fun main() = coroutineScope {
    val flow = flowOf("A", "B", "C")
    // flow 값을 즉시 방출
    val sharedFlow: SharedFlow<String> = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly
    )
    delay(100)
    launch { sharedFlow.collect { println("#1 $it")} }
    print("Done")
}
//
(0.1초 후)
Done

 

 

SharingStarted.Lazily

첫 번째 구독자가 나올 때 감지 시작

첫 구독자는 내보내진 모든 값을 수신하는 것이 보장되며, 이후의 구독자는 replay 수만큼 가장 최근에 젖아된 값들을 받는다

 

예시 1

suspend fun main() = coroutineScope {
    val flow1 = flowOf("A", "B", "C")
    val flow2 = flowOf("D").onEach { delay(1000) }

    val sharedFlow = merge(flow1, flow2).shareIn(
        scope = this,
        started = SharingStarted.Lazily,
    )
    delay(100)
    launch {
        sharedFlow.collect { println("#1 $it")}
    }
    delay(1000)
    launch {
        sharedFlow.collect { println("#2 $it")}
    }
}
// 
(0.1초 후)
#1 A
#1 B
#1 C
(1초 후)
#2 D
#1 D

 

예시 2

suspend fun main() = coroutineScope {
    val flow1 = flowOf("A", "B", "C")
    val flow2 = flowOf("D").onEach { delay(1000) }

    val sharedFlow = merge(flow1, flow2).shareIn(
        scope = this,
        started = SharingStarted.Lazily,
        replay = 1	// flow1의 C 캐시
    )
    delay(100)
    launch {
        sharedFlow.collect { println("#1 $it")}
    }
    delay(1000)
    launch {
        sharedFlow.collect { println("#2 $it")}
    }
}
// 
#1 A
#1 B
#1 C
#2 C
#2 D
#1 D

 

 

SharingStarted.WhileSubscribed()

첫 구독자가 나올때 감지 시작, 마지막 구독자가 사라지면 플로우 종료

새로운 구독자가 나오면 플로우 다시 시작

stopTimeoutMillis - 마지막 구독자가 사라진 후 Flow를 중지하기까지 대기하는 시간

replayExpirationMillis - Flow가 중지된 후 replay 캐시를 유지하는 시간

public fun WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted = StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)

 

예시 1

suspend fun main() = coroutineScope {
    val flow = flowOf("A", "B", "C", "D")
        .onStart { println("Started") }
        .onCompletion { println("Finished") }
        .onEach { delay(1000) }

    val sharedFlow = flow.shareIn(
        scope = this,
        started = SharingStarted.WhileSubscribed(),
    )

    delay(3000)
    launch { println("#1 ${sharedFlow.first()}") }		// 첫 값만 받고 종료
    launch { println("#2 ${sharedFlow.take(2).toList()}") }	// 두 값만 받고 종료 (모든 구독자 종료되어 flow도 종료해 Finished 출력 )
    delay(3000)
    launch { println("#3 ${sharedFlow.first()}") }		// flow 다시 시작
}
//
(3초 후)
Started
(1초 후)
#1 A
(1초 후)
#2 [A, B]
Finished
(1초 후)
Started
(1초 후)
#3 A
Finished

 

예시 2

(stopTimeoutMillis = 4000)

suspend fun main4() = coroutineScope {
    val flow = flowOf("A", "B", "C", "D")
        .onStart { println("Started") }
        .onCompletion { println("Finished") }
        .onEach { delay(1000) }

    val sharedFlow = flow.shareIn(
        scope = this,
        started = SharingStarted.WhileSubscribed(
            stopTimeoutMillis = 4000
        ),
    )

    delay(3000)
    launch { println("#1 ${sharedFlow.first()}") }
    launch { println("#2 ${sharedFlow.take(2).toList()}") }
    delay(3000)
    launch { println("#3 ${sharedFlow.first()}") }
}
// 
(3초 후)
Started
#1 A
#2 [A, B]
#3 C
Finished

 

 

주의할 점

호출할 때마다 새로운 SharedFlow를 만드는 것이 아니라 SharedFlow를 만든 후 property로 저장해서 사용해야 한다