๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
BackEnd๐ŸŒฑ/Etc

[Kafka] ์ปจ์Šˆ๋จธ ์˜คํ”„์…‹ ์ˆ˜๋™์œผ๋กœ ์ปค๋ฐ‹ํ•˜๊ธฐ

by ์•ˆ์ฃผํ˜• 2024. 3. 6.

๊ฐœ์š”

๋ฉ”์‹œ์ง€ ์†์‹ค์„ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ๊ฐ€ ๋ฌธ์ œ์—†์ด ์™„๋ฃŒ๋˜์—ˆ์„ ๊ฒฝ์šฐ์—๋งŒ commit์„ ์ˆ˜ํ–‰ํ•˜๋„๋ก, ์ปจ์Šˆ๋จธ์˜ offset commit์„ ์ˆ˜๋™์œผ๋กœ ์„ค์ •ํ•˜๋Š” ๊ฒฝ์šฐ๊ฐ€ ์—ฌ๋Ÿฌ ์กด์žฌํ•ฉ๋‹ˆ๋‹ค. ํ•˜์ง€๋งŒ ๋‹จ์ˆœํžˆ auto.offset.commit๋งŒ false๋กœ ์ง€์ •ํ•˜๋Š” ๊ฒฝ์šฐ์—๋Š” ์›ํ•˜๋Š” ๋ฐฉํ–ฅ์œผ๋กœ ๋™์ž‘ํ•˜์ง€ ์•Š์„ ์ˆ˜๋„ ์žˆ์Šต๋‹ˆ๋‹ค.

์ด๋ฒˆ ๊ฒŒ์‹œ๊ธ€์—์„œ๋Š” ์ž๋™ ์ปค๋ฐ‹์˜ ๋™์ž‘ ๊ณผ์ •๊ณผ ์ฃผ์˜์ , ๊ทธ๋ฆฌ๊ณ  ์ปค๋ฐ‹์„ ์ˆ˜๋™์œผ๋กœ ์ œ์–ดํ•˜๊ธฐ ์œ„ํ•œ auto.offset.commit ์„ค์ •๊ณผ ack-mode ์„ค์ •์— ๋Œ€ํ•ด ์•Œ์•„๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค.
 
 

auto.offset.commit

Kafka์˜ ์ปจ์Šˆ๋จธ๋Š” ์ฝ์€ ๋ฉ”์‹œ์ง€์˜ ์œ„์น˜๋ฅผ ์ถ”์ ํ•˜๊ธฐ ์œ„ํ•ด offset์„ commit ํ•˜๋Š” ์—ญํ• ์„ ๋‹ด๋‹นํ•ฉ๋‹ˆ๋‹ค. ์ด offset์€ Kafka์˜ ๋‚ด๋ถ€ ํ† ํ”ฝ์ธ __consumer_offsets์— ์ €์žฅ๋˜๋ฉฐ, ์ปจ์Šˆ๋จธ๋Š” commit์ด ๋ฐœ์ƒํ•  ๋•Œ๋งˆ๋‹ค ์ด offset ๊ฐ’์„ ๊ฐฑ์‹ ํ•ฉ๋‹ˆ๋‹ค. ์ดํ›„ ์ปจ์Šˆ๋จธ๋Š” ์ด offset ๊ฐ’์„ ์ฐธ์กฐํ•˜์—ฌ ๋‹ค์Œ์— ์ฒ˜๋ฆฌํ•  ๋ ˆ์ฝ”๋“œ๋ฅผ ์ฝ์–ด์˜ต๋‹ˆ๋‹ค.

์ปจ์Šˆ๋จธ๊ฐ€ offset์„ commit ํ•˜๋Š” ๋ฐฉ์‹์—๋Š” ์ž๋™ ์ปค๋ฐ‹๊ณผ ์ˆ˜๋™ ์ปค๋ฐ‹์ด ์žˆ์Šต๋‹ˆ๋‹ค. auto commit์€ auto.offset.commit ์„ค์ •์ด true์ผ ๋•Œ ํ™œ์„ฑํ™”๋˜๋ฉฐ, ๊ธฐ๋ณธ์ ์œผ๋กœ ์ด ๊ฐ’์€ true๋กœ ์„ค์ •๋˜์–ด ์žˆ์Šต๋‹ˆ๋‹ค.

