[Kafka] νλ‘λμ λ©±λ±μ± 보μ₯νκΈ°
κ°μ
λ©±λ±μ±μ λμΌν μμ μ μ¬λ¬ λ² μννλλΌλ λμΌν κ²°κ³Όκ° λνλλ νΉμ±μ μλ―Έν©λλ€. λ°λΌμ λ©±λ±μ±μ μ§λ νλ‘λμλ κ°μ λ°μ΄ν°λ₯Ό μ¬λ¬ λ² μ μ‘νλλΌλ ν΄λΉ λ°μ΄ν°κ° μΉ΄νμΉ΄ ν΄λ¬μ€ν°μ λ¨ ν λ²λ§ μ μ₯λλλ‘ λ³΄μ₯ν©λλ€.
κ·Έλ λ€λ©΄ νλ‘λμμ μ΄λ»κ² λ©±λ±μ±μ μ μ©νκ³ , μ΄λ₯Ό ν΅ν΄ λ°μ΄ν°μ μ€λ³΅ μ μ₯ μμ΄ μ νν ν λ²λ§ μ μ₯λλλ‘ λ³΄μ₯ν μ μμκΉμ? μ΄λ² κΈμμλ νλ‘λμμ λ©±λ±μ±μ 보μ₯νλλ‘ μ μ©νλ λ°©λ²μ λν΄ μμΈνκ² μμ보λλ‘ νκ² μ΅λλ€.
λ©μμ§ μ μ‘ λ°©μ
Producerμ Brokerμ μ€μ λ° κ΅¬μ±μ λ°λΌ Kafkaμμλ λ©μμ§ μ λ¬ λ³΄μ₯ μμ€μ μλμ κ°μ΄ μΈ κ°μ§ λ°©μμΌλ‘ ꡬλΆν©λλ€.
- μ΅λ ν λ² (At most once): λ©μμ§κ° ν λ²λ§ μ μ‘λλ©°, μ¬μ μ‘μ λ°μνμ§ μλλ€. κ·Έλ¬λ μ΄λ‘ μΈν΄ λ©μμ§κ° μ μ€λ μ μλ κ°λ₯μ±μ΄ μλ€.
- μ΅μ ν λ² (At least once): λ©μμ§κ° λ°λμ μ μ‘λλ©°, μ΅μν ν λ² μ΄μμ μ μ‘μ΄ λ³΄μ₯λλ€. κ·Έλ¬λ μ΄ κ³Όμ μμ λ©μμ§κ° μ€λ³΅μΌλ‘ μ μ‘λ μ μλ€.
- μ νν ν λ² (Exactly once): λ©μμ§κ° μ νν ν λ²λ§ μ μ‘λλ©°, μ¬μ μ‘μ μΌμ΄λμ§ μλλ€. λ°λΌμ, λ©μμ§κ° μ μ€λμ§ μλ κ²μ΄ 보μ₯λλ€.
κ°κ°μ μ²λ¦¬ λ°©μμ λν΄ νλ‘λμμ κ΄μ μμ μ‘°κΈ λ μμΈν μ΄ν΄λ³΄λλ‘ νκ² μ΅λλ€.
At most once(μ΅λ νλ²)
'μ΅λ ν λ²' μ λ΅μμλ λ©μμ§κ° λ¨ ν λ²λ§ μ μ‘λ©λλ€. μ΄λ λ©μμ§μ λν μλ΅ νμΈ(Ack)μ λ°μ§ λͺ»νλλΌλ μ¬μ μ‘μ νμ§ μμμ μλ―Ένλ©°, μ΄λ₯Ό ν΅ν΄ μ€λ³΅ μ μ‘μ λ°©μ§ν©λλ€. κ·Έλ¬λ μ΄ μ λ΅μ νκ³μ μΌλ‘λ λ€νΈμν¬ λ¬Έμ λ±μΌλ‘ μΈν΄ λ©μμ§λ₯Ό μ λλ‘ λ°μ§ λͺ»νλ κ²½μ°, ν΄λΉ λ©μμ§κ° μ μ€λ μ μλ€λ μ μ΄ μμ΅λλ€.
μ΄ μ λ΅μ λ©μμ§μ μμ€ κ°λ₯μ±μ κ°μνλ©΄μλ μ€λ³΅ μ μ‘μ λ°©μ§νλ μ κ·Ό λ°©μμ΄λ©°, λμ μ²λ¦¬λκ³Ό 짧μ λκΈ° μκ°μ μ½κ² λ¬μ±ν μ μμ΅λλ€. λ°λΌμ λλμ λ‘κ·Έ μμ§μ΄λ IoT λ±μ νκ²½μμ μ μ©νκ² μ¬μ©λλ μ λ΅μ λλ€.
At least once(μ΅μ ν λ²)
'μ΅μ ν λ²' λ°©μμμλ λ©μμ§μ μ€λ³΅μ νμ©νμ§λ§, λ©μμ§μ μμ€μ νμ©νμ§ μμ΅λλ€. μ΄ λ°©μμμ νλ‘λμλ λΈλ‘컀λ‘λΆν° μλ΅ νμΈ(Ack)μ λ°μ§ λͺ»νμ λ λΈλ‘μ»€κ° λ©μμ§λ₯Ό μ μ₯μ νμ§λ§ μλ΅λ§ μ μ‘νμ§ λͺ»νλμ§, μλλ©΄ λ©μμ§ μ μ₯κ³Ό μλ΅ μ μ‘ λͺ¨λ μ€ν¨νλμ§λ₯Ό νλ¨ν μ μμ΅λλ€.
λ°λΌμ, λΈλ‘μ»€κ° λ©μμ§λ₯Ό μ μ₯νμΌλ μλ΅λ§ μ μ‘νμ§ λͺ»ν κ²½μ°, νλ‘λμλ μλ΅μ λ°μ§ λͺ»νκΈ° λλ¬Έμ κ°μ λ©μμ§λ₯Ό μ¬μ μ‘νκ² λ©λλ€. μ΄λ‘ μΈν΄ μ€λ³΅λ λ©μμ§κ° λΈλ‘컀μ μ μ₯λ μ μμ΅λλ€. μ¦, μ΄ μ λ΅μ λ©μμ§μ μμ€μ λ°©μ§νκΈ° μν΄ μ€λ³΅ μ μ₯μ νμ©νλ μ κ·Ό λ°©μμΌλ‘ λ°μ΄ν°μ μμ μ±μ μ΅μ°μ μΌλ‘ λλ κ²½μ°μ λ§€μ° μ μ©ν©λλ€.
Exactly once(μ νν ν λ²)
'μ νν ν λ²' λ°©μμ λ©±λ±μ±(idempotence) μμΉμ κ°μ₯ μ λ°λ₯΄λ λ°©μμ λλ€. μ΄ λ°©μμμλ λ©μμ§κ° λ± ν λ²λ§ μ λ¬λλ©°, μ΄λ ν λ©μμ§λ μμ€λμ§ μμμΌ ν©λλ€. νλ‘λμ IDμ μνμ€ λ²νΈλ₯Ό νμ©ν΄ μλ΅ νμΈ(Ack)μ λ°κ³ λ©μμ§λ₯Ό μ μ‘νλ κ³Όμ μ΄ νμμ μ λλ€. νμ§λ§ μ΄λ° νΉμ± λλ¬Έμ λ€λ₯Έ λ°©μμ λΉν΄ μλ΅ λκΈ° μκ°μ΄ κΈΈμ΄μ§κ³ , μ²λ¦¬λμ΄ μλμ μΌλ‘ λ¨μ΄μ§λλ€.
μ΄ λ°©μμ λ©μμ§κ° μ μ€ μμ΄ μ νν ν λ²λ§ μ μ‘λλ κ²μ 보μ₯ν΄μΌ νλ μν©μμ μ¬μ©λ©λλ€.
Producer Acknowledgements(acks)
Apache Kafkaμ Producer Acknowledgements (acks) μ€μ μ νλ‘λμκ° λΈλ‘컀μκ² λ©μμ§λ₯Ό λ³΄λΈ ν λ°λ μλ΅μ μμ€μ κ²°μ ν©λλ€.
νλ‘λμμ νμΈ μλ΅(acks) μ€μ μλ 3κ°μ§μ μ΅μ μ΄ μ‘΄μ¬ν©λλ€.
- acks=0: μ΄ μ€μ μμλ νλ‘λμκ° λΈλ‘컀μκ² λ©μμ§λ₯Ό 보λ΄κ³ , μ΄λ ν μλ΅λ κΈ°λ€λ¦¬μ§ μμ΅λλ€. μ΄λ κ°μ₯ λΉ λ₯Έ μ μ‘ μλλ₯Ό μ 곡νμ§λ§, λ©μμ§κ° μ€μ λ‘ λΈλ‘컀μ λλ¬νλμ§ νμΈν μ μμ΄ λ°μ΄ν° μμ€μ μνμ΄ κ°μ₯ λμ΅λλ€.
- acks=1: νλ‘λμλ 리λ λΈλ‘μ»€κ° λ©μμ§λ₯Ό λ°μμ λλ§ μλ΅μ λ°μ΅λλ€. μ΄ μ€μ μ λ©μμ§κ° μ΅μ ν κ³³μ λΈλ‘컀μ μ μ₯λμμμ 보μ₯νμ§λ§, μ΄ λΈλ‘μ»€κ° μ₯μ κ° λ°μνλ©΄ λ©μμ§κ° μμ€λ μ μμ΅λλ€. μ΄ μ΅μ μ μ μ‘ μλμ μ λ’°μ± μ¬μ΄μμ κ· νμ μ 곡ν©λλ€.
- acks=all λλ acks=-1: μ΄ μ€μ μ νλ‘λμκ° λ¦¬λ λΈλ‘컀 λΏλ§ μλλΌ λͺ¨λ ISR(In-Sync Replicas)μ μν΄ λ©μμ§κ° μ²λ¦¬λμλ€λ νμΈμ λ°μ λκΉμ§ κΈ°λ€λ¦½λλ€. μ΄λ λ°μ΄ν° μ λ’°μ±μ μ΅λννμ§λ§, μ μ‘ μλκ° λλ €μ§ μ μμ΅λλ€. μ΄ μ΅μ μ λͺ¨λ λ νλ¦¬μΉ΄κ° λκΈ°νλμ΄ λ°μ΄ν°μ λ΄κ΅¬μ±μ΄ κ°μ₯ μ€μν κ²½μ°μ μ¬μ©λ©λλ€.
μ‘°κΈ λ μμΈν μμλ³΄κ² μ΅λλ€.
acks=0
acks=0 μ€μ μ 'μ΅λ νλ²(At most once)' μ μ‘ μ λ΅μ ν΄λΉν©λλ€.
μ΄ μ€μ μ μ¬μ©νλ κ²½μ°, νλ‘λμλ λ©μμ§λ₯Ό μ μ‘ν μκ°μ λ©μμ§κ° μ±κ³΅μ μΌλ‘ μμ±λ κ²μΌλ‘ κ°μ£Όν©λλ€. μ¦, λΈλ‘컀λ‘λΆν°μ μλ΅μ κΈ°λ€λ¦¬μ§ μμ΅λλ€.
μ΄λ° μν©μμ, λ§μ½ λΈλ‘μ»€κ° μ€νλΌμΈ μνκ° λκ±°λ μμΈκ° λ°μνμ¬ λ°μ΄ν° μ°κΈ° μμ μ΄ μ€λ¨λλλΌλ νλ‘λμλ μ΄λ₯Ό μ μ μμ΅λλ€. κ²°κ³Όμ μΌλ‘ μ΄λ° κ²½μ°μλ λ°μ΄ν°κ° μμ€λ μ μκΈ°μ, acks=0 μ€μ μ λ©μμ§μ μ μ€μ΄ νμ©λλ μν©μμ μ μ©νκ² μ¬μ©λ©λλ€.
acks=1
acks=1 μ€μ μ 'μ΅μ νλ²(At least once)' μ μ‘ μ λ΅μ ν΄λΉλλ©°, μ΄ μ€μ μ μ¬μ©νλ©΄ νλ‘λμλ 리λ λΈλ‘컀λ‘λΆν° νμΈ μλ΅μ λ°λ μμ μ λ©μμ§ μ°κΈ°κ° μ±κ³΅νλ€κ³ νλ¨ν©λλ€.(μΉ΄νμΉ΄ v1.0λΆν° v2.8κΉμ§ default μ€μ μ λλ€.)
μ£Όμν μ μ acks=1 μ€μ μμλ νλ‘λμκ° μ€μ§ 리λκ° λ°μ΄ν°λ₯Ό μ μ₯ν μμ μλ§ μλ΅μ λ°κΈ° λλ¬Έμ, 리λ μ΄μΈμ λ ν리카λ€μ΄ λ°μ΄ν°λ₯Ό μ±κ³΅μ μΌλ‘ 볡μ νλμ§λ νμΈνμ§ λͺ»ν©λλ€. λ°λΌμ 리λ λΈλ‘μ»€κ° κ°μκΈ° μ€νλΌμΈ μνκ° λκ³ , λ ν리카λ€μ΄ μμ§ λ°μ΄ν°λ₯Ό 볡μ νμ§ λͺ»ν κ²½μ°μλ λ°μ΄ν° μμ€μ΄ λ°μν μ μμ΅λλ€.
λ§μ½ λΈλ‘μ»€κ° νμΈ μμ²μ λ°μ§ λͺ»νλ€λ©΄ νλ‘λμλ λ°μ΄ν° μ°κΈ°λ₯Ό μ±κ³΅μ μΌλ‘ μ²λ¦¬νκΈ° μν΄ μμ²μ μ¬μλν μ μμ΅λλ€. κ²°κ΅ acks=1 μ€μ μ μ¬μ©νλ©΄ μ¬μλλ₯Ό ν΅ν΄ μ€λ³΅ μ μ‘μ νμ©νμ§λ§, νλ‘λμκ° λ¦¬λλ₯Ό ν΅ν΄ λ°μ΄ν° μ°κΈ°κ° μ±κ³΅νλμ§ νμΈν μ μκΈ° λλ¬Έμ λ°μ΄ν°μ μμ μ±μ acks=0μ λΉν΄ λμμ§κ² λ©λλ€.
acks=all or -1
acks=all λλ acks=-1 μ€μ μ 'μ νν ν λ²(Exactly once)' μ μ‘ μ λ΅μ ν΄λΉν©λλ€. μ΄ μ€μ μ μ¬μ©νλ©΄, νλ‘λμλ 리λ λΈλ‘컀λ₯Ό ν¬ν¨ν λͺ¨λ λ ν리카 λΈλ‘컀λ€λ‘λΆν° νμΈ μλ΅μ λ°μ μμ μ λ©μμ§ μ°κΈ°κ° μ±κ³΅νλ€κ³ νλ¨ν©λλ€.(μΉ΄νμΉ΄ v3.0λΆν° default μ€μ μ λλ€.)
μ΄ λ°©μμ acks=1 μ€μ μ λΉν΄ λ λμ λ°μ΄ν° μμ μ±μ μ 곡νλλ°, νλ‘λμλ λͺ¨λ λ ν리카 λΈλ‘컀λ€μ΄ λ°μ΄ν°λ₯Ό μ±κ³΅μ μΌλ‘ μ μ₯νλ€λ νμΈμ λ°κΈ° λλ¬Έμ μ΄λ ν λΈλ‘컀μ λ¬Έμ κ° λ°μνλλΌλ λ°μ΄ν° μμ€ μνμ΄ ν¬κ² μ€μ΄λλλ€. νμ§λ§ λͺ¨λ λ ν리카 λΈλ‘컀λ€λ‘λΆν° μλ΅μ λ°μμΌ νλ―λ‘, λ€νΈμν¬ μ§μ°μ΄λ λΈλ‘컀μ λΆν λ±μΌλ‘ μΈν΄ μ 체 λ©μμ§ μ μ‘ μκ°μ΄ λμ΄λ μ μμ΅λλ€.
νμ§λ§ μμ§κΉμ§λ μ€λ³΅ μ μ‘μ΄ κ°λ₯νλ€λ λ¬Έμ κ° μ‘΄μ¬νλλ°, λ€μκ³Ό κ°μ μν©μ λλ€.
μ μ΄λ―Έμ§λ νλ‘λμκ° λΈλ‘컀μκ² λ©μμ§λ₯Ό 보λ΄κ³ , λΈλ‘μ»€κ° νλ‘λμμκ² ackλ₯Ό 보λ΄λ κ³Όμ μ λλ€. νμ¬κΉμ§λ λͺ¨λ μ μμ μ λλ€. νμ§λ§ νλ‘λμκ° λΈλ‘컀μκ² λ³΄λΈ λ©μμ§μ λν μλ΅μ΄ λ€νΈμν¬ μ§μ° λ±μ μ΄μ λ‘ νλ‘λμμκ² λμμ€μ§ μλ μν©μ κ°μ ν΄ λ³΄κ² μ΅λλ€.
μ μν©μμλ 'y'λΌλ λ©μμ§λ₯Ό νλ‘λμκ° λΈλ‘컀μκ² λ³΄λλλ°, μ΄ λ©μμ§λ λΈλ‘컀μ μν΄ μ μμ μΌλ‘ νν°μ μ μ μ₯λμμ§λ§ ackκ° νλ‘λμμκ² λμ°©νμ§ λͺ»νμ΅λλ€. μ΄λ¬ν κ²½μ° νλ‘λμλ μ€λ₯λ‘ νλ¨νκ³ λμΌν λ©μμ§μΈ 'y'λ₯Ό λ€μ 보λ΄κ² λ©λλ€. κ·Έ κ²°κ³Ό λΈλ‘컀μλ 'y'λ©μμ§κ° μ€λ³΅μΌλ‘ μ μ₯λ©λλ€.
λ°λΌμ λ©±λ±μ± μλ 'μ νν ν λ²(Exactly once)'μ ꡬννκΈ° μν΄μλ acks μ€μ λΏλ§ μλλΌ idempotent μ€μ κ·Έλ¦¬κ³ Brockerμ max.in.flight.requests.per.connection μ€μ , min.insync.replicas μ€μ , retriesλ κ°μ΄ λμ΄μΌ ν©λλ€. μ΄ λΆλΆμ μλμμ μ’ λ μμλ³΄κ² μ΅λλ€.
λ©±λ±μ± 보μ₯ μ€μ
λ©±λ±μ±(idempotence)μ΄λ νλ‘λμκ° λ μ½λλ₯Ό μ μ‘νμ§λ§, λ€νΈμν¬ μ΄μ λ±μΌλ‘ μΈν΄ ackλ₯Ό λ°μ§ λͺ»ν κ²½μ°μλ μ€λ³΅ λ©μμ§μ μ λ¬ λ° μ μ₯μ λ°©μ§νλ κ²μ μλ―Έν©λλ€. μ΄λ₯Ό μν΄ μΉ΄νμΉ΄λ νλ‘λμ ID(PID)μ μνμ€ λ²νΈλ₯Ό κΈ°λ°μΌλ‘ λ©μμ§λ₯Ό μΆμ νκ³ , λμΌν 컀λ°μ λν΄ μ€λ³΅ μ²λ¦¬λ₯Ό λ°©μ§ν©λλ€.
λ©±λ±μ±μ 보μ₯νκΈ° μν μ€μ μ λ€μκ³Ό κ°μ΅λλ€.
- acks: νλ‘λμκ° μμ²μ 보λ΄κ³ 리λκ° λ ν리카μ μμ μ νμΈν΄μΌ νλ κ°μλ₯Ό κ²°μ . 'all' λλ '-1'λ‘ μ€μ νλ€.
- enable.idempotence: νλ‘λμκ° λ μ½λ μ°κΈ° μμ μ λ¨ ν λ²λ§ νμ©ν κ²μΈμ§λ₯Ό κ²°μ . 'true'λ‘ μ€μ ν΄μΌ λ©±λ±μ±μ 보μ₯νλ€.
- max.in.flight.requests.per.connection: ν λ²μ λͺ κ°μ μμ²μ μ μ‘ν κ²μΈμ§λ₯Ό κ²°μ . μ΄ κ°μ 1 μ΄μ 5 μ΄νλ‘ μ€μ ν΄μΌ νλ€.
- retries: λ©μμ§λ₯Ό μ μ‘νκΈ° μν΄ μ¬μλλλ νμλ₯Ό κ²°μ . μ΄ κ°μ 0 μ΄μμΌλ‘ μ€μ ν΄μΌ νλ€.
- min.insync.replicas: λκΈ°ννλ λ ν리카μ μλ₯Ό κ²°μ . acks=allμΌ κ²½μ° λ³΄ν΅ 2λ‘ μ€μ νλ€(λΈλ‘컀 μ€μ μ΄λ€).
μμ λͺ¨λ μ€μ κ°μ μ μ©νμ§ μμΌλ©΄ ConfigExceptionμ΄ λ°μνλ―λ‘ μ£Όμκ° νμν©λλ€.(μ€μ κ°μ νμΈνλ λΆλΆ)
enable.idempotence
μΉ΄νμΉ΄ v3.0 λΆν°λ acks=all(-1)κ³Ό enable.idempotence=trueκ° defaultλ‘ μ μ©λμμ΅λλ€ κ·Έλ¬λ v2.8 μ΄ν λ²μ μμλ acks=-1κ³Ό enable.idempotence=falseκ° default μ€μ μ΄λ―λ‘, μ΄λ¬ν λ²μ μμλ λ³λλ‘ μ€μ μ λ³κ²½ν΄μ€μΌ ν©λλ€.
enable.idempotence=trueλ‘ μ€μ νλ©΄, μ μ΄λ―Έμ§μ²λΌ κ° νλ‘λμμλ κ³ μ ν νλ‘λμ ID(PID)κ° ν λΉλ©λλ€. νλ‘λμλ λ©μμ§λ₯Ό λΈλ‘컀μκ² λ³΄λΌ λλ§λ€ μ΄ PIDλ₯Ό ν¬ν¨νλ©°, κ° λ©μμ§λ μμ°¨μ μΌλ‘ μ¦κ°νλ μνμ€ λ²νΈλ₯Ό λ°μ΅λλ€. νλ‘λμκ° λ©μμ§λ₯Ό 보λ΄λ κ° ν ν½ νν°μ λ§λ€ λ³λμ μνμ€κ° μ μ§λκ³ , λΈλ‘컀λ νν°μ λ³λ‘ μ±κ³΅μ μΌλ‘ μ²λ¦¬λ PID-μνμ€ λ²νΈ μ‘°ν© μ€ κ°μ₯ ν° κ°μ μΆμ ν©λλ€.
μ μ΄λ―Έμ§λ₯Ό 보면 λΈλ‘컀λ νλ‘λμμ μμ²μ΄ PID/ν ν½νν°μ μμμ λ§μ§λ§μΌλ‘ 컀λ°λ λ©μμ§λ³΄λ€ μνμ€ λ²νΈκ° μ νν 1λ§νΌ ν¬μ§ μμ κ²½μ°, νλ‘λμμ μμ²μ κ±°λΆνλ κ²μ λ³Ό μ μμ΅λλ€. μ΄λ₯Ό ν΅ν΄ νλ‘λμλ μ€ν¨μ λ°λ₯Έ μμ² μ¬μλλ₯Ό ν μ μμ§λ§, λͺ¨λ λ©μμ§λ λ‘κ·Έμ μ νν ν λ²λ§ κΈ°λ‘λ©λλ€. νμ§λ§ μλ‘μ΄ νλ‘λμ μΈμ€ν΄μ€μλ μλ‘μ΄ κ³ μ PIDκ° ν λΉλλ―λ‘, λ¨μΌ νλ‘λμ μΈμ λ΄μμλ§ λ©±λ±μ±μ 보μ₯ν μ μμ΅λλ€.
max.in.flight.requests.per.connection
μΉ΄νμΉ΄μμ max.in.flight.requests.per.connection μ€μ μ λ©±λ±μ±(idempotence)μ 보μ₯νλ λ° μ€μν μν μ ν©λλ€. μ΄ μ€μ μ ν λ²μ μ°κ²°μμ λμμ μ²λ¦¬ν μ μλ μ΅λ μμ² μλ₯Ό μλ―Έν©λλ€. μΉ΄νμΉ΄ v0.11 λ²μ μ΄μμμλ λ©±λ±μ±μ΄ νμ±νλ νλ‘λμμ κ²½μ°, μ΄ κ°μ 5 μ΄νλ‘ μ€μ νλ κ²μ΄ κΆμ₯λλλ° κ·Έ μ΄μ λ λ€μκ³Ό κ°μ΅λλ€.
- λ©μμ§ μμ 보μ₯: μΉ΄νμΉ΄μ λ©±λ± νλ‘λμλ κ°μ νν°μ μ λν΄ λ©μμ§μ μμλ₯Ό 보μ₯ν΄μΌ ν©λλ€. max.in.flight.requests.per.connection κ°μ΄ 5 μ΄νμΌ λ, νλ‘λμλ ν λ©μμ§κ° μ€ν¨νλ©΄ κ·Έ νμ λ©μμ§λ€μ μλ²μ μ μ‘λμ§ μμ΅λλ€. μ΄λ¬ν λμμ λ©μμ§μ μμκ° λ³κ²½λμ§ μλλ‘ λ³΄μ₯νλ λ° μ€μν©λλ€. λ§μ½ μ΄ κ°μ΄ 5λ³΄λ€ ν¬λ€λ©΄, νλ‘λμλ μ¬μλνλ λμ λ€λ₯Έ λ©μμ§λ€μ μ μ‘ν μ μμ΄, λ©μμ§ μμκ° λ€λ°λ μνμ΄ μμ΅λλ€.
- μ€λ³΅ λ©μμ§ λ°©μ§: λ©±λ±μ± νλ‘λμλ λ©μμ§μ μ€λ³΅ μ μ‘μ λ°©μ§ν©λλ€. max.in.flight.requests.per.connection κ°μ΄ λ무 ν¬λ©΄ λ€νΈμν¬ μ§μ° λ±μ λ¬Έμ λ‘ μΈν΄ μ€λ³΅ λ©μμ§κ° λ°μν κ°λ₯μ±μ΄ μ¦κ°ν©λλ€. λ°λΌμ μ΄ κ°μ 5 μ΄νλ‘ μ€μ νμ¬ μ΄λ¬ν λ¬Έμ λ₯Ό μ€μΌ μ μμ΅λλ€.
μ½λ
μ€μ μ½λμμλ λ€μκ³Ό κ°μ΄ νλ‘λμμ μμ±μ μ€μ νμ¬ λ©±λ±μ± κΈ°λ₯μ νμ±νν μ μμ΅λλ€.
Properties properties = new Properties();
// λ©±λ±μ± νμ±ν μ€μ
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
enable.idempotenceλ₯Ό trueλ‘ μ€μ νλ©΄ λ°μ΄ν°κ° μ νν ν λ²λ§ μ μ¬λλλ‘ νκΈ° μν΄ νλ‘λμμ λͺ κ°μ§ μ΅μ μ΄ μλμΌλ‘ μ€μ λ©λλ€. λνμ μΌλ‘ λ°μ΄ν° μ¬μ μ‘ νμλ₯Ό κ²°μ νλ retriesλ κΈ°λ³Έκ°μΈ Integer.MAX_VALUEλ‘ μ€μ λκ³ , acks μ΅μ μ allλ‘ μ€μ λ©λλ€.
λ©±λ±μ± νλ‘λμμ νκ³
λ©±λ±μ± νλ‘λμλ λμΌν μΈμ λ΄μμλ§ μ νν ν λ²μ μ λ¬μ 보μ₯ν©λλ€. μ¬κΈ°μ 'λμΌν μΈμ 'μ΄λ, PID(Producer ID)μ μλͺ μ£ΌκΈ°λ₯Ό μλ―Έν©λλ€. λ§μ½ λ©±λ±μ± νλ‘λμλ‘ μλνλ νλ‘λμ μ ν리μΌμ΄μ μ λ¬Έμ κ° λ°μν΄ μ’ λ£λκ³ λ€μ μμνλ©΄ PIDκ° λ³κ²½λ©λλ€.
λμΌν λ°μ΄ν°λ₯Ό μ μ‘νλλΌλ, PIDκ° λ°λλ©΄ λΈλ‘컀λ λ€λ₯Έ νλ‘λμ μ ν리μΌμ΄μ μ΄ λ€λ₯Έ λ°μ΄ν°λ₯Ό 보λλ€κ³ νλ¨ν©λλ€. λ°λΌμ λ©±λ±μ± νλ‘λμλ μ₯μ κ° λ°μνμ§ μλ μν©μμλ§ λ°μ΄ν°λ₯Ό μ νν ν λ² μ μ¬νλ κ²μ 보μ₯νλ€λ μ μ λͺ μ¬ν΄μΌ ν©λλ€.
λ©±λ±μ± νλ‘λμ μ¬μ© μ μ€λ₯ νμΈ
λ©±λ±μ± νλ‘λμλ μνμ€ λ²νΈλ₯Ό 0λΆν° μμνμ¬, λ°μ΄ν°λ₯Ό μ μ‘ν λλ§λ€ μ΄ λ²νΈλ₯Ό 1μ© μ¦κ°μν΅λλ€. λΈλ‘컀λ λ©±λ±μ± νλ‘λμκ° μ μ‘ν λ°μ΄ν°μ PIDμ μνμ€ λ²νΈλ₯Ό νμΈνλ κ³Όμ μμ, μνμ€ λ²νΈκ° μΌκ΄μ±μ μ μ§νμ§ μλ κ²½μ°μλ OutOfOrderSequenceExceptionμ λ°μμν¬ μ μμ΅λλ€.
μ΄ μμΈλ λΈλ‘μ»€κ° μμνλ μνμ€ λ²νΈμ μ€μ λ‘ λ°μ λ°μ΄ν°μ μνμ€ λ²νΈκ° μΌμΉνμ§ μμ λ λ°μν©λλ€. OutOfOrderSequenceExceptionμ΄ λ°μνλ©΄, μνμ€ λ²νΈμ μμκ° λ€λ°λ μ μμΌλ―λ‘, μμκ° μ€μν λ°μ΄ν°λ₯Ό μ μ‘νλ νλ‘λμλ μ΄ μμΈκ° λ°μνμ λ μ μ ν λμν μ μλ λ°©μμ κ³ λ €ν΄μΌ ν©λλ€.