본문 바로가기

코루틴

[코루틴] Channel (채널)

 

Channel (채널)

Channel은 송신자(SendChannel을 통해)와 수신자(ReceiveChannel을 통해) 간의 논블로킹 통신을 위한 기본 요소

Channel은 Java의 BlockingQueue와 유사하지만 블로킹 연산 대신 일시 중단 연산을 사용하며 닫을 수 있다는 차이가 있다

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    public companion object Factory {
        // 무제한 용량의 버퍼를 가진 채널. send가 중단되지 않음
        public const val UNLIMITED: Int = Int.MAX_VALUE
        
        // 랑데부 채널(버퍼가 없는 채널 - 용량 0)
        // 송신자와 수신자가 만날 때만 원소를 교환
        public const val RENDEZVOUS: Int = 0
        
        // conflated 채널 설정. 버퍼 크기가 1. 새로운 원소가 이전 원소를 대체
        public const val CONFLATED: Int = -1
        
        // 특정 용량 크기 또는 기본 버퍼 용량(64)을 가진 buffered 채널
        // VM에서 [DEFAULT_BUFFER_PROPERTY_NAME]을 설정하여 이를 변경할 수 있습니다.
        // 일시 중단되지 않는 채널의 경우, 용량이 1인 버퍼가 사용됨
        public const val BUFFERED: Int = -2
        
        // 내부 사용 전용
        internal const val OPTIONAL_CHANNEL = -3
        // 팩토리 함수에서 [BUFFERED]가 매개변수로 사용될 때 기본 채널 용량을 정의하는 프로퍼티의 이름
        
        public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"
        internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
            64, 1, UNLIMITED - 1
        )
    }
}

 

 

SendChannel 인터페이스

원소를 보내거나 채널을 닫을 때 사용

send 함수는 일시중단 함수

  • 채널 버퍼가 가득 찼거나 empty일 경우 일시 중단

trySend 함수는 ChannelResult 반환

ChannelResult는 success, fail에 대한 정보를 담고 있음

public interface SendChannel<in E> {
    // close 호출로 채널이 닫혔는지 여부를 반환
    // true인 경우 send 호출 시 예외 발생
    @ExperimentalCoroutinesApi
    public val isClosedForSend: Boolean

    // 지정된 요소를 채널에 전송, 채널 버퍼가 가득 찼거나 없는 경우 호출자를 일시중단
    // 채널이 isClosedForSend인 경우 예외 발생
    // 일시중단 후 채널이 닫혀도 send는 중단되지 않음
    // 취소 가능하며, Job 취소 시 CancellationException 발생
    // suspend하지 않고 채널로 전송을 시도하려면 trySend 사용
    public suspend fun send(element: E)

    public val onSend: SelectClause2<E, SendChannel<E>>
    
    // 용량 제한을 위반하지 않는 경우, 지정된 element를 채널에 즉시 추가하고 성공 결과 반환
    // send 함수의 동기 버전. send가 일시 중단되거나 예외를 throw하는 상황에서는 실행을 포기
    public fun trySend(element: E): ChannelResult<Unit>

    // 채널 close
    // 이 함수가 호출된 직후, isClosedForSend는 true를 반환하기 시작
    // 하지만, ReceiveChannel의 isClosedForReceive는 이전에 전송된 모든 요소들이 수신된 후에야 true를 반환하기 시작
    // cause 파라미터 없이 닫힌 채널은 send, receive시 Exception 발생
    // cause가 null이 아니면서 닫힌 채널은 실패한 채널. 실패한 채널에서 송신이나 수신을 시도하면 지정된 cause 발생
    public fun close(cause: Throwable? = null): Boolean   
}

 

 

ReceiveChannel 인터페이스

원소를 받을 때 사용

receive 함수는 일시중단 함수

  • 채널이 empty일 경우 일시중단

tryReceive 함수는 ChannelResult 반환

ChannelResult는 success, fail에 대한 정보를 담고 있음

public interface ReceiveChannel<out E> {
    // SendChannel에서 close 호출로 채널이 닫히고 이전에 전송된 모든 항목들이 이미 수신되었다면 true를 반환
    @ExperimentalCoroutinesApi
    public val isClosedForReceive: Boolean

