본문 바로가기

코루틴

[코루틴] 구조화된 동시성

구조화된 동시성 (Structured Concurrency)

비동기 작업을 구조화함으로써 프로그래밍을 안정적이고 예측할 수 있게 만드는 원칙

 

 

코루틴의 구조화된 동시성 특성

  • Parent - Child 관계
fun main() = runBlocking<Unit> {
    launch {		// 부모 코루틴
        launch {  }	// 자식 코루틴
    }
}
  • 부모 코루틴의 실행 환경(Context)가 자식 코루틴에게 상속
  • Job을 제어하는데 사용
  • 부모 코루틴이 취소되면 자식 코루틴도 취소
  • 부모 코루틴은 자식 코루틴이 완료될 때까지 대기
  • CoroutineScope을 사용해 코루틴이 실행되는 범위 제한 가능

 


 

 

1. 실행 환경 상속

기본적으로 부모 코루틴이 자식 코루틴을 생성하면 Parent CoroutineContext -> Child Coroutine 전달

fun main() = runBlocking<Unit> {
    val coroutineContext = newSingleThreadContext("MyThread") + CoroutineName("CoroutineA")
    launch(coroutineContext) {
        println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 부모 코루틴 실행")
        launch {
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 자식 코루틴 실행")
        }
    }
}

// 결과
[MyThread, CoroutineName(CoroutineA)] 부모 코루틴 실행
[MyThread, CoroutineName(CoroutineA)] 자식 코루틴 실행

 

상속되지 않는 경우

자식 코루틴 빌더에 새로운 CoroutineContext를 전달하는 경우 (재정의)

// Child Coroutine에 새로운 CoroutineName 할당
fun main() = runBlocking<Unit> {
    val parentContext = newSingleThreadContext("MyThread") + CoroutineName("CoroutineA")
    launch(parentContext) {
        println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 부모 코루틴 실행")
        launch(CoroutineName("ChildCoroutine")) {
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 자식 코루틴 실행")
        }
    }
}
// 결과
[MyThread, CoroutineName(CoroutineA)] 부모 코루틴 실행
[MyThread, CoroutineName(ChildCoroutine)] 자식 코루틴 실행

 

 


 

 

다른 Context Element들과 다르게 Job 객체는 상속되지 않고 코루틴 빌더 함수가 호출되면 새롭게 생성

 

2. 상속되지 않는 Job

모든 코루틴 빌더는 코루틴 추상체인 Job 객체를 새롭게 생성

Job은 코루틴을 제어하는데 필요한데 부모로부터 상속받게 되면 각각의 코루틴 제어가 어려워지기 때문이다

fun main() = runBlocking<Unit> {
    val testJob = coroutineContext[Job]
    launch {
        val launchJob = coroutineContext[Job]
        if (testJob == launchJob) {
            println("testJob 은 launchJob 과 동일하다")
        } else {
            println("testJob 은 launchJob 과 다르다")
        }
    }
}
// 결과
testJob 은 launchJob 과 다르다

 

 

상속되지는 않지만 부모로부터 전달받은 Job 객체는 코루틴을 구조화 하는데 사용된다

부모 코루틴 Job과 자식 코루틴 Job은 양방향 참조를 가진다

Job 구조화

 

public interface Job : CoroutineContext.Element {
    public companion object Key : CoroutineContext.Key<Job>

    // 부모 코루틴이 없을 수 있고(root) 최대 하나
    @ExperimentalCoroutinesApi
    public val parent: Job?
    // 복수의 자식 코루틴
    public val children: Sequence<Job>

    public val isActive: Boolean
    public val isCompleted: Boolean
    public val isCancelled: Boolean
    ...
}

 

코드

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking<Unit> {
    val parentJob = coroutineContext[Job]
    launch {
        val childJob = coroutineContext[Job]
        // 부모, 자식 코루틴은 같은가 - >false
        println("${parentJob === childJob}")
        // 자식 코루틴 Job의 부모는 parentJob? -> True
        println("${childJob?.parent === parentJob}")
        // 부모 코루틴 Job은 자식 코루틴 Job에 대한 참조를 갖는가? -> True
        println("${parentJob?.children?.contains(childJob)}")
    }
}

 

 


 

 

3. 작업 제어

취소 전파

부모 코루틴으로 취소가 요청되면 자식 코루틴으로 전파