๋งŒ์•ฝ ์ž๋™ ์ปค๋ฐ‹์ด ํ™œ์„ฑํ™”๋˜์–ด ์žˆ์„ ๊ฒฝ์šฐ, poll() ๋ฉ”์„œ๋“œ๊ฐ€ ์‹คํ–‰๋˜์—ˆ์„ ๋•Œ ์ˆ˜ํ–‰ ์‹œ๊ฐ„์ด auto.commit.interval.ms ์„ค์ •๊ฐ’์„ ์ดˆ๊ณผํ•˜๋Š” ๊ฒฝ์šฐ ์ž๋™์œผ๋กœ offset์ด commit ๋ฉ๋‹ˆ๋‹ค.

https://www.conduktor.io/kafka/delivery-semantics-for-kafka-consumers/

๋”ฐ๋ผ์„œ ์ปจ์Šˆ๋จธ๋Š” poll์„ ์š”์ฒญํ•  ๋•Œ๋งˆ๋‹ค auto.commit.interval.ms์— ์ •์˜ํ•œ ์ปค๋ฐ‹ ํ•  ์‹œ๊ฐ„์ธ์ง€ ์•„๋‹Œ์ง€ ์ฒดํฌํ•˜๊ฒŒ ๋˜๊ณ , poll ์š”์ฒญ์œผ๋กœ ๊ฐ€์ ธ์˜จ ๋งˆ์ง€๋ง‰ ์˜คํ”„์…‹์„ ์ปค๋ฐ‹ํ•ฉ๋‹ˆ๋‹ค. 

์‹ค์ œ๋กœ Kafka์˜ ์†Œ์Šค ์ฝ”๋“œ๋ฅผ ์‚ดํŽด๋ณด๋ฉด, poll() ๋ฉ”์„œ๋“œ ๋‚ด๋ถ€์—์„œ maybeAutoCommitOffsetsAsync() ๋ฉ”์„œ๋“œ๊ฐ€ ํ˜ธ์ถœ๋˜์–ด ๋น„๋™๊ธฐ์ ์œผ๋กœ auto commit์ด ์ผ์–ด๋‚˜๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. - ConsumerCoordinator.java #L1214

public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        nextAutoCommitTimer.update(now);
        if (nextAutoCommitTimer.isExpired()) {
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            doAutoCommitOffsetsAsync();
        }
    }
}

