본문 바로가기

코루틴

CoroutineDispatcher 란 무엇인가

 
코틀린 코루틴의 정석 책을 참고하여 CoroutineDispatcher에 대해 알아보자.
 
 

CoroutineDispatcher 란 무엇일까?

Coroutine(코루틴) + Dispatcher(보내다)로 코루틴을 보내는 주체를 말한다.
그렇다면 코루틴을 어디에다 보내는 것일까? 스레드(Thread)로 보낸다.
 
즉, ThreadPool 안의 스레드에게 코루틴을 보내 실행시키는 역할을 담당한다.
 
코루틴 Dispatcher는 아래와 같이 CoroutineContext의 Element임을 알 수 있다.

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor

 
CoroutineContext는 Element들의 집합이다. 각 Element는 고유한 key를 가진다.

@SinceKotlin("1.3")
public interface CoroutineContext {
    // key에 따른 context의 Element 반환
    public operator fun <E : Element> get(key: Key<E>): E?
    // 초기값을 시작으로 주어진 람다 함수를 이용하여 Context Element들을 병합한 후 결과 반환
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    // 현재 Context와 주어진 다른 Context가 가지는 Element들을 모두 포함하는 CoroutineContext 반환
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                ...
            }
    // Context에서 해당 key를 갖는 Element들을 제외한 새로운 context 반환
    public fun minusKey(key: Key<*>): CoroutineContext
    public interface Key<E : Element>
}

 
CoroutineContext는 Job, CoroutineDispatcher, CoroutineName, CoroutineExceptionHandler로 이루어져 있다.
 
 

CoroutineDispatcher 동작

CoroutineDispatcher 객체는 실행되야 하는 작업을 저장하는 작업 대기열을 가진다.
ThreadPool은 CoroutineDisptacher가 사용할 수 있는 스레드를 포함한다.

작업 대기열과 스레드풀

 
 
1. CoroutineDispatcher에 코루틴 실행이 요청되면 Dispatcher는 코루틴을 작업 대기열에 넣는다.
2. 사용할 수 있는 스레드가 있는지 확인한다.
3. 더 이상 사용할 수 있는 스레드가 없다면 대기한다. (ex. Coroutine3은 Thread-1, 2가 모두 사용중이기에 대기한다)

 
4. ThreadPool의 스레드 중 완료된 코루틴이 있다면 사용할 수 있는 스레드를 사용하도록 한다.

 
 
 

CoroutineDispatcher의 역할

코루틴의 실행을 관리하는 주체요청된 코루틴들을 작업 대기열에 넣고, 사용할 수 있는 스레드가 사용 가능한 상태라면 코루틴을 스레드로 보내 실행될 수 있게 만드는 역할.(option에 따라 다르다)
 
 

CoroutineDispatcher의 종류

1. Confined Dispatcher (제한된 디스패처)
2. Unconfined Dispatcher (무제한 디스패처)
 
Confined Dispatcher
사용할 수 있는 스레드나 스레드풀이 제한된 dispatcher
일반적으로 Dispatcher 객체별로 어떤 작업을 처리할지 미리 역할을 부여하고 이에 맞춰 실행을 요청하는 것이 효율적이다.
(I/O 작업 -> IO Dispatcher, CPU 연산 작업 -> CPU 연산 작업용 Dispatcher)
사용할 수 있는 스레드가 하나인 Coroutine Dispatcher 객체를 Single-Thread Dispatcher라고 부른다.
 
Unconfined Dispatcher
사용할 수 있는 스레드나 스레드풀이 제한되지 않은 dispatcher
실행 요청된 코루틴이 이전 코드가 실행되던 스레드에서 계속해서 실행되도록 한다.
매번 실행되는 스레드가 달라질 수 있고 특정 스레드의 제한이 아니라 서 무제한이라고 불린다.
 
Confined Dispatcher 생성
Single-Thread Dispatcher -> newSingleThreadContext 사용

@ExperimentalCoroutinesApi
@DelicateCoroutinesApi
public fun newSingleThreadContext(name: String): CloseableCoroutineDispatcher =
    newFixedThreadPoolContext(1, name)

 
 
