본문 바로가기

코루틴

[코루틴] async & Deferred

async란?

- 코루틴 빌더

- launch 빌더와 비슷, async는 결과값을 담기 위해 Deferred<T>를 반환

// async. Deferred 반환
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

// launch. Job을 반환
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

 

async 사용해 결과값 수신하기

- Deferred 객체 반환

- 명시적인 타입 설정

- Deferred는 미래의 어느 시점에 결과값이 반환될 수 있음을 표현하는 코루틴 객체

- 결과값이 필요하다면 값이 수신될 때까지 대기

- await 함수를 사용해 결과값 수신 가능

// String 타입 설정
fun main() = runBlocking<Unit> {
    val networkDeferred: Deferred<String> = async(Dispatchers.IO) {
        delay(1000L)
        return@async "Dummy Response"
    }
    // await시 Deferred 코루틴이 실행 완료될 때까지 runBlocking 코루틴을 일시 중단
    // 코루틴이 실행 완료되면 결과값 반환하고 코루틴 재개
    // Job join 함수와 유사하게 동작
    val result = networkDeferred.await()
    
    println(result)
}
// 결과 : Dummy Response

 

Deferred에 대해

- 특수한 형태의 Job

- Job 인터페이스 확장

- Job 함수, 프로퍼티 사용 가능 (join, isActive, isCancelled ...)

// Deferred Interface. Job을 wrapping. T 반환
@OptIn(ExperimentalSubclassOptIn::class)
@SubclassOptInRequired(markerClass = InternalForInheritanceCoroutinesApi::class)
public interface Deferred<out T> : Job {
    public suspend fun await(): T
    public val onAwait: SelectClause1<T>

    @ExperimentalCoroutinesApi
    public fun getCompleted(): T

    @ExperimentalCoroutinesApi
    public fun getCompletionExceptionOrNull(): Throwable?
}

 

Job 객체 join 사용

fun main() = runBlocking<Unit> {
    val networkDeferred: Deferred<String> = async(Dispatchers.IO) {
        delay(1000L)
        return@async "Dummy Response"
    }
    networkDeferred.join()
    printJobState(networkDeferred)
}

fun printJobState(job: Job) {
    println(
        "Job State\n" +
            "isActive >> ${job.isActive}\n" +
            "isCancelled >> ${job.isCancelled}\n" +
            "isCompleted >> ${job.isCompleted}\n"
    )
}
// 결과
Job State
isActive >> false
isCancelled >> false
isCompleted >> true

 

 

복수의 코루틴으로부터 결과값 수신하기

여러 비동기 작업으로부터 결과값을 반환받아 합쳐야 할 때 await, awaitAll 함수 사용

 

주의할 점

함수의 호출 시점에 따라 결과가 달라진다

 

await 함수 사용

비효율적인 사용

/**
* 2개의 코루틴이 동시에 실행되지 않고 순차적으로 처리해 비효율적
*/ 
fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val deferredGroup1: Deferred<List<String>> = async(Dispatchers.IO) {
        delay(1000L)
        return@async listOf("Park", "Lee")
    }
    // group1 결과가 수신될 때까지 대기
    val group1 = deferredGroup1.await()

    val deferredGroup2: Deferred<List<String>> = async(Dispatchers.IO) {
        delay(1000L)
        return@async listOf("Kim")
    }
    // group2 결과가 수신될 때까지 대기
    val group2 = deferredGroup2.await()

    println("${getElapsedTime(startTime)}, Group : ${listOf(group1, group2)}")
}

// 결과
지난 시간: 2053ms, Group : [[Park, Lee], [Kim]]

fun getElapsedTime(startTime: Long): String =
    "지난 시간: ${System.currentTimeMillis() - startTime}ms"

 

 

효율적인 사용

/**
* deferredGroup1과 deferredGroup2가 동시에 실행
*/
fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()

    val deferredGroup1: Deferred<List<String>> = async(Dispatchers.IO) {
        delay(1000L)
        return@async listOf("Park", "Lee")
    }
    val deferredGroup2: Deferred<List<String>> = async(Dispatchers.IO) {
        delay(1000L)
        return@async listOf("Kim")
    }
    val group1 = deferredGroup1.await()
    val group2 = deferredGroup2.await()

    println("${getElapsedTime(startTime)}, Group : ${listOf(group1, group2)}")
}
// 결과
지난 시간: 1027ms, Group : [[Park, Lee], [Kim]]

 

 

awaitAll 함수 사용

 

awaitAll 함수

public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T>

 

사용

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()

    val deferredGroup1: Deferred<List<String>> = async(Dispatchers.IO) {
        delay(1000L)
        return@async listOf("Park", "Lee")
    }
    val deferredGroup2: Deferred<List<String>> = async(Dispatchers.IO) {
        delay(1000L)
        return@async listOf("Kim")
    }
    // 대상이 된 코루틴들의 실행이 완료될 때까지 일시 중단
    val results: List<List<String>> = awaitAll(deferredGroup1, deferredGroup2)
    println("${getElapsedTime(startTime)}, Group : $results")
}
// 결과
지난 시간: 1041ms, Group : [[Park, Lee], [Kim]]

