크게 2가지 데이터 소스
핫 스트림 데이터 (Hot Stream Data)
콜드 스트림 데이터 (Cold Stream Data)
핫 스트림 데이터 (Hot Stream Data)
구독자의 유무와 상관없이 데이터 발행
즉, 데이터의 소비가 없어도 데이터를 생성
List, Set, Channel 등
Channel
기본적으로 채널은 Hot Stream
// 구독자가 없는 Channel을 사용한 예시
// 1. Channel (Hot)
suspend fun main(): Unit = coroutineScope {
val rendezvousChannel = Channel<Int>()
launch {
// Buffer = 0이고 trySend라 큐에 저장되지 않고 바로 실패 반환, non-blocking
val result = rendezvousChannel.trySend(3)
println("Sent3, result : $result")
println("rendezvousChannel isEmpty : ${rendezvousChannel.isEmpty}")
}
launch {
rendezvousChannel.send(1) // Buffer = 0 이기에 내부 queue에 저장되지만 수신자가 없어 일시중단
println("Sent1")
rendezvousChannel.send(2) // Buffer = 0
println("Sent2")
}
delay(2000) // 2초 대기
val bufferedChannel = Channel<Int>(Channel.BUFFERED)
launch {
bufferedChannel.send(4) // Buffer가 64로 가득차기 전까지 버퍼에 바로 저장
println("Sent4")
bufferedChannel.send(5) // Buffer가 64로 가득차기 전까지 버퍼에 바로 저장
println("Sent5")
println("bufferedChannel isEmpty : ${bufferedChannel.isEmpty}")
}
}
// 결과
Sent3, result : Value(Failed)
rendezvousChannel isEmpty : true
.. 2초후 ..
Sent4
Sent5
bufferedChannel isEmpty : false
구독자가 없을 경우
버퍼가 없는 채널에서 일시중단 함수인 send를 호출하게 되면 구독자가 생기기 전까지 queue에 저장되어 대기
버퍼가 있는 채널에서 send를 호출하면 버퍼가 비어있을 시 버퍼에, 가득 찰 경우 queue에 저장
internal abstract class AbstractSendChannel<E>(
@JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
) : SendChannel<E> {
protected val queue = LockFreeLinkedListHead()
}
// abstract class AbstractChannel
public override val isEmpty: Boolean get() = isEmptyImpl
protected val isEmptyImpl: Boolean get() = queue.nextNode !is Send && isBufferEmpty
isEmpty는
queue에 send 요청이 없으며 버퍼도 비어있는지 모두 체크
List
데이터가 즉시 연산되어 실행
// List
fun main() {
val list = buildList {
repeat(3) {
add("User$it")
println("List: Added User")
}
}
val mapList = list.map {
println("List: Processing")
"Processed $it"
}
}
// 결과
List: Added User
List: Added User
List: Added User
List: Processing
List: Processing
List: Processing
중간 연산 과정을 모두 계산하고 데이터 처리가 완료된 컬렉션을 반환하는 List
// List
fun main() {
listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map { mapNumber(it) }
.find { filterNumber(it) }
.let { print(it) }
}
fun mapNumber(i: Int): Int {
print("m$i ")
return i * i
}
fun filterNumber(i: Int): Boolean {
println("f$i ")
return i >= 10
}
// 결과
m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 f1
f4
f9
f16
16
하지만 데이터가 여러 번 사용될 경우 매번 결과를 다시 계산할 필요가 없어 Sequence보다 더 효율적일 수 있다
fun main() {
val l = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map { mapNumber(it) }
println(l)
println(l.find { it > 10 })
println(l.find { it > 10 })
println(l.find { it > 10 })
}
// 결과
m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
16
16
16
콜드 스트림 데이터 (Cold Stream Data)
구독자가 요청할 때만 데이터 발행
구독자마다 새로운 데이터 스트림 생성
필요할 때만 메모리를 사용해 최소한의 연산을 수행하기에 메모리를 적게 사용
최종 연산에서 값이 필요할 때 처리
Sequence, Stream, Flow, RxJava 등등
Sequence
sequence는 지연 계산을 사용하고, 요청할 때만 계산
중간 연산은 이전에 만든 sequence에 새로운 연산을 첨가
최종 연산이 모든 작업을 실행 (find, toList)
fun main() {
val sequence = sequence {
repeat(3) {
yield("User$it")
println("S: Added User")
}
}
// map 중간 연산
val mapSequence = sequence.map {
println("S: Processing")
"Processed $it"
}
}
// 결과
// 요청이 없기에 아무것도 출력되지 않는다
중간 연산 결과를 저장하지 않는 Sequence
fun main() {
sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map { mapNumber(it) }
.find { filterNumber(it) }
.let { print(it) }
}
fun mapNumber(i: Int): Int {
print("m$i ")
return i * i
}
fun filterNumber(i: Int): Boolean {
println("f$i ")
return i >= 10
}
// 결과
m1 f1
m2 f4
m3 f9
m4 f16
16
하지만 데이터를 여러 번 사용해야 할 경우에는
매번 다시 계산을 하게 되어 List가 더 효율적일 수 있다
fun main() {
val s = sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.map { mapNumber(it) }
println(s.toList())
println(s.find { it > 10 })
println(s.find { it > 10 })
println(s.find { it > 10 })
}
// 결과
m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
m1 m2 m3 m4 16
m1 m2 m3 m4 16
m1 m2 m3 m4 16
Hot Channel vs Cold Flow
Hot channel
채널은 핫 스트림이라 값을 바로 계산
별도의 코루틴에서 계산을 수행하기에 produce는 CoroutineScope의 확장 함수로 정의
소비자와 상관없이 값을 생성
원소는 한 번만 받을 수 있다
첫 번째 수신자가 모든 원소를 소비하고 나면
두 번째 소비자는 채널이 비어 있고 이미 닫혀 있다는 걸 발견하고 어떤 원소도 받을 수 없다
suspend fun main(): Unit = coroutineScope {
val channel = makeChannel()
delay(1000)
println("Calling Channel...")
for (value in channel) {
println(value)
}
println("Consuming again...")
for (value in channel) {
println(value)
}
}
private fun CoroutineScope.makeChannel() = produce {
println("Channel Started")
for (i in 1..3) {
delay(1000)
send(i)
}
}
// 결과
Channel Started
(1초 후)
Calling Channel...
1
(1초 후)
2
(1초 후)
3
Consuming again...
Cold Flow
Flow는 콜드 데이터 소스이기에 값이 필요할 때만 생성
최종 연산(collect 같은)이 호출될 때 원소가 어떻게 생성되어야 하는지 정의
flow 빌더는 빌더를 호출한 최종 연산의 scope에서 실행
각 최종 연산은 처음부터 데이터를 처리하기 시작
suspend fun main(): Unit = coroutineScope {
val flow = makeFlow()
delay(1000)
println("Calling Flow...")
flow.collect { value -> println(value) }
println("Consuming again...")
flow.collect { value -> println(value) }
}
private fun makeFlow() = flow {
println("Flow started")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
// 결과
(1초 후)
Calling Flow...
Flow started
(1초 후)
1
(1초 후)
2
(1초 후)
3
Consuming again...
Flow started
(1초 후)
1
(1초 후)
2
(1초 후)
3
'코루틴' 카테고리의 다른 글
[코루틴] Flow 원리 (0) | 2024.11.28 |
---|---|
[코루틴] Flow란 - (1) (0) | 2024.11.24 |
[코루틴] select (0) | 2024.11.18 |
[코루틴] Channel (채널) (0) | 2024.11.18 |
[코루틴] 코루틴의 동작 방식 (0) | 2024.09.29 |