    // 채널이 빈 경우(No Elements) true 리턴, receive 시도가 일시중단 될 것을 의미
    // 채널이 isClosedForReceive인 경우 false 리턴
    @ExperimentalCoroutinesApi
    public val isEmpty: Boolean

    // 채널이 비어있지 않을 때 요소를 가져와 제거, 채널이 비어있으면 호출자를 일시중단
    // 채널이 isClosedForReceive인 경우 ClosedReceiveChannelException 발생
    // 취소 가능한 함수이며, Job이 취소되면 CancellationException 발생
    public suspend fun receive(): E

    public val onReceive: SelectClause1<E>

    // receive와 유사하지만 ChannelResult로 결과를 래핑하여 반환
    // 채널이 닫힌 경우 close 원인과 함께 결과 반환
    public suspend fun receiveCatching(): ChannelResult<E>

    // 비동기로 채널에서 요소를 가져오려 시도, 성공 시 successful 결과 반환
    // 채널이 비어있으면 failed 결과 반환, 채널이 닫혔으면 closed 결과 반환
    public fun tryReceive(): ChannelResult<E>

    // for 루프 사용해서 채널에서 element 수신하기 위한 Iterator 리턴
    // 채널이 정상적으로 닫히면 반복이 완료됨
    public operator fun iterator(): ChannelIterator<E>
    
    // 채널의 나머지 요소 수신을 취소, 선택적으로 취소 원인을 지정할 수 있음
    // 채널을 닫고 버퍼에 있는 모든 전송된 요소를 제거
    // 호출 직후 isClosedForReceive와 isClosedForSend가 true 반환 시작
    // 이후 송수신 시도는 CancellationException 발생
    public fun cancel(cause: CancellationException? = null)
}

 

 

 

기본적인 사용

(채널을 Close하거나 예외처리가 되어있지 않은 불완전한 코드)

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            delay(1000L)
            println("Producing next one")
            channel.send(index * 2)
        }
    }

    launch {
        repeat(5) {
            val received = channel.receive()
            println(received)
        }
    }
}

 

 

Produce를 사용한 개선된 코드

produce는 

  • ReceiveChannel 리턴
  • 코루틴이 종료되면 채널 close (안전, 편리)
suspend fun main(): Unit = coroutineScope {
    val channel = produce {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            send(index * 2)
        }
    }

    for (element in channel) {
        println(element)
    }
}
// 결과
Producing next one
0
Producing next one
2
Producing next one
4
Producing next one
6
Producing next one
8

 

Produce 함수

// ReceiveChannel 타입 리턴
// 코루틴이 완료되거나 예외가 발생하면 자동으로 채널을 닫음
// CoroutineScope의 자식으로 동작하여 부모 코루틴이 취소되면 함께 취소됨
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
    produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)
    
    
 // ProducerScope
 // (CoroutineScope, SendChannel 구현) 코루틴 스코프의 기능과 채널 전송 기능을 모두 사용 가능
 public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    public val channel: SendChannel<E>
}

 

 

채널 타입

  • 무제한 (Unlimited)
  • 버퍼 (Buffered)
  • 랑데뷰 (Rendezvous)
  • 융합 (Conflated)

 

무제한 (Unlimited)

모든 원소를 받고 수신자가 하나씩 가져가게 한다

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.UNLIMITED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// 결과
Sent
Sent
Sent
Sent
Sent
0
2
4
6
8

 

 

버퍼 (Buffered)

버퍼가 가득 찰 때까지 원소가 생성하고 이후 생성자는 수신자가 원소를 소비하기를 기다린다

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = 3) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// 결과
Sent
Sent
Sent
0
Sent
2
Sent
4
6
8

 

 

랑데뷰 (Rendezvous)

송신자는 매번 수신자를 기다린다

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.RENDEZVOUS) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// 결과
0
Sent
2
Sent
4
Sent
6
Sent
8
Sent

 

 

융합 (Conflated)

새로운 원소가 이전 원소를 대체하여 최근 원소만 받는다