코루틴 취소 전파 ex1

 

부모 코루틴으로는 취소가 전파되지 않는다

코루틴 취소 전파 ex2

 

 

부모 코루틴의 자식 코루틴에 대한 완료 의존성

부모 코루틴은 모든 자식 코루틴이 실행 완료되어야 완료 가능

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val parentJob = launch {
        launch {
            delay(1000L)
            println("[${getElapsedTime(startTime)}] 자식 코루틴 실행 완료")
        }
        println("[${getElapsedTime(startTime)}] 부모 코루틴이 실행하는 마지막 코드")
    }
    parentJob.invokeOnCompletion {
        println("[${getElapsedTime(startTime)}] 부모 코루틴 실행 완료")
    }
}
// 결과
[지난 시간: 3ms] 부모 코루틴이 실행하는 마지막 코드
[지난 시간: 1015ms] 자식 코루틴 실행 완료
[지난 시간: 1016ms] 부모 코루틴 실행 완료

 

invokeOnCompletion 콜백은 코루틴이 실행 완료되었을 때와 취소 완료된 경우에 동작한다

 

 

부모 코루틴은 자식 코루틴의 실행 완료를 기다릴 때까지 "실행 완료 중" 상태를 가진다

부모 코루틴의 모든 코드가 실행되었지만 자식 코루틴이 실행중인 경우 부모가 가지는 상태

 

실행 완료 중 상태의 Job

fun main() = runBlocking<Unit> {
    val startTime = System.currentTimeMillis()
    val parentJob = launch {
        launch {
            delay(1000L)
            println("[${getElapsedTime(startTime)}] 자식 코루틴 실행 완료")
        }
        println("[${getElapsedTime(startTime)}] 부모 코루틴이 실행하는 마지막 코드")
    }
    parentJob.invokeOnCompletion {
        println("[${getElapsedTime(startTime)}] 부모 코루틴 실행 완료")
    }
    delay(500L)
    printJobState(parentJob)
}
// 결과
[지난 시간: 10ms] 부모 코루틴이 실행하는 마지막 코드
Job State
isActive >> true
isCancelled >> false
isCompleted >> false

[지난 시간: 1016ms] 자식 코루틴 실행 완료
[지난 시간: 1017ms] 부모 코루틴 실행 완료

 

일반적으로 실행 완료 중 상태와 실행 중 상태와 구분 없이 사용

상태 isActive isCancelled isCompleted
생성 false false false
실행 중 true false false
실행 완료 중 true false false
실행 완료 false false true
취소 중 false true false
취소 완료 false true true

 

 


 

 

CoroutineScope 사용해 코루틴 관리

CoroutineScope을 사용해 코루틴이 실행되는 범위 제한 가능

 

CoroutineScope 생성하는 방법

 

1. CoroutineScope Interface 사용

// 인터페이스를 구현해 CoroutineScope 객체 생성
public interface CoroutineScope {
    // scope의 context
    // context는 scope에 의해 캡슐화되어 scope의 extension인 코루틴 빌더의 구현에 사용된다
    public val coroutineContext: CoroutineContext
}

class TestCoroutineScope : CoroutineScope {
    override val coroutineContext: CoroutineContext = Job() + newSingleThreadContext("TestScopeThread")
}

fun main() = runBlocking<Unit> {
    val testCoroutineScope = TestCoroutineScope()
    testCoroutineScope.launch {
        delay(100L)
        println("[${Thread.currentThread().name}] 코루틴 실행 완료")
    }
    Thread.sleep(1000L)
}
// 결과
[TestScopeThread] 코루틴 실행 완료

 

 

2. CoroutineScope 함수 사용

@Suppress("FunctionName")
public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())

fun main() {
    val testCoroutineScope = CoroutineScope(Dispatchers.IO)
    testCoroutineScope.launch {
        delay(100L)
        println("[${Thread.currentThread().name}] 코루틴 실행 완료")
    }
    Thread.sleep(1000L)
}
// 결과
[DefaultDispatcher-worker-1] 코루틴 실행 완료

 

 

CoroutineScope이 CoroutineContext를 제공하는 방법

CoroutineScope가 context를 제공하는 방식

