๊ฐ์
@KafkaListener์ ์์ฑ ์ค์๋ concurrency ์ค์ ์ด ์กด์ฌํ๋ค. ์ด ์ค์ ์ Kafka ๋ฆฌ์ค๋์ ๋ณ๋ ฌ ์๋น(Parallel Consumption)๋ฅผ ์ค์ ํ๋ ์ต์ ์ผ๋ก, ํ๋์ Kafka ํ ํฝ์ ๋ํด ์ฌ๋ฌ ๊ฐ์ ์๋น์ ์ค๋ ๋๋ฅผ ํตํด ๋ณ๋ ฌ๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์๋๋ก ํ๋ ๊ธฐ๋ฅ์ด๋ค.
๊ทธ๋ฌ๋ ๋จ์ํ concurrency ๊ฐ์ ๋์ธ๋ค๊ณ ํด์ ๋ฐ๋์ ์ฑ๋ฅ์ด ํฅ์๋๋ ๊ฒ์ ์๋๋ค. ํน์ ์ํฉ์์๋ ๋ณ๋ ฌ ์ฒ๋ฆฌ๊ฐ ํจ๊ณผ์ ์ด์ง๋ง, ์ ์ ํ์ง ์์ ๊ฒฝ์ฐ ์คํ๋ ค ๋ถํ์ํ ๋ฆฌ์์ค ๋ญ๋น๋ก ์ด์ด์ง ์ ์๋ค. ๋ฐ๋ผ์ ์ด๋ฒ ๊ฒ์๊ธ์์๋ concurrency์ ๊ฐ๋ ๊ณผ ์์ ๋ฅผ ํตํด ํจ๊ณผ์ ์ผ๋ก ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ํ์ฉํ ์ ์๋ ๊ฒฝ์ฐ๋ฅผ ์ดํด๋ณด๊ณ ์ ํ๋ค.
Concurrency ๊ธฐ๋ณธ ๊ฐ๋
Kafka๋ ๊ธฐ๋ณธ์ ์ผ๋ก ์ปจ์๋จธ ๊ทธ๋ฃน(Consumer Group) ๋จ์๋ก ๋ฉ์์ง๋ฅผ ํ ๋นํ๋ค. ๊ทธ๋ฆฌ๊ณ ๋์ผํ ์ปจ์๋จธ ๊ทธ๋ฃน ๋ด์์๋ ๊ฐ ํํฐ์ ์ด ํ๋์ ์ปจ์๋จธ์๊ฒ๋ง ํ ๋น๋๋ฉฐ, ์ฌ๋ฌ ์ปจ์๋จธ๊ฐ ๋์ผํ ํํฐ์ ์ ์ค๋ณต์ผ๋ก ์๋นํ ์ ์๋ค.
์์ ๊ฐ์ด Consumer-1๊ณผ Consumer-2๊ฐ ์กด์ฌํ๊ณ , Topic-01์ Topic-02์ ์ฌ๋ฌ ํํฐ์ ์ด ๊ฐ๊ฐ ์ด ๋ ์ปจ์๋จธ์๊ฒ ํ ๋น๋๋ ๊ตฌ์กฐ๋ฅผ ์ดํด๋ณด์. ๊ธฐ๋ณธ ์ค์ ์์๋ ๊ฐ ์ปจ์๋จธ๊ฐ ํ ๋น๋ฐ์ ํํฐ์ ์์ ๋ฉ์์ง๋ฅผ ํ๋์ฉ ์์ฐจ์ ์ผ๋ก ์ฒ๋ฆฌํ๋๋ฐ, Consumer-1์ด Topic-01์ Partition-1์์ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ํ Topic-02์ Partition-1์ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๊ฑฐ๋ ๊ทธ ๋ฐ๋ ์์๋ก ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ด๋ค. ์ด๋ Kafka ์ปจ์๋จธ ์ค๋ ๋์ ๊ธฐ๋ณธ ์ค์ (concurrency)์ด 1๊ฐ์ด๊ธฐ ๋๋ฌธ์ด๋ค. (์ฐธ๊ณ ๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ๋ ์์๋ ์์ฐจ์ ์ด์ง๋ง, poll()์ ํธ์ถํ ๋๋ ์ฌ๋ฌ ํํฐ์ ์์ ํ๊บผ๋ฒ์ ๋ฉ์์ง๋ฅผ ๊ฐ์ ธ์ฌ ์ ์๋ค. ์ฆ ๊ฐ์ ธ์ค๋ ๊ฒ์ ๋์์ ๊ฐ๋ฅํ์ง๋ง ์ฒ๋ฆฌ ์์ฒด๋ ํ๋์ฉ ์ด๋ฃจ์ด์ง๋ค.)
์ด ๊ฒฝ์ฐ๋ ๋ง์ฐฌ๊ฐ์ง๋ค. Consumer-1์ด Topic-01์ ์ธ ๊ฐ์ ํํฐ์ ์ ํ ๋น๋ฐ์ผ๋ฉด, ํ๋์ ์ค๋ ๋๊ฐ ๊ฐ ํํฐ์ ๋ค์ ๋ฉ์์ง๋ฅผ ์์ฐจ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ค.
๊ทธ๋ฐ๋ฐ ์ ์๊ฐํด ๋ณด์. ๊ทธ๋ฆผ 1๋ฒ์์ Topic-01์ Topic-02๋ ์๋ก ๋ ๋ฆฝ์ ์ธ ๊ฐ๋ ์ด๋ฏ๋ก ๋ฐ๋์ ์์ฐจ์ ์ผ๋ก ์ฒ๋ฆฌํ ํ์๋ ์์ ๊ฒ์ด๋ค. ๋ํ ๊ทธ๋ฆผ 2์์๋ ๋ฉ์์ง ์ฒ๋ฆฌ ์์๊ฐ ์ค์ํ์ง ์๋ค๋ฉด ์ฌ๋ฌ ํํฐ์ ์ ๋์์ ์ฒ๋ฆฌํ๋ ๊ฒ์ด ๋ ํจ์จ์ ์ผ ์ ์์ ๊ฒ์ด๋ค.
์ด๋ฅผ ์ํด @KafkaListener์๋ concurrency ์์ฑ์ ์ ๊ณตํ๋๋ฐ, ์ด ๊ฐ์ ์ค์ ํ๋ฉด ์ปจ์๋จธ ์ค๋ ๋ ๊ฐ์๋ฅผ ์์ฝ๊ฒ ์ง์ ํ ์ ์๋ค. concurrency ๊ฐ์ ์ง์ ํ๋ฉด ๋ด๋ถ์ ์ผ๋ก ์ฌ๋ฌ ๊ฐ์ Kafka ์ปจ์๋จธ ์ค๋ ๋๊ฐ ์์ฑ๋๋ฉฐ, ํ๋์ ๋ฆฌ์ค๋๊ฐ ์ฌ๋ฌ ๊ฐ์ ํํฐ์ ์ ๋์์ ์๋นํ ์ ์์ด ๋ณ๋ ฌ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํด์ง๋ค.
@KafkaListener(topics = "my-topic", concurrency = "2")
public void listen(String message) {
System.out.println("Received: " + message);
}
์์ ๊ฐ์ด concurrency ๊ฐ์ 2๋ก ์ค์ ํ๋ฉด ํ๋์ ๋ฆฌ์ค๋์์ ๋ ๊ฐ์ ์ปจ์๋จธ ์ค๋ ๋๊ฐ ์คํ๋์ด ๋ณ๋ ฌ๋ก ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์๊ฒ ๋๋ค.
concurrency ๊ฐ์๊ฐ ๋์ผ๋ฉด ๋ฌด์กฐ๊ฑด ์ข์๊น?
concurrency ๊ฐ์ ๋์ด๋ฉด ๋ ๋ง์ Kafka ์ปจ์๋จธ ์ค๋ ๋๊ฐ ์์ฑ๋๋ฉฐ, ์ฌ๋ฌ ํํฐ์ ์ ๋ฉ์์ง๋ฅผ ๋์์ ์ฒ๋ฆฌํ ์ ์์ด ์ฒ๋ฆฌ ์๋๊ฐ ํฅ์๋๋ค. ํ์ง๋ง ๋ฌด์กฐ๊ฑด ๋์ ๊ฐ์ ์ค์ ํ๋ค๊ณ ํด์ ํญ์ ์ต์ ์ ์ฑ๋ฅ์ ๋ณด์ฅํ๋ ๊ฒ์ ์๋๋ค. ์์ ์ค๋ช ํ ๊ฐ๋ ์์ ๋์น์ฑ์ ์๋ ์๊ฒ ์ง๋ง, concurrency ๊ฐ์ ์ค์ ํ ๋๋ ๋ฐ๋์ ํ ๋น๋ฐ์ ํํฐ์ ์ ๊ฐ์๋ฅผ ๊ณ ๋ คํด์ผ ํ๋ค. Kafka์ ๋ฉ์์ง ์๋น๋ ํํฐ์ ๋จ์๋ก ์ด๋ฃจ์ด์ง๊ธฐ ๋๋ฌธ์ concurrency ๊ฐ์ด ํํฐ์ ๊ฐ์๋ฅผ ์ด๊ณผํ๋ฉด ์ฑ๋ฅ ํฅ์์๋ ๋์์ด ๋์ง ์๋๋ค.
์์๋ฅผ ํตํด ์ง์ ์ดํด๋ณด์.
1. ํ ํฝ์ด 1๊ฐ์ผ ๊ฒฝ์ฐ(๋ฉ์์ง 3๊ฐ)
- ํํฐ์ 1๊ฐ, concurrency 1
- ํํฐ์ 1๊ฐ, concurrency 2
- ํํฐ์ 3๊ฐ, concurrency 6
1. ํ ํฝ 1๊ฐ(ํํฐ์ 1๊ฐ, concurrency 1)
Received message in group 'test': Test Message 0 | Processed by: Thread-1
Received message in group 'test': Test Message 1 | Processed by: Thread-1
Received message in group 'test': Test Message 2 | Processed by: Thread-1
Kafka ํ ํฝ์ ํ๋์ ํํฐ์ ๋ง ์กด์ฌํ๋ ์ํ์์ concurrency ๊ฐ์ 1๋ก ์ค์ ํ ๊ฒฝ์ฐ, ๋จ์ผ ์ค๋ ๋๊ฐ ๋ชจ๋ ๋ฉ์์ง๋ฅผ ์์ฐจ์ ์ผ๋ก ์ฒ๋ฆฌํ๊ฒ ๋๋ค. ์ค์ ๋ก 3๊ฐ์ ๋ฉ์์ง๋ฅผ ๋ฐํํ ํ ๋ก๊ทธ๋ฅผ ํ์ธํด ๋ณด๋ฉด, ํ๋์ ์ค๋ ๋๊ฐ ๋ชจ๋ ๋ฉ์์ง๋ฅผ ๋ด๋นํ๊ณ ์๋ ๊ฑธ ํ์ธํ ์ ์๋ค.
2. ํ ํฝ 1๊ฐ(ํํฐ์ 1๊ฐ, concurrency 2)
Received message in group 'test': Test Message 0 | Processed by: Thread-1
Received message in group 'test': Test Message 1 | Processed by: Thread-1
Received message in group 'test': Test Message 2 | Processed by: Thread-1
์ด์ concurrency ๊ฐ์ 2๋ก ์ฆ๊ฐ์์ผ ๋ณด์๋ค. ์ง๊ด์ ์ผ๋ก ์๊ฐํ๋ฉด ๋ ๊ฐ์ ์ค๋ ๋๊ฐ ์์ฑ๋์ด, ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ํตํด ์์ ์๊ฐ์ด ๋จ์ถ๋ ๊ฒ์ฒ๋ผ ๋ณด์ธ๋ค. ํ์ง๋ง ์ค์ ๋ก๋ ์ฌ์ ํ ํ๋์ ์ค๋ ๋๋ง ๋ฉ์์ง๋ฅผ ์๋นํ๋ค. ์ด์ ๋ ๋์ผํ ํํฐ์ ์ด๊ธฐ ๋๋ฌธ์ ํ๋์ ์ค๋ ๋๋ง ํด๋น ํํฐ์ ์ ๋ ์ ์ ์ผ๋ก ๊ฐ์ ธ๊ฐ๊ณ , ๋๋จธ์ง ์ค๋ ๋๋ ๋๊ธฐ ์ํ๊ฐ ๋๊ธฐ ๋๋ฌธ์ด๋ค.(์ด๊ฑด Kafka ๋์์ ๊ทผ๋ณธ๊ณผ ๊ด๋ จ๋ ๊ฒ์ผ๋ก ๋์ผํ ํํฐ์ ๋ด์์๋ ๋ฉ์์ง์ ์์๋ฅผ ๋ณด์ฅํด์ผ ํ๊ธฐ ๋๋ฌธ์ด๋ค.)
๊ฒฐ๊ณผ์ ์ผ๋ก concurrency=2๋ผ๊ณ ํด๋ concurrency=1๊ณผ ๋์ผํ ๋ฐฉ์์ผ๋ก ๋ฉ์์ง๊ฐ ์ฒ๋ฆฌ๋์๋ค.
3. ํ ํฝ 1๊ฐ(ํํฐ์ 3๊ฐ, concurrency 6)
Received message in group 'test': Test Message 0 | Processed by: Thread-1
Received message in group 'test': Test Message 1 | Processed by: Thread-2
Received message in group 'test': Test Message 2 | Processed by: Thread-3
์ด๋ฒ์๋ ํํฐ์ ์ 3๊ฐ๋ก ๋๋ฆฌ๊ณ , concurrency ๊ฐ์ 6์ผ๋ก ์ค์ ํด ๋ณด์๋ค. Kafka๋ ํํฐ์ ๋จ์๋ก ์ปจ์๋จธ๋ฅผ ํ ๋นํ๊ธฐ ๋๋ฌธ์, ์ด ๊ฒฝ์ฐ ์๋ก ๋ค๋ฅธ ํํฐ์ ์ด๊ธฐ์ ์ต๋ 3๊ฐ์ ์ค๋ ๋๊ฐ ๋์์ ๋ฉ์์ง๋ฅผ ์๋นํ ์ ์๋ค. ์คํ ๊ฒฐ๊ณผ๋ฅผ ๋ณด๋ฉด 3๊ฐ์ ์ค๋ ๋๊ฐ ๊ฐ๊ฐ ๋ค๋ฅธ ํํฐ์ ์์ ๋ฉ์์ง๋ฅผ ๋ณ๋ ฌ๋ก ์ฒ๋ฆฌํ๊ณ ์์์ ํ์ธํ ์ ์๋ค. ํ์ง๋ง concurrency ๊ฐ์ 6์ผ๋ก ์ค์ ํ์์๋ 3๊ฐ์ ์ค๋ ๋๋ง ์ฌ์ฉ๋์๋๋ฐ, Kafka์์ ํ์ฑํ๋๋ ์ปจ์๋จธ ์ค๋ ๋ ๊ฐ์๋ ํ ๋น๋ ํํฐ์ ๊ฐ์๋ฅผ ์ด๊ณผํ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค.
ํน์ ํ ํฝ-ํํฐ์ ์ ๋ช ์์ ์ผ๋ก ์ง์ ํ์ ๋ vs ์ง์ ํ์ง ์์์ ๋
์ฌ๊ธฐ์ ์กฐ๊ธ ๋ ๋ฅ ๋ค์ด๋ธ๋ฅผ ํด๋ณด๋ฉด ์ฌ์ค ์ ํํ๋ ํ ํฝ-ํํฐ์ ์ ๋ช ์์ ์ผ๋ก ์ง์ ํ๋์ง ์ฌ๋ถ์ ๋ฐ๋ผ concurrency ์ค๋ ๋์ ์์ฑ ๋ฐฉ์์ ๋ฌ๋ผ์ง๋ค.
1. ํน์ ํ ํฝ-ํํฐ์ ์ ๋ช ์์ ์ผ๋ก ์ง์ ํ ๊ฒฝ์ฐ
์๋ฅผ ๋ณด๋ฉด @KafkaListener์์ ๋ช ์์ ์ผ๋ก ํ ํฝ-ํํฐ์ ๊ฐ์๋ฅผ ์ง์ ํด ์คฌ๋ค. ์ด๋ด ๊ฒฝ์ฐ concurrency๋ฅผ 6์ผ๋ก ์ค์ ํ๋๋ผ๋ Kafka๋ ์ ์ด์ ์ง์ ๋ ํํฐ์ ๊ฐ์๊น์ง๋ง ์์ฑํ๋ค. ๋ฐ๋ผ์ ์ค์ ๋ก ์คํ๋๋ ์ปจ์๋จธ ์ค๋ ๋๋ 3๊ฐ์ด๋ฉฐ ๋ถํ์ํ ์ถ๊ฐ ์ค๋ ๋๋ ์์ฑ๋์ง ์๋๋ค.
2. ํ ํฝ-ํํฐ์ ์ ๋ช ์์ ์ผ๋ก ์ง์ ํ์ง ์์์ ๊ฒฝ์ฐ
๋ฐ๋ฉด ํน์ ํํฐ์ ์ ๋ช ์์ ์ผ๋ก ์ง์ ํ์ง ์์ ๊ฒฝ์ฐ concurrency=6์ผ๋ก ์ค์ ํ๋ฉด Spring Kafka๋ 6๊ฐ์ ์ปจ์๋จธ ์ค๋ ๋๋ฅผ ์์ฑํ๋ค. ๊ทธ๋ฆฌ๊ณ Kafka๋ ํํฐ์ ๊ฐ์์ ๋ง์ถฐ ์ปจ์๋จธ๋ฅผ ํ ๋นํ๊ธฐ ๋๋ฌธ์ ์ค์ ๋ก ๋ฉ์์ง๋ฅผ ์๋นํ๋ ์ค๋ ๋๋ 3๊ฐ๋ฟ์ด๋ฉฐ, ๋๋จธ์ง 3๊ฐ๋ ์ ํ์ํ๋ก ๋จ๊ฒ ๋๋ค. ์ฆ 6๊ฐ์ ์ค๋ ๋๊ฐ ์์ฑ๋์ง๋ง ์ค์ ๋ก ๋ฉ์์ง๋ฅผ ์๋นํ๋ ์ค๋ ๋๋ 3๊ฐ๋ฟ์ด๋ผ๋ ๋ง์ด๋ค.
์ค์ ๋ด๋ถ ๊ตฌํ ์ฝ๋๋ ์๋์ ConcurrentMessageListenerContainer.java#L243๋ฅผ ๋ณด๋ฉด ๋๋ค.
๋ญ ์ผ๋ฐ์ ์ผ๋ก ์ค์ ํ์ ์์๋ ์ผ๋ฐ์ ์ผ๋ก ํํฐ์ ๊ฐ์๋ฅผ ๋ช ์์ ์ผ๋ก ์ง์ ํ๋ ๊ฒฝ์ฐ๋ ์๋ง ์ ์์ ๊ฒ์ด๋ค. ๋ณดํต ํ ํฝ๋ง ์ง์ ํ๊ณ ํน์ ํํฐ์ ๊น์ง๋ ์ง์ ํ์ง ์์ผ๋๊น.. ๊ทธ๋ฅ ์์๋๋ฉด ์ข์ ๋ด์ฉ์ด๋ค.
์ ๋ฆฌ
์ด๋ฒ ๊ธ์์๋ Kafka์ @KafkaListener์์ concurrency ๊ฐ์ ํ์ฉํ ๋ณ๋ ฌ ์ฒ๋ฆฌ ๋ฐฉ์์ ์ดํด๋ณด์๋ค. concurrency๋ Kafka์์ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ๊ฐ๋ฅํ๊ฒ ํ๋ ์ ์ฉํ ์ค์ ์ด์ง๋ง, ํํฐ์ ๊ฐ์๋ฅผ ์ด๊ณผํ๋ concurrency ๊ฐ์ ์๋ฏธ๊ฐ ์์ผ๋ฉฐ, ์ค์ ๋ฐฉ์์ ๋ฐ๋ผ ์์ฑ๋๋ ์ปจ์๋จธ ์ค๋ ๋ ๊ฐ์๊ฐ ๋ฌ๋ผ์ง ์ ์์์ ํ์ธํ๋ค.
ํนํ ํน์ ํํฐ์ ์ ๋ช ์์ ์ผ๋ก ์ง์ ํ ๊ฒฝ์ฐ์๋ Kafka๊ฐ ํด๋น ํํฐ์ ๊ฐ์๊น์ง๋ง ์ปจ์๋จธ๋ฅผ ์์ฑํ์ง๋ง, ๊ทธ๋ ์ง ์์ผ๋ฉด concurrency ๊ฐ๋งํผ ์ปจ์๋จธ ์ค๋ ๋๊ฐ ์์ฑ๋๋ ์ผ๋ถ๋ ์ ํด ์ํ๋ก ๋จ์๋ค.
๊ฒฐ๊ตญ ํจ์จ์ ์ธ ๋ณ๋ ฌ ์ฒ๋ฆฌ๋ฅผ ์ํด์๋ concurrency ๊ฐ์ ํํฐ์ ๊ฐ์์ ๋ง์ถ๋ ๊ฒ์ด ์ค์ํ๋ค. ๊ทธ๋ฆฌ๊ณ ์ค๋ฌด์ ์ ์ฉํ ๋๋ ๊ณ ๋ คํด์ผ ํ ์ ์ด ์๋ค. ๋ฐ๋ก ๋ฆฌ๋ฐธ๋ฐ์ฑ์ด ์ผ์ด๋ ๋์ธ๋ฐ ๋ฆฌ๋ฐธ๋ฐ์ฑ์ด ๋ฐ์ํ๋ฉด ํํฐ์ ์ฌํ ๋น์ด ์ผ์ด๋๊ณ ์ด๋ ๊ฐ ์ปจ์๋จธ๊ฐ ํญ์ ๋์ผํ ๊ฐ์์ ํํฐ์ ์ ํ ๋น๋ฐ๋๋ค๋ ๋ณด์ฅ์ด ์์ผ๋ฏ๋ก, ์ ํด ์ํ์ ์ค๋ ๋๊ฐ ๋ฐ์ํ ๊ฐ๋ฅ์ฑ์ด ์๋ค. ๊ทธ๋์ ์ค๋ฌด์ ์ ์ฉํ๋ค๋ฉด ์ด์ ํ๊ฒฝ๊ณผ ์์คํ ์ ํน์ฑ์ ๋ง์ถฐ ์ ์ฐํ๊ฒ ์กฐ์ ํด์ผ ํจ์ ์ธ์งํ์.
'BackEnd๐ฑ > Kafka' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Spring Boot์์ ์ฌ๋ฌ Kafka ํด๋ฌ์คํฐ ์ฌ์ฉํ๊ธฐ (0) | 2024.08.17 |
---|---|
[Kafka] ์ปจ์๋จธ ์คํ์ ์๋์ผ๋ก ์ปค๋ฐํ๊ธฐ (0) | 2024.03.06 |
[Kafka] ํ๋ก๋์ ๋ฉฑ๋ฑ์ฑ ๋ณด์ฅํ๊ธฐ (0) | 2024.02.01 |
[Kafka] ๋ฆฌ๋ฐธ๋ฐ์ฑ ์ข ๋ฅ์ ํํฐ์ ํ ๋น ์ ๋ต (0) | 2024.01.07 |
๋๊ธ