์œ„ ๋ฉ”์„œ๋“œ์˜ ๋™์ž‘ ๊ณผ์ •์€ ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

  1. ์ž๋™ ์ปค๋ฐ‹ ํ™œ์„ฑํ™” ์—ฌ๋ถ€ ํ™•์ธ: autoCommitEnabled ๊ฐ’์„ ํ†ตํ•ด ์ž๋™ ์ปค๋ฐ‹ ๊ธฐ๋Šฅ์ด ํ™œ์„ฑํ™”๋˜์–ด ์žˆ๋Š”์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค.
  2. ํƒ€์ด๋จธ ์—…๋ฐ์ดํŠธ (nextAutoCommitTimer.update(now)): ํ˜„์žฌ ์‹œ๊ฐ(now)์„ ๊ธฐ์ค€์œผ๋กœ ๋‹ค์Œ ์ž๋™ ์ปค๋ฐ‹์ด ์˜ˆ์ •๋œ ํƒ€์ด๋จธ๋ฅผ ์—…๋ฐ์ดํŠธํ•ฉ๋‹ˆ๋‹ค. update ๋ฉ”์„œ๋“œ๋Š” ํ˜„์žฌ ์‹œ๊ฐ„๊ณผ ๋งˆ์ง€๋ง‰ ์˜คํ”„์…‹ ์ปค๋ฐ‹ ์‹œ๊ฐ„ ์‚ฌ์ด์˜ ๊ฒฝ๊ณผ ์‹œ๊ฐ„์„ ๊ณ„์‚ฐํ•˜์—ฌ, ๋‹ค์Œ ์ž๋™ ์ปค๋ฐ‹ ์‹คํ–‰๊นŒ์ง€ ๋‚จ์€ ์‹œ๊ฐ„์„ ์กฐ์ •ํ•ฉ๋‹ˆ๋‹ค. ์ด ๊ณผ์ •์€ ์ž๋™ ์ปค๋ฐ‹ ์ฃผ๊ธฐ๋ฅผ ๊ด€๋ฆฌํ•˜๊ธฐ ์œ„ํ•œ ๊ฒƒ์œผ๋กœ, ์‹ค์ œ ์ปค๋ฐ‹ ์ž‘์—…์ด ์‹œ์ž‘๋˜๊ธฐ ์ „์— ํ˜„์žฌ ์‹œ๊ฐ„๊ณผ ์„ค์ •๋œ ์ปค๋ฐ‹ ๊ฐ„๊ฒฉ์„ ๊ธฐ๋ฐ˜์œผ๋กœ ํƒ€์ด๋จธ ์ƒํƒœ๋ฅผ ์ตœ์‹ ํ™”ํ•ฉ๋‹ˆ๋‹ค.
  3. ํƒ€์ด๋จธ ๋งŒ๋ฃŒ ์—ฌ๋ถ€ ํ™•์ธ (nextAutoCommitTimer.isExpired()): ์—…๋ฐ์ดํŠธ๋œ ํƒ€์ด๋จธ๋ฅผ ๊ฒ€์‚ฌํ•˜์—ฌ ์„ค์ •๋œ ์ž๋™ ์ปค๋ฐ‹ ๊ฐ„๊ฒฉ์ด ๋งŒ๋ฃŒ๋˜์—ˆ๋Š”์ง€ ํ™•์ธํ•ฉ๋‹ˆ๋‹ค. ๋งŒ์•ฝ ํƒ€์ด๋จธ๊ฐ€ ๋งŒ๋ฃŒ๋˜์—ˆ๋‹ค๋ฉด (์ฆ‰, auto.commit.interval.ms์— ์„ค์ •๋œ ์‹œ๊ฐ„์ด ๊ฒฝ๊ณผํ–ˆ๋‹ค๋ฉด), ๋‹ค์Œ ๋‹จ๊ณ„๋กœ ์ง„ํ–‰ํ•ฉ๋‹ˆ๋‹ค.
  4. ํƒ€์ด๋จธ ๋ฆฌ์…‹ (nextAutoCommitTimer.reset(autoCommitIntervalMs)): ํƒ€์ด๋จธ๋ฅผ ๋ฆฌ์…‹ํ•˜์—ฌ ๋‹ค์Œ ์ž๋™ ์ปค๋ฐ‹ ์‹œ์ ์„ ํ˜„์žฌ ์‹œ๊ฐ์œผ๋กœ๋ถ€ํ„ฐ ์„ค์ •๋œ ๊ฐ„๊ฒฉ(autoCommitIntervalMs) ์ดํ›„๋กœ ์กฐ์ •ํ•ฉ๋‹ˆ๋‹ค. ์ด๋Š” ์ž๋™ ์ปค๋ฐ‹์ด ์‹คํ–‰๋œ ํ›„ ๋‹ค์Œ ์ž๋™ ์ปค๋ฐ‹ ์ฃผ๊ธฐ๋ฅผ ์‹œ์ž‘ํ•˜๊ธฐ ์œ„ํ•œ ์ค€๋น„ ๋‹จ๊ณ„์ž…๋‹ˆ๋‹ค.
  5. ๋น„๋™๊ธฐ ์˜คํ”„์…‹ ์ปค๋ฐ‹ ์‹คํ–‰ (doAutoCommitOffsetsAsync()): ์‹ค์ œ๋กœ ์˜คํ”„์…‹ ์ปค๋ฐ‹์„ ๋น„๋™๊ธฐ์ ์œผ๋กœ ์ˆ˜ํ–‰ํ•˜๋Š” ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค. ์ด ๋‹จ๊ณ„์—์„œ ์ปจ์Šˆ๋จธ๊ฐ€ ์ตœ๊ทผ์— ์ฒ˜๋ฆฌํ•œ ๋ฉ”์‹œ์ง€์˜ ์˜คํ”„์…‹์ด Kafka์— ์ปค๋ฐ‹๋ฉ๋‹ˆ๋‹ค.

 
 

์ž๋™ ์ปค๋ฐ‹ ์„ค์ • ์‹œ ์ฃผ์˜์ 

์ž๋™ ์ปค๋ฐ‹ ์„ค์ •์€ ํŽธ๋ฆฌํ•˜์ง€๋งŒ ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์˜ ๋ฆฌ๋ฐธ๋Ÿฐ์‹ฑ์ด ๋ฐœ์ƒํ•  ๋•Œ ๋ฉ”์‹œ์ง€์˜ ์ค‘๋ณต ์ฒ˜๋ฆฌ๋‚˜ ์œ ์‹ค ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

