코루틴 공식 페이지의 Channel 부분을 나름 번역하고 정리한 글이다. 소수 예제와 Ticker 관련 내용은 포함하지 않았다. 이후 Flow 사용하면 Channel을 따로 사용할까 싶어서인데, 이후 필요하면 내용 추가하려고 한다.
채널?
코루틴에서 채널은 BlockingQueue와 유사하다. 차이점은 Queue에서의 put 동작은 코루틴에서는 send. Queue에서의 take 동작은 코루틴에서는 receive라는 것이다.
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
}
결과는 다음과 같다.
//1
//4
//9
//16
//25
//Done!
Int 타입을 저장할 수 있는 Channel을 생성 후, send를 통해 Channel에 추가하고, receive를 통해 Channel에 있는 내용을 가져온다. BlockingQueue로 보면 간단한 내용이며, 중요한 점은 서로 다른 코루틴에서의 데이터를 공유할 수 있다는 것이다. runBlocking 코루틴 빌더 부분(receive)과 launch 코루틴 빌더 부분(send)으로 서로 다른 코루틴이라는 점이다.
채널 닫기와 순환
채널에서는 데이터가 더 추가되지 않는다는 의미로 close를 호출한다. 그리고 채널 내부의 데이터에 순차적 접근을 하기 위해 이전 예제처럼 repeat으로 접근하는 것이 아니라 for-in 반복문을 이용해 채널의 데이터에 접근한다.
fun main() = runBlocking {
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
}
결과는 다음과 같으며, 채널이 close 이후에 채널에 접근(send, receive)을 하게 되면 Exception이 발생한다.
//1
//4
//9
//16
//25
//Done!
채널 프로듀서?!
위에서 Channel에 데이터를 추가하는 내용들은 간단하게 for 반복문을 이용했다. 코루틴에서는 더 쉽게 데이터 요소(Element)를 시퀀스로 생성하는 produce라는 코루틴 빌더가 있다. 또한 소비하는 측에서는 Channel의 확장함수인 consumeEach가 있다. 이번 예제는 앞선 예제들의 생산자-소비자 패턴을 이미 코루틴과 Channel에서 제공하는 내용으로 변경한 것 뿐이다.
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
생산자측은 produce를 이용해서 Channel에 데이터를 추가하고 있으며, 소비자측에서는 Channel 확장함수인 consumeEach를 이용해 화면에 출력하고 있다. 결과는 이전과 예제들과 동일하다.
파이프라인?
프로듀서(produce)가 나오더니 갑자기 파이프라인이라 의문을 갖을 수도 있다. 파이프라인이라는 것은 하나의 코루틴이 무한한 값의 스트림을 생성하는 패턴이다. 파이프라인이 있고 끝에 수도꼭지가 달려있어 수도꼭지를 틀면 물이 계속 나오는 것처럼, 파이프라인은 물 대신 데이터가 무한히 나오는 것. 파이프라인은 코루틴과 이어서 사용할 수 있다. 즉 A 코루틴이 데이터를 생성 스트림이고, 이걸 B 코루틴에서 데이터를 소비하도록 구현할 수 있다.
fun main() = runBlocking {
val numbers = produceNumbers() // produces integers from 1 and on
val squares = square(numbers) // squares integers
repeat(5) {
println(squares.receive()) // print first five
}
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
파이프라인은 패턴이다. 패턴! 위 예제에서는 produce를 이용해서 무한으로 1부터 정수를 생성하는 스트림(produceNumbers)과, Channel을 인자로 받고, 안의 데이터를 제곱해서 생성하는 스트림(square) 2개가 존재한다.
그리고 2개의 스트림을 연결(square(numbers))한 뒤에 처음 5개 숫자만 출력을 하고 있다. 결과는 앞선 예제들과 동일하다.
Fan-out
하나의 채널을 여러 코루틴이 접근해서 데이터를 처리할 수 있다는 내용이다.
fun main() = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
produceNumbers는 초당 10개의 정수를 생성하는 produce이다(produce의 반환값은 Channel 형태). 그리고 launchProcessor는 코루틴으로 repeat 반복문 안에서 실행되면서 서로 다른 코루틴 영역에서 생성되서 실행된다.
결과는 아래와 완전히 동일하진 않지만 비슷하게 출력될 것이다.
//Processor #0 received 1
//Processor #0 received 2
//Processor #1 received 3
//Processor #2 received 4
//Processor #3 received 5
//Processor #4 received 6
//Processor #0 received 7
//Processor #1 received 8
//Processor #2 received 9
//Processor #3 received 10
출력의 Processor가 서로 다르나, Channel의 데이터 소비는 차례대로 일어나고 있는 것을 볼 수 있다. 주의해서 볼 것은 Channel 데이터 소비를 consumeEach가 아니라 for 반복문을 통해 한다는 것이다. for 반복문의 경우 코루틴이 실패하더라도 다른 프로세서 코루틴이 동작하지만, consumeEach는 한 프로세서 코루틴이 실패하면 다른 프로세서 코루틴 또한 종료 시킬수 있다.
Fan-In
Fan-out이 하나의 Channel을 여러 코루틴에서 데이터 소비를 할 수 있는 것이라고 했다면, Fan-In은 반대로 여러 코루틴들이 하나의 Channel로 데이터를 전송할 수 있다는 내용이다.
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
공통으로 데이터를 이용할 channel을 생성한 후에, launch 코루틴 빌더를 이용해 서로 다른 문자열 값을 공통 channel로 전송하고 있다. 결과값을 보면 foo를 전송하는 코루틴과 Bar를 전송하는 코루틴 모두 하나의 Channel로 데이터를 전송하고 있다는 결과를 보여준다.
//foo
//foo
//BAR!
//foo
//foo
//BAR!
버퍼 채널
앞선 예제들의 Channel에서는 버퍼가 없다. 버퍼가 없다는 말은 데이터의 송신-수신이 꼭 쌍으로 이루어진다는 뜻이다.
예를 들어 send가 호출되면, receive가 호출될 때까지 일시 중지된다. 즉 send 후, receive 전까지 send 할 수 없다는 말이다. 반대로 receive가 먼저 호출되면 send 호출될 때까지 일시 중지된다.
버퍼 채널은 채널 생성 시, 버퍼값을 전달해 줌으로써 송신-수신이 항상 쌍으로 이루어지는 것에 여유를 줄 수 있다.
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
val sender = launch { // launch sender coroutine
repeat(10) {
print("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
print("\t: Done\n")
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}
channel을 생성할 때 크기가 4인 버퍼를 주어 선언했으며, 해당 channel로 10번 반복해서 정수를 보낸다. 위 예제에서는 receive 함수가 없고 send만 동작하는데 크기를 4를 주었기 때문에 0~3까지 4개의 정수만 Channel에 수신된다.
//Sending 0 : Done
//Sending 1 : Done
//Sending 2 : Done
//Sending 3 : Done
//Sending 4
결과값을 보면 Sending 3 : Done까지 출력되고, channel.send(4)에서 일시 중지된 상태에서 1초 후에 코루틴 자체를 취소(sender.cancel)한다.
채널은 공정하다
채널의 데이터 송수신의 경우 FIFO(First-In First-Out)로 동작한다는 것을 의미한다. 하나의 채널에 여러 코루틴들이 데이터를 송수신 한다면 호출 순서대로 동작한다.
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
table이란 Channel에 ping 코루틴이 먼저 시작되고, pong 코루틴이 다음에 시작되어 데이터를 send하고 있다.
//ping Ball(hits=1)
//pong Ball(hits=2)
//ping Ball(hits=3)
//pong Ball(hits=4)
이것만 알아두자
- 채널(Channel)은 BlockingQueue와 유사함. put 대신 send, take 대신 receive 사용.
- 채널은 데이터 송수신이 항상 쌍으로 동작해야함. send하면 receive 할 때까지 일시 중지. receive하면 send할 때까지 일시 중지.
- 채널에 버퍼를 설정하면, 송수신을 쌍으로 동작하지 않아도 됨.
- 채널은 여러 코루틴과 데이터 공유 가능함.
'Android' 카테고리의 다른 글
Android Studio .gitignore 적용 안될 때 (0) | 2023.03.17 |
---|---|
Android Studio 빌드 후 자동 실행 안될 때 (0) | 2023.03.13 |
[AOS] 코틀린(Kotlin) 코루틴 취소(Coroutines cancel) (0) | 2023.02.06 |
[AOS] 코틀린(Kotlin) 코루틴 기초(Coroutines basic) (0) | 2023.02.04 |
[AOS] ViewModel (0) | 2023.01.19 |