SharedFlow
여러 구독자에게 값을 브로드캐스팅하는 데 사용되는 Hot Flow
일반적인 Flow는 Cold Data라 요청 시 값이 계산되지만
SharedFlow는 여러 수신자가 동시에 값을 수집할 수 있다
// Read-only
public interface SharedFlow<out T> : Flow<T> {
// 현재 replay 버퍼에 저장된 값들의 스냅샷
public val replayCache: List<T>
// 절대 완료되지 않는다, 취소되거나 예외가 발생하지 않는 한 영원히 실행
override suspend fun collect(collector: FlowCollector<T>): Nothing
}
MutableSharedFlow
Broadcast Channel과 비슷
replay는 마지막으로 전송한 값들을 저장하는 갯수
// Read-Write
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> { ... }
// SharedFlow와 FlowCollector 모두 상속
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
override suspend fun emit(value: T)
// non-suspend emit
public fun tryEmit(value: T): Boolean
// 현재 활성화된 구독자 수
public val subscriptionCount: StateFlow<Int>
// replayCache 초기화, 새 구독자는 초기화 이후의 값만 받는다
@ExperimentalCoroutinesApi
public fun resetReplayCache()
}
예시 1
대기하고 있는 모든 코루틴이 수신
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>()
launch {
mutableSharedFlow.collect { println("#1 Received $it") }
}
launch {
mutableSharedFlow.collect { println("#2 Received $it") }
}
delay(1000)
mutableSharedFlow.emit("Message1")
mutableSharedFlow.emit("Message2")
}
//
#1 Received Message1
#1 Received Message2
#2 Received Message1
#2 Received Message2
예시 2
replay 사용한 코드
suspend fun main(): Unit = coroutineScope {
// 최근 2개의 값만 캐시에 저장
val sharedFlow = MutableSharedFlow<String>(replay = 2)
sharedFlow.emit("Message1")
sharedFlow.emit("Message2")
sharedFlow.emit("Message3")
val snapshot = sharedFlow.replayCache
println(snapshot)
launch {
sharedFlow.collect {
println("#1 received $it")
}
}
delay(100)
sharedFlow.resetReplayCache()
val snapshot2 = sharedFlow.replayCache
println(snapshot2)
}
//
[Message2, Message3]
#1 received Message2
#1 received Message3
[]
예시 3
SharedFlow, FlowCollector를 사용해 값을 내보내거나 collect 함수만 노출하기 위해 사용
suspend fun main(): Unit = coroutineScope {
val mutableSharedFlow = MutableSharedFlow<String>()
val sharedFlow: SharedFlow<String> = mutableSharedFlow
val collector: FlowCollector<String> = mutableSharedFlow
launch {
mutableSharedFlow.collect { println("#1 received $it")}
}
launch {
sharedFlow.collect { println("#2 received $it") }
}
delay(1000)
mutableSharedFlow.emit("Message1")
collector.emit("Message2")
}
//
(1초 후)
#1 received Message1
#1 received Message2
#2 received Message1
#2 received Message2
shareIn
coldFlow를 Hot SharedFlow로 만들고 Flow의 원소를 보낸다
scope는 Flow를 실행할 코루틴 스코프
started는 Flow 시작 전략
public fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T> {
val config = configureSharing(replay)
val shared = MutableSharedFlow<T>(
replay = replay,
extraBufferCapacity = config.extraBufferCapacity,
onBufferOverflow = config.onBufferOverflow
)
@Suppress("UNCHECKED_CAST")
val job = scope.launchSharing(config.context, config.upstream, shared, started, NO_VALUE as T)
return ReadonlySharedFlow(shared, job)
}
예시
suspend fun main(): Unit = coroutineScope {
val f = flowOf("A", "B", "C")
.onEach { delay(1000) }
val sharedFlow: SharedFlow<String> = f.shareIn(
scope = this,
started = SharingStarted.Eagerly,
replay = 0
)
delay(500)
launch {
sharedFlow.collect { println("#1 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it") }
}
delay(1000)
launch {
sharedFlow.collect { println("#3 $it") }
}
}
//
(1초 후)
#1 A
(1초 후)
#2 B
#1 B
(1초 후)
#1 C
#2 C
#3 C
SharingStarted
Flow 공유를 시작하고 중지하는 전략을 정의하는 인터페이스
public fun interface SharingStarted {
public companion object {
// 구독자 유무와 관계없이 즉시 시작하고 멈추지 않음
public val Eagerly: SharingStarted = StartedEagerly()
// 첫 구독자가 나타날 때 시작하고 이후 멈추지 않음
public val Lazily: SharingStarted = StartedLazily()
// 구독자가 있을 때만 활성화
@Suppress("FunctionName")
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0, // 마지막 구독자가 사라진 후 얼마나 기다렸다가 중지할지
replayExpirationMillis: Long = Long.MAX_VALUE // replay 캐시를 얼마나 유지할지
): SharingStarted =
StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
}
public fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}
SharingStarted.Eagerly
즉시 값을 감지하기 시작하고 플로우로 값 전송
예시
(마지막 값을 받고 싶다면 replay 1 설정)
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B", "C")
// flow 값을 즉시 방출
val sharedFlow: SharedFlow<String> = flow.shareIn(
scope = this,
started = SharingStarted.Eagerly
)
delay(100)
launch { sharedFlow.collect { println("#1 $it")} }
print("Done")
}
//
(0.1초 후)
Done
SharingStarted.Lazily
첫 번째 구독자가 나올 때 감지 시작
첫 구독자는 내보내진 모든 값을 수신하는 것이 보장되며, 이후의 구독자는 replay 수만큼 가장 최근에 젖아된 값들을 받는다
예시 1
suspend fun main() = coroutineScope {
val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D").onEach { delay(1000) }
val sharedFlow = merge(flow1, flow2).shareIn(
scope = this,
started = SharingStarted.Lazily,
)
delay(100)
launch {
sharedFlow.collect { println("#1 $it")}
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it")}
}
}
//
(0.1초 후)
#1 A
#1 B
#1 C
(1초 후)
#2 D
#1 D
예시 2
suspend fun main() = coroutineScope {
val flow1 = flowOf("A", "B", "C")
val flow2 = flowOf("D").onEach { delay(1000) }
val sharedFlow = merge(flow1, flow2).shareIn(
scope = this,
started = SharingStarted.Lazily,
replay = 1 // flow1의 C 캐시
)
delay(100)
launch {
sharedFlow.collect { println("#1 $it")}
}
delay(1000)
launch {
sharedFlow.collect { println("#2 $it")}
}
}
//
#1 A
#1 B
#1 C
#2 C
#2 D
#1 D
SharingStarted.WhileSubscribed()
첫 구독자가 나올때 감지 시작, 마지막 구독자가 사라지면 플로우 종료
새로운 구독자가 나오면 플로우 다시 시작
stopTimeoutMillis - 마지막 구독자가 사라진 후 Flow를 중지하기까지 대기하는 시간
replayExpirationMillis - Flow가 중지된 후 replay 캐시를 유지하는 시간
public fun WhileSubscribed(
stopTimeoutMillis: Long = 0,
replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted = StartedWhileSubscribed(stopTimeoutMillis, replayExpirationMillis)
예시 1
suspend fun main() = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart { println("Started") }
.onCompletion { println("Finished") }
.onEach { delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(),
)
delay(3000)
launch { println("#1 ${sharedFlow.first()}") } // 첫 값만 받고 종료
launch { println("#2 ${sharedFlow.take(2).toList()}") } // 두 값만 받고 종료 (모든 구독자 종료되어 flow도 종료해 Finished 출력 )
delay(3000)
launch { println("#3 ${sharedFlow.first()}") } // flow 다시 시작
}
//
(3초 후)
Started
(1초 후)
#1 A
(1초 후)
#2 [A, B]
Finished
(1초 후)
Started
(1초 후)
#3 A
Finished
예시 2
(stopTimeoutMillis = 4000)
suspend fun main4() = coroutineScope {
val flow = flowOf("A", "B", "C", "D")
.onStart { println("Started") }
.onCompletion { println("Finished") }
.onEach { delay(1000) }
val sharedFlow = flow.shareIn(
scope = this,
started = SharingStarted.WhileSubscribed(
stopTimeoutMillis = 4000
),
)
delay(3000)
launch { println("#1 ${sharedFlow.first()}") }
launch { println("#2 ${sharedFlow.take(2).toList()}") }
delay(3000)
launch { println("#3 ${sharedFlow.first()}") }
}
//
(3초 후)
Started
#1 A
#2 [A, B]
#3 C
Finished
주의할 점
호출할 때마다 새로운 SharedFlow를 만드는 것이 아니라 SharedFlow를 만든 후 property로 저장해서 사용해야 한다
'코루틴' 카테고리의 다른 글
[코루틴-Flow] StateFlow (상태플로우) (1) | 2024.12.15 |
---|---|
[코루틴 - flow] Flow 처리 (중간 연산자) (1) | 2024.12.09 |
[코루틴-Flow] 생명주기 함수, empty, catch, flowOn, launchIn (0) | 2024.12.06 |
[코루틴] Flow 생성 (0) | 2024.12.01 |
[코루틴] Flow 원리 (0) | 2024.11.28 |