/**
* 1. CoroutineScope로부터 CoroutineContext 객체 제공받음
* 2. 제공받은 Context에 launch 함수의 context 인자로 넘어온 context를 더한다
* 3. 생성된 CoroutineContext에 코루틴 빌더 함수가 호출되어 새로 생성되는 Job을 더한다
*/
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
}

@OptIn(ExperimentalStdlibApi::class)
fun main() {
    // MyCoroutine + IO + 새로운 Job으로 구성된 Context를 포함한 newScope 생성
    val newScope = CoroutineScope(CoroutineName("MyCoroutine") + Dispatchers.IO)
    // launch 빌더는 새로운 Job을 생성하고 newScope의 Job을 parent로 설정
    newScope.launch(CoroutineName("LaunchCoroutine")) {
        // this --> CoroutineScope. CoroutineScope 수신객체가 제공되었기 때문에
        println(this.coroutineContext[CoroutineName])
        println(this.coroutineContext[CoroutineDispatcher])
        val launchJob = this.coroutineContext[Job]
        val newScopeJob = newScope.coroutineContext[Job]
        println("launchJob?.parent === newScopeJob >> ${launchJob?.parent === newScopeJob}")
    }
    Thread.sleep(1000L)
}
// 결과
CoroutineName(LaunchCoroutine)
Dispatchers.IO
launchJob?.parent === newScopeJob >> true

 

 

 

CoroutineScope에 속한 코루틴의 범위

코루틴 빌더 람다식에서 수신 객체로 제공되는 CoroutineScope 객체는 빌더로 생성되는 코루틴과 람다식 내에서 CoroutineScope를 사용해 실행되는 모든 코루틴을 포함한다

// 새로운 CoroutineScope를 사용하면 새로운 Job 객체가 생성되어 새로운 계층 구조를 갖게 되기 때문에 runBlocking Scope에서 벗어날 수 있다
fun main() = runBlocking<Unit> {
    launch(CoroutineName("Coroutine1")) {
        launch(CoroutineName("Coroutine3")) {
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
        }
        // 새로운 CoroutineScope을 생성함으로 runBlocking의 CoroutineScope 범위에서 벗어난다
        CoroutineScope(Dispatchers.IO).launch(CoroutineName("Coroutine4")) {
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
        }
    }

    launch(CoroutineName("Coroutine2")) {
        println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
    }
}
// 결과
[main, CoroutineName(Coroutine2)] 코루틴 실행
[DefaultDispatcher-worker-1, CoroutineName(Coroutine4)] 코루틴 실행
[main, CoroutineName(Coroutine3)] 코루틴 실행

 

 

CoroutineScope 취소

확장함수로 cancel 함수 지원

public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

 

범위에서 실행 중인 모든 코루틴에 취소 요청

fun main() = runBlocking<Unit> {
    launch(CoroutineName("Coroutine1")) {
        launch(CoroutineName("Coroutine3")) {
            delay(100L)
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행 완료")
        }
        launch(CoroutineName("Coroutine4")) {
            delay(100L)
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행 완료")
        }
        this.cancel()
    }

    launch(CoroutineName("Coroutine2")) {
        delay(100L)
        println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행 완료")
    }
}
// 결과
[main, CoroutineName(Coroutine2)] 코루틴 실행 완료

 

 

CoroutineScope 활성화 상태 확인

확장함수로 isActive 제공

// Job의 isActive 확인
public val CoroutineScope.isActive: Boolean
    get() = coroutineContext[Job]?.isActive ?: true

 

Job에 취소 요청되면 isActive -> false로 변경

fun main() = runBlocking<Unit> {
    val infiniteJob: Job = launch(Dispatchers.Default) {
        while (this.isActive) {
            println("작업 중")
        }
    }
    delay(100L)
    infiniteJob.cancel()
}

 

 


 

 

구조화와 Job

Job은 코루틴 구조화의 중심

CoroutineScope를 조작하는 것은 실제로 CoroutineContext의 Job을 조작

 

Job 구조화를 깨는 방법

  • CoroutineScope 사용
fun main() = runBlocking<Unit> {
    // 새로운 CoroutineScope를 생성함으로써 구조화를 깸
    val newScope = CoroutineScope(Dispatchers.IO)
    newScope.launch(CoroutineName("Coroutine1")) {
        delay(100L)
        println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
    }
    newScope.launch(CoroutineName("Coroutine2")) {
        delay(100L)
        println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
    }
}

 