1. ๋ฉ”์‹œ์ง€ ์œ ์‹ค

์ž๋™ ์ปค๋ฐ‹ ์„ค์ •์ด 1์ดˆ๋กœ ์ง€์ •๋œ ๊ฒฝ์šฐ๋ฅผ ์ƒ์ƒํ•ด ๋ณด๊ฒ ์Šต๋‹ˆ๋‹ค. ์ปจ์Šˆ๋จธ๋Š” ์ •์ƒ์ ์œผ๋กœ poll() ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ๊ฐ€์ ธ์˜ค๊ณ  ํ•ด๋‹น ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•ฉ๋‹ˆ๋‹ค.

๊ทธ๋Ÿฌ๋‚˜ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ๊ณผ์ •์ด 1์ดˆ๋ฅผ ์ดˆ๊ณผํ•˜๋Š” ์‹œ๊ฐ„์ด ๊ฑธ๋ฆด ๊ฒฝ์šฐ ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด ๋ฉ”์‹œ์ง€๋ฅผ DB์— ์ €์žฅํ•˜๊ฑฐ๋‚˜ ๋‹ค๋ฅธ ์„œ๋น„์Šค์— ์ „๋‹ฌํ•˜๋Š” ๊ณผ์ •์—์„œ ๋„คํŠธ์›Œํฌ ์ง€์—ฐ์ด๋‚˜ ์žฅ์•  ๋“ฑ ์–ด๋–ค ์ด์œ ๋กœ๋“  ์ฒ˜๋ฆฌ ์‹œ๊ฐ„์ด 1์ดˆ๋ฅผ ์ดˆ๊ณผํ•˜์˜€๋‹ค๊ณ  ๊ฐ€์ •ํ•ด ๋ด…์‹œ๋‹ค. ์ด๋Ÿฌํ•œ ์ƒํ™ฉ์—์„œ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋˜๊ธฐ ์ „์— ์˜คํ”„์…‹ ์ปค๋ฐ‹์ด ์ผ์–ด๋‚˜๊ณ , ๊ทธ ํ›„ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ์—์„œ ์—๋Ÿฌ๊ฐ€ ๋ฐœ์ƒํ•˜๋Š” ๊ฒฝ์šฐ ํ•ด๋‹น ๋ฉ”์‹œ์ง€๋Š” ๋‹ค์‹œ ์ฒ˜๋ฆฌ๋  ์ˆ˜ ์—†๊ฒŒ ๋ฉ๋‹ˆ๋‹ค. ์ฆ‰ ๋ฉ”์‹œ์ง€ ์œ ์‹ค์ด ์ผ์–ด๋‚˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

2. ๋ฉ”์‹œ์ง€ ์ค‘๋ณต

๋‹ค์Œ๊ณผ ๊ฐ™์€ ์„ค์ •์˜ ๊ฒฝ์šฐ, ๋ฉ”์‹œ์ง€์˜ ์ค‘๋ณต ์ฒ˜๋ฆฌ๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

  • enable.auto.commit = true
  • auto.commit.interval.ms = 5000ms
  • max.poll.records = 100

์ปจ์Šˆ๋จธ๋Š” poll() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ตœ๋Œ€ 100๊ฐœ์˜ ๋ ˆ์ฝ”๋“œ๋ฅผ ๊ฐ€์ ธ์˜ต๋‹ˆ๋‹ค. ๊ทธ ํ›„ ์ด ์ค‘ 30๊ฐœ์˜ ๋ ˆ์ฝ”๋“œ๊นŒ์ง€ ์ฒ˜๋ฆฌํ•œ ์ƒํƒœ์—์„œ ์ปค๋ฐ‹์ด ์ด๋ฃจ์–ด์ง€๊ธฐ ์ „์— ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ์ข…๋ฃŒ๋˜๊ฑฐ๋‚˜ ๋ฆฌ๋ฐธ๋Ÿฐ์‹ฑ์ด ๋ฐœ์ƒํ•˜๋ฉด ์ปจ์Šˆ๋จธ๋“ค์ด ์žฌํ• ๋‹น๋ฉ๋‹ˆ๋‹ค. ์ดํ›„ ์ปจ์Šˆ๋จธ๋Š” ๋งˆ์ง€๋ง‰์œผ๋กœ ์ปค๋ฐ‹๋œ offset๋ถ€ํ„ฐ ๋‹ค์‹œ ๋ฐ์ดํ„ฐ๋ฅผ polling ํ•˜๊ฒŒ ๋˜๋Š”๋ฐ, ์ด๋ฏธ ์„ฑ๊ณต์ ์œผ๋กœ ์ฒ˜๋ฆฌํ–ˆ๋˜ 30๊ฐœ์˜ ๋ ˆ์ฝ”๋“œ๋ฅผ ๋‹ค์‹œ ์ค‘๋ณต ์ฒ˜๋ฆฌํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

