본문 바로가기

코루틴

[코루틴] 공유 상태를 사용하는 코루틴

공유 상태를 사용하는 코루틴과 문제점

 

가변 변수 사용 시 문제점

상태를 공유하기 위해서 가변 변수 사용

 

문제

Inconsistency (불일치)
var count = 0
fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        repeat(10_000) {
            launch {
                count += 1
            }
        }
    }
    println("count = ${count}")
}
// 결과
// 1차 count = 8852
// 2차 count = 8875
// 3차 count = 8700

 

원인

메모리 가시성 (Memory Visibility)

 

한 스레드에서 변경한 데이터를 다른 스레드에서 볼 수 있게 하는 것

 

하드웨어의 메모리 구조와 연관

CPU 캐시를 사용하는 경우, CPU 캐시의 값이 메인 메모리에 전파되는 데의 시간으로 인해 메인 메모리와의 데이터 불일치 문제 발생

다른 스레드에서 변수 Read 시 변경된 것을 확인하지 못할 수 있다

 

경쟁 상태 (Race Condition)

 

공유 자원을 둘 이상의 스레드가 읽거나 쓰면서 결과값이 의도와 달라질 수 있는 문제

 

 

JVM 메모리 공간이 헤드웨어 메모리 구조와 연결되는 방식

 

JVM 메모리 구조

JVM 메모리 구조

 

 

Stack Area

Primitive 타입 데이터 저장
Heap 영역에 저장된 객체에 대한 참조 저장

 

Heap Area

JVM 스레드에서 공통으로 사용되는 메모리 공간
크고 복잡한 데이터 저장 (객체나 배열 등)

 

 

 

H/W 메모리 구조

HW Memory 구조

 

 

 

 

CPU는 CPU 캐시 메모리를 두고

 

데이터 조회 시 공통 영역인 메인 메모리까지 가지 않고 캐시 메모리에서 데이터를 조회

 

Memory Access 속도 향상

 

 

 

 

 

JVM 메모리 구조와 H/W 메모리 구조 연결

JVM과 H/W 메모리 구조 연결

 

 

하드웨어 메모리 구조는 JVM Stack, Heap 영역 구분 X

 

Stack, Heap의 데이터들은 CPU 레지스터, Cache M,

Main M 모두에 있을 수 있다

 

이로 인해 멀티 스레드 환경에서 공유 상태를 사용할 때 문제 발생

 

 

 

 

메모리 가시성 (Memory Visibility) 문제

하나의 스레드가 다른 스레드가 변경된 상태를 확인하지 못하는 것

서로 다른 CPU에서 실행되는 스레드들에서 공유 상태 Read/Write 시 생기는 문제

 

 

 

1. 공유될 상태는 메인 메모리에 저장

 

2. 하나의 스레드가 상태를 읽으면 CPU는 캐시 메모리에 저장

 

3. 스레드는 캐시 메모리의 값을 사용해 연산 실행

 

 

 

 

 

 

4. 연산이 완료되면 스레드는 CPU 캐시 메모리에 Write

 

 

 

 

 

 

 

 

 

 

5. 메인 메모리로 Write되지 않은 경우 다른 CPU의 스레드가 같은 공유 상태를 읽을 경우

    저장되지 않은 기존 값 Read

 

 

 

 

 

 

 

 

 

 

6. 2번의 연산이 일어났지만 메인 메모리의 값은 한 번만 실행된 값 저장

 

 

 

 

 

 

 

 

 

 

 

해결

@Volatile 사용

 

변수의 Read/Write 동작이 캐시 메모리가 아닌 항상 메인 메모리에서 직접 이루어지도록 보장

@Volatile
var count = 0

// 결과
// 1차 count = 9686
// 2차 count = 8632
// 3차 count = 9005

 

Volatile의 문제점

가시성 문제는 해결

하지만 여전히 Race Condition (경쟁 상태) 문제 발생

 

 

경쟁 상태 문제

여러 스레드에서 동시에 메인 메모리에 저장된 값에 접근하면서 발생하는 문제

