Channel (채널)
Channel은 송신자(SendChannel을 통해)와 수신자(ReceiveChannel을 통해) 간의 논블로킹 통신을 위한 기본 요소
Channel은 Java의 BlockingQueue와 유사하지만 블로킹 연산 대신 일시 중단 연산을 사용하며 닫을 수 있다는 차이가 있다
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
public companion object Factory {
// 무제한 용량의 버퍼를 가진 채널. send가 중단되지 않음
public const val UNLIMITED: Int = Int.MAX_VALUE
// 랑데부 채널(버퍼가 없는 채널 - 용량 0)
// 송신자와 수신자가 만날 때만 원소를 교환
public const val RENDEZVOUS: Int = 0
// conflated 채널 설정. 버퍼 크기가 1. 새로운 원소가 이전 원소를 대체
public const val CONFLATED: Int = -1
// 특정 용량 크기 또는 기본 버퍼 용량(64)을 가진 buffered 채널
// VM에서 [DEFAULT_BUFFER_PROPERTY_NAME]을 설정하여 이를 변경할 수 있습니다.
// 일시 중단되지 않는 채널의 경우, 용량이 1인 버퍼가 사용됨
public const val BUFFERED: Int = -2
// 내부 사용 전용
internal const val OPTIONAL_CHANNEL = -3
// 팩토리 함수에서 [BUFFERED]가 매개변수로 사용될 때 기본 채널 용량을 정의하는 프로퍼티의 이름
public const val DEFAULT_BUFFER_PROPERTY_NAME: String = "kotlinx.coroutines.channels.defaultBuffer"
internal val CHANNEL_DEFAULT_CAPACITY = systemProp(DEFAULT_BUFFER_PROPERTY_NAME,
64, 1, UNLIMITED - 1
)
}
}
SendChannel 인터페이스
원소를 보내거나 채널을 닫을 때 사용
send 함수는 일시중단 함수
- 채널 버퍼가 가득 찼거나 empty일 경우 일시 중단
trySend 함수는 ChannelResult 반환
ChannelResult는 success, fail에 대한 정보를 담고 있음
public interface SendChannel<in E> {
// close 호출로 채널이 닫혔는지 여부를 반환
// true인 경우 send 호출 시 예외 발생
@ExperimentalCoroutinesApi
public val isClosedForSend: Boolean
// 지정된 요소를 채널에 전송, 채널 버퍼가 가득 찼거나 없는 경우 호출자를 일시중단
// 채널이 isClosedForSend인 경우 예외 발생
// 일시중단 후 채널이 닫혀도 send는 중단되지 않음
// 취소 가능하며, Job 취소 시 CancellationException 발생
// suspend하지 않고 채널로 전송을 시도하려면 trySend 사용
public suspend fun send(element: E)
public val onSend: SelectClause2<E, SendChannel<E>>
// 용량 제한을 위반하지 않는 경우, 지정된 element를 채널에 즉시 추가하고 성공 결과 반환
// send 함수의 동기 버전. send가 일시 중단되거나 예외를 throw하는 상황에서는 실행을 포기
public fun trySend(element: E): ChannelResult<Unit>
// 채널 close
// 이 함수가 호출된 직후, isClosedForSend는 true를 반환하기 시작
// 하지만, ReceiveChannel의 isClosedForReceive는 이전에 전송된 모든 요소들이 수신된 후에야 true를 반환하기 시작
// cause 파라미터 없이 닫힌 채널은 send, receive시 Exception 발생
// cause가 null이 아니면서 닫힌 채널은 실패한 채널. 실패한 채널에서 송신이나 수신을 시도하면 지정된 cause 발생
public fun close(cause: Throwable? = null): Boolean
}
ReceiveChannel 인터페이스
원소를 받을 때 사용
receive 함수는 일시중단 함수
- 채널이 empty일 경우 일시중단
tryReceive 함수는 ChannelResult 반환
ChannelResult는 success, fail에 대한 정보를 담고 있음
public interface ReceiveChannel<out E> {
// SendChannel에서 close 호출로 채널이 닫히고 이전에 전송된 모든 항목들이 이미 수신되었다면 true를 반환
@ExperimentalCoroutinesApi
public val isClosedForReceive: Boolean
// 채널이 빈 경우(No Elements) true 리턴, receive 시도가 일시중단 될 것을 의미
// 채널이 isClosedForReceive인 경우 false 리턴
@ExperimentalCoroutinesApi
public val isEmpty: Boolean
// 채널이 비어있지 않을 때 요소를 가져와 제거, 채널이 비어있으면 호출자를 일시중단
// 채널이 isClosedForReceive인 경우 ClosedReceiveChannelException 발생
// 취소 가능한 함수이며, Job이 취소되면 CancellationException 발생
public suspend fun receive(): E
public val onReceive: SelectClause1<E>
// receive와 유사하지만 ChannelResult로 결과를 래핑하여 반환
// 채널이 닫힌 경우 close 원인과 함께 결과 반환
public suspend fun receiveCatching(): ChannelResult<E>
// 비동기로 채널에서 요소를 가져오려 시도, 성공 시 successful 결과 반환
// 채널이 비어있으면 failed 결과 반환, 채널이 닫혔으면 closed 결과 반환
public fun tryReceive(): ChannelResult<E>
// for 루프 사용해서 채널에서 element 수신하기 위한 Iterator 리턴
// 채널이 정상적으로 닫히면 반복이 완료됨
public operator fun iterator(): ChannelIterator<E>
// 채널의 나머지 요소 수신을 취소, 선택적으로 취소 원인을 지정할 수 있음
// 채널을 닫고 버퍼에 있는 모든 전송된 요소를 제거
// 호출 직후 isClosedForReceive와 isClosedForSend가 true 반환 시작
// 이후 송수신 시도는 CancellationException 발생
public fun cancel(cause: CancellationException? = null)
}
기본적인 사용
(채널을 Close하거나 예외처리가 되어있지 않은 불완전한 코드)
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>()
launch {
repeat(5) { index ->
delay(1000L)
println("Producing next one")
channel.send(index * 2)
}
}
launch {
repeat(5) {
val received = channel.receive()
println(received)
}
}
}
Produce를 사용한 개선된 코드
produce는
- ReceiveChannel 리턴
- 코루틴이 종료되면 채널 close (안전, 편리)
suspend fun main(): Unit = coroutineScope {
val channel = produce {
repeat(5) { index ->
println("Producing next one")
delay(1000)
send(index * 2)
}
}
for (element in channel) {
println(element)
}
}
// 결과
Producing next one
0
Producing next one
2
Producing next one
4
Producing next one
6
Producing next one
8
Produce 함수
// ReceiveChannel 타입 리턴
// 코루틴이 완료되거나 예외가 발생하면 자동으로 채널을 닫음
// CoroutineScope의 자식으로 동작하여 부모 코루틴이 취소되면 함께 취소됨
@ExperimentalCoroutinesApi
public fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
@BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> =
produce(context, capacity, BufferOverflow.SUSPEND, CoroutineStart.DEFAULT, onCompletion = null, block = block)
// ProducerScope
// (CoroutineScope, SendChannel 구현) 코루틴 스코프의 기능과 채널 전송 기능을 모두 사용 가능
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
public val channel: SendChannel<E>
}
채널 타입
- 무제한 (Unlimited)
- 버퍼 (Buffered)
- 랑데뷰 (Rendezvous)
- 융합 (Conflated)
무제한 (Unlimited)
모든 원소를 받고 수신자가 하나씩 가져가게 한다
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = Channel.UNLIMITED) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 결과
Sent
Sent
Sent
Sent
Sent
0
2
4
6
8
버퍼 (Buffered)
버퍼가 가득 찰 때까지 원소가 생성하고 이후 생성자는 수신자가 원소를 소비하기를 기다린다
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = 3) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 결과
Sent
Sent
Sent
0
Sent
2
Sent
4
6
8
랑데뷰 (Rendezvous)
송신자는 매번 수신자를 기다린다
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = Channel.RENDEZVOUS) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 결과
0
Sent
2
Sent
4
Sent
6
Sent
8
Sent
융합 (Conflated)
새로운 원소가 이전 원소를 대체하여 최근 원소만 받는다
용량이 1이며 onBufferOverflow가 Drop Oldest
suspend fun main(): Unit = coroutineScope {
val channel = produce(capacity = Channel.CONFLATED) {
repeat(5) { index ->
send(index * 2)
delay(100)
println("Sent")
}
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 결과
Sent
Sent
Sent
Sent
Sent
8
Channel 파라미터 살펴보기
기본적으로 우리가 코드에서 Channel을 만들 때 capacity, onBufferOverflow, onUndeliveredElement 3가지를 커스텀할 수 있다
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
1. onBufferOverflow
버퍼가 모두 찼을 때의 행동 정의
public enum class BufferOverflow {
// send 함수 중단
SUSPEND,
// 가장 오래된 원소 제거
DROP_OLDEST,
// 가장 최근 원소 제거
DROP_LATEST
}
예시
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
launch {
repeat(5) { index ->
channel.send(index * 2)
delay(100)
println("Sent")
}
channel.close()
}
delay(1000)
for (element in channel) {
println(element)
delay(1000)
}
}
// 결과
Sent
Sent
Sent
Sent
Sent
6
8
2. onUndeliveredElement
원소가 어떠한 이유로 처리되지 않을 때 호출 (대부분 채널이 닫히거나 취소되었음을 의미)
주로 채널에서 보낸 자원을 닫을 때 사용된다
suspend fun main(): Unit = coroutineScope {
val channel = Channel<Resource>(
capacity = capacity,
onUndeliveredElement = { resource -> resource.close() }
)
}
Channel의 중요한 패턴
팬아웃(Fan-out)
여러 개의 코루틴이 하나의 채널로부터 원소를 받을 수 있다
하나의 생산자(Producer)가 여러 소비자(Consumer)에게 작업을 분배하는 패턴
채널은 코루틴들을 FIFO 큐로 가지고 있어 순차적으로 분배
부하 분산 / 처리량 향상 / 병렬 처리 등의 장점
suspend fun main(): Unit = coroutineScope {
val channel = produceNumbers()
repeat(3) { id ->
delay(10)
launchProcessor(id, channel)
}
}
// producer(생산자) : 숫자 생성하여 채널에 send
fun CoroutineScope.produceNumbers() = produce<Int> {
repeat(10) {
delay(100)
send(it)
}
}
// consumer(소비자) : 채널에서 숫자를 받아 처리
fun CoroutineScope.launchProcessor(
id: Int,
channel: ReceiveChannel<Int>
) = launch {
for (msg in channel) {
println("#$id received $msg")
}
}
// 결과
#0 received 0
#1 received 1
#2 received 2
#0 received 3
#1 received 4
#2 received 5
#0 received 6
#1 received 7
#2 received 8
#0 received 9
팬인(Fan-in)
여러 개의 코루틴이 하나의 채널로 원소를 전송할 수 있다
여러 생산자(Producer)가 하나의 채널을 통해 단일 소비자(Consumer)에게 데이터를 전송하는 패턴
데이터 통합할 때 유용
// 두 생산자를 실행하고 하나의 소비자가 데이터를 수신
fun main(): Unit = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "send1", 200L) }
launch { sendString(channel, "send2", 500L) }
// 하나의 소비자가 데이터 수신
repeat(50) {
println(channel.receive())
}
coroutineContext.cancelChildren()
}
// 생산자 함수: 지정된 간격으로 문자열을 채널에 전송
suspend fun sendString(
channel: SendChannel<String>,
text: String,
time: Long
) {
while (true) {
delay(time)
channel.send(text)
}
}
// 결과
send1
send1
send2
send1
send1
send2
send1
send1
send1
send2
...
파이프라인 (pipeline)
데이터가 순차적인 단계를 거쳐 처리되는 구조
한 채널로부터 받은 원소를 다른 채널로 전송하는 것을 의미
suspend fun main(): Unit = coroutineScope {
val numbers = numbers()
val squared = square(numbers)
for (num in squared) {
println(num)
}
}
fun CoroutineScope.numbers(): ReceiveChannel<Int> =
produce {
repeat(3) { num ->
send(num + 1)
}
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>) =
produce {
for (num in numbers) {
send(num * num)
}
}
// 결과
1
4
9
'코루틴' 카테고리의 다른 글
[코루틴] 핫 데이터와 콜드 데이터 (0) | 2024.11.23 |
---|---|
[코루틴] select (0) | 2024.11.18 |
[코루틴] 코루틴의 동작 방식 (0) | 2024.09.29 |
[코루틴] 무제한 디스패처 (Unconfined Dispatcher) (0) | 2024.09.29 |
[코루틴] CoroutineStart (0) | 2024.09.29 |