본문 바로가기

코루틴

[코루틴] 핫 데이터와 콜드 데이터

 

크게 2가지 데이터 소스

핫 스트림 데이터 (Hot Stream Data)
콜드 스트림 데이터 (Cold Stream Data)

 

 

핫 스트림 데이터 (Hot Stream Data)

구독자의 유무와 상관없이 데이터 발행

즉, 데이터의 소비가 없어도 데이터를 생성

List, Set, Channel

 

Channel

기본적으로 채널은 Hot Stream

// 구독자가 없는 Channel을 사용한 예시
// 1. Channel (Hot)
suspend fun main(): Unit = coroutineScope {
    val rendezvousChannel = Channel<Int>()
    launch {
        // Buffer = 0이고 trySend라 큐에 저장되지 않고 바로 실패 반환, non-blocking
        val result = rendezvousChannel.trySend(3)
        println("Sent3, result : $result")
        println("rendezvousChannel isEmpty : ${rendezvousChannel.isEmpty}")
    }
    launch {
        rendezvousChannel.send(1) // Buffer = 0 이기에 내부 queue에 저장되지만 수신자가 없어 일시중단
        println("Sent1")
        rendezvousChannel.send(2) // Buffer = 0
        println("Sent2")
    }

    delay(2000) // 2초 대기

    val bufferedChannel = Channel<Int>(Channel.BUFFERED)
    launch {
        bufferedChannel.send(4) // Buffer가 64로 가득차기 전까지 버퍼에 바로 저장
        println("Sent4")
        bufferedChannel.send(5) // Buffer가 64로 가득차기 전까지 버퍼에 바로 저장
        println("Sent5")
        println("bufferedChannel isEmpty : ${bufferedChannel.isEmpty}")
    }
}
// 결과
Sent3, result : Value(Failed)
rendezvousChannel isEmpty : true
.. 2초후 ..
Sent4
Sent5
bufferedChannel isEmpty : false

 

 

구독자가 없을 경우

버퍼가 없는 채널에서 일시중단 함수인 send를 호출하게 되면 구독자가 생기기 전까지 queue에 저장되어 대기

버퍼가 있는 채널에서 send를 호출하면 버퍼가 비어있을 시 버퍼에, 가득 찰 경우 queue에 저장

internal abstract class AbstractSendChannel<E>(
    @JvmField protected val onUndeliveredElement: OnUndeliveredElement<E>?
) : SendChannel<E> {
    protected val queue = LockFreeLinkedListHead()
}


// abstract class AbstractChannel 
public override val isEmpty: Boolean get() = isEmptyImpl
protected val isEmptyImpl: Boolean get() = queue.nextNode !is Send && isBufferEmpty

isEmpty는 
queue에 send 요청이 없으며 버퍼도 비어있는지 모두 체크

 

 

List

데이터가 즉시 연산되어 실행

// List
fun main() {
    val list = buildList {
        repeat(3) {
            add("User$it")
            println("List: Added User")
        }
    }

    val mapList = list.map {
        println("List: Processing")
        "Processed $it"
    }
}
// 결과
List: Added User
List: Added User
List: Added User
List: Processing
List: Processing
List: Processing

 

중간 연산 과정을 모두 계산하고 데이터 처리가 완료된 컬렉션을 반환하는 List

// List
fun main() {
    listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { mapNumber(it) }
        .find { filterNumber(it) }
        .let { print(it) }
}

fun mapNumber(i: Int): Int {
    print("m$i ")
    return i * i
}

fun filterNumber(i: Int): Boolean {
    println("f$i ")
    return i >= 10
}
// 결과
m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 f1 
f4 
f9 
f16 
16

 

하지만 데이터가 여러 번 사용될 경우 매번 결과를 다시 계산할 필요가 없어 Sequence보다 더 효율적일 수 있다

fun main() {
    val l = listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { mapNumber(it) }
    println(l)
    println(l.find { it > 10 })
    println(l.find { it > 10 })
    println(l.find { it > 10 })
}
// 결과
m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
16
16
16

 

 

 

콜드 스트림 데이터 (Cold Stream Data)

구독자가 요청할 때만 데이터 발행