용량이 1이며 onBufferOverflow가 Drop Oldest

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.CONFLATED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("Sent")
        }
    }

    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// 결과
Sent
Sent
Sent
Sent
Sent
8

 

 

Channel 파라미터 살펴보기

기본적으로 우리가 코드에서 Channel을 만들 때 capacity, onBufferOverflow, onUndeliveredElement 3가지를 커스텀할 수 있다

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

 

1. onBufferOverflow

버퍼가 모두 찼을 때의 행동 정의

public enum class BufferOverflow {
    // send 함수 중단 
    SUSPEND,
    // 가장 오래된 원소 제거
    DROP_OLDEST,
    // 가장 최근 원소 제거
    DROP_LATEST
}

 

예시

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)

    launch {
        repeat(5) { index ->
            channel.send(index * 2)
            delay(100)
            println("Sent")
        }
        channel.close()
    }
    delay(1000)
    for (element in channel) {
        println(element)
        delay(1000)
    }
}
// 결과
Sent
Sent
Sent
Sent
Sent
6
8

 

 

2. onUndeliveredElement

원소가 어떠한 이유로 처리되지 않을 때 호출 (대부분 채널이 닫히거나 취소되었음을 의미)

주로 채널에서 보낸 자원을 닫을 때 사용된다

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Resource>(
        capacity = capacity,
        onUndeliveredElement = { resource -> resource.close() }
    )
}

 

 

 

Channel의 중요한 패턴

 

팬아웃(Fan-out)

여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수 있다

하나의 생산자(Producer)여러 소비자(Consumer)에게 작업을 분배하는 패턴

채널은 코루틴들을 FIFO 큐로 가지고 있어 순차적으로 분배

 

부하 분산 / 처리량 향상 / 병렬 처리 등의 장점

 

suspend fun main(): Unit = coroutineScope {
    val channel = produceNumbers()
    repeat(3) { id ->
        delay(10)
        launchProcessor(id, channel)
    }
}

// producer(생산자) : 숫자 생성하여 채널에 send
fun CoroutineScope.produceNumbers() = produce<Int> {
    repeat(10) {
        delay(100)
        send(it)
    }
}

// consumer(소비자) : 채널에서 숫자를 받아 처리
fun CoroutineScope.launchProcessor(
    id: Int,
    channel: ReceiveChannel<Int>
) = launch {
    for (msg in channel) {
        println("#$id received $msg")
    }
}
// 결과
#0 received 0
#1 received 1
#2 received 2
#0 received 3
#1 received 4
#2 received 5
#0 received 6
#1 received 7
#2 received 8
#0 received 9

 

 

 

팬인(Fan-in)

여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있다

여러 생산자(Producer)하나의 채널을 통해 단일 소비자(Consumer)에게 데이터를 전송하는 패턴

데이터 통합할 때 유용

// 두 생산자를 실행하고 하나의 소비자가 데이터를 수신
fun main(): Unit = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "send1", 200L) }
    launch { sendString(channel, "send2", 500L) }
    
    // 하나의 소비자가 데이터 수신
    repeat(50) {
        println(channel.receive())
    }
    coroutineContext.cancelChildren()
}

// 생산자 함수: 지정된 간격으로 문자열을 채널에 전송
suspend fun sendString(
    channel: SendChannel<String>,
    text: String,
    time: Long
) {
    while (true) {
        delay(time)
        channel.send(text)
    }
}
// 결과
send1
send1
send2
send1
send1
send2
send1
send1
send1
send2
...

 

 

파이프라인 (pipeline)

데이터가 순차적인 단계를 거쳐 처리되는 구조

한 채널로부터 받은 원소를 다른 채널로 전송하는 것을 의미

suspend fun main(): Unit = coroutineScope {
    val numbers = numbers()
    val squared = square(numbers)
    for (num in squared) {
        println(num)
    }
}

fun CoroutineScope.numbers(): ReceiveChannel<Int> =
    produce {
        repeat(3) { num ->
            send(num + 1)
        }
    }

fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =
    produce {
        for (num in numbers) {
            send(num * num)
        }
    }
    
// 결과
1
4
9