Search

코루틴 API 살펴보기

생성일
2023/12/31 14:37
태그
Coroutine
Kotlin

코루틴과 동시성

코루틴은 매우 가볍다

코루틴은 JVM의 플랫폼 스레드보다 리소스 집약도가 낮다. 즉, 훨씬 적은 메모리를 사용하여 더 많은 일을 할 수 있게된다.
예제는 1,000,000개의 코루틴을 만들어서 각각 5초를 기다린 다음 마침표(.)을 출력하는 예제이다.
import kotlinx.coroutines.* fun main() = runBlocking { repeat(1_000_000) { // launch a lot of coroutines launch { delay(5000L) print(".") } } }
Kotlin
복사
스레드 예제) 테스트 PC 기준 4000개 정도 생성된 후 OOME가 발생한다.
fun main() { repeat(1_000_000) { // launch a lot of threads thread { Thread.sleep(5000L) print(".") } } Thread.sleep(10000L) } [0.344s][warning][os,thread] Failed to start thread "Unknown thread" - pthread_create failed (EAGAIN) for attributes: stacksize: 2048k, guardsize: 16k, detached. [0.344s][warning][os,thread] Failed to start the native thread for java.lang.Thread "Thread-4074" Exception in thread "main" java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached at java.base/java.lang.Thread.start0(Native Method) at java.base/java.lang.Thread.start(Thread.java:1526) at kotlin.concurrent.ThreadsKt.thread(Thread.kt:42) at kotlin.concurrent.ThreadsKt.thread$default(Thread.kt:20) at Example1Kt.main(example1.kt:17) at Example1Kt.main(example1.kt)
Kotlin
복사

코루틴 빌더

코루틴 빌더는 코루틴을 만드는 함수를 말한다.

1. runBlocking

runBlocking은 코루틴을 생성하는 코루틴 빌더이다.
runBlocking으로 감싼 코드는 코루틴 내부의 코드가 수행이 끝날때 까지 스레드가 블로킹된다
import kotlinx.coroutines.* fun main() { runBlocking { println("Hello") } println("World") } // Hello // World
Kotlin
복사
일반적으로 코루틴은 스레드를 차단하지 않고 사용해야하므로 runBlocking을 사용하는 것은 좋지 않지만 꼭 사용해야하는 경우가 있다.
코루틴을 지원하지 않는 경우 예) 테스트 코드, 스프링 배치 등
실행옵션에 -Dkotlinx.coroutines.debug 을 붙여주면 코루틴에서 수행되는 스레드는 이름 뒤에 @coroutine#1 이 붙어있는 것을 볼 수 있다

2. launch

launch는 스레드 차단 없이 새 코루틴을 시작하고 결과로 job을 반환하는 코루틴 빌더이다
launch는 결과를 만들어내지 않는 비동기 작업에 적합하기 때문에 Unit을 반환하는 람다를 인자로 받는다
fun main() = runBlocking<Unit> { launch { delay(500L) println("World!") } println("Hello") } // Hello // World
Kotlin
복사
delay() 함수는 코루틴 라이브러리에 정의된 일시 중단 함수이며 Thread.sleep() 과 유사하지만 현재 스레드를 차단하지 않고 일시 중단 시킨다. 이때 일시 중단 된 스레드는 코루틴내에서 다른 일시 중단 함수를 수행한다
launch를 사용해서 여러개의 작업을 동시에 수행할 수 있다
import kotlinx.coroutines.* import kotlin.system.measureTimeMillis fun main() { runBlocking { launch { val timeMillis = measureTimeMillis { delay(150) } println("async task-1 $timeMillis ms") } launch { val timeMillis = measureTimeMillis { delay(100) } println("async task-2 $timeMillis ms") } } }
Kotlin
복사
launch가 반환하는 Job 을 사용해 현재 코루틴의 상태를 확인하거나 실행 또는 취소도 가능하다
import kotlinx.coroutines.* import kotlin.system.measureTimeMillis fun main() = runBlocking<Unit> { val job1: Job = launch { val timeMillis = measureTimeMillis { delay(150) } println("async task-1 $timeMillis ms") } job1.cancel() // 취소 val job2: Job = launch(start = CoroutineStart.LAZY) { val timeMillis = measureTimeMillis { delay(100) } println("async task-2 $timeMillis ms") } println("start task-2") job2.start() }
Kotlin
복사
job1.cancel() 을 호출해 코루틴을 취소할 수 있다
launch(start = CoroutineStart.LAZY) 를 사용해서 start 함수를 호출하는 시점에 코루틴을 동작시킬 수 있다
start 함수를 주석처리하면 launch가 동작하지 않는다

3. async

async 빌더는 비동기 작업을 통해 결과를 만들어내는 경우에 적합하다
import kotlinx.coroutines.* fun sum(a: Int, b: Int) = a + b fun main() = runBlocking<Unit> { val result1: Deferred<Int> = async { delay(100) sum(1, 3) } println("result1 : ${result1.await()}") val result2: Deferred<Int> = async { delay(100) delay(100) sum(2, 5) } println("result2 : ${result2.await()}") }
Kotlin
복사
async는 비동기 작업의 결과로 Deferred 라는 특별한 인스턴스를 반환하는데 await 이라는 함수를 통해 async로 수행한 비동기 작업의 결과를 받아올 수 있다
자바 스크립트나 파이썬과 같이 다른 언어의 async-await키워드 인 경우가 보통이지만 코틀린의 코루틴은 async-await이 함수인 점이 차이점이다
자바 스크립트의 async-await 예시)
async function showAvatar() { // JSON 읽기 let response = await fetch('/article/promise-chaining/user.json'); let user = await response.json(); // github 사용자 정보 읽기 let githubResponse = await fetch(`https://api.github.com/users/${user.name}`); let githubUser = await githubResponse.json(); // 아바타 보여주기 let img = document.createElement('img'); img.src = githubUser.avatar_url; img.className = "promise-avatar-example"; document.body.append(img); // 3초 대기 await new Promise((resolve, reject) => setTimeout(resolve, 3000)); img.remove(); return githubUser; } showAvatar();
JavaScript
복사

구조적 동시성

동시 실행 가능한 작업을 구조화된 방식으로 관리하여 코드의 가독성을 높이고 오류 가능성을 줄인다.
계층 구조로 관리되어 부모 코루틴은 자식 코루틴의 작업이 모두 끝나기 전까지 종료되지 않는다.
자식 코루틴에서 발생한 에러는 부모 코루틴으로 전파된다. 이를 통해 에러 처리를 중앙화할 수있고, 동시성을 처리하는 개별 코루틴의 에러핸들링이 가능하다.

suspend 함수

suspend 함수는 코루틴의 핵심 요소로써 일시 중단이 가능한 함수를 말한다
suspend는 키워드이다
suspend 함수는 일반 함수를 마음껏 호출할 수 있지만 일반 함수에선 suspend 함수를 호출할 수 없다
package structuredconcurrency fun main() { printHello() // 컴파일 에러 } suspend fun printHello() = println("hello")
Kotlin
복사
Caller 함수에서 suspend 키워드를 붙여주면 된다
package structuredconcurrency suspend fun main() { printHello() } suspend fun printHello() = println("hello")
Kotlin
복사
일시 중단 함수는 IntelliJ 에서 suspension point가 표시된다.

coroutineScope

runBlocking은 현재 스레드를 블로킹시키고 결과를 기다리지만 coroutineScope 는 스레드가 블로킹되지 않고 결과를 기다린다.
package structuredconcurrency import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.launch suspend fun main() { doSomething() } private suspend fun doSomething() = coroutineScope { launch { delay(200) println("world!") } launch { println("hello") } }
Kotlin
복사

예외 처리

coroutineScope 내부의 자식 코루틴에서 에러가 발생하면 모든 코루틴이 종료된다.
private suspend fun doSomething() = coroutineScope { launch { delay(200) println("world!") // 실행안됨 } launch { throw RuntimeException("error") } } Exception in thread "DefaultDispatcher-worker-2" java.lang.RuntimeException at structuredconcurrency.CoroutineScopeErrorKt$doSomething$2$2$1.invokeSuspend(coroutineScopeError.kt:21) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664) Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@4bbea199, Dispatchers.Default]
Kotlin
복사
코루틴 빌더 내부에서 try-catch로 핸들링
private suspend fun doSomething() = coroutineScope { launch { delay(200) println("world!") } launch { try { throw RuntimeException() } catch (e: Exception) { println("hello") } } } hello world!
Kotlin
복사
supervisorScope 를 사용해서 예외를 부모 코루틴으로 전파하지않는 방법
private suspend fun doSomething() = coroutineScope { launch { delay(200) println("world!") } supervisorScope { launch { throw RuntimeException() } } } Exception in thread "DefaultDispatcher-worker-2" java.lang.RuntimeException at structuredconcurrency.CoroutineScopeErrorKt$doSomething$2$2$1.invokeSuspend(coroutineScopeError.kt:21) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677) at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664) Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@4bbea199, Dispatchers.Default] world!
Kotlin
복사
코루틴의 예외 상황을 체계적으로 관리하고 싶을때 CEH(CoroutineExceptionHandler 를 사용한다
supervisorScopeCoroutineExceptionHandler 함께 사용
private suspend fun doSomething() = coroutineScope { launch { delay(200) println("world!") } supervisorScope { launch(handler) { throw RuntimeException() } } } val handler = CoroutineExceptionHandler { _, throwable -> println("Caught $throwable") } Caught java.lang.RuntimeException world!
Kotlin
복사

코루틴 채널

Channel 은 코루틴간의 통신을 위한 파이프라인이다
기본 컨셉은 BlockingQueue 와 유사한 자료구조로 볼 수 있다.
데이터 발신자가 채널에 send 로 데이터를 전달하고 수신 측에서 receive 를 사용해 채널로 부터 데이터를 받아온다.

BlockingQueue 를 사용해 주식 시세를 처리하는 처리하는 예제

package channel import kotlinx.coroutines.* import java.util.concurrent.BlockingQueue import java.util.concurrent.LinkedBlockingQueue import kotlin.random.Random fun main() = runBlocking { val queue: BlockingQueue<String> = LinkedBlockingQueue() // 생산자와 소비자 코루틴을 실행 producer(queue, 20) consumer(queue) } suspend fun producer(queue: BlockingQueue<String>, count: Int) = coroutineScope { launch { repeat(count) { val stockPrice = "APPLE : ${Random.nextInt(100, 200)}" queue.put(stockPrice) delay(100L) // 0.1초 간격으로 데이터 생성 } } } fun consumer(queue: BlockingQueue<String>) { for (stockPrice in queue) { println("Processed $stockPrice") } }
Kotlin
복사
put, take 가 모두 블로킹으로 이뤄진다

Channel 를 사용해 주식 시세를 처리하는 처리하는 예제

package channel import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import java.util.concurrent.BlockingQueue import java.util.concurrent.LinkedBlockingQueue import kotlin.random.Random fun main() = runBlocking { val channel: Channel<String> = Channel() // 생산자와 소비자 코루틴을 실행 producer(channel, 20) consumer(channel) } suspend fun CoroutineScope.producer(channel: Channel<String>, count: Int) { launch { repeat(count) { val stockPrice = "APPLE : ${Random.nextInt(100, 200)}" channel.send(stockPrice) delay(100L) // 0.1초 간격으로 데이터 생성 } channel.close() // 모든 데이터를 보냈으면 채널을 닫습니다. } } suspend fun consumer(channel: Channel<String>) { for (stockPrice in channel) { println("Processed $stockPrice") } }
Kotlin
복사
데이터 송수신 연산(send, receive)이 모두 일시중단되므로 논-블로킹으로 이뤄진다.

플로우 : 비동기 데이터 스트림

Flow 는 코루틴에서 리액티브 프로그래밍 스타일로 작성할 수 있도록 만들어진 비동기 스트림 API이다
코루틴의 suspend 함수는 단일 값을 반환하지만 Flow를 사용하면 무한대 값을 반환할 수 있다

플로우 빌더

flow 빌더를 사용해 코드를 작성하고 emit 을 사용해 데이터를 전달한다
package flow import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.runBlocking fun main() = runBlocking { val flow = simple() flow.collect { value -> println(value) } } fun simple(): Flow<Int> = flow { println("Flow started") for (i in 1..3) { delay(100) emit(i) } }
Kotlin
복사
리액티브 스트림과 같이 Terminal Operator(최종 연산자)collect 를 호출하지 않으면 아무런 일도 일어나지 않는다
flowOf 를 사용하면 여러 값을 인자로 전달해서 플로우를 만들 수 있다.
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking { flowOf(1, 2, 3, 4, 5).collect { value -> println(value) } }
Kotlin
복사
asFlow 를 사용하면 존재하는 컬렉션이나 시퀀스를 플로우로 변환할 수 있다
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun main() = runBlocking<Unit> { listOf(1, 2, 3, 4, 5).asFlow().collect { value -> println(value) } (6..10).asFlow().collect { println(it) } }
Kotlin
복사

다양한 연산자 제공

리액티브 스트림 수준으로 다양한 연산자를 제공하진 못하고 있지만 일반적인 상황에선 충분하다.
package flow import kotlinx.coroutines.* import kotlinx.coroutines.flow.* suspend fun filterExample() { (1..10) .asFlow() .filter { (it % 2) == 0 } .collect { println(it) } } suspend fun takeExample() { (1..10) .asFlow() .take(2) .collect { println(it) } } suspend fun mapExample() { (1..10) .asFlow() .map { it * it } .collect { println(it) } } suspend fun zipExample() { val nums = (1..3).asFlow() val strs = listOf("one", "two", "three").asFlow() nums.zip(strs) { a, b -> "$a -> $b" } .collect { println(it) } } suspend fun flattenExample() { val left = flowOf(1, 2, 3) val right = flowOf(4, 5, 6) val flow = flowOf(left, right) flow.flattenConcat().collect { println(it) } }
Kotlin
복사

예제코드