// Collection Extension
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T>

val results: List<List<String>> = listOf(deferredGroup1, deferredGroup2).awaitAll()

 

 

WithContext

withContext로 async-await 대체

 

withContext 함수

- CoroutineContext 객체를 사용해 block 람다식 실행하고 결과 반환

- 람다식이 모두 실행되면 기존 CoroutineContext를 사용해 코루틴 재개

- async-await 연속 실행했을 때의 동작과 비슷

- async-await 보다 깔끔한 코드

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {

 

사용

 

fun main() = runBlocking<Unit> {
    val result: String = withContext(Dispatchers.IO) {
        delay(1000L)
        return@withContext "Dummy Response"
    }
    println(result)
}
// 결과
Dummy Response

 

withContext의 동작 방식

- 새로운 코루틴을 생성해 작업을 처리

- 실행 중이던 코루틴을 그대로 유지한 채 코루틴의 실행 환경(Context)만 변경해 작업을 처리

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        // context 결합
        val oldContext = uCont.context
        val newContext = oldContext.newCoroutineContext(context)
        newContext.ensureActive()
        
        // 최적화1. same Context
        if (newContext === oldContext) {
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        }
        // 최적화2. same Dispatcher
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            withCoroutineContext(coroutine.context, null) {
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            }
        }
        // 새로운 dispatcher 사용
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

 

/**
* 실행하는 스레드는 다르지만, 코루틴은 같다
*/
fun main() = runBlocking<Unit>(CoroutineName("Coroutine#1")) {
    println("[${Thread.currentThread().name}] [${currentCoroutineContext()[CoroutineName]}] runBlocking 블록 실행")
    withContext(Dispatchers.IO) {
        println("[${Thread.currentThread().name}] [${currentCoroutineContext()[CoroutineName]}] withContext 블록 실행")
    }
}
// 결과
[main] [CoroutineName(Coroutine#1)] runBlocking 블록 실행
[DefaultDispatcher-worker-1] [CoroutineName(Coroutine#1)] withContext 블록 실행

 

 

withContext가 호출되면

- Context Switching 발생 (코루틴의 실행 환경이 withContext 함수의 context 인자 값으로 변경되어 실행)

 

async-await & withContext 차이

- async-await은 새로운 코루틴 생성. await 함수를 통해 순차 처리가 되어 동기적으로 실행

- withContext는 코루틴 유지. 실행 환경만 변경되기 때문에 동기적으로 실행

 

withContext 사용 시 주의할 점

- 하나의 코루틴에서 withContext 함수가 여러 번 호출되면 순차적으로 실행

- 여러 작업 병렬 처리 시 withContext 주의

- withContext가 새로운 코루틴을 생성하지 않음을 명심하고 사용

/**
* main 함수는 runBlocking을 통해 생성된 하나의 코루틴 보유
* helloString은 기존 코루틴 유지. 실행 스레드만 변경. 1초 대기 후 반환.
* worldSstring도 마찬가지
*/
fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val helloString = withContext(Dispatchers.IO) {
        delay(1000L)
        return@withContext "Hello"
    }
    val worldString = withContext(Dispatchers.IO) {
        delay(1000L)
        return@withContext "World"
    }
    println("[${getElapsedTime(startTime)}] $helloString $worldString")
}
// 결과
[지난 시간: 2028ms] Hello World

 

 

withContext를 사용한 코루틴 스레드 전환

하나의 코루틴에서 여러 스레드로 전환

private val myDispatcher1 = newSingleThreadContext("MyThread1")
private val myDispatcher2 = newSingleThreadContext("MyThread2")

// runBlocking을 통해 하나의 코루틴 생성
fun main() = runBlocking<Unit> {
    println("[${Thread.currentThread().name}] 코루틴 실행")
    // 코루틴의 실행 스레드 전환
    withContext(myDispatcher1) {
        println("[${Thread.currentThread().name}] 코루틴 실행")
        // 코루틴의 실행 스레드 전환
        withContext(myDispatcher2) {
            println("[${Thread.currentThread().name}] 코루틴 실행")
        }
        println("[${Thread.currentThread().name}] 코루틴 실행")
    }
    println("[${Thread.currentThread().name}] 코루틴 실행")
}
// 결과
[main] 코루틴 실행
[MyThread1] 코루틴 실행
[MyThread2] 코루틴 실행
[MyThread1] 코루틴 실행
[main] 코루틴 실행

'코루틴' 카테고리의 다른 글

[코루틴] 일시 중단 함수 - suspend  (0) 2024.09.18
[코루틴] 예외 처리  (0) 2024.09.16
[코루틴] 구조화된 동시성  (0) 2024.09.08
[코루틴] CoroutineContext  (0) 2024.08.31
CoroutineDispatcher 란 무엇인가  (0) 2024.08.25