Multi-Thread Dispatcher -> newFixedThreadPoolContext 사용

@DelicateCoroutinesApi
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {
    require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
    val threadNo = AtomicInteger()
    val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->
        val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
        // 데몬 스레드로 생성
        t.isDaemon = true
        t
    }
    return executor.asCoroutineDispatcher()
}

 
Single, MultiThread 모두 코드 내부적으로 보면 newFixedThreadPoolContext를 사용한다.
 
 

CoroutineDispatcher를 사용해 코루틴 실행

1. launch 사용

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

 
 
singleThread 사용해 launch에 사용

val dispatcher = newSingleThreadContext("SingleThread")
lifecycleScope.launch(context = dispatcher) { }

 
launch 함수의 context 인자로 Dispatcher 객체를 넘기면 된다.
launch 함수를 통해 만들어진 코루틴이 dispatcher 객체로 실행 요청된다.
 
multiThread 사용해 launch에 사용

val multiThreadDispatcher = newFixedThreadPoolContext(
    2,
    "MultiThread"
)
lifecycleScope.launch(context = multiThreadDispatcher) {}
lifecycleScope.launch(context = multiThreadDispatcher) {}

 
 

2. 부모 코루틴의 Dispatcher를 사용해 자식 코루틴 실행

코루틴은 구조화, 계층화를 통해 내부에서 새로운 코루틴을 실행할 수 있다.
Child 코루틴은 기본적으로 Parent 코루틴의 CoroutineDispatcher 객체를 상속받아 사용한다.

val multiThreadDispatcher = newFixedThreadPoolContext(
    2,
    "MultiThread"
)
// 부모 코루틴
lifecycleScope.launch(multiThreadDispatcher) { 
    // 자식 코루틴 
    launch {
        // 부모 코루틴의 Dispatcher 사용
    }
    launch {
        // 부모 코루틴의 Dispatcher 사용
    }
}

 
부모 코루틴은 자식 코루틴에게 실행 환경(Context)를 전달.
Child 코루틴에 Dispatcher가 지정되지 않았으면 Parent 코루틴의 Dispatcher를 사용한다.
 
 

미리 제공된 Coroutine Dispatcher

newFixedThreadPoolContext를 사용해 CoroutineDispatcher 객체를 만드는 것은 비효율적일 가능성이 높다.
특정 CoroutineDispatcher 객체에서만 사용되는 ThreadPool이 생성되며, 속한 스레드의 수가 적거나 많이 생성되어 비효율적으로 동작할 수 있기 때문이다.
 
코루틴 라이브러리는 몇 가지의 Coroutine Dispatcher를 기본적으로제공한다.
 
Dispatchers.Default
Dispatchers.Main
Dispatchers.Unconfined
Dispatchers.IO

public actual object Dispatchers {
    @JvmStatic
    public actual val Default: CoroutineDispatcher = DefaultScheduler
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    @JvmStatic
    public val IO: CoroutineDispatcher get() = DefaultIoScheduler
}

 
 

Dispatchers.IO

네트워크 요청이나 I/O 작업을 위한 CoroutineDispatcher
여러 I/O 작업을 동시에 수행할 수 있다.
시스템 속성 값이 없을 경우의 기본값을 설정하게 되는데, 기본적으로 사용할 수 있는 스레드의 수는 JVM에서 사용이 가능한 AVAILABLE_PROCESSORS (사용 가능한 프로세서 수를 나타내는 상수)와 64 중 더 큰 값을 선택한다.
 
기본적으로 IO 작업은 요청을 실행한 후 결과를 받기 전까지 스레드를 사용하지 않는다.
코루틴을 사용하면 작업 실행 후 스레드가 대기하는 동안 해당 스레드에서 다른 I/O 작업을 동시에 실행할 수 있어 효율적이다.

// 사용
lifecycleScope.launch(Dispatchers.IO) {}
// 결과
// DefaultDispatcher-worker가 붙은 스레드는 코루틴 라이브러리에서 제공하는 공유 스레드풀에 속한 스레드로
// Dispatchers.IO는 공유 스레드풀의 스레드를 사용할 수 있도록 구현되었기 때문
[DefaultDispatcher-worker-1] 코루틴 실행


// Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {

    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )
    
    // 스레드의 수를 제한없이 만들어낼 수 있다
    override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
        return UnlimitedIoScheduler.limitedParallelism(parallelism, name)
    }
}

 
 

Dispatchers.Default

CPU를 많이 사용하는 연산 작업을 위한 CoroutineDisptacher
대용량 데이터를 처리해야하는 CPU 바운드 작업 시 사용한다.
기본적으로 CPU 바운드 작업은 작업을 하는 동안 스레드를 지속적으로 사용. (I/O와의 차이점)
코루틴을 사용하더라도 스레드가 지속적으로 사용되기 때문에 스레드 기반 작업을 사용했을 때와 큰 차이가 없다.

// 사용
lifecycleScope.launch(Dispatchers.Default) {
    println("[${Thread.currentThread().name}] 코루틴 실행")
}
// 결과
[DefaultDispatcher-worker-1] 코루틴 실행



// 스레드 풀 사용하고 코어 크기와 최대 크기를 별도로 지정하여 유연성을 제공
// Instance of Dispatchers.Default
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) {
    // Default의 일부 스레드만 사용해 특정 연산을 실행할 수 있도록
    override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
        parallelism.checkParallelism()
        if (parallelism >= CORE_POOL_SIZE) {
            return namedOrThis(name)
        }
        return super.limitedParallelism(parallelism, name)
    }
}

 
Default를 사용해 무겁고 오래 걸리는 연산을 처리하면 이를 위해 Default의 모든 스레드가 사용될 수 있다.
이 때는 다른 연산이 실행되지 못한다.
그래서 코루틴 라이브러리는 limitedParalleism 함수를 지원한다.

lifecycleScope.launch(Dispatchers.Default.limitedParallelism(2)) {
    repeat(8) {
        launch {
            println("[${Thread.currentThread().name}] 코루틴 실행")
        }
    }
}

// 결과
// 2개의 스레드만 사용해 실행
[DefaultDispatcher-worker-3] 코루틴 실행
[DefaultDispatcher-worker-2] 코루틴 실행
[DefaultDispatcher-worker-3] 코루틴 실행
[DefaultDispatcher-worker-3] 코루틴 실행
[DefaultDispatcher-worker-2] 코루틴 실행
[DefaultDispatcher-worker-3] 코루틴 실행
[DefaultDispatcher-worker-3] 코루틴 실행
[DefaultDispatcher-worker-3] 코루틴 실행

 
 
Dispatchers.IO와 Dispatchers.Default 모두 DefaultDispatcher-worker 이름을 사용한다.
이는 같은 스레드 풀을 사용한다는 것을 의미한다.
How? 두 Dispatcher 모두 코루틴 라이브러리의 공유 스레드풀을 사용하기 때문이다.
코루틴 라이브러리는 스레드의 생성과 관리를 효율적으로 할 수 있도록 Application 레벨의 공유 스레드풀을 제공한다.
스레드 풀 내에서 IO와 Default가 사용하는 스레드는 구분된다.

공유 스레드풀

 
newFixedThreadPoolContext로 만들어지는 Dispatcher는 자신만 사용할 수 있는 스레드풀을 생성하게 된다.
 
 

Dispatchers.Main

메인 스레드를 사용하기 위한 CoroutineDispatcher
UI가 있는 Application에서 Main Thread의 사용을 위해 사용되는 특별한 CoroutineDispatcher 객체이다.

// 사용
lifecycleScope.launch(Dispatchers.Main) {
    println("[${Thread.currentThread().name}] 코루틴 실행")
}
// 결과
[main] 코루틴 실행

 

'코루틴' 카테고리의 다른 글

[코루틴] 일시 중단 함수 - suspend  (0) 2024.09.18
[코루틴] 예외 처리  (0) 2024.09.16
[코루틴] 구조화된 동시성  (0) 2024.09.08
[코루틴] CoroutineContext  (0) 2024.08.31
[코루틴] async & Deferred  (0) 2024.08.30