๋ฉ”์‹œ์ง€ ์œ ์‹ค์˜ ๊ฒฝ์šฐ Dead Letter Queue(DLT)๋ฅผ ๊ตฌ์„ฑํ•˜์—ฌ ์ฒ˜๋ฆฌํ•˜์ง€ ๋ชปํ•œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณ„๋„๋กœ ๊ด€๋ฆฌํ•˜๊ณ  ์žฌ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ๊ณ , ๋ฉ”์‹œ์ง€ ์ค‘๋ณต์˜ ๊ฒฝ์šฐ๋Š” ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง ๋‚ด๋ถ€์—์„œ ๋ฉฑ๋“ฑ์„ฑ์„ ๋ณด์žฅํ•˜๋Š” ๋กœ์ง์„ ํ†ตํ•ด ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

ํ•˜์ง€๋งŒ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ๋ฅผ ํ™•์ธํ•œ ํ›„์— ์ˆ˜๋™์œผ๋กœ ์ปค๋ฐ‹ ์‹œ์ ์„ ์ ์ ˆํ•˜๊ฒŒ ์กฐ์ ˆํ•œ๋‹ค๋ฉด ์‚ฌ์ „์— ์ด๋Ÿฌํ•œ ๋ฌธ์ œ๋ฅผ ์กฐ๊ธˆ์ด๋‚˜๋งˆ ๋ฐฉ์ง€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
 
 

์ˆ˜๋™ ์ปค๋ฐ‹ ์„ค์ •

Kafka ์ปจ์Šˆ๋จธ์˜ ์ปค๋ฐ‹์„ ์ˆ˜๋™์œผ๋กœ ์„ค์ •ํ•˜๊ธฐ ์œ„ํ•ด์„œ๋Š” auto.offset.commit ์„ค์ •๋ฟ๋งŒ ์•„๋‹ˆ๋ผ ack-mode ์„ค์ •๊นŒ์ง€ ์ด๋ฃจ์–ด์ ธ์•ผ ํ•ฉ๋‹ˆ๋‹ค.

๋จผ์ € Kafka ์ปจ์Šˆ๋จธ ์„ค์ •์—์„œ ENABLE_AUTO_COMMIT_CONFIG๋ฅผ false๋กœ ์ง€์ •ํ•˜์—ฌ ์ž๋™ ์ปค๋ฐ‹ ๊ธฐ๋Šฅ์„ ๋น„ํ™œ์„ฑํ™”ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. enable.auto.commit ๊ธฐ๋ณธ๊ฐ’์€ true์ด๊ธฐ ๋•Œ๋ฌธ์—, false๋กœ ๋ช…์‹œ์ ์œผ๋กœ ์„ค์ •ํ•ด์•ผ ์ž๋™ ์ปค๋ฐ‹ ๊ธฐ๋Šฅ์ด ๋น„ํ™œ์„ฑํ™”๋ฉ๋‹ˆ๋‹ค.

private Map<String, Object> consumerConfigs() {
    Map<String, Object> configs = new HashMap<>();
    // ...
    configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    return configs;
}

๋‹ค์Œ์œผ๋กœ @KafkaListener์™€ ๊ฐ™์€ ๋ฆฌ์Šค๋„ˆ ์ปจํ…Œ์ด๋„ˆ๋ฅผ ์ด์šฉํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ์†Œ๋น„ํ•œ๋‹ค๊ณ  ๊ฐ€์ •ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.

@KafkaListener(topics = "test", groupId = "test")
public void listen(String message) {
    System.out.println("Received Message in group 'test': " + message);
}