위 코드의 구조화 그림

CoroutineScope 구조화

 

  • Job 사용
fun main() = runBlocking<Unit> {
    val newRootJob = Job()
    launch(CoroutineName("Coroutine1") + newRootJob) {
        delay(100L)
        println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
    }
    launch(CoroutineName("Coroutine2") + newRootJob) {
        delay(100L)
        println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
    }
    delay(1000L)
}
// 결과
[main, CoroutineName(Coroutine1)] 코루틴 실행
[main, CoroutineName(Coroutine2)] 코루틴 실행

 

 

Job으로 일부 코루틴만 취소되지 않게 하는 방법

fun main() = runBlocking<Unit> {
    val newRootJob = Job()
    launch(CoroutineName("Coroutine1") + newRootJob) {
        launch(CoroutineName("Coroutine3")) {
            delay(100L)
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
        }
        launch(CoroutineName("Coroutine4")) {
            delay(100L)
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
        }
    }
    launch(CoroutineName("Coroutine2") + newRootJob) {
        // Job() 을 추가해 Coroutine5 만 계층 구조를 끊음. Job()이 parent, Coroutine5 Job이 child
        launch(CoroutineName("Coroutine5") + Job()) {
            delay(100L)
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
        }
    }
    delay(50L)
    // cancel 해도 coroutine5는 다른 계층 구조라 실행이 됨
    newRootJob.cancel()
    delay(1000L)
}
// 결과
[main, CoroutineName(Coroutine5)] 코루틴 실행

 

 

생성된 Job 부모를 명시적으로 설정하는 방법

// Job() 으로 생성할 경우 paren가 null이 되어 rootJob 생성
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

 

  • 코루틴의 계층을 끊는 Job
fun main() = runBlocking<Unit> {
    launch(CoroutineName("Coroutine1")) {
        val newJob = Job()
        launch(CoroutineName("Coroutine2") + newJob) {
            delay(100L)
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
        }
    }
    delay(1000L)
}
// 결과
[main, CoroutineName(Coroutine2)] 코루틴 실행

코루틴 계층 끊는 Job

 

  • 계층을 깨지 않는 Job
fun main() = runBlocking<Unit> {
    launch(CoroutineName("Coroutine1")) {
        val coroutine1Job = this.coroutineContext[Job] // Coroutine1 Job
        val newJob = Job(parent = coroutine1Job)
        launch(CoroutineName("Coroutine2") + newJob) {
            delay(100L)
            println("[${Thread.currentThread().name}, ${coroutineContext[CoroutineName]}] 코루틴 실행")
        }
    }
}

 

계층 유지하는 Job

 

 

생성된 Job은 자동으로 실행 완료되지 않음

코루틴 빌더를 통해 생성된 Job 객체는 Child coroutine이 모두 실행 완료되면 자동으로 실행 완료

코루틴 빌더는 내부적으로 AbstractCoroutine을 상속

내부적으로 onCompletionInternal 함수 호출

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    ...
    
    // 모든 자식 코루틴이 완료되면 자동으로 호출
    @Suppress("UNCHECKED_CAST")
    protected final override fun onCompletionInternal(state: Any?) {
        if (state is CompletedExceptionally)
            onCancelled(state.cause, state.handled)
        else
            onCompleted(state as T)
    }
}

 

 

Job 함수를 통한 Job 객체는 complete 함수를 호출해야 완료

complete 함수를 통해 내부적으로 tryMakeCompleting 호출함으로써 onCompletionInternal 실행

private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any?
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

@PublishedApi
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
    init { initParentJob(parent) }
    override val onCancelComplete get() = true
    override val handlesException: Boolean = handlesException()
    
    // 명시적으로 호출해야 완료 가능
    override fun complete() = makeCompleting(Unit)
    
    override fun completeExceptionally(exception: Throwable): Boolean =
        makeCompleting(CompletedExceptionally(exception))
        
    ....
}

 

 

'코루틴' 카테고리의 다른 글

[코루틴] 일시 중단 함수 - suspend  (0) 2024.09.18
[코루틴] 예외 처리  (0) 2024.09.16
[코루틴] CoroutineContext  (0) 2024.08.31
[코루틴] async & Deferred  (0) 2024.08.30
CoroutineDispatcher 란 무엇인가  (0) 2024.08.25