@Volatile
var count = 0

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        // 각 스레드에서 실행 중인 코루틴들이 count에 동시 접근. 연산 중복 실행
        repeat(10_000) {
            launch {
                count += 1
            }
        }
    }
    println("count = ${count}")
}

 

연산이 2번 일어나지만 값은 한 번만 증가

 

해결

공유 상태에 동시에 접근하지 못하도록 제한

 

동시 접근을 제한하는 방법

1. 임계 영역 (Critical Section) 설정
2. 공유 변수에 접근할 때 하나의 스레드만 사용하도록 강제
3. 원자성 있는 객체 사용 (Atomic)

 

 

임계 영역 생성

공유 변수의 변경 가능 지점에 임계 영역 설정

 

Mutex 사용

코틀린에서 코루틴에 대한 임계 영역 생성

var count = 0
val mutex = Mutex()

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        repeat(10_000) {
            // lock 해제 전까지 다른 코루틴이 임계 영역에 진입 X
            launch {
                mutex.lock() // 임계 영역 시작
                count += 1   // 이 영역에 여러 코루틴이 접근 X
                mutex.unlock() // 임계 영역 종료
            }
        }
    }
    println("count = ${count}")
}
// 결과
// 1차 count = 10000
// 2차 count = 10000
// 3차 count = 10000

// Locks this mutex, suspending caller while the mutex is locked
public suspend fun lock(owner: Any? = null)

// Unlocks this mutex. Throws IllegalStateException if invoked on a mutex 
// that is not locked or was locked with a different owner token
public fun unlock(owner: Any? = null)

 

휴먼 에러를 줄이기 위한

withLock 함수

var count = 0
val mutex = Mutex()

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        repeat(10_000) {
            launch {
                mutex.withLock {
                    count += 1
                }
            }
        }
    }
    println("count = ${count}")
}

// 내부적으로 lock, unlock
@OptIn(ExperimentalContracts::class)
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    contract { 
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }

    lock(owner)
    try {
        return action()
    } finally {
        unlock(owner)
    }
}

 

 

하나의 스레드만 사용하도록 설정

Single Thread 코루틴 디스패처 사용

var count = 0
val countChangeDispatcher = newSingleThreadContext("CountChangeThread")

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        repeat(10_000) {
            launch { increaseCount() }
        }
    }
    println("count = ${count}")
}
// 결과
// count = 10000

// 호출한 launch 코루틴의 실행 스레드가 countChangeDispatcher로 변경되어 동시 접근 X
suspend fun increaseCount() = coroutineScope {
    withContext(countChangeDispatcher) {
        count += 1
    }
}

 

 

원자성 있는 객체 사용

스레드 동시 접근 시 원자적 연산을 제공하여 데이터의 일관성 보장

하나의 스레드만 접근하도록 제한

AtomicInteger, AtomicLong, AtomicBoolean 등등

var count = AtomicInteger(0)

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        repeat(10_000) {
            launch {
                count.getAndUpdate {
                    it + 1
                }
            }
        }
    }
    println("count = ${count}")
}

 

Atomic 객체는 내부적으로 CAS(Compare-And-Swap) 루프를 사용하여 Atomic 연산을 구현

 

 

복잡한 객체의 참조에 대한 원자성

AtomicReference 사용

data class Counter(val name: String, val count: Int)
val atomicCounter = AtomicReference(Counter("MyCounter", 0))

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        repeat(10_000) {
            launch {
                atomicCounter.getAndUpdate {
                    it.copy(count = it.count + 1)
                }
            }
        }
    }
    println(atomicCounter.get())
}
// 결과
// Counter(name=MyCounter, count=10000)

 

 

Atomic 객체를 사용할 때 주의점

read/write를 따로 실행하지 않도록 주의

(값이 변경되기 전 다른 스레드에서 값에 대해 접근 가능)

var count = AtomicInteger(0)

fun main() = runBlocking<Unit> {
    withContext(Dispatchers.Default) {
        repeat(10_000) {
            launch {
                val currentCount = count.get()
                // 이 사이에 다른 thread가 count 값 read, write 가능
                count.set(currentCount + 1)
            }
        }
    }
    println("count = $count")
}
// 결과
// count = 8664