์œ„์™€ ๊ฐ™์ด ์„ค์ •ํ•˜๋ฉด ๋ณ„๋„์˜ ์ปค๋ฐ‹์„ ์ˆ˜ํ–‰ํ•˜๋Š” ์ฝ”๋“œ๋ฅผ ์ž‘์„ฑํ•˜์ง€ ์•Š์•˜์œผ๋ฏ€๋กœ, ๋ฉ”์‹œ์ง€๋Š” ์†Œ๋น„๋˜์ง€๋งŒ offset commit์€ ๋ฐœ์ƒํ•˜์ง€ ์•Š์„ ๊ฒƒ์ด๋ผ๊ณ  ๊ธฐ๋Œ€ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์‹ค์ œ๋กœ๋Š” enable.auto.commit์„ false๋กœ ์ง€์ •ํ–ˆ์Œ์—๋„ ๋ถˆ๊ตฌํ•˜๊ณ  ์ปค๋ฐ‹์ด ์ •์ƒ์ ์œผ๋กœ ์ด๋ฃจ์–ด์ง€๋ฉฐ Lag๊ฐ€ ์Œ“์ด์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

๊ทธ ์ด์œ ๋Š” enable.auto.commit=false ์„ค์ •์— ์˜ํ•ด Kafka ํด๋ผ์ด์–ธํŠธ์˜ ์ž๋™ ์ปค๋ฐ‹์€ ๋น„ํ™œ์„ฑ ๋˜์ง€๋งŒ, Spring์€ ๊ธฐ๋ณธ์ ์œผ๋กœ ์˜คํ”„์…‹ ์ปค๋ฐ‹์„ ํ™œ์„ฑํ™” ์ƒํƒœ๋กœ ๋‘๊ณ  ์žˆ๊ธฐ ๋•Œ๋ฌธ์ž…๋‹ˆ๋‹ค. ๋•Œ๋ฌธ์— ์œ„ ์„ค์ •์—์„œ๋Š” ์ž๋™ ์ปค๋ฐ‹ ๋•Œ์ฒ˜๋Ÿผ auto.commit.interval.ms์˜ ์˜ํ–ฅ์€ ๋ฐ›์ง€๋Š” ์•Š์ง€๋งŒ ๋ฆฌ์Šค๋„ˆ ๋ฉ”์„œ๋“œ๊ฐ€ ์™„๋ฃŒ๋˜๋ฉด ์ž๋™์œผ๋กœ ์ปค๋ฐ‹์ด ์ผ์–ด๋‚˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