구독자마다 새로운 데이터 스트림 생성

필요할 때만 메모리를 사용해 최소한의 연산을 수행하기에 메모리를 적게 사용

최종 연산에서 값이 필요할 때 처리

Sequence, Stream, Flow, RxJava 등등

 

 

Sequence

sequence는 지연 계산을 사용하고, 요청할 때만 계산

중간 연산은 이전에 만든 sequence에 새로운 연산을 첨가

최종 연산이 모든 작업을 실행 (find, toList)

fun main() {
    val sequence = sequence {
        repeat(3) {
            yield("User$it")
            println("S: Added User")
        }
    }
    // map 중간 연산
    val mapSequence = sequence.map {
        println("S: Processing")
        "Processed $it"
    }
}
// 결과
// 요청이 없기에 아무것도 출력되지 않는다

 

중간 연산 결과를 저장하지 않는 Sequence

fun main() {
    sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { mapNumber(it) }
        .find { filterNumber(it) }
        .let { print(it) }
}

fun mapNumber(i: Int): Int {
    print("m$i ")
    return i * i
}

fun filterNumber(i: Int): Boolean {
    println("f$i ")
    return i >= 10
}
// 결과
m1 f1 
m2 f4 
m3 f9 
m4 f16 
16

 

하지만 데이터를 여러 번 사용해야 할 경우에는 

매번 다시 계산을 하게 되어 List가 더 효율적일 수 있다

fun main() {
    val s = sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { mapNumber(it) }
    println(s.toList())
    println(s.find { it > 10 })
    println(s.find { it > 10 })
    println(s.find { it > 10 })
}
// 결과
m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
m1 m2 m3 m4 16
m1 m2 m3 m4 16
m1 m2 m3 m4 16

 

 

 

Hot Channel vs Cold Flow

Hot channel

채널은 핫 스트림이라 값을 바로 계산

별도의 코루틴에서 계산을 수행하기에 produce는 CoroutineScope의 확장 함수로 정의

소비자와 상관없이 값을 생성

 

원소는 한 번만 받을 수 있다

첫 번째 수신자가 모든 원소를 소비하고 나면

두 번째 소비자는 채널이 비어 있고 이미 닫혀 있다는 걸 발견하고 어떤 원소도 받을 수 없다

suspend fun main(): Unit = coroutineScope {
    val channel = makeChannel()
    delay(1000)
    println("Calling Channel...")
    for (value in channel) {
        println(value)
    }
    println("Consuming again...")
    for (value in channel) {
        println(value)
    }
}

private fun CoroutineScope.makeChannel() = produce {
    println("Channel Started")
    for (i in 1..3) {
        delay(1000)
        send(i)
    }
}
// 결과
Channel Started
(1초 후)
Calling Channel...
1
(1초 후)
2
(1초 후)
3
Consuming again...

 

 

Cold Flow

Flow는 콜드 데이터 소스이기에 값이 필요할 때만 생성

최종 연산(collect 같은)이 호출될 때 원소가 어떻게 생성되어야 하는지 정의

flow 빌더는 빌더를 호출한 최종 연산의 scope에서 실행

각 최종 연산은 처음부터 데이터를 처리하기 시작

suspend fun main(): Unit = coroutineScope {
    val flow = makeFlow()
    delay(1000)
    println("Calling Flow...")
    flow.collect { value -> println(value) }
    println("Consuming again...")
    flow.collect { value -> println(value) }
}

private fun makeFlow() = flow {
    println("Flow started")
    for (i in 1..3) {
        delay(1000)
        emit(i)
    }
}
// 결과
(1초 후)
Calling Flow...
Flow started
(1초 후)
1
(1초 후)
2
(1초 후)
3
Consuming again...
Flow started
(1초 후)
1
(1초 후)
2
(1초 후)
3

'코루틴' 카테고리의 다른 글

[코루틴] Flow 원리  (0) 2024.11.28
[코루틴] Flow란 - (1)  (0) 2024.11.24
[코루틴] select  (0) 2024.11.18
[코루틴] Channel (채널)  (0) 2024.11.18
[코루틴] 코루틴의 동작 방식  (0) 2024.09.29