๋”ฐ๋ผ์„œ ๋ฆฌ์Šค๋„ˆ ์ปจํ…Œ์ด๋„ˆ์˜ Ackmode๊นŒ์ง€ ๋ณ„๋„๋กœ ์ˆ˜์ •ํ•ด์•ผ๋งŒ ์ง„์ •ํ•œ ์˜๋ฏธ์˜ ์ˆ˜๋™ ์ปค๋ฐ‹์ด ์ด๋ฃจ์–ด์งˆ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. AckMode๋Š” ์„ค์ •์„ ์œ„ํ•œ ์—ฌ๋Ÿฌ ๋ฐฉ๋ฒ•์ด ์žˆ์ง€๋งŒ ์•„๋ž˜์ฒ˜๋Ÿผ ContainerFactory๋ฅผ ํ†ตํ•ด ๊ฐ„๋‹จํ•˜๊ฒŒ ์„ค์ •ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> testKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> testFactory = new ConcurrentKafkaListenerContainerFactory<>();
    testFactory.getContainerProperties().setAckMode(AckMode.MANUAL);
    testFactory.setConsumerFactory(testConsumerFactory());
    return testFactory;
}
AcksMode ์„ค๋ช…
RECORD ๋ ˆ์ฝ”๋“œ ๋‹จ์œ„๋กœ ํ”„๋กœ์„ธ์‹ฑ ์ดํ›„ ์ปค๋ฐ‹ํ•œ๋‹ค.
BATCH poll() ๋ฉ”์„œ๋“œ๋กœ ํ˜ธ์ถœ๋œ ๋ ˆ์ฝ”๋“œ๊ฐ€ ๋ชจ๋‘ ์ฒ˜๋ฆฌ๋œ ์ดํ›„ ์ปค๋ฐ‹ํ•œ๋‹ค.
TIME ํŠน์ • ์‹œ๊ฐ„ ์ดํ›„์— ์ปค๋ฐ‹ํ•œ๋‹ค.
์ด ์˜ต์…˜์„ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ์—๋Š” ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์„ ์„ ์–ธํ•˜๋Š” `AckTime`์˜ต์…˜์„ ์„ค์ •ํ•ด์•ผ ํ•œ๋‹ค.
COUNT ํŠน์ • ๊ฐœ์ˆ˜๋งŒํผ ๋ ˆ์ฝ”๋“œ๊ฐ€ ์ฒ˜๋ฆฌ๋œ ์ดํ›„์— ์ปค๋ฐ‹ํ•œ๋‹ค.
์ด ์˜ต์…˜์„ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ์—๋Š” ๋ ˆ์ฝ”๋“œ ๊ฐœ์ˆ˜๋ฅผ ์„ ์–ธํ•˜๋Š” `AckCount`์˜ต์…˜์„ ์„ค์ •ํ•ด์•ผ ํ•œ๋‹ค.
COUNT_TIME TIME, COUNT ์˜ต์…˜ ์ค‘ ๋งž๋Š” ์กฐ๊ฑด์ด ํ•˜๋‚˜๋ผ๋„ ๋‚˜์˜ฌ ๊ฒฝ์šฐ ์ปค๋ฐ‹ํ•œ๋‹ค.
MANUAL Acknowledgement.acknowledge() ๋ฉ”์„œ๋“œ๊ฐ€ ํ˜ธ์ถœ๋˜๋ฉด ๋‹ค์Œ๋ฒˆ poll()๋•Œ ์ปค๋ฐ‹ํ•œ๋‹ค.
๋งค๋ฒˆ acknowledge() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด BATCH ์˜ต์…˜๊ณผ ๋™์ผํ•˜๊ฒŒ ๋™์ž‘ํ•œ๋‹ค.
์ด ์˜ต์…˜์„ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ AcknowledgingMessageListener ๋˜๋Š” BatchAcknowledgingMessageListener๋ฅผ ๋ฆฌ์Šค๋„ˆ๋กœ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.
MANUAL_IMMEDIATE Acknowledgement.acknowledge() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•œ ์ฆ‰์‹œ ์ปค๋ฐ‹ํ•œ๋‹ค.
์ด ์˜ต์…˜์„ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ AcknowledgingMessageListener ๋˜๋Š” BatchAcknowledgingMessageListener๋ฅผ ๋ฆฌ์Šค๋„ˆ๋กœ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.

Ackmode์˜ default๋Š” BATCH์ด๊ธฐ ๋•Œ๋ฌธ์—, enable.auto.commit=false๋กœ ์ง€์ •ํ•˜๋”๋ผ๋„ Listener ๋ฉ”์„œ๋“œ๊ฐ€ ์ข…๋ฃŒ๋  ๋•Œ ์Šคํ”„๋ง์—์„œ ์ž๋™์œผ๋กœ ์ปค๋ฐ‹์ด ์ˆ˜ํ–‰ํ•˜์—ฌ ์ปค๋ฐ‹์ด ์ผ์–ด๋‚˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.

๋”ฐ๋ผ์„œ MANUAL ํ˜น์€ MANUAL_IMMEDIATE๋กœ ์ง€์ •ํ•ด์•ผ์ง€๋งŒ ์›๋ž˜ ๊ฐ€์ •ํ–ˆ๋˜ ๊ฒƒ์ฒ˜๋Ÿผ ๋ฆฌ์Šค๋„ˆ์—์„œ ํ† ํ”ฝ์„ ์†Œ๋น„ํ•˜์ง€๋งŒ offset commit์€ ์ผ์–ด๋‚˜์ง€ ์•Š๊ณ , Lag๊ฐ€ ์Œ“์ด๊ฒŒ ๋ฉ๋‹ˆ๋‹ค. ๊ทธ๋ฆฌ๊ณ  ๋‹ค์Œ๊ณผ ๊ฐ™์ด Acknowledgement.acknowledge()๋ฅผ ํ˜ธ์ถœํ•˜์—ฌ ์ปค๋ฐ‹ ์‹œ์ ์„ ์ง์ ‘์ ์œผ๋กœ ์กฐ์ ˆํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

@KafkaListener(topics = "test", groupId = "test")
public void listen(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {
    System.out.println("Received Message in group 'test': " + record.value());
    // ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ๋กœ์ง
    // ...
    // ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋œ ํ›„์— Acknowledgment๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ์ปค๋ฐ‹
    acknowledgment.acknowledge();
}

 
 

์ฐธ๊ณ 

๋Œ“๊ธ€